1use std::{
2 collections::HashSet,
3 error, fmt,
4 future::ready,
5 pin::Pin,
6 sync::{Arc, LazyLock},
7};
8
9use arc_swap::ArcSwap;
10use bytes::Bytes;
11use futures::{Stream, StreamExt};
12use http::{Request, StatusCode, Uri, uri::PathAndQuery};
13use hyper::Body;
14use serde::Deserialize;
15use serde_with::serde_as;
16use snafu::ResultExt as _;
17use tokio::time::{Duration, Instant, sleep};
18use tracing::Instrument;
19use vector_lib::{
20 configurable::configurable_component,
21 lookup::{
22 OwnedTargetPath,
23 lookup_v2::{OptionalTargetPath, OwnedSegment},
24 owned_value_path,
25 },
26};
27use vrl::value::{Kind, kind::Collection};
28
29use crate::{
30 config::{
31 DataType, Input, OutputId, ProxyConfig, TransformConfig, TransformContext, TransformOutput,
32 },
33 event::Event,
34 http::HttpClient,
35 internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
36 schema,
37 transforms::{TaskTransform, Transform},
38};
39
// Metadata field names. Except for `TAGS_KEY`, each one is matched against the
// configured `fields` list. `create_key` also uses each name as the last log
// path segment / metric tag name under which the corresponding value is written.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";
const REGION_KEY: &str = "region";
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";
// Base key for role names; each role is emitted with its index appended.
const ROLE_NAME_KEY: &str = "role-name";
// Prefix under which each configured instance tag is nested.
const TAGS_KEY: &str = "tags";
54
// Metadata service paths. Each one replaces the path of the configured
// endpoint URI when a request is built.
static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
static LOCAL_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
static PUBLIC_IPV4: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
// The response body is split into lines, one role name per line.
static ROLE_NAME: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
// The returned MAC is used to build the per-interface subnet/VPC paths.
static MAC: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
// JSON identity document, deserialized into `IdentityDocument`.
static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
// Fields added when `fields` is not configured. `ACCOUNT_ID_KEY` is not in
// this list, so the account ID is only added when listed explicitly.
static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
    AMI_ID_KEY,
    AVAILABILITY_ZONE_KEY,
    INSTANCE_ID_KEY,
    INSTANCE_TYPE_KEY,
    LOCAL_HOSTNAME_KEY,
    LOCAL_IPV4_KEY,
    PUBLIC_HOSTNAME_KEY,
    PUBLIC_IPV4_KEY,
    REGION_KEY,
    SUBNET_ID_KEY,
    VPC_ID_KEY,
    ROLE_NAME_KEY,
];
// Requested with `PUT` to obtain the session token sent on every metadata request.
static API_TOKEN: LazyLock<PathAndQuery> =
    LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
// Header carrying the session token on metadata `GET` requests.
static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
88
/// Configuration for the `aws_ec2_metadata` transform.
#[serde_as]
#[configurable_component(transform(
    "aws_ec2_metadata",
    "Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Ec2Metadata {
    /// Overrides the default EC2 metadata endpoint.
    #[serde(alias = "host", default = "default_endpoint")]
    #[derivative(Default(value = "default_endpoint()"))]
    endpoint: String,

    /// Sets a prefix for all event fields added by the transform.
    #[configurable(metadata(
        docs::examples = "",
        docs::examples = "ec2",
        docs::examples = "aws.ec2",
    ))]
    namespace: Option<OptionalTargetPath>,

    /// The interval between querying for updated metadata, in seconds.
    #[serde(default = "default_refresh_interval_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_interval_secs()"))]
    refresh_interval_secs: Duration,

    /// A list of metadata fields to include in each transformed event.
    #[serde(default = "default_fields")]
    #[derivative(Default(value = "default_fields()"))]
    #[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
    fields: Vec<String>,

    /// A list of instance tags to include in each transformed event.
    #[serde(default = "default_tags")]
    #[derivative(Default(value = "default_tags()"))]
    #[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
    tags: Vec<String>,

    /// The timeout for querying the EC2 metadata endpoint, in seconds.
    #[serde(default = "default_refresh_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[derivative(Default(value = "default_refresh_timeout_secs()"))]
    refresh_timeout_secs: Duration,

    // Description is derived from `ProxyConfig`; merged with the global proxy in `build`.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    proxy: ProxyConfig,

    /// Requires the transform to be able to successfully query the EC2 metadata before starting to process the data.
    #[serde(default = "default_required")]
    #[derivative(Default(value = "default_required()"))]
    required: bool,
}
144
/// Default `endpoint`: the link-local address of the EC2 instance metadata service.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}

