// vector/sources/aws_sqs/config.rs

use std::num::NonZeroUsize;

use vector_lib::{
    codecs::decoding::{DeserializerConfig, FramingConfig},
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    lookup::owned_value_path,
};
use vrl::value::Kind;

use crate::{
    aws::{auth::AwsAuthentication, create_client, region::RegionOrEndpoint},
    codecs::DecodingConfig,
    common::sqs::SqsClientBuilder,
    config::{SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    sources::aws_sqs::source::SqsSource,
    tls::TlsConfig,
};
20
21/// Configuration for the `aws_sqs` source.
22#[configurable_component(source("aws_sqs", "Collect logs from AWS SQS."))]
23#[derive(Clone, Debug, Derivative)]
24#[derivative(Default)]
25#[serde(deny_unknown_fields)]
26pub struct AwsSqsConfig {
27    #[serde(flatten)]
28    pub region: RegionOrEndpoint,
29
30    #[configurable(derived)]
31    #[serde(default)]
32    pub auth: AwsAuthentication,
33
34    /// The URL of the SQS queue to poll for messages.
35    #[configurable(metadata(
36        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
37    ))]
38    pub queue_url: String,
39
40    /// How long to wait while polling the queue for new messages, in seconds.
41    ///
42    /// Generally, this should not be changed unless instructed to do so, as if messages are available,
43    /// they are always consumed, regardless of the value of `poll_secs`.
44    // NOTE: We restrict this to u32 for safe conversion to i32 later.
45    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
46    #[serde(default = "default_poll_secs")]
47    #[derivative(Default(value = "default_poll_secs()"))]
48    #[configurable(metadata(docs::type_unit = "seconds"))]
49    #[configurable(metadata(docs::human_name = "Poll Wait Time"))]
50    pub poll_secs: u32,
51
52    /// The visibility timeout to use for messages, in seconds.
53    ///
54    /// This controls how long a message is left unavailable after it is received. If a message is received, and
55    /// takes longer than `visibility_timeout_secs` to process and delete the message from the queue, it is made available again for another consumer.
56    ///
57    /// This can happen if there is an issue between consuming a message and deleting it.
58    // NOTE: We restrict this to u32 for safe conversion to i32 later.
59    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
60    #[serde(default = "default_visibility_timeout_secs")]
61    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
62    #[configurable(metadata(docs::type_unit = "seconds"))]
63    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
64    pub(super) visibility_timeout_secs: u32,
65
66    /// Whether to delete the message once it is processed.
67    ///
68    /// It can be useful to set this to `false` for debugging or during the initial setup.
69    #[serde(default = "default_true")]
70    #[derivative(Default(value = "default_true()"))]
71    pub(super) delete_message: bool,
72
73    /// Number of concurrent tasks to create for polling the queue for messages.
74    ///
75    /// Defaults to the number of available CPUs on the system.
76    ///
77    /// Should not typically need to be changed, but it can sometimes be beneficial to raise this
78    /// value when there is a high rate of messages being pushed into the queue and the messages
79    /// being fetched are small. In these cases, system resources may not be fully utilized without
80    /// fetching more messages per second, as it spends more time fetching the messages than
81    /// processing them.
82    pub client_concurrency: Option<NonZeroUsize>,
83
84    #[configurable(derived)]
85    #[serde(default = "default_framing_message_based")]
86    #[derivative(Default(value = "default_framing_message_based()"))]
87    pub framing: FramingConfig,
88
89    #[configurable(derived)]
90    #[serde(default = "default_decoding")]
91    #[derivative(Default(value = "default_decoding()"))]
92    pub decoding: DeserializerConfig,
93
94    #[configurable(derived)]
95    #[serde(default, deserialize_with = "bool_or_struct")]
96    pub acknowledgements: SourceAcknowledgementsConfig,
97
98    #[configurable(derived)]
99    pub tls: Option<TlsConfig>,
100
101    /// The namespace to use for logs. This overrides the global setting.
102    #[configurable(metadata(docs::hidden))]
103    #[serde(default)]
104    pub log_namespace: Option<bool>,
105}
106
107#[async_trait::async_trait]
108#[typetag::serde(name = "aws_sqs")]
109impl SourceConfig for AwsSqsConfig {
110    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
111        let log_namespace = cx.log_namespace(self.log_namespace);
112
113        let client = self.build_client(&cx).await?;
114        let decoder =
115            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
116                .build()?;
117        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
118
119        Ok(Box::pin(
120            SqsSource {
121                client,
122                queue_url: self.queue_url.clone(),
123                decoder,
124                poll_secs: self.poll_secs,
125                concurrency: self
126                    .client_concurrency
127                    .map(|n| n.get())
128                    .unwrap_or_else(crate::num_threads),
129                visibility_timeout_secs: self.visibility_timeout_secs,
130                delete_message: self.delete_message,
131                acknowledgements,
132                log_namespace,
133            }
134            .run(cx.out, cx.shutdown),
135        ))
136    }
137
138    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
139        let schema_definition = self
140            .decoding
141            .schema_definition(global_log_namespace.merge(self.log_namespace))
142            .with_standard_vector_source_metadata()
143            .with_source_metadata(
144                Self::NAME,
145                Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
146                &owned_value_path!("timestamp"),
147                Kind::timestamp().or_undefined(),
148                Some("timestamp"),
149            );
150
151        vec![SourceOutput::new_maybe_logs(
152            self.decoding.output_type(),
153            schema_definition,
154        )]
155    }
156
157    fn can_acknowledge(&self) -> bool {
158        true
159    }
160}
161
162impl AwsSqsConfig {
163    async fn build_client(&self, cx: &SourceContext) -> crate::Result<aws_sdk_sqs::Client> {
164        create_client::<SqsClientBuilder>(
165            &SqsClientBuilder {},
166            &self.auth,
167            self.region.region(),
168            self.region.endpoint(),
169            &cx.proxy,
170            self.tls.as_ref(),
171            None,
172        )
173        .await
174    }
175}
176
/// Default for `poll_secs`: wait 15 seconds when polling the queue.
const fn default_poll_secs() -> u32 {
    15
}
180
/// Default for `visibility_timeout_secs`: 300 seconds.
const fn default_visibility_timeout_secs() -> u32 {
    300
}
184
/// Default for boolean options that are enabled unless configured otherwise
/// (used for `delete_message`).
const fn default_true() -> bool {
    true
}
188
189impl_generate_config_from_default!(AwsSqsConfig);