vector/transforms/
aws_ec2_metadata.rs

1use std::{
2    collections::HashSet,
3    error, fmt,
4    future::ready,
5    pin::Pin,
6    sync::{Arc, LazyLock},
7};
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use futures::{Stream, StreamExt};
12use http::{Request, StatusCode, Uri, uri::PathAndQuery};
13use hyper::Body;
14use serde::Deserialize;
15use serde_with::serde_as;
16use snafu::ResultExt as _;
17use tokio::time::{Duration, Instant, sleep};
18use tracing::Instrument;
19use vector_lib::{
20    configurable::configurable_component,
21    lookup::{
22        OwnedTargetPath,
23        lookup_v2::{OptionalTargetPath, OwnedSegment},
24        owned_value_path,
25    },
26};
27use vrl::value::{Kind, kind::Collection};
28
29use crate::{
30    config::{
31        DataType, Input, OutputId, ProxyConfig, TransformConfig, TransformContext, TransformOutput,
32    },
33    event::Event,
34    http::HttpClient,
35    internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
36    schema,
37    transforms::{TaskTransform, Transform},
38};
39
// Names of the metadata fields this transform can emit. Each name is matched against the
// configured `fields` list and is also the last segment of the log path / metric tag built
// by `create_key`.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";
const REGION_KEY: &str = "region";
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";
const ROLE_NAME_KEY: &str = "role-name";
// Parent key under which the configured instance tags are nested.
const TAGS_KEY: &str = "tags";

// Request paths on the metadata endpoint. They replace the path of the configured endpoint
// URI when a request is built.
static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
static LOCAL_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
static PUBLIC_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
// The response body is read line by line in `refresh_metadata`, one role name per line.
static ROLE_NAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
// The returned MAC address is used to build the per-interface `subnet-id` / `vpc-id` paths.
static MAC: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
// JSON document that is deserialized into `IdentityDocument`.
static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
// Fields enabled when `fields` is not configured. `ACCOUNT_ID_KEY` is not part of this list,
// so the account ID is only emitted when it is requested explicitly.
static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
    AMI_ID_KEY,
    AVAILABILITY_ZONE_KEY,
    INSTANCE_ID_KEY,
    INSTANCE_TYPE_KEY,
    LOCAL_HOSTNAME_KEY,
    LOCAL_IPV4_KEY,
    PUBLIC_HOSTNAME_KEY,
    PUBLIC_IPV4_KEY,
    REGION_KEY,
    SUBNET_ID_KEY,
    VPC_ID_KEY,
    ROLE_NAME_KEY,
];
// Path the session token is requested from (with a `PUT`, see `get_token`).
static API_TOKEN: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
// Name of the header that carries the session token on every metadata request.
static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
88
/// Configuration for the `aws_ec2_metadata` transform.
#[serde_as]
#[configurable_component(transform(
    "aws_ec2_metadata",
    "Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Ec2Metadata {
    /// Overrides the default EC2 metadata endpoint.
    // `host` is accepted as an alias for this option. The value is parsed as a URI in `build`.
    #[serde(alias = "host", default = "default_endpoint")]
    #[derivative(Default(value = "default_endpoint()"))]
    endpoint: String,

    /// Sets a prefix for all event fields added by the transform.
    // When unset, or when the optional path is empty, fields are written at the event root
    // (see `Keys::new` / `create_key`).
    #[configurable(metadata(
        docs::examples = "",
        docs::examples = "ec2",
        docs::examples = "aws.ec2",
    ))]
    namespace: Option<OptionalTargetPath>,

    /// The interval between querying for updated metadata, in seconds.
    // Deserialized from an integer number of seconds; this is the pause the background task
    // takes between two refreshes.
    #[serde(default = "default_refresh_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_interval_secs()"))]
    refresh_interval_secs: Duration,

    /// A list of metadata fields to include in each transformed event.
    #[serde(default = "default_fields")]
    #[derivative(Default(value = "default_fields()"))]
    #[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
    fields: Vec<String>,

    /// A list of instance tags to include in each transformed event.
    // Each tag is fetched individually; a tag the endpoint answers with 404 is skipped.
    #[serde(default = "default_tags")]
    #[derivative(Default(value = "default_tags()"))]
    #[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
    tags: Vec<String>,

    /// The timeout for querying the EC2 metadata endpoint, in seconds.
    // Applied to every individual HTTP request (token and metadata), not to a refresh as a
    // whole.
    #[serde(default = "default_refresh_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_timeout_secs()"))]
    refresh_timeout_secs: Duration,

    // Merged with the global proxy settings in `build`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Requires the transform to be able to successfully query the EC2 metadata before starting to process the data.
    // When `false`, a failed initial refresh is only reported and the transform still starts.
    #[serde(default = "default_required")]
    #[derivative(Default(value = "default_required()"))]
    required: bool,
}
144
/// Returns the metadata endpoint used when none is configured.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}
148
/// Returns the default pause between two metadata refreshes (10 seconds).
const fn default_refresh_interval_secs() -> Duration {
    const INTERVAL_SECS: u64 = 10;
    Duration::from_secs(INTERVAL_SECS)
}
152
/// Returns the default timeout of a single metadata request (1 second).
const fn default_refresh_timeout_secs() -> Duration {
    const TIMEOUT_SECS: u64 = 1;
    Duration::from_secs(TIMEOUT_SECS)
}
156
157fn default_fields() -> Vec<String> {
158    DEFAULT_FIELD_ALLOWLIST
159        .iter()
160        .map(|s| s.to_string())
161        .collect()
162}
163
/// Returns the default tag list: empty, i.e. no instance tags are fetched.
const fn default_tags() -> Vec<String> {
    Vec::new()
}
167
/// Default for the `required` option: the initial metadata refresh must succeed for the
/// transform to be built.
const fn default_required() -> bool {
    true
}
171
/// Transform that stamps every event with the most recently fetched EC2 metadata.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
    /// Metadata snapshot shared with the background `MetadataClient`, which swaps in a
    /// complete new list after each refresh that obtained the identity document.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}
