vector/sinks/aws_cloudwatch_metrics/
mod.rs1#[cfg(all(test, feature = "aws-cloudwatch-metrics-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6use std::task::{Context, Poll};
7
8use aws_config::Region;
9use aws_sdk_cloudwatch::{
10 Client as CloudwatchClient,
11 error::SdkError,
12 operation::put_metric_data::PutMetricDataError,
13 types::{Dimension, MetricDatum},
14};
15use aws_smithy_types::DateTime as AwsDateTime;
16use futures::{FutureExt, SinkExt, stream};
17use futures_util::{future, future::BoxFuture};
18use tower::Service;
19use vector_lib::{
20 ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component, sink::VectorSink,
21};
22
23use super::util::service::TowerRequestConfigDefaults;
24use crate::{
25 aws::{
26 ClientBuilder, RegionOrEndpoint, auth::AwsAuthentication, create_client, is_retriable_error,
27 },
28 config::{AcknowledgementsConfig, Input, ProxyConfig, SinkConfig, SinkContext},
29 event::{
30 Event,
31 metric::{Metric, MetricTags, MetricValue},
32 },
33 sinks::util::{
34 Compression, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, SinkBatchSettings,
35 TowerRequestConfig,
36 batch::BatchConfig,
37 buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
38 retries::RetryLogic,
39 },
40 tls::TlsConfig,
41};
42
43#[derive(Clone, Copy, Debug, Default)]
44pub struct CloudWatchMetricsDefaultBatchSettings;
45
46impl SinkBatchSettings for CloudWatchMetricsDefaultBatchSettings {
47 const MAX_EVENTS: Option<usize> = Some(20);
48 const MAX_BYTES: Option<usize> = None;
49 const TIMEOUT_SECS: f64 = 1.0;
50}
51
52#[derive(Clone, Copy, Debug)]
53pub struct CloudWatchMetricsTowerRequestConfigDefaults;
54
55impl TowerRequestConfigDefaults for CloudWatchMetricsTowerRequestConfigDefaults {
56 const RATE_LIMIT_NUM: u64 = 150;
57}
58
59#[configurable_component(sink(
61 "aws_cloudwatch_metrics",
62 "Publish metric events to AWS CloudWatch Metrics."
63))]
64#[derive(Clone, Debug, Default)]
65#[serde(deny_unknown_fields)]
66pub struct CloudWatchMetricsSinkConfig {
67 #[serde(alias = "namespace")]
74 #[configurable(metadata(docs::examples = "service"))]
75 pub default_namespace: String,
76
77 #[serde(flatten)]
81 pub region: RegionOrEndpoint,
82
83 #[configurable(derived)]
84 #[serde(default)]
85 pub compression: Compression,
86
87 #[configurable(derived)]
88 #[serde(default)]
89 pub batch: BatchConfig<CloudWatchMetricsDefaultBatchSettings>,
90
91 #[configurable(derived)]
92 #[serde(default)]
93 pub request: TowerRequestConfig<CloudWatchMetricsTowerRequestConfigDefaults>,
94
95 #[configurable(derived)]
96 pub tls: Option<TlsConfig>,
97
98 #[configurable(deprecated)]
102 #[configurable(metadata(docs::hidden))]
103 assume_role: Option<String>,
104
105 #[configurable(derived)]
106 #[serde(default)]
107 pub auth: AwsAuthentication,
108
109 #[configurable(derived)]
110 #[serde(
111 default,
112 deserialize_with = "crate::serde::bool_or_struct",
113 skip_serializing_if = "crate::serde::is_default"
114 )]
115 acknowledgements: AcknowledgementsConfig,
116}
117
118impl_generate_config_from_default!(CloudWatchMetricsSinkConfig);
119
120struct CloudwatchMetricsClientBuilder;
121
122impl ClientBuilder for CloudwatchMetricsClientBuilder {
123 type Client = aws_sdk_cloudwatch::client::Client;
124
125 fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
126 aws_sdk_cloudwatch::client::Client::new(config)
127 }
128}
129
130#[async_trait::async_trait]
131#[typetag::serde(name = "aws_cloudwatch_metrics")]
132impl SinkConfig for CloudWatchMetricsSinkConfig {
133 async fn build(
134 &self,
135 cx: SinkContext,
136 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
137 let client = self.create_client(&cx.proxy).await?;
138 let healthcheck = self.clone().healthcheck(client.clone()).boxed();
139 let sink = CloudWatchMetricsSvc::new(self.clone(), client)?;
140 Ok((sink, healthcheck))
141 }
142
143 fn input(&self) -> Input {
144 Input::metric()
145 }
146
147 fn acknowledgements(&self) -> &AcknowledgementsConfig {
148 &self.acknowledgements
149 }
150}
151
152impl CloudWatchMetricsSinkConfig {
153 async fn healthcheck(self, client: CloudwatchClient) -> crate::Result<()> {
154 client
155 .put_metric_data()
156 .metric_data(
157 MetricDatum::builder()
158 .metric_name("healthcheck")
159 .value(1.0)
160 .build(),
161 )
162 .namespace(&self.default_namespace)
163 .send()
164 .await?;
165
166 Ok(())
167 }
168
169 async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchClient> {
170 let region = if cfg!(test) {
171 Some(Region::new("us-east-1"))
173 } else {
174 self.region.region()
175 };
176
177 create_client::<CloudwatchMetricsClientBuilder>(
178 &CloudwatchMetricsClientBuilder {},
179 &self.auth,
180 region,
181 self.region.endpoint(),
182 proxy,
183 self.tls.as_ref(),
184 None,
185 )
186 .await
187 }
188}
189
190#[derive(Default)]
191struct AwsCloudwatchMetricNormalize;
192
193impl MetricNormalize for AwsCloudwatchMetricNormalize {
194 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
195 match metric.value() {
196 MetricValue::Gauge { .. } => state.make_absolute(metric),
197 _ => state.make_incremental(metric),
198 }
199 }
200}
201
202#[derive(Debug, Clone)]
203struct CloudWatchMetricsRetryLogic;
204
205impl RetryLogic for CloudWatchMetricsRetryLogic {
206 type Error = SdkError<PutMetricDataError>;
207 type Request = PartitionInnerBuffer<Vec<Metric>, String>;
208 type Response = ();
209
210 fn is_retriable_error(&self, error: &Self::Error) -> bool {
211 is_retriable_error(error)
212 }
213}
214
215fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
216 tags.iter_single()
218 .take(30)
219 .map(|(k, v)| Dimension::builder().name(k).value(v).build())
220 .collect()
221}
222
223#[derive(Clone)]
224pub struct CloudWatchMetricsSvc {
225 client: CloudwatchClient,
226}
227
228impl CloudWatchMetricsSvc {
229 pub fn new(
230 config: CloudWatchMetricsSinkConfig,
231 client: CloudwatchClient,
232 ) -> crate::Result<VectorSink> {
233 let default_namespace = config.default_namespace.clone();
234 let batch = config.batch.into_batch_settings()?;
235 let request_settings = config.request.into_settings();
236
237 let service = CloudWatchMetricsSvc { client };
238 let buffer = PartitionBuffer::new(MetricsBuffer::new(batch.size));
239 let mut normalizer = MetricNormalizer::<AwsCloudwatchMetricNormalize>::default();
240
241 let sink = request_settings
242 .partition_sink(CloudWatchMetricsRetryLogic, service, buffer, batch.timeout)
243 .sink_map_err(|error| error!(message = "Fatal CloudwatchMetrics sink error.", %error, internal_log_rate_limit = false))
244 .with_flat_map(move |event: Event| {
245 stream::iter({
246 let byte_size = event.allocated_bytes();
247 let json_byte_size = event.estimated_json_encoded_size_of();
248 normalizer.normalize(event.into_metric()).map(|mut metric| {
249 let namespace = metric
250 .take_namespace()
251 .unwrap_or_else(|| default_namespace.clone());
252 Ok(EncodedEvent::new(
253 PartitionInnerBuffer::new(metric, namespace),
254 byte_size,
255 json_byte_size,
256 ))
257 })
258 })
259 });
260
261 #[allow(deprecated)]
262 Ok(VectorSink::from_event_sink(sink))
263 }
264
265 fn encode_events(&mut self, events: Vec<Metric>) -> Vec<MetricDatum> {
266 events
267 .into_iter()
268 .filter_map(|event| {
269 let metric_name = event.name().to_string();
270 let timestamp = event
271 .timestamp()
272 .map(|x| AwsDateTime::from_millis(x.timestamp_millis()));
273 let dimensions = event.tags().map(tags_to_dimensions);
274 match event.value() {
276 MetricValue::Counter { value } => Some(
277 MetricDatum::builder()
278 .metric_name(metric_name)
279 .value(*value)
280 .set_timestamp(timestamp)
281 .set_dimensions(dimensions)
282 .build(),
283 ),
284 MetricValue::Distribution {
285 samples,
286 statistic: _,
287 } => Some(
288 MetricDatum::builder()
289 .metric_name(metric_name)
290 .set_values(Some(samples.iter().map(|s| s.value).collect()))
291 .set_counts(Some(samples.iter().map(|s| s.rate as f64).collect()))
292 .set_timestamp(timestamp)
293 .set_dimensions(dimensions)
294 .build(),
295 ),
296 MetricValue::Set { values } => Some(
297 MetricDatum::builder()
298 .metric_name(metric_name)
299 .value(values.len() as f64)
300 .set_timestamp(timestamp)
301 .set_dimensions(dimensions)
302 .build(),
303 ),
304 MetricValue::Gauge { value } => Some(
305 MetricDatum::builder()
306 .metric_name(metric_name)
307 .value(*value)
308 .set_timestamp(timestamp)
309 .set_dimensions(dimensions)
310 .build(),
311 ),
312 _ => None,
313 }
314 })
315 .collect()
316 }
317}
318
319impl Service<PartitionInnerBuffer<Vec<Metric>, String>> for CloudWatchMetricsSvc {
320 type Response = ();
321 type Error = SdkError<PutMetricDataError>;
322 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
323
324 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
326 Poll::Ready(Ok(()))
327 }
328
329 fn call(&mut self, items: PartitionInnerBuffer<Vec<Metric>, String>) -> Self::Future {
331 let (items, namespace) = items.into_parts();
332 let metric_data = self.encode_events(items);
333 if metric_data.is_empty() {
334 return future::ok(()).boxed();
335 }
336
337 let client = self.client.clone();
338
339 Box::pin(async move {
340 client
341 .put_metric_data()
342 .namespace(namespace)
343 .set_metric_data(Some(metric_data))
344 .send()
345 .await?;
346 Ok(())
347 })
348 }
349}