vector/sinks/aws_cloudwatch_logs/
config.rs

1use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
2use futures::FutureExt;
3use serde::{de, Deserialize, Deserializer};
4use std::collections::HashMap;
5use tower::ServiceBuilder;
6use vector_lib::codecs::JsonSerializerConfig;
7use vector_lib::configurable::configurable_component;
8use vector_lib::schema;
9use vrl::value::Kind;
10
11use crate::{
12    aws::{create_client, AwsAuthentication, ClientBuilder, RegionOrEndpoint},
13    codecs::{Encoder, EncodingConfig},
14    config::{
15        AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
16        SinkContext,
17    },
18    sinks::{
19        aws_cloudwatch_logs::{
20            healthcheck::healthcheck, request_builder::CloudwatchRequestBuilder,
21            retry::CloudwatchRetryLogic, service::CloudwatchLogsPartitionSvc, sink::CloudwatchSink,
22        },
23        util::{
24            http::RequestConfig, BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings,
25        },
26        Healthcheck, VectorSink,
27    },
28    template::Template,
29    tls::TlsConfig,
30};
31
/// Stateless builder for CloudWatch Logs SDK clients.
///
/// It carries no data; its `ClientBuilder` implementation turns an `aws_types::SdkConfig` into an
/// `aws_sdk_cloudwatchlogs` client.
pub struct CloudwatchLogsClientBuilder;
33
34impl ClientBuilder for CloudwatchLogsClientBuilder {
35    type Client = aws_sdk_cloudwatchlogs::client::Client;
36
37    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
38        aws_sdk_cloudwatchlogs::client::Client::new(config)
39    }
40}
41
#[configurable_component]
#[derive(Clone, Debug, Default)]
/// Retention policy configuration for AWS CloudWatch Log Group
// The derived `Default` yields `enabled = false` and `days = 0`.
pub struct Retention {
    /// Whether or not to set a retention policy when creating a new Log Group.
    #[serde(default)]
    pub enabled: bool,

