vector/sinks/aws_s_s/sns/
config.rs

1use aws_sdk_sns::Client as SnsClient;
2
3use crate::aws::RegionOrEndpoint;
4
5use crate::config::{
6    AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext,
7};
8use vector_lib::configurable::configurable_component;
9
10use super::{
11    client::SnsMessagePublisher, message_deduplication_id, message_group_id, BaseSSSinkConfig,
12    SSRequestBuilder, SSSink,
13};
14use crate::aws::create_client;
15use crate::aws::ClientBuilder;
16
/// Configuration for the `aws_sns` sink.
#[configurable_component(sink(
    "aws_sns",
    "Publish observability events to AWS Simple Notification Service topics."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub(super) struct SnsSinkConfig {
    // `build` checks whether this value ends with `.fifo` and passes that flag to
    // `message_group_id`; the same ARN is handed to the publisher and the healthcheck.
    /// The ARN of the Amazon SNS topic to which messages are sent.
    #[configurable(validation(format = "uri"))]
    #[configurable(metadata(docs::examples = "arn:aws:sns:us-east-2:123456789012:MyTopic"))]
    pub(super) topic_arn: String,

    // Flattened, so its keys sit at the same level as `topic_arn`.
    // Supplies the region and endpoint used when the client is created.
    #[serde(flatten)]
    pub(super) region: RegionOrEndpoint,

    // Flattened as well. This file reads `auth`, `tls`, `encoding`, `request`,
    // `message_group_id`, `message_deduplication_id` and `acknowledgements` from it.
    #[serde(flatten)]
    pub(super) base_config: BaseSSSinkConfig,
}
36
37impl GenerateConfig for SnsSinkConfig {
38    fn generate_config() -> toml::Value {
39        toml::from_str(
40            r#"topic_arn = "arn:aws:sns:us-east-2:123456789012:MyTopic"
41            region = "us-east-2"
42            encoding.codec = "json""#,
43        )
44        .unwrap()
45    }
46}
47
48impl SnsSinkConfig {
49    pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SnsClient> {
50        create_client::<SnsClientBuilder>(
51            &SnsClientBuilder {},
52            &self.base_config.auth,
53            self.region.region(),
54            self.region.endpoint(),
55            proxy,
56            self.base_config.tls.as_ref(),
57            None,
58        )
59        .await
60    }
61}
62
63#[async_trait::async_trait]
64#[typetag::serde(name = "aws_sns")]
65impl SinkConfig for SnsSinkConfig {
66    async fn build(
67        &self,
68        cx: SinkContext,
69    ) -> crate::Result<(crate::sinks::VectorSink, crate::sinks::Healthcheck)> {
70        let client = self.create_client(&cx.proxy).await?;
71
72        let publisher = SnsMessagePublisher::new(client.clone(), self.topic_arn.clone());
73
74        let healthcheck = Box::pin(healthcheck(client.clone(), self.topic_arn.clone()));
75
76        let message_group_id = message_group_id(
77            self.base_config.message_group_id.clone(),
78            self.topic_arn.ends_with(".fifo"),
79        );
80        let message_deduplication_id =
81            message_deduplication_id(self.base_config.message_deduplication_id.clone());
82
83        let sink = SSSink::new(
84            SSRequestBuilder::new(
85                message_group_id?,
86                message_deduplication_id?,
87                self.base_config.encoding.clone(),
88            )?,
89            self.base_config.request,
90            publisher,
91        )?;
92        Ok((
93            crate::sinks::VectorSink::from_event_streamsink(sink),
94            healthcheck,
95        ))
96    }
97
98    fn input(&self) -> Input {
99        Input::new(self.base_config.encoding.config().input_type() & DataType::Log)
100    }
101
102    fn acknowledgements(&self) -> &AcknowledgementsConfig {
103        &self.base_config.acknowledgements
104    }
105}
106
107pub(super) struct SnsClientBuilder;
108
109impl ClientBuilder for SnsClientBuilder {
110    type Client = aws_sdk_sns::client::Client;
111
112    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
113        aws_sdk_sns::client::Client::new(config)
114    }
115}
116
117pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> {
118    client
119        .get_topic_attributes()
120        .topic_arn(topic_arn.clone())
121        .send()
122        .await
123        .map(|_| ())
124        .map_err(Into::into)
125}