/// Default `refresh_interval_secs`: ten seconds between refreshes.
const fn default_refresh_interval_secs() -> Duration {
    Duration::new(10, 0)
}

/// Default `refresh_timeout_secs`: one second per metadata request.
const fn default_refresh_timeout_secs() -> Duration {
    Duration::new(1, 0)
}
156
157fn default_fields() -> Vec<String> {
158 DEFAULT_FIELD_ALLOWLIST
159 .iter()
160 .map(|s| s.to_string())
161 .collect()
162}
163
/// Default `tags`: no instance tags are fetched unless configured.
const fn default_tags() -> Vec<String> {
    Vec::new()
}

/// Default `required`: a failed initial refresh makes the build fail.
const fn default_required() -> bool {
    true
}
171
/// Transform that copies the most recently fetched metadata onto each event.
///
/// `state` is shared with the background `MetadataClient`, which swaps in a
/// new list of `(key, value)` pairs when it refreshes.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}
176
/// Destination of one metadata value: where it goes on log events and which
/// tag it becomes on metric events.
#[derive(Debug, Clone)]
struct MetadataKey {
    // Path at which the value is inserted into log events.
    log_path: OwnedTargetPath,
    // Tag name under which the value is set on metric events.
    metric_tag: String,
}
182
/// A `MetadataKey` for every supported metadata field, with the configured
/// namespace already applied (built by `Keys::new`).
#[derive(Debug)]
struct Keys {
    account_id_key: MetadataKey,
    ami_id_key: MetadataKey,
    availability_zone_key: MetadataKey,
    instance_id_key: MetadataKey,
    instance_type_key: MetadataKey,
    local_hostname_key: MetadataKey,
    local_ipv4_key: MetadataKey,
    public_hostname_key: MetadataKey,
    public_ipv4_key: MetadataKey,
    region_key: MetadataKey,
    subnet_id_key: MetadataKey,
    vpc_id_key: MetadataKey,
    // Base key; each role name is emitted with its index appended.
    role_name_key: MetadataKey,
    // Base key; each instance tag is emitted with the tag name appended.
    tags_key: MetadataKey,
}
200
// Example-config generation for `Ec2Metadata`; judging by the macro name it is
// based on the `Default` impl derived above.
impl_generate_config_from_default!(Ec2Metadata);
202
203#[async_trait::async_trait]
204#[typetag::serde(name = "aws_ec2_metadata")]
205impl TransformConfig for Ec2Metadata {
206 async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
207 let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
208
209 let keys = Keys::new(self.namespace.clone());
210 let host = Uri::from_maybe_shared(self.endpoint.clone()).unwrap();
211 let refresh_interval = self.refresh_interval_secs;
212 let fields = self.fields.clone();
213 let tags = self.tags.clone();
214 let refresh_timeout = self.refresh_timeout_secs;
215 let required = self.required;
216
217 let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
218 let http_client = HttpClient::new(None, &proxy)?;
219
220 let mut client = MetadataClient::new(
221 http_client,
222 host,
223 keys,
224 Arc::clone(&state),
225 refresh_interval,
226 refresh_timeout,
227 fields,
228 tags,
229 );
230
231 if let Err(error) = client.refresh_metadata().await {
233 if required {
234 return Err(error);
235 } else {
236 emit!(AwsEc2MetadataRefreshError { error });
237 }
238 }
239
240 tokio::spawn(
241 async move {
242 client.run().await;
243 }
244 .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
246 );
247
248 Ok(Transform::event_task(Ec2MetadataTransform { state }))
249 }
250
251 fn input(&self) -> Input {
252 Input::new(DataType::Metric | DataType::Log)
253 }
254
255 fn outputs(
256 &self,
257 _: &TransformContext,
258 input_definitions: &[(OutputId, schema::Definition)],
259 ) -> Vec<TransformOutput> {
260 let added_keys = Keys::new(self.namespace.clone());
261
262 let paths = [
263 &added_keys.account_id_key.log_path,
264 &added_keys.ami_id_key.log_path,
265 &added_keys.availability_zone_key.log_path,
266 &added_keys.instance_id_key.log_path,
267 &added_keys.instance_type_key.log_path,
268 &added_keys.local_hostname_key.log_path,
269 &added_keys.local_ipv4_key.log_path,
270 &added_keys.public_hostname_key.log_path,
271 &added_keys.public_ipv4_key.log_path,
272 &added_keys.region_key.log_path,
273 &added_keys.subnet_id_key.log_path,
274 &added_keys.vpc_id_key.log_path,
275 &added_keys.role_name_key.log_path,
276 &added_keys.tags_key.log_path,
277 ];
278
279 let schema_definition = input_definitions
280 .iter()
281 .map(|(output, definition)| {
282 let mut schema_definition = definition.clone();
283
284 if !schema_definition.event_kind().contains_object() {
286 *schema_definition.event_kind_mut() = Kind::object(Collection::empty());
287 }
288
289 for path in paths {
290 schema_definition =
291 schema_definition.with_field(path, Kind::bytes().or_undefined(), None);
292 }
293
294 (output.clone(), schema_definition)
295 })
296 .collect();
297
298 vec![TransformOutput::new(
299 DataType::Metric | DataType::Log,
300 schema_definition,
301 )]
302 }
303}
304
305impl TaskTransform<Event> for Ec2MetadataTransform {
306 fn transform(
307 self: Box<Self>,
308 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
309 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
310 where
311 Self: 'static,
312 {
313 let mut inner = self;
314 Box::pin(task.filter_map(move |event| ready(Some(inner.transform_one(event)))))
315 }
316}
317
318impl Ec2MetadataTransform {
319 fn transform_one(&mut self, mut event: Event) -> Event {
320 let state = self.state.load();
321 match event {
322 Event::Log(ref mut log) => {
323 state.iter().for_each(|(k, v)| {
324 log.insert(&k.log_path, v.clone());
325 });
326 }
327 Event::Metric(ref mut metric) => {
328 state.iter().for_each(|(k, v)| {
329 metric
330 .replace_tag(k.metric_tag.clone(), String::from_utf8_lossy(v).to_string());
331 });
332 }
333 Event::Trace(_) => panic!("Traces are not supported."),
334 }
335 event
336 }
337}
338
/// Background worker that queries the metadata service and publishes the
/// results into the state shared with `Ec2MetadataTransform`.
struct MetadataClient {
    client: HttpClient<Body>,
    // Base URI of the metadata service; each request replaces its path.
    host: Uri,
    // Cached session token and the instant after which it is re-fetched.
    token: Option<(Bytes, Instant)>,
    keys: Keys,
    state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
    // Pause between refreshes in `run`.
    refresh_interval: Duration,
    // Timeout applied to each individual HTTP request.
    refresh_timeout: Duration,
    // Configured metadata field names to fetch.
    fields: HashSet<String>,
    // Configured instance tag names to fetch.
    tags: HashSet<String>,
}
350
/// The instance identity document fetched from `DYNAMIC_DOCUMENT`,
/// deserialized from its camelCase JSON keys.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
// `architecture`, `private_ip` and `version` are never read by
// `refresh_metadata`. They have no serde default, so they must still be
// present in the document for parsing to succeed.
#[allow(dead_code)]
struct IdentityDocument {
    account_id: String,
    architecture: String,
    image_id: String,
    instance_id: String,
    instance_type: String,
    private_ip: String,
    region: String,
    version: String,
}
364
365impl MetadataClient {
366 #[allow(clippy::too_many_arguments)]
367 pub fn new(
368 client: HttpClient<Body>,
369 host: Uri,
370 keys: Keys,
371 state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
372 refresh_interval: Duration,
373 refresh_timeout: Duration,
374 fields: Vec<String>,
375 tags: Vec<String>,
376 ) -> Self {
377 Self {
378 client,
379 host,
380 token: None,
381 keys,
382 state,
383 refresh_interval,
384 refresh_timeout,
385 fields: fields.into_iter().collect(),
386 tags: tags.into_iter().collect(),
387 }
388 }
389
390 async fn run(&mut self) {
391 loop {
392 match self.refresh_metadata().await {
393 Ok(_) => {
394 emit!(AwsEc2MetadataRefreshSuccessful);
395 }
396 Err(error) => {
397 emit!(AwsEc2MetadataRefreshError { error });
398 }
399 }
400
401 sleep(self.refresh_interval).await;
402 }
403 }
404
405 pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
406 if let Some((token, next_refresh)) = self.token.clone() {
407 if next_refresh > Instant::now() {
411 return Ok(token);
412 }
413 }
414
415 let mut parts = self.host.clone().into_parts();
416 parts.path_and_query = Some(API_TOKEN.clone());
417 let uri = Uri::from_parts(parts)?;
418
419 let req = Request::put(uri)
420 .header("X-aws-ec2-metadata-token-ttl-seconds", "21600")
421 .body(Body::empty())?;
422
423 let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
424 .await?
425 .map_err(crate::Error::from)
426 .and_then(|res| match res.status() {
427 StatusCode::OK => Ok(res),
428 status_code => Err(UnexpectedHttpStatusError {
429 status: status_code,
430 }
431 .into()),
432 })?;
433
434 let token = http_body::Body::collect(res.into_body()).await?.to_bytes();
435
436 let next_refresh = Instant::now() + Duration::from_secs(21600);
437 self.token = Some((token.clone(), next_refresh));
438
439 Ok(token)
440 }
441
442 pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
443 self.get_metadata(&DYNAMIC_DOCUMENT)
444 .await?
445 .map(|body| {
446 serde_json::from_slice(&body[..])
447 .context(ParseIdentityDocumentSnafu {})
448 .map_err(Into::into)
449 })
450 .transpose()
451 }
452
453 pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
454 let mut new_state = vec![];
455
456 if let Some(document) = self.get_document().await? {
458 if self.fields.contains(ACCOUNT_ID_KEY) {
459 new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
460 }
461
462 if self.fields.contains(AMI_ID_KEY) {
463 new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
464 }
465
466 if self.fields.contains(INSTANCE_ID_KEY) {
467 new_state.push((
468 self.keys.instance_id_key.clone(),
469 document.instance_id.into(),
470 ));
471 }
472
473 if self.fields.contains(INSTANCE_TYPE_KEY) {
474 new_state.push((
475 self.keys.instance_type_key.clone(),
476 document.instance_type.into(),
477 ));
478 }
479
480 if self.fields.contains(REGION_KEY) {
481 new_state.push((self.keys.region_key.clone(), document.region.into()));
482 }
483
484 if self.fields.contains(AVAILABILITY_ZONE_KEY)
485 && let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await?
486 {
487 new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
488 }
489
490 if self.fields.contains(LOCAL_HOSTNAME_KEY)
491 && let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await?
492 {
493 new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
494 }
495
496 if self.fields.contains(LOCAL_IPV4_KEY)
497 && let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await?
498 {
499 new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
500 }
501
502 if self.fields.contains(PUBLIC_HOSTNAME_KEY)
503 && let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await?
504 {
505 new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
506 }
507
508 if self.fields.contains(PUBLIC_IPV4_KEY)
509 && let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await?
510 {
511 new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
512 }
513
514 if (self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY))
515 && let Some(mac) = self.get_metadata(&MAC).await?
516 {
517 let mac = String::from_utf8_lossy(&mac[..]);
518
519 if self.fields.contains(SUBNET_ID_KEY) {
520 let subnet_path =
521 format!("/latest/meta-data/network/interfaces/macs/{mac}/subnet-id");
522
523 let subnet_path = subnet_path.parse().context(ParsePathSnafu {
524 value: subnet_path.clone(),
525 })?;
526
527 if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
528 new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
529 }
530 }
531
532 if self.fields.contains(VPC_ID_KEY) {
533 let vpc_path =
534 format!("/latest/meta-data/network/interfaces/macs/{mac}/vpc-id");
535
536 let vpc_path = vpc_path.parse().context(ParsePathSnafu {
537 value: vpc_path.clone(),
538 })?;
539
540 if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
541 new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
542 }
543 }
544 }
545
546 if self.fields.contains(ROLE_NAME_KEY)
547 && let Some(role_names) = self.get_metadata(&ROLE_NAME).await?
548 {
549 let role_names = String::from_utf8_lossy(&role_names[..]);
550
551 for (i, role_name) in role_names.lines().enumerate() {
552 new_state.push((
553 MetadataKey {
554 log_path: self
555 .keys
556 .role_name_key
557 .log_path
558 .with_index_appended(i as isize),
559 metric_tag: format!("{}[{}]", self.keys.role_name_key.metric_tag, i),
560 },
561 role_name.to_string().into(),
562 ));
563 }
564 }
565
566 for tag in self.tags.clone() {
567 let tag_path = format!("/latest/meta-data/tags/instance/{tag}");
568
569 let tag_path = tag_path.parse().context(ParsePathSnafu {
570 value: tag_path.clone(),
571 })?;
572
573 if let Some(tag_content) = self.get_metadata(&tag_path).await? {
574 new_state.push((
575 MetadataKey {
576 log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
577 metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
578 },
579 tag_content,
580 ));
581 }
582 }
583
584 self.state.store(Arc::new(new_state));
585 }
586
587 Ok(())
588 }
589
590 async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
591 let token = self
592 .get_token()
593 .await
594 .with_context(|_| FetchTokenSnafu {})?;
595
596 let mut parts = self.host.clone().into_parts();
597
598 parts.path_and_query = Some(path.clone());
599
600 let uri = Uri::from_parts(parts)?;
601
602 debug!(message = "Sending metadata request.", %uri);
603
604 let req = Request::get(uri)
605 .header(TOKEN_HEADER.as_ref(), token.as_ref())
606 .body(Body::empty())?;
607
608 match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
609 .await?
610 .map_err(crate::Error::from)
611 .and_then(|res| match res.status() {
612 StatusCode::OK => Ok(Some(res)),
613 StatusCode::NOT_FOUND => Ok(None),
614 status_code => Err(UnexpectedHttpStatusError {
615 status: status_code,
616 }
617 .into()),
618 })? {
619 Some(res) => {
620 let body = http_body::Body::collect(res.into_body()).await?.to_bytes();
621 Ok(Some(body))
622 }
623 None => Ok(None),
624 }
625 }
626}
627
628fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
633 let mut output = String::new();
634 for segment in &namespace.path.segments {
635 if !output.is_empty() {
636 output += ".";
637 }
638 match segment {
639 OwnedSegment::Field(field) => {
640 output += field;
641 }
642 OwnedSegment::Index(i) => {
643 output += &i.to_string();
644 }
645 }
646 }
647 output
648}
649
650fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
651 if let Some(namespace) = namespace {
652 MetadataKey {
653 log_path: namespace.with_field_appended(key),
654 metric_tag: format!("{}.{}", create_metric_namespace(namespace), key),
655 }
656 } else {
657 MetadataKey {
658 log_path: OwnedTargetPath::event(owned_value_path!(key)),
659 metric_tag: key.to_owned(),
660 }
661 }
662}
663
664impl Keys {
665 pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
666 let namespace = namespace.and_then(|namespace| namespace.path);
667
668 Keys {
669 account_id_key: create_key(&namespace, ACCOUNT_ID_KEY),
670 ami_id_key: create_key(&namespace, AMI_ID_KEY),
671 availability_zone_key: create_key(&namespace, AVAILABILITY_ZONE_KEY),
672 instance_id_key: create_key(&namespace, INSTANCE_ID_KEY),
673 instance_type_key: create_key(&namespace, INSTANCE_TYPE_KEY),
674 local_hostname_key: create_key(&namespace, LOCAL_HOSTNAME_KEY),
675 local_ipv4_key: create_key(&namespace, LOCAL_IPV4_KEY),
676 public_hostname_key: create_key(&namespace, PUBLIC_HOSTNAME_KEY),
677 public_ipv4_key: create_key(&namespace, PUBLIC_IPV4_KEY),
678 region_key: create_key(&namespace, REGION_KEY),
679 subnet_id_key: create_key(&namespace, SUBNET_ID_KEY),
680 vpc_id_key: create_key(&namespace, VPC_ID_KEY),
681 role_name_key: create_key(&namespace, ROLE_NAME_KEY),
682 tags_key: create_key(&namespace, TAGS_KEY),
683 }
684 }
685}
686
687#[derive(Debug)]
688struct UnexpectedHttpStatusError {
689 status: http::StatusCode,
690}
691
692impl fmt::Display for UnexpectedHttpStatusError {
693 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
694 write!(f, "got unexpected status code: {}", self.status)
695 }
696}
697
698impl error::Error for UnexpectedHttpStatusError {}
699
/// Errors raised while refreshing metadata. The generated context selectors
/// (`FetchTokenSnafu`, ...) are used with `?` and converted into `crate::Error`.
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
    // Obtaining the session token failed; wraps the underlying error.
    #[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
    FetchToken { source: crate::Error },
    // The identity document body was not valid JSON of the expected shape.
    #[snafu(display("Unable to parse identity document: {}.", source))]
    ParseIdentityDocument { source: serde_json::Error },
    // A dynamically built metadata path (MAC- or tag-based) was not a valid URI path.
    #[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
    ParsePath {
        value: String,
        source: http::uri::InvalidUri,
    },
}
712
#[cfg(test)]
mod test {
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::{owned_value_path, value::Kind};