176
/// Where a single metadata value is written, for each supported event type.
#[derive(Debug, Clone)]
struct MetadataKey {
    /// Path the value is inserted at in log events.
    log_path: OwnedTargetPath,
    /// Tag name the value is stored under in metric events.
    metric_tag: String,
}
182
/// The `MetadataKey` of every metadata field, built once by `Keys::new` with the configured
/// namespace already applied.
#[derive(Debug)]
struct Keys {
    account_id_key: MetadataKey,
    ami_id_key: MetadataKey,
    availability_zone_key: MetadataKey,
    instance_id_key: MetadataKey,
    instance_type_key: MetadataKey,
    local_hostname_key: MetadataKey,
    local_ipv4_key: MetadataKey,
    public_hostname_key: MetadataKey,
    public_ipv4_key: MetadataKey,
    region_key: MetadataKey,
    subnet_id_key: MetadataKey,
    vpc_id_key: MetadataKey,
    // Base key for role names; an index is appended per role in `refresh_metadata`.
    role_name_key: MetadataKey,
    // Base key for instance tags; the tag name is appended per tag in `refresh_metadata`.
    tags_key: MetadataKey,
}
200
// NOTE(review): the macro is defined elsewhere in the crate; presumably it derives the
// generated example configuration from `Ec2Metadata::default()` — confirm.
impl_generate_config_from_default!(Ec2Metadata);
202
203#[async_trait::async_trait]
204#[typetag::serde(name = "aws_ec2_metadata")]
205impl TransformConfig for Ec2Metadata {
206    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
207        let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
208
209        let keys = Keys::new(self.namespace.clone());
210        let host = Uri::from_maybe_shared(self.endpoint.clone()).unwrap();
211        let refresh_interval = self.refresh_interval_secs;
212        let fields = self.fields.clone();
213        let tags = self.tags.clone();
214        let refresh_timeout = self.refresh_timeout_secs;
215        let required = self.required;
216
217        let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
218        let http_client = HttpClient::new(None, &proxy)?;
219
220        let mut client = MetadataClient::new(
221            http_client,
222            host,
223            keys,
224            Arc::clone(&state),
225            refresh_interval,
226            refresh_timeout,
227            fields,
228            tags,
229        );
230
231        // If initial metadata is not required, log and proceed. Otherwise return error.
232        if let Err(error) = client.refresh_metadata().await {
233            if required {
234                return Err(error);
235            } else {
236                emit!(AwsEc2MetadataRefreshError { error });
237            }
238        }
239
240        tokio::spawn(
241            async move {
242                client.run().await;
243            }
244            // TODO: Once #1338 is done we can fetch the current span
245            .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
246        );
247
248        Ok(Transform::event_task(Ec2MetadataTransform { state }))
249    }
250
251    fn input(&self) -> Input {
252        Input::new(DataType::Metric | DataType::Log)
253    }
254
255    fn outputs(
256        &self,
257        _: &TransformContext,
258        input_definitions: &[(OutputId, schema::Definition)],
259    ) -> Vec<TransformOutput> {
260        let added_keys = Keys::new(self.namespace.clone());
261
262        let paths = [
263            &added_keys.account_id_key.log_path,
264            &added_keys.ami_id_key.log_path,
265            &added_keys.availability_zone_key.log_path,
266            &added_keys.instance_id_key.log_path,
267            &added_keys.instance_type_key.log_path,
268            &added_keys.local_hostname_key.log_path,
269            &added_keys.local_ipv4_key.log_path,
270            &added_keys.public_hostname_key.log_path,
271            &added_keys.public_ipv4_key.log_path,
272            &added_keys.region_key.log_path,
273            &added_keys.subnet_id_key.log_path,
274            &added_keys.vpc_id_key.log_path,
275            &added_keys.role_name_key.log_path,
276            &added_keys.tags_key.log_path,
277        ];
278
279        let schema_definition = input_definitions
280            .iter()
281            .map(|(output, definition)| {
282                let mut schema_definition = definition.clone();
283
284                // If the event is not an object, it will be converted to an object in this transform
285                if !schema_definition.event_kind().contains_object() {
286                    *schema_definition.event_kind_mut() = Kind::object(Collection::empty());
287                }
288
289                for path in paths {
290                    schema_definition =
291                        schema_definition.with_field(path, Kind::bytes().or_undefined(), None);
292                }
293
294                (output.clone(), schema_definition)
295            })
296            .collect();
297
298        vec![TransformOutput::new(
299            DataType::Metric | DataType::Log,
300            schema_definition,
301        )]
302    }
303}
304
305impl TaskTransform<Event> for Ec2MetadataTransform {
306    fn transform(
307        self: Box<Self>,
308        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
309    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
310    where
311        Self: 'static,
312    {
313        let mut inner = self;
314        Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event)))))
315    }
316}
317
318impl Ec2MetadataTransform {
319    fn transform_one(&mut self, mut event: Event) -> Event {
320        let state = self.state.load();
321        match event {
322            Event::Log(ref mut log) => {
323                state.iter().for_each(|(k, v)| {
324                    log.insert(&k.log_path, v.clone());
325                });
326            }
327            Event::Metric(ref mut metric) => {
328                state.iter().for_each(|(k, v)| {
329                    metric
330                        .replace_tag(k.metric_tag.clone(), String::from_utf8_lossy(v).to_string());
331                });
332            }
333            Event::Trace(_) => panic!("Traces are not supported."),
334        }
335        event
336    }
337}
338
/// Fetches metadata from the configured endpoint and publishes it to the shared state.
struct MetadataClient {
    client: HttpClient<Body>,
    /// Base URI of the metadata endpoint; only its path is replaced per request.
    host: Uri,
    /// Cached session token together with the instant after which it must be re-requested.
    token: Option<(Bytes, Instant)>,
    keys: Keys,
    /// Snapshot read by `Ec2MetadataTransform`; replaced as a whole by `refresh_metadata`.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
    /// Pause between two refreshes in `run`.
    refresh_interval: Duration,
    /// Timeout applied to each individual HTTP request.
    refresh_timeout: Duration,
    /// Names of the metadata fields to fetch.
    fields: HashSet<String>,
    /// Names of the instance tags to fetch.
    tags: HashSet<String>,
}
350
/// JSON body served at the `DYNAMIC_DOCUMENT` path, with keys mapped from camelCase.
///
/// `refresh_metadata` only reads `account_id`, `image_id`, `instance_id`, `instance_type`
/// and `region`; the remaining fields are deserialized but otherwise unused.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)] // deserialize all fields
struct IdentityDocument {
    account_id: String,
    architecture: String,
    image_id: String,
    instance_id: String,
    instance_type: String,
    private_ip: String,
    region: String,
    version: String,
}
364
365impl MetadataClient {
366    #[allow(clippy::too_many_arguments)]
367    pub fn new(
368        client: HttpClient<Body>,
369        host: Uri,
370        keys: Keys,
371        state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
372        refresh_interval: Duration,
373        refresh_timeout: Duration,
374        fields: Vec<String>,
375        tags: Vec<String>,
376    ) -> Self {
377        Self {
378            client,
379            host,
380            token: None,
381            keys,
382            state,
383            refresh_interval,
384            refresh_timeout,
385            fields: fields.into_iter().collect(),
386            tags: tags.into_iter().collect(),
387        }
388    }
389
390    async fn run(&mut self) {
391        loop {
392            match self.refresh_metadata().await {
393                Ok(_) => {
394                    emit!(AwsEc2MetadataRefreshSuccessful);
395                }
396                Err(error) => {
397                    emit!(AwsEc2MetadataRefreshError { error });
398                }
399            }
400
401            sleep(self.refresh_interval).await;
402        }
403    }
404
405    pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
406        if let Some((token, next_refresh)) = self.token.clone() {
407            // If the next refresh is greater (in the future) than
408            // the current time we can return the token since its still valid
409            // otherwise lets refresh it.
410            if next_refresh > Instant::now() {
411                return Ok(token);
412            }
413        }
414
415        let mut parts = self.host.clone().into_parts();
416        parts.path_and_query = Some(API_TOKEN.clone());
417        let uri = Uri::from_parts(parts)?;
418
419        let req = Request::put(uri)
420            .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
421            .body(Body::empty())?;
422
423        let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
424            .await?
425            .map_err(crate::Error::from)
426            .and_then(|res| match res.status() {
427                StatusCode::OK => Ok(res),
428                status_code => Err(UnexpectedHttpStatusError {
429                    status: status_code,
430                }
431                .into()),
432            })?;
433
434        let token = http_body::Body::collect(res.into_body()).await?.to_bytes();
435
436        let next_refresh = Instant::now() + Duration::from_secs(21600);
437        self.token = Some((token.clone(), next_refresh));
438
439        Ok(token)
440    }
441
442    pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
443        self.get_metadata(&DYNAMIC_DOCUMENT)
444            .await?
445            .map(|body| {
446                serde_json::from_slice(&body[..])
447                    .context(ParseIdentityDocumentSnafu {})
448                    .map_err(Into::into)
449            })
450            .transpose()
451    }
452
453    pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
454        let mut new_state = vec![];
455
456        // Fetch all resources, _then_ add them to the state map.
457        if let Some(document) = self.get_document().await? {
458            if self.fields.contains(ACCOUNT_ID_KEY) {
459                new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
460            }
461
462            if self.fields.contains(AMI_ID_KEY) {
463                new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
464            }
465
466            if self.fields.contains(INSTANCE_ID_KEY) {
467                new_state.push((
468                    self.keys.instance_id_key.clone(),
469                    document.instance_id.into(),
470                ));
471            }
472
473            if self.fields.contains(INSTANCE_TYPE_KEY) {
474                new_state.push((
475                    self.keys.instance_type_key.clone(),
476                    document.instance_type.into(),
477                ));
478            }
479
480            if self.fields.contains(REGION_KEY) {
481                new_state.push((self.keys.region_key.clone(), document.region.into()));
482            }
483
484            if self.fields.contains(AVAILABILITY_ZONE_KEY)
485                && let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await?
486            {
487                new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
488            }
489
490            if self.fields.contains(LOCAL_HOSTNAME_KEY)
491                && let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await?
492            {
493                new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
494            }
495
496            if self.fields.contains(LOCAL_IPV4_KEY)
497                && let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await?
498            {
499                new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
500            }
501
502            if self.fields.contains(PUBLIC_HOSTNAME_KEY)
503                && let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await?
504            {
505                new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
506            }
507
508            if self.fields.contains(PUBLIC_IPV4_KEY)
509                && let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await?
510            {
511                new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
512            }
513
514            if (self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY))
515                && let Some(mac) = self.get_metadata(&MAC).await?
516            {
517                let mac = String::from_utf8_lossy(&mac[..]);
518
519                if self.fields.contains(SUBNET_ID_KEY) {
520                    let subnet_path =
521                        format!("/latest/meta-data/network/interfaces/macs/{mac}/subnet-id");
522
523                    let subnet_path = subnet_path.parse().context(ParsePathSnafu {
524                        value: subnet_path.clone(),
525                    })?;
526
527                    if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
528                        new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
529                    }
530                }
531
532                if self.fields.contains(VPC_ID_KEY) {
533                    let vpc_path =
534                        format!("/latest/meta-data/network/interfaces/macs/{mac}/vpc-id");
535
536                    let vpc_path = vpc_path.parse().context(ParsePathSnafu {
537                        value: vpc_path.clone(),
538                    })?;
539
540                    if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
541                        new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
542                    }
543                }
544            }
545
546            if self.fields.contains(ROLE_NAME_KEY)
547                && let Some(role_names) = self.get_metadata(&ROLE_NAME).await?
548            {
549                let role_names = String::from_utf8_lossy(&role_names[..]);
550
551                for (i, role_name) in role_names.lines().enumerate() {
552                    new_state.push((
553                        MetadataKey {
554                            log_path: self
555                                .keys
556                                .role_name_key
557                                .log_path
558                                .with_index_appended(i as isize),
559                            metric_tag: format!("{}[{}]", self.keys.role_name_key.metric_tag, i),
560                        },
561                        role_name.to_string().into(),
562                    ));
563                }
564            }
565
566            for tag in self.tags.clone() {
567                let tag_path = format!("/latest/meta-data/tags/instance/{tag}");
568
569                let tag_path = tag_path.parse().context(ParsePathSnafu {
570                    value: tag_path.clone(),
571                })?;
572
573                if let Some(tag_content) = self.get_metadata(&tag_path).await? {
574                    new_state.push((
575                        MetadataKey {
576                            log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
577                            metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
578                        },
579                        tag_content,
580                    ));
581                }
582            }
583
584            self.state.store(Arc::new(new_state));
585        }
586
587        Ok(())
588    }
589
590    async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
591        let token = self
592            .get_token()
593            .await
594            .with_context(|_| FetchTokenSnafu {})?;
595
596        let mut parts = self.host.clone().into_parts();
597
598        parts.path_and_query = Some(path.clone());
599
600        let uri = Uri::from_parts(parts)?;
601
602        debug!(message = "Sending metadata request.", %uri);
603
604        let req = Request::get(uri)
605            .header(TOKEN_HEADER.as_ref(), token.as_ref())
606            .body(Body::empty())?;
607
608        match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
609            .await?
610            .map_err(crate::Error::from)
611            .and_then(|res| match res.status() {
612                StatusCode::OK => Ok(Some(res)),
613                StatusCode::NOT_FOUND => Ok(None),
614                status_code => Err(UnexpectedHttpStatusError {
615                    status: status_code,
616                }
617                .into()),
618            })? {
619            Some(res) => {
620                let body = http_body::Body::collect(res.into_body()).await?.to_bytes();
621                Ok(Some(body))
622            }
623            None => Ok(None),
624        }
625    }
626}
627
628// This creates a simplified string from the namespace. Since the namespace is technically
629// a target path, it can contain syntax that is undesirable for a metric tag (such as prefix, quotes, etc)
630// This is mainly used for backwards compatibility.
631// see: https://github.com/vectordotdev/vector/issues/14931
632fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
633    let mut output = String::new();
634    for segment in &namespace.path.segments {
635        if !output.is_empty() {
636            output += ".";
637        }
638        match segment {
639            OwnedSegment::Field(field) => {
640                output += field;
641            }
642            OwnedSegment::Index(i) => {
643                output += &i.to_string();
644            }
645        }
646    }
647    output
648}
649
650fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
651    if let Some(namespace) = namespace {
652        MetadataKey {
653            log_path: namespace.with_field_appended(key),
654            metric_tag: format!("{}.{}", create_metric_namespace(namespace), key),
655        }
656    } else {
657        MetadataKey {
658            log_path: OwnedTargetPath::event(owned_value_path!(key)),
659            metric_tag: key.to_owned(),
660        }
661    }
662}
663
664impl Keys {
665    pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
666        let namespace = namespace.and_then(|namespace| namespace.path);
667
668        Keys {
669            account_id_key: create_key(&namespace, ACCOUNT_ID_KEY),
670            ami_id_key: create_key(&namespace, AMI_ID_KEY),
671            availability_zone_key: create_key(&namespace, AVAILABILITY_ZONE_KEY),
672            instance_id_key: create_key(&namespace, INSTANCE_ID_KEY),
673            instance_type_key: create_key(&namespace, INSTANCE_TYPE_KEY),
674            local_hostname_key: create_key(&namespace, LOCAL_HOSTNAME_KEY),
675            local_ipv4_key: create_key(&namespace, LOCAL_IPV4_KEY),
676            public_hostname_key: create_key(&namespace, PUBLIC_HOSTNAME_KEY),
677            public_ipv4_key: create_key(&namespace, PUBLIC_IPV4_KEY),
678            region_key: create_key(&namespace, REGION_KEY),
679            subnet_id_key: create_key(&namespace, SUBNET_ID_KEY),
680            vpc_id_key: create_key(&namespace, VPC_ID_KEY),
681            role_name_key: create_key(&namespace, ROLE_NAME_KEY),
682            tags_key: create_key(&namespace, TAGS_KEY),
683        }
684    }
685}
686
687#[derive(Debug)]
688struct UnexpectedHttpStatusError {
689    status: http::StatusCode,
690}
691
692impl fmt::Display for UnexpectedHttpStatusError {
693    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
694        write!(f, "got unexpected status code: {}", self.status)
695    }
696}
697
698impl error::Error for UnexpectedHttpStatusError {}
699
// Failures raised while talking to the metadata service. Callers turn them into
// `crate::Error` when propagating with `?`.
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
    // The session token could not be obtained (raised by `get_metadata`).
    #[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
    FetchToken { source: crate::Error },
    // The identity document body was not the expected JSON (raised by `get_document`).
    #[snafu(display("Unable to parse identity document: {}.", source))]
    ParseIdentityDocument { source: serde_json::Error },
    // A request path built at runtime (MAC- or tag-based) is not a valid path;
    // `value` holds the rejected path.
    #[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
    ParsePath {
        value: String,
        source: http::uri::InvalidUri,
    },
}
712
#[cfg(test)]
mod test {
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::{owned_value_path, value::Kind};

