vector/sources/aws_ecs_metrics/
mod.rs

1use std::{env, time::Duration};
2
3use futures::StreamExt;
4use http_body::Collected;
5use hyper::{Body, Request};
6use serde_with::serde_as;
7use tokio::time;
8use tokio_stream::wrappers::IntervalStream;
9use vector_lib::{
10    EstimatedJsonEncodedSizeOf,
11    config::LogNamespace,
12    configurable::configurable_component,
13    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
14};
15
16use crate::{
17    SourceSender,
18    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
19    http::HttpClient,
20    internal_events::{
21        AwsEcsMetricsEventsReceived, AwsEcsMetricsParseError, HttpClientHttpError,
22        HttpClientHttpResponseError, StreamClosedError,
23    },
24    shutdown::ShutdownSignal,
25};
26
27mod parser;
28
29/// Version of the AWS ECS task metadata endpoint to use.
30///
31/// More information about the different versions can be found
32/// [here][meta_endpoint].
33///
34/// [meta_endpoint]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
35#[configurable_component]
36#[derive(Clone, Debug)]
// With `rename_all = "lowercase"` serde spells the variants `v2`, `v3` and `v4`.
37#[serde(rename_all = "lowercase")]
38pub enum Version {
39    /// Version 2.
40    ///
41    /// More information about version 2 of the task metadata endpoint can be found [here][endpoint_v2].
42    ///
43    /// [endpoint_v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
44    V2,
45    /// Version 3.
46    ///
47    /// More information about version 3 of the task metadata endpoint can be found [here][endpoint_v3].
48    ///
49    /// [endpoint_v3]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
50    V3,
51    /// Version 4.
52    ///
53    /// More information about version 4 of the task metadata endpoint can be found [here][endpoint_v4].
54    ///
55    /// [endpoint_v4]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html
56    V4,
57}
58
59/// Configuration for the `aws_ecs_metrics` source.
60#[serde_as]
61#[configurable_component(source(
62    "aws_ecs_metrics",
63    "Collect Docker container stats for tasks running in AWS ECS and AWS Fargate."
64))]
65#[derive(Clone, Debug)]
// `deny_unknown_fields` makes deserialization fail on keys that are not declared below.
66#[serde(deny_unknown_fields)]
67pub struct AwsEcsMetricsSourceConfig {
68    /// Base URI of the task metadata endpoint.
69    ///
70    /// If empty, the URI is automatically discovered based on the latest version detected.
71    ///
72    /// By default:
73    /// - The version 4 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI_V4`.
74    /// - The version 3 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI`.
75    /// - The version 2 endpoint base URI is `169.254.170.2/v2/`.
    // When the key is absent, `default_endpoint` supplies the value from the process environment.
76    #[serde(default = "default_endpoint")]
77    endpoint: String,
78
79    /// The version of the task metadata endpoint to use.
80    ///
81    /// If empty, the version is automatically discovered based on environment variables.
82    ///
83    /// By default:
84    /// - Version 4 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is defined.
85    /// - Version 3 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is not defined, but the
86    ///   environment variable `ECS_CONTAINER_METADATA_URI` _is_ defined.
87    /// - Version 2 is used if neither of the environment variables `ECS_CONTAINER_METADATA_URI_V4` or
88    ///   `ECS_CONTAINER_METADATA_URI` are defined.
    // When the key is absent, `default_version` picks the variant from the process environment.
89    #[serde(default = "default_version")]
90    version: Version,
91
92    /// The interval between scrapes, in seconds.
93    #[serde(default = "default_scrape_interval_secs")]
    // `DurationSeconds<u64>` (de)serializes the `Duration` as a whole number of seconds.
94    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
95    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
96    scrape_interval_secs: Duration,
97
98    /// The namespace of the metric.
99    ///
100    /// Disabled if empty.
    // `build` turns an empty string into `None` before handing the namespace to the scraper.
101    #[serde(default = "default_namespace")]
102    namespace: String,
103}
104
/// Name of the environment variable that holds the base URI of the version 4
/// task metadata endpoint.
const METADATA_URI_V4: &str = "ECS_CONTAINER_METADATA_URI_V4";
/// Name of the environment variable that holds the base URI of the version 3
/// task metadata endpoint.
const METADATA_URI_V3: &str = "ECS_CONTAINER_METADATA_URI";
107
108pub fn default_endpoint() -> String {
109    env::var(METADATA_URI_V4)
110        .or_else(|_| env::var(METADATA_URI_V3))
111        .unwrap_or_else(|_| "http://169.254.170.2/v2".into())
112}
113
114pub fn default_version() -> Version {
115    if env::var(METADATA_URI_V4).is_ok() {
116        Version::V4
117    } else if env::var(METADATA_URI_V3).is_ok() {
118        Version::V3
119    } else {
120        Version::V2
121    }
122}
123
/// Default pause between two consecutive scrapes: 15 seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SECONDS: u64 = 15;

    Duration::from_secs(DEFAULT_SECONDS)
}
127
/// Default metric namespace, `awsecs`.
pub fn default_namespace() -> String {
    String::from("awsecs")
}
131
132impl AwsEcsMetricsSourceConfig {
133    fn stats_endpoint(&self) -> String {
134        match self.version {
135            Version::V2 => format!("{}/stats", self.endpoint),
136            _ => format!("{}/task/stats", self.endpoint),
137        }
138    }
139}
140
141impl GenerateConfig for AwsEcsMetricsSourceConfig {
142    fn generate_config() -> toml::Value {
143        toml::Value::try_from(Self {
144            endpoint: default_endpoint(),
145            version: default_version(),
146            scrape_interval_secs: default_scrape_interval_secs(),
147            namespace: default_namespace(),
148        })
149        .unwrap()
150    }
151}
152
153#[async_trait::async_trait]
154#[typetag::serde(name = "aws_ecs_metrics")]
155impl SourceConfig for AwsEcsMetricsSourceConfig {
156    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
157        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
158        let http_client = HttpClient::new(None, &cx.proxy)?;
159
160        Ok(Box::pin(aws_ecs_metrics(
161            http_client,
162            self.stats_endpoint(),
163            self.scrape_interval_secs,
164            namespace,
165            cx.out,
166            cx.shutdown,
167        )))
168    }
169
170    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
171        vec![SourceOutput::new_metrics()]
172    }
173
174    fn can_acknowledge(&self) -> bool {
175        false
176    }
177}
178
179async fn aws_ecs_metrics(
180    http_client: HttpClient,
181    url: String,
182    interval: Duration,
183    namespace: Option<String>,
184    mut out: SourceSender,
185    shutdown: ShutdownSignal,
186) -> Result<(), ()> {
187    let mut interval = IntervalStream::new(time::interval(interval)).take_until(shutdown);
188    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
189    while interval.next().await.is_some() {
190        let request = Request::get(&url)
191            .body(Body::empty())
192            .expect("error creating request");
193        let uri = request.uri().clone();
194
195        match http_client.send(request).await {
196            Ok(response) if response.status() == hyper::StatusCode::OK => {
197                match http_body::Body::collect(response.into_body())
198                    .await
199                    .map(Collected::to_bytes)
200                {
201                    Ok(body) => {
202                        bytes_received.emit(ByteSize(body.len()));
203
204                        match parser::parse(body.as_ref(), namespace.clone()) {
205                            Ok(metrics) => {
206                                let count = metrics.len();
207                                emit!(AwsEcsMetricsEventsReceived {
208                                    byte_size: metrics.estimated_json_encoded_size_of(),
209                                    count,
210                                    endpoint: uri.path(),
211                                });
212
213                                if (out.send_batch(metrics).await).is_err() {
214                                    emit!(StreamClosedError { count });
215                                    return Err(());
216                                }
217                            }
218                            Err(error) => {
219                                emit!(AwsEcsMetricsParseError {
220                                    error,
221                                    endpoint: &url,
222                                    body: String::from_utf8_lossy(&body),
223                                });
224                            }
225                        }
226                    }
227                    Err(error) => {
228                        emit!(HttpClientHttpError {
229                            error: crate::Error::from(error),
230                            url: url.to_owned(),
231                        });
232                    }
233                }
234            }
235            Ok(response) => {
236                emit!(HttpClientHttpResponseError {
237                    code: response.status(),
238                    url: url.to_owned(),
239                });
240            }
241            Err(error) => {
242                emit!(HttpClientHttpError {
243                    error: crate::Error::from(error),
244                    url: url.to_owned(),
245                });
246            }
247        }
248    }
249
250    Ok(())
251}
252
253#[cfg(test)]
254mod test {
255    use hyper::{
256        Body, Response, Server,
257        service::{make_service_fn, service_fn},
258    };
259    use tokio::time::Duration;
260
261    use super::*;
262    use crate::{
263        Error,
264        event::MetricValue,
265        test_util::{
266            addr::next_addr,
267            components::{SOURCE_TAGS, run_and_assert_source_compliance},
268            wait_for_tcp,
269        },
270    };
271
    // Serves a canned stats payload for two containers from a local HTTP server,
    // scrapes it through the source, and checks one of the resulting metrics.
272    #[tokio::test]
273    async fn test_aws_ecs_metrics_source() {
274        let (_guard, in_addr) = next_addr();
275
        // Both closures ignore their argument, so every request gets the same JSON body.
276        let make_svc = make_service_fn(|_| async {
277            Ok::<_, Error>(service_fn(|_| async {
278                Ok::<_, Error>(Response::new(Body::from(
279                    r#"
280                    {
281                        "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590": {
282                            "read": "2020-09-23T20:32:26.292561674Z",
283                            "preread": "2020-09-23T20:32:21.290708273Z",
284                            "pids_stats": {},
285                            "blkio_stats": {
286                                "io_service_bytes_recursive": [],
287                                "io_serviced_recursive": [],
288                                "io_queue_recursive": [],
289                                "io_service_time_recursive": [],
290                                "io_wait_time_recursive": [],
291                                "io_merged_recursive": [],
292                                "io_time_recursive": [],
293                                "sectors_recursive": []
294                            },
295                            "num_procs": 0,
296                            "storage_stats": {},
297                            "cpu_stats": {
298                                "cpu_usage": {
299                                    "total_usage": 863993897,
300                                    "percpu_usage": [
301                                        607511353,
302                                        256482544,
303                                        0,
304                                        0,
305                                        0,
306                                        0,
307                                        0,
308                                        0,
309                                        0,
310                                        0,
311                                        0,
312                                        0,
313                                        0,
314                                        0,
315                                        0
316                                    ],
317                                    "usage_in_kernelmode": 80000000,
318                                    "usage_in_usermode": 610000000
319                                },
320                                "system_cpu_usage": 2007100000000,
321                                "online_cpus": 2,
322                                "throttling_data": {
323                                    "periods": 0,
324                                    "throttled_periods": 0,
325                                    "throttled_time": 0
326                                }
327                            },
328                            "precpu_stats": {
329                                "cpu_usage": {
330                                    "total_usage": 0,
331                                    "usage_in_kernelmode": 0,
332                                    "usage_in_usermode": 0
333                                },
334                                "throttling_data": {
335                                    "periods": 0,
336                                    "throttled_periods": 0,
337                                    "throttled_time": 0
338                                }
339                            },
340                            "memory_stats": {
341                                "usage": 39931904,
342                                "max_usage": 40054784,
343                                "stats": {
344                                    "active_anon": 37457920,
345                                    "active_file": 4096,
346                                    "cache": 4096,
347                                    "dirty": 0,
348                                    "hierarchical_memory_limit": 536870912,
349                                    "hierarchical_memsw_limit": 9223372036854771712,
350                                    "inactive_anon": 0,
351                                    "inactive_file": 0,
352                                    "mapped_file": 0,
353                                    "pgfault": 15745,
354                                    "pgmajfault": 0,
355                                    "pgpgin": 12086,
356                                    "pgpgout": 2940,
357                                    "rss": 37457920,
358                                    "rss_huge": 0,
359                                    "total_active_anon": 37457920,
360                                    "total_active_file": 4096,
361                                    "total_cache": 4096,
362                                    "total_dirty": 0,
363                                    "total_inactive_anon": 0,
364                                    "total_inactive_file": 0,
365                                    "total_mapped_file": 0,
366                                    "total_pgfault": 15745,
367                                    "total_pgmajfault": 0,
368                                    "total_pgpgin": 12086,
369                                    "total_pgpgout": 2940,
370                                    "total_rss": 37457920,
371                                    "total_rss_huge": 0,
372                                    "total_unevictable": 0,
373                                    "total_writeback": 0,
374                                    "unevictable": 0,
375                                    "writeback": 0
376                                },
377                                "limit": 9223372036854771712
378                            },
379                            "name": "vector1",
380                            "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590",
381                            "networks": {
382                                "eth1": {
383                                    "rx_bytes": 329932716,
384                                    "rx_packets": 224158,
385                                    "rx_errors": 0,
386                                    "rx_dropped": 0,
387                                    "tx_bytes": 2001229,
388                                    "tx_packets": 29201,
389                                    "tx_errors": 0,
390                                    "tx_dropped": 0
391                                }
392                            }
393                        },
394                        "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352": {
395                            "read": "2020-09-23T20:32:26.314100759Z",
396                            "preread": "2020-09-23T20:32:21.315056862Z",
397                            "pids_stats": {},
398                            "blkio_stats": {
399                                "io_service_bytes_recursive": [
400                                    {
401                                        "major": 202,
402                                        "minor": 26368,
403                                        "op": "Read",
404                                        "value": 0
405                                    },
406                                    {
407                                        "major": 202,
408                                        "minor": 26368,
409                                        "op": "Write",
410                                        "value": 520192
411                                    },
412                                    {
413                                        "major": 202,
414                                        "minor": 26368,
415                                        "op": "Sync",
416                                        "value": 516096
417                                    },
418                                    {
419                                        "major": 202,
420                                        "minor": 26368,
421                                        "op": "Async",
422                                        "value": 4096
423                                    },
424                                    {
425                                        "major": 202,
426                                        "minor": 26368,
427                                        "op": "Total",
428                                        "value": 520192
429                                    }
430                                ],
431                                "io_serviced_recursive": [
432                                    {
433                                        "major": 202,
434                                        "minor": 26368,
435                                        "op": "Read",
436                                        "value": 0
437                                    },
438                                    {
439                                        "major": 202,
440                                        "minor": 26368,
441                                        "op": "Write",
442                                        "value": 10
443                                    },
444                                    {
445                                        "major": 202,
446                                        "minor": 26368,
447                                        "op": "Sync",
448                                        "value": 9
449                                    },
450                                    {
451                                        "major": 202,
452                                        "minor": 26368,
453                                        "op": "Async",
454                                        "value": 1
455                                    },
456                                    {
457                                        "major": 202,
458                                        "minor": 26368,
459                                        "op": "Total",
460                                        "value": 10
461                                    }
462                                ],
463                                "io_queue_recursive": [],
464                                "io_service_time_recursive": [],
465                                "io_wait_time_recursive": [],
466                                "io_merged_recursive": [],
467                                "io_time_recursive": [],
468                                "sectors_recursive": []
469                            },
470                            "num_procs": 0,
471                            "storage_stats": {},
472                            "cpu_stats": {
473                                "cpu_usage": {
474                                    "total_usage": 2324920942,
475                                    "percpu_usage": [
476                                        1095931487,
477                                        1228989455,
478                                        0,
479                                        0,
480                                        0,
481                                        0,
482                                        0,
483                                        0,
484                                        0,
485                                        0,
486                                        0,
487                                        0,
488                                        0,
489                                        0,
490                                        0
491                                    ],
492                                    "usage_in_kernelmode": 190000000,
493                                    "usage_in_usermode": 510000000
494                                },
495                                "system_cpu_usage": 2007130000000,
496                                "online_cpus": 2,
497                                "throttling_data": {
498                                    "periods": 0,
499                                    "throttled_periods": 0,
500                                    "throttled_time": 0
501                                }
502                            },
503                            "precpu_stats": {
504                                "cpu_usage": {
505                                    "total_usage": 0,
506                                    "usage_in_kernelmode": 0,
507                                    "usage_in_usermode": 0
508                                },
509                                "throttling_data": {
510                                    "periods": 0,
511                                    "throttled_periods": 0,
512                                    "throttled_time": 0
513                                }
514                            },
515                            "memory_stats": {
516                                "usage": 40120320,
517                                "max_usage": 47177728,
518                                "stats": {
519                                    "active_anon": 34885632,
520                                    "active_file": 65536,
521                                    "cache": 413696,
522                                    "dirty": 0,
523                                    "hierarchical_memory_limit": 536870912,
524                                    "hierarchical_memsw_limit": 9223372036854771712,
525                                    "inactive_anon": 4096,
526                                    "inactive_file": 344064,
527                                    "mapped_file": 4096,
528                                    "pgfault": 31131,
529                                    "pgmajfault": 0,
530                                    "pgpgin": 22360,
531                                    "pgpgout": 13742,
532                                    "rss": 34885632,
533                                    "rss_huge": 0,
534                                    "total_active_anon": 34885632,
535                                    "total_active_file": 65536,
536                                    "total_cache": 413696,
537                                    "total_dirty": 0,
538                                    "total_inactive_anon": 4096,
539                                    "total_inactive_file": 344064,
540                                    "total_mapped_file": 4096,
541                                    "total_pgfault": 31131,
542                                    "total_pgmajfault": 0,
543                                    "total_pgpgin": 22360,
544                                    "total_pgpgout": 13742,
545                                    "total_rss": 34885632,
546                                    "total_rss_huge": 0,
547                                    "total_unevictable": 0,
548                                    "total_writeback": 0,
549                                    "unevictable": 0,
550                                    "writeback": 0
551                                },
552                                "limit": 9223372036854771712
553                            },
554                            "name": "vector2",
555                            "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352",
556                            "networks": {
557                                "eth1": {
558                                    "rx_bytes": 329932716,
559                                    "rx_packets": 224158,
560                                    "rx_errors": 0,
561                                    "rx_dropped": 0,
562                                    "tx_bytes": 2001229,
563                                    "tx_packets": 29201,
564                                    "tx_errors": 0,
565                                    "tx_dropped": 0
566                                }
567                            }
568                        }
569                    }
570                    "#,
571                )))
572            }))
573        });
574
        // Run the server on a background task; a server error is only logged.
575        tokio::spawn(async move {
576            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
577                error!(message = "Server error.", %error);
578            }
579        });
        // NOTE(review): presumably waits until `in_addr` accepts connections — confirm in `test_util`.
580        wait_for_tcp(in_addr).await;
581
        // Point the source at the local server; with `V4`, `stats_endpoint` appends `/task/stats`.
582        let config = AwsEcsMetricsSourceConfig {
583            endpoint: format!("http://{in_addr}"),
584            version: Version::V4,
585            scrape_interval_secs: Duration::from_secs(1),
586            namespace: default_namespace(),
587        };
588
589        let events =
590            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
591        assert!(!events.is_empty());
592
593        let metrics = events
594            .into_iter()
595            .map(|e| e.into_metric())
596            .collect::<Vec<_>>();
597
        // The fixture's `rx_bytes` (329932716) is expected to surface as the
        // `network_receive_bytes_total` counter, namespaced `awsecs` and tagged `device = eth1`.
598        match metrics
599            .iter()
600            .find(|m| m.name() == "network_receive_bytes_total")
601        {
602            Some(m) => {
603                assert_eq!(m.value(), &MetricValue::Counter { value: 329932716.0 });
604                assert_eq!(m.namespace(), Some("awsecs"));
605
606                match m.tags() {
607                    Some(tags) => assert_eq!(tags.get("device"), Some("eth1")),
608                    None => panic!("No tags for metric. {m:?}"),
609                }
610            }
611            None => panic!("Could not find 'network_receive_bytes_total' in {metrics:?}."),
612        }
613    }
614}
615
#[cfg(feature = "aws-ecs-metrics-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};

    /// Base address of the ECS endpoint under test. The `ECS_ADDRESS`
    /// environment variable overrides the local default.
    fn ecs_address() -> String {
        match env::var("ECS_ADDRESS") {
            Ok(address) => address,
            Err(_) => String::from("http://localhost:9088"),
        }
    }

    /// Appends a version path segment such as `v2` to the base address.
    fn ecs_url(version: &str) -> String {
        let base = ecs_address();
        format!("{base}/{version}")
    }

    /// Runs the source against `endpoint` for five seconds, scraping every
    /// second, and expects at least one event to come out.
    async fn scrape_metrics(endpoint: String, version: Version) {
        let scrape_interval_secs = Duration::from_secs(1);
        let run_time = Duration::from_secs(5);

        let config = AwsEcsMetricsSourceConfig {
            endpoint,
            version,
            scrape_interval_secs,
            namespace: default_namespace(),
        };

        let events = run_and_assert_source_compliance(config, run_time, &SOURCE_TAGS).await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn scrapes_metrics_v2() {
        let endpoint = ecs_url("v2");
        scrape_metrics(endpoint, Version::V2).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v3() {
        let endpoint = ecs_url("v3");
        scrape_metrics(endpoint, Version::V3).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v4() {
        // The mock serves version 4 from the same path as version 3, see
        // https://github.com/awslabs/amazon-ecs-local-container-endpoints/blob/mainline/docs/features.md#task-metadata-v4
        let endpoint = ecs_url("v3");
        scrape_metrics(endpoint, Version::V4).await;
    }
}