1use std::sync::{Arc, LazyLock};
2use std::{collections::HashSet, error, fmt, future::ready, pin::Pin};
3
4use arc_swap::ArcSwap;
5use bytes::Bytes;
6use futures::{Stream, StreamExt};
7use http::{uri::PathAndQuery, Request, StatusCode, Uri};
8use hyper::{body::to_bytes as body_to_bytes, Body};
9use serde::Deserialize;
10use serde_with::serde_as;
11use snafu::ResultExt as _;
12use tokio::time::{sleep, Duration, Instant};
13use tracing::Instrument;
14use vector_lib::config::LogNamespace;
15use vector_lib::configurable::configurable_component;
16use vector_lib::lookup::lookup_v2::{OptionalTargetPath, OwnedSegment};
17use vector_lib::lookup::owned_value_path;
18use vector_lib::lookup::OwnedTargetPath;
19use vrl::value::kind::Collection;
20use vrl::value::Kind;
21
22use crate::config::OutputId;
23use crate::{
24 config::{DataType, Input, ProxyConfig, TransformConfig, TransformContext, TransformOutput},
25 event::Event,
26 http::HttpClient,
27 internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
28 schema,
29 transforms::{TaskTransform, Transform},
30};
31
// Names of the metadata fields this transform can add. Each name is both the
// value a user lists in the `fields` option and the final path segment / tag
// name under which the fetched value is stored on the event.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";
const REGION_KEY: &str = "region";
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";
// Role names are stored as an indexed list under this key (one entry per line
// of the role-name response).
const ROLE_NAME_KEY: &str = "role-name";
// Instance tags requested through the `tags` option are nested under this key.
const TAGS_KEY: &str = "tags";
46
// Request paths on the metadata endpoint. They are parsed once, lazily, and
// cloned into the request URI for each lookup.
static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
static LOCAL_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
static PUBLIC_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
static ROLE_NAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
// The MAC address is only fetched to build the per-interface subnet/VPC paths.
static MAC: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
// JSON identity document; deserialized into `IdentityDocument`.
static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
// Fields enabled when the user does not set `fields`. Note that `account-id`
// is not part of the defaults and must be requested explicitly.
static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
    AMI_ID_KEY,
    AVAILABILITY_ZONE_KEY,
    INSTANCE_ID_KEY,
    INSTANCE_TYPE_KEY,
    LOCAL_HOSTNAME_KEY,
    LOCAL_IPV4_KEY,
    PUBLIC_HOSTNAME_KEY,
    PUBLIC_IPV4_KEY,
    REGION_KEY,
    SUBNET_ID_KEY,
    VPC_ID_KEY,
    ROLE_NAME_KEY,
];
// Path that is `PUT` to obtain a session token.
static API_TOKEN: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
// Header that carries the session token on every metadata `GET` request.
static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
80
/// Configuration for the `aws_ec2_metadata` transform.
#[serde_as]
#[configurable_component(transform(
    "aws_ec2_metadata",
    "Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Ec2Metadata {
    /// Overrides the default EC2 metadata endpoint.
    ///
    /// Also accepted under the legacy name `host`.
    #[serde(alias = "host", default = "default_endpoint")]
    #[derivative(Default(value = "default_endpoint()"))]
    endpoint: String,

    /// Sets a prefix for all event fields added by the transform.
    ///
    /// For metrics, the namespace segments are joined with `.` and prepended
    /// to the tag name.
    #[configurable(metadata(
        docs::examples = "",
        docs::examples = "ec2",
        docs::examples = "aws.ec2",
    ))]
    namespace: Option<OptionalTargetPath>,

    /// The interval between querying for updated metadata, in seconds.
    #[serde(default = "default_refresh_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_interval_secs()"))]
    refresh_interval_secs: Duration,

    /// A list of metadata fields to include in each transformed event.
    #[serde(default = "default_fields")]
    #[derivative(Default(value = "default_fields()"))]
    #[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
    fields: Vec<String>,

    /// A list of instance tags to include in each transformed event.
    #[serde(default = "default_tags")]
    #[derivative(Default(value = "default_tags()"))]
    #[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
    tags: Vec<String>,

    /// The timeout for querying the EC2 metadata endpoint, in seconds.
    #[serde(default = "default_refresh_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_timeout_secs()"))]
    refresh_timeout_secs: Duration,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Requires the transform to be able to successfully query the EC2 metadata
    /// before starting to process the data.
    ///
    /// When `false`, a failed initial query is reported as an internal event
    /// and the transform still starts.
    #[serde(default = "default_required")]
    #[derivative(Default(value = "default_required()"))]
    required: bool,
}
136
/// Default value for `endpoint`: the link-local metadata address.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}
140
/// Default value for `refresh_interval_secs`: ten seconds.
const fn default_refresh_interval_secs() -> Duration {
    const SECONDS: u64 = 10;
    Duration::from_secs(SECONDS)
}
144
/// Default value for `refresh_timeout_secs`: one second.
const fn default_refresh_timeout_secs() -> Duration {
    const SECONDS: u64 = 1;
    Duration::from_secs(SECONDS)
}
148
149fn default_fields() -> Vec<String> {
150 DEFAULT_FIELD_ALLOWLIST
151 .iter()
152 .map(|s| s.to_string())
153 .collect()
154}
155
/// Default value for `tags`: no instance tags are fetched.
const fn default_tags() -> Vec<String> {
    Vec::new()
}
159
/// Default value for `required`: the initial metadata query must succeed for
/// the transform to build.
const fn default_required() -> bool {
    true
}
163
/// The running transform: decorates each event with the most recently
/// published metadata snapshot.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
    // Shared with the `MetadataClient`, which replaces the whole vector
    // whenever a refresh completes.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}
