vector/sinks/aws_cloudwatch_metrics/
mod.rs1#[cfg(all(test, feature = "aws-cloudwatch-metrics-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6use std::task::{Context, Poll};
7
8use aws_config::Region;
9use aws_sdk_cloudwatch::{
10 Client as CloudwatchClient,
11 error::SdkError,
12 operation::put_metric_data::PutMetricDataError,
13 types::{Dimension, MetricDatum},
14};
15use aws_smithy_types::DateTime as AwsDateTime;
16use futures::{FutureExt, SinkExt, stream};
17use futures_util::{future, future::BoxFuture};
18use indexmap::IndexMap;
19use tower::Service;
20use vector_lib::{
21 ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, sink::VectorSink,
22};
23
24use super::util::service::TowerRequestConfigDefaults;
25use crate::{
26 aws::{
27 ClientBuilder, RegionOrEndpoint, auth::AwsAuthentication, create_client, is_retriable_error,
28 },
29 config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
30 event::{
31 Event,
32 metric::{Metric, MetricTags, MetricValue},
33 },
34 sinks::util::{
35 Compression, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings,
36 TowerRequestConfig,
37 batch::BatchConfig,
38 buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
39 retries::RetryLogic,
40 },
41 tls::TlsConfig,
42};
43
44#[derive(Clone, Copy, Debug, Default)]
45pub struct CloudWatchMetricsDefaultBatchSettings;
46
47impl SinkBatchSettings for CloudWatchMetricsDefaultBatchSettings {
48 const MAX_EVENTS: Option<usize> = Some(20);
49 const MAX_BYTES: Option<usize> = None;
50 const TIMEOUT_SECS: f64 = 1.0;
51}
52
53#[derive(Clone, Copy, Debug)]
54pub struct CloudWatchMetricsTowerRequestConfigDefaults;
55
56impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults {
57 const RATE_LIMIT_NUM: u64 = 150;
58}
59
/// Configuration for the `aws_cloudwatch_metrics` sink.
#[configurable_component(sink(
    "aws_cloudwatch_metrics",
    "Publish metric events to AWS CloudWatch Metrics."
))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct CloudWatchMetricsSinkConfig {
    /// The default namespace to publish metrics under.
    ///
    /// Metrics that do not carry a namespace of their own are published under this one. The
    /// sink's healthcheck also publishes its probe datum to this namespace.
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::examples = "service"))]
    pub default_namespace: String,

    /// The AWS region, or custom endpoint, of the CloudWatch service.
    #[serde(flatten)]
    pub region: RegionOrEndpoint,

    // NOTE(review): `compression` is not read by any of the sink code visible in this file —
    // confirm whether it has any effect for this sink.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<CloudWatchMetricsDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<CloudWatchMetricsTowerRequestConfigDefaults>,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // NOTE(review): not read by the code visible in this file; credentials are passed to
    // `create_client` from `auth`. Kept (deprecated, hidden) presumably for config
    // compatibility — confirm before removing.
    /// Deprecated. This option is hidden from the generated documentation.
    #[configurable(deprecated)]
    #[configurable(metadata(docs::hidden))]
    assume_role: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    pub auth: AwsAuthentication,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    /// A map from metric name to the CloudWatch storage resolution to request for that metric.
    ///
    /// Only `1` and `60` are accepted; any other value makes the sink fail to build. Metrics
    /// that are not listed here are sent without a storage resolution.
    #[configurable(metadata(docs::additional_props_description = "An AWS storage resolution."))]
    #[serde(default)]
    pub storage_resolution: IndexMap<String, i32>,
}
127
// NOTE(review): presumably implements config generation (`GenerateConfig`) from the `Default`
// impl derived on `CloudWatchMetricsSinkConfig` — confirm against the macro definition.
impl_generate_config_from_default!(CloudWatchMetricsSinkConfig);
129
130struct CloudwatchMetricsClientBuilder;
131
132impl ClientBuilder for CloudwatchMetricsClientBuilder {
133 type Client = aws_sdk_cloudwatch::client::Client;
134
135 fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
136 aws_sdk_cloudwatch::client::Client::new(config)
137 }
138}
139
140#[async_trait::async_trait]
141#[typetag::serde(name = "aws_cloudwatch_metrics")]
142impl SinkConfig for CloudWatchMetricsSinkConfig {
143 async fn build(
144 &self,
145 cx: SinkContext,
146 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
147 let client = self.create_client(&cx.proxy).await?;
148 let healthcheck = self.clone().healthcheck(client.clone()).boxed();
149 let sink = CloudWatchMetricsSvc::new(self.clone(), client)?;
150 Ok((sink, healthcheck))
151 }
152
153 fn input(&self) -> Input {
154 Input::metric()
155 }
156
157 fn acknowledgements(&self) -> &AcknowledgementsConfig {
158 &self.acknowledgements
159 }
160}
161
162impl CloudWatchMetricsSinkConfig {
163 async fn healthcheck(self, client: CloudwatchClient) -> crate::Result<()> {
164 client
165 .put_metric_data()
166 .metric_data(
167 MetricDatum::builder()
168 .metric_name("healthcheck")
169 .value(1.0)
170 .build(),
171 )
172 .namespace(&self.default_namespace)
173 .send()
174 .await?;
175
176 Ok(())
177 }
178
179 async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchClient> {
180 let region = if cfg!(test) {
181 Some(Region::new("us-east-1"))
183 } else {
184 self.region.region()
185 };
186
187 create_client::<CloudwatchMetricsClientBuilder>(
188 &CloudwatchMetricsClientBuilder {},
189 &self.auth,
190 region,
191 self.region.endpoint(),
192 proxy,
193 self.tls.as_ref(),
194 None,
195 )
196 .await
197 }
198}
199
200#[derive(Default)]
201struct AwsCloudwatchMetricNormalize;
202
203impl MetricNormalize for AwsCloudwatchMetricNormalize {
204 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
205 match metric.value() {
206 MetricValue::Gauge { .. } => state.make_absolute(metric),
207 _ => state.make_incremental(metric),
208 }
209 }
210}
211
212#[derive(Debug, Clone)]
213struct CloudWatchMetricsRetryLogic;
214
215impl RetryLogic for CloudWatchMetricsRetryLogic {
216 type Error = SdkError<PutMetricDataError>;
217 type Request = PartitionInnerBuffer<Vec<Metric>, String>;
218 type Response = ();
219
220 fn is_retriable_error(&self, error: &Self::Error) -> bool {
221 is_retriable_error(error)
222 }
223}
224
225fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
226 tags.iter_single()
228 .take(30)
229 .map(|(k, v)| Dimension::builder().name(k).value(v).build())
230 .collect()
231}
232
233#[derive(Clone)]
234pub struct CloudWatchMetricsSvc {
235 client: CloudwatchClient,
236 storage_resolution: IndexMap<String, i32>,
237}
238
239impl CloudWatchMetricsSvc {
240 pub fn new(
241 config: CloudWatchMetricsSinkConfig,
242 client: CloudwatchClient,
243 ) -> crate::Result<VectorSink> {
244 let default_namespace = config.default_namespace.clone();
245 let batch = config.batch.into_batch_settings()?;
246 let request_settings = config.request.into_settings();
247
248 let service = CloudWatchMetricsSvc {
249 client,
250 storage_resolution: validate_storage_resolutions(config.storage_resolution)?,
251 };
252 let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
253 let mut normalizer = MetricNormalizer::<AwsCloudwatchMetricNormalize>::default();
254
255 let sink = request_settings
256 .partition_sink(CloudWatchMetricsRetryLogic, service, buffer, batch.timeout)
257 .sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error, internal_log_rate_limit = false))
258 .with_flat_map(move |event: Event| {
259 stream::iter({
260 let byte_size = event.allocated_bytes();
261 let json_byte_size = event.estimated_json_encoded_size_of();
262 normalizer.normalize(event.into_metric()).map(|mut metric| {
263 let namespace = metric
264 .take_namespace()
265 .unwrap_or_else(|| default_namespace.clone());
266 Ok(EncodedEvent::new(
267 PartitionInnerBuffer::new(metric, namespace),
268 byte_size,
269 json_byte_size,
270 ))
271 })
272 })
273 });
274
275 #[allow(deprecated)]
276 Ok(VectorSink::from_event_sink(sink))
277 }
278
279 fn encode_events(&mut self, events: Vec<Metric>) -> Vec<MetricDatum> {
280 let resolutions = &self.storage_resolution;
281 events
282 .into_iter()
283 .filter_map(|event| {
284 let metric_name = event.name().to_string();
285 let timestamp = event
286 .timestamp()
287 .map(|x| AwsDateTime::from_millis(x.timestamp_millis()));
288 let dimensions = event.tags().map(tags_to_dimensions);
289 let resolution = resolutions.get(&metric_name).copied();
290 match event.value() {
292 MetricValue::Counter { value } => Some(
293 MetricDatum::builder()
294 .metric_name(metric_name)
295 .value(*value)
296 .set_timestamp(timestamp)
297 .set_dimensions(dimensions)
298 .set_storage_resolution(resolution)
299 .build(),
300 ),
301 MetricValue::Distribution {
302 samples,
303 statistic: _,
304 } => Some(
305 MetricDatum::builder()
306 .metric_name(metric_name)
307 .set_values(Some(samples.iter().map(|s| s.value).collect()))
308 .set_counts(Some(samples.iter().map(|s| s.rate as f64).collect()))
309 .set_timestamp(timestamp)
310 .set_dimensions(dimensions)
311 .set_storage_resolution(resolution)
312 .build(),
313 ),
314 MetricValue::Set { values } => Some(
315 MetricDatum::builder()
316 .metric_name(metric_name)
317 .value(values.len() as f64)
318 .set_timestamp(timestamp)
319 .set_dimensions(dimensions)
320 .set_storage_resolution(resolution)
321 .build(),
322 ),
323 MetricValue::Gauge { value } => Some(
324 MetricDatum::builder()
325 .metric_name(metric_name)
326 .value(*value)
327 .set_timestamp(timestamp)
328 .set_dimensions(dimensions)
329 .set_storage_resolution(resolution)
330 .build(),
331 ),
332 _ => None,
333 }
334 })
335 .collect()
336 }
337}
338
339impl Service<PartitionInnerBuffer<Vec<Metric>, String>> for CloudWatchMetricsSvc {
340 type Response = ();
341 type Error = SdkError<PutMetricDataError>;
342 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
343
344 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
346 Poll::Ready(Ok(()))
347 }
348
349 fn call(&mut self, items: PartitionInnerBuffer<Vec<Metric>, String>) -> Self::Future {
351 let (items, namespace) = items.into_parts();
352 let metric_data = self.encode_events(items);
353 if metric_data.is_empty() {
354 return future::ok(()).boxed();
355 }
356
357 let client = self.client.clone();
358
359 Box::pin(async move {
360 client
361 .put_metric_data()
362 .namespace(namespace)
363 .set_metric_data(Some(metric_data))
364 .send()
365 .await?;
366 Ok(())
367 })
368 }
369}
370
371fn validate_storage_resolutions(
372 storage_resolutions: IndexMap<String, i32>,
373) -> crate::Result<IndexMap<String, i32>> {
374 for (metric_name, storage_resolution) in storage_resolutions.iter() {
375 if !matches!(storage_resolution, 1 | 60) {
376 return Err(
377 format!("Storage resolution for {metric_name} should be '1' or '60'").into(),
378 );
379 }
380 }
381 Ok(storage_resolutions)
382}