vector/sources/aws_ecs_metrics/mod.rs
1use std::{env, time::Duration};
2
3use futures::StreamExt;
4use http_body::Collected;
5use hyper::{Body, Request};
6use serde_with::serde_as;
7use tokio::time;
8use tokio_stream::wrappers::IntervalStream;
9use vector_lib::{
10 EstimatedJsonEncodedSizeOf,
11 config::LogNamespace,
12 configurable::configurable_component,
13 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
14};
15
16use crate::{
17 SourceSender,
18 config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
19 http::HttpClient,
20 internal_events::{
21 AwsEcsMetricsEventsReceived, AwsEcsMetricsParseError, HttpClientHttpError,
22 HttpClientHttpResponseError, StreamClosedError,
23 },
24 shutdown::ShutdownSignal,
25};
26
27mod parser;
28
/// Version of the AWS ECS task metadata endpoint to use.
///
/// More information about the different versions can be found
/// [here][meta_endpoint].
///
/// [meta_endpoint]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
// When omitted from the configuration, the variant is chosen by `default_version` from the
// environment. `stats_endpoint` maps `V2` to `<endpoint>/stats` and `V3`/`V4` to
// `<endpoint>/task/stats`.
#[configurable_component]
#[derive(Clone, Debug)]
// Variants are spelled "v2", "v3" and "v4" in configuration files.
#[serde(rename_all = "lowercase")]
pub enum Version {
    /// Version 2.
    ///
    /// More information about version 2 of the task metadata endpoint can be found [here][endpoint_v2].
    ///
    /// [endpoint_v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
    V2,
    /// Version 3.
    ///
    /// More information about version 3 of the task metadata endpoint can be found [here][endpoint_v3].
    ///
    /// [endpoint_v3]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
    V3,
    /// Version 4.
    ///
    /// More information about version 4 of the task metadata endpoint can be found [here][endpoint_v4].
    ///
    /// [endpoint_v4]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html
    V4,
}
58
/// Configuration for the `aws_ecs_metrics` source.
// `serde_as` must precede `configurable_component` so that the `serde_as(...)` field attribute
// below is expanded before serde derives run.
#[serde_as]
#[configurable_component(source(
    "aws_ecs_metrics",
    "Collect Docker container stats for tasks running in AWS ECS and AWS Fargate."
))]
#[derive(Clone, Debug)]
// Configuration keys not listed here are rejected at load time.
#[serde(deny_unknown_fields)]
pub struct AwsEcsMetricsSourceConfig {
    /// Base URI of the task metadata endpoint.
    ///
    /// If empty, the URI is automatically discovered based on the latest version detected.
    ///
    /// By default:
    /// - The version 4 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI_V4`.
    /// - The version 3 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI`.
    /// - The version 2 endpoint base URI is `169.254.170.2/v2/`.
    // When the key is absent, serde fills it from `default_endpoint`.
    // NOTE(review): the v2 fallback in `default_endpoint` is "http://169.254.170.2/v2" (with
    // scheme, without trailing slash), which differs from the documented value above. Confirm
    // which form is intended before regenerating the component docs.
    #[serde(default = "default_endpoint")]
    endpoint: String,

    /// The version of the task metadata endpoint to use.
    ///
    /// If empty, the version is automatically discovered based on environment variables.
    ///
    /// By default:
    /// - Version 4 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is defined.
    /// - Version 3 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is not defined, but the
    ///   environment variable `ECS_CONTAINER_METADATA_URI` _is_ defined.
    /// - Version 2 is used if neither of the environment variables `ECS_CONTAINER_METADATA_URI_V4` or
    ///   `ECS_CONTAINER_METADATA_URI` are defined.
    // When the key is absent, serde fills it from `default_version`.
    // NOTE(review): `endpoint` and `version` default independently. Setting only one of them
    // explicitly pairs it with an auto-detected value for the other.
    #[serde(default = "default_version")]
    version: Version,

    /// The interval between scrapes, in seconds.
    // Held as a `Duration` in memory but (de)serialized as a whole number of seconds.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,

    /// The namespace of the metric.
    ///
    /// Disabled if empty.
    // `build` turns an empty string into `None` before handing it to the scraper.
    #[serde(default = "default_namespace")]
    namespace: String,
}
104
/// Environment variable holding the base URI of the version 4 task metadata endpoint.
const METADATA_URI_V4: &str = "ECS_CONTAINER_METADATA_URI_V4";
/// Environment variable holding the base URI of the version 3 task metadata endpoint.
const METADATA_URI_V3: &str = "ECS_CONTAINER_METADATA_URI";

