use std::sync::{Arc, LazyLock};
use std::{collections::HashSet, error, fmt, future::ready, pin::Pin};
use arc_swap::ArcSwap;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use http::{uri::PathAndQuery, Request, StatusCode, Uri};
use hyper::{body::to_bytes as body_to_bytes, Body};
use serde::Deserialize;
use serde_with::serde_as;
use snafu::ResultExt as _;
use tokio::time::{sleep, Duration, Instant};
use tracing::Instrument;
use vector_lib::config::LogNamespace;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::{OptionalTargetPath, OwnedSegment};
use vector_lib::lookup::owned_value_path;
use vector_lib::lookup::OwnedTargetPath;
use vrl::value::kind::Collection;
use vrl::value::Kind;
use crate::config::OutputId;
use crate::{
config::{DataType, Input, ProxyConfig, TransformConfig, TransformContext, TransformOutput},
event::Event,
http::HttpClient,
internal_events::{AwsEc2MetadataRefreshError, AwsEc2MetadataRefreshSuccessful},
schema,
transforms::{TaskTransform, Transform},
};
// Names of the metadata fields this transform can attach to events. Each name is checked
// against the configured `fields` set and is also used as the final log path segment and as
// the metric tag name for the value.
const ACCOUNT_ID_KEY: &str = "account-id";
const AMI_ID_KEY: &str = "ami-id";
const AVAILABILITY_ZONE_KEY: &str = "availability-zone";
const INSTANCE_ID_KEY: &str = "instance-id";
const INSTANCE_TYPE_KEY: &str = "instance-type";
const LOCAL_HOSTNAME_KEY: &str = "local-hostname";
const LOCAL_IPV4_KEY: &str = "local-ipv4";
const PUBLIC_HOSTNAME_KEY: &str = "public-hostname";
const PUBLIC_IPV4_KEY: &str = "public-ipv4";
const REGION_KEY: &str = "region";
const SUBNET_ID_KEY: &str = "subnet-id";
const VPC_ID_KEY: &str = "vpc-id";
const ROLE_NAME_KEY: &str = "role-name";
// Key under which the instance tags selected by the `tags` option are grouped.
const TAGS_KEY: &str = "tags";
// Request paths, relative to the configured endpoint, for values fetched one by one.
static AVAILABILITY_ZONE: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/placement/availability-zone"));
static LOCAL_HOSTNAME: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-hostname"));
static LOCAL_IPV4: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/local-ipv4"));
static PUBLIC_HOSTNAME: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-hostname"));
static PUBLIC_IPV4: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/public-ipv4"));
// The response is read line by line; every line becomes one `role-name` entry.
static ROLE_NAME: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/iam/security-credentials/"));
// The MAC is needed to build the per-interface `subnet-id` and `vpc-id` paths.
static MAC: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/meta-data/mac"));
// JSON identity document supplying the account id, AMI id, instance id/type and region.
static DYNAMIC_DOCUMENT: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/dynamic/instance-identity/document"));
// Fields enabled when the `fields` option is not set. `account-id` is not part of this list.
static DEFAULT_FIELD_ALLOWLIST: &[&str] = &[
AMI_ID_KEY,
AVAILABILITY_ZONE_KEY,
INSTANCE_ID_KEY,
INSTANCE_TYPE_KEY,
LOCAL_HOSTNAME_KEY,
LOCAL_IPV4_KEY,
PUBLIC_HOSTNAME_KEY,
PUBLIC_IPV4_KEY,
REGION_KEY,
SUBNET_ID_KEY,
VPC_ID_KEY,
ROLE_NAME_KEY,
];
// Path a session token is requested from (with a `PUT` request).
static API_TOKEN: LazyLock<PathAndQuery> =
LazyLock::new(|| PathAndQuery::from_static("/latest/api/token"));
// Header that carries the session token on every metadata `GET` request.
static TOKEN_HEADER: LazyLock<Bytes> = LazyLock::new(|| Bytes::from("X-aws-ec2-metadata-token"));
/// Configuration for the `aws_ec2_metadata` transform.
#[serde_as]
#[configurable_component(transform(
"aws_ec2_metadata",
"Parse metadata emitted by AWS EC2 instances."
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
pub struct Ec2Metadata {
/// Base URL of the EC2 metadata service.
///
/// Defaults to `http://169.254.169.254`. `host` is accepted as an alias for this option.
#[serde(alias = "host", default = "default_endpoint")]
#[derivative(Default(value = "default_endpoint()"))]
endpoint: String,
/// Path under which the metadata keys are nested.
///
/// When unset, or when it resolves to no path, keys are written at the root of the event.
#[configurable(metadata(
docs::examples = "",
docs::examples = "ec2",
docs::examples = "aws.ec2",
))]
namespace: Option<OptionalTargetPath>,
/// Time, in seconds, to wait between two metadata refreshes. Defaults to 10.
#[serde(default = "default_refresh_interval_secs")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[derivative(Default(value = "default_refresh_interval_secs()"))]
refresh_interval_secs: Duration,
/// Names of the metadata fields to add to events.
///
/// Defaults to every supported field except `account-id`.
#[serde(default = "default_fields")]
#[derivative(Default(value = "default_fields()"))]
#[configurable(metadata(docs::examples = "instance-id", docs::examples = "local-hostname",))]
fields: Vec<String>,
/// Names of the instance tags to fetch and add under the `tags` key. Defaults to none.
#[serde(default = "default_tags")]
#[derivative(Default(value = "default_tags()"))]
#[configurable(metadata(docs::examples = "Name", docs::examples = "Project",))]
tags: Vec<String>,
/// Timeout, in seconds, applied to each request sent to the metadata service. Defaults to 1.
#[serde(default = "default_refresh_timeout_secs")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[derivative(Default(value = "default_refresh_timeout_secs()"))]
refresh_timeout_secs: Duration,
// Merged with the global proxy settings when the HTTP client is built.
#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
proxy: ProxyConfig,
/// Whether the first metadata fetch must succeed for the transform to build.
///
/// Defaults to `true`. When `false`, a failed first fetch is reported as a refresh error and
/// the transform still starts.
#[serde(default = "default_required")]
#[derivative(Default(value = "default_required()"))]
required: bool,
}
/// Default base URL of the metadata service.
fn default_endpoint() -> String {
    "http://169.254.169.254".to_owned()
}
/// Default pause between two metadata refreshes: ten seconds.
const fn default_refresh_interval_secs() -> Duration {
    Duration::new(10, 0)
}
/// Default timeout for a single request to the metadata service: one second.
const fn default_refresh_timeout_secs() -> Duration {
    Duration::new(1, 0)
}
fn default_fields() -> Vec<String> {
DEFAULT_FIELD_ALLOWLIST
.iter()
.map(|s| s.to_string())
.collect()
}
/// Default value of the `tags` option: no instance tags are fetched.
const fn default_tags() -> Vec<String> {
    Vec::new()
}
/// Default value of the `required` option: the first metadata fetch must succeed.
const fn default_required() -> bool {
    const REQUIRED_BY_DEFAULT: bool = true;
    REQUIRED_BY_DEFAULT
}
/// Event-side half of the transform: copies the most recently fetched metadata onto events.
#[derive(Clone, Debug)]
pub struct Ec2MetadataTransform {
// Snapshot of `(key, value)` pairs shared with the `MetadataClient`, which replaces it as a
// whole after each successful refresh.
state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
}
/// Where one metadata value is written: a path for log events and a tag name for metrics.
#[derive(Debug, Clone)]
struct MetadataKey {
// Target path used when inserting the value into a log event.
log_path: OwnedTargetPath,
// Tag name used when setting the value on a metric.
metric_tag: String,
}
/// The `MetadataKey` of every supported field, built once for the configured namespace.
#[derive(Debug)]
struct Keys {
account_id_key: MetadataKey,
ami_id_key: MetadataKey,
availability_zone_key: MetadataKey,
instance_id_key: MetadataKey,
instance_type_key: MetadataKey,
local_hostname_key: MetadataKey,
local_ipv4_key: MetadataKey,
public_hostname_key: MetadataKey,
public_ipv4_key: MetadataKey,
region_key: MetadataKey,
subnet_id_key: MetadataKey,
vpc_id_key: MetadataKey,
// Base key only; an index is appended for every role name.
role_name_key: MetadataKey,
// Base key only; the tag name is appended for every fetched tag.
tags_key: MetadataKey,
}
// NOTE(review): presumably generates the example configuration from `Ec2Metadata::default()`;
// the macro is defined outside this file — confirm there.
impl_generate_config_from_default!(Ec2Metadata);
#[async_trait::async_trait]
#[typetag::serde(name = "aws_ec2_metadata")]
impl TransformConfig for Ec2Metadata {
/// Builds the transform and starts the background task that keeps the metadata fresh.
///
/// The metadata is fetched once before returning, so the first events can already be
/// enriched. Afterwards a spawned worker refreshes it every `refresh_interval_secs`.
///
/// # Errors
///
/// Returns an error when `endpoint` is not a valid URI, when the HTTP client cannot be
/// created, or — only if `required` is `true` — when the initial metadata fetch fails. With
/// `required` set to `false` a failed initial fetch is emitted as a refresh error instead.
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
    let state = Arc::new(ArcSwap::new(Arc::new(vec![])));
    let keys = Keys::new(self.namespace.clone());
    // `endpoint` comes from user configuration, so a malformed value must surface as a
    // build error rather than a panic.
    let host = Uri::from_maybe_shared(self.endpoint.clone())
        .map_err(|error| format!("invalid `endpoint` {:?}: {}", self.endpoint, error))?;
    let refresh_interval = self.refresh_interval_secs;
    let fields = self.fields.clone();
    let tags = self.tags.clone();
    let refresh_timeout = self.refresh_timeout_secs;
    let required = self.required;

