vector/sources/aws_ecs_metrics/mod.rs
1use std::{env, time::Duration};
2
3use futures::StreamExt;
4use hyper::{Body, Request};
5use serde_with::serde_as;
6use tokio::time;
7use tokio_stream::wrappers::IntervalStream;
8use vector_lib::configurable::configurable_component;
9use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
10use vector_lib::{config::LogNamespace, EstimatedJsonEncodedSizeOf};
11
12use crate::{
13 config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
14 http::HttpClient,
15 internal_events::{
16 AwsEcsMetricsEventsReceived, AwsEcsMetricsParseError, HttpClientHttpError,
17 HttpClientHttpResponseError, StreamClosedError,
18 },
19 shutdown::ShutdownSignal,
20 SourceSender,
21};
22
23mod parser;
24
/// Version of the AWS ECS task metadata endpoint to use.
///
/// More information about the different versions can be found
/// [here][meta_endpoint].
///
/// [meta_endpoint]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
// `rename_all = "lowercase"` means these variants are spelled `v2`, `v3` and `v4` in
// configuration files. The selected variant also decides which stats path is appended to the
// endpoint (`/stats` for V2, `/task/stats` otherwise; see `AwsEcsMetricsSourceConfig::stats_endpoint`).
// NOTE(review): the `///` text here is presumably consumed by `configurable_component` for the
// generated config docs; confirm before rewording it.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(rename_all = "lowercase")]
pub enum Version {
    /// Version 2.
    ///
    /// More information about version 2 of the task metadata endpoint can be found [here][endpoint_v2].
    ///
    /// [endpoint_v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
    V2,
    /// Version 3.
    ///
    /// More information about version 3 of the task metadata endpoint can be found [here][endpoint_v3].
    ///
    /// [endpoint_v3]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
    V3,
    /// Version 4.
    ///
    /// More information about version 4 of the task metadata endpoint can be found [here][endpoint_v4].
    ///
    /// [endpoint_v4]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html
    V4,
}
54
/// Configuration for the `aws_ecs_metrics` source.
// NOTE(review): the `///` text on this type and its fields is presumably consumed by
// `configurable_component` to generate user-facing docs; confirm before rewording it.
#[serde_as]
#[configurable_component(source(
    "aws_ecs_metrics",
    "Collect Docker container stats for tasks running in AWS ECS and AWS Fargate."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AwsEcsMetricsSourceConfig {
    /// Base URI of the task metadata endpoint.
    ///
    /// If empty, the URI is automatically discovered based on the latest version detected.
    ///
    /// By default:
    /// - The version 4 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI_V4`.
    /// - The version 3 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI`.
    /// - The version 2 endpoint base URI is `169.254.170.2/v2/`.
    // `default_endpoint` is only consulted when this field is absent from the config. An
    // explicitly empty string is kept verbatim, so the resulting stats URL is just the path
    // suffix (e.g. `/task/stats`); the "If empty" wording above effectively means "if omitted".
    // NOTE(review): the v2 fallback in `default_endpoint` is `http://169.254.170.2/v2` (with a
    // scheme and without a trailing slash), which differs slightly from the text above.
    #[serde(default = "default_endpoint")]
    endpoint: String,

    /// The version of the task metadata endpoint to use.
    ///
    /// If empty, the version is automatically discovered based on environment variables.
    ///
    /// By default:
    /// - Version 4 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is defined.
    /// - Version 3 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is not defined, but the
    ///   environment variable `ECS_CONTAINER_METADATA_URI` _is_ defined.
    /// - Version 2 is used if neither of the environment variables `ECS_CONTAINER_METADATA_URI_V4` or
    ///   `ECS_CONTAINER_METADATA_URI` are defined.
    // An enum value cannot be "empty": `default_version` runs when the field is omitted and
    // inspects the process environment at deserialization time.
    #[serde(default = "default_version")]
    version: Version,

    /// The interval between scrapes, in seconds.
    // Held as a `Duration` in memory but (de)serialized as a whole number of seconds
    // (`DurationSeconds<u64>`), hence the `_secs` suffix on the field name.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,

    /// The namespace of the metric.
    ///
    /// Disabled if empty.
    // `build` turns an empty string into `None` before passing the namespace to the parser.
    #[serde(default = "default_namespace")]
    namespace: String,
}
100
/// Name of the environment variable holding the base URI of the version 4 task metadata
/// endpoint. ECS sets this one for tasks that support v4.
const METADATA_URI_V4: &str = "ECS_CONTAINER_METADATA_URI_V4";
/// Name of the environment variable holding the base URI of the version 3 task metadata
/// endpoint.
const METADATA_URI_V3: &str = "ECS_CONTAINER_METADATA_URI";
103
104pub fn default_endpoint() -> String {
105 env::var(METADATA_URI_V4)
106 .or_else(|_| env::var(METADATA_URI_V3))
107 .unwrap_or_else(|_| "http://169.254.170.2/v2".into())
108}
109
110pub fn default_version() -> Version {
111 if env::var(METADATA_URI_V4).is_ok() {
112 Version::V4
113 } else if env::var(METADATA_URI_V3).is_ok() {
114 Version::V3
115 } else {
116 Version::V2
117 }
118}
119
/// Default delay between two scrapes: fifteen seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    Duration::new(15, 0)
}
123
/// Default metric namespace (`awsecs`).
pub fn default_namespace() -> String {
    String::from("awsecs")
}
127
128impl AwsEcsMetricsSourceConfig {
129 fn stats_endpoint(&self) -> String {
130 match self.version {
131 Version::V2 => format!("{}/stats", self.endpoint),
132 _ => format!("{}/task/stats", self.endpoint),
133 }
134 }
135}
136
137impl GenerateConfig for AwsEcsMetricsSourceConfig {
138 fn generate_config() -> toml::Value {
139 toml::Value::try_from(Self {
140 endpoint: default_endpoint(),
141 version: default_version(),
142 scrape_interval_secs: default_scrape_interval_secs(),
143 namespace: default_namespace(),
144 })
145 .unwrap()
146 }
147}
148
149#[async_trait::async_trait]
150#[typetag::serde(name = "aws_ecs_metrics")]
151impl SourceConfig for AwsEcsMetricsSourceConfig {
152 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
153 let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
154 let http_client = HttpClient::new(None, &cx.proxy)?;
155
156 Ok(Box::pin(aws_ecs_metrics(
157 http_client,
158 self.stats_endpoint(),
159 self.scrape_interval_secs,
160 namespace,
161 cx.out,
162 cx.shutdown,
163 )))
164 }
165
166 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
167 vec![SourceOutput::new_metrics()]
168 }
169
170 fn can_acknowledge(&self) -> bool {
171 false
172 }
173}
174
175async fn aws_ecs_metrics(
176 http_client: HttpClient,
177 url: String,
178 interval: Duration,
179 namespace: Option<String>,
180 mut out: SourceSender,
181 shutdown: ShutdownSignal,
182) -> Result<(), ()> {
183 let mut interval = IntervalStream::new(time::interval(interval)).take_until(shutdown);
184 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
185 while interval.next().await.is_some() {
186 let request = Request::get(&url)
187 .body(Body::empty())
188 .expect("error creating request");
189 let uri = request.uri().clone();
190
191 match http_client.send(request).await {
192 Ok(response) if response.status() == hyper::StatusCode::OK => {
193 match hyper::body::to_bytes(response).await {
194 Ok(body) => {
195 bytes_received.emit(ByteSize(body.len()));
196
197 match parser::parse(body.as_ref(), namespace.clone()) {
198 Ok(metrics) => {
199 let count = metrics.len();
200 emit!(AwsEcsMetricsEventsReceived {
201 byte_size: metrics.estimated_json_encoded_size_of(),
202 count,
203 endpoint: uri.path(),
204 });
205
206 if (out.send_batch(metrics).await).is_err() {
207 emit!(StreamClosedError { count });
208 return Err(());
209 }
210 }
211 Err(error) => {
212 emit!(AwsEcsMetricsParseError {
213 error,
214 endpoint: &url,
215 body: String::from_utf8_lossy(&body),
216 });
217 }
218 }
219 }
220 Err(error) => {
221 emit!(HttpClientHttpError {
222 error: crate::Error::from(error),
223 url: url.to_owned(),
224 });
225 }
226 }
227 }
228 Ok(response) => {
229 emit!(HttpClientHttpResponseError {
230 code: response.status(),
231 url: url.to_owned(),
232 });
233 }
234 Err(error) => {
235 emit!(HttpClientHttpError {
236 error: crate::Error::from(error),
237 url: url.to_owned(),
238 });
239 }
240 }
241 }
242
243 Ok(())
244}
245
#[cfg(test)]
mod test {
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Response, Server,
    };
    use tokio::time::Duration;

    use super::*;
    use crate::{
        event::MetricValue,
        test_util::{
            components::{run_and_assert_source_compliance, SOURCE_TAGS},
            next_addr, wait_for_tcp,
        },
        Error,
    };

    /// Canned task stats payload describing two containers (`vector1` and `vector2`).
    const TASK_STATS_FIXTURE: &str = r#"
        {
            "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590": {
                "read": "2020-09-23T20:32:26.292561674Z",
                "preread": "2020-09-23T20:32:21.290708273Z",
                "pids_stats": {},
                "blkio_stats": {
                    "io_service_bytes_recursive": [],
                    "io_serviced_recursive": [],
                    "io_queue_recursive": [],
                    "io_service_time_recursive": [],
                    "io_wait_time_recursive": [],
                    "io_merged_recursive": [],
                    "io_time_recursive": [],
                    "sectors_recursive": []
                },
                "num_procs": 0,
                "storage_stats": {},
                "cpu_stats": {
                    "cpu_usage": {
                        "total_usage": 863993897,
                        "percpu_usage": [
                            607511353, 256482544, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
                        ],
                        "usage_in_kernelmode": 80000000,
                        "usage_in_usermode": 610000000
                    },
                    "system_cpu_usage": 2007100000000,
                    "online_cpus": 2,
                    "throttling_data": {
                        "periods": 0,
                        "throttled_periods": 0,
                        "throttled_time": 0
                    }
                },
                "precpu_stats": {
                    "cpu_usage": {
                        "total_usage": 0,
                        "usage_in_kernelmode": 0,
                        "usage_in_usermode": 0
                    },
                    "throttling_data": {
                        "periods": 0,
                        "throttled_periods": 0,
                        "throttled_time": 0
                    }
                },
                "memory_stats": {
                    "usage": 39931904,
                    "max_usage": 40054784,
                    "stats": {
                        "active_anon": 37457920,
                        "active_file": 4096,
                        "cache": 4096,
                        "dirty": 0,
                        "hierarchical_memory_limit": 536870912,
                        "hierarchical_memsw_limit": 9223372036854771712,
                        "inactive_anon": 0,
                        "inactive_file": 0,
                        "mapped_file": 0,
                        "pgfault": 15745,
                        "pgmajfault": 0,
                        "pgpgin": 12086,
                        "pgpgout": 2940,
                        "rss": 37457920,
                        "rss_huge": 0,
                        "total_active_anon": 37457920,
                        "total_active_file": 4096,
                        "total_cache": 4096,
                        "total_dirty": 0,
                        "total_inactive_anon": 0,
                        "total_inactive_file": 0,
                        "total_mapped_file": 0,
                        "total_pgfault": 15745,
                        "total_pgmajfault": 0,
                        "total_pgpgin": 12086,
                        "total_pgpgout": 2940,
                        "total_rss": 37457920,
                        "total_rss_huge": 0,
                        "total_unevictable": 0,
                        "total_writeback": 0,
                        "unevictable": 0,
                        "writeback": 0
                    },
                    "limit": 9223372036854771712
                },
                "name": "vector1",
                "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590",
                "networks": {
                    "eth1": {
                        "rx_bytes": 329932716,
                        "rx_packets": 224158,
                        "rx_errors": 0,
                        "rx_dropped": 0,
                        "tx_bytes": 2001229,
                        "tx_packets": 29201,
                        "tx_errors": 0,
                        "tx_dropped": 0
                    }
                }
            },
            "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352": {
                "read": "2020-09-23T20:32:26.314100759Z",
                "preread": "2020-09-23T20:32:21.315056862Z",
                "pids_stats": {},
                "blkio_stats": {
                    "io_service_bytes_recursive": [
                        { "major": 202, "minor": 26368, "op": "Read", "value": 0 },
                        { "major": 202, "minor": 26368, "op": "Write", "value": 520192 },
                        { "major": 202, "minor": 26368, "op": "Sync", "value": 516096 },
                        { "major": 202, "minor": 26368, "op": "Async", "value": 4096 },
                        { "major": 202, "minor": 26368, "op": "Total", "value": 520192 }
                    ],
                    "io_serviced_recursive": [
                        { "major": 202, "minor": 26368, "op": "Read", "value": 0 },
                        { "major": 202, "minor": 26368, "op": "Write", "value": 10 },
                        { "major": 202, "minor": 26368, "op": "Sync", "value": 9 },
                        { "major": 202, "minor": 26368, "op": "Async", "value": 1 },
                        { "major": 202, "minor": 26368, "op": "Total", "value": 10 }
                    ],
                    "io_queue_recursive": [],
                    "io_service_time_recursive": [],
                    "io_wait_time_recursive": [],
                    "io_merged_recursive": [],
                    "io_time_recursive": [],
                    "sectors_recursive": []
                },
                "num_procs": 0,
                "storage_stats": {},
                "cpu_stats": {
                    "cpu_usage": {
                        "total_usage": 2324920942,
                        "percpu_usage": [
                            1095931487, 1228989455, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
                        ],
                        "usage_in_kernelmode": 190000000,
                        "usage_in_usermode": 510000000
                    },
                    "system_cpu_usage": 2007130000000,
                    "online_cpus": 2,
                    "throttling_data": {
                        "periods": 0,
                        "throttled_periods": 0,
                        "throttled_time": 0
                    }
                },
                "precpu_stats": {
                    "cpu_usage": {
                        "total_usage": 0,
                        "usage_in_kernelmode": 0,
                        "usage_in_usermode": 0
                    },
                    "throttling_data": {
                        "periods": 0,
                        "throttled_periods": 0,
                        "throttled_time": 0
                    }
                },
                "memory_stats": {
                    "usage": 40120320,
                    "max_usage": 47177728,
                    "stats": {
                        "active_anon": 34885632,
                        "active_file": 65536,
                        "cache": 413696,
                        "dirty": 0,
                        "hierarchical_memory_limit": 536870912,
                        "hierarchical_memsw_limit": 9223372036854771712,
                        "inactive_anon": 4096,
                        "inactive_file": 344064,
                        "mapped_file": 4096,
                        "pgfault": 31131,
                        "pgmajfault": 0,
                        "pgpgin": 22360,
                        "pgpgout": 13742,
                        "rss": 34885632,
                        "rss_huge": 0,
                        "total_active_anon": 34885632,
                        "total_active_file": 65536,
                        "total_cache": 413696,
                        "total_dirty": 0,
                        "total_inactive_anon": 4096,
                        "total_inactive_file": 344064,
                        "total_mapped_file": 4096,
                        "total_pgfault": 31131,
                        "total_pgmajfault": 0,
                        "total_pgpgin": 22360,
                        "total_pgpgout": 13742,
                        "total_rss": 34885632,
                        "total_rss_huge": 0,
                        "total_unevictable": 0,
                        "total_writeback": 0,
                        "unevictable": 0,
                        "writeback": 0
                    },
                    "limit": 9223372036854771712
                },
                "name": "vector2",
                "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352",
                "networks": {
                    "eth1": {
                        "rx_bytes": 329932716,
                        "rx_packets": 224158,
                        "rx_errors": 0,
                        "rx_dropped": 0,
                        "tx_bytes": 2001229,
                        "tx_packets": 29201,
                        "tx_errors": 0,
                        "tx_dropped": 0
                    }
                }
            }
        }
        "#;

    #[tokio::test]
    async fn test_aws_ecs_metrics_source() {
        let in_addr = next_addr();

        // Every request to the mock server is answered with the same canned stats payload.
        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(TASK_STATS_FIXTURE)))
            }))
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        let config = AwsEcsMetricsSourceConfig {
            endpoint: format!("http://{in_addr}"),
            version: Version::V4,
            scrape_interval_secs: Duration::from_secs(1),
            namespace: default_namespace(),
        };

        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events.into_iter().map(|event| event.into_metric()).collect();

        let Some(metric) = metrics
            .iter()
            .find(|metric| metric.name() == "network_receive_bytes_total")
        else {
            panic!("Could not find 'network_receive_bytes_total' in {metrics:?}.");
        };

        assert_eq!(metric.value(), &MetricValue::Counter { value: 329932716.0 });
        assert_eq!(metric.namespace(), Some("awsecs"));

        let Some(tags) = metric.tags() else {
            panic!("No tags for metric. {metric:?}");
        };
        assert_eq!(tags.get("device"), Some("eth1"));
    }
}
607
#[cfg(feature = "aws-ecs-metrics-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{run_and_assert_source_compliance, SOURCE_TAGS};

    /// Address of the mock ECS metadata service; `ECS_ADDRESS` overrides the local default.
    fn ecs_address() -> String {
        match env::var("ECS_ADDRESS") {
            Ok(address) => address,
            Err(_) => "http://localhost:9088".into(),
        }
    }

    /// Base URI for one metadata version path (for example `v3`) on the mock service.
    fn ecs_url(version: &str) -> String {
        let base = ecs_address();
        format!("{base}/{version}")
    }

    /// Runs the source against `endpoint` and checks that at least one event came out.
    async fn scrape_metrics(endpoint: String, version: Version) {
        let scrape_interval_secs = Duration::from_secs(1);
        let config = AwsEcsMetricsSourceConfig {
            endpoint,
            version,
            scrape_interval_secs,
            namespace: default_namespace(),
        };

        let run_for = Duration::from_secs(5);
        let events = run_and_assert_source_compliance(config, run_for, &SOURCE_TAGS).await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn scrapes_metrics_v2() {
        scrape_metrics(ecs_url("v2"), Version::V2).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v3() {
        scrape_metrics(ecs_url("v3"), Version::V3).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v4() {
        // The mock serves v4 from the same path as v3:
        // https://github.com/awslabs/amazon-ecs-local-container-endpoints/blob/mainline/docs/features.md#task-metadata-v4
        scrape_metrics(ecs_url("v3"), Version::V4).await;
    }
}