vector/sinks/aws_s_s/sns/
config.rs1use aws_sdk_sns::Client as SnsClient;
2use vector_lib::configurable::configurable_component;
3
4use super::{
5 BaseSSSinkConfig, SSRequestBuilder, SSSink, client::SnsMessagePublisher,
6 message_deduplication_id, message_group_id,
7};
8use crate::{
9 aws::{ClientBuilder, RegionOrEndpoint, create_client},
10 config::{
11 AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
12 SinkContext,
13 },
14};
15
16#[configurable_component(sink(
18 "aws_sns",
19 "Publish observability events to AWS Simple Notification Service topics."
20))]
21#[derive(Clone, Debug)]
22#[serde(deny_unknown_fields)]
23pub(super) struct SnsSinkConfig {
24 #[configurable(validation(format = "uri"))]
26 #[configurable(metadata(docs::examples = "arn:aws:sns:us-east-2:123456789012:MyTopic"))]
27 pub(super) topic_arn: String,
28
29 #[serde(flatten)]
30 pub(super) region: RegionOrEndpoint,
31
32 #[serde(flatten)]
33 pub(super) base_config: BaseSSSinkConfig,
34}
35
36impl GenerateConfig for SnsSinkConfig {
37 fn generate_config() -> toml::Value {
38 toml::from_str(
39 r#"topic_arn = "arn:aws:sns:us-east-2:123456789012:MyTopic"
40 region = "us-east-2"
41 encoding.codec = "json""#,
42 )
43 .unwrap()
44 }
45}
46
47impl SnsSinkConfig {
48 pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SnsClient> {
49 create_client::<SnsClientBuilder>(
50 &SnsClientBuilder {},
51 &self.base_config.auth,
52 self.region.region(),
53 self.region.endpoint(),
54 proxy,
55 self.base_config.tls.as_ref(),
56 None,
57 )
58 .await
59 }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde(name = "aws_sns")]
64impl SinkConfig for SnsSinkConfig {
65 async fn build(
66 &self,
67 cx: SinkContext,
68 ) -> crate::Result<(crate::sinks::VectorSink, crate::sinks::Healthcheck)> {
69 let client = self.create_client(&cx.proxy).await?;
70
71 let publisher = SnsMessagePublisher::new(client.clone(), self.topic_arn.clone());
72
73 let healthcheck = Box::pin(healthcheck(client.clone(), self.topic_arn.clone()));
74
75 let message_group_id = message_group_id(
76 self.base_config.message_group_id.clone(),
77 self.topic_arn.ends_with(".fifo"),
78 );
79 let message_deduplication_id =
80 message_deduplication_id(self.base_config.message_deduplication_id.clone());
81
82 let sink = SSSink::new(
83 SSRequestBuilder::new(
84 message_group_id?,
85 message_deduplication_id?,
86 self.base_config.encoding.clone(),
87 )?,
88 self.base_config.request,
89 publisher,
90 )?;
91 Ok((
92 crate::sinks::VectorSink::from_event_streamsink(sink),
93 healthcheck,
94 ))
95 }
96
97 fn input(&self) -> Input {
98 Input::new(self.base_config.encoding.config().input_type() & DataType::Log)
99 }
100
101 fn acknowledgements(&self) -> &AcknowledgementsConfig {
102 &self.base_config.acknowledgements
103 }
104}
105
106pub(super) struct SnsClientBuilder;
107
108impl ClientBuilder for SnsClientBuilder {
109 type Client = aws_sdk_sns::client::Client;
110
111 fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
112 aws_sdk_sns::client::Client::new(config)
113 }
114}
115
116pub(super) async fn healthcheck(client: SnsClient, topic_arn: String) -> crate::Result<()> {
117 client
118 .get_topic_attributes()
119 .topic_arn(topic_arn.clone())
120 .send()
121 .await
122 .map(|_| ())
123 .map_err(Into::into)
124}