    let proxy = ProxyConfig::merge_with_env(&context.globals.proxy, &self.proxy);
    let http_client = HttpClient::new(None, &proxy)?;

    let mut client = MetadataClient::new(
        http_client,
        host,
        keys,
        Arc::clone(&state),
        refresh_interval,
        refresh_timeout,
        fields,
        tags,
    );

    // Initial fetch; whether a failure is fatal depends on `required`.
    if let Err(error) = client.refresh_metadata().await {
        if required {
            return Err(error);
        } else {
            emit!(AwsEc2MetadataRefreshError { error });
        }
    }

    // The worker owns the client; the transform only reads the shared snapshot.
    tokio::spawn(
        async move {
            client.run().await;
        }
        .instrument(info_span!("aws_ec2_metadata: worker").or_current()),
    );

    Ok(Transform::event_task(Ec2MetadataTransform { state }))
}
/// Declares the accepted event types: metrics and logs.
fn input(&self) -> Input {
    let accepted = DataType::Metric | DataType::Log;
    Input::new(accepted)
}
/// Describes the single output: metrics and logs, with each input schema extended by one
/// optional bytes field per metadata key.
fn outputs(
    &self,
    _: vector_lib::enrichment::TableRegistry,
    input_definitions: &[(OutputId, schema::Definition)],
    _: LogNamespace,
) -> Vec<TransformOutput> {
    let keys = Keys::new(self.namespace.clone());
    let enriched_paths = [
        &keys.account_id_key.log_path,
        &keys.ami_id_key.log_path,
        &keys.availability_zone_key.log_path,
        &keys.instance_id_key.log_path,
        &keys.instance_type_key.log_path,
        &keys.local_hostname_key.log_path,
        &keys.local_ipv4_key.log_path,
        &keys.public_hostname_key.log_path,
        &keys.public_ipv4_key.log_path,
        &keys.region_key.log_path,
        &keys.subnet_id_key.log_path,
        &keys.vpc_id_key.log_path,
        &keys.role_name_key.log_path,
        &keys.tags_key.log_path,
    ];

    let definitions = input_definitions
        .iter()
        .map(|(output_id, input_definition)| {
            let mut definition = input_definition.clone();

            // Fields can only be added to an object, so an input that cannot be an object
            // is replaced by an empty object first.
            if !definition.event_kind().contains_object() {
                *definition.event_kind_mut() = Kind::object(Collection::empty());
            }

            let definition = enriched_paths.iter().fold(definition, |definition, path| {
                definition.with_field(*path, Kind::bytes().or_undefined(), None)
            });

            (output_id.clone(), definition)
        })
        .collect();

    vec![TransformOutput::new(
        DataType::Metric | DataType::Log,
        definitions,
    )]
}
}
impl TaskTransform<Event> for Ec2MetadataTransform {
    /// Wraps `task` in a stream that enriches every event with the current metadata
    /// snapshot. No event is dropped.
    fn transform(
        self: Box<Self>,
        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
    where
        Self: 'static,
    {
        let mut enricher = self;
        let enriched = task.filter_map(move |event| {
            let event = enricher.transform_one(event);
            ready(Some(event))
        });
        Box::pin(enriched)
    }
}
impl Ec2MetadataTransform {
    /// Copies every entry of the current metadata snapshot onto `event`.
    ///
    /// Log events get each value inserted at the key's log path; metrics get each value
    /// (lossily decoded as UTF-8) set as a tag, replacing an existing tag of the same name.
    ///
    /// # Panics
    ///
    /// Panics when handed a trace event.
    fn transform_one(&mut self, mut event: Event) -> Event {
        let metadata = self.state.load();
        match &mut event {
            Event::Log(log) => {
                for (key, value) in metadata.iter() {
                    log.insert(&key.log_path, value.clone());
                }
            }
            Event::Metric(metric) => {
                for (key, value) in metadata.iter() {
                    let tag_value = String::from_utf8_lossy(value).into_owned();
                    metric.replace_tag(key.metric_tag.clone(), tag_value);
                }
            }
            Event::Trace(_) => panic!("Traces are not supported."),
        }
        event
    }
}
/// Fetches metadata from the configured endpoint and publishes it to the shared state.
struct MetadataClient {
client: HttpClient<Body>,
// Base URI; the path of every request replaces its path-and-query part.
host: Uri,
// Cached session token together with the instant after which it must be requested again.
token: Option<(Bytes, Instant)>,
keys: Keys,
// Snapshot read by `Ec2MetadataTransform`; replaced as a whole after a refresh.
state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
// Pause between two refreshes in `run`.
refresh_interval: Duration,
// Timeout applied to each HTTP request.
refresh_timeout: Duration,
// Names of the fields to fetch.
fields: HashSet<String>,
// Names of the instance tags to fetch.
tags: HashSet<String>,
}
/// Instance identity document as returned by the metadata service, deserialized from JSON
/// with camelCase field names.
///
/// Only some of the fields are read after parsing (`architecture`, `private_ip` and `version`
/// are not), hence the `dead_code` allowance.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(dead_code)] struct IdentityDocument {
account_id: String,
architecture: String,
image_id: String,
instance_id: String,
instance_type: String,
private_ip: String,
region: String,
version: String,
}
impl MetadataClient {
#[allow(clippy::too_many_arguments)]
pub fn new(
client: HttpClient<Body>,
host: Uri,
keys: Keys,
state: Arc<ArcSwap<Vec<(MetadataKey, Bytes)>>>,
refresh_interval: Duration,
refresh_timeout: Duration,
fields: Vec<String>,
tags: Vec<String>,
) -> Self {
Self {
client,
host,
token: None,
keys,
state,
refresh_interval,
refresh_timeout,
fields: fields.into_iter().collect(),
tags: tags.into_iter().collect(),
}
}
/// Refreshes the metadata forever, sleeping `refresh_interval` after every attempt.
///
/// Each attempt is reported through an internal event; a failed attempt does not stop the
/// loop.
async fn run(&mut self) {
    loop {
        if let Err(error) = self.refresh_metadata().await {
            emit!(AwsEc2MetadataRefreshError { error });
        } else {
            emit!(AwsEc2MetadataRefreshSuccessful);
        }

        sleep(self.refresh_interval).await;
    }
}
/// Returns a session token for the metadata service.
///
/// The cached token is reused until its local expiry has passed; otherwise a new one is
/// requested with a `PUT` to the token path and cached.
///
/// # Errors
///
/// Returns an error when the request cannot be built, times out, fails, is answered with a
/// status other than `200 OK`, or when the response body cannot be read.
pub async fn get_token(&mut self) -> Result<Bytes, crate::Error> {
    // Lifetime requested for every token. The same value drives the local expiry so the
    // header and the cache cannot drift apart.
    const TOKEN_TTL_SECS: u64 = 21_600;

    if let Some((token, expires_at)) = &self.token {
        if *expires_at > Instant::now() {
            return Ok(token.clone());
        }
    }

    let mut parts = self.host.clone().into_parts();
    parts.path_and_query = Some(API_TOKEN.clone());
    let uri = Uri::from_parts(parts)?;

    let req = Request::put(uri)
        .header(
            "X-aws-ec2-metadata-token-ttl-seconds",
            TOKEN_TTL_SECS.to_string(),
        )
        .body(Body::empty())?;

    // The service starts counting the TTL when it issues the token, which is before the
    // response arrives here. Timing the expiry from before the request keeps the cached
    // token from being treated as valid after the service has already expired it.
    let requested_at = Instant::now();

    let res = tokio::time::timeout(self.refresh_timeout, self.client.send(req))
        .await?
        .map_err(crate::Error::from)
        .and_then(|res| match res.status() {
            StatusCode::OK => Ok(res),
            status_code => Err(UnexpectedHttpStatusError {
                status: status_code,
            }
            .into()),
        })?;

    let token = body_to_bytes(res.into_body()).await?;

    let next_refresh = requested_at + Duration::from_secs(TOKEN_TTL_SECS);
    self.token = Some((token.clone(), next_refresh));

    Ok(token)
}
/// Fetches and parses the instance identity document.
///
/// Returns `Ok(None)` when the service reports the document as not found.
///
/// # Errors
///
/// Returns an error when the request fails or the body is not a valid identity document.
pub async fn get_document(&mut self) -> Result<Option<IdentityDocument>, crate::Error> {
    let Some(body) = self.get_metadata(&DYNAMIC_DOCUMENT).await? else {
        return Ok(None);
    };

    let document = serde_json::from_slice(&body[..]).context(ParseIdentityDocumentSnafu {})?;
    Ok(Some(document))
}
/// Fetches every enabled metadata value and publishes the result as the new shared state.
///
/// The identity document is requested first. When it is not found, nothing else is fetched,
/// the previous state stays in place and `Ok(())` is returned. Individual values reported as
/// not found are left out of the new state.
///
/// # Errors
///
/// Returns the first error raised by a request, by parsing the identity document, or by
/// building a request path. In that case the shared state is not replaced.
pub async fn refresh_metadata(&mut self) -> Result<(), crate::Error> {
let mut new_state = vec![];
if let Some(document) = self.get_document().await? {
// Values taken straight from the identity document.
if self.fields.contains(ACCOUNT_ID_KEY) {
new_state.push((self.keys.account_id_key.clone(), document.account_id.into()));
}
if self.fields.contains(AMI_ID_KEY) {
new_state.push((self.keys.ami_id_key.clone(), document.image_id.into()));
}
if self.fields.contains(INSTANCE_ID_KEY) {
new_state.push((
self.keys.instance_id_key.clone(),
document.instance_id.into(),
));
}
if self.fields.contains(INSTANCE_TYPE_KEY) {
new_state.push((
self.keys.instance_type_key.clone(),
document.instance_type.into(),
));
}
if self.fields.contains(REGION_KEY) {
new_state.push((self.keys.region_key.clone(), document.region.into()));
}
// Values that each need a request of their own.
if self.fields.contains(AVAILABILITY_ZONE_KEY) {
if let Some(availability_zone) = self.get_metadata(&AVAILABILITY_ZONE).await? {
new_state.push((self.keys.availability_zone_key.clone(), availability_zone));
}
}
if self.fields.contains(LOCAL_HOSTNAME_KEY) {
if let Some(local_hostname) = self.get_metadata(&LOCAL_HOSTNAME).await? {
new_state.push((self.keys.local_hostname_key.clone(), local_hostname));
}
}
if self.fields.contains(LOCAL_IPV4_KEY) {
if let Some(local_ipv4) = self.get_metadata(&LOCAL_IPV4).await? {
new_state.push((self.keys.local_ipv4_key.clone(), local_ipv4));
}
}
if self.fields.contains(PUBLIC_HOSTNAME_KEY) {
if let Some(public_hostname) = self.get_metadata(&PUBLIC_HOSTNAME).await? {
new_state.push((self.keys.public_hostname_key.clone(), public_hostname));
}
}
if self.fields.contains(PUBLIC_IPV4_KEY) {
if let Some(public_ipv4) = self.get_metadata(&PUBLIC_IPV4).await? {
new_state.push((self.keys.public_ipv4_key.clone(), public_ipv4));
}
}
// Subnet and VPC ids live under the network interface addressed by the MAC, so the MAC
// is fetched once and shared by both lookups.
if self.fields.contains(SUBNET_ID_KEY) || self.fields.contains(VPC_ID_KEY) {
if let Some(mac) = self.get_metadata(&MAC).await? {
let mac = String::from_utf8_lossy(&mac[..]);
if self.fields.contains(SUBNET_ID_KEY) {
let subnet_path = format!(
"/latest/meta-data/network/interfaces/macs/{}/subnet-id",
mac
);
let subnet_path = subnet_path.parse().context(ParsePathSnafu {
value: subnet_path.clone(),
})?;
if let Some(subnet_id) = self.get_metadata(&subnet_path).await? {
new_state.push((self.keys.subnet_id_key.clone(), subnet_id));
}
}
if self.fields.contains(VPC_ID_KEY) {
let vpc_path =
format!("/latest/meta-data/network/interfaces/macs/{}/vpc-id", mac);
let vpc_path = vpc_path.parse().context(ParsePathSnafu {
value: vpc_path.clone(),
})?;
if let Some(vpc_id) = self.get_metadata(&vpc_path).await? {
new_state.push((self.keys.vpc_id_key.clone(), vpc_id));
}
}
}
}
// One entry per line of the response, keyed by its position: the log path gets the index
// appended and the metric tag becomes `<role-name tag>[<index>]`.
if self.fields.contains(ROLE_NAME_KEY) {
if let Some(role_names) = self.get_metadata(&ROLE_NAME).await? {
let role_names = String::from_utf8_lossy(&role_names[..]);
for (i, role_name) in role_names.lines().enumerate() {
new_state.push((
MetadataKey {
log_path: self
.keys
.role_name_key
.log_path
.with_index_appended(i as isize),
metric_tag: format!(
"{}[{}]",
self.keys.role_name_key.metric_tag, i
),
},
role_name.to_string().into(),
));
}
}
}
// The set is cloned so it is not borrowed while `get_metadata` borrows `self` mutably.
for tag in self.tags.clone() {
let tag_path = format!("/latest/meta-data/tags/instance/{}", tag);
let tag_path = tag_path.parse().context(ParsePathSnafu {
value: tag_path.clone(),
})?;
if let Some(tag_content) = self.get_metadata(&tag_path).await? {
new_state.push((
MetadataKey {
log_path: self.keys.tags_key.log_path.with_field_appended(&tag),
metric_tag: format!("{}[{}]", self.keys.tags_key.metric_tag, &tag),
},
tag_content,
));
}
}
// Publish the complete snapshot in one step.
self.state.store(Arc::new(new_state));
}
Ok(())
}
async fn get_metadata(&mut self, path: &PathAndQuery) -> Result<Option<Bytes>, crate::Error> {
let token = self
.get_token()
.await
.with_context(|_| FetchTokenSnafu {})?;
let mut parts = self.host.clone().into_parts();
parts.path_and_query = Some(path.clone());
let uri = Uri::from_parts(parts)?;
debug!(message = "Sending metadata request.", %uri);
let req = Request::get(uri)
.header(TOKEN_HEADER.as_ref(), token.as_ref())
.body(Body::empty())?;
match tokio::time::timeout(self.refresh_timeout, self.client.send(req))
.await?
.map_err(crate::Error::from)
.and_then(|res| match res.status() {
StatusCode::OK => Ok(Some(res)),
StatusCode::NOT_FOUND => Ok(None),
status_code => Err(UnexpectedHttpStatusError {
status: status_code,
}
.into()),
})? {
Some(res) => {
let body = body_to_bytes(res.into_body()).await?;
Ok(Some(body))
}
None => Ok(None),
}
}
}
/// Renders the segments of `namespace` as one string, separated by dots.
///
/// A dot is only inserted when something has already been written, so empty leading output
/// never produces a leading separator.
fn create_metric_namespace(namespace: &OwnedTargetPath) -> String {
    namespace
        .path
        .segments
        .iter()
        .fold(String::new(), |mut joined, segment| {
            if !joined.is_empty() {
                joined += ".";
            }
            match segment {
                OwnedSegment::Field(field) => joined += field,
                OwnedSegment::Index(index) => joined += &index.to_string(),
            }
            joined
        })
}
/// Builds the `MetadataKey` for `key`.
///
/// With a namespace, the log path is the namespace with `key` appended and the metric tag is
/// `<dotted namespace>.<key>`. Without one, `key` is used at the event root and as the tag.
fn create_key(namespace: &Option<OwnedTargetPath>, key: &str) -> MetadataKey {
    match namespace {
        Some(prefix) => {
            let metric_prefix = create_metric_namespace(prefix);
            MetadataKey {
                log_path: prefix.with_field_appended(key),
                metric_tag: format!("{}.{}", metric_prefix, key),
            }
        }
        None => MetadataKey {
            log_path: OwnedTargetPath::event(owned_value_path!(key)),
            metric_tag: key.to_owned(),
        },
    }
}
impl Keys {
    /// Builds the key of every supported field for the given namespace.
    ///
    /// A missing namespace and a namespace without a path both place keys at the event root.
    pub fn new(namespace: Option<OptionalTargetPath>) -> Self {
        let prefix = namespace.and_then(|optional| optional.path);
        let key = |name: &str| create_key(&prefix, name);

        Self {
            account_id_key: key(ACCOUNT_ID_KEY),
            ami_id_key: key(AMI_ID_KEY),
            availability_zone_key: key(AVAILABILITY_ZONE_KEY),
            instance_id_key: key(INSTANCE_ID_KEY),
            instance_type_key: key(INSTANCE_TYPE_KEY),
            local_hostname_key: key(LOCAL_HOSTNAME_KEY),
            local_ipv4_key: key(LOCAL_IPV4_KEY),
            public_hostname_key: key(PUBLIC_HOSTNAME_KEY),
            public_ipv4_key: key(PUBLIC_IPV4_KEY),
            region_key: key(REGION_KEY),
            subnet_id_key: key(SUBNET_ID_KEY),
            vpc_id_key: key(VPC_ID_KEY),
            role_name_key: key(ROLE_NAME_KEY),
            tags_key: key(TAGS_KEY),
        }
    }
}
/// Error for a metadata service response whose status code the caller does not accept.
#[derive(Debug)]
struct UnexpectedHttpStatusError {
// The status code that was received.
status: http::StatusCode,
}
impl fmt::Display for UnexpectedHttpStatusError {
    /// Writes a short message that includes the received status code.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { status } = self;
        write!(f, "got unexpected status code: {}", status)
    }
}
// Marks the type as a standard error so `.into()` can convert it into `crate::Error`.
impl error::Error for UnexpectedHttpStatusError {}
/// Errors specific to this transform. Each variant wraps the underlying cause as `source`.
#[derive(Debug, snafu::Snafu)]
enum Ec2MetadataError {
// Obtaining the session token failed before a metadata request could be sent.
#[snafu(display("Unable to fetch metadata authentication token: {}.", source))]
FetchToken { source: crate::Error },
// The identity document body could not be deserialized from JSON.
#[snafu(display("Unable to parse identity document: {}.", source))]
ParseIdentityDocument { source: serde_json::Error },
// A request path built at runtime (MAC- or tag-based) was not a valid path; `value` is the
// offending string.
#[snafu(display("Unable to parse metadata path {}, {}.", value, source))]
ParsePath {
value: String,
source: http::uri::InvalidUri,
},
}
#[cfg(test)]
mod test {
    use crate::config::schema::Definition;
    use crate::config::{LogNamespace, OutputId, TransformConfig};
    use crate::transforms::aws_ec2_metadata::Ec2Metadata;
    use vector_lib::enrichment::TableRegistry;
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::owned_value_path;
    use vrl::value::Kind;

