vector/sources/aws_ecs_metrics/
mod.rs

1use std::{env, time::Duration};
2
3use futures::StreamExt;
4use hyper::{Body, Request};
5use serde_with::serde_as;
6use tokio::time;
7use tokio_stream::wrappers::IntervalStream;
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    config::LogNamespace,
11    configurable::configurable_component,
12    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
13};
14
15use crate::{
16    SourceSender,
17    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
18    http::HttpClient,
19    internal_events::{
20        AwsEcsMetricsEventsReceived, AwsEcsMetricsParseError, HttpClientHttpError,
21        HttpClientHttpResponseError, StreamClosedError,
22    },
23    shutdown::ShutdownSignal,
24};
25
26mod parser;
27
// Selects which task metadata endpoint layout the source talks to. Serde reads/writes the
// variants as the lowercase strings `v2`, `v3`, `v4` (see `rename_all` below). In this file the
// value is consulted by `AwsEcsMetricsSourceConfig::stats_endpoint` (`/stats` for V2,
// `/task/stats` otherwise) and produced by `default_version`.
// NOTE(review): the `///` text below presumably feeds the generated configuration reference via
// `configurable_component` — keep it user-facing and edit it with that in mind.
/// Version of the AWS ECS task metadata endpoint to use.
///
/// More information about the different versions can be found
/// [here][meta_endpoint].
///
/// [meta_endpoint]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint.html
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(rename_all = "lowercase")]
pub enum Version {
    /// Version 2.
    ///
    /// More information about version 2 of the task metadata endpoint can be found [here][endpoint_v2].
    ///
    /// [endpoint_v2]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v2.html
    V2,
    /// Version 3.
    ///
    /// More information about version 3 of the task metadata endpoint can be found [here][endpoint_v3].
    ///
    /// [endpoint_v3]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v3.html
    V3,
    /// Version 4.
    ///
    /// More information about version 4 of the task metadata endpoint can be found [here][endpoint_v4].
    ///
    /// [endpoint_v4]: https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-metadata-endpoint-v4.html
    V4,
}
57
58/// Configuration for the `aws_ecs_metrics` source.
59#[serde_as]
60#[configurable_component(source(
61    "aws_ecs_metrics",
62    "Collect Docker container stats for tasks running in AWS ECS and AWS Fargate."
63))]
64#[derive(Clone, Debug)]
65#[serde(deny_unknown_fields)]
66pub struct AwsEcsMetricsSourceConfig {
67    /// Base URI of the task metadata endpoint.
68    ///
69    /// If empty, the URI is automatically discovered based on the latest version detected.
70    ///
71    /// By default:
72    /// - The version 4 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI_V4`.
73    /// - The version 3 endpoint base URI is stored in the environment variable `ECS_CONTAINER_METADATA_URI`.
74    /// - The version 2 endpoint base URI is `169.254.170.2/v2/`.
75    #[serde(default = "default_endpoint")]
76    endpoint: String,
77
78    /// The version of the task metadata endpoint to use.
79    ///
80    /// If empty, the version is automatically discovered based on environment variables.
81    ///
82    /// By default:
83    /// - Version 4 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is defined.
84    /// - Version 3 is used if the environment variable `ECS_CONTAINER_METADATA_URI_V4` is not defined, but the
85    ///   environment variable `ECS_CONTAINER_METADATA_URI` _is_ defined.
86    /// - Version 2 is used if neither of the environment variables `ECS_CONTAINER_METADATA_URI_V4` or
87    ///   `ECS_CONTAINER_METADATA_URI` are defined.
88    #[serde(default = "default_version")]
89    version: Version,
90
91    /// The interval between scrapes, in seconds.
92    #[serde(default = "default_scrape_interval_secs")]
93    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
94    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
95    scrape_interval_secs: Duration,
96
97    /// The namespace of the metric.
98    ///
99    /// Disabled if empty.
100    #[serde(default = "default_namespace")]
101    namespace: String,
102}
103
/// Environment variable holding the base URI of the version 4 task metadata endpoint.
const METADATA_URI_V4: &str = "ECS_CONTAINER_METADATA_URI_V4";
/// Environment variable holding the base URI of the version 3 task metadata endpoint.
const METADATA_URI_V3: &str = "ECS_CONTAINER_METADATA_URI";
106
107pub fn default_endpoint() -> String {
108    env::var(METADATA_URI_V4)
109        .or_else(|_| env::var(METADATA_URI_V3))
110        .unwrap_or_else(|_| "http://169.254.170.2/v2".into())
111}
112
113pub fn default_version() -> Version {
114    if env::var(METADATA_URI_V4).is_ok() {
115        Version::V4
116    } else if env::var(METADATA_URI_V3).is_ok() {
117        Version::V3
118    } else {
119        Version::V2
120    }
121}
122
/// Default pause between two scrapes of the stats endpoint: fifteen seconds.
pub const fn default_scrape_interval_secs() -> Duration {
    const FIFTEEN_SECONDS: u64 = 15;
    Duration::from_secs(FIFTEEN_SECONDS)
}
126
/// Default metric namespace applied to every emitted metric.
pub fn default_namespace() -> String {
    String::from("awsecs")
}
130
131impl AwsEcsMetricsSourceConfig {
132    fn stats_endpoint(&self) -> String {
133        match self.version {
134            Version::V2 => format!("{}/stats", self.endpoint),
135            _ => format!("{}/task/stats", self.endpoint),
136        }
137    }
138}
139
140impl GenerateConfig for AwsEcsMetricsSourceConfig {
141    fn generate_config() -> toml::Value {
142        toml::Value::try_from(Self {
143            endpoint: default_endpoint(),
144            version: default_version(),
145            scrape_interval_secs: default_scrape_interval_secs(),
146            namespace: default_namespace(),
147        })
148        .unwrap()
149    }
150}
151
152#[async_trait::async_trait]
153#[typetag::serde(name = "aws_ecs_metrics")]
154impl SourceConfig for AwsEcsMetricsSourceConfig {
155    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
156        let namespace = Some(self.namespace.clone()).filter(|namespace| !namespace.is_empty());
157        let http_client = HttpClient::new(None, &cx.proxy)?;
158
159        Ok(Box::pin(aws_ecs_metrics(
160            http_client,
161            self.stats_endpoint(),
162            self.scrape_interval_secs,
163            namespace,
164            cx.out,
165            cx.shutdown,
166        )))
167    }
168
169    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
170        vec![SourceOutput::new_metrics()]
171    }
172
173    fn can_acknowledge(&self) -> bool {
174        false
175    }
176}
177
178async fn aws_ecs_metrics(
179    http_client: HttpClient,
180    url: String,
181    interval: Duration,
182    namespace: Option<String>,
183    mut out: SourceSender,
184    shutdown: ShutdownSignal,
185) -> Result<(), ()> {
186    let mut interval = IntervalStream::new(time::interval(interval)).take_until(shutdown);
187    let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
188    while interval.next().await.is_some() {
189        let request = Request::get(&url)
190            .body(Body::empty())
191            .expect("error creating request");
192        let uri = request.uri().clone();
193
194        match http_client.send(request).await {
195            Ok(response) if response.status() == hyper::StatusCode::OK => {
196                match hyper::body::to_bytes(response).await {
197                    Ok(body) => {
198                        bytes_received.emit(ByteSize(body.len()));
199
200                        match parser::parse(body.as_ref(), namespace.clone()) {
201                            Ok(metrics) => {
202                                let count = metrics.len();
203                                emit!(AwsEcsMetricsEventsReceived {
204                                    byte_size: metrics.estimated_json_encoded_size_of(),
205                                    count,
206                                    endpoint: uri.path(),
207                                });
208
209                                if (out.send_batch(metrics).await).is_err() {
210                                    emit!(StreamClosedError { count });
211                                    return Err(());
212                                }
213                            }
214                            Err(error) => {
215                                emit!(AwsEcsMetricsParseError {
216                                    error,
217                                    endpoint: &url,
218                                    body: String::from_utf8_lossy(&body),
219                                });
220                            }
221                        }
222                    }
223                    Err(error) => {
224                        emit!(HttpClientHttpError {
225                            error: crate::Error::from(error),
226                            url: url.to_owned(),
227                        });
228                    }
229                }
230            }
231            Ok(response) => {
232                emit!(HttpClientHttpResponseError {
233                    code: response.status(),
234                    url: url.to_owned(),
235                });
236            }
237            Err(error) => {
238                emit!(HttpClientHttpError {
239                    error: crate::Error::from(error),
240                    url: url.to_owned(),
241                });
242            }
243        }
244    }
245
246    Ok(())
247}
248
#[cfg(test)]
mod test {
    use hyper::{
        Body, Response, Server,
        service::{make_service_fn, service_fn},
    };
    use tokio::time::Duration;