168
/// Where a single metadata value is written on an event.
#[derive(Debug, Clone)]
struct MetadataKey {
    // Path used when the event is a log.
    log_path: OwnedTargetPath,
    // Tag name used when the event is a metric.
    metric_tag: String,
}
174
/// The destination keys for every supported metadata field, precomputed once
/// from the configured namespace (see `Keys::new`).
#[derive(Debug)]
struct Keys {
    account_id_key: MetadataKey,
    ami_id_key: MetadataKey,
    availability_zone_key: MetadataKey,
    instance_id_key: MetadataKey,
    instance_type_key: MetadataKey,
    local_hostname_key: MetadataKey,
    local_ipv4_key: MetadataKey,
    public_hostname_key: MetadataKey,
    public_ipv4_key: MetadataKey,
    region_key: MetadataKey,
    subnet_id_key: MetadataKey,
    vpc_id_key: MetadataKey,
    // Base key; individual role names are stored at `<key>[index]`.
    role_name_key: MetadataKey,
    // Base key; individual tags are stored at `<key>.<tag>` / `<key>[<tag>]`.
    tags_key: MetadataKey,
}
192
// NOTE(review): presumably generates the example-config implementation for
// `Ec2Metadata` from its `Default` value; the macro is defined outside this file.
impl_generate_config_from_default!(Ec2Metadata);
194
195#[async_trait::async_trait]
196#[typetag::serde(name = "aws_ec2_metadata")]
197impl TransformConfig for Ec2Metadata {
198 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
199 let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
200
201 let keys = Keys::new(self.namespace.clone());
202 let host = Uri::from_maybe_shared(self.endpoint.clone()).unwrap();
203 let refresh_interval = self.refresh_interval_secs;
204 let fields = self.fields.clone();
205 let tags = self.tags.clone();
206 let refresh_timeout = self.refresh_timeout_secs;
207 let required = self.required;
208
209 let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
210 let http_client = HttpClient::new(None, &proxy)?;
211
212 let mut client = MetadataClient::new(
213 http_client,
214 host,
215 keys,
216 Arc::clone(&state),
217 refresh_interval,
218 refresh_timeout,
219 fields,
220 tags,
221 );
222
223 if let Err(error) = client.refresh_metadata().await {
225 if required {
226 return Err(error);
227 } else {
228 emit!(AwsEc2MetadataRefreshError { error });
229 }
230 }
231
232 tokio::spawn(
233 async move {
234 client.run().await;
235 }
236 .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
238 );
239
240 Ok(Transform::event_task(Ec2MetadataTransform { state }))
241 }
242
243 fn input(&self) -> Input {
244 Input::new(DataType::Metric | DataType::Log)
245 }
246
247 fn outputs(
248 &self,
249 _: vector_lib::enrichment::TableRegistry,
250 input_definitions: &[(OutputId, schema::Definition)],
251 _: LogNamespace,
252 ) -> Vec<TransformOutput> {
253 let added_keys = Keys::new(self.namespace.clone());
254
255 let paths = [
256 &added_keys.account_id_key.log_path,
257 &added_keys.ami_id_key.log_path,
258 &added_keys.availability_zone_key.log_path,
259 &added_keys.instance_id_key.log_path,
260 &added_keys.instance_type_key.log_path,
261 &added_keys.local_hostname_key.log_path,
262 &added_keys.local_ipv4_key.log_path,
263 &added_keys.public_hostname_key.log_path,
264 &added_keys.public_ipv4_key.log_path,
265 &added_keys.region_key.log_path,
266 &added_keys.subnet_id_key.log_path,
267 &added_keys.vpc_id_key.log_path,
268 &added_keys.role_name_key.log_path,
269 &added_keys.tags_key.log_path,
270 ];
271
272 let schema_definition = input_definitions
273 .iter()
274 .map(|(output, definition)| {
275 let mut schema_definition = definition.clone();
276
277 if !schema_definition.event_kind().contains_object() {
279 *schema_definition.event_kind_mut() = Kind::object(Collection::empty());
280 }
281
282 for path in paths {
283 schema_definition =
284 schema_definition.with_field(path, Kind::bytes().or_undefined(), None);
285 }
286
287 (output.clone(), schema_definition)
288 })
289 .collect();
290
291 vec![TransformOutput::new(
292 DataType::Metric | DataType::Log,
293 schema_definition,
294 )]
295 }
296}
297
298impl TaskTransform<Event> for Ec2MetadataTransform {
299 fn transform(
300 self: Box<Self>,
301 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
302 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
303 where
304 Self: 'static,
305 {
306 let mut inner = self;
307 Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event)))))
308 }
309}
310
311impl Ec2MetadataTransform {
312 fn transform_one(&mut self, mut event: Event) -> Event {
313 let state = self.state.load();
314 match event {
315 Event::Log(ref mut log) => {
316 state.iter().for_each(|(k, v)| {
317 log.insert(&k.log_path, v.clone());
318 });
319 }
320 Event::Metric(ref mut metric) => {
321 state.iter().for_each(|(k, v)| {
322 metric
323 .replace_tag(k.metric_tag.clone(), String::from_utf8_lossy(v).to_string());
324 });
325 }
326 Event::Trace(_) => panic!("Traces are not supported."),
327 }
328 event
329 }
330}
331
/// Fetches metadata from the endpoint and publishes it into the shared state.
struct MetadataClient {
    client: HttpClient<Body>,
    // Base URI of the metadata endpoint; only its path is replaced per request.
    host: Uri,
    // Cached session token together with the instant after which it is re-fetched.
    token: Option<(Bytes, Instant)>,
    keys: Keys,
    // Snapshot shared with `Ec2MetadataTransform`.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
    // Pause between refreshes in `run`.
    refresh_interval: Duration,
    // Upper bound on waiting for each HTTP response.
    refresh_timeout: Duration,
    // Metadata fields the user asked for.
    fields: HashSet<String>,
    // Instance tags the user asked for.
    tags: HashSet<String>,
}
343
/// The JSON instance identity document, with camelCase keys mapped onto
/// snake_case fields.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
// Deserialize all fields even though some (`architecture`, `private_ip`,
// `version`) are never read in this file.
#[allow(dead_code)]
struct IdentityDocument {
    account_id: String,
    architecture: String,
    image_id: String,
    instance_id: String,
    instance_type: String,
    private_ip: String,
    region: String,
    version: String,
}
357
358impl MetadataClient {
359 #[allow(clippy::too_many_arguments)]
360 pub fn new(
361 client: HttpClient<Body>,
362 host: Uri,
363 keys: Keys,
364 state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
365 refresh_interval: Duration,
366 refresh_timeout: Duration,
367 fields: Vec<String>,
368 tags: Vec<String>,
369 ) -> Self {
370 Self {
371 client,
372 host,
373 token: None,
374 keys,
375 state,
376 refresh_interval,
377 refresh_timeout,
378 fields: fields.into_iter().collect(),
379 tags: tags.into_iter().collect(),
380 }
381 }
382
383 async fn run(&mut self) {
384 loop {
385 match self.refresh_metadata().await {
386 Ok(_) => {
387 emit!(AwsEc2MetadataRefreshSuccessful);
388 }
389 Err(error) => {
390 emit!(AwsEc2MetadataRefreshError { error });
391 }
392 }
393
394 sleep(self.refresh_interval).await;
395 }
396 }
397
398 pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
399 if let Some((token, next_refresh)) = self.token.clone() {
400 if next_refresh > Instant::now() {
404 return Ok(token);
405 }
406 }
407
408 let mut parts = self.host.clone().into_parts();
409 parts.path_and_query = Some(API_TOKEN.clone());
410 let uri = Uri::from_parts(parts)?;
411
412 let req = Request::put(uri)
413 .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
414 .body(Body::empty())?;
415
416 let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
417 .await?
418 .map_err(crate::Error::from)
419 .and_then(|res| match res.status() {
420 StatusCode::OK => Ok(res),
421 status_code => Err(UnexpectedHttpStatusError {
422 status: status_code,
423 }
424 .into()),
425 })?;
426
427 let token = body_to_bytes(res.into_body()).await?;
428
429 let next_refresh = Instant::now() + Duration::from_secs(21600);
430 self.token = Some((token.clone(), next_refresh));
431
432 Ok(token)
433 }
434
435 pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
436 self.get_metadata(&DYNAMIC_DOCUMENT)
437 .await?
438 .map(|body| {
439 serde_json::from_slice(&body[..])
440 .context(ParseIdentityDocumentSnafu {})
441 .map_err(Into::into)
442 })
443 .transpose()
444 }
445
446 pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
447 let mut new_state = vec![];
448
449 if let Some(document) = self.get_document().await? {
451 if self.fields.contains(ACCOUNT_ID_KEY) {
452 new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
453 }
454
455 if self.fields.contains(AMI_ID_KEY) {
456 new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
457 }
458
459 if self.fields.contains(INSTANCE_ID_KEY) {
460 new_state.push((
461 self.keys.instance_id_key.clone(),
462 document.instance_id.into(),
463 ));
464 }
465
466 if self.fields.contains(INSTANCE_TYPE_KEY) {
467 new_state.push((
468 self.keys.instance_type_key.clone(),
469 document.instance_type.into(),
470 ));
471 }
472
473 if self.fields.contains(REGION_KEY) {
474 new_state.push((self.keys.region_key.clone(), document.region.into()));
475 }
476
477 if self.fields.contains(AVAILABILITY_ZONE_KEY) {
478 if let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await? {
479 new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
480 }
481 }
482
483 if self.fields.contains(LOCAL_HOSTNAME_KEY) {
484 if let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await? {
485 new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
486 }
487 }
488
489 if self.fields.contains(LOCAL_IPV4_KEY) {
490 if let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await? {
491 new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
492 }
493 }
494
495 if self.fields.contains(PUBLIC_HOSTNAME_KEY) {
496 if let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await? {
497 new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
498 }
499 }
500
501 if self.fields.contains(PUBLIC_IPV4_KEY) {
502 if let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await? {
503 new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
504 }
505 }
506
507 if self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY) {
508 if let Some(mac) = self.get_metadata(&MAC).await? {
509 let mac = String::from_utf8_lossy(&mac[..]);
510
511 if self.fields.contains(SUBNET_ID_KEY) {
512 let subnet_path =
513 format!("/latest/meta-data/network/interfaces/macs/{mac}/subnet-id");
514
515 let subnet_path = subnet_path.parse().context(ParsePathSnafu {
516 value: subnet_path.clone(),
517 })?;
518
519 if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
520 new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
521 }
522 }
523
524 if self.fields.contains(VPC_ID_KEY) {
525 let vpc_path =
526 format!("/latest/meta-data/network/interfaces/macs/{mac}/vpc-id");
527
528 let vpc_path = vpc_path.parse().context(ParsePathSnafu {
529 value: vpc_path.clone(),
530 })?;
531
532 if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
533 new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
534 }
535 }
536 }
537 }
538
539 if self.fields.contains(ROLE_NAME_KEY) {
540 if let Some(role_names) = self.get_metadata(&ROLE_NAME).await? {
541 let role_names = String::from_utf8_lossy(&role_names[..]);
542
543 for (i, role_name) in role_names.lines().enumerate() {
544 new_state.push((
545 MetadataKey {
546 log_path: self
547 .keys
548 .role_name_key
549 .log_path
550 .with_index_appended(i as isize),
551 metric_tag: format!(
552 "{}[{}]",
553 self.keys.role_name_key.metric_tag, i
554 ),
555 },
556 role_name.to_string().into(),
557 ));
558 }
559 }
560 }
561
562 for tag in self.tags.clone() {
563 let tag_path = format!("/latest/meta-data/tags/instance/{tag}");
564
565 let tag_path = tag_path.parse().context(ParsePathSnafu {
566 value: tag_path.clone(),
567 })?;
568
569 if let Some(tag_content) = self.get_metadata(&tag_path).await? {
570 new_state.push((
571 MetadataKey {
572 log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
573 metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
574 },
575 tag_content,
576 ));
577 }
578 }
579
580 self.state.store(Arc::new(new_state));
581 }
582
583 Ok(())
584 }
585
586 async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
587 let token = self
588 .get_token()
589 .await
590 .with_context(|_| FetchTokenSnafu {})?;
591
592 let mut parts = self.host.clone().into_parts();
593
594 parts.path_and_query = Some(path.clone());
595
596 let uri = Uri::from_parts(parts)?;
597
598 debug!(message = "Sending metadata request.", %uri);
599
600 let req = Request::get(uri)
601 .header(TOKEN_HEADER.as_ref(), token.as_ref())
602 .body(Body::empty())?;
603
604 match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
605 .await?
606 .map_err(crate::Error::from)
607 .and_then(|res| match res.status() {
608 StatusCode::OK => Ok(Some(res)),
609 StatusCode::NOT_FOUND => Ok(None),
610 status_code => Err(UnexpectedHttpStatusError {
611 status: status_code,
612 }
613 .into()),
614 })? {
615 Some(res) => {
616 let body = body_to_bytes(res.into_body()).await?;
617 Ok(Some(body))
618 }
619 None => Ok(None),
620 }
621 }
622}
623
624fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
629 let mut output = String::new();
630 for segment in &namespace.path.segments {
631 if !output.is_empty() {
632 output += ".";
633 }
634 match segment {
635 OwnedSegment::Field(field) => {
636 output += field;
637 }
638 OwnedSegment::Index(i) => {
639 output += &i.to_string();
640 }
641 }
642 }
643 output
644}
645
646fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
647 if let Some(namespace) = namespace {
648 MetadataKey {
649 log_path: namespace.with_field_appended(key),
650 metric_tag: format!("{}.{}", create_metric_namespace(namespace), key),
651 }
652 } else {
653 MetadataKey {
654 log_path: OwnedTargetPath::event(owned_value_path!(key)),
655 metric_tag: key.to_owned(),
656 }
657 }
658}
659
660impl Keys {
661 pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
662 let namespace = namespace.and_then(|namespace| namespace.path);
663
664 Keys {
665 account_id_key: create_key(&namespace, ACCOUNT_ID_KEY),
666 ami_id_key: create_key(&namespace, AMI_ID_KEY),
667 availability_zone_key: create_key(&namespace, AVAILABILITY_ZONE_KEY),
668 instance_id_key: create_key(&namespace, INSTANCE_ID_KEY),
669 instance_type_key: create_key(&namespace, INSTANCE_TYPE_KEY),
670 local_hostname_key: create_key(&namespace, LOCAL_HOSTNAME_KEY),
671 local_ipv4_key: create_key(&namespace, LOCAL_IPV4_KEY),
672 public_hostname_key: create_key(&namespace, PUBLIC_HOSTNAME_KEY),
673 public_ipv4_key: create_key(&namespace, PUBLIC_IPV4_KEY),
674 region_key: create_key(&namespace, REGION_KEY),
675 subnet_id_key: create_key(&namespace, SUBNET_ID_KEY),
676 vpc_id_key: create_key(&namespace, VPC_ID_KEY),
677 role_name_key: create_key(&namespace, ROLE_NAME_KEY),
678 tags_key: create_key(&namespace, TAGS_KEY),
679 }
680 }
681}
682
683#[derive(Debug)]
684struct UnexpectedHttpStatusError {
685 status: http::StatusCode,
686}
687
688impl fmt::Display for UnexpectedHttpStatusError {
689 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
690 write!(f, "got unexpected status code: {}", self.status)
691 }
692}
693
694impl error::Error for UnexpectedHttpStatusError {}
695
/// Errors that add context to failures while talking to the metadata endpoint.
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
    /// Obtaining the session token failed; wraps the underlying error.
    #[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
    FetchToken { source: crate::Error },
    /// The identity document body was not valid JSON of the expected shape.
    #[snafu(display("Unable to parse identity document: {}.", source))]
    ParseIdentityDocument { source: serde_json::Error },
    /// A dynamically built metadata path (`value`) could not be parsed.
    #[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
    ParsePath {
        value: String,
        source: http::uri::InvalidUri,
    },
}
708
#[cfg(test)]
mod test {
    use crate::config::schema::Definition;
    use crate::config::{LogNamespace, OutputId, TransformConfig};
    use crate::transforms::aws_ec2_metadata::Ec2Metadata;
    use vector_lib::enrichment::TableRegistry;
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::owned_value_path;
    use vrl::value::Kind;