    /// An input whose event kind is plain bytes must still yield an object schema, because
    /// the transform adds fields to every event.
    #[tokio::test]
    async fn schema_def_with_string_input() {
        let namespace = OwnedTargetPath::event(owned_value_path!("ec2", "metadata"));
        let config = Ec2Metadata {
            namespace: Some(namespace.into()),
            ..Default::default()
        };

        let bytes_input =
            Definition::new(Kind::bytes(), Kind::any_object(), [LogNamespace::Vector]);

        let mut outputs = config.outputs(
            TableRegistry::default(),
            &[(OutputId::dummy(), bytes_input)],
            LogNamespace::Vector,
        );
        assert_eq!(outputs.len(), 1);

        let only_output = outputs.pop().unwrap();
        let schema = only_output.schema_definitions(true)[&OutputId::dummy()].clone();
        assert!(schema.event_kind().is_object());
    }
}
#[cfg(feature = "aws-ec2-metadata-integration-tests")]
#[cfg(test)]
mod integration_tests {
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use vector_lib::lookup::lookup_v2::{OwnedSegment, OwnedValuePath};
use vector_lib::lookup::{event_path, PathPrefix};
use super::*;
use crate::{
event::{metric, LogEvent, Metric},
test_util::{components::assert_transform_compliance, next_addr},
transforms::test::create_topology,
};
use vector_lib::assert_event_data_eq;
use vrl::value::{ObjectMap, Value};
use warp::Filter;
/// Address of the metadata service used by the integration tests: the value of
/// `EC2_METADATA_ADDRESS`, or `http://localhost:1338` when the variable cannot be read.
fn ec2_metadata_address() -> String {
    match std::env::var("EC2_METADATA_ADDRESS") {
        Ok(address) => address,
        Err(_) => String::from("http://localhost:1338"),
    }
}
/// Log paths and values the tests expect for every supported field plus the `Name` and `Test`
/// tags.
// NOTE(review): the values presumably mirror the fixture data of the mock metadata service
// behind `ec2_metadata_address()` — confirm against that service's configuration.
fn expected_log_fields() -> Vec<(OwnedValuePath, &'static str)> {
vec![
(
vec![OwnedSegment::field(AVAILABILITY_ZONE_KEY)].into(),
"us-east-1a",
),
(
vec![OwnedSegment::field(PUBLIC_IPV4_KEY)].into(),
"192.0.2.54",
),
(
vec![OwnedSegment::field(PUBLIC_HOSTNAME_KEY)].into(),
"ec2-192-0-2-54.compute-1.amazonaws.com",
),
(
vec![OwnedSegment::field(LOCAL_IPV4_KEY)].into(),
"172.16.34.43",
),
(
vec![OwnedSegment::field(LOCAL_HOSTNAME_KEY)].into(),
"ip-172-16-34-43.ec2.internal",
),
(
vec![OwnedSegment::field(INSTANCE_ID_KEY)].into(),
"i-1234567890abcdef0",
),
(
vec![OwnedSegment::field(ACCOUNT_ID_KEY)].into(),
"0123456789",
),
(
vec![OwnedSegment::field(AMI_ID_KEY)].into(),
"ami-0b69ea66ff7391e80",
),
(
vec![OwnedSegment::field(INSTANCE_TYPE_KEY)].into(),
"m4.xlarge",
),
(vec![OwnedSegment::field(REGION_KEY)].into(), "us-east-1"),
(vec![OwnedSegment::field(VPC_ID_KEY)].into(), "vpc-d295a6a7"),
(
vec![OwnedSegment::field(SUBNET_ID_KEY)].into(),
"subnet-0ac62554",
),
// Role names are indexed and tags are nested under `tags`.
(owned_value_path!("role-name", 0), "baskinc-role"),
(owned_value_path!("tags", "Name"), "test-instance"),
(owned_value_path!("tags", "Test"), "test-tag"),
]
}
/// Metric tag names and values the tests expect; the metric counterpart of
/// `expected_log_fields`.
fn expected_metric_fields() -> Vec<(&'static str, &'static str)> {
vec![
(AVAILABILITY_ZONE_KEY, "us-east-1a"),
(PUBLIC_IPV4_KEY, "192.0.2.54"),
(
PUBLIC_HOSTNAME_KEY,
"ec2-192-0-2-54.compute-1.amazonaws.com",
),
(LOCAL_IPV4_KEY, "172.16.34.43"),
(LOCAL_HOSTNAME_KEY, "ip-172-16-34-43.ec2.internal"),
(INSTANCE_ID_KEY, "i-1234567890abcdef0"),
(ACCOUNT_ID_KEY, "0123456789"),
(AMI_ID_KEY, "ami-0b69ea66ff7391e80"),
(INSTANCE_TYPE_KEY, "m4.xlarge"),
(REGION_KEY, "us-east-1"),
(VPC_ID_KEY, "vpc-d295a6a7"),
(SUBNET_ID_KEY, "subnet-0ac62554"),
// Indexed and keyed entries use the `name[index-or-key]` tag form.
("role-name[0]", "baskinc-role"),
("tags[Name]", "test-instance"),
("tags[Test]", "test-tag"),
]
}
/// Builds the untagged incremental counter that the metric tests send through the transform.
fn make_metric() -> Metric {
    let kind = metric::MetricKind::Incremental;
    let value = metric::MetricValue::Counter { value: 1.0 };
    Metric::new("event", kind, value)
}
/// Runs the shared config-generation check for `Ec2Metadata`.
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<Ec2Metadata>();
}
/// With the default fields plus `account-id` and three requested tags, a log event receives
/// exactly the fields listed in `expected_log_fields`; `MISSING_TAG` is not among them.
#[tokio::test]
async fn enrich_log() {
assert_transform_compliance(async {
let mut fields = default_fields();
// `account-id` is not in the default list, so it is requested explicitly.
fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
let tags = vec![
String::from("Name"),
String::from("Test"),
String::from("MISSING_TAG"),
];
let transform_config = Ec2Metadata {
endpoint: ec2_metadata_address(),
fields,
tags,
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;
// NOTE(review): presumably gives the topology time to start before sending — confirm.
sleep(Duration::from_secs(1)).await;
let log = LogEvent::default();
let mut expected_log = log.clone();
for (k, v) in expected_log_fields().iter().cloned() {
expected_log.insert((PathPrefix::Event, &k), v);
}
tx.send(log.into()).await.unwrap();
let event = out.recv().await.unwrap();
assert_event_data_eq!(event.into_log(), expected_log);
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
/// Building the transform against a server that answers after three seconds, with a
/// one-second timeout and `required` left at its default, must fail with the token-fetch
/// timeout error.
#[tokio::test(flavor = "multi_thread")]
async fn timeout() {
let addr = next_addr();
// Handler that responds only after the configured timeout has long passed.
async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
tokio::time::sleep(Duration::from_secs(3)).await;
Ok("I waited 3 seconds!")
}
let slow = warp::any().and_then(sleepy);
let server = warp::serve(slow).bind(addr);
let _server = tokio::spawn(server);
let config = Ec2Metadata {
endpoint: format!("http://{}", addr),
refresh_timeout_secs: Duration::from_secs(1),
..Default::default()
};
match config.build(&TransformContext::default()).await {
Ok(_) => panic!("expected timeout failure"),
Err(err) => assert_eq!(
err.to_string(),
"Unable to fetch metadata authentication token: deadline has elapsed."
),
}
}
/// Same slow server as in `timeout`, but with `required: false`: the failed initial fetch
/// must not prevent the transform from building.
#[tokio::test(flavor = "multi_thread")]
async fn not_required() {
let addr = next_addr();
// Handler that responds only after the configured timeout has long passed.
async fn sleepy() -> Result<impl warp::Reply, std::convert::Infallible> {
tokio::time::sleep(Duration::from_secs(3)).await;
Ok("I waited 3 seconds!")
}
let slow = warp::any().and_then(sleepy);
let server = warp::serve(slow).bind(addr);
let _server = tokio::spawn(server);
let config = Ec2Metadata {
endpoint: format!("http://{}", addr),
refresh_timeout_secs: Duration::from_secs(1),
required: false,
..Default::default()
};
assert!(
config.build(&TransformContext::default()).await.is_ok(),
"expected no failure because 'required' config value set to false"
);
}
/// Metric counterpart of `enrich_log`: a metric receives exactly the tags listed in
/// `expected_metric_fields`.
#[tokio::test]
async fn enrich_metric() {
assert_transform_compliance(async {
let mut fields = default_fields();
// `account-id` is not in the default list, so it is requested explicitly.
fields.extend(vec![String::from(ACCOUNT_ID_KEY)].into_iter());
let tags = vec![
String::from("Name"),
String::from("Test"),
String::from("MISSING_TAG"),
];
let transform_config = Ec2Metadata {
endpoint: ec2_metadata_address(),
fields,
tags,
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;
// NOTE(review): presumably gives the topology time to start before sending — confirm.
sleep(Duration::from_secs(1)).await;
let metric = make_metric();
let mut expected_metric = metric.clone();
for (k, v) in expected_metric_fields().iter() {
expected_metric.replace_tag(k.to_string(), v.to_string());
}
tx.send(metric.into()).await.unwrap();
let event = out.recv().await.unwrap();
assert_event_data_eq!(event.into_metric(), expected_metric);
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
/// When `fields` is limited to `public-ipv4` and `region`, a log event only receives those two
/// fields plus the tags that exist.
#[tokio::test]
async fn fields_log() {
assert_transform_compliance(async {
let transform_config = Ec2Metadata {
endpoint: ec2_metadata_address(),
fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
tags: vec![
String::from("Name"),
String::from("Test"),
String::from("MISSING_TAG"),
],
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;
// NOTE(review): presumably gives the topology time to start before sending — confirm.
sleep(Duration::from_secs(1)).await;
let log = LogEvent::default();
let mut expected_log = log.clone();
// Key names are wrapped in quotes before being used as paths.
expected_log.insert(format!("\"{}\"", PUBLIC_IPV4_KEY).as_str(), "192.0.2.54");
expected_log.insert(format!("\"{}\"", REGION_KEY).as_str(), "us-east-1");
expected_log.insert(
format!("\"{}\"", TAGS_KEY).as_str(),
ObjectMap::from([
("Name".into(), Value::from("test-instance")),
("Test".into(), Value::from("test-tag")),
]),
);
tx.send(log.into()).await.unwrap();
let event = out.recv().await.unwrap();
assert_event_data_eq!(event.into_log(), expected_log);
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
/// Metric counterpart of `fields_log`: only the two selected fields and the existing tags are
/// set as metric tags.
#[tokio::test]
async fn fields_metric() {
assert_transform_compliance(async {
let transform_config = Ec2Metadata {
endpoint: ec2_metadata_address(),
fields: vec![PUBLIC_IPV4_KEY.into(), REGION_KEY.into()],
tags: vec![
String::from("Name"),
String::from("Test"),
String::from("MISSING_TAG"),
],
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;
// NOTE(review): presumably gives the topology time to start before sending — confirm.
sleep(Duration::from_secs(1)).await;
let metric = make_metric();
let mut expected_metric = metric.clone();
expected_metric.replace_tag(PUBLIC_IPV4_KEY.to_string(), "192.0.2.54".to_string());
expected_metric.replace_tag(REGION_KEY.to_string(), "us-east-1".to_string());
// Instance tags use the `tags[<name>]` metric tag form.
expected_metric.replace_tag(
format!("{}[{}]", TAGS_KEY, "Name"),
"test-instance".to_string(),
);
expected_metric
.replace_tag(format!("{}[{}]", TAGS_KEY, "Test"), "test-tag".to_string());
tx.send(metric.into()).await.unwrap();
let event = out.recv().await.unwrap();
assert_event_data_eq!(event.into_metric(), expected_metric);
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
/// Checks where log fields land for two namespace settings: under `ec2.metadata` when that
/// namespace is configured, and at the event root when the namespace has no path.
#[tokio::test]
async fn namespace_log() {
// Case 1: explicit `ec2.metadata` namespace.
{
assert_transform_compliance(async {
let transform_config = Ec2Metadata {
endpoint: ec2_metadata_address(),
namespace: Some(
OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
),
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;
sleep(Duration::from_secs(1)).await;
let log = LogEvent::default();
tx.send(log.into()).await.unwrap();
let event = out.recv().await.unwrap();
assert_eq!(
event.as_log().get("ec2.metadata.\"availability-zone\""),
Some(&"us-east-1a".into())
);
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
// Case 2: namespace present but without a path.
{
assert_transform_compliance(async {
let transform_config = Ec2Metadata {
endpoint: ec2_metadata_address(),
namespace: Some(OptionalTargetPath::none()),
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;
sleep(Duration::from_secs(1)).await;
let log = LogEvent::default();
tx.send(log.into()).await.unwrap();
let event = out.recv().await.unwrap();
assert_eq!(
event.as_log().get(event_path!(AVAILABILITY_ZONE_KEY)),
Some(&"us-east-1a".into())
);
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
}
/// Metric counterpart of `namespace_log`: the tag name is prefixed with the dotted namespace
/// when one is configured and is the bare key when the namespace has no path.
#[tokio::test]
async fn namespace_metric() {
// Case 1: explicit `ec2.metadata` namespace.
{
assert_transform_compliance(async {
let transform_config = Ec2Metadata {
endpoint: ec2_metadata_address(),
namespace: Some(
OwnedTargetPath::event(owned_value_path!("ec2", "metadata")).into(),
),
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;
sleep(Duration::from_secs(1)).await;
let metric = make_metric();
tx.send(metric.into()).await.unwrap();
let event = out.recv().await.unwrap();
assert_eq!(
event
.as_metric()
.tag_value("ec2.metadata.availability-zone"),
Some("us-east-1a".to_string())
);
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
// Case 2: namespace present but without a path.
{
assert_transform_compliance(async {
let transform_config = Ec2Metadata {
endpoint: ec2_metadata_address(),
namespace: Some(OptionalTargetPath::none()),
..Default::default()
};
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) =
create_topology(ReceiverStream::new(rx), transform_config).await;
sleep(Duration::from_secs(1)).await;
let metric = make_metric();
tx.send(metric.into()).await.unwrap();
let event = out.recv().await.unwrap();
assert_eq!(
event.as_metric().tag_value(AVAILABILITY_ZONE_KEY),
Some("us-east-1a".to_string())
);
drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}
}
}