1use std::{
2 collections::HashSet,
3 error, fmt,
4 future::ready,
5 pin::Pin,
6 sync::{Arc, LazyLock},
7};
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use futures::{Stream, StreamExt};
12use http::{Request, StatusCode, Uri, uri::PathAndQuery};
13use hyper::{Body, body::to_bytes as body_to_bytes};
14use serde::Deserialize;
15use serde_with::serde_as;
16use snafu::ResultExt as _;
17use tokio::time::{Duration, Instant, sleep};
18use tracing::Instrument;
19use vector_lib::{
20 config::LogNamespace,
21 configurable::configurable_component,
22 lookup::{
23 OwnedTargetPath,
24 lookup_v2::{OptionalTargetPath, OwnedSegment},
25 owned_value_path,
26 },
27};
28use vrl::value::{Kind, kind::Collection};
29
30use crate::{
31 config::{
32 DataType, Input, OutputId, ProxyConfig, TransformConfig, TransformContext, TransformOutput,
33 },
34 event::Event,
35 http::HttpClient,
36 internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
37 schema,
38 transforms::{TaskTransform, Transform},
39};
40
// Names of the metadata fields this transform can attach to events. `create_key` uses
// each name as the final segment of the log field path and as the metric tag name, and
// `MetadataClient::refresh_metadata` checks the field names against the `fields` option.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";
const REGION_KEY: &str = "region";
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";
// Role names are stored as an indexed list under this key, one entry per response line.
const ROLE_NAME_KEY: &str = "role-name";
// Parent key for the instance tags requested through the `tags` option; it is not
// looked up in `fields`.
const TAGS_KEY: &str = "tags";
55
// Request paths on the metadata endpoint. `MetadataClient::get_metadata` combines one of
// these with the configured host to form the request URI.
static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
static LOCAL_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
static PUBLIC_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
// The response to this path is read line by line, one role name per line.
static ROLE_NAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
// The MAC address returned here is interpolated into the subnet-id and vpc-id paths.
static MAC: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
// The response to this path is parsed as JSON into `IdentityDocument`.
static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
// Fields collected when the `fields` option is not set (see `default_fields`).
// `ACCOUNT_ID_KEY` is not part of this list, so it has to be requested explicitly.
static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
    AMI_ID_KEY,
    AVAILABILITY_ZONE_KEY,
    INSTANCE_ID_KEY,
    INSTANCE_TYPE_KEY,
    LOCAL_HOSTNAME_KEY,
    LOCAL_IPV4_KEY,
    PUBLIC_HOSTNAME_KEY,
    PUBLIC_IPV4_KEY,
    REGION_KEY,
    SUBNET_ID_KEY,
    VPC_ID_KEY,
    ROLE_NAME_KEY,
];
// Path that `MetadataClient::get_token` sends a `PUT` to in order to obtain a token.
static API_TOKEN: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
// Name of the header that carries the token on every metadata `GET` request.
static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
89
#[serde_as]
/// Configuration for the `aws_ec2_metadata` transform.
#[configurable_component(transform(
    "aws_ec2_metadata",
    "Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Ec2Metadata {
    /// Overrides the default EC2 metadata endpoint.
    #[serde(alias = "host", default = "default_endpoint")]
    #[derivative(Default(value = "default_endpoint()"))]
    endpoint: String,

    /// Sets a prefix for all event fields added by the transform.
    #[configurable(metadata(
        docs::examples = "",
        docs::examples = "ec2",
        docs::examples = "aws.ec2",
    ))]
    namespace: Option<OptionalTargetPath>,

    /// The interval between querying for updated metadata, in seconds.
    #[serde(default = "default_refresh_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_interval_secs()"))]
    refresh_interval_secs: Duration,

    /// A list of metadata fields to include in each transformed event.
    #[serde(default = "default_fields")]
    #[derivative(Default(value = "default_fields()"))]
    #[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
    fields: Vec<String>,

    /// A list of instance tags to include in each transformed event.
    #[serde(default = "default_tags")]
    #[derivative(Default(value = "default_tags()"))]
    #[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
    tags: Vec<String>,

    /// The timeout for each request to the EC2 metadata endpoint, in seconds.
    #[serde(default = "default_refresh_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_timeout_secs()"))]
    refresh_timeout_secs: Duration,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Requires the initial metadata query to succeed for the transform to be built.
    ///
    /// When `false`, a failed initial query is reported and the transform is built anyway.
    #[serde(default = "default_required")]
    #[derivative(Default(value = "default_required()"))]
    required: bool,
}
145
/// Default value of the `endpoint` option.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}
149
/// Default value of the `refresh_interval_secs` option: ten seconds.
const fn default_refresh_interval_secs() -> Duration {
    Duration::new(10, 0)
}
153
/// Default value of the `refresh_timeout_secs` option: one second.
const fn default_refresh_timeout_secs() -> Duration {
    Duration::new(1, 0)
}
157
158fn default_fields() -> Vec<String> {
159 DEFAULT_FIELD_ALLOWLIST
160 .iter()
161 .map(|s| s.to_string())
162 .collect()
163}
164
/// Default value of the `tags` option: no instance tags are requested.
const fn default_tags() -> Vec<String> {
    Vec::new()
}
168
/// Default value of the `required` option: the initial metadata query must succeed.
const fn default_required() -> bool {
    true
}
172
/// Transform that copies the most recently fetched metadata onto every event.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
    /// Current metadata entries. The same `ArcSwap` is handed to `MetadataClient`, which
    /// stores a new list after each successful refresh.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}
