codecs/decoding/format/
influxdb.rs

1use std::borrow::Cow;
2
3use bytes::Bytes;
4use chrono::DateTime;
5use derivative::Derivative;
6use influxdb_line_protocol::{FieldValue, ParsedLine};
7use smallvec::SmallVec;
8use vector_config::configurable_component;
9use vector_core::config::LogNamespace;
10use vector_core::event::{Event, Metric, MetricKind, MetricTags, MetricValue};
11use vector_core::{config::DataType, schema};
12use vrl::value::kind::Collection;
13use vrl::value::Kind;
14
15use crate::decoding::format::default_lossy;
16
17use super::Deserializer;
18
/// Config used to build a `InfluxdbDeserializer`.
/// - [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v1/write_protocols/line_protocol_tutorial/):
// NOTE(review): the `///` text on this type and its field is presumably picked up by
// `#[configurable_component]` for generated configuration documentation, so it is kept
// verbatim here — confirm before rewording it.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct InfluxdbDeserializerConfig {
    /// Influxdb-specific decoding options.
    // `default` lets the field be omitted on input; on output the field is skipped whenever
    // `vector_core::serde::is_default` returns true for it (presumably: the options equal
    // their `Default` value — confirm against that helper).
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub influxdb: InfluxdbDeserializerOptions,
}
28
29impl InfluxdbDeserializerConfig {
30    /// new constructs a new InfluxdbDeserializerConfig
31    pub fn new(options: InfluxdbDeserializerOptions) -> Self {
32        Self { influxdb: options }
33    }
34
35    /// build constructs a new InfluxdbDeserializer
36    pub fn build(&self) -> InfluxdbDeserializer {
37        Into::<InfluxdbDeserializer>::into(self)
38    }
39
40    /// The output type produced by the deserializer.
41    pub fn output_type(&self) -> DataType {
42        DataType::Metric
43    }
44
45    /// The schema produced by the deserializer.
46    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
47        schema::Definition::new_with_default_metadata(
48            Kind::object(Collection::empty()),
49            [log_namespace],
50        )
51    }
52}
53
/// Influxdb-specific decoding options.
// NOTE(review): the `///` text below is presumably surfaced as user-facing configuration
// documentation by `#[configurable_component]`, so it is kept verbatim — confirm before
// rewording it.
#[configurable_component]
#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
#[derivative(Default)]
pub struct InfluxdbDeserializerOptions {
    /// Determines whether to replace invalid UTF-8 sequences instead of failing.
    ///
    /// When true, invalid UTF-8 sequences are replaced with the [`U+FFFD REPLACEMENT CHARACTER`][U+FFFD].
    ///
    /// [U+FFFD]: https://en.wikipedia.org/wiki/Specials_(Unicode_block)#Replacement_character
    // Both serde (when the key is absent) and the derived `Default` take their value from
    // `default_lossy`, so a missing key and `Default::default()` yield the same setting.
    #[serde(
        default = "default_lossy",
        skip_serializing_if = "vector_core::serde::is_default"
    )]
    #[derivative(Default(value = "default_lossy()"))]
    pub lossy: bool,
}
71
/// Deserializer for the influxdb line protocol.
///
/// The `Deserializer` implementation for this type parses the input as line protocol and
/// emits one absolute gauge metric per numeric or boolean field; string fields are skipped.
#[derive(Debug, Clone, Derivative)]
#[derivative(Default)]
pub struct InfluxdbDeserializer {
    // When `true`, the input is decoded with `String::from_utf8_lossy`; when `false`,
    // invalid UTF-8 makes `parse` return an error. The default comes from `default_lossy()`.
    #[derivative(Default(value = "default_lossy()"))]
    lossy: bool,
}
79
80impl InfluxdbDeserializer {
81    /// new constructs a new InfluxdbDeserializer
82    pub fn new(lossy: bool) -> Self {
83        Self { lossy }
84    }
85}
86
87impl Deserializer for InfluxdbDeserializer {
88    fn parse(
89        &self,
90        bytes: Bytes,
91        _log_namespace: LogNamespace,
92    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
93        let line: Cow<str> = match self.lossy {
94            true => String::from_utf8_lossy(&bytes),
95            false => Cow::from(std::str::from_utf8(&bytes)?),
96        };
97        let parsed_line = influxdb_line_protocol::parse_lines(&line);
98
99        let res = parsed_line
100            .collect::<Result<Vec<_>, _>>()?
101            .iter()
102            .flat_map(|line| {
103                let ParsedLine {
104                    series,
105                    field_set,
106                    timestamp,
107                } = line;
108
109                field_set
110                    .iter()
111                    .filter_map(|f| {
112                        let measurement = series.measurement.clone();
113                        let tags = series.tag_set.as_ref();
114                        let val = match f.1 {
115                            FieldValue::I64(v) => v as f64,
116                            FieldValue::U64(v) => v as f64,
117                            FieldValue::F64(v) => v,
118                            FieldValue::Boolean(v) => {
119                                if v {
120                                    1.0
121                                } else {
122                                    0.0
123                                }
124                            }
125                            FieldValue::String(_) => return None, // String values cannot be modelled in our schema
126                        };
127                        Some(Event::Metric(
128                            Metric::new(
129                                format!("{0}_{1}", measurement, f.0),
130                                MetricKind::Absolute,
131                                MetricValue::Gauge { value: val },
132                            )
133                            .with_tags(tags.map(|ts| {
134                                MetricTags::from_iter(
135                                    ts.iter().map(|t| (t.0.to_string(), t.1.to_string())),
136                                )
137                            }))
138                            .with_timestamp(timestamp.map(DateTime::from_timestamp_nanos)),
139                        ))
140                    })
141                    .collect::<Vec<_>>()
142            })
143            .collect();
144
145        Ok(res)
146    }
147}
148
149impl From<&InfluxdbDeserializerConfig> for InfluxdbDeserializer {
150    fn from(config: &InfluxdbDeserializerConfig) -> Self {
151        Self {
152            lossy: config.influxdb.lossy,
153        }
154    }
155}
156
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use vector_core::{
        config::LogNamespace,
        event::{Metric, MetricKind, MetricTags, MetricValue},
    };

    use crate::decoding::format::{Deserializer, InfluxdbDeserializer};

    /// Builds the gauge expected for one field of the `cpu,host=A,region=west` test line.
    fn expected_gauge(
        name: &str,
        value: f64,
        timestamp: chrono::DateTime<chrono::Utc>,
    ) -> Metric {
        let tags = MetricTags::from_iter([
            ("host".to_string(), "A".to_string()),
            ("region".to_string(), "west".to_string()),
        ]);

        Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
            .with_tags(Some(tags))
            .with_timestamp(Some(timestamp))
    }

    /// A well-formed line with two integer fields yields two tagged, timestamped gauges.
    #[test]
    fn deserialize_success() {
        let now = chrono::Utc::now();
        let now_timestamp_nanos = now.timestamp_nanos_opt().unwrap();
        let buffer = Bytes::from(format!(
            "cpu,host=A,region=west usage_system=64i,usage_user=10i {now_timestamp_nanos}"
        ));

        let deser = InfluxdbDeserializer::new(true);
        let events = deser.parse(buffer, LogNamespace::default()).unwrap();

        assert_eq!(events.len(), 2);
        assert_eq!(
            events[0].as_metric(),
            &expected_gauge("cpu_usage_system", 64., now)
        );
        assert_eq!(
            events[1].as_metric(),
            &expected_gauge("cpu_usage_user", 10., now)
        );
    }

    /// Input that is not valid line protocol is reported as an error.
    #[test]
    fn deserialize_error() {
        let buffer = Bytes::from("some invalid string");
        let deser = InfluxdbDeserializer::new(true);

        let result = deser.parse(buffer, LogNamespace::default());

        assert!(result.is_err());
    }
}