vector/transforms/
aws_ec2_metadata.rs

1use std::{
2    collections::HashSet,
3    error, fmt,
4    future::ready,
5    pin::Pin,
6    sync::{Arc, LazyLock},
7};
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use futures::{Stream, StreamExt};
12use http::{Request, StatusCode, Uri, uri::PathAndQuery};
13use hyper::{Body, body::to_bytes as body_to_bytes};
14use serde::Deserialize;
15use serde_with::serde_as;
16use snafu::ResultExt as _;
17use tokio::time::{Duration, Instant, sleep};
18use tracing::Instrument;
19use vector_lib::{
20    config::LogNamespace,
21    configurable::configurable_component,
22    lookup::{
23        OwnedTargetPath,
24        lookup_v2::{OptionalTargetPath, OwnedSegment},
25        owned_value_path,
26    },
27};
28use vrl::value::{Kind, kind::Collection};
29
30use crate::{
31    config::{
32        DataType, Input, OutputId, ProxyConfig, TransformConfig, TransformContext, TransformOutput,
33    },
34    event::Event,
35    http::HttpClient,
36    internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
37    schema,
38    transforms::{TaskTransform, Transform},
39};
40
// Names of the metadata fields this transform can add to events. Apart from
// `TAGS_KEY`, each name is matched against the user-supplied `fields` list to
// decide whether the value is fetched, and all of them are used (optionally
// prefixed by the configured namespace) as the key written onto the event.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";
const REGION_KEY: &str = "region";
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";
const ROLE_NAME_KEY: &str = "role-name";
// Parent key under which the instance tags requested through `tags` are stored.
const TAGS_KEY: &str = "tags";
55
// Request paths on the metadata endpoint. Each one is combined with the
// configured host to form the URI of a GET request in `get_metadata`.
static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
static LOCAL_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
static PUBLIC_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
// The response body is split into lines, one role name per line.
static ROLE_NAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
// The MAC address is only fetched to build the per-interface subnet and VPC paths.
static MAC: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
// JSON document deserialized into `IdentityDocument`.
static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
// Fields fetched when the user does not set `fields`. `ACCOUNT_ID_KEY` is not
// part of this list, so it is only added when requested explicitly.
static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
    AMI_ID_KEY,
    AVAILABILITY_ZONE_KEY,
    INSTANCE_ID_KEY,
    INSTANCE_TYPE_KEY,
    LOCAL_HOSTNAME_KEY,
    LOCAL_IPV4_KEY,
    PUBLIC_HOSTNAME_KEY,
    PUBLIC_IPV4_KEY,
    REGION_KEY,
    SUBNET_ID_KEY,
    VPC_ID_KEY,
    ROLE_NAME_KEY,
];
// Path that `get_token` sends a PUT request to in order to obtain a session token.
static API_TOKEN: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
// Name of the request header that carries the session token on metadata GET requests.
static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
89
/// Configuration for the `aws_ec2_metadata` transform.
#[serde_as]
#[configurable_component(transform(
    "aws_ec2_metadata",
    "Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Ec2Metadata {
    // `host` is accepted as an alternative name for this option.
    /// Overrides the default EC2 metadata endpoint.
    #[serde(alias = "host", default = "default_endpoint")]
    #[derivative(Default(value = "default_endpoint()"))]
    endpoint: String,

    // When unset (or set to an empty path), keys are written at the event root.
    /// Sets a prefix for all event fields added by the transform.
    #[configurable(metadata(
        docs::examples = "",
        docs::examples = "ec2",
        docs::examples = "aws.ec2",
    ))]
    namespace: Option<OptionalTargetPath>,

    // Stored as a `Duration`, (de)serialized as a whole number of seconds.
    /// The interval between querying for updated metadata, in seconds.
    #[serde(default = "default_refresh_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_interval_secs()"))]
    refresh_interval_secs: Duration,

    // Defaults to `DEFAULT_FIELD_ALLOWLIST` (see `default_fields`).
    /// A list of metadata fields to include in each transformed event.
    #[serde(default = "default_fields")]
    #[derivative(Default(value = "default_fields()"))]
    #[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
    fields: Vec<String>,

    // Defaults to an empty list, i.e. no tags are fetched.
    /// A list of instance tags to include in each transformed event.
    #[serde(default = "default_tags")]
    #[derivative(Default(value = "default_tags()"))]
    #[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
    tags: Vec<String>,

    // Applied to every individual HTTP request sent to the endpoint.
    // Stored as a `Duration`, (de)serialized as a whole number of seconds.
    /// The timeout for querying the EC2 metadata endpoint, in seconds.
    #[serde(default = "default_refresh_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_timeout_secs()"))]
    refresh_timeout_secs: Duration,

    // Merged with the global proxy settings when the transform is built.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    // When `false`, a failed initial fetch is reported but does not stop the build.
    /// Requires the transform to be able to successfully query the EC2 metadata before starting to process the data.
    #[serde(default = "default_required")]
    #[derivative(Default(value = "default_required()"))]
    required: bool,
}
145
/// Returns the endpoint used when the configuration does not override it.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}
149
/// Returns the default pause between two metadata refreshes (10 seconds).
const fn default_refresh_interval_secs() -> Duration {
    const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(10);
    DEFAULT_REFRESH_INTERVAL
}
153
/// Returns the default timeout applied to each metadata request (1 second).
const fn default_refresh_timeout_secs() -> Duration {
    const DEFAULT_REFRESH_TIMEOUT: Duration = Duration::from_secs(1);
    DEFAULT_REFRESH_TIMEOUT
}
157
158fn default_fields() -> Vec<String> {
159    DEFAULT_FIELD_ALLOWLIST
160        .iter()
161        .map(|s| s.to_string())
162        .collect()
163}
164
/// Returns the default tag list: empty, so no instance tags are requested.
const fn default_tags() -> Vec<String> {
    Vec::new()
}
168
/// Default for the `required` option: the initial metadata fetch must succeed.
const fn default_required() -> bool {
    true
}
172
/// Transform that copies the most recently fetched EC2 metadata onto each event.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
    // Latest metadata snapshot. The same `ArcSwap` is handed to the
    // `MetadataClient`, which replaces its contents after a successful refresh.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}
