vector/sinks/aws_s_s/
config.rs

1use std::convert::TryFrom;
2
3use snafu::{ResultExt, Snafu};
4use vector_lib::configurable::configurable_component;
5
6use crate::{
7    aws::AwsAuthentication,
8    codecs::EncodingConfig,
9    config::AcknowledgementsConfig,
10    sinks::util::TowerRequestConfig,
11    template::{Template, TemplateParseError},
12    tls::TlsConfig,
13};
14
15#[derive(Debug, Snafu)]
16pub(super) enum BuildError {
17    #[snafu(display("`message_group_id` should be defined for FIFO queue."))]
18    MessageGroupIdMissing,
19    #[snafu(display("`message_group_id` is not allowed with non-FIFO queue."))]
20    MessageGroupIdNotAllowed,
21    #[snafu(display("invalid topic template: {}", source))]
22    TopicTemplate { source: TemplateParseError },
23}
24
25/// Base Configuration `aws_s_s` for sns and sqs sink.
26#[configurable_component]
27#[derive(Clone, Debug)]
28#[serde(deny_unknown_fields)]
29pub(super) struct BaseSSSinkConfig {
30    #[configurable(derived)]
31    pub(super) encoding: EncodingConfig,
32
33    /// The tag that specifies that a message belongs to a specific message group.
34    ///
35    /// Can be applied only to FIFO queues.
36    #[configurable(metadata(docs::examples = "vector"))]
37    #[configurable(metadata(docs::examples = "vector-%Y-%m-%d"))]
38    pub(super) message_group_id: Option<String>,
39
40    /// The message deduplication ID value to allow AWS to identify duplicate messages.
41    ///
42    /// This value is a template which should result in a unique string for each event. See the [AWS
43    /// documentation][deduplication_id_docs] for more about how AWS does message deduplication.
44    ///
45    /// [deduplication_id_docs]: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagededuplicationid-property.html
46    #[configurable(metadata(docs::examples = "{{ transaction_id }}"))]
47    pub(super) message_deduplication_id: Option<String>,
48
49    #[configurable(derived)]
50    #[serde(default)]
51    pub(super) request: TowerRequestConfig,
52
53    #[configurable(derived)]
54    pub(super) tls: Option<TlsConfig>,
55
56    /// The ARN of an [IAM role][iam_role] to assume at startup.
57    ///
58    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
59    #[configurable(deprecated)]
60    #[configurable(metadata(docs::hidden))]
61    pub(super) assume_role: Option<String>,
62
63    #[configurable(derived)]
64    #[serde(default)]
65    pub(super) auth: AwsAuthentication,
66
67    #[configurable(derived)]
68    #[serde(
69        default,
70        deserialize_with = "crate::serde::bool_or_struct",
71        skip_serializing_if = "crate::serde::is_default"
72    )]
73    pub(super) acknowledgements: AcknowledgementsConfig,
74}
75
76pub(super) fn message_group_id(
77    message_group_id: Option<String>,
78    fifo: bool,
79) -> crate::Result<Option<Template>> {
80    match (message_group_id.as_ref(), fifo) {
81        (Some(value), true) => Ok(Some(
82            Template::try_from(value.clone()).context(TopicTemplateSnafu)?,
83        )),
84        (Some(_), false) => Err(Box::new(BuildError::MessageGroupIdNotAllowed)),
85        (None, true) => Err(Box::new(BuildError::MessageGroupIdMissing)),
86        (None, false) => Ok(None),
87    }
88}
89pub(super) fn message_deduplication_id(
90    message_deduplication_id: Option<String>,
91) -> crate::Result<Option<Template>> {
92    Ok(message_deduplication_id
93        .clone()
94        .map(Template::try_from)
95        .transpose()?)
96}