/// Default base URI of the task metadata endpoint, discovered from the environment.
///
/// Lookup order:
/// 1. `ECS_CONTAINER_METADATA_URI_V4` (version 4 endpoint),
/// 2. `ECS_CONTAINER_METADATA_URI` (version 3 endpoint),
/// 3. the fixed version 2 address `http://169.254.170.2/v2`.
///
/// When both variables are defined, the version 4 URI wins. This is the same precedence
/// `default_version` uses, so an auto-detected endpoint and an auto-detected version agree.
pub fn default_endpoint() -> String {
    env::var(METADATA_URI_V4)
        .or_else(|_| env::var(METADATA_URI_V3))
        .unwrap_or_else(|_| "http://169.254.170.2/v2".into())
}
113
114pub fn default_version() -> Version {
115 if env::var(METADATA_URI_V4).is_ok() {
116 Version::V4
117 } else if env::var(METADATA_URI_V3).is_ok() {
118 Version::V3
119 } else {
120 Version::V2
121 }
122}
123
/// Default delay between two consecutive scrapes of the stats endpoint: fifteen seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_SECS)
}
127
/// Default metric namespace (`awsecs`). Configuring an empty namespace disables namespacing.
pub fn default_namespace() -> String {
    String::from("awsecs")
}
131
132impl AwsEcsMetricsSourceConfig {
133 fn stats_endpoint(&self) -> String {
134 match self.version {
135 Version::V2 => format!("{}/stats", self.endpoint),
136 _ => format!("{}/task/stats", self.endpoint),
137 }
138 }
139}
140
141impl GenerateConfig for AwsEcsMetricsSourceConfig {
142 fn generate_config() -> toml::Value {
143 toml::Value::try_from(Self {
144 endpoint: default_endpoint(),
145 version: default_version(),
146 scrape_interval_secs: default_scrape_interval_secs(),
147 namespace: default_namespace(),
148 })
149 .unwrap()
150 }
151}
152
153#[async_trait::async_trait]
154#[typetag::serde(name = "aws_ecs_metrics")]
155impl SourceConfig for AwsEcsMetricsSourceConfig {
156 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
157 let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
158 let http_client = HttpClient::new(None, &cx.proxy)?;
159
160 Ok(Box::pin(aws_ecs_metrics(
161 http_client,
162 self.stats_endpoint(),
163 self.scrape_interval_secs,
164 namespace,
165 cx.out,
166 cx.shutdown,
167 )))
168 }
169
170 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
171 vec![SourceOutput::new_metrics()]
172 }
173
174 fn can_acknowledge(&self) -> bool {
175 false
176 }
177}
178
179async fn aws_ecs_metrics(
180 http_client: HttpClient,
181 url: String,
182 interval: Duration,
183 namespace: Option<String>,
184 mut out: SourceSender,
185 shutdown: ShutdownSignal,
186) -> Result<(), ()> {
187 let mut interval = IntervalStream::new(time::interval(interval)).take_until(shutdown);
188 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
189 while interval.next().await.is_some() {
190 let request = Request::get(&url)
191 .body(Body::empty())
192 .expect("error creating request");
193 let uri = request.uri().clone();
194
195 match http_client.send(request).await {
196 Ok(response) if response.status() == hyper::StatusCode::OK => {
197 match http_body::Body::collect(response.into_body())
198 .await
199 .map(Collected::to_bytes)
200 {
201 Ok(body) => {
202 bytes_received.emit(ByteSize(body.len()));
203
204 match parser::parse(body.as_ref(), namespace.clone()) {
205 Ok(metrics) => {
206 let count = metrics.len();
207 emit!(AwsEcsMetricsEventsReceived {
208 byte_size: metrics.estimated_json_encoded_size_of(),
209 count,
210 endpoint: uri.path(),
211 });
212
213 if (out.send_batch(metrics).await).is_err() {
214 emit!(StreamClosedError { count });
215 return Err(());
216 }
217 }
218 Err(error) => {
219 emit!(AwsEcsMetricsParseError {
220 error,
221 endpoint: &url,
222 body: String::from_utf8_lossy(&body),
223 });
224 }
225 }
226 }
227 Err(error) => {
228 emit!(HttpClientHttpError {
229 error: crate::Error::from(error),
230 url: url.to_owned(),
231 });
232 }
233 }
234 }
235 Ok(response) => {
236 emit!(HttpClientHttpResponseError {
237 code: response.status(),
238 url: url.to_owned(),
239 });
240 }
241 Err(error) => {
242 emit!(HttpClientHttpError {
243 error: crate::Error::from(error),
244 url: url.to_owned(),
245 });
246 }
247 }
248 }
249
250 Ok(())
251}
252
#[cfg(test)]
mod test {
    use hyper::{
        Body, Response, Server,
        service::{make_service_fn, service_fn},
    };
    use tokio::time::Duration;

