vector/sinks/aws_cloudwatch_metrics/
mod.rs1#[cfg(all(test, feature = "aws-cloudwatch-metrics-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6use aws_config::Region;
7use aws_sdk_cloudwatch::error::SdkError;
8use aws_sdk_cloudwatch::operation::put_metric_data::PutMetricDataError;
9use aws_sdk_cloudwatch::types::{Dimension, MetricDatum};
10use aws_sdk_cloudwatch::Client as CloudwatchClient;
11use aws_smithy_types::DateTime as AwsDateTime;
12use futures::{stream, FutureExt, SinkExt};
13use futures_util::{future, future::BoxFuture};
14use std::task::{Context, Poll};
15use tower::Service;
16use vector_lib::configurable::configurable_component;
17use vector_lib::{sink::VectorSink, ByteSizeOf, EstimatedJsonEncodedSizeOf};
18
19use crate::{
20 aws::{
21 auth::AwsAuthentication, create_client, is_retriable_error, ClientBuilder, RegionOrEndpoint,
22 },
23 config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
24 event::{
25 metric::{Metric, MetricTags, MetricValue},
26 Event,
27 },
28 sinks::util::{
29 batch::BatchConfig,
30 buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
31 retries::RetryLogic,
32 Compression, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings,
33 TowerRequestConfig,
34 },
35 tls::TlsConfig,
36};
37
38use super::util::service::TowerRequestConfigDefaults;
39
40#[derive(Clone, Copy, Debug, Default)]
41pub struct CloudWatchMetricsDefaultBatchSettings;
42
43impl SinkBatchSettings for CloudWatchMetricsDefaultBatchSettings {
44 const MAX_EVENTS: Option<usize> = Some(20);
45 const MAX_BYTES: Option<usize> = None;
46 const TIMEOUT_SECS: f64 = 1.0;
47}
48
49#[derive(Clone, Copy, Debug)]
50pub struct CloudWatchMetricsTowerRequestConfigDefaults;
51
52impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults {
53 const RATE_LIMIT_NUM: u64 = 150;
54}
55
56#[configurable_component(sink(
58 "aws_cloudwatch_metrics",
59 "Publish metric events to AWS CloudWatch Metrics."
60))]
61#[derive(Clone, Debug, Default)]
62#[serde(deny_unknown_fields)]
63pub struct CloudWatchMetricsSinkConfig {
64 #[serde(alias = "namespace")]
71 #[configurable(metadata(docs::examples = "service"))]
72 pub default_namespace: String,
73
74 #[serde(flatten)]
78 pub region: RegionOrEndpoint,
79
80 #[configurable(derived)]
81 #[serde(default)]
82 pub compression: Compression,
83
84 #[configurable(derived)]
85 #[serde(default)]
86 pub batch: BatchConfig<CloudWatchMetricsDefaultBatchSettings>,
87
88 #[configurable(derived)]
89 #[serde(default)]
90 pub request: TowerRequestConfig<CloudWatchMetricsTowerRequestConfigDefaults>,
91
92 #[configurable(derived)]
93 pub tls: Option<TlsConfig>,
94
95 #[configurable(deprecated)]
99 #[configurable(metadata(docs::hidden))]
100 assume_role: Option<String>,
101
102 #[configurable(derived)]
103 #[serde(default)]
104 pub auth: AwsAuthentication,
105
106 #[configurable(derived)]
107 #[serde(
108 default,
109 deserialize_with = "crate::serde::bool_or_struct",
110 skip_serializing_if = "crate::serde::is_default"
111 )]
112 acknowledgements: AcknowledgementsConfig,
113}
114
115impl_generate_config_from_default!(CloudWatchMetricsSinkConfig);
116
117struct CloudwatchMetricsClientBuilder;
118
119impl ClientBuilder for CloudwatchMetricsClientBuilder {
120 type Client = aws_sdk_cloudwatch::client::Client;
121
122 fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
123 aws_sdk_cloudwatch::client::Client::new(config)
124 }
125}
126
127#[async_trait::async_trait]
128#[typetag::serde(name = "aws_cloudwatch_metrics")]
129impl SinkConfig for CloudWatchMetricsSinkConfig {
130 async fn build(
131 &self,
132 cx: SinkContext,
133 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
134 let client = self.create_client(&cx.proxy).await?;
135 let healthcheck = self.clone().healthcheck(client.clone()).boxed();
136 let sink = CloudWatchMetricsSvc::new(self.clone(), client)?;
137 Ok((sink, healthcheck))
138 }
139
140 fn input(&self) -> Input {
141 Input::metric()
142 }
143
144 fn acknowledgements(&self) -> &AcknowledgementsConfig {
145 &self.acknowledgements
146 }
147}
148
149impl CloudWatchMetricsSinkConfig {
150 async fn healthcheck(self, client: CloudwatchClient) -> crate::Result<()> {
151 client
152 .put_metric_data()
153 .metric_data(
154 MetricDatum::builder()
155 .metric_name("healthcheck")
156 .value(1.0)
157 .build(),
158 )
159 .namespace(&self.default_namespace)
160 .send()
161 .await?;
162
163 Ok(())
164 }
165
166 async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchClient> {
167 let region = if cfg!(test) {
168 Some(Region::new("us-east-1"))
170 } else {
171 self.region.region()
172 };
173
174 create_client::<CloudwatchMetricsClientBuilder>(
175 &CloudwatchMetricsClientBuilder {},
176 &self.auth,
177 region,
178 self.region.endpoint(),
179 proxy,
180 self.tls.as_ref(),
181 None,
182 )
183 .await
184 }
185}
186
187#[derive(Default)]
188struct AwsCloudwatchMetricNormalize;
189
190impl MetricNormalize for AwsCloudwatchMetricNormalize {
191 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
192 match metric.value() {
193 MetricValue::Gauge { .. } => state.make_absolute(metric),
194 _ => state.make_incremental(metric),
195 }
196 }
197}
198
199#[derive(Debug, Clone)]
200struct CloudWatchMetricsRetryLogic;
201
202impl RetryLogic for CloudWatchMetricsRetryLogic {
203 type Error = SdkError<PutMetricDataError>;
204 type Request = PartitionInnerBuffer<Vec<Metric>, String>;
205 type Response = ();
206
207 fn is_retriable_error(&self, error: &Self::Error) -> bool {
208 is_retriable_error(error)
209 }
210}
211
212fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
213 tags.iter_single()
215 .take(30)
216 .map(|(k, v)| Dimension::builder().name(k).value(v).build())
217 .collect()
218}
219
220#[derive(Clone)]
221pub struct CloudWatchMetricsSvc {
222 client: CloudwatchClient,
223}
224
225impl CloudWatchMetricsSvc {
226 pub fn new(
227 config: CloudWatchMetricsSinkConfig,
228 client: CloudwatchClient,
229 ) -> crate::Result<VectorSink> {
230 let default_namespace = config.default_namespace.clone();
231 let batch = config.batch.into_batch_settings()?;
232 let request_settings = config.request.into_settings();
233
234 let service = CloudWatchMetricsSvc { client };
235 let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
236 let mut normalizer = MetricNormalizer::<AwsCloudwatchMetricNormalize>::default();
237
238 let sink = request_settings
239 .partition_sink(CloudWatchMetricsRetryLogic, service, buffer, batch.timeout)
240 .sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error))
241 .with_flat_map(move |event: Event| {
242 stream::iter({
243 let byte_size = event.allocated_bytes();
244 let json_byte_size = event.estimated_json_encoded_size_of();
245 normalizer.normalize(event.into_metric()).map(|mut metric| {
246 let namespace = metric
247 .take_namespace()
248 .unwrap_or_else(|| default_namespace.clone());
249 Ok(EncodedEvent::new(
250 PartitionInnerBuffer::new(metric, namespace),
251 byte_size,
252 json_byte_size,
253 ))
254 })
255 })
256 });
257
258 #[allow(deprecated)]
259 Ok(VectorSink::from_event_sink(sink))
260 }
261
262 fn encode_events(&mut self, events: Vec<Metric>) -> Vec<MetricDatum> {
263 events
264 .into_iter()
265 .filter_map(|event| {
266 let metric_name = event.name().to_string();
267 let timestamp = event
268 .timestamp()
269 .map(|x| AwsDateTime::from_millis(x.timestamp_millis()));
270 let dimensions = event.tags().map(tags_to_dimensions);
271 match event.value() {
273 MetricValue::Counter { value } => Some(
274 MetricDatum::builder()
275 .metric_name(metric_name)
276 .value(*value)
277 .set_timestamp(timestamp)
278 .set_dimensions(dimensions)
279 .build(),
280 ),
281 MetricValue::Distribution {
282 samples,
283 statistic: _,
284 } => Some(
285 MetricDatum::builder()
286 .metric_name(metric_name)
287 .set_values(Some(samples.iter().map(|s| s.value).collect()))
288 .set_counts(Some(samples.iter().map(|s| s.rate as f64).collect()))
289 .set_timestamp(timestamp)
290 .set_dimensions(dimensions)
291 .build(),
292 ),
293 MetricValue::Set { values } => Some(
294 MetricDatum::builder()
295 .metric_name(metric_name)
296 .value(values.len() as f64)
297 .set_timestamp(timestamp)
298 .set_dimensions(dimensions)
299 .build(),
300 ),
301 MetricValue::Gauge { value } => Some(
302 MetricDatum::builder()
303 .metric_name(metric_name)
304 .value(*value)
305 .set_timestamp(timestamp)
306 .set_dimensions(dimensions)
307 .build(),
308 ),
309 _ => None,
310 }
311 })
312 .collect()
313 }
314}
315
316impl Service<PartitionInnerBuffer<Vec<Metric>, String>> for CloudWatchMetricsSvc {
317 type Response = ();
318 type Error = SdkError<PutMetricDataError>;
319 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
320
321 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
323 Poll::Ready(Ok(()))
324 }
325
326 fn call(&mut self, items: PartitionInnerBuffer<Vec<Metric>, String>) -> Self::Future {
328 let (items, namespace) = items.into_parts();
329 let metric_data = self.encode_events(items);
330 if metric_data.is_empty() {
331 return future::ok(()).boxed();
332 }
333
334 let client = self.client.clone();
335
336 Box::pin(async move {
337 client
338 .put_metric_data()
339 .namespace(namespace)
340 .set_metric_data(Some(metric_data))
341 .send()
342 .await?;
343 Ok(())
344 })
345 }
346}