vector/sources/aws_sqs/
config.rs1use std::num::NonZeroUsize;
2
3use vector_lib::{
4 codecs::decoding::{DeserializerConfig, FramingConfig},
5 config::{LegacyKey, LogNamespace},
6 configurable::configurable_component,
7 lookup::owned_value_path,
8};
9use vrl::value::Kind;
10
11use crate::{
12 aws::{auth::AwsAuthentication, create_client, region::RegionOrEndpoint},
13 codecs::DecodingConfig,
14 common::sqs::SqsClientBuilder,
15 config::{SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
16 serde::{bool_or_struct, default_decoding, default_framing_message_based},
17 sources::aws_sqs::source::SqsSource,
18 tls::TlsConfig,
19};
20
21#[configurable_component(source("aws_sqs", "Collect logs from AWS SQS."))]
23#[derive(Clone, Debug, Derivative)]
24#[derivative(Default)]
25#[serde(deny_unknown_fields)]
26pub struct AwsSqsConfig {
27 #[serde(flatten)]
28 pub region: RegionOrEndpoint,
29
30 #[configurable(derived)]
31 #[serde(default)]
32 pub auth: AwsAuthentication,
33
34 #[configurable(metadata(
36 docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
37 ))]
38 pub queue_url: String,
39
40 #[serde(default = "default_poll_secs")]
47 #[derivative(Default(value = "default_poll_secs()"))]
48 #[configurable(metadata(docs::type_unit = "seconds"))]
49 #[configurable(metadata(docs::human_name = "Poll Wait Time"))]
50 pub poll_secs: u32,
51
52 #[serde(default = "default_visibility_timeout_secs")]
61 #[derivative(Default(value = "default_visibility_timeout_secs()"))]
62 #[configurable(metadata(docs::type_unit = "seconds"))]
63 #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
64 pub(super) visibility_timeout_secs: u32,
65
66 #[serde(default = "default_true")]
70 #[derivative(Default(value = "default_true()"))]
71 pub(super) delete_message: bool,
72
73 pub client_concurrency: Option<NonZeroUsize>,
83
84 #[configurable(derived)]
85 #[serde(default = "default_framing_message_based")]
86 #[derivative(Default(value = "default_framing_message_based()"))]
87 pub framing: FramingConfig,
88
89 #[configurable(derived)]
90 #[serde(default = "default_decoding")]
91 #[derivative(Default(value = "default_decoding()"))]
92 pub decoding: DeserializerConfig,
93
94 #[configurable(derived)]
95 #[serde(default, deserialize_with = "bool_or_struct")]
96 pub acknowledgements: SourceAcknowledgementsConfig,
97
98 #[configurable(derived)]
99 pub tls: Option<TlsConfig>,
100
101 #[configurable(metadata(docs::hidden))]
103 #[serde(default)]
104 pub log_namespace: Option<bool>,
105}
106
107#[async_trait::async_trait]
108#[typetag::serde(name = "aws_sqs")]
109impl SourceConfig for AwsSqsConfig {
110 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
111 let log_namespace = cx.log_namespace(self.log_namespace);
112
113 let client = self.build_client(&cx).await?;
114 let decoder =
115 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
116 .build()?;
117 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
118
119 Ok(Box::pin(
120 SqsSource {
121 client,
122 queue_url: self.queue_url.clone(),
123 decoder,
124 poll_secs: self.poll_secs,
125 concurrency: self
126 .client_concurrency
127 .map(|n| n.get())
128 .unwrap_or_else(crate::num_threads),
129 visibility_timeout_secs: self.visibility_timeout_secs,
130 delete_message: self.delete_message,
131 acknowledgements,
132 log_namespace,
133 }
134 .run(cx.out, cx.shutdown),
135 ))
136 }
137
138 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
139 let schema_definition = self
140 .decoding
141 .schema_definition(global_log_namespace.merge(self.log_namespace))
142 .with_standard_vector_source_metadata()
143 .with_source_metadata(
144 Self::NAME,
145 Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
146 &owned_value_path!("timestamp"),
147 Kind::timestamp().or_undefined(),
148 Some("timestamp"),
149 );
150
151 vec![SourceOutput::new_maybe_logs(
152 self.decoding.output_type(),
153 schema_definition,
154 )]
155 }
156
157 fn can_acknowledge(&self) -> bool {
158 true
159 }
160}
161
162impl AwsSqsConfig {
163 async fn build_client(&self, cx: &SourceContext) -> crate::Result<aws_sdk_sqs::Client> {
164 create_client::<SqsClientBuilder>(
165 &SqsClientBuilder {},
166 &self.auth,
167 self.region.region(),
168 self.region.endpoint(),
169 &cx.proxy,
170 self.tls.as_ref(),
171 None,
172 )
173 .await
174 }
175}
176
/// Default for `poll_secs`: 15 seconds.
const fn default_poll_secs() -> u32 {
    const POLL_SECS: u32 = 15;
    POLL_SECS
}
180
/// Default for `visibility_timeout_secs`: 300 seconds.
const fn default_visibility_timeout_secs() -> u32 {
    const VISIBILITY_TIMEOUT_SECS: u32 = 300;
    VISIBILITY_TIMEOUT_SECS
}
184
/// Default for boolean options that are enabled unless configured otherwise
/// (used for `delete_message`).
const fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
188
189impl_generate_config_from_default!(AwsSqsConfig);