vector/sinks/aws_cloudwatch_metrics/mod.rs

1#[cfg(all(test, feature = "aws-cloudwatch-metrics-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6use aws_config::Region;
7use aws_sdk_cloudwatch::error::SdkError;
8use aws_sdk_cloudwatch::operation::put_metric_data::PutMetricDataError;
9use aws_sdk_cloudwatch::types::{Dimension, MetricDatum};
10use aws_sdk_cloudwatch::Client as CloudwatchClient;
11use aws_smithy_types::DateTime as AwsDateTime;
12use futures::{stream, FutureExt, SinkExt};
13use futures_util::{future, future::BoxFuture};
14use std::task::{Context, Poll};
15use tower::Service;
16use vector_lib::configurable::configurable_component;
17use vector_lib::{sink::VectorSink, ByteSizeOf, EstimatedJsonEncodedSizeOf};
18
19use crate::{
20    aws::{
21        auth::AwsAuthentication, create_client, is_retriable_error, ClientBuilder, RegionOrEndpoint,
22    },
23    config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
24    event::{
25        metric::{Metric, MetricTags, MetricValue},
26        Event,
27    },
28    sinks::util::{
29        batch::BatchConfig,
30        buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
31        retries::RetryLogic,
32        Compression, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings,
33        TowerRequestConfig,
34    },
35    tls::TlsConfig,
36};
37
38use super::util::service::TowerRequestConfigDefaults;
39
40#[derive(Clone, Copy, Debug, Default)]
41pub struct CloudWatchMetricsDefaultBatchSettings;
42
43impl SinkBatchSettings for CloudWatchMetricsDefaultBatchSettings {
44    const MAX_EVENTS: Option<usize> = Some(20);
45    const MAX_BYTES: Option<usize> = None;
46    const TIMEOUT_SECS: f64 = 1.0;
47}
48
49#[derive(Clone, Copy, Debug)]
50pub struct CloudWatchMetricsTowerRequestConfigDefaults;
51
52impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults {
53    const RATE_LIMIT_NUM: u64 = 150;
54}
55
// NOTE(review): the `///` comments on this struct and its fields are presumably consumed by
// `configurable_component` as user-facing config documentation, so review notes here use `//`
// to stay out of that output.
// NOTE(review): serde documents caveats when `deny_unknown_fields` is combined with a
// `flatten` field (see `region`) — confirm unknown keys are still rejected as intended.
/// Configuration for the `aws_cloudwatch_metrics` sink.
#[configurable_component(sink(
    "aws_cloudwatch_metrics",
    "Publish metric events to AWS CloudWatch Metrics."
))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct CloudWatchMetricsSinkConfig {
    // The serde alias also accepts this setting under the key `namespace`.
    // Used as the PutMetricData namespace for metrics without one, and by the healthcheck.
    /// The default [namespace][namespace] to use for metrics that do not have one.
    ///
    /// Metrics with the same name can only be differentiated by their namespace, and not all
    /// metrics have their own namespace.
    ///
    /// [namespace]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Namespace
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::examples = "service"))]
    pub default_namespace: String,

    // Flattened: the `RegionOrEndpoint` keys are read from the top level of this sink's config.
    // Note that `create_client` ignores the configured region in test builds (`cfg!(test)`)
    // and forces `us-east-1`.
    /// The [AWS region][aws_region] of the target service.
    ///
    /// [aws_region]: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // NOTE(review): nothing in this file reads `compression` (neither `create_client` nor
    // `CloudWatchMetricsSvc::new`) — confirm whether it is applied anywhere.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    // Defaults come from CloudWatchMetricsDefaultBatchSettings
    // (20 events per batch, no byte cap, 1.0 s timeout).
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<CloudWatchMetricsDefaultBatchSettings>,

    // Defaults come from CloudWatchMetricsTowerRequestConfigDefaults (RATE_LIMIT_NUM = 150).
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<CloudWatchMetricsTowerRequestConfigDefaults>,

    // Passed to `create_client` via `as_ref()`; `None` when the key is omitted.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // NOTE(review): deprecated and hidden from docs; `create_client` only passes `self.auth`,
    // so this field is accepted but never read in this file — confirm it is intentionally
    // ignored rather than forwarded to the auth configuration.
    /// The ARN of an [IAM role][iam_role] to assume at startup.
    ///
    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    // Credentials handed to `create_client` when building the CloudWatch client.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    // Deserialized through `crate::serde::bool_or_struct` (per the helper's name, either a
    // bool or a struct) and omitted on serialization when equal to the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}

