vector/sinks/aws_cloudwatch_logs/config.rs

1use std::collections::HashMap;
2
3use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
4use futures::FutureExt;
5use serde::{Deserialize, Deserializer, de};
6use tower::ServiceBuilder;
7use vector_lib::{codecs::JsonSerializerConfig, configurable::configurable_component, schema};
8use vrl::value::Kind;
9
10use crate::{
11    aws::{AwsAuthentication, ClientBuilder, RegionOrEndpoint, create_client},
12    codecs::{Encoder, EncodingConfig},
13    config::{
14        AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
15        SinkContext,
16    },
17    sinks::{
18        Healthcheck, VectorSink,
19        aws_cloudwatch_logs::{
20            healthcheck::healthcheck, request_builder::CloudwatchRequestBuilder,
21            retry::CloudwatchRetryLogic, service::CloudwatchLogsPartitionSvc, sink::CloudwatchSink,
22        },
23        util::{
24            BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, http::RequestConfig,
25        },
26    },
27    template::Template,
28    tls::TlsConfig,
29};
30
31pub struct CloudwatchLogsClientBuilder;
32
33impl ClientBuilder for CloudwatchLogsClientBuilder {
34    type Client = aws_sdk_cloudwatchlogs::client::Client;
35
36    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
37        aws_sdk_cloudwatchlogs::client::Client::new(config)
38    }
39}
40
#[configurable_component]
#[derive(Clone, Debug, Default)]
/// Retention policy configuration for AWS CloudWatch Log Group
// NOTE(review): the derived `Default` yields `enabled: false, days: 0`. Because `days` uses
// `#[serde(default)]`, a config that sets `enabled = true` but omits `days` deserializes to
// `days: 0` without ever calling `retention_days` (serde only runs `deserialize_with` when
// the key is present). 0 is not in `retention_days`' allowed list — confirm that
// combination is validated or rejected somewhere else.
pub struct Retention {
    /// Whether or not to set a retention policy when creating a new Log Group.
    // Omitted => `false` (bool's default).
    #[serde(default)]
    pub enabled: bool,

    /// If retention is enabled, the number of days to retain logs for.
    // Explicit values are checked against an allow-list by `retention_days`; presumably
    // `crate::serde::is_default` omits the field on serialization while it is still 0.
    #[serde(
        default,
        deserialize_with = "retention_days",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub days: u32,
}
57
58fn retention_days<'de, D>(deserializer: D) -> Result<u32, D::Error>
59where
60    D: Deserializer<'de>,
61{
62    let days: u32 = Deserialize::deserialize(deserializer)?;
63    const ALLOWED_VALUES: &[u32] = &[
64        1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557,
65        2922, 3288, 3653,
66    ];
67    if ALLOWED_VALUES.contains(&days) {
68        Ok(days)
69    } else {
70        let msg = format!("one of allowed values: {ALLOWED_VALUES:?}").to_owned();
71        let expected: &str = &msg[..];
72        Err(de::Error::invalid_value(
73            de::Unexpected::Signed(days.into()),
74            &expected,
75        ))
76    }
77}
78
/// Configuration for the `aws_cloudwatch_logs` sink.
#[configurable_component(sink(
    "aws_cloudwatch_logs",
    "Publish log events to AWS CloudWatch Logs."
))]
#[derive(Clone, Debug)]
// Any key not declared on this struct (or on the flattened `region`) is rejected at load time.
#[serde(deny_unknown_fields)]
pub struct CloudwatchLogsSinkConfig {
    /// The [group name][group_name] of the target CloudWatch Logs stream.
    ///
    /// [group_name]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    #[configurable(metadata(docs::examples = "group-name"))]
    #[configurable(metadata(docs::examples = "{{ file }}"))]
    // A `Template`: the `{{ ... }}` examples suggest it is rendered per event; `build` hands a
    // clone to the request builder as `group_template`.
    pub group_name: Template,

    /// The [stream name][stream_name] of the target CloudWatch Logs stream.
    ///
    /// There can only be one writer to a log stream at a time. If multiple instances are writing to
    /// the same log group, the stream name must include an identifier that is guaranteed to be
    /// unique per instance.
    ///
    /// [stream_name]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    #[configurable(metadata(docs::examples = "{{ host }}"))]
    #[configurable(metadata(docs::examples = "%Y-%m-%d"))]
    #[configurable(metadata(docs::examples = "stream-name"))]
    pub stream_name: Template,

    /// The [AWS region][aws_region] of the target service.
    ///
    /// [aws_region]: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html
    // Flattened: `RegionOrEndpoint`'s own keys are read from this table's top level rather
    // than from a nested table. `create_client` reads both `region()` and `endpoint()`.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    /// Dynamically create a [log group][log_group] if it does not already exist.
    ///
    /// This ignores `create_missing_stream` directly after creating the group and creates
    /// the first stream.
    ///
    /// [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    // Default comes from `crate::serde::default_true`; `default_config` also uses `true`.
    #[serde(default = "crate::serde::default_true")]
    pub create_missing_group: bool,

    /// Dynamically create a [log stream][log_stream] if it does not already exist.
    ///
    /// [log_stream]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html
    #[serde(default = "crate::serde::default_true")]
    pub create_missing_stream: bool,

