// vector/transforms/aws_ec2_metadata.rs

1use std::sync::{Arc, LazyLock};
2use std::{collections::HashSet, error, fmt, future::ready, pin::Pin};
3
4use arc_swap::ArcSwap;
5use bytes::Bytes;
6use futures::{Stream, StreamExt};
7use http::{uri::PathAndQuery, Request, StatusCode, Uri};
8use hyper::{body::to_bytes as body_to_bytes, Body};
9use serde::Deserialize;
10use serde_with::serde_as;
11use snafu::ResultExt as _;
12use tokio::time::{sleep, Duration, Instant};
13use tracing::Instrument;
14use vector_lib::config::LogNamespace;
15use vector_lib::configurable::configurable_component;
16use vector_lib::lookup::lookup_v2::{OptionalTargetPath, OwnedSegment};
17use vector_lib::lookup::owned_value_path;
18use vector_lib::lookup::OwnedTargetPath;
19use vrl::value::kind::Collection;
20use vrl::value::Kind;
21
22use crate::config::OutputId;
23use crate::{
24    config::{DataType, Input, ProxyConfig, TransformConfig, TransformContext, TransformOutput},
25    event::Event,
26    http::HttpClient,
27    internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
28    schema,
29    transforms::{TaskTransform, Transform},
30};
31
// Metadata field names. Each one selects an entry in the `fields` option and becomes the last
// segment of the emitted log path / metric tag (see `create_key`). `TAGS_KEY` instead names the
// parent under which the configured instance tags are written.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";
const REGION_KEY: &str = "region";
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";
const ROLE_NAME_KEY: &str = "role-name";
const TAGS_KEY: &str = "tags";

// Metadata service paths queried by `MetadataClient`. They are appended to the configured
// endpoint's scheme and authority.
static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
static LOCAL_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
static PUBLIC_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
// The response lists role names one per line; each line becomes `role-name[i]`.
static ROLE_NAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
// The MAC address is used to build the per-interface subnet-id / vpc-id paths.
static MAC: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
// JSON identity document supplying account, AMI, instance id/type and region.
static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
// Fields enriched when `fields` is not configured, in this order. `account-id` is not in this
// list, so it must be requested explicitly.
static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
    AMI_ID_KEY,
    AVAILABILITY_ZONE_KEY,
    INSTANCE_ID_KEY,
    INSTANCE_TYPE_KEY,
    LOCAL_HOSTNAME_KEY,
    LOCAL_IPV4_KEY,
    PUBLIC_HOSTNAME_KEY,
    PUBLIC_IPV4_KEY,
    REGION_KEY,
    SUBNET_ID_KEY,
    VPC_ID_KEY,
    ROLE_NAME_KEY,
];
// Endpoint that issues session tokens (requested with an HTTP PUT in `get_token`).
static API_TOKEN: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
// Header carrying the session token on every metadata GET request.
static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
80
/// Configuration for the `aws_ec2_metadata` transform.
#[serde_as]
#[configurable_component(transform(
    "aws_ec2_metadata",
    "Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Ec2Metadata {
    /// Overrides the default EC2 metadata endpoint.
    // The legacy `host` key is still accepted through the serde alias.
    #[serde(alias = "host", default = "default_endpoint")]
    #[derivative(Default(value = "default_endpoint()"))]
    endpoint: String,

    /// Sets a prefix for all event fields added by the transform.
    // Used for log paths directly; metric tags get a flattened, dot-joined form of it.
    #[configurable(metadata(
        docs::examples = "",
        docs::examples = "ec2",
        docs::examples = "aws.ec2",
    ))]
    namespace: Option<OptionalTargetPath>,

    /// The interval between querying for updated metadata, in seconds.
    // Deserialized from an integer number of seconds.
    #[serde(default = "default_refresh_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_interval_secs()"))]
    refresh_interval_secs: Duration,

    /// A list of metadata fields to include in each transformed event.
    #[serde(default = "default_fields")]
    #[derivative(Default(value = "default_fields()"))]
    #[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
    fields: Vec<String>,

    /// A list of instance tags to include in each transformed event.
    #[serde(default = "default_tags")]
    #[derivative(Default(value = "default_tags()"))]
    #[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
    tags: Vec<String>,

    /// The timeout for querying the EC2 metadata endpoint, in seconds.
    // Deserialized from an integer number of seconds.
    #[serde(default = "default_refresh_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_timeout_secs()"))]
    refresh_timeout_secs: Duration,

    // Merged with the global proxy settings when the transform is built.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Requires the transform to be able to successfully query the EC2 metadata before starting to process the data.
    // When false, a failed initial refresh is only reported and the transform starts with no metadata.
    #[serde(default = "default_required")]
    #[derivative(Default(value = "default_required()"))]
    required: bool,
}
136
/// Default metadata endpoint: the link-local instance metadata address.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}

