1use std::collections::BTreeMap;
2
3use chrono::DateTime;
4use influxdb_line_protocol::{FieldValue, ParsedLine};
5
6use crate::compiler::prelude::*;
7use crate::{btreemap, value};
8
9#[allow(clippy::cast_precision_loss)] fn influxdb_line_to_metrics(line: ParsedLine) -> Result<Vec<ObjectMap>, ExpressionError> {
11 let ParsedLine {
12 series,
13 field_set,
14 timestamp,
15 } = line;
16
17 let timestamp = timestamp.map(DateTime::from_timestamp_nanos);
18
19 let tags: Option<ObjectMap> = series.tag_set.as_ref().map(|tags| {
20 tags.iter()
21 .map(|t| (t.0.to_string().into(), t.1.to_string().into()))
22 .collect()
23 });
24
25 field_set
26 .into_iter()
27 .map(|f| {
28 let mut metric = ObjectMap::new();
29 let measurement = &series.measurement;
30 let field_key = f.0.to_string();
31 let field_value = match f.1 {
32 FieldValue::I64(v) => v as f64,
33 FieldValue::U64(v) => v as f64,
34 FieldValue::F64(v) => v,
35 FieldValue::Boolean(v) => {
36 if v {
37 1.0
38 } else {
39 0.0
40 }
41 }
42 FieldValue::String(_) => {
43 return Err(Error::StringFieldSetValuesNotSupported.into());
44 }
45 };
46
47 let Ok(field_value) = NotNan::new(field_value) else {
51 return Err(Error::NaNFieldSetValuesNotSupported.into());
52 };
53
54 let metric_name = format!("{measurement}_{field_key}");
55 metric.insert("name".into(), metric_name.into());
56
57 if let Some(tags) = tags.as_ref() {
58 metric.insert("tags".into(), tags.clone().into());
59 }
60
61 if let Some(timestamp) = timestamp {
62 metric.insert("timestamp".into(), timestamp.into());
63 }
64
65 metric.insert("kind".into(), "absolute".into());
66
67 let gauge_object = value!({
68 value: field_value
69 });
70 metric.insert("gauge".into(), gauge_object);
71
72 Ok(metric)
73 })
74 .collect()
75}
76
77#[derive(Debug, Clone, thiserror::Error)]
78enum Error {
79 #[error("field set values of type string are not supported")]
80 StringFieldSetValuesNotSupported,
81 #[error("NaN field set values are not supported")]
82 NaNFieldSetValuesNotSupported,
83}
84
85impl From<Error> for ExpressionError {
86 fn from(error: Error) -> Self {
87 Self::Error {
88 message: format!(
89 "Error while converting InfluxDB line protocol metric to Vector's metric model: {error}"
90 ),
91 labels: vec![],
92 notes: vec![],
93 }
94 }
95}
96
97fn parse_influxdb(bytes: Value) -> Resolved {
98 let bytes = bytes.try_bytes()?;
99 let line = String::from_utf8_lossy(&bytes);
100 let parsed_line = influxdb_line_protocol::parse_lines(&line);
101
102 let metrics = parsed_line
103 .into_iter()
104 .map(|line_result| line_result.map_err(ExpressionError::from))
105 .map(|line_result| line_result.and_then(influxdb_line_to_metrics))
106 .collect::<Result<Vec<_>, _>>()?
107 .into_iter()
108 .flatten()
109 .map(Value::from)
110 .collect();
111
112 Ok(Value::Array(metrics))
113}
114
115impl From<influxdb_line_protocol::Error> for ExpressionError {
116 fn from(error: influxdb_line_protocol::Error) -> Self {
117 Self::Error {
118 message: format!("InfluxDB line protocol parsing error: {error}"),
119 labels: vec![],
120 notes: vec![],
121 }
122 }
123}
124
/// Registration handle for the `parse_influxdb` VRL function.
#[derive(Debug, Clone, Copy)]
pub struct ParseInfluxDB;
127
128impl Function for ParseInfluxDB {
129 fn identifier(&self) -> &'static str {
130 "parse_influxdb"
131 }
132
133 fn summary(&self) -> &'static str {
134 "parse an InfluxDB line protocol string into a list of vector-compatible metrics"
135 }
136
137 fn usage(&self) -> &'static str {
138 "Parses the `value` as an [InfluxDB line protocol](https://docs.influxdata.com/influxdb/cloud/reference/syntax/line-protocol/) string, producing a list of Vector-compatible metrics."
139 }
140
141 fn category(&self) -> &'static str {
142 Category::Parse.as_ref()
143 }
144
145 fn internal_failure_reasons(&self) -> &'static [&'static str] {
146 &[
147 "`value` is not a valid InfluxDB line protocol string.",
148 "field set contains a field value of type `string`.",
149 "field set contains a `NaN` field value.",
150 ]
151 }
152
153 fn return_kind(&self) -> u16 {
154 kind::ARRAY
155 }
156
157 fn notices(&self) -> &'static [&'static str] {
158 &[
159 indoc! {"
160 This function will return a log event with the shape of a Vector-compatible metric,
161 but not a metric event itself. You will likely want to pipe the output of this
162 function through a `log_to_metric` transform with the option `all_metrics` set to
163 `true` to convert the metric-shaped log events to metric events so _real_ metrics
164 are produced.
165 "},
166 indoc! {"
167 The only metric type that is produced is a `gauge`. Each metric name is prefixed
168 with the `measurement` field, followed by an underscore (`_`), and then the
169 `field key` field.
170 "},
171 indoc! {"
172 `string` is the only type that is not supported as a field value, due to limitations
173 of Vector's metric model.
174 "},
175 ]
176 }
177
178 fn parameters(&self) -> &'static [Parameter] {
179 const PARAMETERS: &[Parameter] = &[Parameter::required(
180 "value",
181 kind::BYTES,
182 "The string representation of the InfluxDB line protocol to parse.",
183 )];
184 PARAMETERS
185 }
186
187 fn examples(&self) -> &'static [Example] {
188 &[example! {
189 title: "Parse InfluxDB line protocol",
190 source: r#"parse_influxdb!("cpu,host=A,region=us-west usage_system=64i,usage_user=10u,temperature=50.5,on=true,sleep=false 1590488773254420000")"#,
191 result: Ok(indoc! {r#"
192 [
193 {
194 "name": "cpu_usage_system",
195 "tags": {
196 "host": "A",
197 "region": "us-west"
198 },
199 "timestamp": "2020-05-26T10:26:13.254420Z",
200 "kind": "absolute",
201 "gauge": {
202 "value": 64.0
203 }
204 },
205 {
206 "name": "cpu_usage_user",
207 "tags": {
208 "host": "A",
209 "region": "us-west"
210 },
211 "timestamp": "2020-05-26T10:26:13.254420Z",
212 "kind": "absolute",
213 "gauge": {
214 "value": 10.0
215 }
216 },
217 {
218 "name": "cpu_temperature",
219 "tags": {
220 "host": "A",
221 "region": "us-west"
222 },
223 "timestamp": "2020-05-26T10:26:13.254420Z",
224 "kind": "absolute",
225 "gauge": {
226 "value": 50.5
227 }
228 },
229 {
230 "name": "cpu_on",
231 "tags": {
232 "host": "A",
233 "region": "us-west"
234 },
235 "timestamp": "2020-05-26T10:26:13.254420Z",
236 "kind": "absolute",
237 "gauge": {
238 "value": 1.0
239 }
240 },
241 {
242 "name": "cpu_sleep",
243 "tags": {
244 "host": "A",
245 "region": "us-west"
246 },
247 "timestamp": "2020-05-26T10:26:13.254420Z",
248 "kind": "absolute",
249 "gauge": {
250 "value": 0.0
251 }
252 }
253 ]
254 "#}),
255 }]
256 }
257
258 fn compile(
259 &self,
260 _state: &state::TypeState,
261 _ctx: &mut FunctionCompileContext,
262 arguments: ArgumentList,
263 ) -> Compiled {
264 let value = arguments.required("value");
265
266 Ok(ParseInfluxDBFn { value }.as_expr())
267 }
268}
269
270#[derive(Clone, Debug)]
271struct ParseInfluxDBFn {
272 value: Box<dyn Expression>,
273}
274
275impl FunctionExpression for ParseInfluxDBFn {
276 fn resolve(&self, ctx: &mut Context) -> Resolved {
277 let value = self.value.resolve(ctx)?;
278
279 parse_influxdb(value)
280 }
281
282 fn type_def(&self, _: &state::TypeState) -> TypeDef {
283 type_def()
284 }
285}
286
287fn tags_kind() -> Kind {
288 Kind::object(Collection::from_unknown(Kind::bytes())) | Kind::null()
289}
290
291fn gauge_kind() -> Kind {
292 Kind::object(btreemap! {
293 "value" => Kind::float(),
294 })
295}
296
297fn metric_kind() -> BTreeMap<Field, Kind> {
298 btreemap! {
299 "name" => Kind::bytes(),
300 "tags" => tags_kind(),
301 "timestamp" => Kind::timestamp() | Kind::null(),
302 "kind" => Kind::bytes(),
303 "gauge" => gauge_kind(),
304 }
305}
306
307fn inner_kind() -> Kind {
308 Kind::object(metric_kind())
309}
310
311fn type_def() -> TypeDef {
312 TypeDef::array(Collection::from_unknown(inner_kind())).fallible()
313}
314
#[cfg(test)]
mod test {
    use super::*;
    use crate::btreemap;

    test_function![
        parse_influxdb => ParseInfluxDB;

        influxdb_valid {
            args: func_args![ value: "cpu,host=A,region=us-west usage_system=64i,usage_user=10u,temperature=50.5,on=true,sleep=false 1590488773254420000" ],
            want: Ok(Value::from(vec![
                Value::from(btreemap! {
                    "name" => "cpu_usage_system",
                    "tags" => btreemap! {
                        "host" => "A",
                        "region" => "us-west",
                    },
                    "timestamp" => DateTime::from_timestamp_nanos(1_590_488_773_254_420_000),
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 64.0,
                    },
                }),
                Value::from(btreemap! {
                    "name" => "cpu_usage_user",
                    "tags" => btreemap! {
                        "host" => "A",
                        "region" => "us-west",
                    },
                    "timestamp" => DateTime::from_timestamp_nanos(1_590_488_773_254_420_000),
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 10.0,
                    },
                }),
                Value::from(btreemap! {
                    "name" => "cpu_temperature",
                    "tags" => btreemap! {
                        "host" => "A",
                        "region" => "us-west",
                    },
                    "timestamp" => DateTime::from_timestamp_nanos(1_590_488_773_254_420_000),
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 50.5,
                    },
                }),
                Value::from(btreemap! {
                    "name" => "cpu_on",
                    "tags" => btreemap! {
                        "host" => "A",
                        "region" => "us-west",
                    },
                    "timestamp" => DateTime::from_timestamp_nanos(1_590_488_773_254_420_000),
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 1.0,
                    },
                }),
                Value::from(btreemap! {
                    "name" => "cpu_sleep",
                    "tags" => btreemap! {
                        "host" => "A",
                        "region" => "us-west",
                    },
                    "timestamp" => DateTime::from_timestamp_nanos(1_590_488_773_254_420_000),
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 0.0,
                    },
                }),
            ])),
            tdef: type_def(),
        }

        influxdb_valid_no_timestamp {
            args: func_args![ value: "cpu,host=A,region=us-west usage_system=64i,usage_user=10i" ],
            want: Ok(Value::from(vec![
                Value::from(btreemap! {
                    "name" => "cpu_usage_system",
                    "tags" => btreemap! {
                        "host" => "A",
                        "region" => "us-west",
                    },
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 64.0,
                    },
                }),
                Value::from(btreemap! {
                    "name" => "cpu_usage_user",
                    "tags" => btreemap! {
                        "host" => "A",
                        "region" => "us-west",
                    },
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 10.0,
                    },
                }),
            ])),
            tdef: type_def(),
        }

        influxdb_valid_no_tags {
            args: func_args![ value: "cpu usage_system=64i,usage_user=10i 1590488773254420000" ],
            want: Ok(Value::from(vec![
                Value::from(btreemap! {
                    "name" => "cpu_usage_system",
                    "timestamp" => DateTime::from_timestamp_nanos(1_590_488_773_254_420_000),
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 64.0,
                    },
                }),
                Value::from(btreemap! {
                    "name" => "cpu_usage_user",
                    "timestamp" => DateTime::from_timestamp_nanos(1_590_488_773_254_420_000),
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 10.0,
                    },
                }),
            ])),
            tdef: type_def(),
        }

        influxdb_valid_no_tags_no_timestamp {
            args: func_args![ value: "cpu usage_system=64i,usage_user=10i" ],
            want: Ok(Value::from(vec![
                Value::from(btreemap! {
                    "name" => "cpu_usage_system",
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 64.0,
                    },
                }),
                Value::from(btreemap! {
                    "name" => "cpu_usage_user",
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 10.0,
                    },
                }),
            ])),
            tdef: type_def(),
        }

        // Multiple lines are flattened into one array, in input order, and each
        // line keeps its own tags/timestamp.
        influxdb_valid_multiple_lines {
            args: func_args![ value: "cpu,host=A usage=1i\nmem free=2.5 1590488773254420000" ],
            want: Ok(Value::from(vec![
                Value::from(btreemap! {
                    "name" => "cpu_usage",
                    "tags" => btreemap! {
                        "host" => "A",
                    },
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 1.0,
                    },
                }),
                Value::from(btreemap! {
                    "name" => "mem_free",
                    "timestamp" => DateTime::from_timestamp_nanos(1_590_488_773_254_420_000),
                    "kind" => "absolute",
                    "gauge" => btreemap! {
                        "value" => 2.5,
                    },
                }),
            ])),
            tdef: type_def(),
        }

        influxdb_invalid_string_field_set_value {
            args: func_args![ value: r#"valid foo="bar""# ],
            want: Err("Error while converting InfluxDB line protocol metric to Vector's metric model: field set values of type string are not supported"),
            tdef: type_def(),
        }

        // A conversion failure on a later line fails the whole call, even though
        // earlier lines were valid.
        influxdb_invalid_string_field_set_value_on_second_line {
            args: func_args![ value: "cpu usage=1i\nbad foo=\"bar\"" ],
            want: Err("Error while converting InfluxDB line protocol metric to Vector's metric model: field set values of type string are not supported"),
            tdef: type_def(),
        }

        influxdb_invalid_no_fields {
            args: func_args![ value: "cpu " ],
            want: Err("InfluxDB line protocol parsing error: No fields were provided"),
            tdef: type_def(),
        }
    ];
}