codecs/decoding/format/
influxdb.rs1use std::borrow::Cow;
2
3use bytes::Bytes;
4use chrono::DateTime;
5use derivative::Derivative;
6use influxdb_line_protocol::{FieldValue, ParsedLine};
7use smallvec::SmallVec;
8use vector_config::configurable_component;
9use vector_core::config::LogNamespace;
10use vector_core::event::{Event, Metric, MetricKind, MetricTags, MetricValue};
11use vector_core::{config::DataType, schema};
12use vrl::value::kind::Collection;
13use vrl::value::Kind;
14
15use crate::decoding::format::default_lossy;
16
17use super::Deserializer;
18
19#[configurable_component]
22#[derive(Debug, Clone, Default)]
23pub struct InfluxdbDeserializerConfig {
24 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
26 pub influxdb: InfluxdbDeserializerOptions,
27}
28
29impl InfluxdbDeserializerConfig {
30 pub fn new(options: InfluxdbDeserializerOptions) -> Self {
32 Self { influxdb: options }
33 }
34
35 pub fn build(&self) -> InfluxdbDeserializer {
37 Into::<InfluxdbDeserializer>::into(self)
38 }
39
40 pub fn output_type(&self) -> DataType {
42 DataType::Metric
43 }
44
45 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
47 schema::Definition::new_with_default_metadata(
48 Kind::object(Collection::empty()),
49 [log_namespace],
50 )
51 }
52}
53
54#[configurable_component]
56#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
57#[derivative(Default)]
58pub struct InfluxdbDeserializerOptions {
59 #[serde(
65 default = "default_lossy",
66 skip_serializing_if = "vector_core::serde::is_default"
67 )]
68 #[derivative(Default(value = "default_lossy()"))]
69 pub lossy: bool,
70}
71
72#[derive(Debug, Clone, Derivative)]
74#[derivative(Default)]
75pub struct InfluxdbDeserializer {
76 #[derivative(Default(value = "default_lossy()"))]
77 lossy: bool,
78}
79
80impl InfluxdbDeserializer {
81 pub fn new(lossy: bool) -> Self {
83 Self { lossy }
84 }
85}
86
87impl Deserializer for InfluxdbDeserializer {
88 fn parse(
89 &self,
90 bytes: Bytes,
91 _log_namespace: LogNamespace,
92 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
93 let line: Cow<str> = match self.lossy {
94 true => String::from_utf8_lossy(&bytes),
95 false => Cow::from(std::str::from_utf8(&bytes)?),
96 };
97 let parsed_line = influxdb_line_protocol::parse_lines(&line);
98
99 let res = parsed_line
100 .collect::<Result<Vec<_>, _>>()?
101 .iter()
102 .flat_map(|line| {
103 let ParsedLine {
104 series,
105 field_set,
106 timestamp,
107 } = line;
108
109 field_set
110 .iter()
111 .filter_map(|f| {
112 let measurement = series.measurement.clone();
113 let tags = series.tag_set.as_ref();
114 let val = match f.1 {
115 FieldValue::I64(v) => v as f64,
116 FieldValue::U64(v) => v as f64,
117 FieldValue::F64(v) => v,
118 FieldValue::Boolean(v) => {
119 if v {
120 1.0
121 } else {
122 0.0
123 }
124 }
125 FieldValue::String(_) => return None, };
127 Some(Event::Metric(
128 Metric::new(
129 format!("{0}_{1}", measurement, f.0),
130 MetricKind::Absolute,
131 MetricValue::Gauge { value: val },
132 )
133 .with_tags(tags.map(|ts| {
134 MetricTags::from_iter(
135 ts.iter().map(|t| (t.0.to_string(), t.1.to_string())),
136 )
137 }))
138 .with_timestamp(timestamp.map(DateTime::from_timestamp_nanos)),
139 ))
140 })
141 .collect::<Vec<_>>()
142 })
143 .collect();
144
145 Ok(res)
146 }
147}
148
149impl From<&InfluxdbDeserializerConfig> for InfluxdbDeserializer {
150 fn from(config: &InfluxdbDeserializerConfig) -> Self {
151 Self {
152 lossy: config.influxdb.lossy,
153 }
154 }
155}
156
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use vector_core::{
        config::LogNamespace,
        event::{Metric, MetricKind, MetricTags, MetricValue},
    };

    use crate::decoding::format::{Deserializer, InfluxdbDeserializer};

    #[test]
    fn deserialize_success() {
        let deser = InfluxdbDeserializer::new(true);
        let now = chrono::Utc::now();
        let now_timestamp_nanos = now.timestamp_nanos_opt().unwrap();
        let buffer = Bytes::from(format!(
            "cpu,host=A,region=west usage_system=64i,usage_user=10i {now_timestamp_nanos}"
        ));
        let events = deser.parse(buffer, LogNamespace::default()).unwrap();
        assert_eq!(events.len(), 2);

        assert_eq!(
            events[0].as_metric(),
            &Metric::new(
                "cpu_usage_system",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 64. },
            )
            .with_tags(Some(MetricTags::from_iter([
                ("host".to_string(), "A".to_string()),
                ("region".to_string(), "west".to_string()),
            ])))
            .with_timestamp(Some(now))
        );
        assert_eq!(
            events[1].as_metric(),
            &Metric::new(
                "cpu_usage_user",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 10. },
            )
            .with_tags(Some(MetricTags::from_iter([
                ("host".to_string(), "A".to_string()),
                ("region".to_string(), "west".to_string()),
            ])))
            .with_timestamp(Some(now))
        );
    }

    #[test]
    fn deserialize_error() {
        let deser = InfluxdbDeserializer::new(true);
        let buffer = Bytes::from("some invalid string");
        assert!(deser.parse(buffer, LogNamespace::default()).is_err());
    }

    #[test]
    fn deserialize_skips_string_fields_and_maps_booleans() {
        let deser = InfluxdbDeserializer::new(true);
        let buffer = Bytes::from(r#"cpu active=true,idle=false,name="x",load=1.5,count=3u"#);
        let events = deser.parse(buffer, LogNamespace::default()).unwrap();

        // The string field `name` produces no event; everything else keeps input order.
        let expected = [
            ("cpu_active", 1.0),
            ("cpu_idle", 0.0),
            ("cpu_load", 1.5),
            ("cpu_count", 3.0),
        ];
        assert_eq!(events.len(), expected.len());
        for (event, (name, value)) in events.iter().zip(expected) {
            assert_eq!(
                event.as_metric(),
                &Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value }),
            );
        }
    }

    #[test]
    fn deserialize_multiple_lines() {
        let deser = InfluxdbDeserializer::new(true);
        let buffer = Bytes::from("cpu a=1i\nmem b=2i");
        let events = deser.parse(buffer, LogNamespace::default()).unwrap();
        let names: Vec<_> = events.iter().map(|e| e.as_metric().name()).collect();
        assert_eq!(names, ["cpu_a", "mem_b"]);
    }

    #[test]
    fn deserialize_strict_rejects_invalid_utf8() {
        let deser = InfluxdbDeserializer::new(false);
        let buffer = Bytes::from_static(b"cpu,host=\xff value=1");
        assert!(deser.parse(buffer, LogNamespace::default()).is_err());
    }

    #[test]
    fn deserialize_lossy_replaces_invalid_utf8() {
        let deser = InfluxdbDeserializer::new(true);
        let buffer = Bytes::from_static(b"cpu,host=\xff value=1");
        let events = deser.parse(buffer, LogNamespace::default()).unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0].as_metric(),
            &Metric::new(
                "cpu_value",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 1. },
            )
            .with_tags(Some(MetricTags::from_iter([(
                "host".to_string(),
                "\u{FFFD}".to_string(),
            )]))),
        );
    }
}