use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use lookup::path;
use ordered_float::NotNan;
use std::collections::BTreeMap;
use vector_core::{
config::{log_schema, LegacyKey, LogNamespace},
event::{Event, LogEvent, TraceEvent},
};
use vrl::value::KeyString;
use vrl::{
event_path,
value::{ObjectMap, Value},
};
use super::proto::{
common::v1::{any_value::Value as PBValue, InstrumentationScope, KeyValue},
logs::v1::{LogRecord, ResourceLogs, SeverityNumber},
resource::v1::Resource,
trace::v1::{
span::{Event as SpanEvent, Link},
ResourceSpans, Span, Status as SpanStatus,
},
};
const SOURCE_NAME: &str = "opentelemetry";
pub const RESOURCE_KEY: &str = "resources";
pub const ATTRIBUTES_KEY: &str = "attributes";
pub const SCOPE_KEY: &str = "scope";
pub const NAME_KEY: &str = "name";
pub const VERSION_KEY: &str = "version";
pub const TRACE_ID_KEY: &str = "trace_id";
pub const SPAN_ID_KEY: &str = "span_id";
pub const SEVERITY_TEXT_KEY: &str = "severity_text";
pub const SEVERITY_NUMBER_KEY: &str = "severity_number";
pub const OBSERVED_TIMESTAMP_KEY: &str = "observed_timestamp";
pub const DROPPED_ATTRIBUTES_COUNT_KEY: &str = "dropped_attributes_count";
pub const FLAGS_KEY: &str = "flags";
impl ResourceLogs {
pub fn into_event_iter(self, log_namespace: LogNamespace) -> impl Iterator<Item = Event> {
let now = Utc::now();
self.scope_logs.into_iter().flat_map(move |scope_log| {
let scope = scope_log.scope;
let resource = self.resource.clone();
scope_log.log_records.into_iter().map(move |log_record| {
ResourceLog {
resource: resource.clone(),
scope: scope.clone(),
log_record,
}
.into_event(log_namespace, now)
})
})
}
}
impl ResourceSpans {
pub fn into_event_iter(self) -> impl Iterator<Item = Event> {
let resource = self.resource;
let now = Utc::now();
self.scope_spans
.into_iter()
.flat_map(|instrumentation_library_spans| instrumentation_library_spans.spans)
.map(move |span| {
ResourceSpan {
resource: resource.clone(),
span,
}
.into_event(now)
})
}
}
impl From<PBValue> for Value {
fn from(av: PBValue) -> Self {
match av {
PBValue::StringValue(v) => Value::Bytes(Bytes::from(v)),
PBValue::BoolValue(v) => Value::Boolean(v),
PBValue::IntValue(v) => Value::Integer(v),
PBValue::DoubleValue(v) => Value::Float(NotNan::new(v).unwrap()),
PBValue::BytesValue(v) => Value::Bytes(Bytes::from(v)),
PBValue::ArrayValue(arr) => Value::Array(
arr.values
.into_iter()
.map(|av| av.value.map(Into::into).unwrap_or(Value::Null))
.collect::<Vec<Value>>(),
),
PBValue::KvlistValue(arr) => kv_list_into_value(arr.values),
}
}
}
struct ResourceLog {
resource: Option<Resource>,
scope: Option<InstrumentationScope>,
log_record: LogRecord,
}
struct ResourceSpan {
resource: Option<Resource>,
span: Span,
}
fn kv_list_into_value(arr: Vec<KeyValue>) -> Value {
Value::Object(
arr.into_iter()
.filter_map(|kv| {
kv.value.map(|av| {
(
kv.key.into(),
av.value.map(Into::into).unwrap_or(Value::Null),
)
})
})
.collect::<ObjectMap>(),
)
}
fn to_hex(d: &[u8]) -> String {
if d.is_empty() {
return "".to_string();
}
hex::encode(d)
}
impl ResourceSpan {
fn into_event(self, now: DateTime<Utc>) -> Event {
let mut trace = TraceEvent::default();
let span = self.span;
trace.insert(
event_path!(TRACE_ID_KEY),
Value::from(to_hex(&span.trace_id)),
);
trace.insert(event_path!(SPAN_ID_KEY), Value::from(to_hex(&span.span_id)));
trace.insert(event_path!("trace_state"), span.trace_state);
trace.insert(
event_path!("parent_span_id"),
Value::from(to_hex(&span.parent_span_id)),
);
trace.insert(event_path!("name"), span.name);
trace.insert(event_path!("kind"), span.kind);
trace.insert(
event_path!("start_time_unix_nano"),
Value::from(Utc.timestamp_nanos(span.start_time_unix_nano as i64)),
);
trace.insert(
event_path!("end_time_unix_nano"),
Value::from(Utc.timestamp_nanos(span.end_time_unix_nano as i64)),
);
if !span.attributes.is_empty() {
trace.insert(
event_path!(ATTRIBUTES_KEY),
kv_list_into_value(span.attributes),
);
}
trace.insert(
event_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
Value::from(span.dropped_attributes_count),
);
if !span.events.is_empty() {
trace.insert(
event_path!("events"),
Value::Array(span.events.into_iter().map(Into::into).collect()),
);
}
trace.insert(
event_path!("dropped_events_count"),
Value::from(span.dropped_events_count),
);
if !span.links.is_empty() {
trace.insert(
event_path!("links"),
Value::Array(span.links.into_iter().map(Into::into).collect()),
);
}
trace.insert(
event_path!("dropped_links_count"),
Value::from(span.dropped_links_count),
);
trace.insert(event_path!("status"), Value::from(span.status));
if let Some(resource) = self.resource {
if !resource.attributes.is_empty() {
trace.insert(
event_path!(RESOURCE_KEY),
kv_list_into_value(resource.attributes),
);
}
}
trace.insert(event_path!("ingest_timestamp"), Value::from(now));
trace.into()
}
}
impl ResourceLog {
fn into_event(self, log_namespace: LogNamespace, now: DateTime<Utc>) -> Event {
let mut log = match log_namespace {
LogNamespace::Vector => {
if let Some(v) = self.log_record.body.and_then(|av| av.value) {
LogEvent::from(<PBValue as Into<Value>>::into(v))
} else {
LogEvent::from(Value::Null)
}
}
LogNamespace::Legacy => {
let mut log = LogEvent::default();
if let Some(v) = self.log_record.body.and_then(|av| av.value) {
log.maybe_insert(log_schema().message_key_target_path(), v);
}
log
}
};
if let Some(scope) = self.scope {
if !scope.name.is_empty() {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(SCOPE_KEY, NAME_KEY))),
path!(SCOPE_KEY, NAME_KEY),
scope.name,
);
}
if !scope.version.is_empty() {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(SCOPE_KEY, VERSION_KEY))),
path!(SCOPE_KEY, VERSION_KEY),
scope.version,
);
}
if !scope.attributes.is_empty() {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(SCOPE_KEY, ATTRIBUTES_KEY))),
path!(SCOPE_KEY, ATTRIBUTES_KEY),
kv_list_into_value(scope.attributes),
);
}
if scope.dropped_attributes_count > 0 {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(
SCOPE_KEY,
DROPPED_ATTRIBUTES_COUNT_KEY
))),
path!(SCOPE_KEY, DROPPED_ATTRIBUTES_COUNT_KEY),
scope.dropped_attributes_count,
);
}
}
if let Some(resource) = self.resource {
if !resource.attributes.is_empty() {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(RESOURCE_KEY))),
path!(RESOURCE_KEY),
kv_list_into_value(resource.attributes),
);
}
}
if !self.log_record.attributes.is_empty() {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(ATTRIBUTES_KEY))),
path!(ATTRIBUTES_KEY),
kv_list_into_value(self.log_record.attributes),
);
}
if !self.log_record.trace_id.is_empty() {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(TRACE_ID_KEY))),
path!(TRACE_ID_KEY),
Bytes::from(to_hex(&self.log_record.trace_id)),
);
}
if !self.log_record.span_id.is_empty() {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(SPAN_ID_KEY))),
path!(SPAN_ID_KEY),
Bytes::from(to_hex(&self.log_record.span_id)),
);
}
if !self.log_record.severity_text.is_empty() {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(SEVERITY_TEXT_KEY))),
path!(SEVERITY_TEXT_KEY),
self.log_record.severity_text,
);
}
if self.log_record.severity_number != SeverityNumber::Unspecified as i32 {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(SEVERITY_NUMBER_KEY))),
path!(SEVERITY_NUMBER_KEY),
self.log_record.severity_number,
);
}
if self.log_record.flags > 0 {
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(FLAGS_KEY))),
path!(FLAGS_KEY),
self.log_record.flags,
);
}
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(DROPPED_ATTRIBUTES_COUNT_KEY))),
path!(DROPPED_ATTRIBUTES_COUNT_KEY),
self.log_record.dropped_attributes_count,
);
let observed_timestamp = if self.log_record.observed_time_unix_nano > 0 {
Utc.timestamp_nanos(self.log_record.observed_time_unix_nano as i64)
.into()
} else {
Value::Timestamp(now)
};
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
Some(LegacyKey::Overwrite(path!(OBSERVED_TIMESTAMP_KEY))),
path!(OBSERVED_TIMESTAMP_KEY),
observed_timestamp.clone(),
);
let timestamp = if self.log_record.time_unix_nano > 0 {
Utc.timestamp_nanos(self.log_record.time_unix_nano as i64)
.into()
} else {
observed_timestamp
};
log_namespace.insert_source_metadata(
SOURCE_NAME,
&mut log,
log_schema().timestamp_key().map(LegacyKey::Overwrite),
path!("timestamp"),
timestamp,
);
log_namespace.insert_vector_metadata(
&mut log,
log_schema().source_type_key(),
path!("source_type"),
Bytes::from_static(SOURCE_NAME.as_bytes()),
);
if log_namespace == LogNamespace::Vector {
log.metadata_mut()
.value_mut()
.insert(path!("vector", "ingest_timestamp"), now);
}
log.into()
}
}
impl From<SpanEvent> for Value {
fn from(ev: SpanEvent) -> Self {
let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
obj.insert("name".into(), ev.name.into());
obj.insert(
"time_unix_nano".into(),
Value::Timestamp(Utc.timestamp_nanos(ev.time_unix_nano as i64)),
);
obj.insert("attributes".into(), kv_list_into_value(ev.attributes));
obj.insert(
"dropped_attributes_count".into(),
Value::Integer(ev.dropped_attributes_count as i64),
);
Value::Object(obj)
}
}
impl From<Link> for Value {
fn from(link: Link) -> Self {
let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
obj.insert("trace_id".into(), Value::from(to_hex(&link.trace_id)));
obj.insert("span_id".into(), Value::from(to_hex(&link.span_id)));
obj.insert("trace_state".into(), link.trace_state.into());
obj.insert("attributes".into(), kv_list_into_value(link.attributes));
obj.insert(
"dropped_attributes_count".into(),
Value::Integer(link.dropped_attributes_count as i64),
);
Value::Object(obj)
}
}
impl From<SpanStatus> for Value {
fn from(status: SpanStatus) -> Self {
let mut obj: BTreeMap<KeyString, Value> = BTreeMap::new();
obj.insert("message".into(), status.message.into());
obj.insert("code".into(), status.code.into());
Value::Object(obj)
}
}