    use crate::{
        config::{LogNamespace, OutputId, TransformConfig, schema::Definition},
        transforms::aws_ec2_metadata::Ec2Metadata,
    };

    // A bytes (non-object) input schema must come out as an object, because
    // `outputs` adds the metadata fields to it.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let transform_config = Ec2Metadata {
            namespace: Some(OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into()),
            ..Default::default()
        };

        let input_definition =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);

        let mut outputs = transform_config.outputs(
            &Default::default(),
            &[(OutputId::dummy(), input_definition)],
        );
        assert_eq!(outputs.len(), 1);
        let output = outputs.pop().unwrap();
        let actual_schema_def = output.schema_definitions(true)[&OutputId::dummy()].clone();
        assert!(actual_schema_def.event_kind().is_object());
    }
}
743
// Integration tests. They need a mock EC2 metadata service at
// `EC2_METADATA_ADDRESS` (default `http://localhost:1338`) that serves the
// fixture values asserted below.
#[cfg(feature = "aws-ec2-metadata-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use tokio::sync::mpsc;
    use tokio_stream::wrappers::ReceiverStream;
    use vector_lib::{
        assert_event_data_eq,
        lookup::{
            PathPrefix, event_path,
            lookup_v2::{OwnedSegment, OwnedValuePath},
        },
    };
    use vrl::value::{ObjectMap, Value};
    use warp::Filter;

    use super::*;
    use crate::{
        event::{LogEvent, Metric, metric},
        test_util::{addr::next_addr, components::assert_transform_compliance},
        transforms::test::create_topology,
    };

    // Address of the mock metadata service, overridable through the environment.
    fn ec2_metadata_address() -> String {
        std::env::var("EC2_METADATA_ADDRESS").unwrap_or_else(|_| "http://localhost:1338".into())
    }

    // Root-level log fields expected with the default fields plus `account-id`
    // and the `Name`/`Test` tags.
    fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
        vec![
            (
                vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
                "us-east-1a",
            ),
            (
                vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
                "192.0.2.54",
            ),
            (
                vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
                "ec2-192-0-2-54.compute-1.amazonaws.com",
            ),
            (
                vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
                "172.16.34.43",
            ),
            (
                vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
                "ip-172-16-34-43.ec2.internal",
            ),
            (
                vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
                "i-1234567890abcdef0",
            ),
            (
                vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
                "0123456789",
            ),
            (
                vec![OwnedSegment::field(AMI_ID_KEY)].into(),
                "ami-0b69ea66ff7391e80",
            ),
            (
                vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
                "m4.xlarge",
            ),
            (vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
            (vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
            (
                vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
                "subnet-0ac62554",
            ),
            (owned_value_path!("role-name", 0), "baskinc-role"),
            (owned_value_path!("tags", "Name"), "test-instance"),
            (owned_value_path!("tags", "Test"), "test-tag"),
        ]
    }

    // Metric tags matching `expected_log_fields`; role names and tags use the
    // `name[index]` / `tags[Name]` tag format.
    fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
        vec![
            (AVAILABILITY_ZONE_KEY, "us-east-1a"),
            (PUBLIC_IPV4_KEY, "192.0.2.54"),
            (
                PUBLIC_HOSTNAME_KEY,
                "ec2-192-0-2-54.compute-1.amazonaws.com",
            ),
            (LOCAL_IPV4_KEY, "172.16.34.43"),
            (LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
            (INSTANCE_ID_KEY, "i-1234567890abcdef0"),
            (ACCOUNT_ID_KEY, "0123456789"),
            (AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
            (INSTANCE_TYPE_KEY, "m4.xlarge"),
            (REGION_KEY, "us-east-1"),
            (VPC_ID_KEY, "vpc-d295a6a7"),
            (SUBNET_ID_KEY, "subnet-0ac62554"),
            ("role-name[0]", "baskinc-role"),
            ("tags[Name]", "test-instance"),
            ("tags[Test]", "test-tag"),
        ]
    }

    // Minimal counter used as the metric input.
    fn make_metric() -> Metric {
        Metric::new(
            "event",
            metric::MetricKind::Incremental,
            metric::MetricValue::Counter { value: 1.0 },
        )
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<Ec2Metadata>();
    }

    // Every fixture value is added to a log event. `MISSING_TAG` is expected
    // to be absent from the output.
    #[tokio::test]
    async fn enrich_log() {
        assert_transform_compliance(async {
            let mut fields = default_fields();
            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());

            let tags = vec![
                String::from("Name"),
                String::from("Test"),
                String::from("MISSING_TAG"),
            ];

            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields,
                tags,
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            // NOTE(review): fixed delay before sending; presumably lets the
            // topology finish starting — confirm it is still required.
            sleep(Duration::from_secs(1)).await;

            let log = LogEvent::default();
            let mut expected_log = log.clone();
            for (k, v) in expected_log_fields().iter().cloned() {
                expected_log.insert((PathPrefix::Event, &k), v);
            }

            tx.send(log.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_log(), expected_log);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // The server sleeps 3s, longer than the 1s refresh timeout, so fetching the
    // token times out. `required` defaults to true, so `build` fails.
    #[tokio::test(flavor = "multi_thread")]
    async fn timeout() {
        let (_guard, addr) = next_addr();

        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
            tokio::time::sleep(Duration::from_secs(3)).await;
            Ok("I waited 3 seconds!")
        }

        let slow = warp::any().and_then(sleepy);
        let server = warp::serve(slow).bind(addr);
        let _server = tokio::spawn(server);

        let config = Ec2Metadata {
            endpoint: format!("http://{addr}"),
            refresh_timeout_secs: Duration::from_secs(1),
            ..Default::default()
        };

        match config.build(&TransformContext::default()).await {
            Ok(_) => panic!("expected timeout failure"),
            Err(err) => assert_eq!(
                err.to_string(),
                "Unable to fetch metadata authentication token: deadline has elapsed."
            ),
        }
    }

    // Same slow server, but with `required: false` the initial refresh
    // failure is only emitted as an internal event and `build` succeeds.
    #[tokio::test(flavor = "multi_thread")]
    async fn not_required() {
        let (_guard, addr) = next_addr();

        async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
            tokio::time::sleep(Duration::from_secs(3)).await;
            Ok("I waited 3 seconds!")
        }

        let slow = warp::any().and_then(sleepy);
        let server = warp::serve(slow).bind(addr);
        let _server = tokio::spawn(server);

        let config = Ec2Metadata {
            endpoint: format!("http://{addr}"),
            refresh_timeout_secs: Duration::from_secs(1),
            required: false,
            ..Default::default()
        };

        assert!(
            config.build(&TransformContext::default()).await.is_ok(),
            "expected no failure because 'required' config value set to false"
        );
    }

    // Metric counterpart of `enrich_log`: every fixture value becomes a tag.
    #[tokio::test]
    async fn enrich_metric() {
        assert_transform_compliance(async {
            let mut fields = default_fields();
            fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());

            let tags = vec![
                String::from("Name"),
                String::from("Test"),
                String::from("MISSING_TAG"),
            ];

            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields,
                tags,
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            sleep(Duration::from_secs(1)).await;

            let metric = make_metric();
            let mut expected_metric = metric.clone();
            for (k, v) in expected_metric_fields().iter() {
                expected_metric.replace_tag(k.to_string(), v.to_string());
            }

            tx.send(metric.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_metric(), expected_metric);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // Only the configured fields (plus the found tags) are added to logs.
    #[tokio::test]
    async fn fields_log() {
        assert_transform_compliance(async {
            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
                tags: vec![
                    String::from("Name"),
                    String::from("Test"),
                    String::from("MISSING_TAG"),
                ],
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            sleep(Duration::from_secs(1)).await;

            // Keys are quoted so each hyphenated name is parsed as one path segment.
            let log = LogEvent::default();
            let mut expected_log = log.clone();
            expected_log.insert(format!("\"{PUBLIC_IPV4_KEY}\"").as_str(), "192.0.2.54");
            expected_log.insert(format!("\"{REGION_KEY}\"").as_str(), "us-east-1");
            expected_log.insert(
                format!("\"{TAGS_KEY}\"").as_str(),
                ObjectMap::from([
                    ("Name".into(), Value::from("test-instance")),
                    ("Test".into(), Value::from("test-tag")),
                ]),
            );

            tx.send(log.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_log(), expected_log);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // Only the configured fields (plus the found tags) are added to metrics.
    #[tokio::test]
    async fn fields_metric() {
        assert_transform_compliance(async {
            let transform_config = Ec2Metadata {
                endpoint: ec2_metadata_address(),
                fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
                tags: vec![
                    String::from("Name"),
                    String::from("Test"),
                    String::from("MISSING_TAG"),
                ],
                ..Default::default()
            };

            let (tx, rx) = mpsc::channel(1);
            let (topology, mut out) =
                create_topology(ReceiverStream::new(rx), transform_config).await;

            sleep(Duration::from_secs(1)).await;

            let metric = make_metric();
            let mut expected_metric = metric.clone();
            expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
            expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
            expected_metric.replace_tag(
                format!("{}[{}]", TAGS_KEY, "Name"),
                "test-instance".to_string(),
            );
            expected_metric
                .replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());

            tx.send(metric.into()).await.unwrap();

            let event = out.recv().await.unwrap();
            assert_event_data_eq!(event.into_metric(), expected_metric);

            drop(tx);
            topology.stop().await;
            assert_eq!(out.recv().await, None);
        })
        .await;
    }

    // Logs: a namespace nests fields under it; an empty namespace keeps them at the root.
    #[tokio::test]
    async fn namespace_log() {
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(
                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
                    ),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                sleep(Duration::from_secs(1)).await;

                let log = LogEvent::default();

                tx.send(log.into()).await.unwrap();

                let event = out.recv().await.unwrap();

                assert_eq!(
                    event.as_log().get("ec2.metadata.\"availability-zone\""),
                    Some(&"us-east-1a".into())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }

        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(OptionalTargetPath::none()),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                sleep(Duration::from_secs(1)).await;

                let log = LogEvent::default();

                tx.send(log.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
                    Some(&"us-east-1a".into())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }
    }

    // Metrics: a namespace becomes a dotted tag prefix; an empty namespace adds none.
    #[tokio::test]
    async fn namespace_metric() {
        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(
                        OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
                    ),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                sleep(Duration::from_secs(1)).await;

                let metric = make_metric();

                tx.send(metric.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event
                        .as_metric()
                        .tag_value("ec2.metadata.availability-zone"),
                    Some("us-east-1a".to_string())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }

        {
            assert_transform_compliance(async {
                let transform_config = Ec2Metadata {
                    endpoint: ec2_metadata_address(),
                    namespace: Some(OptionalTargetPath::none()),
                    ..Default::default()
                };

                let (tx, rx) = mpsc::channel(1);
                let (topology, mut out) =
                    create_topology(ReceiverStream::new(rx), transform_config).await;

                sleep(Duration::from_secs(1)).await;

                let metric = make_metric();

                tx.send(metric.into()).await.unwrap();

                let event = out.recv().await.unwrap();
                assert_eq!(
                    event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
                    Some("us-east-1a".to_string())
                );

                drop(tx);
                topology.stop().await;
                assert_eq!(out.recv().await, None);
            })
            .await;
        }
    }
}