/// Default pause between two metadata refreshes (ten seconds).
const fn default_refresh_interval_secs() -> Duration {
    Duration::from_secs(10)
}

/// Default per-request timeout for the metadata endpoint (one second).
const fn default_refresh_timeout_secs() -> Duration {
    Duration::from_millis(1_000)
}
148
149fn default_fields() -> Vec<String> {
150    DEFAULT_FIELD_ALLOWLIST
151        .iter()
152        .map(|s| s.to_string())
153        .collect()
154}
155
/// Default `tags`: no instance tags are fetched unless configured.
const fn default_tags() -> Vec<String> {
    Vec::new()
}

/// Default `required`: the initial metadata refresh must succeed for the build to succeed.
const fn default_required() -> bool {
    !false
}
163
/// Event-processing side of the transform. It reads the latest metadata snapshot that the
/// background `MetadataClient` publishes into `state` and applies it to each event.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}

/// Destination of one metadata value: the log field path and the metric tag name.
#[derive(Debug, Clone)]
struct MetadataKey {
    log_path: OwnedTargetPath,
    metric_tag: String,
}
174
/// Precomputed destinations for every field the transform can add, all rooted at the
/// configured `namespace` (built by `Keys::new`).
#[derive(Debug)]
struct Keys {
    account_id_key: MetadataKey,
    ami_id_key: MetadataKey,
    availability_zone_key: MetadataKey,
    instance_id_key: MetadataKey,
    instance_type_key: MetadataKey,
    local_hostname_key: MetadataKey,
    local_ipv4_key: MetadataKey,
    public_hostname_key: MetadataKey,
    public_ipv4_key: MetadataKey,
    region_key: MetadataKey,
    subnet_id_key: MetadataKey,
    vpc_id_key: MetadataKey,
    // Base key; one indexed entry per role name is derived from it at refresh time.
    role_name_key: MetadataKey,
    // Base key; one child field per configured tag is derived from it at refresh time.
    tags_key: MetadataKey,
}

