use std::borrow::Cow;
use bytes::Bytes;
use chrono::DateTime;
use derivative::Derivative;
use influxdb_line_protocol::{FieldValue, ParsedLine};
use smallvec::SmallVec;
use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::event::{Event, Metric, MetricKind, MetricTags, MetricValue};
use vector_core::{config::DataType, schema};
use vrl::value::kind::Collection;
use vrl::value::Kind;
use crate::decoding::format::default_lossy;
use super::Deserializer;
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct InfluxdbDeserializerConfig {
#[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
pub influxdb: InfluxdbDeserializerOptions,
}
impl InfluxdbDeserializerConfig {
pub fn new(options: InfluxdbDeserializerOptions) -> Self {
Self { influxdb: options }
}
pub fn build(&self) -> InfluxdbDeserializer {
Into::<InfluxdbDeserializer>::into(self)
}
pub fn output_type(&self) -> DataType {
DataType::Metric
}
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
schema::Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[log_namespace],
)
}
}
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct InfluxdbDeserializerOptions {
#[serde(
default = "default_lossy",
skip_serializing_if = "vector_core::serde::is_default"
)]
#[derivative(Default(value = "default_lossy()"))]
pub lossy: bool,
}
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct InfluxdbDeserializer {
#[derivative(Default(value = "default_lossy()"))]
lossy: bool,
}
impl InfluxdbDeserializer {
pub fn new(lossy: bool) -> Self {
Self { lossy }
}
}
impl Deserializer for InfluxdbDeserializer {
fn parse(
&self,
bytes: Bytes,
_log_namespace: LogNamespace,
) -> vector_common::Result<SmallVec<[Event; 1]>> {
let line: Cow<str> = match self.lossy {
true => String::from_utf8_lossy(&bytes),
false => Cow::from(std::str::from_utf8(&bytes)?),
};
let parsed_line = influxdb_line_protocol::parse_lines(&line);
let res = parsed_line
.collect::<Result<Vec<_>, _>>()?
.iter()
.flat_map(|line| {
let ParsedLine {
series,
field_set,
timestamp,
} = line;
field_set
.iter()
.filter_map(|f| {
let measurement = series.measurement.clone();
let tags = series.tag_set.as_ref();
let val = match f.1 {
FieldValue::I64(v) => v as f64,
FieldValue::U64(v) => v as f64,
FieldValue::F64(v) => v,
FieldValue::Boolean(v) => {
if v {
1.0
} else {
0.0
}
}
FieldValue::String(_) => return None, };
Some(Event::Metric(
Metric::new(
format!("{0}_{1}", measurement, f.0),
MetricKind::Absolute,
MetricValue::Gauge { value: val },
)
.with_tags(tags.map(|ts| {
MetricTags::from_iter(
ts.iter().map(|t| (t.0.to_string(), t.1.to_string())),
)
}))
.with_timestamp(timestamp.map(DateTime::from_timestamp_nanos)),
))
})
.collect::<Vec<_>>()
})
.collect();
Ok(res)
}
}
impl From<&InfluxdbDeserializerConfig> for InfluxdbDeserializer {
fn from(config: &InfluxdbDeserializerConfig) -> Self {
Self {
lossy: config.influxdb.lossy,
}
}
}
#[cfg(test)]
mod tests {
use bytes::Bytes;
use vector_core::{
config::LogNamespace,
event::{Metric, MetricKind, MetricTags, MetricValue},
};
use crate::decoding::format::{Deserializer, InfluxdbDeserializer};
#[test]
fn deserialize_success() {
let deser = InfluxdbDeserializer::new(true);
let now = chrono::Utc::now();
let now_timestamp_nanos = now.timestamp_nanos_opt().unwrap();
let buffer = Bytes::from(format!(
"cpu,host=A,region=west usage_system=64i,usage_user=10i {now_timestamp_nanos}"
));
let events = deser.parse(buffer, LogNamespace::default()).unwrap();
assert_eq!(events.len(), 2);
assert_eq!(
events[0].as_metric(),
&Metric::new(
"cpu_usage_system",
MetricKind::Absolute,
MetricValue::Gauge { value: 64. },
)
.with_tags(Some(MetricTags::from_iter([
("host".to_string(), "A".to_string()),
("region".to_string(), "west".to_string()),
])))
.with_timestamp(Some(now))
);
assert_eq!(
events[1].as_metric(),
&Metric::new(
"cpu_usage_user",
MetricKind::Absolute,
MetricValue::Gauge { value: 10. },
)
.with_tags(Some(MetricTags::from_iter([
("host".to_string(), "A".to_string()),
("region".to_string(), "west".to_string()),
])))
.with_timestamp(Some(now))
);
}
#[test]
fn deserialize_error() {
let deser = InfluxdbDeserializer::new(true);
let buffer = Bytes::from("some invalid string");
assert!(deser.parse(buffer, LogNamespace::default()).is_err());
}
}