177
/// Where a single metadata value is written, for both supported event types.
#[derive(Debug, Clone)]
struct MetadataKey {
    // Path the value is inserted at on log events.
    log_path: OwnedTargetPath,
    // Tag name the value is stored under on metric events.
    metric_tag: String,
}
183
/// One `MetadataKey` per metadata field the transform knows about.
///
/// Built by `Keys::new`, which applies the optional namespace to every key.
#[derive(Debug)]
struct Keys {
    account_id_key: MetadataKey,
    ami_id_key: MetadataKey,
    availability_zone_key: MetadataKey,
    instance_id_key: MetadataKey,
    instance_type_key: MetadataKey,
    local_hostname_key: MetadataKey,
    local_ipv4_key: MetadataKey,
    public_hostname_key: MetadataKey,
    public_ipv4_key: MetadataKey,
    region_key: MetadataKey,
    subnet_id_key: MetadataKey,
    vpc_id_key: MetadataKey,
    // Used as the parent of one indexed entry per IAM role name.
    role_name_key: MetadataKey,
    // Used as the parent of one entry per requested instance tag.
    tags_key: MetadataKey,
}
201
// Presumably derives the example-config generation for `Ec2Metadata` from its
// `Default` value (macro defined elsewhere in the crate) — TODO confirm.
impl_generate_config_from_default!(Ec2Metadata);
203
204#[async_trait::async_trait]
205#[typetag::serde(name = "aws_ec2_metadata")]
206impl TransformConfig for Ec2Metadata {
207    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
208        let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
209
210        let keys = Keys::new(self.namespace.clone());
211        let host = Uri::from_maybe_shared(self.endpoint.clone()).unwrap();
212        let refresh_interval = self.refresh_interval_secs;
213        let fields = self.fields.clone();
214        let tags = self.tags.clone();
215        let refresh_timeout = self.refresh_timeout_secs;
216        let required = self.required;
217
218        let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
219        let http_client = HttpClient::new(None, &proxy)?;
220
221        let mut client = MetadataClient::new(
222            http_client,
223            host,
224            keys,
225            Arc::clone(&state),
226            refresh_interval,
227            refresh_timeout,
228            fields,
229            tags,
230        );
231
232        // If initial metadata is not required, log and proceed. Otherwise return error.
233        if let Err(error) = client.refresh_metadata().await {
234            if required {
235                return Err(error);
236            } else {
237                emit!(AwsEc2MetadataRefreshError { error });
238            }
239        }
240
241        tokio::spawn(
242            async move {
243                client.run().await;
244            }
245            // TODO: Once #1338 is done we can fetch the current span
246            .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
247        );
248
249        Ok(Transform::event_task(Ec2MetadataTransform { state }))
250    }
251
252    fn input(&self) -> Input {
253        Input::new(DataType::Metric | DataType::Log)
254    }
255
256    fn outputs(
257        &self,
258        _: vector_lib::enrichment::TableRegistry,
259        input_definitions: &[(OutputId, schema::Definition)],
260        _: LogNamespace,
261    ) -> Vec<TransformOutput> {
262        let added_keys = Keys::new(self.namespace.clone());
263
264        let paths = [
265            &added_keys.account_id_key.log_path,
266            &added_keys.ami_id_key.log_path,
267            &added_keys.availability_zone_key.log_path,
268            &added_keys.instance_id_key.log_path,
269            &added_keys.instance_type_key.log_path,
270            &added_keys.local_hostname_key.log_path,
271            &added_keys.local_ipv4_key.log_path,
272            &added_keys.public_hostname_key.log_path,
273            &added_keys.public_ipv4_key.log_path,
274            &added_keys.region_key.log_path,
275            &added_keys.subnet_id_key.log_path,
276            &added_keys.vpc_id_key.log_path,
277            &added_keys.role_name_key.log_path,
278            &added_keys.tags_key.log_path,
279        ];
280
281        let schema_definition = input_definitions
282            .iter()
283            .map(|(output, definition)| {
284                let mut schema_definition = definition.clone();
285
286                // If the event is not an object, it will be converted to an object in this transform
287                if !schema_definition.event_kind().contains_object() {
288                    *schema_definition.event_kind_mut() = Kind::object(Collection::empty());
289                }
290
291                for path in paths {
292                    schema_definition =
293                        schema_definition.with_field(path, Kind::bytes().or_undefined(), None);
294                }
295
296                (output.clone(), schema_definition)
297            })
298            .collect();
299
300        vec![TransformOutput::new(
301            DataType::Metric | DataType::Log,
302            schema_definition,
303        )]
304    }
305}
306
307impl TaskTransform<Event> for Ec2MetadataTransform {
308    fn transform(
309        self: Box<Self>,
310        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
311    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
312    where
313        Self: 'static,
314    {
315        let mut inner = self;
316        Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event)))))
317    }
318}
319
320impl Ec2MetadataTransform {
321    fn transform_one(&mut self, mut event: Event) -> Event {
322        let state = self.state.load();
323        match event {
324            Event::Log(ref mut log) => {
325                state.iter().for_each(|(k, v)| {
326                    log.insert(&k.log_path, v.clone());
327                });
328            }
329            Event::Metric(ref mut metric) => {
330                state.iter().for_each(|(k, v)| {
331                    metric
332                        .replace_tag(k.metric_tag.clone(), String::from_utf8_lossy(v).to_string());
333                });
334            }
335            Event::Trace(_) => panic!("Traces are not supported."),
336        }
337        event
338    }
339}
340
/// Fetches metadata from the configured endpoint and publishes it to the
/// snapshot shared with `Ec2MetadataTransform`.
struct MetadataClient {
    client: HttpClient<Body>,
    // Base URI of the metadata endpoint; the path is replaced per request.
    host: Uri,
    // Cached session token together with the instant after which a new one is requested.
    token: Option<(Bytes, Instant)>,
    keys: Keys,
    // Snapshot read by the transform; replaced wholesale after a successful refresh.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
    // Pause between two refreshes in `run`.
    refresh_interval: Duration,
    // Timeout applied to each individual HTTP request.
    refresh_timeout: Duration,
    // Names of the metadata fields to fetch.
    fields: HashSet<String>,
    // Names of the instance tags to fetch.
    tags: HashSet<String>,
}
352
/// Instance identity document as deserialized from the JSON returned for
/// `DYNAMIC_DOCUMENT`. JSON keys are the camelCase form of the field names.
///
/// Only `account_id`, `image_id`, `instance_id`, `instance_type` and `region`
/// are read by `refresh_metadata`; the remaining fields are parsed but unused.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)] // deserialize all fields
struct IdentityDocument {
    account_id: String,
    architecture: String,
    image_id: String,
    instance_id: String,
    instance_type: String,
    private_ip: String,
    region: String,
    version: String,
}
366
367impl MetadataClient {
368    #[allow(clippy::too_many_arguments)]
369    pub fn new(
370        client: HttpClient<Body>,
371        host: Uri,
372        keys: Keys,
373        state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
374        refresh_interval: Duration,
375        refresh_timeout: Duration,
376        fields: Vec<String>,
377        tags: Vec<String>,
378    ) -> Self {
379        Self {
380            client,
381            host,
382            token: None,
383            keys,
384            state,
385            refresh_interval,
386            refresh_timeout,
387            fields: fields.into_iter().collect(),
388            tags: tags.into_iter().collect(),
389        }
390    }
391
392    async fn run(&mut self) {
393        loop {
394            match self.refresh_metadata().await {
395                Ok(_) => {
396                    emit!(AwsEc2MetadataRefreshSuccessful);
397                }
398                Err(error) => {
399                    emit!(AwsEc2MetadataRefreshError { error });
400                }
401            }
402
403            sleep(self.refresh_interval).await;
404        }
405    }
406
407    pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
408        if let Some((token, next_refresh)) = self.token.clone() {
409            // If the next refresh is greater (in the future) than
410            // the current time we can return the token since its still valid
411            // otherwise lets refresh it.
412            if next_refresh > Instant::now() {
413                return Ok(token);
414            }
415        }
416
417        let mut parts = self.host.clone().into_parts();
418        parts.path_and_query = Some(API_TOKEN.clone());
419        let uri = Uri::from_parts(parts)?;
420
421        let req = Request::put(uri)
422            .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
423            .body(Body::empty())?;
424
425        let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
426            .await?
427            .map_err(crate::Error::from)
428            .and_then(|res| match res.status() {
429                StatusCode::OK => Ok(res),
430                status_code => Err(UnexpectedHttpStatusError {
431                    status: status_code,
432                }
433                .into()),
434            })?;
435
436        let token = body_to_bytes(res.into_body()).await?;
437
438        let next_refresh = Instant::now() + Duration::from_secs(21600);
439        self.token = Some((token.clone(), next_refresh));
440
441        Ok(token)
442    }
443
444    pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
445        self.get_metadata(&DYNAMIC_DOCUMENT)
446            .await?
447            .map(|body| {
448                serde_json::from_slice(&body[..])
449                    .context(ParseIdentityDocumentSnafu {})
450                    .map_err(Into::into)
451            })
452            .transpose()
453    }
454
455    pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
456        let mut new_state = vec![];
457
458        // Fetch all resources, _then_ add them to the state map.
459        if let Some(document) = self.get_document().await? {
460            if self.fields.contains(ACCOUNT_ID_KEY) {
461                new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
462            }
463
464            if self.fields.contains(AMI_ID_KEY) {
465                new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
466            }
467
468            if self.fields.contains(INSTANCE_ID_KEY) {
469                new_state.push((
470                    self.keys.instance_id_key.clone(),
471                    document.instance_id.into(),
472                ));
473            }
474
475            if self.fields.contains(INSTANCE_TYPE_KEY) {
476                new_state.push((
477                    self.keys.instance_type_key.clone(),
478                    document.instance_type.into(),
479                ));
480            }
481
482            if self.fields.contains(REGION_KEY) {
483                new_state.push((self.keys.region_key.clone(), document.region.into()));
484            }
485
486            if self.fields.contains(AVAILABILITY_ZONE_KEY)
487                && let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await?
488            {
489                new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
490            }
491
492            if self.fields.contains(LOCAL_HOSTNAME_KEY)
493                && let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await?
494            {
495                new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
496            }
497
498            if self.fields.contains(LOCAL_IPV4_KEY)
499                && let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await?
500            {
501                new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
502            }
503
504            if self.fields.contains(PUBLIC_HOSTNAME_KEY)
505                && let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await?
506            {
507                new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
508            }
509
510            if self.fields.contains(PUBLIC_IPV4_KEY)
511                && let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await?
512            {
513                new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
514            }
515
516            if (self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY))
517                && let Some(mac) = self.get_metadata(&MAC).await?
518            {
519                let mac = String::from_utf8_lossy(&mac[..]);
520
521                if self.fields.contains(SUBNET_ID_KEY) {
522                    let subnet_path =
523                        format!("/latest/meta-data/network/interfaces/macs/{mac}/subnet-id");
524
525                    let subnet_path = subnet_path.parse().context(ParsePathSnafu {
526                        value: subnet_path.clone(),
527                    })?;
528
529                    if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
530                        new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
531                    }
532                }
533
534                if self.fields.contains(VPC_ID_KEY) {
535                    let vpc_path =
536                        format!("/latest/meta-data/network/interfaces/macs/{mac}/vpc-id");
537
538                    let vpc_path = vpc_path.parse().context(ParsePathSnafu {
539                        value: vpc_path.clone(),
540                    })?;
541
542                    if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
543                        new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
544                    }
545                }
546            }
547
548            if self.fields.contains(ROLE_NAME_KEY)
549                && let Some(role_names) = self.get_metadata(&ROLE_NAME).await?
550            {
551                let role_names = String::from_utf8_lossy(&role_names[..]);
552
553                for (i, role_name) in role_names.lines().enumerate() {
554                    new_state.push((
555                        MetadataKey {
556                            log_path: self
557                                .keys
558                                .role_name_key
559                                .log_path
560                                .with_index_appended(i as isize),
561                            metric_tag: format!("{}[{}]", self.keys.role_name_key.metric_tag, i),
562                        },
563                        role_name.to_string().into(),
564                    ));
565                }
566            }
567
568            for tag in self.tags.clone() {
569                let tag_path = format!("/latest/meta-data/tags/instance/{tag}");
570
571                let tag_path = tag_path.parse().context(ParsePathSnafu {
572                    value: tag_path.clone(),
573                })?;
574
575                if let Some(tag_content) = self.get_metadata(&tag_path).await? {
576                    new_state.push((
577                        MetadataKey {
578                            log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
579                            metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
580                        },
581                        tag_content,
582                    ));
583                }
584            }
585
586            self.state.store(Arc::new(new_state));
587        }
588
589        Ok(())
590    }
591
592    async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
593        let token = self
594            .get_token()
595            .await
596            .with_context(|_| FetchTokenSnafu {})?;
597
598        let mut parts = self.host.clone().into_parts();
599
600        parts.path_and_query = Some(path.clone());
601
602        let uri = Uri::from_parts(parts)?;
603
604        debug!(message = "Sending metadata request.", %uri);
605
606        let req = Request::get(uri)
607            .header(TOKEN_HEADER.as_ref(), token.as_ref())
608            .body(Body::empty())?;
609
610        match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
611            .await?
612            .map_err(crate::Error::from)
613            .and_then(|res| match res.status() {
614                StatusCode::OK => Ok(Some(res)),
615                StatusCode::NOT_FOUND => Ok(None),
616                status_code => Err(UnexpectedHttpStatusError {
617                    status: status_code,
618                }
619                .into()),
620            })? {
621            Some(res) => {
622                let body = body_to_bytes(res.into_body()).await?;
623                Ok(Some(body))
624            }
625            None => Ok(None),
626        }
627    }
628}
629
630// This creates a simplified string from the namespace. Since the namespace is technically
631// a target path, it can contain syntax that is undesirable for a metric tag (such as prefix, quotes, etc)
632// This is mainly used for backwards compatibility.
633// see: https://github.com/vectordotdev/vector/issues/14931
634fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
635    let mut output = String::new();
636    for segment in &namespace.path.segments {
637        if !output.is_empty() {
638            output += ".";
639        }
640        match segment {
641            OwnedSegment::Field(field) => {
642                output += field;
643            }
644            OwnedSegment::Index(i) => {
645                output += &i.to_string();
646            }
647        }
648    }
649    output
650}
651
652fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
653    if let Some(namespace) = namespace {
654        MetadataKey {
655            log_path: namespace.with_field_appended(key),
656            metric_tag: format!("{}.{}", create_metric_namespace(namespace), key),
657        }
658    } else {
659        MetadataKey {
660            log_path: OwnedTargetPath::event(owned_value_path!(key)),
661            metric_tag: key.to_owned(),
662        }
663    }
664}
665
666impl Keys {
667    pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
668        let namespace = namespace.and_then(|namespace| namespace.path);
669
670        Keys {
671            account_id_key: create_key(&namespace, ACCOUNT_ID_KEY),
672            ami_id_key: create_key(&namespace, AMI_ID_KEY),
673            availability_zone_key: create_key(&namespace, AVAILABILITY_ZONE_KEY),
674            instance_id_key: create_key(&namespace, INSTANCE_ID_KEY),
675            instance_type_key: create_key(&namespace, INSTANCE_TYPE_KEY),
676            local_hostname_key: create_key(&namespace, LOCAL_HOSTNAME_KEY),
677            local_ipv4_key: create_key(&namespace, LOCAL_IPV4_KEY),
678            public_hostname_key: create_key(&namespace, PUBLIC_HOSTNAME_KEY),
679            public_ipv4_key: create_key(&namespace, PUBLIC_IPV4_KEY),
680            region_key: create_key(&namespace, REGION_KEY),
681            subnet_id_key: create_key(&namespace, SUBNET_ID_KEY),
682            vpc_id_key: create_key(&namespace, VPC_ID_KEY),
683            role_name_key: create_key(&namespace, ROLE_NAME_KEY),
684            tags_key: create_key(&namespace, TAGS_KEY),
685        }
686    }
687}
688
689#[derive(Debug)]
690struct UnexpectedHttpStatusError {
691    status: http::StatusCode,
692}
693
694impl fmt::Display for UnexpectedHttpStatusError {
695    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696        write!(f, "got unexpected status code: {}", self.status)
697    }
698}
699
700impl error::Error for UnexpectedHttpStatusError {}
701
// Errors raised while talking to the metadata endpoint. Each variant wraps the
// underlying error as `source` and is created through its snafu context
// selector (`FetchTokenSnafu`, `ParseIdentityDocumentSnafu`, `ParsePathSnafu`).
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
    // The session token could not be obtained.
    #[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
    FetchToken { source: crate::Error },
    // The identity document body was not the expected JSON.
    #[snafu(display("Unable to parse identity document: {}.", source))]
    ParseIdentityDocument { source: serde_json::Error },
    // A request path assembled at runtime (`value`) was not a valid path.
    #[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
    ParsePath {
        value: String,
        source: http::uri::InvalidUri,
    },
}
714
#[cfg(test)]
mod test {
    use vector_lib::{enrichment::TableRegistry, lookup::OwnedTargetPath};
    use vrl::{owned_value_path, value::Kind};