    /// A non-object (bytes) input event must come out with an object schema,
    /// since the transform writes fields onto it.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let namespace = OwnedTargetPath::event(owned_value_path!("ec2", "metadata"));
        let config = Ec2Metadata {
            namespace: Some(namespace.into()),
            ..Default::default()
        };

        let bytes_input =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
        let inputs = [(OutputId::dummy(), bytes_input)];

        let mut outputs = config.outputs(TableRegistry::default(), &inputs, LogNamespace::Vector);
        assert_eq!(outputs.len(), 1);

        let output = outputs.pop().unwrap();
        let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
        assert!(actual_schema_def.event_kind().is_object());
    }
}
740
741#[cfg(feature = "aws-ec2-metadata-integration-tests")]
742#[cfg(test)]
743mod integration_tests {
744 use tokio::sync::mpsc;
745 use tokio_stream::wrappers::ReceiverStream;
746 use vector_lib::lookup::lookup_v2::{OwnedSegment, OwnedValuePath};
747 use vector_lib::lookup::{event_path, PathPrefix};
748
749 use super::*;
750 use crate::{
751 event::{metric, LogEvent, Metric},
752 test_util::{components::assert_transform_compliance, next_addr},
753 transforms::test::create_topology,
754 };
755 use vector_lib::assert_event_data_eq;
756 use vrl::value::{ObjectMap, Value};
757 use warp::Filter;
758
759 fn ec2_metadata_address() -> String {
760 std::env::var("EC2_METADATA_ADDRESS").unwrap_or_else(|_| "http://localhost:1338".into())
761 }
762
763 fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
764 vec![
765 (
766 vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
767 "us-east-1a",
768 ),
769 (
770 vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
771 "192.0.2.54",
772 ),
773 (
774 vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
775 "ec2-192-0-2-54.compute-1.amazonaws.com",
776 ),
777 (
778 vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
779 "172.16.34.43",
780 ),
781 (
782 vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
783 "ip-172-16-34-43.ec2.internal",
784 ),
785 (
786 vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
787 "i-1234567890abcdef0",
788 ),
789 (
790 vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
791 "0123456789",
792 ),
793 (
794 vec![OwnedSegment::field(AMI_ID_KEY)].into(),
795 "ami-0b69ea66ff7391e80",
796 ),
797 (
798 vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
799 "m4.xlarge",
800 ),
801 (vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
802 (vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
803 (
804 vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
805 "subnet-0ac62554",
806 ),
807 (owned_value_path!("role-name", 0), "baskinc-role"),
808 (owned_value_path!("tags", "Name"), "test-instance"),
809 (owned_value_path!("tags", "Test"), "test-tag"),
810 ]
811 }
812
    /// Metric tag names and values the tests expect when every field plus the
    /// `Name` and `Test` tags are requested. Role names and tags use the
    /// `key[index-or-name]` tag form.
    fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
        vec![
            (AVAILABILITY_ZONE_KEY, "us-east-1a"),
            (PUBLIC_IPV4_KEY, "192.0.2.54"),
            (
                PUBLIC_HOSTNAME_KEY,
                "ec2-192-0-2-54.compute-1.amazonaws.com",
            ),
            (LOCAL_IPV4_KEY, "172.16.34.43"),
            (LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
            (INSTANCE_ID_KEY, "i-1234567890abcdef0"),
            (ACCOUNT_ID_KEY, "0123456789"),
            (AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
            (INSTANCE_TYPE_KEY, "m4.xlarge"),
            (REGION_KEY, "us-east-1"),
            (VPC_ID_KEY, "vpc-d295a6a7"),
            (SUBNET_ID_KEY, "subnet-0ac62554"),
            ("role-name[0]", "baskinc-role"),
            ("tags[Name]", "test-instance"),
            ("tags[Test]", "test-tag"),
        ]
    }
835
836 fn make_metric() -> Metric {
837 Metric::new(
838 "event",
839 metric::MetricKind::Incremental,
840 metric::MetricValue::Counter { value: 1.0 },
841 )
842 }
843
    // Runs the shared generate-config check against `Ec2Metadata`.
    // NOTE(review): the helper lives in `crate::test_util`; what exactly it
    // verifies is not visible from this file.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<Ec2Metadata>();
    }
848
849 #[tokio::test]
850 async fn enrich_log() {
851 assert_transform_compliance(async {
852 let mut fields = default_fields();
853 fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
854
855 let tags = vec![
856 String::from("Name"),
857 String::from("Test"),
858 String::from("MISSING_TAG"),
859 ];
860
861 let transform_config = Ec2Metadata {
862 endpoint: ec2_metadata_address(),
863 fields,
864 tags,
865 ..Default::default()
866 };
867
868 let (tx, rx) = mpsc::channel(1);
869 let (topology, mut out) =
870 create_topology(ReceiverStream::new(rx), transform_config).await;
871
872 sleep(Duration::from_secs(1)).await;
874
875 let log = LogEvent::default();
876 let mut expected_log = log.clone();
877 for (k, v) in expected_log_fields().iter().cloned() {
878 expected_log.insert((PathPrefix::Event, &k), v);
879 }
880
881 tx.send(log.into()).await.unwrap();
882
883 let event = out.recv().await.unwrap();
884 assert_event_data_eq!(event.into_log(), expected_log);
885
886 drop(tx);
887 topology.stop().await;
888 assert_eq!(out.recv().await, None);
889 })
890 .await;
891 }
892
893 #[tokio::test(flavor = "multi_thread")]
894 async fn timeout() {
895 let addr = next_addr();
896
897 async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
898 tokio::time::sleep(Duration::from_secs(3)).await;
899 Ok("I waited 3 seconds!")
900 }
901
902 let slow = warp::any().and_then(sleepy);
903 let server = warp::serve(slow).bind(addr);
904 let _server = tokio::spawn(server);
905
906 let config = Ec2Metadata {
907 endpoint: format!("http://{addr}"),
908 refresh_timeout_secs: Duration::from_secs(1),
909 ..Default::default()
910 };
911
912 match config.build(&TransformContext::default()).await {
913 Ok(_) => panic!("expected timeout failure"),
914 Err(err) => assert_eq!(
917 err.to_string(),
918 "Unable to fetch metadata authentication token: deadline has elapsed."
919 ),
920 }
921 }
922
923 #[tokio::test(flavor = "multi_thread")]
925 async fn not_required() {
926 let addr = next_addr();
927
928 async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
929 tokio::time::sleep(Duration::from_secs(3)).await;
930 Ok("I waited 3 seconds!")
931 }
932
933 let slow = warp::any().and_then(sleepy);
934 let server = warp::serve(slow).bind(addr);
935 let _server = tokio::spawn(server);
936
937 let config = Ec2Metadata {
938 endpoint: format!("http://{addr}"),
939 refresh_timeout_secs: Duration::from_secs(1),
940 required: false,
941 ..Default::default()
942 };
943
944 assert!(
945 config.build(&TransformContext::default()).await.is_ok(),
946 "expected no failure because 'required' config value set to false"
947 );
948 }
949
950 #[tokio::test]
951 async fn enrich_metric() {
952 assert_transform_compliance(async {
953 let mut fields = default_fields();
954 fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
955
956 let tags = vec![
957 String::from("Name"),
958 String::from("Test"),
959 String::from("MISSING_TAG"),
960 ];
961
962 let transform_config = Ec2Metadata {
963 endpoint: ec2_metadata_address(),
964 fields,
965 tags,
966 ..Default::default()
967 };
968
969 let (tx, rx) = mpsc::channel(1);
970 let (topology, mut out) =
971 create_topology(ReceiverStream::new(rx), transform_config).await;
972
973 sleep(Duration::from_secs(1)).await;
975
976 let metric = make_metric();
977 let mut expected_metric = metric.clone();
978 for (k, v) in expected_metric_fields().iter() {
979 expected_metric.replace_tag(k.to_string(), v.to_string());
980 }
981
982 tx.send(metric.into()).await.unwrap();
983
984 let event = out.recv().await.unwrap();
985 assert_event_data_eq!(event.into_metric(), expected_metric);
986
987 drop(tx);
988 topology.stop().await;
989 assert_eq!(out.recv().await, None);
990 })
991 .await;
992 }
993
    /// Only the fields listed in `fields` (plus any tags that exist) are added
    /// to a log event; the tag that does not exist is omitted.
    #[tokio::test]
    async fn fields_log() {
        assert_transform_compliance(async {
            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
                tags: vec![
                    String::from("Name"),
                    String::from("Test"),
                    String::from("MISSING_TAG"),
                ],
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            // We need to sleep to let the background task fetch the data.
            sleep(Duration::from_secs(1)).await;

            let log = LogEvent::default();
            let mut expected_log = log.clone();
            // Field names contain `-`, so they are quoted when given as path strings.
            expected_log.insert(format!("\"{PUBLIC_IPV4_KEY}\"").as_str(), "192.0.2.54");
            expected_log.insert(format!("\"{REGION_KEY}\"").as_str(), "us-east-1");
            expected_log.insert(
                format!("\"{TAGS_KEY}\"").as_str(),
                ObjectMap::from([
                    ("Name".into(), Value::from("test-instance")),
                    ("Test".into(), Value::from("test-tag")),
                ]),
            );

            tx.send(log.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_log(), expected_log);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
1038
    /// Only the fields listed in `fields` (plus any tags that exist) are added
    /// to a metric; the tag that does not exist is omitted.
    #[tokio::test]
    async fn fields_metric() {
        assert_transform_compliance(async {
            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
                tags: vec![
                    String::from("Name"),
                    String::from("Test"),
                    String::from("MISSING_TAG"),
                ],
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            // We need to sleep to let the background task fetch the data.
            sleep(Duration::from_secs(1)).await;

            let metric = make_metric();
            let mut expected_metric = metric.clone();
            expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
            expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
            expected_metric.replace_tag(
                format!("{}[{}]", TAGS_KEY, "Name"),
                "test-instance".to_string(),
            );
            expected_metric
                .replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());

            tx.send(metric.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_metric(), expected_metric);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }
1082
    /// Log fields are nested under the configured namespace, and an explicitly
    /// empty namespace leaves them at the event root.
    #[tokio::test]
    async fn namespace_log() {
        // Case 1: a two-segment namespace prefixes the field path.
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(
                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
                    ),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                // We need to sleep to let the background task fetch the data.
                sleep(Duration::from_secs(1)).await;

                let log = LogEvent::default();

                tx.send(log.into()).await.unwrap();

                let event = out.recv().await.unwrap();

                assert_eq!(
                    event.as_log().get("ec2.metadata.\"availability-zone\""),
                    Some(&"us-east-1a".into())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }

        // Case 2: an empty namespace means nothing is prepended.
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    // Set an empty namespace to ensure we don't prepend one.
                    endpoint: ec2_metadata_address(),
                    namespace: Some(OptionalTargetPath::none()),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                // We need to sleep to let the background task fetch the data.
                sleep(Duration::from_secs(1)).await;

                let log = LogEvent::default();

                tx.send(log.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
                    Some(&"us-east-1a".into())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }
    }
1153
    /// Metric tags are prefixed with the dotted namespace, and an explicitly
    /// empty namespace leaves the tag names unprefixed.
    #[tokio::test]
    async fn namespace_metric() {
        // Case 1: a two-segment namespace becomes the `ec2.metadata.` tag prefix.
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(
                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
                    ),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                // We need to sleep to let the background task fetch the data.
                sleep(Duration::from_secs(1)).await;

                let metric = make_metric();

                tx.send(metric.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event
                        .as_metric()
                        .tag_value("ec2.metadata.availability-zone"),
                    Some("us-east-1a".to_string())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }

        // Case 2: an empty namespace means nothing is prepended.
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    // Set an empty namespace to ensure we don't prepend one.
                    endpoint: ec2_metadata_address(),
                    namespace: Some(OptionalTargetPath::none()),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                // We need to sleep to let the background task fetch the data.
                sleep(Duration::from_secs(1)).await;

                let metric = make_metric();

                tx.send(metric.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
                    Some("us-east-1a".to_string())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }
    }
1225}