// NOTE(review): presumably derives the generated/example config from `Default`, which leaves
// `default_namespace` as an empty string — confirm that is acceptable for generated configs.
impl_generate_config_from_default!(CloudWatchMetricsSinkConfig);
116
117struct CloudwatchMetricsClientBuilder;
118
119impl ClientBuilder for CloudwatchMetricsClientBuilder {
120    type Client = aws_sdk_cloudwatch::client::Client;
121
122    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
123        aws_sdk_cloudwatch::client::Client::new(config)
124    }
125}
126
127#[async_trait::async_trait]
128#[typetag::serde(name = "aws_cloudwatch_metrics")]
129impl SinkConfig for CloudWatchMetricsSinkConfig {
130    async fn build(
131        &self,
132        cx: SinkContext,
133    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
134        let client = self.create_client(&cx.proxy).await?;
135        let healthcheck = self.clone().healthcheck(client.clone()).boxed();
136        let sink = CloudWatchMetricsSvc::new(self.clone(), client)?;
137        Ok((sink, healthcheck))
138    }
139
140    fn input(&self) -> Input {
141        Input::metric()
142    }
143
144    fn acknowledgements(&self) -> &AcknowledgementsConfig {
145        &self.acknowledgements
146    }
147}
148
149impl CloudWatchMetricsSinkConfig {
150    async fn healthcheck(self, client: CloudwatchClient) -> crate::Result<()> {
151        client
152            .put_metric_data()
153            .metric_data(
154                MetricDatum::builder()
155                    .metric_name("healthcheck")
156                    .value(1.0)
157                    .build(),
158            )
159            .namespace(&self.default_namespace)
160            .send()
161            .await?;
162
163        Ok(())
164    }
165
166    async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchClient> {
167        let region = if cfg!(test) {
168            // Moto (used for mocking AWS) doesn't recognize 'custom' as valid region name
169            Some(Region::new("us-east-1"))
170        } else {
171            self.region.region()
172        };
173
174        create_client::<CloudwatchMetricsClientBuilder>(
175            &CloudwatchMetricsClientBuilder {},
176            &self.auth,
177            region,
178            self.region.endpoint(),
179            proxy,
180            self.tls.as_ref(),
181            None,
182        )
183        .await
184    }
185}
186
187#[derive(Default)]
188struct AwsCloudwatchMetricNormalize;
189
190impl MetricNormalize for AwsCloudwatchMetricNormalize {
191    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
192        match metric.value() {
193            MetricValue::Gauge { .. } => state.make_absolute(metric),
194            _ => state.make_incremental(metric),
195        }
196    }
197}
198
199#[derive(Debug, Clone)]
200struct CloudWatchMetricsRetryLogic;
201
202impl RetryLogic for CloudWatchMetricsRetryLogic {
203    type Error = SdkError<PutMetricDataError>;
204    type Request = PartitionInnerBuffer<Vec<Metric>, String>;
205    type Response = ();
206
207    fn is_retriable_error(&self, error: &Self::Error) -> bool {
208        is_retriable_error(error)
209    }
210}
211
212fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
213    // according to the API, up to 30 dimensions per metric can be provided
214    tags.iter_single()
215        .take(30)
216        .map(|(k, v)| Dimension::builder().name(k).value(v).build())
217        .collect()
218}
219
220#[derive(Clone)]
221pub struct CloudWatchMetricsSvc {
222    client: CloudwatchClient,
223}
224
225impl CloudWatchMetricsSvc {
226    pub fn new(
227        config: CloudWatchMetricsSinkConfig,
228        client: CloudwatchClient,
229    ) -> crate::Result<VectorSink> {
230        let default_namespace = config.default_namespace.clone();
231        let batch = config.batch.into_batch_settings()?;
232        let request_settings = config.request.into_settings();
233
234        let service = CloudWatchMetricsSvc { client };
235        let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
236        let mut normalizer = MetricNormalizer::<AwsCloudwatchMetricNormalize>::default();
237
238        let sink = request_settings
239            .partition_sink(CloudWatchMetricsRetryLogic, service, buffer, batch.timeout)
240            .sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error))
241            .with_flat_map(move |event: Event| {
242                stream::iter({
243                    let byte_size = event.allocated_bytes();
244                    let json_byte_size = event.estimated_json_encoded_size_of();
245                    normalizer.normalize(event.into_metric()).map(|mut metric| {
246                        let namespace = metric
247                            .take_namespace()
248                            .unwrap_or_else(|| default_namespace.clone());
249                        Ok(EncodedEvent::new(
250                            PartitionInnerBuffer::new(metric, namespace),
251                            byte_size,
252                            json_byte_size,
253                        ))
254                    })
255                })
256            });
257
258        #[allow(deprecated)]
259        Ok(VectorSink::from_event_sink(sink))
260    }
261
262    fn encode_events(&mut self, events: Vec<Metric>) -> Vec<MetricDatum> {
263        events
264            .into_iter()
265            .filter_map(|event| {
266                let metric_name = event.name().to_string();
267                let timestamp = event
268                    .timestamp()
269                    .map(|x| AwsDateTime::from_millis(x.timestamp_millis()));
270                let dimensions = event.tags().map(tags_to_dimensions);
271                // AwsCloudwatchMetricNormalize converts these to the right MetricKind
272                match event.value() {
273                    MetricValue::Counter { value } => Some(
274                        MetricDatum::builder()
275                            .metric_name(metric_name)
276                            .value(*value)
277                            .set_timestamp(timestamp)
278                            .set_dimensions(dimensions)
279                            .build(),
280                    ),
281                    MetricValue::Distribution {
282                        samples,
283                        statistic: _,
284                    } => Some(
285                        MetricDatum::builder()
286                            .metric_name(metric_name)
287                            .set_values(Some(samples.iter().map(|s| s.value).collect()))
288                            .set_counts(Some(samples.iter().map(|s| s.rate as f64).collect()))
289                            .set_timestamp(timestamp)
290                            .set_dimensions(dimensions)
291                            .build(),
292                    ),
293                    MetricValue::Set { values } => Some(
294                        MetricDatum::builder()
295                            .metric_name(metric_name)
296                            .value(values.len() as f64)
297                            .set_timestamp(timestamp)
298                            .set_dimensions(dimensions)
299                            .build(),
300                    ),
301                    MetricValue::Gauge { value } => Some(
302                        MetricDatum::builder()
303                            .metric_name(metric_name)
304                            .value(*value)
305                            .set_timestamp(timestamp)
306                            .set_dimensions(dimensions)
307                            .build(),
308                    ),
309                    _ => None,
310                }
311            })
312            .collect()
313    }
314}
315
316impl Service<PartitionInnerBuffer<Vec<Metric>, String>> for CloudWatchMetricsSvc {
317    type Response = ();
318    type Error = SdkError<PutMetricDataError>;
319    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
320
321    // Emission of an internal event in case of errors is handled upstream by the caller.
322    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
323        Poll::Ready(Ok(()))
324    }
325
326    // Emission of internal events for errors and dropped events is handled upstream by the caller.
327    fn call(&mut self, items: PartitionInnerBuffer<Vec<Metric>, String>) -> Self::Future {
328        let (items, namespace) = items.into_parts();
329        let metric_data = self.encode_events(items);
330        if metric_data.is_empty() {
331            return future::ok(()).boxed();
332        }
333
334        let client = self.client.clone();
335
336        Box::pin(async move {
337            client
338                .put_metric_data()
339                .namespace(namespace)
340                .set_metric_data(Some(metric_data))
341                .send()
342                .await?;
343            Ok(())
344        })
345    }
346}