    use crate::{
        config::{LogNamespace, OutputId, TransformConfig, schema::Definition},
        transforms::aws_ec2_metadata::Ec2Metadata,
    };

    /// An input whose event kind is plain bytes must come out declared as an
    /// object, since the transform adds fields to it.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let namespace = OwnedTargetPath::event(owned_value_path!("ec2", "metadata"));
        let transform_config = Ec2Metadata {
            namespace: Some(namespace.into()),
            ..Default::default()
        };

        let bytes_input =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
        let inputs = [(OutputId::dummy(), bytes_input)];

        let outputs =
            transform_config.outputs(TableRegistry::default(), &inputs, LogNamespace::Vector);
        assert_eq!(outputs.len(), 1);

        let output = outputs.into_iter().next().unwrap();
        let schema_definitions = output.schema_definitions(true);
        let event_schema = &schema_definitions[&OutputId::dummy()];
        assert!(event_schema.event_kind().is_object());
    }
}
746
747#[cfg(feature = "aws-ec2-metadata-integration-tests")]
748#[cfg(test)]
749mod integration_tests {
750    use tokio::sync::mpsc;
751    use tokio_stream::wrappers::ReceiverStream;
752    use vector_lib::{
753        assert_event_data_eq,
754        lookup::{
755            PathPrefix, event_path,
756            lookup_v2::{OwnedSegment, OwnedValuePath},
757        },
758    };
759    use vrl::value::{ObjectMap, Value};
760    use warp::Filter;
761
762    use super::*;
763    use crate::{
764        event::{LogEvent, Metric, metric},
765        test_util::{components::assert_transform_compliance, next_addr},
766        transforms::test::create_topology,
767    };
768
769    fn ec2_metadata_address() -> String {
770        std::env::var("EC2_METADATA_ADDRESS").unwrap_or_else(|_| "http://localhost:1338".into())
771    }
772
773    fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
774        vec![
775            (
776                vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
777                "us-east-1a",
778            ),
779            (
780                vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
781                "192.0.2.54",
782            ),
783            (
784                vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
785                "ec2-192-0-2-54.compute-1.amazonaws.com",
786            ),
787            (
788                vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
789                "172.16.34.43",
790            ),
791            (
792                vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
793                "ip-172-16-34-43.ec2.internal",
794            ),
795            (
796                vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
797                "i-1234567890abcdef0",
798            ),
799            (
800                vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
801                "0123456789",
802            ),
803            (
804                vec![OwnedSegment::field(AMI_ID_KEY)].into(),
805                "ami-0b69ea66ff7391e80",
806            ),
807            (
808                vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
809                "m4.xlarge",
810            ),
811            (vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
812            (vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
813            (
814                vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
815                "subnet-0ac62554",
816            ),
817            (owned_value_path!("role-name", 0), "baskinc-role"),
818            (owned_value_path!("tags", "Name"), "test-instance"),
819            (owned_value_path!("tags", "Test"), "test-tag"),
820        ]
821    }
822
823    fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
824        vec![
825            (AVAILABILITY_ZONE_KEY, "us-east-1a"),
826            (PUBLIC_IPV4_KEY, "192.0.2.54"),
827            (
828                PUBLIC_HOSTNAME_KEY,
829                "ec2-192-0-2-54.compute-1.amazonaws.com",
830            ),
831            (LOCAL_IPV4_KEY, "172.16.34.43"),
832            (LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
833            (INSTANCE_ID_KEY, "i-1234567890abcdef0"),
834            (ACCOUNT_ID_KEY, "0123456789"),
835            (AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
836            (INSTANCE_TYPE_KEY, "m4.xlarge"),
837            (REGION_KEY, "us-east-1"),
838            (VPC_ID_KEY, "vpc-d295a6a7"),
839            (SUBNET_ID_KEY, "subnet-0ac62554"),
840            ("role-name[0]", "baskinc-role"),
841            ("tags[Name]", "test-instance"),
842            ("tags[Test]", "test-tag"),
843        ]
844    }
845
846    fn make_metric() -> Metric {
847        Metric::new(
848            "event",
849            metric::MetricKind::Incremental,
850            metric::MetricValue::Counter { value: 1.0 },
851        )
852    }
853
    // Runs the crate's shared config-generation check against `Ec2Metadata`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<Ec2Metadata>();
    }
858
859    #[tokio::test]
860    async fn enrich_log() {
861        assert_transform_compliance(async {
862            let mut fields = default_fields();
863            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
864
865            let tags = vec![
866                String::from("Name"),
867                String::from("Test"),
868                String::from("MISSING_TAG"),
869            ];
870
871            let transform_config = Ec2Metadata {
872                endpoint: ec2_metadata_address(),
873                fields,
874                tags,
875                ..Default::default()
876            };
877
878            let (tx, rx) = mpsc::channel(1);
879            let (topology, mut out) =
880                create_topology(ReceiverStream::new(rx), transform_config).await;
881
882            // We need to sleep to let the background task fetch the data.
883            sleep(Duration::from_secs(1)).await;
884
885            let log = LogEvent::default();
886            let mut expected_log = log.clone();
887            for (k, v) in expected_log_fields().iter().cloned() {
888                expected_log.insert((PathPrefix::Event, &k), v);
889            }
890
891            tx.send(log.into()).await.unwrap();
892
893            let event = out.recv().await.unwrap();
894            assert_event_data_eq!(event.into_log(), expected_log);
895
896            drop(tx);
897            topology.stop().await;
898            assert_eq!(out.recv().await, None);
899        })
900        .await;
901    }
902
903    #[tokio::test(flavor = "multi_thread")]
904    async fn timeout() {
905        let addr = next_addr();
906
907        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
908            tokio::time::sleep(Duration::from_secs(3)).await;
909            Ok("I waited 3 seconds!")
910        }
911
912        let slow = warp::any().and_then(sleepy);
913        let server = warp::serve(slow).bind(addr);
914        let _server = tokio::spawn(server);
915
916        let config = Ec2Metadata {
917            endpoint: format!("http://{addr}"),
918            refresh_timeout_secs: Duration::from_secs(1),
919            ..Default::default()
920        };
921
922        match config.build(&TransformContext::default()).await {
923            Ok(_) => panic!("expected timeout failure"),
924            // cannot create tokio::time::error::Elapsed to compare with since constructor is
925            // private
926            Err(err) => assert_eq!(
927                err.to_string(),
928                "Unable to fetch metadata authentication token: deadline has elapsed."
929            ),
930        }
931    }
932
933    // validates the configuration setting 'required'=false allows vector to run
934    #[tokio::test(flavor = "multi_thread")]
935    async fn not_required() {
936        let addr = next_addr();
937
938        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
939            tokio::time::sleep(Duration::from_secs(3)).await;
940            Ok("I waited 3 seconds!")
941        }
942
943        let slow = warp::any().and_then(sleepy);
944        let server = warp::serve(slow).bind(addr);
945        let _server = tokio::spawn(server);
946
947        let config = Ec2Metadata {
948            endpoint: format!("http://{addr}"),
949            refresh_timeout_secs: Duration::from_secs(1),
950            required: false,
951            ..Default::default()
952        };
953
954        assert!(
955            config.build(&TransformContext::default()).await.is_ok(),
956            "expected no failure because 'required' config value set to false"
957        );
958    }
959
960    #[tokio::test]
961    async fn enrich_metric() {
962        assert_transform_compliance(async {
963            let mut fields = default_fields();
964            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
965
966            let tags = vec![
967                String::from("Name"),
968                String::from("Test"),
969                String::from("MISSING_TAG"),
970            ];
971
972            let transform_config = Ec2Metadata {
973                endpoint: ec2_metadata_address(),
974                fields,
975                tags,
976                ..Default::default()
977            };
978
979            let (tx, rx) = mpsc::channel(1);
980            let (topology, mut out) =
981                create_topology(ReceiverStream::new(rx), transform_config).await;
982
983            // We need to sleep to let the background task fetch the data.
984            sleep(Duration::from_secs(1)).await;
985
986            let metric = make_metric();
987            let mut expected_metric = metric.clone();
988            for (k, v) in expected_metric_fields().iter() {
989                expected_metric.replace_tag(k.to_string(), v.to_string());
990            }
991
992            tx.send(metric.into()).await.unwrap();
993
994            let event = out.recv().await.unwrap();
995            assert_event_data_eq!(event.into_metric(), expected_metric);
996
997            drop(tx);
998            topology.stop().await;
999            assert_eq!(out.recv().await, None);
1000        })
1001        .await;
1002    }
1003
1004    #[tokio::test]
1005    async fn fields_log() {
1006        assert_transform_compliance(async {
1007            let transform_config = Ec2Metadata {
1008                endpoint: ec2_metadata_address(),
1009                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1010                tags: vec![
1011                    String::from("Name"),
1012                    String::from("Test"),
1013                    String::from("MISSING_TAG"),
1014                ],
1015                ..Default::default()
1016            };
1017
1018            let (tx, rx) = mpsc::channel(1);
1019            let (topology, mut out) =
1020                create_topology(ReceiverStream::new(rx), transform_config).await;
1021
1022            // We need to sleep to let the background task fetch the data.
1023            sleep(Duration::from_secs(1)).await;
1024
1025            let log = LogEvent::default();
1026            let mut expected_log = log.clone();
1027            expected_log.insert(format!("\"{PUBLIC_IPV4_KEY}\"").as_str(), "192.0.2.54");
1028            expected_log.insert(format!("\"{REGION_KEY}\"").as_str(), "us-east-1");
1029            expected_log.insert(
1030                format!("\"{TAGS_KEY}\"").as_str(),
1031                ObjectMap::from([
1032                    ("Name".into(), Value::from("test-instance")),
1033                    ("Test".into(), Value::from("test-tag")),
1034                ]),
1035            );
1036
1037            tx.send(log.into()).await.unwrap();
1038
1039            let event = out.recv().await.unwrap();
1040            assert_event_data_eq!(event.into_log(), expected_log);
1041
1042            drop(tx);
1043            topology.stop().await;
1044            assert_eq!(out.recv().await, None);
1045        })
1046        .await;
1047    }
1048
1049    #[tokio::test]
1050    async fn fields_metric() {
1051        assert_transform_compliance(async {
1052            let transform_config = Ec2Metadata {
1053                endpoint: ec2_metadata_address(),
1054                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1055                tags: vec![
1056                    String::from("Name"),
1057                    String::from("Test"),
1058                    String::from("MISSING_TAG"),
1059                ],
1060                ..Default::default()
1061            };
1062
1063            let (tx, rx) = mpsc::channel(1);
1064            let (topology, mut out) =
1065                create_topology(ReceiverStream::new(rx), transform_config).await;
1066
1067            // We need to sleep to let the background task fetch the data.
1068            sleep(Duration::from_secs(1)).await;
1069
1070            let metric = make_metric();
1071            let mut expected_metric = metric.clone();
1072            expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
1073            expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
1074            expected_metric.replace_tag(
1075                format!("{}[{}]", TAGS_KEY, "Name"),
1076                "test-instance".to_string(),
1077            );
1078            expected_metric
1079                .replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());
1080
1081            tx.send(metric.into()).await.unwrap();
1082
1083            let event = out.recv().await.unwrap();
1084            assert_event_data_eq!(event.into_metric(), expected_metric);
1085
1086            drop(tx);
1087            topology.stop().await;
1088            assert_eq!(out.recv().await, None);
1089        })
1090        .await;
1091    }
1092
1093    #[tokio::test]
1094    async fn namespace_log() {
1095        {
1096            assert_transform_compliance(async {
1097                let transform_config = Ec2Metadata {
1098                    endpoint: ec2_metadata_address(),
1099                    namespace: Some(
1100                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1101                    ),
1102                    ..Default::default()
1103                };
1104
1105                let (tx, rx) = mpsc::channel(1);
1106                let (topology, mut out) =
1107                    create_topology(ReceiverStream::new(rx), transform_config).await;
1108
1109                // We need to sleep to let the background task fetch the data.
1110                sleep(Duration::from_secs(1)).await;
1111
1112                let log = LogEvent::default();
1113
1114                tx.send(log.into()).await.unwrap();
1115
1116                let event = out.recv().await.unwrap();
1117
1118                assert_eq!(
1119                    event.as_log().get("ec2.metadata.\"availability-zone\""),
1120                    Some(&"us-east-1a".into())
1121                );
1122
1123                drop(tx);
1124                topology.stop().await;
1125                assert_eq!(out.recv().await, None);
1126            })
1127            .await;
1128        }
1129
1130        {
1131            assert_transform_compliance(async {
1132                // Set an empty namespace to ensure we don't prepend one.
1133                let transform_config = Ec2Metadata {
1134                    endpoint: ec2_metadata_address(),
1135                    namespace: Some(OptionalTargetPath::none()),
1136                    ..Default::default()
1137                };
1138
1139                let (tx, rx) = mpsc::channel(1);
1140                let (topology, mut out) =
1141                    create_topology(ReceiverStream::new(rx), transform_config).await;
1142
1143                // We need to sleep to let the background task fetch the data.
1144                sleep(Duration::from_secs(1)).await;
1145
1146                let log = LogEvent::default();
1147
1148                tx.send(log.into()).await.unwrap();
1149
1150                let event = out.recv().await.unwrap();
1151                assert_eq!(
1152                    event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
1153                    Some(&"us-east-1a".into())
1154                );
1155
1156                drop(tx);
1157                topology.stop().await;
1158                assert_eq!(out.recv().await, None);
1159            })
1160            .await;
1161        }
1162    }
1163
1164    #[tokio::test]
1165    async fn namespace_metric() {
1166        {
1167            assert_transform_compliance(async {
1168                let transform_config = Ec2Metadata {
1169                    endpoint: ec2_metadata_address(),
1170                    namespace: Some(
1171                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1172                    ),
1173                    ..Default::default()
1174                };
1175
1176                let (tx, rx) = mpsc::channel(1);
1177                let (topology, mut out) =
1178                    create_topology(ReceiverStream::new(rx), transform_config).await;
1179
1180                // We need to sleep to let the background task fetch the data.
1181                sleep(Duration::from_secs(1)).await;
1182
1183                let metric = make_metric();
1184
1185                tx.send(metric.into()).await.unwrap();
1186
1187                let event = out.recv().await.unwrap();
1188                assert_eq!(
1189                    event
1190                        .as_metric()
1191                        .tag_value("ec2.metadata.availability-zone"),
1192                    Some("us-east-1a".to_string())
1193                );
1194
1195                drop(tx);
1196                topology.stop().await;
1197                assert_eq!(out.recv().await, None);
1198            })
1199            .await;
1200        }
1201
1202        {
1203            assert_transform_compliance(async {
1204                // Set an empty namespace to ensure we don't prepend one.
1205                let transform_config = Ec2Metadata {
1206                    endpoint: ec2_metadata_address(),
1207                    namespace: Some(OptionalTargetPath::none()),
1208                    ..Default::default()
1209                };
1210
1211                let (tx, rx) = mpsc::channel(1);
1212                let (topology, mut out) =
1213                    create_topology(ReceiverStream::new(rx), transform_config).await;
1214
1215                // We need to sleep to let the background task fetch the data.
1216                sleep(Duration::from_secs(1)).await;
1217
1218                let metric = make_metric();
1219
1220                tx.send(metric.into()).await.unwrap();
1221
1222                let event = out.recv().await.unwrap();
1223                assert_eq!(
1224                    event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
1225                    Some("us-east-1a".to_string())
1226                );
1227
1228                drop(tx);
1229                topology.stop().await;
1230                assert_eq!(out.recv().await, None);
1231            })
1232            .await;
1233        }
1234    }
1235}