vrl/stdlib/
parse_influxdb.rs

1use std::collections::BTreeMap;
2
3use chrono::DateTime;
4use influxdb_line_protocol::{FieldValue, ParsedLine};
5
6use crate::compiler::prelude::*;
7use crate::{btreemap, value};
8
9#[allow(clippy::cast_precision_loss)] //TODO evaluate removal options
10fn influxdb_line_to_metrics(line: ParsedLine) -> Result<Vec<ObjectMap>, ExpressionError> {
11    let ParsedLine {
12        series,
13        field_set,
14        timestamp,
15    } = line;
16
17    let timestamp = timestamp.map(DateTime::from_timestamp_nanos);
18
19    let tags: Option<ObjectMap> = series.tag_set.as_ref().map(|tags| {
20        tags.iter()
21            .map(|t| (t.0.to_string().into(), t.1.to_string().into()))
22            .collect()
23    });
24
25    field_set
26        .into_iter()
27        .map(|f| {
28            let mut metric = ObjectMap::new();
29            let measurement = &series.measurement;
30            let field_key = f.0.to_string();
31            let field_value = match f.1 {
32                FieldValue::I64(v) => v as f64,
33                FieldValue::U64(v) => v as f64,
34                FieldValue::F64(v) => v,
35                FieldValue::Boolean(v) => {
36                    if v {
37                        1.0
38                    } else {
39                        0.0
40                    }
41                }
42                FieldValue::String(_) => {
43                    return Err(Error::StringFieldSetValuesNotSupported.into());
44                }
45            };
46
47            // `influxdb_line_protocol` crate seems to not allow NaN float values while parsing
48            // field values and this case should not happen, but just in case, we should
49            // handle it.
50            let Ok(field_value) = NotNan::new(field_value) else {
51                return Err(Error::NaNFieldSetValuesNotSupported.into());
52            };
53
54            let metric_name = format!("{measurement}_{field_key}");
55            metric.insert("name".into(), metric_name.into());
56
57            if let Some(tags) = tags.as_ref() {
58                metric.insert("tags".into(), tags.clone().into());
59            }
60
61            if let Some(timestamp) = timestamp {
62                metric.insert("timestamp".into(), timestamp.into());
63            }
64
65            metric.insert("kind".into(), "absolute".into());
66
67            let gauge_object = value!({
68                value: field_value
69            });
70            metric.insert("gauge".into(), gauge_object);
71
72            Ok(metric)
73        })
74        .collect()
75}
76
77#[derive(Debug, Clone, thiserror::Error)]
78enum Error {
79    #[error("field set values of type string are not supported")]
80    StringFieldSetValuesNotSupported,
81    #[error("NaN field set values are not supported")]
82    NaNFieldSetValuesNotSupported,
83}
84
85impl From<Error> for ExpressionError {
86    fn from(error: Error) -> Self {
87        Self::Error {
88            message: format!(
89                "Error while converting InfluxDB line protocol metric to Vector's metric model: {error}"
90            ),
91            labels: vec![],
92            notes: vec![],
93        }
94    }
95}
96
97fn parse_influxdb(bytes: Value) -> Resolved {
98    let bytes = bytes.try_bytes()?;
99    let line = String::from_utf8_lossy(&bytes);
100    let parsed_line = influxdb_line_protocol::parse_lines(&line);
101
102    let metrics = parsed_line
103        .into_iter()
104        .map(|line_result| line_result.map_err(ExpressionError::from))
105        .map(|line_result| line_result.and_then(influxdb_line_to_metrics))
106        .collect::<Result<Vec<_>, _>>()?
107        .into_iter()
108        .flatten()
109        .map(Value::from)
110        .collect();
111
112    Ok(Value::Array(metrics))
113}
114
115impl From<influxdb_line_protocol::Error> for ExpressionError {
116    fn from(error: influxdb_line_protocol::Error) -> Self {
117        Self::Error {
118            message: format!("InfluxDB line protocol parsing error: {error}"),
119            labels: vec![],
120            notes: vec![],
121        }
122    }
123}
124
/// Marker type for the `parse_influxdb` function; it carries no state.
#[derive(Debug, Clone, Copy)]
pub struct ParseInfluxDB;
127
128impl Function for ParseInfluxDB {
129    fn identifier(&self) -> &'static str {
130        "parse_influxdb"
131    }
132
133    fn summary(&self) -> &'static str {
134        "parse an InfluxDB line protocol string into a list of vector-compatible metrics"
135    }
136
137    fn usage(&self) -> &'static str {
138        "Parses the `value` as an [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/) string, producing a list of Vector-compatible metrics."
139    }
140
141    fn category(&self) -> &'static str {
142        Category::Parse.as_ref()
143    }
144
145    fn internal_failure_reasons(&self) -> &'static [&'static str] {
146        &[
147            "`value` is not a valid InfluxDB line protocol string.",
148            "field set contains a field value of type `string`.",
149            "field set contains a `NaN` field value.",
150        ]
151    }
152
153    fn return_kind(&self) -> u16 {
154        kind::ARRAY
155    }
156
157    fn notices(&self) -> &'static [&'static str] {
158        &[
159            indoc! {"
160                This function will return a log event with the shape of a Vector-compatible metric,
161                but not a metric event itself. You will likely want to pipe the output of this
162                function through a `log_to_metric` transform with the option `all_metrics` set to
163                `true` to convert the metric-shaped log events to metric events so _real_ metrics
164                are produced.
165            "},
166            indoc! {"
167                The only metric type that is produced is a `gauge`. Each metric name is prefixed
168                with the `measurement` field, followed by an underscore (`_`), and then the
169                `field key` field.
170            "},
171            indoc! {"
172                `string` is the only type that is not supported as a field value, due to limitations
173                of Vector's metric model.
174            "},
175        ]
176    }
177
178    fn parameters(&self) -> &'static [Parameter] {
179        const PARAMETERS: &[Parameter] = &[Parameter::required(
180            "value",
181            kind::BYTES,
182            "The string representation of the InfluxDB line protocol to parse.",
183        )];
184        PARAMETERS
185    }
186
187    fn examples(&self) -> &'static [Example] {
188        &[example! {
189            title: "Parse InfluxDB line protocol",
190            source: r#"parse_influxdb!("cpu,host=A,region=us-west usage_system=64i,usage_user=10u,temperature=50.5,on=true,sleep=false 1590488773254420000")"#,
191            result: Ok(indoc! {r#"
192                [
193                    {
194                        "name": "cpu_usage_system",
195                        "tags": {
196                            "host": "A",
197                            "region": "us-west"
198                        },
199                        "timestamp": "2020-05-26T10:26:13.254420Z",
200                        "kind": "absolute",
201                        "gauge": {
202                            "value": 64.0
203                        }
204                    },
205                    {
206                        "name": "cpu_usage_user",
207                        "tags": {
208                            "host": "A",
209                            "region": "us-west"
210                        },
211                        "timestamp": "2020-05-26T10:26:13.254420Z",
212                        "kind": "absolute",
213                        "gauge": {
214                            "value": 10.0
215                        }
216                    },
217                    {
218                        "name": "cpu_temperature",
219                        "tags": {
220                            "host": "A",
221                            "region": "us-west"
222                        },
223                        "timestamp": "2020-05-26T10:26:13.254420Z",
224                        "kind": "absolute",
225                        "gauge": {
226                            "value": 50.5
227                        }
228                    },
229                    {
230                        "name": "cpu_on",
231                        "tags": {
232                            "host": "A",
233                            "region": "us-west"
234                        },
235                        "timestamp": "2020-05-26T10:26:13.254420Z",
236                        "kind": "absolute",
237                        "gauge": {
238                            "value": 1.0
239                        }
240                    },
241                    {
242                        "name": "cpu_sleep",
243                        "tags": {
244                            "host": "A",
245                            "region": "us-west"
246                        },
247                        "timestamp": "2020-05-26T10:26:13.254420Z",
248                        "kind": "absolute",
249                        "gauge": {
250                            "value": 0.0
251                        }
252                    }
253                ]
254            "#}),
255        }]
256    }
257
258    fn compile(
259        &self,
260        _state: &state::TypeState,
261        _ctx: &mut FunctionCompileContext,
262        arguments: ArgumentList,
263    ) -> Compiled {
264        let value = arguments.required("value");
265
266        Ok(ParseInfluxDBFn { value }.as_expr())
267    }
268}
269
270#[derive(Clone, Debug)]
271struct ParseInfluxDBFn {
272    value: Box<dyn Expression>,
273}
274
275impl FunctionExpression for ParseInfluxDBFn {
276    fn resolve(&self, ctx: &mut Context) -> Resolved {
277        let value = self.value.resolve(ctx)?;
278
279        parse_influxdb(value)
280    }
281
282    fn type_def(&self, _: &state::TypeState) -> TypeDef {
283        type_def()
284    }
285}
286
287fn tags_kind() -> Kind {
288    Kind::object(Collection::from_unknown(Kind::bytes())) | Kind::null()
289}
290
291fn gauge_kind() -> Kind {
292    Kind::object(btreemap! {
293        "value" => Kind::float(),
294    })
295}
296
297fn metric_kind() -> BTreeMap<Field, Kind> {
298    btreemap! {
299        "name" => Kind::bytes(),
300        "tags" => tags_kind(),
301        "timestamp" => Kind::timestamp() | Kind::null(),
302        "kind" => Kind::bytes(),
303        "gauge" => gauge_kind(),
304    }
305}
306
307fn inner_kind() -> Kind {
308    Kind::object(metric_kind())
309}
310
311fn type_def() -> TypeDef {
312    TypeDef::array(Collection::from_unknown(inner_kind())).fallible()
313}
314
#[cfg(test)]
mod test {
    use super::*;
    use crate::btreemap;

    /// Nanosecond timestamp carried by the timestamped fixture lines.
    const TIMESTAMP_NANOS: i64 = 1_590_488_773_254_420_000;

    /// Builds the metric-shaped object expected for one field of a fixture line.
    ///
    /// `with_tags` adds the `host=A,region=us-west` tag set used by the tagged fixtures and
    /// `with_timestamp` adds the `TIMESTAMP_NANOS` timestamp.
    fn gauge(name: &str, with_tags: bool, with_timestamp: bool, value: f64) -> Value {
        let mut metric = ObjectMap::new();

        metric.insert("name".into(), name.into());

        if with_tags {
            metric.insert(
                "tags".into(),
                Value::from(btreemap! {
                    "host" => "A",
                    "region" => "us-west",
                }),
            );
        }

        if with_timestamp {
            metric.insert(
                "timestamp".into(),
                DateTime::from_timestamp_nanos(TIMESTAMP_NANOS).into(),
            );
        }

        metric.insert("kind".into(), "absolute".into());
        metric.insert(
            "gauge".into(),
            Value::from(btreemap! {
                "value" => value,
            }),
        );

        Value::from(metric)
    }

    test_function![
        parse_influxdb => ParseInfluxDB;

        influxdb_valid {
            args: func_args![ value: "cpu,host=A,region=us-west usage_system=64i,usage_user=10u,temperature=50.5,on=true,sleep=false 1590488773254420000" ],
            want: Ok(Value::from(vec![
                gauge("cpu_usage_system", true, true, 64.0),
                gauge("cpu_usage_user", true, true, 10.0),
                gauge("cpu_temperature", true, true, 50.5),
                gauge("cpu_on", true, true, 1.0),
                gauge("cpu_sleep", true, true, 0.0),
            ])),
            tdef: type_def(),
        }

        influxdb_valid_no_timestamp {
            args: func_args![ value: "cpu,host=A,region=us-west usage_system=64i,usage_user=10i" ],
            want: Ok(Value::from(vec![
                gauge("cpu_usage_system", true, false, 64.0),
                gauge("cpu_usage_user", true, false, 10.0),
            ])),
            tdef: type_def(),
        }

        influxdb_valid_no_tags {
            args: func_args![ value: "cpu usage_system=64i,usage_user=10i 1590488773254420000" ],
            want: Ok(Value::from(vec![
                gauge("cpu_usage_system", false, true, 64.0),
                gauge("cpu_usage_user", false, true, 10.0),
            ])),
            tdef: type_def(),
        }

        influxdb_valid_no_tags_no_timestamp {
            args: func_args![ value: "cpu usage_system=64i,usage_user=10i" ],
            want: Ok(Value::from(vec![
                gauge("cpu_usage_system", false, false, 64.0),
                gauge("cpu_usage_user", false, false, 10.0),
            ])),
            tdef: type_def(),
        }

        influxdb_invalid_string_field_set_value {
            args: func_args![ value: r#"valid foo="bar""# ],
            want: Err("Error while converting InfluxDB line protocol metric to Vector's metric model: field set values of type string are not supported"),
            tdef: type_def(),
        }

        influxdb_invalid_no_fields{
            args: func_args![ value: "cpu " ],
            want: Err("InfluxDB line protocol parsing error: No fields were provided"),
            tdef: type_def(),
        }
    ];
}