    use crate::{
        config::{LogNamespace, OutputId, TransformConfig, schema::Definition},
        transforms::aws_ec2_metadata::Ec2Metadata,
    };

    /// An input whose event kind is a plain string must come out with an object kind,
    /// because the transform adds fields to it.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let namespace = OwnedTargetPath::event(owned_value_path!("ec2", "metadata"));
        let config = Ec2Metadata {
            namespace: Some(namespace.into()),
            ..Default::default()
        };

        let string_input =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
        let inputs = [(OutputId::dummy(), string_input)];

        let mut outputs = config.outputs(&Default::default(), &inputs);
        assert_eq!(outputs.len(), 1);

        let only_output = outputs.pop().unwrap();
        let schema = only_output.schema_definitions(true)[&OutputId::dummy()].clone();
        assert!(schema.event_kind().is_object());
    }
}
743
744#[cfg(feature = "aws-ec2-metadata-integration-tests")]
745#[cfg(test)]
746mod integration_tests {
747    use tokio::sync::mpsc;
748    use tokio_stream::wrappers::ReceiverStream;
749    use vector_lib::{
750        assert_event_data_eq,
751        lookup::{
752            PathPrefix, event_path,
753            lookup_v2::{OwnedSegment, OwnedValuePath},
754        },
755    };
756    use vrl::value::{ObjectMap, Value};
757    use warp::Filter;
758
759    use super::*;
760    use crate::{
761        event::{LogEvent, Metric, metric},
762        test_util::{addr::next_addr, components::assert_transform_compliance},
763        transforms::test::create_topology,
764    };
765
766    fn ec2_metadata_address() -> String {
767        std::env::var("EC2_METADATA_ADDRESS").unwrap_or_else(|_| "http://localhost:1338".into())
768    }
769
770    fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
771        vec![
772            (
773                vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
774                "us-east-1a",
775            ),
776            (
777                vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
778                "192.0.2.54",
779            ),
780            (
781                vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
782                "ec2-192-0-2-54.compute-1.amazonaws.com",
783            ),
784            (
785                vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
786                "172.16.34.43",
787            ),
788            (
789                vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
790                "ip-172-16-34-43.ec2.internal",
791            ),
792            (
793                vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
794                "i-1234567890abcdef0",
795            ),
796            (
797                vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
798                "0123456789",
799            ),
800            (
801                vec![OwnedSegment::field(AMI_ID_KEY)].into(),
802                "ami-0b69ea66ff7391e80",
803            ),
804            (
805                vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
806                "m4.xlarge",
807            ),
808            (vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
809            (vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
810            (
811                vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
812                "subnet-0ac62554",
813            ),
814            (owned_value_path!("role-name", 0), "baskinc-role"),
815            (owned_value_path!("tags", "Name"), "test-instance"),
816            (owned_value_path!("tags", "Test"), "test-tag"),
817        ]
818    }
819
820    fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
821        vec![
822            (AVAILABILITY_ZONE_KEY, "us-east-1a"),
823            (PUBLIC_IPV4_KEY, "192.0.2.54"),
824            (
825                PUBLIC_HOSTNAME_KEY,
826                "ec2-192-0-2-54.compute-1.amazonaws.com",
827            ),
828            (LOCAL_IPV4_KEY, "172.16.34.43"),
829            (LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
830            (INSTANCE_ID_KEY, "i-1234567890abcdef0"),
831            (ACCOUNT_ID_KEY, "0123456789"),
832            (AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
833            (INSTANCE_TYPE_KEY, "m4.xlarge"),
834            (REGION_KEY, "us-east-1"),
835            (VPC_ID_KEY, "vpc-d295a6a7"),
836            (SUBNET_ID_KEY, "subnet-0ac62554"),
837            ("role-name[0]", "baskinc-role"),
838            ("tags[Name]", "test-instance"),
839            ("tags[Test]", "test-tag"),
840        ]
841    }
842
843    fn make_metric() -> Metric {
844        Metric::new(
845            "event",
846            metric::MetricKind::Incremental,
847            metric::MetricValue::Counter { value: 1.0 },
848        )
849    }
850
    // Runs the crate's shared generated-config check against this transform's
    // configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<Ec2Metadata>();
    }
855
856    #[tokio::test]
857    async fn enrich_log() {
858        assert_transform_compliance(async {
859            let mut fields = default_fields();
860            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
861
862            let tags = vec![
863                String::from("Name"),
864                String::from("Test"),
865                String::from("MISSING_TAG"),
866            ];
867
868            let transform_config = Ec2Metadata {
869                endpoint: ec2_metadata_address(),
870                fields,
871                tags,
872                ..Default::default()
873            };
874
875            let (tx, rx) = mpsc::channel(1);
876            let (topology, mut out) =
877                create_topology(ReceiverStream::new(rx), transform_config).await;
878
879            // We need to sleep to let the background task fetch the data.
880            sleep(Duration::from_secs(1)).await;
881
882            let log = LogEvent::default();
883            let mut expected_log = log.clone();
884            for (k, v) in expected_log_fields().iter().cloned() {
885                expected_log.insert((PathPrefix::Event, &k), v);
886            }
887
888            tx.send(log.into()).await.unwrap();
889
890            let event = out.recv().await.unwrap();
891            assert_event_data_eq!(event.into_log(), expected_log);
892
893            drop(tx);
894            topology.stop().await;
895            assert_eq!(out.recv().await, None);
896        })
897        .await;
898    }
899
900    #[tokio::test(flavor = "multi_thread")]
901    async fn timeout() {
902        let (_guard, addr) = next_addr();
903
904        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
905            tokio::time::sleep(Duration::from_secs(3)).await;
906            Ok("I waited 3 seconds!")
907        }
908
909        let slow = warp::any().and_then(sleepy);
910        let server = warp::serve(slow).bind(addr);
911        let _server = tokio::spawn(server);
912
913        let config = Ec2Metadata {
914            endpoint: format!("http://{addr}"),
915            refresh_timeout_secs: Duration::from_secs(1),
916            ..Default::default()
917        };
918
919        match config.build(&TransformContext::default()).await {
920            Ok(_) => panic!("expected timeout failure"),
921            // cannot create tokio::time::error::Elapsed to compare with since constructor is
922            // private
923            Err(err) => assert_eq!(
924                err.to_string(),
925                "Unable to fetch metadata authentication token: deadline has elapsed."
926            ),
927        }
928    }
929
930    // validates the configuration setting 'required'=false allows vector to run
931    #[tokio::test(flavor = "multi_thread")]
932    async fn not_required() {
933        let (_guard, addr) = next_addr();
934
935        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
936            tokio::time::sleep(Duration::from_secs(3)).await;
937            Ok("I waited 3 seconds!")
938        }
939
940        let slow = warp::any().and_then(sleepy);
941        let server = warp::serve(slow).bind(addr);
942        let _server = tokio::spawn(server);
943
944        let config = Ec2Metadata {
945            endpoint: format!("http://{addr}"),
946            refresh_timeout_secs: Duration::from_secs(1),
947            required: false,
948            ..Default::default()
949        };
950
951        assert!(
952            config.build(&TransformContext::default()).await.is_ok(),
953            "expected no failure because 'required' config value set to false"
954        );
955    }
956
957    #[tokio::test]
958    async fn enrich_metric() {
959        assert_transform_compliance(async {
960            let mut fields = default_fields();
961            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
962
963            let tags = vec![
964                String::from("Name"),
965                String::from("Test"),
966                String::from("MISSING_TAG"),
967            ];
968
969            let transform_config = Ec2Metadata {
970                endpoint: ec2_metadata_address(),
971                fields,
972                tags,
973                ..Default::default()
974            };
975
976            let (tx, rx) = mpsc::channel(1);
977            let (topology, mut out) =
978                create_topology(ReceiverStream::new(rx), transform_config).await;
979
980            // We need to sleep to let the background task fetch the data.
981            sleep(Duration::from_secs(1)).await;
982
983            let metric = make_metric();
984            let mut expected_metric = metric.clone();
985            for (k, v) in expected_metric_fields().iter() {
986                expected_metric.replace_tag(k.to_string(), v.to_string());
987            }
988
989            tx.send(metric.into()).await.unwrap();
990
991            let event = out.recv().await.unwrap();
992            assert_event_data_eq!(event.into_metric(), expected_metric);
993
994            drop(tx);
995            topology.stop().await;
996            assert_eq!(out.recv().await, None);
997        })
998        .await;
999    }
1000
1001    #[tokio::test]
1002    async fn fields_log() {
1003        assert_transform_compliance(async {
1004            let transform_config = Ec2Metadata {
1005                endpoint: ec2_metadata_address(),
1006                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1007                tags: vec![
1008                    String::from("Name"),
1009                    String::from("Test"),
1010                    String::from("MISSING_TAG"),
1011                ],
1012                ..Default::default()
1013            };
1014
1015            let (tx, rx) = mpsc::channel(1);
1016            let (topology, mut out) =
1017                create_topology(ReceiverStream::new(rx), transform_config).await;
1018
1019            // We need to sleep to let the background task fetch the data.
1020            sleep(Duration::from_secs(1)).await;
1021
1022            let log = LogEvent::default();
1023            let mut expected_log = log.clone();
1024            expected_log.insert(format!("\"{PUBLIC_IPV4_KEY}\"").as_str(), "192.0.2.54");
1025            expected_log.insert(format!("\"{REGION_KEY}\"").as_str(), "us-east-1");
1026            expected_log.insert(
1027                format!("\"{TAGS_KEY}\"").as_str(),
1028                ObjectMap::from([
1029                    ("Name".into(), Value::from("test-instance")),
1030                    ("Test".into(), Value::from("test-tag")),
1031                ]),
1032            );
1033
1034            tx.send(log.into()).await.unwrap();
1035
1036            let event = out.recv().await.unwrap();
1037            assert_event_data_eq!(event.into_log(), expected_log);
1038
1039            drop(tx);
1040            topology.stop().await;
1041            assert_eq!(out.recv().await, None);
1042        })
1043        .await;
1044    }
1045
1046    #[tokio::test]
1047    async fn fields_metric() {
1048        assert_transform_compliance(async {
1049            let transform_config = Ec2Metadata {
1050                endpoint: ec2_metadata_address(),
1051                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1052                tags: vec![
1053                    String::from("Name"),
1054                    String::from("Test"),
1055                    String::from("MISSING_TAG"),
1056                ],
1057                ..Default::default()
1058            };
1059
1060            let (tx, rx) = mpsc::channel(1);
1061            let (topology, mut out) =
1062                create_topology(ReceiverStream::new(rx), transform_config).await;
1063
1064            // We need to sleep to let the background task fetch the data.
1065            sleep(Duration::from_secs(1)).await;
1066
1067            let metric = make_metric();
1068            let mut expected_metric = metric.clone();
1069            expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
1070            expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
1071            expected_metric.replace_tag(
1072                format!("{}[{}]", TAGS_KEY, "Name"),
1073                "test-instance".to_string(),
1074            );
1075            expected_metric
1076                .replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());
1077
1078            tx.send(metric.into()).await.unwrap();
1079
1080            let event = out.recv().await.unwrap();
1081            assert_event_data_eq!(event.into_metric(), expected_metric);
1082
1083            drop(tx);
1084            topology.stop().await;
1085            assert_eq!(out.recv().await, None);
1086        })
1087        .await;
1088    }
1089
1090    #[tokio::test]
1091    async fn namespace_log() {
1092        {
1093            assert_transform_compliance(async {
1094                let transform_config = Ec2Metadata {
1095                    endpoint: ec2_metadata_address(),
1096                    namespace: Some(
1097                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1098                    ),
1099                    ..Default::default()
1100                };
1101
1102                let (tx, rx) = mpsc::channel(1);
1103                let (topology, mut out) =
1104                    create_topology(ReceiverStream::new(rx), transform_config).await;
1105
1106                // We need to sleep to let the background task fetch the data.
1107                sleep(Duration::from_secs(1)).await;
1108
1109                let log = LogEvent::default();
1110
1111                tx.send(log.into()).await.unwrap();
1112
1113                let event = out.recv().await.unwrap();
1114
1115                assert_eq!(
1116                    event.as_log().get("ec2.metadata.\"availability-zone\""),
1117                    Some(&"us-east-1a".into())
1118                );
1119
1120                drop(tx);
1121                topology.stop().await;
1122                assert_eq!(out.recv().await, None);
1123            })
1124            .await;
1125        }
1126
1127        {
1128            assert_transform_compliance(async {
1129                // Set an empty namespace to ensure we don't prepend one.
1130                let transform_config = Ec2Metadata {
1131                    endpoint: ec2_metadata_address(),
1132                    namespace: Some(OptionalTargetPath::none()),
1133                    ..Default::default()
1134                };
1135
1136                let (tx, rx) = mpsc::channel(1);
1137                let (topology, mut out) =
1138                    create_topology(ReceiverStream::new(rx), transform_config).await;
1139
1140                // We need to sleep to let the background task fetch the data.
1141                sleep(Duration::from_secs(1)).await;
1142
1143                let log = LogEvent::default();
1144
1145                tx.send(log.into()).await.unwrap();
1146
1147                let event = out.recv().await.unwrap();
1148                assert_eq!(
1149                    event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
1150                    Some(&"us-east-1a".into())
1151                );
1152
1153                drop(tx);
1154                topology.stop().await;
1155                assert_eq!(out.recv().await, None);
1156            })
1157            .await;
1158        }
1159    }
1160
1161    #[tokio::test]
1162    async fn namespace_metric() {
1163        {
1164            assert_transform_compliance(async {
1165                let transform_config = Ec2Metadata {
1166                    endpoint: ec2_metadata_address(),
1167                    namespace: Some(
1168                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1169                    ),
1170                    ..Default::default()
1171                };
1172
1173                let (tx, rx) = mpsc::channel(1);
1174                let (topology, mut out) =
1175                    create_topology(ReceiverStream::new(rx), transform_config).await;
1176
1177                // We need to sleep to let the background task fetch the data.
1178                sleep(Duration::from_secs(1)).await;
1179
1180                let metric = make_metric();
1181
1182                tx.send(metric.into()).await.unwrap();
1183
1184                let event = out.recv().await.unwrap();
1185                assert_eq!(
1186                    event
1187                        .as_metric()
1188                        .tag_value("ec2.metadata.availability-zone"),
1189                    Some("us-east-1a".to_string())
1190                );
1191
1192                drop(tx);
1193                topology.stop().await;
1194                assert_eq!(out.recv().await, None);
1195            })
1196            .await;
1197        }
1198
1199        {
1200            assert_transform_compliance(async {
1201                // Set an empty namespace to ensure we don't prepend one.
1202                let transform_config = Ec2Metadata {
1203                    endpoint: ec2_metadata_address(),
1204                    namespace: Some(OptionalTargetPath::none()),
1205                    ..Default::default()
1206                };
1207
1208                let (tx, rx) = mpsc::channel(1);
1209                let (topology, mut out) =
1210                    create_topology(ReceiverStream::new(rx), transform_config).await;
1211
1212                // We need to sleep to let the background task fetch the data.
1213                sleep(Duration::from_secs(1)).await;
1214
1215                let metric = make_metric();
1216
1217                tx.send(metric.into()).await.unwrap();
1218
1219                let event = out.recv().await.unwrap();
1220                assert_eq!(
1221                    event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
1222                    Some("us-east-1a".to_string())
1223                );
1224
1225                drop(tx);
1226                topology.stop().await;
1227                assert_eq!(out.recv().await, None);
1228            })
1229            .await;
1230        }
1231    }
1232}