    use super::*;
    use crate::{
        Error,
        event::MetricValue,
        test_util::{
            addr::next_addr,
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
            wait_for_tcp,
        },
    };

    /// Canned stats document served by the mock metadata endpoint: two containers
    /// (`vector1`, `vector2`), each reporting traffic on the `eth1` interface.
    const STATS_FIXTURE: &str = r#"
        {
          "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590": {
            "read": "2020-09-23T20:32:26.292561674Z",
            "preread": "2020-09-23T20:32:21.290708273Z",
            "pids_stats": {},
            "blkio_stats": {
              "io_service_bytes_recursive": [],
              "io_serviced_recursive": [],
              "io_queue_recursive": [],
              "io_service_time_recursive": [],
              "io_wait_time_recursive": [],
              "io_merged_recursive": [],
              "io_time_recursive": [],
              "sectors_recursive": []
            },
            "num_procs": 0,
            "storage_stats": {},
            "cpu_stats": {
              "cpu_usage": {
                "total_usage": 863993897,
                "percpu_usage": [
                  607511353,
                  256482544,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0
                ],
                "usage_in_kernelmode": 80000000,
                "usage_in_usermode": 610000000
              },
              "system_cpu_usage": 2007100000000,
              "online_cpus": 2,
              "throttling_data": {
                "periods": 0,
                "throttled_periods": 0,
                "throttled_time": 0
              }
            },
            "precpu_stats": {
              "cpu_usage": {
                "total_usage": 0,
                "usage_in_kernelmode": 0,
                "usage_in_usermode": 0
              },
              "throttling_data": {
                "periods": 0,
                "throttled_periods": 0,
                "throttled_time": 0
              }
            },
            "memory_stats": {
              "usage": 39931904,
              "max_usage": 40054784,
              "stats": {
                "active_anon": 37457920,
                "active_file": 4096,
                "cache": 4096,
                "dirty": 0,
                "hierarchical_memory_limit": 536870912,
                "hierarchical_memsw_limit": 9223372036854771712,
                "inactive_anon": 0,
                "inactive_file": 0,
                "mapped_file": 0,
                "pgfault": 15745,
                "pgmajfault": 0,
                "pgpgin": 12086,
                "pgpgout": 2940,
                "rss": 37457920,
                "rss_huge": 0,
                "total_active_anon": 37457920,
                "total_active_file": 4096,
                "total_cache": 4096,
                "total_dirty": 0,
                "total_inactive_anon": 0,
                "total_inactive_file": 0,
                "total_mapped_file": 0,
                "total_pgfault": 15745,
                "total_pgmajfault": 0,
                "total_pgpgin": 12086,
                "total_pgpgout": 2940,
                "total_rss": 37457920,
                "total_rss_huge": 0,
                "total_unevictable": 0,
                "total_writeback": 0,
                "unevictable": 0,
                "writeback": 0
              },
              "limit": 9223372036854771712
            },
            "name": "vector1",
            "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590",
            "networks": {
              "eth1": {
                "rx_bytes": 329932716,
                "rx_packets": 224158,
                "rx_errors": 0,
                "rx_dropped": 0,
                "tx_bytes": 2001229,
                "tx_packets": 29201,
                "tx_errors": 0,
                "tx_dropped": 0
              }
            }
          },
          "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352": {
            "read": "2020-09-23T20:32:26.314100759Z",
            "preread": "2020-09-23T20:32:21.315056862Z",
            "pids_stats": {},
            "blkio_stats": {
              "io_service_bytes_recursive": [
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Read",
                  "value": 0
                },
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Write",
                  "value": 520192
                },
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Sync",
                  "value": 516096
                },
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Async",
                  "value": 4096
                },
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Total",
                  "value": 520192
                }
              ],
              "io_serviced_recursive": [
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Read",
                  "value": 0
                },
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Write",
                  "value": 10
                },
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Sync",
                  "value": 9
                },
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Async",
                  "value": 1
                },
                {
                  "major": 202,
                  "minor": 26368,
                  "op": "Total",
                  "value": 10
                }
              ],
              "io_queue_recursive": [],
              "io_service_time_recursive": [],
              "io_wait_time_recursive": [],
              "io_merged_recursive": [],
              "io_time_recursive": [],
              "sectors_recursive": []
            },
            "num_procs": 0,
            "storage_stats": {},
            "cpu_stats": {
              "cpu_usage": {
                "total_usage": 2324920942,
                "percpu_usage": [
                  1095931487,
                  1228989455,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0,
                  0
                ],
                "usage_in_kernelmode": 190000000,
                "usage_in_usermode": 510000000
              },
              "system_cpu_usage": 2007130000000,
              "online_cpus": 2,
              "throttling_data": {
                "periods": 0,
                "throttled_periods": 0,
                "throttled_time": 0
              }
            },
            "precpu_stats": {
              "cpu_usage": {
                "total_usage": 0,
                "usage_in_kernelmode": 0,
                "usage_in_usermode": 0
              },
              "throttling_data": {
                "periods": 0,
                "throttled_periods": 0,
                "throttled_time": 0
              }
            },
            "memory_stats": {
              "usage": 40120320,
              "max_usage": 47177728,
              "stats": {
                "active_anon": 34885632,
                "active_file": 65536,
                "cache": 413696,
                "dirty": 0,
                "hierarchical_memory_limit": 536870912,
                "hierarchical_memsw_limit": 9223372036854771712,
                "inactive_anon": 4096,
                "inactive_file": 344064,
                "mapped_file": 4096,
                "pgfault": 31131,
                "pgmajfault": 0,
                "pgpgin": 22360,
                "pgpgout": 13742,
                "rss": 34885632,
                "rss_huge": 0,
                "total_active_anon": 34885632,
                "total_active_file": 65536,
                "total_cache": 413696,
                "total_dirty": 0,
                "total_inactive_anon": 4096,
                "total_inactive_file": 344064,
                "total_mapped_file": 4096,
                "total_pgfault": 31131,
                "total_pgmajfault": 0,
                "total_pgpgin": 22360,
                "total_pgpgout": 13742,
                "total_rss": 34885632,
                "total_rss_huge": 0,
                "total_unevictable": 0,
                "total_writeback": 0,
                "unevictable": 0,
                "writeback": 0
              },
              "limit": 9223372036854771712
            },
            "name": "vector2",
            "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352",
            "networks": {
              "eth1": {
                "rx_bytes": 329932716,
                "rx_packets": 224158,
                "rx_errors": 0,
                "rx_dropped": 0,
                "tx_bytes": 2001229,
                "tx_packets": 29201,
                "tx_errors": 0,
                "tx_dropped": 0
              }
            }
          }
        }
        "#;

    #[tokio::test]
    async fn test_aws_ecs_metrics_source() {
        let (_guard, in_addr) = next_addr();

        // Mock metadata endpoint: every request receives the same canned stats document.
        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(STATS_FIXTURE)))
            }))
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        let config = AwsEcsMetricsSourceConfig {
            endpoint: format!("http://{in_addr}"),
            version: Version::V4,
            scrape_interval_secs: Duration::from_secs(1),
            namespace: default_namespace(),
        };

        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let Some(rx_bytes) = metrics
            .iter()
            .find(|metric| metric.name() == "network_receive_bytes_total")
        else {
            panic!("Could not find 'network_receive_bytes_total' in {metrics:?}.");
        };

        assert_eq!(
            rx_bytes.value(),
            &MetricValue::Counter { value: 329932716.0 }
        );
        assert_eq!(rx_bytes.namespace(), Some("awsecs"));

        let Some(tags) = rx_bytes.tags() else {
            panic!("No tags for metric. {m:?}", m = rx_bytes);
        };
        assert_eq!(tags.get("device"), Some("eth1"));
    }
}
615
#[cfg(feature = "aws-ecs-metrics-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};

    /// Base address of the mock ECS metadata service, overridable through `ECS_ADDRESS`.
    fn ecs_address() -> String {
        match env::var("ECS_ADDRESS") {
            Ok(address) => address,
            Err(_) => String::from("http://localhost:9088"),
        }
    }

    /// URL of the given metadata API version (e.g. `"v3"`) on the mock service.
    fn ecs_url(version: &str) -> String {
        let base = ecs_address();
        format!("{base}/{version}")
    }

    /// Runs the source against `endpoint` for five seconds, scraping every second, and
    /// checks that at least one event was produced.
    async fn scrape_metrics(endpoint: String, version: Version) {
        let scrape_interval_secs = Duration::from_secs(1);
        let run_for = Duration::from_secs(5);

        let config = AwsEcsMetricsSourceConfig {
            endpoint,
            version,
            scrape_interval_secs,
            namespace: default_namespace(),
        };

        let events = run_and_assert_source_compliance(config, run_for, &SOURCE_TAGS).await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn scrapes_metrics_v2() {
        scrape_metrics(ecs_url("v2"), Version::V2).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v3() {
        scrape_metrics(ecs_url("v3"), Version::V3).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v4() {
        // The local-container-endpoints mock serves task metadata v4 from the same path as v3:
        // https://github.com/awslabs/amazon-ecs-local-container-endpoints/blob/mainline/docs/features.md#task-metadata-v4
        scrape_metrics(ecs_url("v3"), Version::V4).await;
    }
}