177
/// Where a single metadata value is written on an event.
#[derive(Debug, Clone)]
struct MetadataKey {
    /// Path the value is inserted at on log events.
    log_path: OwnedTargetPath,
    /// Tag name the value is stored under on metric events.
    metric_tag: String,
}
183
/// The `MetadataKey` for every supported metadata field, built once by `Keys::new` from
/// the configured namespace.
#[derive(Debug)]
struct Keys {
    account_id_key: MetadataKey,
    ami_id_key: MetadataKey,
    availability_zone_key: MetadataKey,
    instance_id_key: MetadataKey,
    instance_type_key: MetadataKey,
    local_hostname_key: MetadataKey,
    local_ipv4_key: MetadataKey,
    public_hostname_key: MetadataKey,
    public_ipv4_key: MetadataKey,
    region_key: MetadataKey,
    subnet_id_key: MetadataKey,
    vpc_id_key: MetadataKey,
    // Base key for role names; an index is appended for each role.
    role_name_key: MetadataKey,
    // Base key for instance tags; the tag name is appended for each tag.
    tags_key: MetadataKey,
}
201
// NOTE(review): macro defined elsewhere in the crate; presumably derives the generated
// example configuration from `Ec2Metadata::default()` — confirm against its definition.
impl_generate_config_from_default!(Ec2Metadata);
203
204#[async_trait::async_trait]
205#[typetag::serde(name = "aws_ec2_metadata")]
206impl TransformConfig for Ec2Metadata {
207 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
208 let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
209
210 let keys = Keys::new(self.namespace.clone());
211 let host = Uri::from_maybe_shared(self.endpoint.clone()).unwrap();
212 let refresh_interval = self.refresh_interval_secs;
213 let fields = self.fields.clone();
214 let tags = self.tags.clone();
215 let refresh_timeout = self.refresh_timeout_secs;
216 let required = self.required;
217
218 let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
219 let http_client = HttpClient::new(None, &proxy)?;
220
221 let mut client = MetadataClient::new(
222 http_client,
223 host,
224 keys,
225 Arc::clone(&state),
226 refresh_interval,
227 refresh_timeout,
228 fields,
229 tags,
230 );
231
232 if let Err(error) = client.refresh_metadata().await {
234 if required {
235 return Err(error);
236 } else {
237 emit!(AwsEc2MetadataRefreshError { error });
238 }
239 }
240
241 tokio::spawn(
242 async move {
243 client.run().await;
244 }
245 .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
247 );
248
249 Ok(Transform::event_task(Ec2MetadataTransform { state }))
250 }
251
252 fn input(&self) -> Input {
253 Input::new(DataType::Metric | DataType::Log)
254 }
255
256 fn outputs(
257 &self,
258 _: vector_lib::enrichment::TableRegistry,
259 input_definitions: &[(OutputId, schema::Definition)],
260 _: LogNamespace,
261 ) -> Vec<TransformOutput> {
262 let added_keys = Keys::new(self.namespace.clone());
263
264 let paths = [
265 &added_keys.account_id_key.log_path,
266 &added_keys.ami_id_key.log_path,
267 &added_keys.availability_zone_key.log_path,
268 &added_keys.instance_id_key.log_path,
269 &added_keys.instance_type_key.log_path,
270 &added_keys.local_hostname_key.log_path,
271 &added_keys.local_ipv4_key.log_path,
272 &added_keys.public_hostname_key.log_path,
273 &added_keys.public_ipv4_key.log_path,
274 &added_keys.region_key.log_path,
275 &added_keys.subnet_id_key.log_path,
276 &added_keys.vpc_id_key.log_path,
277 &added_keys.role_name_key.log_path,
278 &added_keys.tags_key.log_path,
279 ];
280
281 let schema_definition = input_definitions
282 .iter()
283 .map(|(output, definition)| {
284 let mut schema_definition = definition.clone();
285
286 if !schema_definition.event_kind().contains_object() {
288 *schema_definition.event_kind_mut() = Kind::object(Collection::empty());
289 }
290
291 for path in paths {
292 schema_definition =
293 schema_definition.with_field(path, Kind::bytes().or_undefined(), None);
294 }
295
296 (output.clone(), schema_definition)
297 })
298 .collect();
299
300 vec![TransformOutput::new(
301 DataType::Metric | DataType::Log,
302 schema_definition,
303 )]
304 }
305}
306
307impl TaskTransform<Event> for Ec2MetadataTransform {
308 fn transform(
309 self: Box<Self>,
310 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
311 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
312 where
313 Self: 'static,
314 {
315 let mut inner = self;
316 Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event)))))
317 }
318}
319
320impl Ec2MetadataTransform {
321 fn transform_one(&mut self, mut event: Event) -> Event {
322 let state = self.state.load();
323 match event {
324 Event::Log(ref mut log) => {
325 state.iter().for_each(|(k, v)| {
326 log.insert(&k.log_path, v.clone());
327 });
328 }
329 Event::Metric(ref mut metric) => {
330 state.iter().for_each(|(k, v)| {
331 metric
332 .replace_tag(k.metric_tag.clone(), String::from_utf8_lossy(v).to_string());
333 });
334 }
335 Event::Trace(_) => panic!("Traces are not supported."),
336 }
337 event
338 }
339}
340
/// Fetches metadata from the configured endpoint and publishes it to the shared state
/// read by `Ec2MetadataTransform`.
struct MetadataClient {
    client: HttpClient<Body>,
    /// Base URI of the metadata endpoint; each request replaces its path and query.
    host: Uri,
    /// Cached token together with the instant after which a new one is requested.
    token: Option<(Bytes, Instant)>,
    keys: Keys,
    /// Shared with the transform; replaced as a whole after a successful refresh.
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
    /// Pause between two refreshes in `run`.
    refresh_interval: Duration,
    /// Upper bound for each individual HTTP request.
    refresh_timeout: Duration,
    /// Names of the metadata fields to collect.
    fields: HashSet<String>,
    /// Names of the instance tags to collect.
    tags: HashSet<String>,
}
352
/// The JSON document returned by the `DYNAMIC_DOCUMENT` path. Keys are camelCase in the
/// JSON and mapped to the snake_case fields below.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
// Not every deserialized field is read by `refresh_metadata`.
#[allow(dead_code)]
struct IdentityDocument {
    account_id: String,
    architecture: String,
    image_id: String,
    instance_id: String,
    instance_type: String,
    private_ip: String,
    region: String,
    version: String,
}
366
367impl MetadataClient {
368 #[allow(clippy::too_many_arguments)]
369 pub fn new(
370 client: HttpClient<Body>,
371 host: Uri,
372 keys: Keys,
373 state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
374 refresh_interval: Duration,
375 refresh_timeout: Duration,
376 fields: Vec<String>,
377 tags: Vec<String>,
378 ) -> Self {
379 Self {
380 client,
381 host,
382 token: None,
383 keys,
384 state,
385 refresh_interval,
386 refresh_timeout,
387 fields: fields.into_iter().collect(),
388 tags: tags.into_iter().collect(),
389 }
390 }
391
392 async fn run(&mut self) {
393 loop {
394 match self.refresh_metadata().await {
395 Ok(_) => {
396 emit!(AwsEc2MetadataRefreshSuccessful);
397 }
398 Err(error) => {
399 emit!(AwsEc2MetadataRefreshError { error });
400 }
401 }
402
403 sleep(self.refresh_interval).await;
404 }
405 }
406
407 pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
408 if let Some((token, next_refresh)) = self.token.clone() {
409 if next_refresh > Instant::now() {
413 return Ok(token);
414 }
415 }
416
417 let mut parts = self.host.clone().into_parts();
418 parts.path_and_query = Some(API_TOKEN.clone());
419 let uri = Uri::from_parts(parts)?;
420
421 let req = Request::put(uri)
422 .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
423 .body(Body::empty())?;
424
425 let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
426 .await?
427 .map_err(crate::Error::from)
428 .and_then(|res| match res.status() {
429 StatusCode::OK => Ok(res),
430 status_code => Err(UnexpectedHttpStatusError {
431 status: status_code,
432 }
433 .into()),
434 })?;
435
436 let token = body_to_bytes(res.into_body()).await?;
437
438 let next_refresh = Instant::now() + Duration::from_secs(21600);
439 self.token = Some((token.clone(), next_refresh));
440
441 Ok(token)
442 }
443
444 pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
445 self.get_metadata(&DYNAMIC_DOCUMENT)
446 .await?
447 .map(|body| {
448 serde_json::from_slice(&body[..])
449 .context(ParseIdentityDocumentSnafu {})
450 .map_err(Into::into)
451 })
452 .transpose()
453 }
454
455 pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
456 let mut new_state = vec![];
457
458 if let Some(document) = self.get_document().await? {
460 if self.fields.contains(ACCOUNT_ID_KEY) {
461 new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
462 }
463
464 if self.fields.contains(AMI_ID_KEY) {
465 new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
466 }
467
468 if self.fields.contains(INSTANCE_ID_KEY) {
469 new_state.push((
470 self.keys.instance_id_key.clone(),
471 document.instance_id.into(),
472 ));
473 }
474
475 if self.fields.contains(INSTANCE_TYPE_KEY) {
476 new_state.push((
477 self.keys.instance_type_key.clone(),
478 document.instance_type.into(),
479 ));
480 }
481
482 if self.fields.contains(REGION_KEY) {
483 new_state.push((self.keys.region_key.clone(), document.region.into()));
484 }
485
486 if self.fields.contains(AVAILABILITY_ZONE_KEY)
487 && let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await?
488 {
489 new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
490 }
491
492 if self.fields.contains(LOCAL_HOSTNAME_KEY)
493 && let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await?
494 {
495 new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
496 }
497
498 if self.fields.contains(LOCAL_IPV4_KEY)
499 && let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await?
500 {
501 new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
502 }
503
504 if self.fields.contains(PUBLIC_HOSTNAME_KEY)
505 && let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await?
506 {
507 new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
508 }
509
510 if self.fields.contains(PUBLIC_IPV4_KEY)
511 && let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await?
512 {
513 new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
514 }
515
516 if (self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY))
517 && let Some(mac) = self.get_metadata(&MAC).await?
518 {
519 let mac = String::from_utf8_lossy(&mac[..]);
520
521 if self.fields.contains(SUBNET_ID_KEY) {
522 let subnet_path =
523 format!("/latest/meta-data/network/interfaces/macs/{mac}/subnet-id");
524
525 let subnet_path = subnet_path.parse().context(ParsePathSnafu {
526 value: subnet_path.clone(),
527 })?;
528
529 if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
530 new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
531 }
532 }
533
534 if self.fields.contains(VPC_ID_KEY) {
535 let vpc_path =
536 format!("/latest/meta-data/network/interfaces/macs/{mac}/vpc-id");
537
538 let vpc_path = vpc_path.parse().context(ParsePathSnafu {
539 value: vpc_path.clone(),
540 })?;
541
542 if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
543 new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
544 }
545 }
546 }
547
548 if self.fields.contains(ROLE_NAME_KEY)
549 && let Some(role_names) = self.get_metadata(&ROLE_NAME).await?
550 {
551 let role_names = String::from_utf8_lossy(&role_names[..]);
552
553 for (i, role_name) in role_names.lines().enumerate() {
554 new_state.push((
555 MetadataKey {
556 log_path: self
557 .keys
558 .role_name_key
559 .log_path
560 .with_index_appended(i as isize),
561 metric_tag: format!("{}[{}]", self.keys.role_name_key.metric_tag, i),
562 },
563 role_name.to_string().into(),
564 ));
565 }
566 }
567
568 for tag in self.tags.clone() {
569 let tag_path = format!("/latest/meta-data/tags/instance/{tag}");
570
571 let tag_path = tag_path.parse().context(ParsePathSnafu {
572 value: tag_path.clone(),
573 })?;
574
575 if let Some(tag_content) = self.get_metadata(&tag_path).await? {
576 new_state.push((
577 MetadataKey {
578 log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
579 metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
580 },
581 tag_content,
582 ));
583 }
584 }
585
586 self.state.store(Arc::new(new_state));
587 }
588
589 Ok(())
590 }
591
592 async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
593 let token = self
594 .get_token()
595 .await
596 .with_context(|_| FetchTokenSnafu {})?;
597
598 let mut parts = self.host.clone().into_parts();
599
600 parts.path_and_query = Some(path.clone());
601
602 let uri = Uri::from_parts(parts)?;
603
604 debug!(message = "Sending metadata request.", %uri);
605
606 let req = Request::get(uri)
607 .header(TOKEN_HEADER.as_ref(), token.as_ref())
608 .body(Body::empty())?;
609
610 match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
611 .await?
612 .map_err(crate::Error::from)
613 .and_then(|res| match res.status() {
614 StatusCode::OK => Ok(Some(res)),
615 StatusCode::NOT_FOUND => Ok(None),
616 status_code => Err(UnexpectedHttpStatusError {
617 status: status_code,
618 }
619 .into()),
620 })? {
621 Some(res) => {
622 let body = body_to_bytes(res.into_body()).await?;
623 Ok(Some(body))
624 }
625 None => Ok(None),
626 }
627 }
628}
629
630fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
635 let mut output = String::new();
636 for segment in &namespace.path.segments {
637 if !output.is_empty() {
638 output += ".";
639 }
640 match segment {
641 OwnedSegment::Field(field) => {
642 output += field;
643 }
644 OwnedSegment::Index(i) => {
645 output += &i.to_string();
646 }
647 }
648 }
649 output
650}
651
652fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
653 if let Some(namespace) = namespace {
654 MetadataKey {
655 log_path: namespace.with_field_appended(key),
656 metric_tag: format!("{}.{}", create_metric_namespace(namespace), key),
657 }
658 } else {
659 MetadataKey {
660 log_path: OwnedTargetPath::event(owned_value_path!(key)),
661 metric_tag: key.to_owned(),
662 }
663 }
664}
665
666impl Keys {
667 pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
668 let namespace = namespace.and_then(|namespace| namespace.path);
669
670 Keys {
671 account_id_key: create_key(&namespace, ACCOUNT_ID_KEY),
672 ami_id_key: create_key(&namespace, AMI_ID_KEY),
673 availability_zone_key: create_key(&namespace, AVAILABILITY_ZONE_KEY),
674 instance_id_key: create_key(&namespace, INSTANCE_ID_KEY),
675 instance_type_key: create_key(&namespace, INSTANCE_TYPE_KEY),
676 local_hostname_key: create_key(&namespace, LOCAL_HOSTNAME_KEY),
677 local_ipv4_key: create_key(&namespace, LOCAL_IPV4_KEY),
678 public_hostname_key: create_key(&namespace, PUBLIC_HOSTNAME_KEY),
679 public_ipv4_key: create_key(&namespace, PUBLIC_IPV4_KEY),
680 region_key: create_key(&namespace, REGION_KEY),
681 subnet_id_key: create_key(&namespace, SUBNET_ID_KEY),
682 vpc_id_key: create_key(&namespace, VPC_ID_KEY),
683 role_name_key: create_key(&namespace, ROLE_NAME_KEY),
684 tags_key: create_key(&namespace, TAGS_KEY),
685 }
686 }
687}
688
/// Error returned when the metadata endpoint answers with a status code the client does
/// not handle.
#[derive(Debug)]
struct UnexpectedHttpStatusError {
    /// The status code that was received.
    status: http::StatusCode,
}
693
694impl fmt::Display for UnexpectedHttpStatusError {
695 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696 write!(f, "got unexpected status code: {}", self.status)
697 }
698}
699
// No source error to expose; the default trait methods are sufficient.
impl error::Error for UnexpectedHttpStatusError {}
701
/// Errors that add context to failures while talking to the metadata endpoint.
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
    /// Obtaining the token needed for a metadata request failed.
    #[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
    FetchToken { source: crate::Error },
    /// The identity document body was not valid JSON of the expected shape.
    #[snafu(display("Unable to parse identity document: {}.", source))]
    ParseIdentityDocument { source: serde_json::Error },
    /// A request path assembled at runtime (MAC- or tag-based) was not a valid path.
    #[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
    ParsePath {
        value: String,
        source: http::uri::InvalidUri,
    },
}
714
#[cfg(test)]
mod test {
    use vector_lib::{enrichment::TableRegistry, lookup::OwnedTargetPath};
    use vrl::{owned_value_path, value::Kind};