    use super::*;
    use crate::{
        Error,
        event::MetricValue,
        test_util::{
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
            next_addr, wait_for_tcp,
        },
    };

    /// Canned task stats response covering two containers (`vector1` and `vector2`).
    const STATS_FIXTURE: &str = r#"
                    {
                        "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590": {
                            "read": "2020-09-23T20:32:26.292561674Z",
                            "preread": "2020-09-23T20:32:21.290708273Z",
                            "pids_stats": {},
                            "blkio_stats": {
                                "io_service_bytes_recursive": [],
                                "io_serviced_recursive": [],
                                "io_queue_recursive": [],
                                "io_service_time_recursive": [],
                                "io_wait_time_recursive": [],
                                "io_merged_recursive": [],
                                "io_time_recursive": [],
                                "sectors_recursive": []
                            },
                            "num_procs": 0,
                            "storage_stats": {},
                            "cpu_stats": {
                                "cpu_usage": {
                                    "total_usage": 863993897,
                                    "percpu_usage": [
                                        607511353,
                                        256482544,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0
                                    ],
                                    "usage_in_kernelmode": 80000000,
                                    "usage_in_usermode": 610000000
                                },
                                "system_cpu_usage": 2007100000000,
                                "online_cpus": 2,
                                "throttling_data": {
                                    "periods": 0,
                                    "throttled_periods": 0,
                                    "throttled_time": 0
                                }
                            },
                            "precpu_stats": {
                                "cpu_usage": {
                                    "total_usage": 0,
                                    "usage_in_kernelmode": 0,
                                    "usage_in_usermode": 0
                                },
                                "throttling_data": {
                                    "periods": 0,
                                    "throttled_periods": 0,
                                    "throttled_time": 0
                                }
                            },
                            "memory_stats": {
                                "usage": 39931904,
                                "max_usage": 40054784,
                                "stats": {
                                    "active_anon": 37457920,
                                    "active_file": 4096,
                                    "cache": 4096,
                                    "dirty": 0,
                                    "hierarchical_memory_limit": 536870912,
                                    "hierarchical_memsw_limit": 9223372036854771712,
                                    "inactive_anon": 0,
                                    "inactive_file": 0,
                                    "mapped_file": 0,
                                    "pgfault": 15745,
                                    "pgmajfault": 0,
                                    "pgpgin": 12086,
                                    "pgpgout": 2940,
                                    "rss": 37457920,
                                    "rss_huge": 0,
                                    "total_active_anon": 37457920,
                                    "total_active_file": 4096,
                                    "total_cache": 4096,
                                    "total_dirty": 0,
                                    "total_inactive_anon": 0,
                                    "total_inactive_file": 0,
                                    "total_mapped_file": 0,
                                    "total_pgfault": 15745,
                                    "total_pgmajfault": 0,
                                    "total_pgpgin": 12086,
                                    "total_pgpgout": 2940,
                                    "total_rss": 37457920,
                                    "total_rss_huge": 0,
                                    "total_unevictable": 0,
                                    "total_writeback": 0,
                                    "unevictable": 0,
                                    "writeback": 0
                                },
                                "limit": 9223372036854771712
                            },
                            "name": "vector1",
                            "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-3822082590",
                            "networks": {
                                "eth1": {
                                    "rx_bytes": 329932716,
                                    "rx_packets": 224158,
                                    "rx_errors": 0,
                                    "rx_dropped": 0,
                                    "tx_bytes": 2001229,
                                    "tx_packets": 29201,
                                    "tx_errors": 0,
                                    "tx_dropped": 0
                                }
                            }
                        },
                        "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352": {
                            "read": "2020-09-23T20:32:26.314100759Z",
                            "preread": "2020-09-23T20:32:21.315056862Z",
                            "pids_stats": {},
                            "blkio_stats": {
                                "io_service_bytes_recursive": [
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Read",
                                        "value": 0
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Write",
                                        "value": 520192
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Sync",
                                        "value": 516096
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Async",
                                        "value": 4096
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Total",
                                        "value": 520192
                                    }
                                ],
                                "io_serviced_recursive": [
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Read",
                                        "value": 0
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Write",
                                        "value": 10
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Sync",
                                        "value": 9
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Async",
                                        "value": 1
                                    },
                                    {
                                        "major": 202,
                                        "minor": 26368,
                                        "op": "Total",
                                        "value": 10
                                    }
                                ],
                                "io_queue_recursive": [],
                                "io_service_time_recursive": [],
                                "io_wait_time_recursive": [],
                                "io_merged_recursive": [],
                                "io_time_recursive": [],
                                "sectors_recursive": []
                            },
                            "num_procs": 0,
                            "storage_stats": {},
                            "cpu_stats": {
                                "cpu_usage": {
                                    "total_usage": 2324920942,
                                    "percpu_usage": [
                                        1095931487,
                                        1228989455,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0,
                                        0
                                    ],
                                    "usage_in_kernelmode": 190000000,
                                    "usage_in_usermode": 510000000
                                },
                                "system_cpu_usage": 2007130000000,
                                "online_cpus": 2,
                                "throttling_data": {
                                    "periods": 0,
                                    "throttled_periods": 0,
                                    "throttled_time": 0
                                }
                            },
                            "precpu_stats": {
                                "cpu_usage": {
                                    "total_usage": 0,
                                    "usage_in_kernelmode": 0,
                                    "usage_in_usermode": 0
                                },
                                "throttling_data": {
                                    "periods": 0,
                                    "throttled_periods": 0,
                                    "throttled_time": 0
                                }
                            },
                            "memory_stats": {
                                "usage": 40120320,
                                "max_usage": 47177728,
                                "stats": {
                                    "active_anon": 34885632,
                                    "active_file": 65536,
                                    "cache": 413696,
                                    "dirty": 0,
                                    "hierarchical_memory_limit": 536870912,
                                    "hierarchical_memsw_limit": 9223372036854771712,
                                    "inactive_anon": 4096,
                                    "inactive_file": 344064,
                                    "mapped_file": 4096,
                                    "pgfault": 31131,
                                    "pgmajfault": 0,
                                    "pgpgin": 22360,
                                    "pgpgout": 13742,
                                    "rss": 34885632,
                                    "rss_huge": 0,
                                    "total_active_anon": 34885632,
                                    "total_active_file": 65536,
                                    "total_cache": 413696,
                                    "total_dirty": 0,
                                    "total_inactive_anon": 4096,
                                    "total_inactive_file": 344064,
                                    "total_mapped_file": 4096,
                                    "total_pgfault": 31131,
                                    "total_pgmajfault": 0,
                                    "total_pgpgin": 22360,
                                    "total_pgpgout": 13742,
                                    "total_rss": 34885632,
                                    "total_rss_huge": 0,
                                    "total_unevictable": 0,
                                    "total_writeback": 0,
                                    "unevictable": 0,
                                    "writeback": 0
                                },
                                "limit": 9223372036854771712
                            },
                            "name": "vector2",
                            "id": "0cf54b87-f0f0-4044-b9d6-20dc54d5c414-4057181352",
                            "networks": {
                                "eth1": {
                                    "rx_bytes": 329932716,
                                    "rx_packets": 224158,
                                    "rx_errors": 0,
                                    "rx_dropped": 0,
                                    "tx_bytes": 2001229,
                                    "tx_packets": 29201,
                                    "tx_errors": 0,
                                    "tx_dropped": 0
                                }
                            }
                        }
                    }
                    "#;

    #[tokio::test]
    async fn test_aws_ecs_metrics_source() {
        let in_addr = next_addr();

        // The mock server ignores the request and always answers with the fixture.
        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(STATS_FIXTURE)))
            }))
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        let config = AwsEcsMetricsSourceConfig {
            endpoint: format!("http://{in_addr}"),
            version: Version::V4,
            scrape_interval_secs: Duration::from_secs(1),
            namespace: default_namespace(),
        };

        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(1), &SOURCE_TAGS).await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let Some(m) = metrics
            .iter()
            .find(|metric| metric.name() == "network_receive_bytes_total")
        else {
            panic!("Could not find 'network_receive_bytes_total' in {metrics:?}.");
        };
        assert_eq!(m.value(), &MetricValue::Counter { value: 329932716.0 });
        assert_eq!(m.namespace(), Some("awsecs"));

        let Some(tags) = m.tags() else {
            panic!("No tags for metric. {m:?}");
        };
        assert_eq!(tags.get("device"), Some("eth1"));
    }
}
610
#[cfg(feature = "aws-ecs-metrics-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};

    /// Base address of the mock ECS metadata server, overridable through `ECS_ADDRESS`.
    fn ecs_address() -> String {
        match env::var("ECS_ADDRESS") {
            Ok(address) => address,
            Err(_) => String::from("http://localhost:9088"),
        }
    }

    /// Versioned base URI on the mock server, e.g. `<address>/v3`.
    fn ecs_url(version: &str) -> String {
        let address = ecs_address();
        format!("{address}/{version}")
    }

    /// Runs the source against `endpoint` for five seconds, scraping every second, and checks
    /// that it satisfied the source compliance assertions and produced at least one event.
    async fn scrape_metrics(endpoint: String, version: Version) {
        let events = run_and_assert_source_compliance(
            AwsEcsMetricsSourceConfig {
                endpoint,
                version,
                scrape_interval_secs: Duration::from_secs(1),
                namespace: default_namespace(),
            },
            Duration::from_secs(5),
            &SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn scrapes_metrics_v2() {
        scrape_metrics(ecs_url("v2"), Version::V2).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v3() {
        scrape_metrics(ecs_url("v3"), Version::V3).await;
    }

    #[tokio::test]
    async fn scrapes_metrics_v4() {
        // The mock serves v4 from the same path as v3:
        // https://github.com/awslabs/amazon-ecs-local-container-endpoints/blob/mainline/docs/features.md#task-metadata-v4
        scrape_metrics(ecs_url("v3"), Version::V4).await;
    }
}