vector/sinks/aws_s_s/sns/
config.rs

1use aws_sdk_sns::Client as SnsClient;
2use vector_lib::configurable::configurable_component;
3
4use super::{
5    BaseSSSinkConfig, SSRequestBuilder, SSSink, client::SnsMessagePublisher,
6    message_deduplication_id, message_group_id,
7};
8use crate::{
9    aws::{ClientBuilder, RegionOrEndpoint, create_client},
10    config::{
11        AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
12        SinkContext,
13    },
14};
15
16/// Configuration for the `aws_sns` sink.
17#[configurable_component(sink(
18    "aws_sns",
19    "Publish observability events to AWS Simple Notification Service topics."
20))]
21#[derive(Clone, Debug)]
// Configuration keys that do not correspond to a known field are rejected at
// deserialization time.
22#[serde(deny_unknown_fields)]
23pub(super) struct SnsSinkConfig {
24    /// The ARN of the Amazon SNS topic to which messages are sent.
    // When the sink is built, whether this ARN ends in `.fifo` is passed to the
    // `message_group_id` helper.
25    #[configurable(validation(format = "uri"))]
26    #[configurable(metadata(docs::examples = "arn:aws:sns:us-east-2:123456789012:MyTopic"))]
27    pub(super) topic_arn: String,
28
    // Flattened: the region/endpoint options are written at the top level of
    // the sink configuration (for example `region = "us-east-2"`).
29    #[serde(flatten)]
30    pub(super) region: RegionOrEndpoint,
31
    // Flattened: the options held by `BaseSSSinkConfig`. This file reads
    // `auth`, `tls`, `encoding`, `request`, `acknowledgements`,
    // `message_group_id`, and `message_deduplication_id` from it.
32    #[serde(flatten)]
33    pub(super) base_config: BaseSSSinkConfig,
34}
35
36impl GenerateConfig for SnsSinkConfig {
37    fn generate_config() -> toml::Value {
38        toml::from_str(
39            r#"topic_arn = "arn:aws:sns:us-east-2:123456789012:MyTopic"
40            region = "us-east-2"
41            encoding.codec = "json""#,
42        )
43        .unwrap()
44    }
45}
46
47impl SnsSinkConfig {
48    pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SnsClient> {
49        create_client::<SnsClientBuilder>(
50            &SnsClientBuilder {},
51            &self.base_config.auth,
52            self.region.region(),
53            self.region.endpoint(),
54            proxy,
55            self.base_config.tls.as_ref(),
56            None,
57        )
58        .await
59    }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde(name = "aws_sns")]
64impl SinkConfig for SnsSinkConfig {
65    async fn build(
66        &self,
67        cx: SinkContext,
68    ) -> crate::Result<(crate::sinks::VectorSink, crate::sinks::Healthcheck)> {
69        let client = self.create_client(&cx.proxy).await?;
70
71        let publisher = SnsMessagePublisher::new(client.clone(), self.topic_arn.clone());
72
73        let healthcheck = Box::pin(healthcheck(client.clone(), self.topic_arn.clone()));
74
75        let message_group_id = message_group_id(
76            self.base_config.message_group_id.clone(),
77            self.topic_arn.ends_with(".fifo"),
78        );
79        let message_deduplication_id =
80            message_deduplication_id(self.base_config.message_deduplication_id.clone());
81
82        let sink = SSSink::new(
83            SSRequestBuilder::new(
84                message_group_id?,
85                message_deduplication_id?,
86                self.base_config.encoding.clone(),
87            )?,
88            self.base_config.request,
89            publisher,
90        )?;
91        Ok((
92            crate::sinks::VectorSink::from_event_streamsink(sink),
93            healthcheck,
94        ))
95    }
96
97    fn input(&self) -> Input {
98        Input::new(self.base_config.encoding.config().input_type() & DataType::Log)
99    }
100
101    fn acknowledgements(&self) -> &AcknowledgementsConfig {
102        &self.base_config.acknowledgements
103    }
104}
105
106pub(super) struct SnsClientBuilder;
107
108impl ClientBuilder for SnsClientBuilder {
109    type Client = aws_sdk_sns::client::Client;
110
111    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
112        aws_sdk_sns::client::Client::new(config)
113    }
114}
115
116pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> {
117    client
118        .get_topic_attributes()
119        .topic_arn(topic_arn.clone())
120        .send()
121        .await
122        .map(|_| ())
123        .map_err(Into::into)
124}