vector/sources/aws_sqs/
config.rs

1use std::num::NonZeroUsize;
2
3use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
4use vector_lib::config::{LegacyKey, LogNamespace};
5use vector_lib::configurable::configurable_component;
6use vector_lib::lookup::owned_value_path;
7use vrl::value::Kind;
8
9use crate::aws::create_client;
10use crate::codecs::DecodingConfig;
11use crate::common::sqs::SqsClientBuilder;
12use crate::tls::TlsConfig;
13use crate::{
14    aws::{auth::AwsAuthentication, region::RegionOrEndpoint},
15    config::{SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput},
16    serde::{bool_or_struct, default_decoding, default_framing_message_based},
17    sources::aws_sqs::source::SqsSource,
18};
19
/// Configuration for the `aws_sqs` source.
// `Default` is produced by `derivative`: each `#[derivative(Default(value = "..."))]` below names
// the same function as that field's `#[serde(default = "...")]`, so a defaulted struct and a
// deserialized config with the key omitted agree on the value.
// NOTE(review): serde's documentation states that `deny_unknown_fields` is not supported together
// with `flatten` (used on `region` below) — confirm the combination behaves as intended here.
#[configurable_component(source("aws_sqs", "Collect logs from AWS SQS."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct AwsSqsConfig {
    // Flattened: the keys of `RegionOrEndpoint` are read from the same level as the other
    // options instead of from a nested `region` table.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // Falls back to `AwsAuthentication::default()` when the `auth` key is absent.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    /// The URL of the SQS queue to poll for messages.
    // No serde default is declared for this field. `build` clones it into `SqsSource` unchanged.
    #[configurable(metadata(
        docs::examples = "https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue"
    ))]
    pub queue_url: String,

    /// How long to wait while polling the queue for new messages, in seconds.
    ///
    /// Generally, this should not be changed unless instructed to do so, as if messages are available,
    /// they are always consumed, regardless of the value of `poll_secs`.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    // NOTE(review): a `u32` larger than `i32::MAX` does not fit in an `i32`, and no range check is
    // applied in this file — confirm the downstream conversion and the service-side limit.
    #[serde(default = "default_poll_secs")]
    #[derivative(Default(value = "default_poll_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Poll Wait Time"))]
    pub poll_secs: u32,

    /// The visibility timeout to use for messages, in seconds.
    ///
    /// This controls how long a message is left unavailable after it is received. If a message is received, and
    /// takes longer than `visibility_timeout_secs` to process and delete the message from the queue, it is made available again for another consumer.
    ///
    /// This can happen if there is an issue between consuming a message and deleting it.
    // NOTE: We restrict this to u32 for safe conversion to i32 later.
    // NOTE: This value isn't used as a `Duration` downstream, so we don't bother using `serde_with`
    // NOTE(review): same caveat as `poll_secs` — values above `i32::MAX` are representable here
    // and are not range-checked in this file.
    #[serde(default = "default_visibility_timeout_secs")]
    #[derivative(Default(value = "default_visibility_timeout_secs()"))]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Visibility Timeout"))]
    pub(super) visibility_timeout_secs: u32,

    /// Whether to delete the message once it is processed.
    ///
    /// It can be useful to set this to `false` for debugging or during the initial setup.
    // Defaults to `true` through `default_true` for both serde and `Default`.
    #[serde(default = "default_true")]
    #[derivative(Default(value = "default_true()"))]
    pub(super) delete_message: bool,

    /// Number of concurrent tasks to create for polling the queue for messages.
    ///
    /// Defaults to the number of available CPUs on the system.
    ///
    /// Should not typically need to be changed, but it can sometimes be beneficial to raise this
    /// value when there is a high rate of messages being pushed into the queue and the messages
    /// being fetched are small. In these cases, system resources may not be fully utilized without
    /// fetching more messages per second, as it spends more time fetching the messages than
    /// processing them.
    // `build` resolves `None` to `crate::num_threads()`; `NonZeroUsize` rules out a value of zero.
    pub client_concurrency: Option<NonZeroUsize>,

    // Framing and decoding are both handed to `DecodingConfig::new` in `build` to construct the
    // decoder used by the source.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    pub framing: FramingConfig,

    // Also consulted by `outputs` for the schema definition and the output type.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    pub decoding: DeserializerConfig,

    // Deserialized through `bool_or_struct`; presumably this accepts either a bare boolean or the
    // full struct form — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub acknowledgements: SourceAcknowledgementsConfig,

    // Optional TLS settings, forwarded to `create_client` as `self.tls.as_ref()`.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The namespace to use for logs. This overrides the global setting.
    // Marked `docs::hidden`. `build` passes it to `cx.log_namespace(..)` and `outputs` merges it
    // with the global namespace.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
105
106#[async_trait::async_trait]
107#[typetag::serde(name = "aws_sqs")]
108impl SourceConfig for AwsSqsConfig {
109    async fn build(&self, cx: SourceContext) -> crate::Result<crate::sources::Source> {
110        let log_namespace = cx.log_namespace(self.log_namespace);
111
112        let client = self.build_client(&cx).await?;
113        let decoder =
114            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
115                .build()?;
116        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
117
118        Ok(Box::pin(
119            SqsSource {
120                client,
121                queue_url: self.queue_url.clone(),
122                decoder,
123                poll_secs: self.poll_secs,
124                concurrency: self
125                    .client_concurrency
126                    .map(|n| n.get())
127                    .unwrap_or_else(crate::num_threads),
128                visibility_timeout_secs: self.visibility_timeout_secs,
129                delete_message: self.delete_message,
130                acknowledgements,
131                log_namespace,
132            }
133            .run(cx.out, cx.shutdown),
134        ))
135    }
136
137    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
138        let schema_definition = self
139            .decoding
140            .schema_definition(global_log_namespace.merge(self.log_namespace))
141            .with_standard_vector_source_metadata()
142            .with_source_metadata(
143                Self::NAME,
144                Some(LegacyKey::Overwrite(owned_value_path!("timestamp"))),
145                &owned_value_path!("timestamp"),
146                Kind::timestamp().or_undefined(),
147                Some("timestamp"),
148            );
149
150        vec![SourceOutput::new_maybe_logs(
151            self.decoding.output_type(),
152            schema_definition,
153        )]
154    }
155
156    fn can_acknowledge(&self) -> bool {
157        true
158    }
159}
160
161impl AwsSqsConfig {
162    async fn build_client(&self, cx: &SourceContext) -> crate::Result<aws_sdk_sqs::Client> {
163        create_client::<SqsClientBuilder>(
164            &SqsClientBuilder {},
165            &self.auth,
166            self.region.region(),
167            self.region.endpoint(),
168            &cx.proxy,
169            self.tls.as_ref(),
170            None,
171        )
172        .await
173    }
174}
175
/// Default for `poll_secs`: 15 seconds.
const fn default_poll_secs() -> u32 {
    const DEFAULT_POLL_SECS: u32 = 15;
    DEFAULT_POLL_SECS
}
179
/// Default for `visibility_timeout_secs`: 300 seconds (five minutes).
const fn default_visibility_timeout_secs() -> u32 {
    const SECS_PER_MINUTE: u32 = 60;
    5 * SECS_PER_MINUTE
}
183
/// Default for boolean options that are enabled unless configured otherwise (used by
/// `delete_message`).
const fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
187
// Invoked for `AwsSqsConfig`; presumably supplies the config-generation impl from the struct's
// `Default` (going by the macro name) — confirm against the macro definition.
impl_generate_config_from_default!(AwsSqsConfig);