// Presumably derives the generated example config from `Ec2Metadata::default()` (per the
// macro's name) — confirm against the macro definition.
impl_generate_config_from_default!(Ec2Metadata);
194
195#[async_trait::async_trait]
196#[typetag::serde(name = "aws_ec2_metadata")]
197impl TransformConfig for Ec2Metadata {
198    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
199        let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
200
201        let keys = Keys::new(self.namespace.clone());
202        let host = Uri::from_maybe_shared(self.endpoint.clone()).unwrap();
203        let refresh_interval = self.refresh_interval_secs;
204        let fields = self.fields.clone();
205        let tags = self.tags.clone();
206        let refresh_timeout = self.refresh_timeout_secs;
207        let required = self.required;
208
209        let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
210        let http_client = HttpClient::new(None, &proxy)?;
211
212        let mut client = MetadataClient::new(
213            http_client,
214            host,
215            keys,
216            Arc::clone(&state),
217            refresh_interval,
218            refresh_timeout,
219            fields,
220            tags,
221        );
222
223        // If initial metadata is not required, log and proceed. Otherwise return error.
224        if let Err(error) = client.refresh_metadata().await {
225            if required {
226                return Err(error);
227            } else {
228                emit!(AwsEc2MetadataRefreshError { error });
229            }
230        }
231
232        tokio::spawn(
233            async move {
234                client.run().await;
235            }
236            // TODO: Once #1338 is done we can fetch the current span
237            .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
238        );
239
240        Ok(Transform::event_task(Ec2MetadataTransform { state }))
241    }
242
243    fn input(&self) -> Input {
244        Input::new(DataType::Metric | DataType::Log)
245    }
246
247    fn outputs(
248        &self,
249        _: vector_lib::enrichment::TableRegistry,
250        input_definitions: &[(OutputId, schema::Definition)],
251        _: LogNamespace,
252    ) -> Vec<TransformOutput> {
253        let added_keys = Keys::new(self.namespace.clone());
254
255        let paths = [
256            &added_keys.account_id_key.log_path,
257            &added_keys.ami_id_key.log_path,
258            &added_keys.availability_zone_key.log_path,
259            &added_keys.instance_id_key.log_path,
260            &added_keys.instance_type_key.log_path,
261            &added_keys.local_hostname_key.log_path,
262            &added_keys.local_ipv4_key.log_path,
263            &added_keys.public_hostname_key.log_path,
264            &added_keys.public_ipv4_key.log_path,
265            &added_keys.region_key.log_path,
266            &added_keys.subnet_id_key.log_path,
267            &added_keys.vpc_id_key.log_path,
268            &added_keys.role_name_key.log_path,
269            &added_keys.tags_key.log_path,
270        ];
271
272        let schema_definition = input_definitions
273            .iter()
274            .map(|(output, definition)| {
275                let mut schema_definition = definition.clone();
276
277                // If the event is not an object, it will be converted to an object in this transform
278                if !schema_definition.event_kind().contains_object() {
279                    *schema_definition.event_kind_mut() = Kind::object(Collection::empty());
280                }
281
282                for path in paths {
283                    schema_definition =
284                        schema_definition.with_field(path, Kind::bytes().or_undefined(), None);
285                }
286
287                (output.clone(), schema_definition)
288            })
289            .collect();
290
291        vec![TransformOutput::new(
292            DataType::Metric | DataType::Log,
293            schema_definition,
294        )]
295    }
296}
297
298impl TaskTransform<Event> for Ec2MetadataTransform {
299    fn transform(
300        self: Box<Self>,
301        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
302    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
303    where
304        Self: 'static,
305    {
306        let mut inner = self;
307        Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event)))))
308    }
309}
310
311impl Ec2MetadataTransform {
312    fn transform_one(&mut self, mut event: Event) -> Event {
313        let state = self.state.load();
314        match event {
315            Event::Log(ref mut log) => {
316                state.iter().for_each(|(k, v)| {
317                    log.insert(&k.log_path, v.clone());
318                });
319            }
320            Event::Metric(ref mut metric) => {
321                state.iter().for_each(|(k, v)| {
322                    metric
323                        .replace_tag(k.metric_tag.clone(), String::from_utf8_lossy(v).to_string());
324                });
325            }
326            Event::Trace(_) => panic!("Traces are not supported."),
327        }
328        event
329    }
330}
331
/// Background client that polls the metadata endpoint and publishes the results into `state`.
struct MetadataClient {
    client: HttpClient<Body>,
    // Endpoint URI; only its scheme and authority are reused, with each request's path swapped in.
    host: Uri,
    // Cached session token and the instant after which it is fetched again.
    token: Option<(Bytes, Instant)>,
    keys: Keys,
    // Snapshot shared with `Ec2MetadataTransform`; replaced wholesale on each successful refresh.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
    refresh_interval: Duration,
    refresh_timeout: Duration,
    // Configured field names, checked with `contains` during a refresh.
    fields: HashSet<String>,
    // Instance tag names to fetch on every refresh.
    tags: HashSet<String>,
}
343
/// JSON instance identity document returned by `DYNAMIC_DOCUMENT`, with camelCase keys.
///
/// Every field is required by deserialization, even the ones this transform never reads.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)] // deserialize all fields; several are parsed but never read
struct IdentityDocument {
    account_id: String,
    architecture: String,
    image_id: String,
    instance_id: String,
    instance_type: String,
    private_ip: String,
    region: String,
    version: String,
}
357
358impl MetadataClient {
359    #[allow(clippy::too_many_arguments)]
360    pub fn new(
361        client: HttpClient<Body>,
362        host: Uri,
363        keys: Keys,
364        state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
365        refresh_interval: Duration,
366        refresh_timeout: Duration,
367        fields: Vec<String>,
368        tags: Vec<String>,
369    ) -> Self {
370        Self {
371            client,
372            host,
373            token: None,
374            keys,
375            state,
376            refresh_interval,
377            refresh_timeout,
378            fields: fields.into_iter().collect(),
379            tags: tags.into_iter().collect(),
380        }
381    }
382
383    async fn run(&mut self) {
384        loop {
385            match self.refresh_metadata().await {
386                Ok(_) => {
387                    emit!(AwsEc2MetadataRefreshSuccessful);
388                }
389                Err(error) => {
390                    emit!(AwsEc2MetadataRefreshError { error });
391                }
392            }
393
394            sleep(self.refresh_interval).await;
395        }
396    }
397
398    pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
399        if let Some((token, next_refresh)) = self.token.clone() {
400            // If the next refresh is greater (in the future) than
401            // the current time we can return the token since its still valid
402            // otherwise lets refresh it.
403            if next_refresh > Instant::now() {
404                return Ok(token);
405            }
406        }
407
408        let mut parts = self.host.clone().into_parts();
409        parts.path_and_query = Some(API_TOKEN.clone());
410        let uri = Uri::from_parts(parts)?;
411
412        let req = Request::put(uri)
413            .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
414            .body(Body::empty())?;
415
416        let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
417            .await?
418            .map_err(crate::Error::from)
419            .and_then(|res| match res.status() {
420                StatusCode::OK => Ok(res),
421                status_code => Err(UnexpectedHttpStatusError {
422                    status: status_code,
423                }
424                .into()),
425            })?;
426
427        let token = body_to_bytes(res.into_body()).await?;
428
429        let next_refresh = Instant::now() + Duration::from_secs(21600);
430        self.token = Some((token.clone(), next_refresh));
431
432        Ok(token)
433    }
434
435    pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
436        self.get_metadata(&DYNAMIC_DOCUMENT)
437            .await?
438            .map(|body| {
439                serde_json::from_slice(&body[..])
440                    .context(ParseIdentityDocumentSnafu {})
441                    .map_err(Into::into)
442            })
443            .transpose()
444    }
445
446    pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
447        let mut new_state = vec![];
448
449        // Fetch all resources, _then_ add them to the state map.
450        if let Some(document) = self.get_document().await? {
451            if self.fields.contains(ACCOUNT_ID_KEY) {
452                new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
453            }
454
455            if self.fields.contains(AMI_ID_KEY) {
456                new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
457            }
458
459            if self.fields.contains(INSTANCE_ID_KEY) {
460                new_state.push((
461                    self.keys.instance_id_key.clone(),
462                    document.instance_id.into(),
463                ));
464            }
465
466            if self.fields.contains(INSTANCE_TYPE_KEY) {
467                new_state.push((
468                    self.keys.instance_type_key.clone(),
469                    document.instance_type.into(),
470                ));
471            }
472
473            if self.fields.contains(REGION_KEY) {
474                new_state.push((self.keys.region_key.clone(), document.region.into()));
475            }
476
477            if self.fields.contains(AVAILABILITY_ZONE_KEY) {
478                if let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await? {
479                    new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
480                }
481            }
482
483            if self.fields.contains(LOCAL_HOSTNAME_KEY) {
484                if let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await? {
485                    new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
486                }
487            }
488
489            if self.fields.contains(LOCAL_IPV4_KEY) {
490                if let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await? {
491                    new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
492                }
493            }
494
495            if self.fields.contains(PUBLIC_HOSTNAME_KEY) {
496                if let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await? {
497                    new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
498                }
499            }
500
501            if self.fields.contains(PUBLIC_IPV4_KEY) {
502                if let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await? {
503                    new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
504                }
505            }
506
507            if self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY) {
508                if let Some(mac) = self.get_metadata(&MAC).await? {
509                    let mac = String::from_utf8_lossy(&mac[..]);
510
511                    if self.fields.contains(SUBNET_ID_KEY) {
512                        let subnet_path =
513                            format!("/latest/meta-data/network/interfaces/macs/{mac}/subnet-id");
514
515                        let subnet_path = subnet_path.parse().context(ParsePathSnafu {
516                            value: subnet_path.clone(),
517                        })?;
518
519                        if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
520                            new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
521                        }
522                    }
523
524                    if self.fields.contains(VPC_ID_KEY) {
525                        let vpc_path =
526                            format!("/latest/meta-data/network/interfaces/macs/{mac}/vpc-id");
527
528                        let vpc_path = vpc_path.parse().context(ParsePathSnafu {
529                            value: vpc_path.clone(),
530                        })?;
531
532                        if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
533                            new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
534                        }
535                    }
536                }
537            }
538
539            if self.fields.contains(ROLE_NAME_KEY) {
540                if let Some(role_names) = self.get_metadata(&ROLE_NAME).await? {
541                    let role_names = String::from_utf8_lossy(&role_names[..]);
542
543                    for (i, role_name) in role_names.lines().enumerate() {
544                        new_state.push((
545                            MetadataKey {
546                                log_path: self
547                                    .keys
548                                    .role_name_key
549                                    .log_path
550                                    .with_index_appended(i as isize),
551                                metric_tag: format!(
552                                    "{}[{}]",
553                                    self.keys.role_name_key.metric_tag, i
554                                ),
555                            },
556                            role_name.to_string().into(),
557                        ));
558                    }
559                }
560            }
561
562            for tag in self.tags.clone() {
563                let tag_path = format!("/latest/meta-data/tags/instance/{tag}");
564
565                let tag_path = tag_path.parse().context(ParsePathSnafu {
566                    value: tag_path.clone(),
567                })?;
568
569                if let Some(tag_content) = self.get_metadata(&tag_path).await? {
570                    new_state.push((
571                        MetadataKey {
572                            log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
573                            metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
574                        },
575                        tag_content,
576                    ));
577                }
578            }
579
580            self.state.store(Arc::new(new_state));
581        }
582
583        Ok(())
584    }
585
586    async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
587        let token = self
588            .get_token()
589            .await
590            .with_context(|_| FetchTokenSnafu {})?;
591
592        let mut parts = self.host.clone().into_parts();
593
594        parts.path_and_query = Some(path.clone());
595
596        let uri = Uri::from_parts(parts)?;
597
598        debug!(message = "Sending metadata request.", %uri);
599
600        let req = Request::get(uri)
601            .header(TOKEN_HEADER.as_ref(), token.as_ref())
602            .body(Body::empty())?;
603
604        match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
605            .await?
606            .map_err(crate::Error::from)
607            .and_then(|res| match res.status() {
608                StatusCode::OK => Ok(Some(res)),
609                StatusCode::NOT_FOUND => Ok(None),
610                status_code => Err(UnexpectedHttpStatusError {
611                    status: status_code,
612                }
613                .into()),
614            })? {
615            Some(res) => {
616                let body = body_to_bytes(res.into_body()).await?;
617                Ok(Some(body))
618            }
619            None => Ok(None),
620        }
621    }
622}
623
624// This creates a simplified string from the namespace. Since the namespace is technically
625// a target path, it can contain syntax that is undesirable for a metric tag (such as prefix, quotes, etc)
626// This is mainly used for backwards compatibility.
627// see: https://github.com/vectordotdev/vector/issues/14931
628fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
629    let mut output = String::new();
630    for segment in &namespace.path.segments {
631        if !output.is_empty() {
632            output += ".";
633        }
634        match segment {
635            OwnedSegment::Field(field) => {
636                output += field;
637            }
638            OwnedSegment::Index(i) => {
639                output += &i.to_string();
640            }
641        }
642    }
643    output
644}
645
646fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
647    if let Some(namespace) = namespace {
648        MetadataKey {
649            log_path: namespace.with_field_appended(key),
650            metric_tag: format!("{}.{}", create_metric_namespace(namespace), key),
651        }
652    } else {
653        MetadataKey {
654            log_path: OwnedTargetPath::event(owned_value_path!(key)),
655            metric_tag: key.to_owned(),
656        }
657    }
658}
659
660impl Keys {
661    pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
662        let namespace = namespace.and_then(|namespace| namespace.path);
663
664        Keys {
665            account_id_key: create_key(&namespace, ACCOUNT_ID_KEY),
666            ami_id_key: create_key(&namespace, AMI_ID_KEY),
667            availability_zone_key: create_key(&namespace, AVAILABILITY_ZONE_KEY),
668            instance_id_key: create_key(&namespace, INSTANCE_ID_KEY),
669            instance_type_key: create_key(&namespace, INSTANCE_TYPE_KEY),
670            local_hostname_key: create_key(&namespace, LOCAL_HOSTNAME_KEY),
671            local_ipv4_key: create_key(&namespace, LOCAL_IPV4_KEY),
672            public_hostname_key: create_key(&namespace, PUBLIC_HOSTNAME_KEY),
673            public_ipv4_key: create_key(&namespace, PUBLIC_IPV4_KEY),
674            region_key: create_key(&namespace, REGION_KEY),
675            subnet_id_key: create_key(&namespace, SUBNET_ID_KEY),
676            vpc_id_key: create_key(&namespace, VPC_ID_KEY),
677            role_name_key: create_key(&namespace, ROLE_NAME_KEY),
678            tags_key: create_key(&namespace, TAGS_KEY),
679        }
680    }
681}
682
683#[derive(Debug)]
684struct UnexpectedHttpStatusError {
685    status: http::StatusCode,
686}
687
688impl fmt::Display for UnexpectedHttpStatusError {
689    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
690        write!(f, "got unexpected status code: {}", self.status)
691    }
692}
693
694impl error::Error for UnexpectedHttpStatusError {}
695
/// Errors raised by `MetadataClient` while refreshing metadata.
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
    // Wraps any failure from `get_token` when a metadata request needs a token.
    #[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
    FetchToken { source: crate::Error },
    // The identity document body was not valid JSON for `IdentityDocument`.
    #[snafu(display("Unable to parse identity document: {}.", source))]
    ParseIdentityDocument { source: serde_json::Error },
    // A metadata path built at runtime (from a MAC address or tag name) was not a valid URI path.
    #[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
    ParsePath {
        value: String,
        source: http::uri::InvalidUri,
    },
}
708
#[cfg(test)]
mod test {
    use crate::config::schema::Definition;
    use crate::config::{LogNamespace, OutputId, TransformConfig};
    use crate::transforms::aws_ec2_metadata::Ec2Metadata;
    use vector_lib::enrichment::TableRegistry;
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::owned_value_path;
    use vrl::value::Kind;