    #[configurable(derived)]
    #[serde(default)]
    pub retention: Retention,

    // Required: no `#[serde(default)]`. `build` derives both the transformer and the
    // serializer from it, and `input` uses it to narrow the accepted data types.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // NOTE(review): not read directly anywhere in this file; presumably consumed through the
    // cloned config handed to `CloudwatchLogsPartitionSvc::new` — confirm it has an effect.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<CloudwatchLogsDefaultBatchSettings>,

    // Only `request.tower` is read in `build`, to configure the retrying service stack.
    #[configurable(derived)]
    #[serde(default)]
    pub request: RequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The ARN of an [IAM role][iam_role] to assume at startup.
    ///
    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
    // NOTE(review): deprecated and hidden from docs; it is not passed to `create_client` in
    // this file (only `auth` is), so it presumably has no effect here — confirm.
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    pub assume_role: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    // `bool_or_struct` presumably lets users write either a bare bool or a full table here.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// The [ARN][arn] (Amazon Resource Name) of the [KMS key][kms_key] to use when encrypting log data.
    ///
    /// [arn]: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html
    /// [kms_key]: https://docs.aws.amazon.com/kms/latest/developerguide/overview.html
    #[configurable(derived)]
    #[serde(default)]
    pub kms_key: Option<String>,

    /// The Key-value pairs to be applied as [tags][tags] to the log group and stream.
    ///
    /// [tags]: https://docs.aws.amazon.com/whitepapers/latest/tagging-best-practices/what-are-tags.html
    #[configurable(derived)]
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "A tag represented as a key-value pair"
    ))]
    pub tags: Option<HashMap<String, String>>,
}
186
187impl CloudwatchLogsSinkConfig {
188    pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchLogsClient> {
189        create_client::<CloudwatchLogsClientBuilder>(
190            &CloudwatchLogsClientBuilder {},
191            &self.auth,
192            self.region.region(),
193            self.region.endpoint(),
194            proxy,
195            self.tls.as_ref(),
196            None,
197        )
198        .await
199    }
200}
201
202#[async_trait::async_trait]
203#[typetag::serde(name = "aws_cloudwatch_logs")]
204impl SinkConfig for CloudwatchLogsSinkConfig {
205    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
206        let batcher_settings = self.batch.into_batcher_settings()?;
207        let request_settings = self.request.tower.into_settings();
208        let client = self.create_client(cx.proxy()).await?;
209        let svc = ServiceBuilder::new()
210            .settings(request_settings, CloudwatchRetryLogic::new())
211            .service(CloudwatchLogsPartitionSvc::new(
212                self.clone(),
213                client.clone(),
214            )?);
215        let transformer = self.encoding.transformer();
216        let serializer = self.encoding.build()?;
217        let encoder = Encoder::<()>::new(serializer);
218        let healthcheck = healthcheck(self.clone(), client).boxed();
219        let sink = CloudwatchSink {
220            batcher_settings,
221            request_builder: CloudwatchRequestBuilder {
222                group_template: self.group_name.clone(),
223                stream_template: self.stream_name.clone(),
224                transformer,
225                encoder,
226            },
227
228            service: svc,
229        };
230
231        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
232    }
233
234    fn input(&self) -> Input {
235        let requirement =
236            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
237
238        Input::new(self.encoding.config().input_type() & DataType::Log)
239            .with_schema_requirement(requirement)
240    }
241
242    fn acknowledgements(&self) -> &AcknowledgementsConfig {
243        &self.acknowledgements
244    }
245}
246
247impl GenerateConfig for CloudwatchLogsSinkConfig {
248    fn generate_config() -> toml::Value {
249        toml::Value::try_from(default_config(JsonSerializerConfig::default().into())).unwrap()
250    }
251}
252
253fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
254    CloudwatchLogsSinkConfig {
255        encoding,
256        group_name: Default::default(),
257        stream_name: Default::default(),
258        region: Default::default(),
259        create_missing_group: true,
260        create_missing_stream: true,
261        retention: Default::default(),
262        compression: Default::default(),
263        batch: Default::default(),
264        request: Default::default(),
265        tls: Default::default(),
266        assume_role: Default::default(),
267        auth: Default::default(),
268        acknowledgements: Default::default(),
269        kms_key: Default::default(),
270        tags: Default::default(),
271    }
272}
273
274#[derive(Clone, Copy, Debug, Default)]
275pub struct CloudwatchLogsDefaultBatchSettings;
276
277impl SinkBatchSettings for CloudwatchLogsDefaultBatchSettings {
278    const MAX_EVENTS: Option<usize> = Some(10_000);
279    const MAX_BYTES: Option<usize> = Some(1_048_576);
280    const TIMEOUT_SECS: f64 = 1.0;
281}
282
#[cfg(test)]
mod tests {
    use super::CloudwatchLogsSinkConfig;

    /// Runs the shared `crate::test_util::test_generate_config` check against this sink's config.
    #[test]
    fn test_generate_config() {
        crate::test_util::test_generate_config::<CloudwatchLogsSinkConfig>();
    }
}