    /// If retention is enabled, the number of days to retain logs for.
    // Values present in the input are validated by `retention_days`.
    // NOTE(review): because of `default`, an omitted `days` becomes 0 without passing through
    // `retention_days` — confirm how `enabled = true` with no `days` is handled by the code
    // that applies the policy.
    #[serde(
        default,
        deserialize_with = "retention_days",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub days: u32,
}
58
59fn retention_days<'de, D>(deserializer: D) -> Result<u32, D::Error>
60where
61    D: Deserializer<'de>,
62{
63    let days: u32 = Deserialize::deserialize(deserializer)?;
64    const ALLOWED_VALUES: &[u32] = &[
65        1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557,
66        2922, 3288, 3653,
67    ];
68    if ALLOWED_VALUES.contains(&days) {
69        Ok(days)
70    } else {
71        let msg = format!("one of allowed values: {ALLOWED_VALUES:?}").to_owned();
72        let expected: &str = &msg[..];
73        Err(de::Error::invalid_value(
74            de::Unexpected::Signed(days.into()),
75            &expected,
76        ))
77    }
78}
79
/// Configuration for the `aws_cloudwatch_logs` sink.
// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
// `flatten` (used on `region` below) — confirm unknown keys are still rejected as intended.
#[configurable_component(sink(
    "aws_cloudwatch_logs",
    "Publish log events to AWS CloudWatch Logs."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct CloudwatchLogsSinkConfig {
    /// The [group name][group_name] of the target CloudWatch Logs stream.
    ///
    /// [group_name]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    #[configurable(metadata(docs::examples = "group-name"))]
    #[configurable(metadata(docs::examples = "{{ file }}"))]
    pub group_name: Template,

    /// The [stream name][stream_name] of the target CloudWatch Logs stream.
    ///
    /// There can only be one writer to a log stream at a time. If multiple instances are writing to
    /// the same log group, the stream name must include an identifier that is guaranteed to be
    /// unique per instance.
    ///
    /// [stream_name]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    #[configurable(metadata(docs::examples = "{{ host }}"))]
    #[configurable(metadata(docs::examples = "%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "stream-name"))]
    pub stream_name: Template,

    /// The [AWS region][aws_region] of the target service.
    ///
    /// [aws_region]: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html
    // `flatten` inlines the fields of `RegionOrEndpoint` at this level of the sink configuration
    // instead of nesting them under a `region` table.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    /// Dynamically create a [log group][log_group] if it does not already exist.
    ///
    /// This ignores `create_missing_stream` directly after creating the group and creates
    /// the first stream.
    ///
    /// [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    #[serde(default = "crate::serde::default_true")]
    pub create_missing_group: bool,

    /// Dynamically create a [log stream][log_stream] if it does not already exist.
    ///
    /// [log_stream]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    #[serde(default = "crate::serde::default_true")]
    pub create_missing_stream: bool,

    // Defaults to `Retention::default()` (retention disabled) when the table is omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub retention: Retention,

    // The only field without a serde default or `Option` wrapper besides the two templates,
    // so it must be present in the user's configuration.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    // Batch limits fall back to `CloudwatchLogsDefaultBatchSettings` when not overridden.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<CloudwatchLogsDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: RequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The ARN of an [IAM role][iam_role] to assume at startup.
    ///
    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
    // Deprecated and hidden from the generated docs. `create_client` in this file builds the
    // client from `auth` and does not read this field.
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    pub assume_role: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or the full
    // struct form — confirm in `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// The [ARN][arn] (Amazon Resource Name) of the [KMS key][kms_key] to use when encrypting log data.
    ///
    /// [arn]: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html
    /// [kms_key]: https://docs.aws.amazon.com/kms/latest/developerguide/overview.html
    #[configurable(derived)]
    #[serde(default)]
    pub kms_key: Option<String>,

    /// The Key-value pairs to be applied as [tags][tags] to the log group and stream.
    ///
    /// [tags]: https://docs.aws.amazon.com/whitepapers/latest/tagging-best-practices/what-are-tags.html
    #[configurable(derived)]
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "A tag represented as a key-value pair"
    ))]
    pub tags: Option<HashMap<String, String>>,
}
187
188impl CloudwatchLogsSinkConfig {
189    pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchLogsClient> {
190        create_client::<CloudwatchLogsClientBuilder>(
191            &CloudwatchLogsClientBuilder {},
192            &self.auth,
193            self.region.region(),
194            self.region.endpoint(),
195            proxy,
196            self.tls.as_ref(),
197            None,
198        )
199        .await
200    }
201}
202
203#[async_trait::async_trait]
204#[typetag::serde(name = "aws_cloudwatch_logs")]
205impl SinkConfig for CloudwatchLogsSinkConfig {
206    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
207        let batcher_settings = self.batch.into_batcher_settings()?;
208        let request_settings = self.request.tower.into_settings();
209        let client = self.create_client(cx.proxy()).await?;
210        let svc = ServiceBuilder::new()
211            .settings(request_settings, CloudwatchRetryLogic::new())
212            .service(CloudwatchLogsPartitionSvc::new(
213                self.clone(),
214                client.clone(),
215            )?);
216        let transformer = self.encoding.transformer();
217        let serializer = self.encoding.build()?;
218        let encoder = Encoder::<()>::new(serializer);
219        let healthcheck = healthcheck(self.clone(), client).boxed();
220        let sink = CloudwatchSink {
221            batcher_settings,
222            request_builder: CloudwatchRequestBuilder {
223                group_template: self.group_name.clone(),
224                stream_template: self.stream_name.clone(),
225                transformer,
226                encoder,
227            },
228
229            service: svc,
230        };
231
232        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
233    }
234
235    fn input(&self) -> Input {
236        let requirement =
237            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
238
239        Input::new(self.encoding.config().input_type() & DataType::Log)
240            .with_schema_requirement(requirement)
241    }
242
243    fn acknowledgements(&self) -> &AcknowledgementsConfig {
244        &self.acknowledgements
245    }
246}
247
248impl GenerateConfig for CloudwatchLogsSinkConfig {
249    fn generate_config() -> toml::Value {
250        toml::Value::try_from(default_config(JsonSerializerConfig::default().into())).unwrap()
251    }
252}
253
254fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
255    CloudwatchLogsSinkConfig {
256        encoding,
257        group_name: Default::default(),
258        stream_name: Default::default(),
259        region: Default::default(),
260        create_missing_group: true,
261        create_missing_stream: true,
262        retention: Default::default(),
263        compression: Default::default(),
264        batch: Default::default(),
265        request: Default::default(),
266        tls: Default::default(),
267        assume_role: Default::default(),
268        auth: Default::default(),
269        acknowledgements: Default::default(),
270        kms_key: Default::default(),
271        tags: Default::default(),
272    }
273}
274
/// Marker type carrying the default batch limits for the `aws_cloudwatch_logs` sink.
///
/// It holds no data; the limits come from its `SinkBatchSettings` implementation, and it is used
/// as the type parameter of the sink's `BatchConfig`.
#[derive(Clone, Copy, Debug, Default)]
pub struct CloudwatchLogsDefaultBatchSettings;
277
278impl SinkBatchSettings for CloudwatchLogsDefaultBatchSettings {
279    const MAX_EVENTS: Option<usize> = Some(10_000);
280    const MAX_BYTES: Option<usize> = Some(1_048_576);
281    const TIMEOUT_SECS: f64 = 1.0;
282}
283
#[cfg(test)]
mod tests {
    use super::CloudwatchLogsSinkConfig;

    /// Runs the shared `test_generate_config` helper against this sink's configuration type.
    #[test]
    fn test_generate_config() {
        crate::test_util::test_generate_config::<CloudwatchLogsSinkConfig>();
    }
}