vector/sources/aws_ecs_metrics/
mod.rs

1use std::{env, time::Duration};
2
3use futures::StreamExt;
4use hyper::{Body, Request};
5use serde_with::serde_as;
6use tokio::time;
7use tokio_stream::wrappers::IntervalStream;
8use vector_lib::configurable::configurable_component;
9use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
10use vector_lib::{config::LogNamespace, EstimatedJsonEncodedSizeOf};
11
12use crate::{
13    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
14    http::HttpClient,
15    internal_events::{
16        AwsEcsMetricsEventsReceived, AwsEcsMetricsParseError, HttpClientHttpError,
17        HttpClientHttpResponseError, StreamClosedError,
18    },
19    shutdown::ShutdownSignal,
20    SourceSender,
21};
22
23mod parser;
24
/// Version of the AWS ECS task metadata endpoint to use.
///
/// More information about the different versions can be found
/// [here][meta_endpoint].
///
/// [meta_endpoint]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
#[configurable_component]
#[derive(Clone, Debug)]
// `rename_all = "lowercase"` makes serde use the lowercased variant names
// (`v2`, `v3`, `v4`) as the serialized values of this enum.
#[serde(rename_all = "lowercase")]
pub enum Version {
    /// Version 2.
    ///
    /// More information about version 2 of the task metadata endpoint can be found [here][endpoint_v2].
    ///
    /// [endpoint_v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
    V2,
    /// Version 3.
    ///
    /// More information about version 3 of the task metadata endpoint can be found [here][endpoint_v3].
    ///
    /// [endpoint_v3]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
    V3,
    /// Version 4.
    ///
    /// More information about version 4 of the task metadata endpoint can be found [here][endpoint_v4].
    ///
    /// [endpoint_v4]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html
    V4,
}
54
/// Configuration for the `aws_ecs_metrics` source.
#[serde_as]
#[configurable_component(source(
    "aws_ecs_metrics",
    "Collect Docker container stats for tasks running in AWS ECS and AWS Fargate."
))]
#[derive(Clone, Debug)]
// Keys that do not correspond to one of the fields below are rejected during
// deserialization instead of being silently ignored.
#[serde(deny_unknown_fields)]
pub struct AwsEcsMetricsSourceConfig {
    /// Base URI of the task metadata endpoint.
    ///
    /// If empty, the URI is automatically discovered based on the latest version detected.
    ///
    /// By default:
    /// - The version 4 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI_V4`.
    /// - The version 3 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI`.
    /// - The version 2 endpoint base URI is `169.254.170.2/v2/`.
    // Filled in by `default_endpoint()` when the key is omitted.
    // NOTE(review): the serde default applies to an absent key; nothing in this
    // file substitutes a value for an explicitly empty string, so the "If empty"
    // wording above should be confirmed.
    #[serde(default = "default_endpoint")]
    endpoint: String,

    /// The version of the task metadata endpoint to use.
    ///
    /// If empty, the version is automatically discovered based on environment variables.
    ///
    /// By default:
    /// - Version 4 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is defined.
    /// - Version 3 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is not defined, but the
    ///   environment variable `ECS_CONTAINER_METADATA_URI` _is_ defined.
    /// - Version 2 is used if neither of the environment variables `ECS_CONTAINER_METADATA_URI_V4` or
    ///   `ECS_CONTAINER_METADATA_URI` are defined.
    // Filled in by `default_version()` when the key is omitted.
    #[serde(default = "default_version")]
    version: Version,

    /// The interval between scrapes, in seconds.
    // Read from the configuration as a whole number of seconds (`u64`) and
    // converted to a `Duration` by `serde_with`; defaults to
    // `default_scrape_interval_secs()`.
    #[serde(default = "default_scrape_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    scrape_interval_secs: Duration,