    use crate::{
        config::{LogNamespace, OutputId, TransformConfig, schema::Definition},
        transforms::aws_ec2_metadata::Ec2Metadata,
    };

    /// An input whose event kind is plain bytes must still produce an object-shaped
    /// output definition, because the transform adds fields to the event.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let namespace = OwnedTargetPath::event(owned_value_path!("ec2", "metadata"));
        let config = Ec2Metadata {
            namespace: Some(namespace.into()),
            ..Default::default()
        };

        let bytes_only_input =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);
        let inputs = [(OutputId::dummy(), bytes_only_input)];

        let mut outputs = config.outputs(TableRegistry::default(), &inputs, LogNamespace::Vector);
        assert_eq!(outputs.len(), 1);

        let only_output = outputs.pop().unwrap();
        let definition = only_output.schema_definitions(true)[&OutputId::dummy()].clone();
        assert!(definition.event_kind().is_object());
    }
}
746
747#[cfg(feature = "aws-ec2-metadata-integration-tests")]
748#[cfg(test)]
749mod integration_tests {
750 use tokio::sync::mpsc;
751 use tokio_stream::wrappers::ReceiverStream;
752 use vector_lib::{
753 assert_event_data_eq,
754 lookup::{
755 PathPrefix, event_path,
756 lookup_v2::{OwnedSegment, OwnedValuePath},
757 },
758 };
759 use vrl::value::{ObjectMap, Value};
760 use warp::Filter;
761
762 use super::*;
763 use crate::{
764 event::{LogEvent, Metric, metric},
765 test_util::{components::assert_transform_compliance, next_addr},
766 transforms::test::create_topology,
767 };
768
769 fn ec2_metadata_address() -> String {
770 std::env::var("EC2_METADATA_ADDRESS").unwrap_or_else(|_| "http://localhost:1338".into())
771 }
772
773 fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
774 vec![
775 (
776 vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
777 "us-east-1a",
778 ),
779 (
780 vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
781 "192.0.2.54",
782 ),
783 (
784 vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
785 "ec2-192-0-2-54.compute-1.amazonaws.com",
786 ),
787 (
788 vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
789 "172.16.34.43",
790 ),
791 (
792 vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
793 "ip-172-16-34-43.ec2.internal",
794 ),
795 (
796 vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
797 "i-1234567890abcdef0",
798 ),
799 (
800 vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
801 "0123456789",
802 ),
803 (
804 vec![OwnedSegment::field(AMI_ID_KEY)].into(),
805 "ami-0b69ea66ff7391e80",
806 ),
807 (
808 vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
809 "m4.xlarge",
810 ),
811 (vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
812 (vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
813 (
814 vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
815 "subnet-0ac62554",
816 ),
817 (owned_value_path!("role-name", 0), "baskinc-role"),
818 (owned_value_path!("tags", "Name"), "test-instance"),
819 (owned_value_path!("tags", "Test"), "test-tag"),
820 ]
821 }
822
823 fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
824 vec![
825 (AVAILABILITY_ZONE_KEY, "us-east-1a"),
826 (PUBLIC_IPV4_KEY, "192.0.2.54"),
827 (
828 PUBLIC_HOSTNAME_KEY,
829 "ec2-192-0-2-54.compute-1.amazonaws.com",
830 ),
831 (LOCAL_IPV4_KEY, "172.16.34.43"),
832 (LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
833 (INSTANCE_ID_KEY, "i-1234567890abcdef0"),
834 (ACCOUNT_ID_KEY, "0123456789"),
835 (AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
836 (INSTANCE_TYPE_KEY, "m4.xlarge"),
837 (REGION_KEY, "us-east-1"),
838 (VPC_ID_KEY, "vpc-d295a6a7"),
839 (SUBNET_ID_KEY, "subnet-0ac62554"),
840 ("role-name[0]", "baskinc-role"),
841 ("tags[Name]", "test-instance"),
842 ("tags[Test]", "test-tag"),
843 ]
844 }
845
846 fn make_metric() -> Metric {
847 Metric::new(
848 "event",
849 metric::MetricKind::Incremental,
850 metric::MetricValue::Counter { value: 1.0 },
851 )
852 }
853
    /// Runs the crate's shared generated-config check against `Ec2Metadata`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<Ec2Metadata>();
    }
858
859 #[tokio::test]
860 async fn enrich_log() {
861 assert_transform_compliance(async {
862 let mut fields = default_fields();
863 fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
864
865 let tags = vec![
866 String::from("Name"),
867 String::from("Test"),
868 String::from("MISSING_TAG"),
869 ];
870
871 let transform_config = Ec2Metadata {
872 endpoint: ec2_metadata_address(),
873 fields,
874 tags,
875 ..Default::default()
876 };
877
878 let (tx, rx) = mpsc::channel(1);
879 let (topology, mut out) =
880 create_topology(ReceiverStream::new(rx), transform_config).await;
881
882 sleep(Duration::from_secs(1)).await;
884
885 let log = LogEvent::default();
886 let mut expected_log = log.clone();
887 for (k, v) in expected_log_fields().iter().cloned() {
888 expected_log.insert((PathPrefix::Event, &k), v);
889 }
890
891 tx.send(log.into()).await.unwrap();
892
893 let event = out.recv().await.unwrap();
894 assert_event_data_eq!(event.into_log(), expected_log);
895
896 drop(tx);
897 topology.stop().await;
898 assert_eq!(out.recv().await, None);
899 })
900 .await;
901 }
902
903 #[tokio::test(flavor = "multi_thread")]
904 async fn timeout() {
905 let addr = next_addr();
906
907 async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
908 tokio::time::sleep(Duration::from_secs(3)).await;
909 Ok("I waited 3 seconds!")
910 }
911
912 let slow = warp::any().and_then(sleepy);
913 let server = warp::serve(slow).bind(addr);
914 let _server = tokio::spawn(server);
915
916 let config = Ec2Metadata {
917 endpoint: format!("http://{addr}"),
918 refresh_timeout_secs: Duration::from_secs(1),
919 ..Default::default()
920 };
921
922 match config.build(&TransformContext::default()).await {
923 Ok(_) => panic!("expected timeout failure"),
924 Err(err) => assert_eq!(
927 err.to_string(),
928 "Unable to fetch metadata authentication token: deadline has elapsed."
929 ),
930 }
931 }
932
933 #[tokio::test(flavor = "multi_thread")]
935 async fn not_required() {
936 let addr = next_addr();
937
938 async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
939 tokio::time::sleep(Duration::from_secs(3)).await;
940 Ok("I waited 3 seconds!")
941 }
942
943 let slow = warp::any().and_then(sleepy);
944 let server = warp::serve(slow).bind(addr);
945 let _server = tokio::spawn(server);
946
947 let config = Ec2Metadata {
948 endpoint: format!("http://{addr}"),
949 refresh_timeout_secs: Duration::from_secs(1),
950 required: false,
951 ..Default::default()
952 };
953
954 assert!(
955 config.build(&TransformContext::default()).await.is_ok(),
956 "expected no failure because 'required' config value set to false"
957 );
958 }
959
960 #[tokio::test]
961 async fn enrich_metric() {
962 assert_transform_compliance(async {
963 let mut fields = default_fields();
964 fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
965
966 let tags = vec![
967 String::from("Name"),
968 String::from("Test"),
969 String::from("MISSING_TAG"),
970 ];
971
972 let transform_config = Ec2Metadata {
973 endpoint: ec2_metadata_address(),
974 fields,
975 tags,
976 ..Default::default()
977 };
978
979 let (tx, rx) = mpsc::channel(1);
980 let (topology, mut out) =
981 create_topology(ReceiverStream::new(rx), transform_config).await;
982
983 sleep(Duration::from_secs(1)).await;
985
986 let metric = make_metric();
987 let mut expected_metric = metric.clone();
988 for (k, v) in expected_metric_fields().iter() {
989 expected_metric.replace_tag(k.to_string(), v.to_string());
990 }
991
992 tx.send(metric.into()).await.unwrap();
993
994 let event = out.recv().await.unwrap();
995 assert_event_data_eq!(event.into_metric(), expected_metric);
996
997 drop(tx);
998 topology.stop().await;
999 assert_eq!(out.recv().await, None);
1000 })
1001 .await;
1002 }
1003
1004 #[tokio::test]
1005 async fn fields_log() {
1006 assert_transform_compliance(async {
1007 let transform_config = Ec2Metadata {
1008 endpoint: ec2_metadata_address(),
1009 fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1010 tags: vec![
1011 String::from("Name"),
1012 String::from("Test"),
1013 String::from("MISSING_TAG"),
1014 ],
1015 ..Default::default()
1016 };
1017
1018 let (tx, rx) = mpsc::channel(1);
1019 let (topology, mut out) =
1020 create_topology(ReceiverStream::new(rx), transform_config).await;
1021
1022 sleep(Duration::from_secs(1)).await;
1024
1025 let log = LogEvent::default();
1026 let mut expected_log = log.clone();
1027 expected_log.insert(format!("\"{PUBLIC_IPV4_KEY}\"").as_str(), "192.0.2.54");
1028 expected_log.insert(format!("\"{REGION_KEY}\"").as_str(), "us-east-1");
1029 expected_log.insert(
1030 format!("\"{TAGS_KEY}\"").as_str(),
1031 ObjectMap::from([
1032 ("Name".into(), Value::from("test-instance")),
1033 ("Test".into(), Value::from("test-tag")),
1034 ]),
1035 );
1036
1037 tx.send(log.into()).await.unwrap();
1038
1039 let event = out.recv().await.unwrap();
1040 assert_event_data_eq!(event.into_log(), expected_log);
1041
1042 drop(tx);
1043 topology.stop().await;
1044 assert_eq!(out.recv().await, None);
1045 })
1046 .await;
1047 }
1048
1049 #[tokio::test]
1050 async fn fields_metric() {
1051 assert_transform_compliance(async {
1052 let transform_config = Ec2Metadata {
1053 endpoint: ec2_metadata_address(),
1054 fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
1055 tags: vec![
1056 String::from("Name"),
1057 String::from("Test"),
1058 String::from("MISSING_TAG"),
1059 ],
1060 ..Default::default()
1061 };
1062
1063 let (tx, rx) = mpsc::channel(1);
1064 let (topology, mut out) =
1065 create_topology(ReceiverStream::new(rx), transform_config).await;
1066
1067 sleep(Duration::from_secs(1)).await;
1069
1070 let metric = make_metric();
1071 let mut expected_metric = metric.clone();
1072 expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
1073 expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
1074 expected_metric.replace_tag(
1075 format!("{}[{}]", TAGS_KEY, "Name"),
1076 "test-instance".to_string(),
1077 );
1078 expected_metric
1079 .replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());
1080
1081 tx.send(metric.into()).await.unwrap();
1082
1083 let event = out.recv().await.unwrap();
1084 assert_event_data_eq!(event.into_metric(), expected_metric);
1085
1086 drop(tx);
1087 topology.stop().await;
1088 assert_eq!(out.recv().await, None);
1089 })
1090 .await;
1091 }
1092
1093 #[tokio::test]
1094 async fn namespace_log() {
1095 {
1096 assert_transform_compliance(async {
1097 let transform_config = Ec2Metadata {
1098 endpoint: ec2_metadata_address(),
1099 namespace: Some(
1100 OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1101 ),
1102 ..Default::default()
1103 };
1104
1105 let (tx, rx) = mpsc::channel(1);
1106 let (topology, mut out) =
1107 create_topology(ReceiverStream::new(rx), transform_config).await;
1108
1109 sleep(Duration::from_secs(1)).await;
1111
1112 let log = LogEvent::default();
1113
1114 tx.send(log.into()).await.unwrap();
1115
1116 let event = out.recv().await.unwrap();
1117
1118 assert_eq!(
1119 event.as_log().get("ec2.metadata.\"availability-zone\""),
1120 Some(&"us-east-1a".into())
1121 );
1122
1123 drop(tx);
1124 topology.stop().await;
1125 assert_eq!(out.recv().await, None);
1126 })
1127 .await;
1128 }
1129
1130 {
1131 assert_transform_compliance(async {
1132 let transform_config = Ec2Metadata {
1134 endpoint: ec2_metadata_address(),
1135 namespace: Some(OptionalTargetPath::none()),
1136 ..Default::default()
1137 };
1138
1139 let (tx, rx) = mpsc::channel(1);
1140 let (topology, mut out) =
1141 create_topology(ReceiverStream::new(rx), transform_config).await;
1142
1143 sleep(Duration::from_secs(1)).await;
1145
1146 let log = LogEvent::default();
1147
1148 tx.send(log.into()).await.unwrap();
1149
1150 let event = out.recv().await.unwrap();
1151 assert_eq!(
1152 event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
1153 Some(&"us-east-1a".into())
1154 );
1155
1156 drop(tx);
1157 topology.stop().await;
1158 assert_eq!(out.recv().await, None);
1159 })
1160 .await;
1161 }
1162 }
1163
1164 #[tokio::test]
1165 async fn namespace_metric() {
1166 {
1167 assert_transform_compliance(async {
1168 let transform_config = Ec2Metadata {
1169 endpoint: ec2_metadata_address(),
1170 namespace: Some(
1171 OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
1172 ),
1173 ..Default::default()
1174 };
1175
1176 let (tx, rx) = mpsc::channel(1);
1177 let (topology, mut out) =
1178 create_topology(ReceiverStream::new(rx), transform_config).await;
1179
1180 sleep(Duration::from_secs(1)).await;
1182
1183 let metric = make_metric();
1184
1185 tx.send(metric.into()).await.unwrap();
1186
1187 let event = out.recv().await.unwrap();
1188 assert_eq!(
1189 event
1190 .as_metric()
1191 .tag_value("ec2.metadata.availability-zone"),
1192 Some("us-east-1a".to_string())
1193 );
1194
1195 drop(tx);
1196 topology.stop().await;
1197 assert_eq!(out.recv().await, None);
1198 })
1199 .await;
1200 }
1201
1202 {
1203 assert_transform_compliance(async {
1204 let transform_config = Ec2Metadata {
1206 endpoint: ec2_metadata_address(),
1207 namespace: Some(OptionalTargetPath::none()),
1208 ..Default::default()
1209 };
1210
1211 let (tx, rx) = mpsc::channel(1);
1212 let (topology, mut out) =
1213 create_topology(ReceiverStream::new(rx), transform_config).await;
1214
1215 sleep(Duration::from_secs(1)).await;
1217
1218 let metric = make_metric();
1219
1220 tx.send(metric.into()).await.unwrap();
1221
1222 let event = out.recv().await.unwrap();
1223 assert_eq!(
1224 event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
1225 Some("us-east-1a".to_string())
1226 );
1227
1228 drop(tx);
1229 topology.stop().await;
1230 assert_eq!(out.recv().await, None);
1231 })
1232 .await;
1233 }
1234 }
1235}