// vector/sinks/aws_cloudwatch_metrics/mod.rs

1#[cfg(all(test, feature = "aws-cloudwatch-metrics-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6use std::task::{Context, Poll};
7
8use aws_config::Region;
9use aws_sdk_cloudwatch::{
10    Client as CloudwatchClient,
11    error::SdkError,
12    operation::put_metric_data::PutMetricDataError,
13    types::{Dimension, MetricDatum},
14};
15use aws_smithy_types::DateTime as AwsDateTime;
16use futures::{FutureExt, SinkExt, stream};
17use futures_util::{future, future::BoxFuture};
18use indexmap::IndexMap;
19use tower::Service;
20use vector_lib::{
21    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, sink::VectorSink,
22};
23
24use super::util::service::TowerRequestConfigDefaults;
25use crate::{
26    aws::{
27        ClientBuilder, RegionOrEndpoint, auth::AwsAuthentication, create_client, is_retriable_error,
28    },
29    config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
30    event::{
31        Event,
32        metric::{Metric, MetricTags, MetricValue},
33    },
34    sinks::util::{
35        Compression, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings,
36        TowerRequestConfig,
37        batch::BatchConfig,
38        buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
39        retries::RetryLogic,
40    },
41    tls::TlsConfig,
42};
43
44#[derive(Clone, Copy, Debug, Default)]
45pub struct CloudWatchMetricsDefaultBatchSettings;
46
47impl SinkBatchSettings for CloudWatchMetricsDefaultBatchSettings {
48    const MAX_EVENTS: Option<usize> = Some(20);
49    const MAX_BYTES: Option<usize> = None;
50    const TIMEOUT_SECS: f64 = 1.0;
51}
52
53#[derive(Clone, Copy, Debug)]
54pub struct CloudWatchMetricsTowerRequestConfigDefaults;
55
56impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults {
57    const RATE_LIMIT_NUM: u64 = 150;
58}
59
// NOTE(review): `configurable_component` presumably derives user-facing config
// docs/schema from the `///` comments below, so maintainer notes in this block
// use plain `//` comments to avoid changing that output — confirm before
// rewording any `///` text.

/// Configuration for the `aws_cloudwatch_metrics` sink.
#[configurable_component(sink(
    "aws_cloudwatch_metrics",
    "Publish metric events to AWS CloudWatch Metrics."
))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct CloudWatchMetricsSinkConfig {
    // Fallback namespace for metrics whose own namespace is unset (applied per
    // event in `CloudWatchMetricsSvc::new`); also the namespace the healthcheck
    // writes to. Accepted under the legacy key `namespace` via the serde alias.
    // `Default` leaves this as an empty string.
    /// The default [namespace][namespace] to use for metrics that do not have one.
    ///
    /// Metrics with the same name can only be differentiated by their namespace, and not all
    /// metrics have their own namespace.
    ///
    /// [namespace]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Namespace
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::examples = "service"))]
    pub default_namespace: String,

    // Flattened, so its keys appear at the top level of the sink config. Both
    // `region()` and `endpoint()` are passed to `create_client`; note that under
    // `cfg!(test)` the region is overridden to `us-east-1`.
    /// The [AWS region][aws_region] of the target service.
    ///
    /// [aws_region]: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // NOTE(review): not read anywhere in this file — confirm whether request
    // compression is actually applied for this sink.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    // Converted with `into_batch_settings` in `CloudWatchMetricsSvc::new`;
    // defaults come from `CloudWatchMetricsDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<CloudWatchMetricsDefaultBatchSettings>,

    // Converted with `into_settings` in `CloudWatchMetricsSvc::new`; defaults
    // come from `CloudWatchMetricsTowerRequestConfigDefaults`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<CloudWatchMetricsTowerRequestConfigDefaults>,

    // Passed (as `Option<&TlsConfig>`) to `create_client`.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // NOTE(review): deprecated and hidden from docs. Nothing in this file reads
    // it (`create_client` only uses `auth`), so setting it appears to have no
    // effect — confirm, and consider warning or folding it into `auth`.
    /// The ARN of an [IAM role][iam_role] to assume at startup.
    ///
    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    // Credentials configuration passed to `create_client`.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    // Exposed through `SinkConfig::acknowledgements`. The custom deserializer
    // presumably accepts either a bare bool or the full struct — confirm in
    // `crate::serde::bool_or_struct`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    // Keyed by metric name (looked up per metric in `encode_events`). Values are
    // checked by `validate_storage_resolutions` when the sink is built; metrics
    // without an entry are sent with no StorageResolution set.
    /// A map from metric name to AWS storage resolution.
    /// Valid values are 1 (high resolution) and 60 (standard resolution).
    /// If unset, the AWS SDK default of 60 (standard resolution) is used.
    /// See [AWS Metrics Resolution](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Resolution_definition)
    /// See [MetricDatum::storage_resolution](https://docs.rs/aws-sdk-cloudwatch/1.91.0/aws_sdk_cloudwatch/types/struct.MetricDatum.html#structfield.storage_resolution)
    #[configurable(metadata(docs::additional_props_description = "An AWS storage resolution."))]
    #[serde(default)]
    pub storage_resolution: IndexMap<String, i32>,
}