    /// The namespace of the metric.
    ///
    /// Disabled if empty.
    // `build()` maps an empty string to `None` before handing the namespace to
    // the scrape loop; defaults to `default_namespace()`.
    #[serde(default = "default_namespace")]
    namespace: String,
}
100
/// Name of the environment variable that holds the version 4 task metadata
/// endpoint base URI.
const METADATA_URI_V4: &str = "ECS_CONTAINER_METADATA_URI_V4";
/// Name of the environment variable that holds the version 3 task metadata
/// endpoint base URI. Unlike the version 4 variable it carries no version
/// suffix.
const METADATA_URI_V3: &str = "ECS_CONTAINER_METADATA_URI";
103
104pub fn default_endpoint() -> String {
105    env::var(METADATA_URI_V4)
106        .or_else(|_| env::var(METADATA_URI_V3))
107        .unwrap_or_else(|_| "http://169.254.170.2/v2".into())
108}
109
110pub fn default_version() -> Version {
111    if env::var(METADATA_URI_V4).is_ok() {
112        Version::V4
113    } else if env::var(METADATA_URI_V3).is_ok() {
114        Version::V3
115    } else {
116        Version::V2
117    }
118}
119
/// Returns the default interval between scrapes: 15 seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const DEFAULT_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_SECS)
}
123
/// Returns the default metric namespace, `awsecs`.
pub fn default_namespace() -> String {
    String::from("awsecs")
}
127
128impl AwsEcsMetricsSourceConfig {
129    fn stats_endpoint(&self) -> String {
130        match self.version {
131            Version::V2 => format!("{}/stats", self.endpoint),
132            _ => format!("{}/task/stats", self.endpoint),
133        }
134    }
135}
136
137impl GenerateConfig for AwsEcsMetricsSourceConfig {
138    fn generate_config() -> toml::Value {
139        toml::Value::try_from(Self {
140            endpoint: default_endpoint(),
141            version: default_version(),
142            scrape_interval_secs: default_scrape_interval_secs(),
143            namespace: default_namespace(),
144        })
145        .unwrap()
146    }
147}
148
149#[async_trait::async_trait]
150#[typetag::serde(name = "aws_ecs_metrics")]
151impl SourceConfig for AwsEcsMetricsSourceConfig {
152    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
153        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
154        let http_client = HttpClient::new(None, &cx.proxy)?;
155
156        Ok(Box::pin(aws_ecs_metrics(
157            http_client,
158            self.stats_endpoint(),
159            self.scrape_interval_secs,
160            namespace,
161            cx.out,
162            cx.shutdown,
163        )))
164    }
165
166    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
167        vec![SourceOutput::new_metrics()]
168    }
169
170    fn can_acknowledge(&self) -> bool {
171        false
172    }
173}
174
175async fn aws_ecs_metrics(
176    http_client: HttpClient,
177    url: String,
178    interval: Duration,
179    namespace: Option<String>,
180    mut out: SourceSender,
181    shutdown: ShutdownSignal,
182) -> Result<(), ()> {
183    let mut interval = IntervalStream::new(time::interval(interval)).take_until(shutdown);
184    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
185    while interval.next().await.is_some() {
186        let request = Request::get(&url)
187            .body(Body::empty())
188            .expect("error creating request");
189        let uri = request.uri().clone();
190
191        match http_client.send(request).await {
192            Ok(response) if response.status() == hyper::StatusCode::OK => {
193                match hyper::body::to_bytes(response).await {
194                    Ok(body) => {
195                        bytes_received.emit(ByteSize(body.len()));
196
197                        match parser::parse(body.as_ref(), namespace.clone()) {
198                            Ok(metrics) => {
199                                let count = metrics.len();
200                                emit!(AwsEcsMetricsEventsReceived {
201                                    byte_size: metrics.estimated_json_encoded_size_of(),
202                                    count,
203                                    endpoint: uri.path(),
204                                });
205
206                                if (out.send_batch(metrics).await).is_err() {
207                                    emit!(StreamClosedError { count });
208                                    return Err(());
209                                }
210                            }
211                            Err(error) => {
212                                emit!(AwsEcsMetricsParseError {
213                                    error,
214                                    endpoint: &url,
215                                    body: String::from_utf8_lossy(&body),
216                                });
217                            }
218                        }
219                    }
220                    Err(error) => {
221                        emit!(HttpClientHttpError {
222                            error: crate::Error::from(error),
223                            url: url.to_owned(),
224                        });
225                    }
226                }
227            }
228            Ok(response) => {
229                emit!(HttpClientHttpResponseError {
230                    code: response.status(),
231                    url: url.to_owned(),
232                });
233            }
234            Err(error) => {
235                emit!(HttpClientHttpError {
236                    error: crate::Error::from(error),
237                    url: url.to_owned(),
238                });
239            }
240        }
241    }
242
243    Ok(())
244}
245
// Unit test that runs the source against a local HTTP server returning a
// canned task-stats payload.
#[cfg(test)]
mod test {
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Response, Server,
    };
    use tokio::time::Duration;

    use super::*;
    use crate::{
        event::MetricValue,
        test_util::{
            components::{run_and_assert_source_compliance, SOURCE_TAGS},
            next_addr, wait_for_tcp,
        },
        Error,
    };

    // Runs the source for one second against the stub server and checks that
    // a `network_receive_bytes_total` metric with the expected value,
    // namespace and `device` tag is produced.
    #[tokio::test]
    async fn test_aws_ecs_metrics_source() {
        let in_addr = next_addr();

        // The service ignores the incoming request, so every path gets the
        // same fixture body describing two containers.
        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(
                    r#"
                    {
                        "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590": {
                            "read": "2020-09-23T20:32:26.292561674Z",
                            "preread": "2020-09-23T20:32:21.290708273Z",
                            "pids_stats": {},
                            "blkio_stats": {
                                "io_service_bytes_recursive": [],
                                "io_serviced_recursive": [],
                                "io_queue_recursive": [],
                                "io_service_time_recursive": [],
                                "io_wait_time_recursive": [],
                                "io_merged_recursive": [],
                                "io_time_recursive": [],
                                "sectors_recursive": []
                            },
                            "num_procs": 0,
                            "storage_stats": {},
                            "cpu_stats": {
                                "cpu_usage": {
                                    "total_usage": 863993897,
                                    "percpu_usage": [
                                        607511353,
                                        256482544,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0
                                    ],
                                    "usage_in_kernelmode": 80000000,
                                    "usage_in_usermode": 610000000
                                },
                                "system_cpu_usage": 2007100000000,
                                "online_cpus": 2,
                                "throttling_data": {
                                    "periods": 0,
                                    "throttled_periods": 0,
                                    "throttled_time": 0
                                }
                            },
                            "precpu_stats": {
                                "cpu_usage": {
                                    "total_usage": 0,
                                    "usage_in_kernelmode": 0,
                                    "usage_in_usermode": 0
                                },
                                "throttling_data": {
                                    "periods": 0,
                                    "throttled_periods": 0,
                                    "throttled_time": 0
                                }
                            },
                            "memory_stats": {
                                "usage": 39931904,
                                "max_usage": 40054784,
                                "stats": {
                                    "active_anon": 37457920,
                                    "active_file": 4096,
                                    "cache": 4096,
                                    "dirty": 0,
                                    "hierarchical_memory_limit": 536870912,
                                    "hierarchical_memsw_limit": 9223372036854771712,
                                    "inactive_anon": 0,
                                    "inactive_file": 0,
                                    "mapped_file": 0,
                                    "pgfault": 15745,
                                    "pgmajfault": 0,
                                    "pgpgin": 12086,
                                    "pgpgout": 2940,
                                    "rss": 37457920,
                                    "rss_huge": 0,
                                    "total_active_anon": 37457920,
                                    "total_active_file": 4096,
                                    "total_cache": 4096,
                                    "total_dirty": 0,
                                    "total_inactive_anon": 0,
                                    "total_inactive_file": 0,
                                    "total_mapped_file": 0,
                                    "total_pgfault": 15745,
                                    "total_pgmajfault": 0,
                                    "total_pgpgin": 12086,
                                    "total_pgpgout": 2940,
                                    "total_rss": 37457920,
                                    "total_rss_huge": 0,
                                    "total_unevictable": 0,
                                    "total_writeback": 0,
                                    "unevictable": 0,
                                    "writeback": 0
                                },
                                "limit": 9223372036854771712
                            },
                            "name": "vector1",
                            "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590",
                            "networks": {
                                "eth1": {
                                    "rx_bytes": 329932716,
                                    "rx_packets": 224158,
                                    "rx_errors": 0,
                                    "rx_dropped": 0,
                                    "tx_bytes": 2001229,
                                    "tx_packets": 29201,
                                    "tx_errors": 0,
                                    "tx_dropped": 0
                                }
                            }
                        },
                        "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352": {
                            "read": "2020-09-23T20:32:26.314100759Z",
                            "preread": "2020-09-23T20:32:21.315056862Z",
                            "pids_stats": {},
                            "blkio_stats": {
                                "io_service_bytes_recursive": [
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Read",
                                        "value": 0
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Write",
                                        "value": 520192
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Sync",
                                        "value": 516096
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Async",
                                        "value": 4096
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Total",
                                        "value": 520192
                                    }
                                ],
                                "io_serviced_recursive": [
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Read",
                                        "value": 0
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Write",
                                        "value": 10
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Sync",
                                        "value": 9
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Async",
                                        "value": 1
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Total",
                                        "value": 10
                                    }
                                ],
                                "io_queue_recursive": [],
                                "io_service_time_recursive": [],
                                "io_wait_time_recursive": [],
                                "io_merged_recursive": [],
                                "io_time_recursive": [],
                                "sectors_recursive": []
                            },
                            "num_procs": 0,
                            "storage_stats": {},
                            "cpu_stats": {
                                "cpu_usage": {
                                    "total_usage": 2324920942,
                                    "percpu_usage": [
                                        1095931487,
                                        1228989455,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0
                                    ],
                                    "usage_in_kernelmode": 190000000,
                                    "usage_in_usermode": 510000000
                                },
                                "system_cpu_usage": 2007130000000,
                                "online_cpus": 2,
                                "throttling_data": {
                                    "periods": 0,
                                    "throttled_periods": 0,
                                    "throttled_time": 0
                                }
                            },
                            "precpu_stats": {
                                "cpu_usage": {
                                    "total_usage": 0,
                                    "usage_in_kernelmode": 0,
                                    "usage_in_usermode": 0
                                },
                                "throttling_data": {
                                    "periods": 0,
                                    "throttled_periods": 0,
                                    "throttled_time": 0
                                }
                            },
                            "memory_stats": {
                                "usage": 40120320,
                                "max_usage": 47177728,
                                "stats": {
                                    "active_anon": 34885632,
                                    "active_file": 65536,
                                    "cache": 413696,
                                    "dirty": 0,
                                    "hierarchical_memory_limit": 536870912,
                                    "hierarchical_memsw_limit": 9223372036854771712,
                                    "inactive_anon": 4096,
                                    "inactive_file": 344064,
                                    "mapped_file": 4096,
                                    "pgfault": 31131,
                                    "pgmajfault": 0,
                                    "pgpgin": 22360,
                                    "pgpgout": 13742,
                                    "rss": 34885632,
                                    "rss_huge": 0,
                                    "total_active_anon": 34885632,
                                    "total_active_file": 65536,
                                    "total_cache": 413696,
                                    "total_dirty": 0,
                                    "total_inactive_anon": 4096,
                                    "total_inactive_file": 344064,
                                    "total_mapped_file": 4096,
                                    "total_pgfault": 31131,
                                    "total_pgmajfault": 0,
                                    "total_pgpgin": 22360,
                                    "total_pgpgout": 13742,
                                    "total_rss": 34885632,
                                    "total_rss_huge": 0,
                                    "total_unevictable": 0,
                                    "total_writeback": 0,
                                    "unevictable": 0,
                                    "writeback": 0
                                },
                                "limit": 9223372036854771712
                            },
                            "name": "vector2",
                            "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352",
                            "networks": {
                                "eth1": {
                                    "rx_bytes": 329932716,
                                    "rx_packets": 224158,
                                    "rx_errors": 0,
                                    "rx_dropped": 0,
                                    "tx_bytes": 2001229,
                                    "tx_packets": 29201,
                                    "tx_errors": 0,
                                    "tx_dropped": 0
                                }
                            }
                        }
                    }
                    "#,
                )))
            }))
        });

        // Serve in the background and wait until the port accepts
        // connections before starting the source.
        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        // With `Version::V4`, `stats_endpoint()` requests
        // `{endpoint}/task/stats`; the stub answers any path.
        let config = AwsEcsMetricsSourceConfig {
            endpoint: format!("http://{in_addr}"),
            version: Version::V4,
            scrape_interval_secs: Duration::from_secs(1),
            namespace: default_namespace(),
        };

        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
        assert!(!events.is_empty());

        let metrics = events
            .into_iter()
            .map(|e| e.into_metric())
            .collect::<Vec<_>>();

        // The expected counter value equals the `rx_bytes` figure that both
        // containers report for `eth1` in the fixture above.
        match metrics
            .iter()
            .find(|m| m.name() == "network_receive_bytes_total")
        {
            Some(m) => {
                assert_eq!(m.value(), &MetricValue::Counter { value: 329932716.0 });
                assert_eq!(m.namespace(), Some("awsecs"));

                match m.tags() {
                    Some(tags) => assert_eq!(tags.get("device"), Some("eth1")),
                    None => panic!("No tags for metric. {m:?}"),
                }
            }
            None => panic!("Could not find 'network_receive_bytes_total' in {metrics:?}."),
        }
    }
}
607
#[cfg(feature = "aws-ecs-metrics-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{run_and_assert_source_compliance, SOURCE_TAGS};

    /// Base address of the ECS endpoint under test, taken from `ECS_ADDRESS`
    /// and falling back to `http://localhost:9088`.
    fn ecs_address() -> String {
        match env::var("ECS_ADDRESS") {
            Ok(address) => address,
            Err(_) => String::from("http://localhost:9088"),
        }
    }

    /// Joins the base address and a version path segment with a `/`.
    fn ecs_url(version: &str) -> String {
        let base = ecs_address();
        format!("{base}/{version}")
    }

    /// Runs the source against `endpoint` for five seconds, scraping once per
    /// second, and asserts that at least one event was produced.
    async fn scrape_metrics(endpoint: String, version: Version) {
        let scrape_interval_secs = Duration::from_secs(1);
        let run_for = Duration::from_secs(5);

        let config = AwsEcsMetricsSourceConfig {
            endpoint,
            version,
            scrape_interval_secs,
            namespace: default_namespace(),
        };

        let events = run_and_assert_source_compliance(config, run_for, &SOURCE_TAGS).await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn scrapes_metrics_v2() {
        let endpoint = ecs_url("v2");
        scrape_metrics(endpoint, Version::V2).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v3() {
        let endpoint = ecs_url("v3");
        scrape_metrics(endpoint, Version::V3).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v4() {
        // The mock serves version 4 from the same path as version 3:
        // https://github.com/awslabs/amazon-ecs-local-container-endpoints/blob/mainline/docs/features.md#task-metadata-v4
        let endpoint = ecs_url("v3");
        scrape_metrics(endpoint, Version::V4).await;
    }
}