1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use aws_sdk_sns::Client as SnsClient;

use crate::aws::RegionOrEndpoint;

use crate::config::{
    AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext,
};
use vector_lib::configurable::configurable_component;

use super::{
    client::SnsMessagePublisher, message_deduplication_id, message_group_id, BaseSSSinkConfig,
    SSRequestBuilder, SSSink,
};
use crate::aws::create_client;
use crate::aws::ClientBuilder;

/// Configuration for the `aws_sns` sink.
#[configurable_component(sink(
    "aws_sns",
    "Publish observability events to AWS Simple Notification Service topics."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub(super) struct SnsSinkConfig {
    /// The ARN of the Amazon SNS topic to which messages are sent.
    // `build` hands this ARN to both the publisher and the healthcheck, and treats
    // the topic as FIFO when the ARN ends with `.fifo`.
    #[configurable(validation(format = "uri"))]
    #[configurable(metadata(docs::examples = "arn:aws:sns:us-east-2:123456789012:MyTopic"))]
    pub(super) topic_arn: String,

    // Flattened: the region/endpoint keys sit at the top level of the sink's
    // configuration (e.g. `region = "us-east-2"`), not under a nested table.
    #[serde(flatten)]
    pub(super) region: RegionOrEndpoint,

    // Flattened: settings shared with the sibling sink(s) in the parent module.
    // This file reads `auth`, `tls`, `encoding`, `request`, `acknowledgements`,
    // `message_group_id` and `message_deduplication_id` from it.
    #[serde(flatten)]
    pub(super) base_config: BaseSSSinkConfig,
}

impl GenerateConfig for SnsSinkConfig {
    /// Returns a minimal example configuration for the `aws_sns` sink as a TOML value.
    ///
    /// # Panics
    ///
    /// Panics if the embedded example fails to parse as TOML.
    fn generate_config() -> toml::Value {
        // The continuation lines keep their leading whitespace as part of the literal.
        let example = r#"topic_arn = "arn:aws:sns:us-east-2:123456789012:MyTopic"
            region = "us-east-2"
            encoding.codec = "json""#;

        toml::from_str(example).unwrap()
    }
}

impl SnsSinkConfig {
    pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SnsClient> {
        create_client::<SnsClientBuilder>(
            &SnsClientBuilder {},
            &self.base_config.auth,
            self.region.region(),
            self.region.endpoint(),
            proxy,
            &self.base_config.tls,
            &None,
        )
        .await
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "aws_sns")]
impl SinkConfig for SnsSinkConfig {
    /// Builds the SNS sink together with its healthcheck.
    ///
    /// # Errors
    ///
    /// Returns an error if the client cannot be created, if the message group ID or
    /// message deduplication ID settings are rejected, or if the request builder or
    /// the sink itself fails to construct.
    async fn build(
        &self,
        cx: SinkContext,
    ) -> crate::Result<(crate::sinks::VectorSink, crate::sinks::Healthcheck)> {
        let client = self.create_client(&cx.proxy).await?;
        let base = &self.base_config;

        let publisher = SnsMessagePublisher::new(client.clone(), self.topic_arn.clone());

        // Last use of `client`: move it into the healthcheck instead of cloning again.
        let healthcheck = Box::pin(healthcheck(client, self.topic_arn.clone()));

        // The topic is treated as FIFO when its ARN carries the `.fifo` suffix.
        let is_fifo = self.topic_arn.ends_with(".fifo");
        // Errors are surfaced at the point they originate; the group ID is still
        // checked before the deduplication ID, so error precedence is unchanged.
        let group_id = message_group_id(base.message_group_id.clone(), is_fifo)?;
        let deduplication_id = message_deduplication_id(base.message_deduplication_id.clone())?;

        let request_builder =
            SSRequestBuilder::new(group_id, deduplication_id, base.encoding.clone())?;
        let sink = SSSink::new(request_builder, base.request, publisher)?;

        Ok((
            crate::sinks::VectorSink::from_event_streamsink(sink),
            healthcheck,
        ))
    }

    /// Declares the accepted input: the encoder's input type masked with `DataType::Log`.
    fn input(&self) -> Input {
        Input::new(self.base_config.encoding.config().input_type() & DataType::Log)
    }

    /// Returns the acknowledgement settings taken from the shared base configuration.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.base_config.acknowledgements
    }
}

/// Stateless `ClientBuilder` implementation that constructs the AWS SNS SDK client.
pub(super) struct SnsClientBuilder;

impl ClientBuilder for SnsClientBuilder {
    /// The SNS SDK client type produced by this builder.
    type Client = aws_sdk_sns::client::Client;

    /// Constructs an SNS client from an already-resolved SDK configuration.
    fn build(&self, sdk_config: &aws_types::SdkConfig) -> Self::Client {
        aws_sdk_sns::client::Client::new(sdk_config)
    }
}

/// Checks that the sink can reach the configured topic.
///
/// Issues a `get_topic_attributes` request for `topic_arn` and discards the
/// response; only success or failure of the call matters.
///
/// # Errors
///
/// Returns the SDK error (converted into `crate::Error`) if the request fails.
pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> {
    // `topic_arn` is owned and not used afterwards, so it is moved rather than cloned.
    client
        .get_topic_attributes()
        .topic_arn(topic_arn)
        .send()
        .await?;

    Ok(())
}