vector/sources/aws_sqs/
config.rs1use std::num::NonZeroUsize;
2
3use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
4use vector_lib::config::{LegacyKey, LogNamespace};
5use vector_lib::configurable::configurable_component;
6use vector_lib::lookup::owned_value_path;
7use vrl::value::Kind;
8
9use crate::aws::create_client;
10use crate::codecs::DecodingConfig;
11use crate::common::sqs::SqsClientBuilder;
12use crate::tls::TlsConfig;
13use crate::{
14 aws::{auth::AwsAuthentication, region::RegionOrEndpoint},
15 config::{SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
16 serde::{bool_or_struct, default_decoding, default_framing_message_based},
17 sources::aws_sqs::source::SqsSource,
18};
19
20#[configurable_component(source("aws_sqs", "Collect logs from AWS SQS."))]
22#[derive(Clone, Debug, Derivative)]
23#[derivative(Default)]
24#[serde(deny_unknown_fields)]
25pub struct AwsSqsConfig {
26 #[serde(flatten)]
27 pub region: RegionOrEndpoint,
28
29 #[configurable(derived)]
30 #[serde(default)]
31 pub auth: AwsAuthentication,
32
33 #[configurable(metadata(
35 docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
36 ))]
37 pub queue_url: String,
38
39 #[serde(default = "default_poll_secs")]
46 #[derivative(Default(value = "default_poll_secs()"))]
47 #[configurable(metadata(docs::type_unit = "seconds"))]
48 #[configurable(metadata(docs::human_name = "Poll Wait Time"))]
49 pub poll_secs: u32,
50
51 #[serde(default = "default_visibility_timeout_secs")]
60 #[derivative(Default(value = "default_visibility_timeout_secs()"))]
61 #[configurable(metadata(docs::type_unit = "seconds"))]
62 #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
63 pub(super) visibility_timeout_secs: u32,
64
65 #[serde(default = "default_true")]
69 #[derivative(Default(value = "default_true()"))]
70 pub(super) delete_message: bool,
71
72 pub client_concurrency: Option<NonZeroUsize>,
82
83 #[configurable(derived)]
84 #[serde(default = "default_framing_message_based")]
85 #[derivative(Default(value = "default_framing_message_based()"))]
86 pub framing: FramingConfig,
87
88 #[configurable(derived)]
89 #[serde(default = "default_decoding")]
90 #[derivative(Default(value = "default_decoding()"))]
91 pub decoding: DeserializerConfig,
92
93 #[configurable(derived)]
94 #[serde(default, deserialize_with = "bool_or_struct")]
95 pub acknowledgements: SourceAcknowledgementsConfig,
96
97 #[configurable(derived)]
98 pub tls: Option<TlsConfig>,
99
100 #[configurable(metadata(docs::hidden))]
102 #[serde(default)]
103 pub log_namespace: Option<bool>,
104}
105
106#[async_trait::async_trait]
107#[typetag::serde(name = "aws_sqs")]
108impl SourceConfig for AwsSqsConfig {
109 async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
110 let log_namespace = cx.log_namespace(self.log_namespace);
111
112 let client = self.build_client(&cx).await?;
113 let decoder =
114 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
115 .build()?;
116 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
117
118 Ok(Box::pin(
119 SqsSource {
120 client,
121 queue_url: self.queue_url.clone(),
122 decoder,
123 poll_secs: self.poll_secs,
124 concurrency: self
125 .client_concurrency
126 .map(|n| n.get())
127 .unwrap_or_else(crate::num_threads),
128 visibility_timeout_secs: self.visibility_timeout_secs,
129 delete_message: self.delete_message,
130 acknowledgements,
131 log_namespace,
132 }
133 .run(cx.out, cx.shutdown),
134 ))
135 }
136
137 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
138 let schema_definition = self
139 .decoding
140 .schema_definition(global_log_namespace.merge(self.log_namespace))
141 .with_standard_vector_source_metadata()
142 .with_source_metadata(
143 Self::NAME,
144 Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
145 &owned_value_path!("timestamp"),
146 Kind::timestamp().or_undefined(),
147 Some("timestamp"),
148 );
149
150 vec![SourceOutput::new_maybe_logs(
151 self.decoding.output_type(),
152 schema_definition,
153 )]
154 }
155
156 fn can_acknowledge(&self) -> bool {
157 true
158 }
159}
160
161impl AwsSqsConfig {
162 async fn build_client(&self, cx: &SourceContext) -> crate::Result<aws_sdk_sqs::Client> {
163 create_client::<SqsClientBuilder>(
164 &SqsClientBuilder {},
165 &self.auth,
166 self.region.region(),
167 self.region.endpoint(),
168 &cx.proxy,
169 self.tls.as_ref(),
170 None,
171 )
172 .await
173 }
174}
175
/// Default value for `AwsSqsConfig::poll_secs`: 15 seconds.
const fn default_poll_secs() -> u32 {
    15
}
179
/// Default value for `AwsSqsConfig::visibility_timeout_secs`: 300 seconds.
const fn default_visibility_timeout_secs() -> u32 {
    300
}
183
/// Serde/`Derivative` default helper that always yields `true`; used as the default for
/// `AwsSqsConfig::delete_message`.
const fn default_true() -> bool {
    true
}
187
188impl_generate_config_from_default!(AwsSqsConfig);