    /// A bytes-only input must be reported as an object on output, since the transform
    /// inserts metadata fields into every event.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let namespace = OwnedTargetPath::event(owned_value_path!("ec2", "metadata"));
        let config = Ec2Metadata {
            namespace: Some(namespace.into()),
            ..Default::default()
        };

        let input = Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
        let inputs = [(OutputId::dummy(), input)];

        let outputs = config.outputs(TableRegistry::default(), &inputs, LogNamespace::Vector);
        assert_eq!(outputs.len(), 1);

        let output = outputs
            .into_iter()
            .next()
            .expect("exactly one output was asserted above");
        let schema = output.schema_definitions(true)[&OutputId::dummy()].clone();
        assert!(schema.event_kind().is_object());
    }
}
740
741#[cfg(feature = "aws-ec2-metadata-integration-tests")]
742#[cfg(test)]
743mod integration_tests {
744    use tokio::sync::mpsc;
745    use tokio_stream::wrappers::ReceiverStream;
746    use vector_lib::lookup::lookup_v2::{OwnedSegment, OwnedValuePath};
747    use vector_lib::lookup::{event_path, PathPrefix};
748
749    use super::*;
750    use crate::{
751        event::{metric, LogEvent, Metric},
752        test_util::{components::assert_transform_compliance, next_addr},
753        transforms::test::create_topology,
754    };
755    use vector_lib::assert_event_data_eq;
756    use vrl::value::{ObjectMap, Value};
757    use warp::Filter;
758
759    fn ec2_metadata_address() -> String {
760        std::env::var("EC2_METADATA_ADDRESS").unwrap_or_else(|_| "http://localhost:1338".into())
761    }
762
763    fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
764        vec![
765            (
766                vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
767                "us-east-1a",
768            ),
769            (
770                vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
771                "192.0.2.54",
772            ),
773            (
774                vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
775                "ec2-192-0-2-54.compute-1.amazonaws.com",
776            ),
777            (
778                vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
779                "172.16.34.43",
780            ),
781            (
782                vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
783                "ip-172-16-34-43.ec2.internal",
784            ),
785            (
786                vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
787                "i-1234567890abcdef0",
788            ),
789            (
790                vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
791                "0123456789",
792            ),
793            (
794                vec![OwnedSegment::field(AMI_ID_KEY)].into(),
795                "ami-0b69ea66ff7391e80",
796            ),
797            (
798                vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
799                "m4.xlarge",
800            ),
801            (vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
802            (vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
803            (
804                vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
805                "subnet-0ac62554",
806            ),
807            (owned_value_path!("role-name", 0), "baskinc-role"),
808            (owned_value_path!("tags", "Name"), "test-instance"),
809            (owned_value_path!("tags", "Test"), "test-tag"),
810        ]
811    }
812
813    fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
814        vec![
815            (AVAILABILITY_ZONE_KEY, "us-east-1a"),
816            (PUBLIC_IPV4_KEY, "192.0.2.54"),
817            (
818                PUBLIC_HOSTNAME_KEY,
819                "ec2-192-0-2-54.compute-1.amazonaws.com",
820            ),
821            (LOCAL_IPV4_KEY, "172.16.34.43"),
822            (LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
823            (INSTANCE_ID_KEY, "i-1234567890abcdef0"),
824            (ACCOUNT_ID_KEY, "0123456789"),
825            (AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
826            (INSTANCE_TYPE_KEY, "m4.xlarge"),
827            (REGION_KEY, "us-east-1"),
828            (VPC_ID_KEY, "vpc-d295a6a7"),
829            (SUBNET_ID_KEY, "subnet-0ac62554"),
830            ("role-name[0]", "baskinc-role"),
831            ("tags[Name]", "test-instance"),
832            ("tags[Test]", "test-tag"),
833        ]
834    }
835
836    fn make_metric() -> Metric {
837        Metric::new(
838            "event",
839            metric::MetricKind::Incremental,
840            metric::MetricValue::Counter { value: 1.0 },
841        )
842    }
843
844    #[test]
845    fn generate_config() {
846        crate::test_util::test_generate_config::<Ec2Metadata>();
847    }
848
849    #[tokio::test]
850    async fn enrich_log() {
851        assert_transform_compliance(async {
852            let mut fields = default_fields();
853            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
854
855            let tags = vec![
856                String::from("Name"),
857                String::from("Test"),
858                String::from("MISSING_TAG"),
859            ];
860
861            let transform_config = Ec2Metadata {
862                endpoint: ec2_metadata_address(),
863                fields,
864                tags,
865                ..Default::default()
866            };
867
868            let (tx, rx) = mpsc::channel(1);
869            let (topology, mut out) =
870                create_topology(ReceiverStream::new(rx), transform_config).await;
871
872            // We need to sleep to let the background task fetch the data.
873            sleep(Duration::from_secs(1)).await;
874
875            let log = LogEvent::default();
876            let mut expected_log = log.clone();
877            for (k, v) in expected_log_fields().iter().cloned() {
878                expected_log.insert((PathPrefix::Event, &k), v);
879            }
880
881            tx.send(log.into()).await.unwrap();
882
883            let event = out.recv().await.unwrap();
884            assert_event_data_eq!(event.into_log(), expected_log);
885
886            drop(tx);
887            topology.stop().await;
888            assert_eq!(out.recv().await, None);
889        })
890        .await;
891    }
892
893    #[tokio::test(flavor = "multi_thread")]
894    async fn timeout() {
895        let addr = next_addr();
896
897        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
898            tokio::time::sleep(Duration::from_secs(3)).await;
899            Ok("I waited 3 seconds!")
900        }
901
902        let slow = warp::any().and_then(sleepy);
903        let server = warp::serve(slow).bind(addr);
904        let _server = tokio::spawn(server);
905
906        let config = Ec2Metadata {
907            endpoint: format!("http://{addr}"),
908            refresh_timeout_secs: Duration::from_secs(1),
909            ..Default::default()
910        };
911
912        match config.build(&TransformContext::default()).await {
913            Ok(_) => panic!("expected timeout failure"),
914            // cannot create tokio::time::error::Elapsed to compare with since constructor is
915            // private
916            Err(err) => assert_eq!(
917                err.to_string(),
918                "Unable to fetch metadata authentication token: deadline has elapsed."
919            ),
920        }
921    }
922
923    // validates the configuration setting 'required'=false allows vector to run
924    #[tokio::test(flavor = "multi_thread")]
925    async fn not_required() {
926        let addr = next_addr();
927
928        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
929            tokio::time::sleep(Duration::from_secs(3)).await;
930            Ok("I waited 3 seconds!")
931        }
932
933        let slow = warp::any().and_then(sleepy);
934        let server = warp::serve(slow).bind(addr);
935        let _server = tokio::spawn(server);
936
937        let config = Ec2Metadata {
938            endpoint: format!("http://{addr}"),
939            refresh_timeout_secs: Duration::from_secs(1),
940            required: false,
941            ..Default::default()
942        };
943
944        assert!(
945            config.build(&TransformContext::default()).await.is_ok(),
946            "expected no failure because 'required' config value set to false"
947        );
948    }
949
950    #[tokio::test]
951    async fn enrich_metric() {
952        assert_transform_compliance(async {
953            let mut fields = default_fields();
954            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
955
956            let tags = vec![
957                String::from("Name"),
958                String::from("Test"),
959                String::from("MISSING_TAG"),
960            ];
961
962            let transform_config = Ec2Metadata {
963                endpoint: ec2_metadata_address(),
964                fields,
965                tags,
966                ..Default::default()
967            };
968
969            let (tx, rx) = mpsc::channel(1);
970            let (topology, mut out) =
971                create_topology(ReceiverStream::new(rx), transform_config).await;
972
973            // We need to sleep to let the background task fetch the data.
974            sleep(Duration::from_secs(1)).await;
975
976            let metric = make_metric();
977            let mut expected_metric = metric.clone();
978            for (k, v) in expected_metric_fields().iter() {
979                expected_metric.replace_tag(k.to_string(), v.to_string());
980            }
981
982            tx.send(metric.into()).await.unwrap();
983
984            let event = out.recv().await.unwrap();
985            assert_event_data_eq!(event.into_metric(), expected_metric);
986
987            drop(tx);
988            topology.stop().await;
989            assert_eq!(out.recv().await, None);
990        })
991        .await;
992    }
993
994    #[tokio::test]
995    async fn fields_log() {
996        assert_transform_compliance(async {
997            let transform_config = Ec2Metadata {
998                endpoint: ec2_metadata_address(),
999                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1000                tags: vec![
1001                    String::from("Name"),
1002                    String::from("Test"),
1003                    String::from("MISSING_TAG"),
1004                ],
1005                ..Default::default()
1006            };
1007
1008            let (tx, rx) = mpsc::channel(1);
1009            let (topology, mut out) =
1010                create_topology(ReceiverStream::new(rx), transform_config).await;
1011
1012            // We need to sleep to let the background task fetch the data.
1013            sleep(Duration::from_secs(1)).await;
1014
1015            let log = LogEvent::default();
1016            let mut expected_log = log.clone();
1017            expected_log.insert(format!("\"{PUBLIC_IPV4_KEY}\"").as_str(), "192.0.2.54");
1018            expected_log.insert(format!("\"{REGION_KEY}\"").as_str(), "us-east-1");
1019            expected_log.insert(
1020                format!("\"{TAGS_KEY}\"").as_str(),
1021                ObjectMap::from([
1022                    ("Name".into(), Value::from("test-instance")),
1023                    ("Test".into(), Value::from("test-tag")),
1024                ]),
1025            );
1026
1027            tx.send(log.into()).await.unwrap();
1028
1029            let event = out.recv().await.unwrap();
1030            assert_event_data_eq!(event.into_log(), expected_log);
1031
1032            drop(tx);
1033            topology.stop().await;
1034            assert_eq!(out.recv().await, None);
1035        })
1036        .await;
1037    }
1038
1039    #[tokio::test]
1040    async fn fields_metric() {
1041        assert_transform_compliance(async {
1042            let transform_config = Ec2Metadata {
1043                endpoint: ec2_metadata_address(),
1044                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1045                tags: vec![
1046                    String::from("Name"),
1047                    String::from("Test"),
1048                    String::from("MISSING_TAG"),
1049                ],
1050                ..Default::default()
1051            };
1052
1053            let (tx, rx) = mpsc::channel(1);
1054            let (topology, mut out) =
1055                create_topology(ReceiverStream::new(rx), transform_config).await;
1056
1057            // We need to sleep to let the background task fetch the data.
1058            sleep(Duration::from_secs(1)).await;
1059
1060            let metric = make_metric();
1061            let mut expected_metric = metric.clone();
1062            expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
1063            expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
1064            expected_metric.replace_tag(
1065                format!("{}[{}]", TAGS_KEY, "Name"),
1066                "test-instance".to_string(),
1067            );
1068            expected_metric
1069                .replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());
1070
1071            tx.send(metric.into()).await.unwrap();
1072
1073            let event = out.recv().await.unwrap();
1074            assert_event_data_eq!(event.into_metric(), expected_metric);
1075
1076            drop(tx);
1077            topology.stop().await;
1078            assert_eq!(out.recv().await, None);
1079        })
1080        .await;
1081    }
1082
1083    #[tokio::test]
1084    async fn namespace_log() {
1085        {
1086            assert_transform_compliance(async {
1087                let transform_config = Ec2Metadata {
1088                    endpoint: ec2_metadata_address(),
1089                    namespace: Some(
1090                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1091                    ),
1092                    ..Default::default()
1093                };
1094
1095                let (tx, rx) = mpsc::channel(1);
1096                let (topology, mut out) =
1097                    create_topology(ReceiverStream::new(rx), transform_config).await;
1098
1099                // We need to sleep to let the background task fetch the data.
1100                sleep(Duration::from_secs(1)).await;
1101
1102                let log = LogEvent::default();
1103
1104                tx.send(log.into()).await.unwrap();
1105
1106                let event = out.recv().await.unwrap();
1107
1108                assert_eq!(
1109                    event.as_log().get("ec2.metadata.\"availability-zone\""),
1110                    Some(&"us-east-1a".into())
1111                );
1112
1113                drop(tx);
1114                topology.stop().await;
1115                assert_eq!(out.recv().await, None);
1116            })
1117            .await;
1118        }
1119
1120        {
1121            assert_transform_compliance(async {
1122                // Set an empty namespace to ensure we don't prepend one.
1123                let transform_config = Ec2Metadata {
1124                    endpoint: ec2_metadata_address(),
1125                    namespace: Some(OptionalTargetPath::none()),
1126                    ..Default::default()
1127                };
1128
1129                let (tx, rx) = mpsc::channel(1);
1130                let (topology, mut out) =
1131                    create_topology(ReceiverStream::new(rx), transform_config).await;
1132
1133                // We need to sleep to let the background task fetch the data.
1134                sleep(Duration::from_secs(1)).await;
1135
1136                let log = LogEvent::default();
1137
1138                tx.send(log.into()).await.unwrap();
1139
1140                let event = out.recv().await.unwrap();
1141                assert_eq!(
1142                    event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
1143                    Some(&"us-east-1a".into())
1144                );
1145
1146                drop(tx);
1147                topology.stop().await;
1148                assert_eq!(out.recv().await, None);
1149            })
1150            .await;
1151        }
1152    }
1153
1154    #[tokio::test]
1155    async fn namespace_metric() {
1156        {
1157            assert_transform_compliance(async {
1158                let transform_config = Ec2Metadata {
1159                    endpoint: ec2_metadata_address(),
1160                    namespace: Some(
1161                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1162                    ),
1163                    ..Default::default()
1164                };
1165
1166                let (tx, rx) = mpsc::channel(1);
1167                let (topology, mut out) =
1168                    create_topology(ReceiverStream::new(rx), transform_config).await;
1169
1170                // We need to sleep to let the background task fetch the data.
1171                sleep(Duration::from_secs(1)).await;
1172
1173                let metric = make_metric();
1174
1175                tx.send(metric.into()).await.unwrap();
1176
1177                let event = out.recv().await.unwrap();
1178                assert_eq!(
1179                    event
1180                        .as_metric()
1181                        .tag_value("ec2.metadata.availability-zone"),
1182                    Some("us-east-1a".to_string())
1183                );
1184
1185                drop(tx);
1186                topology.stop().await;
1187                assert_eq!(out.recv().await, None);
1188            })
1189            .await;
1190        }
1191
1192        {
1193            assert_transform_compliance(async {
1194                // Set an empty namespace to ensure we don't prepend one.
1195                let transform_config = Ec2Metadata {
1196                    endpoint: ec2_metadata_address(),
1197                    namespace: Some(OptionalTargetPath::none()),
1198                    ..Default::default()
1199                };
1200
1201                let (tx, rx) = mpsc::channel(1);
1202                let (topology, mut out) =
1203                    create_topology(ReceiverStream::new(rx), transform_config).await;
1204
1205                // We need to sleep to let the background task fetch the data.
1206                sleep(Duration::from_secs(1)).await;
1207
1208                let metric = make_metric();
1209
1210                tx.send(metric.into()).await.unwrap();
1211
1212                let event = out.recv().await.unwrap();
1213                assert_eq!(
1214                    event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
1215                    Some("us-east-1a".to_string())
1216                );
1217
1218                drop(tx);
1219                topology.stop().await;
1220                assert_eq!(out.recv().await, None);
1221            })
1222            .await;
1223        }
1224    }
1225}