vector/sources/aws_ecs_metrics/mod.rs
1use std::{env, time::Duration};
2
3use futures::StreamExt;
4use hyper::{Body, Request};
5use serde_with::serde_as;
6use tokio::time;
7use tokio_stream::wrappers::IntervalStream;
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 config::LogNamespace,
11 configurable::configurable_component,
12 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
13};
14
15use crate::{
16 SourceSender,
17 config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
18 http::HttpClient,
19 internal_events::{
20 AwsEcsMetricsEventsReceived, AwsEcsMetricsParseError, HttpClientHttpError,
21 HttpClientHttpResponseError, StreamClosedError,
22 },
23 shutdown::ShutdownSignal,
24};
25
26mod parser;
27
28/// Version of the AWS ECS task metadata endpoint to use.
29///
30/// More information about the different versions can be found
31/// [here][meta_endpoint].
32///
33/// [meta_endpoint]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
34#[configurable_component]
35#[derive(Clone, Debug)]
36#[serde(rename_all = "lowercase")]
37pub enum Version {
38 /// Version 2.
39 ///
40 /// More information about version 2 of the task metadata endpoint can be found [here][endpoint_v2].
41 ///
42 /// [endpoint_v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
43 V2,
44 /// Version 3.
45 ///
46 /// More information about version 3 of the task metadata endpoint can be found [here][endpoint_v3].
47 ///
48 /// [endpoint_v3]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
49 V3,
50 /// Version 4.
51 ///
52 /// More information about version 4 of the task metadata endpoint can be found [here][endpoint_v4].
53 ///
54 /// [endpoint_v4]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html
55 V4,
56}
57
/// Configuration for the `aws_ecs_metrics` source.
#[serde_as]
#[configurable_component(source(
    "aws_ecs_metrics",
    "Collect Docker container stats for tasks running in AWS ECS and AWS Fargate."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AwsEcsMetricsSourceConfig {
    /// Base URI of the task metadata endpoint.
    ///
    /// If not set, the URI is automatically discovered based on the latest version detected.
    ///
    /// By default:
    /// - The version 4 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI_V4`.
    /// - The version 3 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI`.
    /// - The version 2 endpoint base URI is `http://169.254.170.2/v2`.
    // The serde default only applies when the field is absent; an explicitly empty
    // string is used as-is.
    #[serde(default = "default_endpoint")]
    endpoint: String,

    /// The version of the task metadata endpoint to use.
    ///
    /// If not set, the version is automatically discovered based on environment variables.
    ///
    /// By default:
    /// - Version 4 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is defined.
    /// - Version 3 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is not defined, but the
    ///   environment variable `ECS_CONTAINER_METADATA_URI` _is_ defined.
    /// - Version 2 is used if neither of the environment variables `ECS_CONTAINER_METADATA_URI_V4` or
    ///   `ECS_CONTAINER_METADATA_URI` are defined.
    #[serde(default = "default_version")]
    version: Version,

    /// The interval between scrapes, in seconds.
    // Read and written as a whole number of seconds via `serde_with::DurationSeconds<u64>`.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,

    /// The namespace of the metric.
    ///
    /// Disabled if empty.
    // `build` turns an empty string into `None` before handing it to the scraper.
    #[serde(default = "default_namespace")]
    namespace: String,
}
103
/// Environment variable holding the base URI of the version 4 task metadata endpoint.
const METADATA_URI_V4: &str = "ECS_CONTAINER_METADATA_URI_V4";
/// Environment variable holding the base URI of the version 3 task metadata endpoint.
const METADATA_URI_V3: &str = "ECS_CONTAINER_METADATA_URI";
106
107pub fn default_endpoint() -> String {
108 env::var(METADATA_URI_V4)
109 .or_else(|_| env::var(METADATA_URI_V3))
110 .unwrap_or_else(|_| "http://169.254.170.2/v2".into())
111}
112
113pub fn default_version() -> Version {
114 if env::var(METADATA_URI_V4).is_ok() {
115 Version::V4
116 } else if env::var(METADATA_URI_V3).is_ok() {
117 Version::V3
118 } else {
119 Version::V2
120 }
121}
122
/// Default delay between two consecutive scrapes (fifteen seconds).
pub const fn default_scrape_interval_secs() -> Duration {
    Duration::from_millis(15_000)
}
126
/// Default metric namespace applied to every emitted metric.
pub fn default_namespace() -> String {
    String::from("awsecs")
}
130
131impl AwsEcsMetricsSourceConfig {
132 fn stats_endpoint(&self) -> String {
133 match self.version {
134 Version::V2 => format!("{}/stats", self.endpoint),
135 _ => format!("{}/task/stats", self.endpoint),
136 }
137 }
138}
139
140impl GenerateConfig for AwsEcsMetricsSourceConfig {
141 fn generate_config() -> toml::Value {
142 toml::Value::try_from(Self {
143 endpoint: default_endpoint(),
144 version: default_version(),
145 scrape_interval_secs: default_scrape_interval_secs(),
146 namespace: default_namespace(),
147 })
148 .unwrap()
149 }
150}
151
152#[async_trait::async_trait]
153#[typetag::serde(name = "aws_ecs_metrics")]
154impl SourceConfig for AwsEcsMetricsSourceConfig {
155 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
156 let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
157 let http_client = HttpClient::new(None, &cx.proxy)?;
158
159 Ok(Box::pin(aws_ecs_metrics(
160 http_client,
161 self.stats_endpoint(),
162 self.scrape_interval_secs,
163 namespace,
164 cx.out,
165 cx.shutdown,
166 )))
167 }
168
169 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
170 vec![SourceOutput::new_metrics()]
171 }
172
173 fn can_acknowledge(&self) -> bool {
174 false
175 }
176}
177
/// Periodically scrapes the stats endpoint at `url` and forwards the parsed metrics to `out`.
///
/// A GET request is issued on every tick of a `tokio` interval with period `interval`.
/// The loop stops once `shutdown` resolves. The following are reported as internal events,
/// after which the loop simply waits for the next tick:
/// - non-200 responses,
/// - request errors,
/// - body read errors,
/// - parse errors.
///
/// Parsed metrics carry `namespace` (if any).
///
/// Returns `Err(())` only when `out` rejects a batch (the downstream stream is closed).
/// Otherwise it returns `Ok(())` after shutdown.
async fn aws_ecs_metrics(
    http_client: HttpClient,
    url: String,
    interval: Duration,
    namespace: Option<String>,
    mut out: SourceSender,
    shutdown: ShutdownSignal,
) -> Result<(), ()> {
    // The tick stream ends as soon as the shutdown signal fires.
    let mut interval = IntervalStream::new(time::interval(interval)).take_until(shutdown);
    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
    while interval.next().await.is_some() {
        let request = Request::get(&url)
            .body(Body::empty())
            .expect("error creating request");
        // Keep a copy of the URI: `request` is moved into `send`, but its path is
        // reported in the events-received event below.
        let uri = request.uri().clone();

        match http_client.send(request).await {
            // Only a 200 OK body is read and parsed; other statuses fall to the next arm.
            Ok(response) if response.status() == hyper::StatusCode::OK => {
                match hyper::body::to_bytes(response).await {
                    Ok(body) => {
                        // Record the raw body size before parsing, so bytes are counted
                        // even when parsing fails.
                        bytes_received.emit(ByteSize(body.len()));

                        match parser::parse(body.as_ref(), namespace.clone()) {
                            Ok(metrics) => {
                                let count = metrics.len();
                                emit!(AwsEcsMetricsEventsReceived {
                                    byte_size: metrics.estimated_json_encoded_size_of(),
                                    count,
                                    endpoint: uri.path(),
                                });

                                // A send failure means the downstream is gone: stop the source.
                                if (out.send_batch(metrics).await).is_err() {
                                    emit!(StreamClosedError { count });
                                    return Err(());
                                }
                            }
                            Err(error) => {
                                emit!(AwsEcsMetricsParseError {
                                    error,
                                    endpoint: &url,
                                    body: String::from_utf8_lossy(&body),
                                });
                            }
                        }
                    }
                    // Failure while reading the response body.
                    Err(error) => {
                        emit!(HttpClientHttpError {
                            error: crate::Error::from(error),
                            url: url.to_owned(),
                        });
                    }
                }
            }
            // Any non-200 status.
            Ok(response) => {
                emit!(HttpClientHttpResponseError {
                    code: response.status(),
                    url: url.to_owned(),
                });
            }
            // The request itself failed.
            Err(error) => {
                emit!(HttpClientHttpError {
                    error: crate::Error::from(error),
                    url: url.to_owned(),
                });
            }
        }
    }

    Ok(())
}
248
#[cfg(test)]
mod test {
    use hyper::{
        Body, Response, Server,
        service::{make_service_fn, service_fn},
    };
    use tokio::time::Duration;

