vector/sinks/aws_s_s/sns/
config.rs1use aws_sdk_sns::Client as SnsClient;
2
3use crate::aws::RegionOrEndpoint;
4
5use crate::config::{
6 AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig, SinkContext,
7};
8use vector_lib::configurable::configurable_component;
9
10use super::{
11 client::SnsMessagePublisher, message_deduplication_id, message_group_id, BaseSSSinkConfig,
12 SSRequestBuilder, SSSink,
13};
14use crate::aws::create_client;
15use crate::aws::ClientBuilder;
16
17#[configurable_component(sink(
19 "aws_sns",
20 "Publish observability events to AWS Simple Notification Service topics."
21))]
22#[derive(Clone, Debug)]
23#[serde(deny_unknown_fields)]
24pub(super) struct SnsSinkConfig {
25 #[configurable(validation(format = "uri"))]
27 #[configurable(metadata(docs::examples = "arn:aws:sns:us-east-2:123456789012:MyTopic"))]
28 pub(super) topic_arn: String,
29
30 #[serde(flatten)]
31 pub(super) region: RegionOrEndpoint,
32
33 #[serde(flatten)]
34 pub(super) base_config: BaseSSSinkConfig,
35}
36
37impl GenerateConfig for SnsSinkConfig {
38 fn generate_config() -> toml::Value {
39 toml::from_str(
40 r#"topic_arn = "arn:aws:sns:us-east-2:123456789012:MyTopic"
41 region = "us-east-2"
42 encoding.codec = "json""#,
43 )
44 .unwrap()
45 }
46}
47
48impl SnsSinkConfig {
49 pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SnsClient> {
50 create_client::<SnsClientBuilder>(
51 &SnsClientBuilder {},
52 &self.base_config.auth,
53 self.region.region(),
54 self.region.endpoint(),
55 proxy,
56 self.base_config.tls.as_ref(),
57 None,
58 )
59 .await
60 }
61}
62
63#[async_trait::async_trait]
64#[typetag::serde(name = "aws_sns")]
65impl SinkConfig for SnsSinkConfig {
66 async fn build(
67 &self,
68 cx: SinkContext,
69 ) -> crate::Result<(crate::sinks::VectorSink, crate::sinks::Healthcheck)> {
70 let client = self.create_client(&cx.proxy).await?;
71
72 let publisher = SnsMessagePublisher::new(client.clone(), self.topic_arn.clone());
73
74 let healthcheck = Box::pin(healthcheck(client.clone(), self.topic_arn.clone()));
75
76 let message_group_id = message_group_id(
77 self.base_config.message_group_id.clone(),
78 self.topic_arn.ends_with(".fifo"),
79 );
80 let message_deduplication_id =
81 message_deduplication_id(self.base_config.message_deduplication_id.clone());
82
83 let sink = SSSink::new(
84 SSRequestBuilder::new(
85 message_group_id?,
86 message_deduplication_id?,
87 self.base_config.encoding.clone(),
88 )?,
89 self.base_config.request,
90 publisher,
91 )?;
92 Ok((
93 crate::sinks::VectorSink::from_event_streamsink(sink),
94 healthcheck,
95 ))
96 }
97
98 fn input(&self) -> Input {
99 Input::new(self.base_config.encoding.config().input_type() & DataType::Log)
100 }
101
102 fn acknowledgements(&self) -> &AcknowledgementsConfig {
103 &self.base_config.acknowledgements
104 }
105}
106
107pub(super) struct SnsClientBuilder;
108
109impl ClientBuilder for SnsClientBuilder {
110 type Client = aws_sdk_sns::client::Client;
111
112 fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
113 aws_sdk_sns::client::Client::new(config)
114 }
115}
116
117pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> {
118 client
119 .get_topic_attributes()
120 .topic_arn(topic_arn.clone())
121 .send()
122 .await
123 .map(|_| ())
124 .map_err(Into::into)
125}