vector/sinks/aws_cloudwatch_metrics/
mod.rs

1#[cfg(all(test, feature = "aws-cloudwatch-metrics-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6use std::task::{Context, Poll};
7
8use aws_config::Region;
9use aws_sdk_cloudwatch::{
10    Client as CloudwatchClient,
11    error::SdkError,
12    operation::put_metric_data::PutMetricDataError,
13    types::{Dimension, MetricDatum},
14};
15use aws_smithy_types::DateTime as AwsDateTime;
16use futures::{FutureExt, SinkExt, stream};
17use futures_util::{future, future::BoxFuture};
18use tower::Service;
19use vector_lib::{
20    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, sink::VectorSink,
21};
22
23use super::util::service::TowerRequestConfigDefaults;
24use crate::{
25    aws::{
26        ClientBuilder, RegionOrEndpoint, auth::AwsAuthentication, create_client, is_retriable_error,
27    },
28    config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
29    event::{
30        Event,
31        metric::{Metric, MetricTags, MetricValue},
32    },
33    sinks::util::{
34        Compression, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings,
35        TowerRequestConfig,
36        batch::BatchConfig,
37        buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
38        retries::RetryLogic,
39    },
40    tls::TlsConfig,
41};
42
43#[derive(Clone, Copy, Debug, Default)]
44pub struct CloudWatchMetricsDefaultBatchSettings;
45
46impl SinkBatchSettings for CloudWatchMetricsDefaultBatchSettings {
47    const MAX_EVENTS: Option<usize> = Some(20);
48    const MAX_BYTES: Option<usize> = None;
49    const TIMEOUT_SECS: f64 = 1.0;
50}
51
52#[derive(Clone, Copy, Debug)]
53pub struct CloudWatchMetricsTowerRequestConfigDefaults;
54
55impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults {
56    const RATE_LIMIT_NUM: u64 = 150;
57}
58
59/// Configuration for the `aws_cloudwatch_metrics` sink.
60#[configurable_component(sink(
61    "aws_cloudwatch_metrics",
62    "Publish metric events to AWS CloudWatch Metrics."
63))]
64#[derive(Clone, Debug, Default)]
65#[serde(deny_unknown_fields)]
66pub struct CloudWatchMetricsSinkConfig {
67    /// The default [namespace][namespace] to use for metrics that do not have one.
68    ///
69    /// Metrics with the same name can only be differentiated by their namespace, and not all
70    /// metrics have their own namespace.
71    ///
72    /// [namespace]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html#Namespace
73    #[serde(alias = "namespace")]
74    #[configurable(metadata(docs::examples = "service"))]
75    pub default_namespace: String,
76
77    /// The [AWS region][aws_region] of the target service.
78    ///
79    /// [aws_region]: https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html
80    #[serde(flatten)]
81    pub region: RegionOrEndpoint,
82
83    #[configurable(derived)]
84    #[serde(default)]
85    pub compression: Compression,
86
87    #[configurable(derived)]
88    #[serde(default)]
89    pub batch: BatchConfig<CloudWatchMetricsDefaultBatchSettings>,
90
91    #[configurable(derived)]
92    #[serde(default)]
93    pub request: TowerRequestConfig<CloudWatchMetricsTowerRequestConfigDefaults>,
94
95    #[configurable(derived)]
96    pub tls: Option<TlsConfig>,
97
98    /// The ARN of an [IAM role][iam_role] to assume at startup.
99    ///
100    /// [iam_role]: https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html
101    #[configurable(deprecated)]
102    #[configurable(metadata(docs::hidden))]
103    assume_role: Option<String>,
104
105    #[configurable(derived)]
106    #[serde(default)]
107    pub auth: AwsAuthentication,
108
109    #[configurable(derived)]
110    #[serde(
111        default,
112        deserialize_with = "crate::serde::bool_or_struct",
113        skip_serializing_if = "crate::serde::is_default"
114    )]
115    acknowledgements: AcknowledgementsConfig,
116}
117
118impl_generate_config_from_default!(CloudWatchMetricsSinkConfig);
119
120struct CloudwatchMetricsClientBuilder;
121
122impl ClientBuilder for CloudwatchMetricsClientBuilder {
123    type Client = aws_sdk_cloudwatch::client::Client;
124
125    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
126        aws_sdk_cloudwatch::client::Client::new(config)
127    }
128}
129
130#[async_trait::async_trait]
131#[typetag::serde(name = "aws_cloudwatch_metrics")]
132impl SinkConfig for CloudWatchMetricsSinkConfig {
133    async fn build(
134        &self,
135        cx: SinkContext,
136    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
137        let client = self.create_client(&cx.proxy).await?;
138        let healthcheck = self.clone().healthcheck(client.clone()).boxed();
139        let sink = CloudWatchMetricsSvc::new(self.clone(), client)?;
140        Ok((sink, healthcheck))
141    }
142
143    fn input(&self) -> Input {
144        Input::metric()
145    }
146
147    fn acknowledgements(&self) -> &AcknowledgementsConfig {
148        &self.acknowledgements
149    }
150}
151
152impl CloudWatchMetricsSinkConfig {
153    async fn healthcheck(self, client: CloudwatchClient) -> crate::Result<()> {
154        client
155            .put_metric_data()
156            .metric_data(
157                MetricDatum::builder()
158                    .metric_name("healthcheck")
159                    .value(1.0)
160                    .build(),
161            )
162            .namespace(&self.default_namespace)
163            .send()
164            .await?;
165
166        Ok(())
167    }
168
169    async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchClient> {
170        let region = if cfg!(test) {
171            // Moto (used for mocking AWS) doesn't recognize 'custom' as valid region name
172            Some(Region::new("us-east-1"))
173        } else {
174            self.region.region()
175        };
176
177        create_client::<CloudwatchMetricsClientBuilder>(
178            &CloudwatchMetricsClientBuilder {},
179            &self.auth,
180            region,
181            self.region.endpoint(),
182            proxy,
183            self.tls.as_ref(),
184            None,
185        )
186        .await
187    }
188}
189
190#[derive(Default)]
191struct AwsCloudwatchMetricNormalize;
192
193impl MetricNormalize for AwsCloudwatchMetricNormalize {
194    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
195        match metric.value() {
196            MetricValue::Gauge { .. } => state.make_absolute(metric),
197            _ => state.make_incremental(metric),
198        }
199    }
200}
201
202#[derive(Debug, Clone)]
203struct CloudWatchMetricsRetryLogic;
204
205impl RetryLogic for CloudWatchMetricsRetryLogic {
206    type Error = SdkError<PutMetricDataError>;
207    type Request = PartitionInnerBuffer<Vec<Metric>, String>;
208    type Response = ();
209
210    fn is_retriable_error(&self, error: &Self::Error) -> bool {
211        is_retriable_error(error)
212    }
213}
214
215fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
216    // according to the API, up to 30 dimensions per metric can be provided
217    tags.iter_single()
218        .take(30)
219        .map(|(k, v)| Dimension::builder().name(k).value(v).build())
220        .collect()
221}
222
223#[derive(Clone)]
224pub struct CloudWatchMetricsSvc {
225    client: CloudwatchClient,
226}
227
228impl CloudWatchMetricsSvc {
229    pub fn new(
230        config: CloudWatchMetricsSinkConfig,
231        client: CloudwatchClient,
232    ) -> crate::Result<VectorSink> {
233        let default_namespace = config.default_namespace.clone();
234        let batch = config.batch.into_batch_settings()?;
235        let request_settings = config.request.into_settings();
236
237        let service = CloudWatchMetricsSvc { client };
238        let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
239        let mut normalizer = MetricNormalizer::<AwsCloudwatchMetricNormalize>::default();
240
241        let sink = request_settings
242            .partition_sink(CloudWatchMetricsRetryLogic, service, buffer, batch.timeout)
243            .sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error))
244            .with_flat_map(move |event: Event| {
245                stream::iter({
246                    let byte_size = event.allocated_bytes();
247                    let json_byte_size = event.estimated_json_encoded_size_of();
248                    normalizer.normalize(event.into_metric()).map(|mut metric| {
249                        let namespace = metric
250                            .take_namespace()
251                            .unwrap_or_else(|| default_namespace.clone());
252                        Ok(EncodedEvent::new(
253                            PartitionInnerBuffer::new(metric, namespace),
254                            byte_size,
255                            json_byte_size,
256                        ))
257                    })
258                })
259            });
260
261        #[allow(deprecated)]
262        Ok(VectorSink::from_event_sink(sink))
263    }
264
265    fn encode_events(&mut self, events: Vec<Metric>) -> Vec<MetricDatum> {
266        events
267            .into_iter()
268            .filter_map(|event| {
269                let metric_name = event.name().to_string();
270                let timestamp = event
271                    .timestamp()
272                    .map(|x| AwsDateTime::from_millis(x.timestamp_millis()));
273                let dimensions = event.tags().map(tags_to_dimensions);
274                // AwsCloudwatchMetricNormalize converts these to the right MetricKind
275                match event.value() {
276                    MetricValue::Counter { value } => Some(
277                        MetricDatum::builder()
278                            .metric_name(metric_name)
279                            .value(*value)
280                            .set_timestamp(timestamp)
281                            .set_dimensions(dimensions)
282                            .build(),
283                    ),
284                    MetricValue::Distribution {
285                        samples,
286                        statistic: _,
287                    } => Some(
288                        MetricDatum::builder()
289                            .metric_name(metric_name)
290                            .set_values(Some(samples.iter().map(|s| s.value).collect()))
291                            .set_counts(Some(samples.iter().map(|s| s.rate as f64).collect()))
292                            .set_timestamp(timestamp)
293                            .set_dimensions(dimensions)
294                            .build(),
295                    ),
296                    MetricValue::Set { values } => Some(
297                        MetricDatum::builder()
298                            .metric_name(metric_name)
299                            .value(values.len() as f64)
300                            .set_timestamp(timestamp)
301                            .set_dimensions(dimensions)
302                            .build(),
303                    ),
304                    MetricValue::Gauge { value } => Some(
305                        MetricDatum::builder()
306                            .metric_name(metric_name)
307                            .value(*value)
308                            .set_timestamp(timestamp)
309                            .set_dimensions(dimensions)
310                            .build(),
311                    ),
312                    _ => None,
313                }
314            })
315            .collect()
316    }
317}
318
319impl Service<PartitionInnerBuffer<Vec<Metric>, String>> for CloudWatchMetricsSvc {
320    type Response = ();
321    type Error = SdkError<PutMetricDataError>;
322    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
323
324    // Emission of an internal event in case of errors is handled upstream by the caller.
325    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
326        Poll::Ready(Ok(()))
327    }
328
329    // Emission of internal events for errors and dropped events is handled upstream by the caller.
330    fn call(&mut self, items: PartitionInnerBuffer<Vec<Metric>, String>) -> Self::Future {
331        let (items, namespace) = items.into_parts();
332        let metric_data = self.encode_events(items);
333        if metric_data.is_empty() {
334            return future::ok(()).boxed();
335        }
336
337        let client = self.client.clone();
338
339        Box::pin(async move {
340            client
341                .put_metric_data()
342                .namespace(namespace)
343                .set_metric_data(Some(metric_data))
344                .send()
345                .await?;
346            Ok(())
347        })
348    }
349}