    use super::*;
    use crate::{
        Error,
        event::MetricValue,
        test_util::{
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
            next_addr, wait_for_tcp,
        },
    };

    /// Runs the source end to end against a local HTTP server.
    ///
    /// The server answers every request with the same fixed stats payload for two
    /// containers (`vector1` and `vector2`). The source is pointed at it with
    /// `Version::V4`, and the emitted `network_receive_bytes_total` metric is checked
    /// against the `eth1` `rx_bytes` value in that payload.
    #[tokio::test]
    async fn test_aws_ecs_metrics_source() {
        let in_addr = next_addr();

        // Every request, whatever its path, is answered with the canned JSON body below.
        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(
                    r#"
                {
                    "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590": {
                        "read": "2020-09-23T20:32:26.292561674Z",
                        "preread": "2020-09-23T20:32:21.290708273Z",
                        "pids_stats": {},
                        "blkio_stats": {
                            "io_service_bytes_recursive": [],
                            "io_serviced_recursive": [],
                            "io_queue_recursive": [],
                            "io_service_time_recursive": [],
                            "io_wait_time_recursive": [],
                            "io_merged_recursive": [],
                            "io_time_recursive": [],
                            "sectors_recursive": []
                        },
                        "num_procs": 0,
                        "storage_stats": {},
                        "cpu_stats": {
                            "cpu_usage": {
                                "total_usage": 863993897,
                                "percpu_usage": [
                                    607511353,
                                    256482544,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0
                                ],
                                "usage_in_kernelmode": 80000000,
                                "usage_in_usermode": 610000000
                            },
                            "system_cpu_usage": 2007100000000,
                            "online_cpus": 2,
                            "throttling_data": {
                                "periods": 0,
                                "throttled_periods": 0,
                                "throttled_time": 0
                            }
                        },
                        "precpu_stats": {
                            "cpu_usage": {
                                "total_usage": 0,
                                "usage_in_kernelmode": 0,
                                "usage_in_usermode": 0
                            },
                            "throttling_data": {
                                "periods": 0,
                                "throttled_periods": 0,
                                "throttled_time": 0
                            }
                        },
                        "memory_stats": {
                            "usage": 39931904,
                            "max_usage": 40054784,
                            "stats": {
                                "active_anon": 37457920,
                                "active_file": 4096,
                                "cache": 4096,
                                "dirty": 0,
                                "hierarchical_memory_limit": 536870912,
                                "hierarchical_memsw_limit": 9223372036854771712,
                                "inactive_anon": 0,
                                "inactive_file": 0,
                                "mapped_file": 0,
                                "pgfault": 15745,
                                "pgmajfault": 0,
                                "pgpgin": 12086,
                                "pgpgout": 2940,
                                "rss": 37457920,
                                "rss_huge": 0,
                                "total_active_anon": 37457920,
                                "total_active_file": 4096,
                                "total_cache": 4096,
                                "total_dirty": 0,
                                "total_inactive_anon": 0,
                                "total_inactive_file": 0,
                                "total_mapped_file": 0,
                                "total_pgfault": 15745,
                                "total_pgmajfault": 0,
                                "total_pgpgin": 12086,
                                "total_pgpgout": 2940,
                                "total_rss": 37457920,
                                "total_rss_huge": 0,
                                "total_unevictable": 0,
                                "total_writeback": 0,
                                "unevictable": 0,
                                "writeback": 0
                            },
                            "limit": 9223372036854771712
                        },
                        "name": "vector1",
                        "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590",
                        "networks": {
                            "eth1": {
                                "rx_bytes": 329932716,
                                "rx_packets": 224158,
                                "rx_errors": 0,
                                "rx_dropped": 0,
                                "tx_bytes": 2001229,
                                "tx_packets": 29201,
                                "tx_errors": 0,
                                "tx_dropped": 0
                            }
                        }
                    },
                    "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352": {
                        "read": "2020-09-23T20:32:26.314100759Z",
                        "preread": "2020-09-23T20:32:21.315056862Z",
                        "pids_stats": {},
                        "blkio_stats": {
                            "io_service_bytes_recursive": [
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Read",
                                    "value": 0
                                },
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Write",
                                    "value": 520192
                                },
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Sync",
                                    "value": 516096
                                },
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Async",
                                    "value": 4096
                                },
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Total",
                                    "value": 520192
                                }
                            ],
                            "io_serviced_recursive": [
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Read",
                                    "value": 0
                                },
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Write",
                                    "value": 10
                                },
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Sync",
                                    "value": 9
                                },
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Async",
                                    "value": 1
                                },
                                {
                                    "major": 202,
                                    "minor": 26368,
                                    "op": "Total",
                                    "value": 10
                                }
                            ],
                            "io_queue_recursive": [],
                            "io_service_time_recursive": [],
                            "io_wait_time_recursive": [],
                            "io_merged_recursive": [],
                            "io_time_recursive": [],
                            "sectors_recursive": []
                        },
                        "num_procs": 0,
                        "storage_stats": {},
                        "cpu_stats": {
                            "cpu_usage": {
                                "total_usage": 2324920942,
                                "percpu_usage": [
                                    1095931487,
                                    1228989455,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0,
                                    0
                                ],
                                "usage_in_kernelmode": 190000000,
                                "usage_in_usermode": 510000000
                            },
                            "system_cpu_usage": 2007130000000,
                            "online_cpus": 2,
                            "throttling_data": {
                                "periods": 0,
                                "throttled_periods": 0,
                                "throttled_time": 0
                            }
                        },
                        "precpu_stats": {
                            "cpu_usage": {
                                "total_usage": 0,
                                "usage_in_kernelmode": 0,
                                "usage_in_usermode": 0
                            },
                            "throttling_data": {
                                "periods": 0,
                                "throttled_periods": 0,
                                "throttled_time": 0
                            }
                        },
                        "memory_stats": {
                            "usage": 40120320,
                            "max_usage": 47177728,
                            "stats": {
                                "active_anon": 34885632,
                                "active_file": 65536,
                                "cache": 413696,
                                "dirty": 0,
                                "hierarchical_memory_limit": 536870912,
                                "hierarchical_memsw_limit": 9223372036854771712,
                                "inactive_anon": 4096,
                                "inactive_file": 344064,
                                "mapped_file": 4096,
                                "pgfault": 31131,
                                "pgmajfault": 0,
                                "pgpgin": 22360,
                                "pgpgout": 13742,
                                "rss": 34885632,
                                "rss_huge": 0,
                                "total_active_anon": 34885632,
                                "total_active_file": 65536,
                                "total_cache": 413696,
                                "total_dirty": 0,
                                "total_inactive_anon": 4096,
                                "total_inactive_file": 344064,
                                "total_mapped_file": 4096,
                                "total_pgfault": 31131,
                                "total_pgmajfault": 0,
                                "total_pgpgin": 22360,
                                "total_pgpgout": 13742,
                                "total_rss": 34885632,
                                "total_rss_huge": 0,
                                "total_unevictable": 0,
                                "total_writeback": 0,
                                "unevictable": 0,
                                "writeback": 0
                            },
                            "limit": 9223372036854771712
                        },
                        "name": "vector2",
                        "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352",
                        "networks": {
                            "eth1": {
                                "rx_bytes": 329932716,
                                "rx_packets": 224158,
                                "rx_errors": 0,
                                "rx_dropped": 0,
                                "tx_bytes": 2001229,
                                "tx_packets": 29201,
                                "tx_errors": 0,
                                "tx_dropped": 0
                            }
                        }
                    }
                }
                "#,
                )))
            }))
        });

        // Serve in the background for the lifetime of the test; failures are only logged.
        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        // Do not start the source until the server accepts TCP connections.
        wait_for_tcp(in_addr).await;

        let config = AwsEcsMetricsSourceConfig {
            endpoint: format!("http://{in_addr}"),
            version: Version::V4,
            scrape_interval_secs: Duration::from_secs(1),
            namespace: default_namespace(),
        };

        // NOTE(review): the helper presumably runs the source for the given duration and
        // asserts the telemetry listed in `SOURCE_TAGS`; confirm in `test_util::components`.
        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
        assert!(!events.is_empty());

        let metrics = events
            .into_iter()
            .map(|e| e.into_metric())
            .collect::<Vec<_>>();

        // 329932716 is the `eth1` `rx_bytes` value in the payload above.
        match metrics
            .iter()
            .find(|m| m.name() == "network_receive_bytes_total")
        {
            Some(m) => {
                assert_eq!(m.value(), &MetricValue::Counter { value: 329932716.0 });
                assert_eq!(m.namespace(), Some("awsecs"));

                match m.tags() {
                    Some(tags) => assert_eq!(tags.get("device"), Some("eth1")),
                    None => panic!("No tags for metric. {m:?}"),
                }
            }
            None => panic!("Could not find 'network_receive_bytes_total' in {metrics:?}."),
        }
    }
}
610
#[cfg(feature = "aws-ecs-metrics-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};

    /// Base address of the mock ECS endpoint service, overridable with `ECS_ADDRESS`.
    fn ecs_address() -> String {
        env::var("ECS_ADDRESS").unwrap_or_else(|_| String::from("http://localhost:9088"))
    }

    /// Base URI for one metadata endpoint version path (e.g. `"v2"`) on the mock service.
    fn ecs_url(version: &str) -> String {
        let base = ecs_address();
        format!("{base}/{version}")
    }

    /// Runs the source against `endpoint` for five seconds and expects at least one event.
    async fn scrape_metrics(endpoint: String, version: Version) {
        let scrape_interval_secs = Duration::from_secs(1);
        let run_for = Duration::from_secs(5);

        let config = AwsEcsMetricsSourceConfig {
            endpoint,
            version,
            scrape_interval_secs,
            namespace: default_namespace(),
        };

        let events = run_and_assert_source_compliance(config, run_for, &SOURCE_TAGS).await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn scrapes_metrics_v2() {
        let endpoint = ecs_url("v2");
        scrape_metrics(endpoint, Version::V2).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v3() {
        let endpoint = ecs_url("v3");
        scrape_metrics(endpoint, Version::V3).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v4() {
        // The mock serves v4 from the same path as v3:
        // https://github.com/awslabs/amazon-ecs-local-container-endpoints/blob/mainline/docs/features.md#task-metadata-v4
        let endpoint = ecs_url("v3");
        scrape_metrics(endpoint, Version::V4).await;
    }
}