// Presumably generates the example/default config from `Default::default()`.
impl_generate_config_from_default!(CloudWatchMetricsSinkConfig);
129
130struct CloudwatchMetricsClientBuilder;
131
132impl ClientBuilder for CloudwatchMetricsClientBuilder {
133    type Client = aws_sdk_cloudwatch::client::Client;
134
135    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
136        aws_sdk_cloudwatch::client::Client::new(config)
137    }
138}
139
140#[async_trait::async_trait]
141#[typetag::serde(name = "aws_cloudwatch_metrics")]
142impl SinkConfig for CloudWatchMetricsSinkConfig {
143    async fn build(
144        &self,
145        cx: SinkContext,
146    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
147        let client = self.create_client(&cx.proxy).await?;
148        let healthcheck = self.clone().healthcheck(client.clone()).boxed();
149        let sink = CloudWatchMetricsSvc::new(self.clone(), client)?;
150        Ok((sink, healthcheck))
151    }
152
153    fn input(&self) -> Input {
154        Input::metric()
155    }
156
157    fn acknowledgements(&self) -> &AcknowledgementsConfig {
158        &self.acknowledgements
159    }
160}
161
162impl CloudWatchMetricsSinkConfig {
163    async fn healthcheck(self, client: CloudwatchClient) -> crate::Result<()> {
164        client
165            .put_metric_data()
166            .metric_data(
167                MetricDatum::builder()
168                    .metric_name("healthcheck")
169                    .value(1.0)
170                    .build(),
171            )
172            .namespace(&self.default_namespace)
173            .send()
174            .await?;
175
176        Ok(())
177    }
178
179    async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchClient> {
180        let region = if cfg!(test) {
181            // Moto (used for mocking AWS) doesn't recognize 'custom' as valid region name
182            Some(Region::new("us-east-1"))
183        } else {
184            self.region.region()
185        };
186
187        create_client::<CloudwatchMetricsClientBuilder>(
188            &CloudwatchMetricsClientBuilder {},
189            &self.auth,
190            region,
191            self.region.endpoint(),
192            proxy,
193            self.tls.as_ref(),
194            None,
195        )
196        .await
197    }
198}
199
200#[derive(Default)]
201struct AwsCloudwatchMetricNormalize;
202
203impl MetricNormalize for AwsCloudwatchMetricNormalize {
204    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
205        match metric.value() {
206            MetricValue::Gauge { .. } => state.make_absolute(metric),
207            _ => state.make_incremental(metric),
208        }
209    }
210}
211
212#[derive(Debug, Clone)]
213struct CloudWatchMetricsRetryLogic;
214
215impl RetryLogic for CloudWatchMetricsRetryLogic {
216    type Error = SdkError<PutMetricDataError>;
217    type Request = PartitionInnerBuffer<Vec<Metric>, String>;
218    type Response = ();
219
220    fn is_retriable_error(&self, error: &Self::Error) -> bool {
221        is_retriable_error(error)
222    }
223}
224
225fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
226    // according to the API, up to 30 dimensions per metric can be provided
227    tags.iter_single()
228        .take(30)
229        .map(|(k, v)| Dimension::builder().name(k).value(v).build())
230        .collect()
231}
232
233#[derive(Clone)]
234pub struct CloudWatchMetricsSvc {
235    client: CloudwatchClient,
236    storage_resolution: IndexMap<String, i32>,
237}
238
239impl CloudWatchMetricsSvc {
240    pub fn new(
241        config: CloudWatchMetricsSinkConfig,
242        client: CloudwatchClient,
243    ) -> crate::Result<VectorSink> {
244        let default_namespace = config.default_namespace.clone();
245        let batch = config.batch.into_batch_settings()?;
246        let request_settings = config.request.into_settings();
247
248        let service = CloudWatchMetricsSvc {
249            client,
250            storage_resolution: validate_storage_resolutions(config.storage_resolution)?,
251        };
252        let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
253        let mut normalizer = MetricNormalizer::<AwsCloudwatchMetricNormalize>::default();
254
255        let sink = request_settings
256            .partition_sink(CloudWatchMetricsRetryLogic, service, buffer, batch.timeout)
257            .sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error, internal_log_rate_limit = false))
258            .with_flat_map(move |event: Event| {
259                stream::iter({
260                    let byte_size = event.allocated_bytes();
261                    let json_byte_size = event.estimated_json_encoded_size_of();
262                    normalizer.normalize(event.into_metric()).map(|mut metric| {
263                        let namespace = metric
264                            .take_namespace()
265                            .unwrap_or_else(|| default_namespace.clone());
266                        Ok(EncodedEvent::new(
267                            PartitionInnerBuffer::new(metric, namespace),
268                            byte_size,
269                            json_byte_size,
270                        ))
271                    })
272                })
273            });
274
275        #[allow(deprecated)]
276        Ok(VectorSink::from_event_sink(sink))
277    }
278
279    fn encode_events(&mut self, events: Vec<Metric>) -> Vec<MetricDatum> {
280        let resolutions = &self.storage_resolution;
281        events
282            .into_iter()
283            .filter_map(|event| {
284                let metric_name = event.name().to_string();
285                let timestamp = event
286                    .timestamp()
287                    .map(|x| AwsDateTime::from_millis(x.timestamp_millis()));
288                let dimensions = event.tags().map(tags_to_dimensions);
289                let resolution = resolutions.get(&metric_name).copied();
290                // AwsCloudwatchMetricNormalize converts these to the right MetricKind
291                match event.value() {
292                    MetricValue::Counter { value } => Some(
293                        MetricDatum::builder()
294                            .metric_name(metric_name)
295                            .value(*value)
296                            .set_timestamp(timestamp)
297                            .set_dimensions(dimensions)
298                            .set_storage_resolution(resolution)
299                            .build(),
300                    ),
301                    MetricValue::Distribution {
302                        samples,
303                        statistic: _,
304                    } => Some(
305                        MetricDatum::builder()
306                            .metric_name(metric_name)
307                            .set_values(Some(samples.iter().map(|s| s.value).collect()))
308                            .set_counts(Some(samples.iter().map(|s| s.rate as f64).collect()))
309                            .set_timestamp(timestamp)
310                            .set_dimensions(dimensions)
311                            .set_storage_resolution(resolution)
312                            .build(),
313                    ),
314                    MetricValue::Set { values } => Some(
315                        MetricDatum::builder()
316                            .metric_name(metric_name)
317                            .value(values.len() as f64)
318                            .set_timestamp(timestamp)
319                            .set_dimensions(dimensions)
320                            .set_storage_resolution(resolution)
321                            .build(),
322                    ),
323                    MetricValue::Gauge { value } => Some(
324                        MetricDatum::builder()
325                            .metric_name(metric_name)
326                            .value(*value)
327                            .set_timestamp(timestamp)
328                            .set_dimensions(dimensions)
329                            .set_storage_resolution(resolution)
330                            .build(),
331                    ),
332                    _ => None,
333                }
334            })
335            .collect()
336    }
337}
338
339impl Service<PartitionInnerBuffer<Vec<Metric>, String>> for CloudWatchMetricsSvc {
340    type Response = ();
341    type Error = SdkError<PutMetricDataError>;
342    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
343
344    // Emission of an internal event in case of errors is handled upstream by the caller.
345    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
346        Poll::Ready(Ok(()))
347    }
348
349    // Emission of internal events for errors and dropped events is handled upstream by the caller.
350    fn call(&mut self, items: PartitionInnerBuffer<Vec<Metric>, String>) -> Self::Future {
351        let (items, namespace) = items.into_parts();
352        let metric_data = self.encode_events(items);
353        if metric_data.is_empty() {
354            return future::ok(()).boxed();
355        }
356
357        let client = self.client.clone();
358
359        Box::pin(async move {
360            client
361                .put_metric_data()
362                .namespace(namespace)
363                .set_metric_data(Some(metric_data))
364                .send()
365                .await?;
366            Ok(())
367        })
368    }
369}
370
371fn validate_storage_resolutions(
372    storage_resolutions: IndexMap<String, i32>,
373) -> crate::Result<IndexMap<String, i32>> {
374    for (metric_name, storage_resolution) in storage_resolutions.iter() {
375        if !matches!(storage_resolution, 1 | 60) {
376            return Err(
377                format!("Storage resolution for {metric_name} should be '1' or '60'").into(),
378            );
379        }
380    }
381    Ok(storage_resolutions)
382}