vector/sources/statsd/
parser.rs

1use std::{
2    error, fmt,
3    num::{ParseFloatError, ParseIntError},
4    str::Utf8Error,
5    sync::LazyLock,
6};
7
8use regex::Regex;
9
10use crate::{
11    event::metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
12    sources::statsd::ConversionUnit,
13    sources::util::extract_tag_key_and_value,
14};
15
16static WHITESPACE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\s+").unwrap());
17static NONALPHANUM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap());
18
19#[derive(Clone)]
20pub struct Parser {
21    sanitize: bool,
22    convert_to: ConversionUnit,
23}
24
25impl Parser {
26    pub const fn new(sanitize_keys: bool, convert_to: ConversionUnit) -> Self {
27        Self {
28            sanitize: sanitize_keys,
29            convert_to,
30        }
31    }
32
33    pub fn parse(&self, packet: &str) -> Result<Metric, ParseError> {
34        // https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#datagram-format
35        let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
36        if key_and_body.len() != 2 {
37            return Err(ParseError::Malformed(
38                "should be key and body with ':' separator",
39            ));
40        }
41        let (key, body) = (key_and_body[0], key_and_body[1]);
42
43        let parts = body.split('|').collect::<Vec<_>>();
44        if parts.len() < 2 {
45            return Err(ParseError::Malformed(
46                "body should have at least two pipe separated components",
47            ));
48        }
49
50        let name = sanitize_key(key, self.sanitize);
51        let metric_type = parts[1];
52
53        // sampling part is optional and comes after metric type part
54        let sampling = parts.get(2).filter(|s| s.starts_with('@'));
55        let sample_rate = if let Some(s) = sampling {
56            1.0 / sanitize_sampling(parse_sampling(s)?)
57        } else {
58            1.0
59        };
60
61        // tags are optional and could be found either after sampling of after metric type part
62        let tags = if sampling.is_none() {
63            parts.get(2)
64        } else {
65            parts.get(3)
66        };
67        let tags = tags.filter(|s| s.starts_with('#'));
68        let tags = tags.map(parse_tags).transpose()?;
69
70        let metric = match metric_type {
71            "c" => {
72                let val: f64 = parts[0].parse()?;
73                Metric::new(
74                    name,
75                    MetricKind::Incremental,
76                    MetricValue::Counter {
77                        value: val * sample_rate,
78                    },
79                )
80                .with_tags(tags)
81            }
82            unit @ "h" | unit @ "ms" | unit @ "d" => {
83                let val: f64 = parts[0].parse()?;
84                let converted_val = match unit {
85                    "ms" => match self.convert_to {
86                        ConversionUnit::Seconds => val / 1000.0,
87                        ConversionUnit::Milliseconds => val,
88                    },
89                    _ => val,
90                };
91                Metric::new(
92                    name,
93                    MetricKind::Incremental,
94                    MetricValue::Distribution {
95                        samples: vector_lib::samples![converted_val => sample_rate as u32],
96                        statistic: convert_to_statistic(unit),
97                    },
98                )
99                .with_tags(tags)
100            }
101            "g" => {
102                let value = if parts[0]
103                    .chars()
104                    .next()
105                    .map(|c| c.is_ascii_digit())
106                    .ok_or(ParseError::Malformed("empty first body component"))?
107                {
108                    parts[0].parse()?
109                } else {
110                    parts[0][1..].parse()?
111                };
112
113                match parse_direction(parts[0])? {
114                    None => Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
115                        .with_tags(tags),
116                    Some(sign) => Metric::new(
117                        name,
118                        MetricKind::Incremental,
119                        MetricValue::Gauge {
120                            value: value * sign,
121                        },
122                    )
123                    .with_tags(tags),
124                }
125            }
126            "s" => Metric::new(
127                name,
128                MetricKind::Incremental,
129                MetricValue::Set {
130                    values: vec![parts[0].into()].into_iter().collect(),
131                },
132            )
133            .with_tags(tags),
134            other => return Err(ParseError::UnknownMetricType(other.into())),
135        };
136        Ok(metric)
137    }
138}
139
140fn parse_sampling(input: &str) -> Result<f64, ParseError> {
141    if !input.starts_with('@') || input.len() < 2 {
142        return Err(ParseError::Malformed(
143            "expected non empty '@'-prefixed sampling component",
144        ));
145    }
146
147    let num: f64 = input[1..].parse()?;
148    if num.is_sign_positive() {
149        Ok(num)
150    } else {
151        Err(ParseError::Malformed("sample rate can't be negative"))
152    }
153}
154
155/// Statsd (and dogstatsd) support bare, single and multi-value tags.
156fn parse_tags(input: &&str) -> Result<MetricTags, ParseError> {
157    if !input.starts_with('#') || input.len() < 2 {
158        return Err(ParseError::Malformed(
159            "expected non empty '#'-prefixed tags component",
160        ));
161    }
162
163    Ok(input[1..]
164        .split(',')
165        .map(extract_tag_key_and_value)
166        .collect())
167}
168
169fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
170    match input
171        .chars()
172        .next()
173        .ok_or(ParseError::Malformed("empty body component"))?
174    {
175        '+' => Ok(Some(1.0)),
176        '-' => Ok(Some(-1.0)),
177        c if c.is_ascii_digit() => Ok(None),
178        _other => Err(ParseError::Malformed("invalid gauge value prefix")),
179    }
180}
181
182fn sanitize_key(key: &str, sanitize: bool) -> String {
183    if !sanitize {
184        key.to_owned()
185    } else {
186        let s = key.replace('/', "'-");
187        let s = WHITESPACE.replace_all(&s, "_");
188        let s = NONALPHANUM.replace_all(&s, "");
189        s.into()
190    }
191}
192
/// Replaces a zero sample rate with `1.0`; any other value is returned as is.
///
/// `Parser::parse` divides `1.0` by the result, so this keeps a `@0` sampling
/// component from producing a division by zero.
fn sanitize_sampling(sampling: f64) -> f64 {
    // Both `0.0` and `-0.0` compare equal to zero.
    if sampling != 0.0 {
        return sampling;
    }
    1.0
}
200
201fn convert_to_statistic(unit: &str) -> StatisticKind {
202    match unit {
203        "d" => StatisticKind::Summary,
204        _ => StatisticKind::Histogram,
205    }
206}
207
/// Errors produced while parsing a StatsD datagram.
#[derive(Debug, PartialEq, Eq)]
pub enum ParseError {
    /// Wraps a UTF-8 decoding error.
    InvalidUtf8(Utf8Error),
    /// The datagram is structurally invalid; the message names the problem.
    Malformed(&'static str),
    /// The metric type component is not one the parser supports; carries the
    /// offending type string.
    UnknownMetricType(String),
    /// An integer component failed to parse.
    InvalidInteger(ParseIntError),
    /// A floating point component failed to parse.
    InvalidFloat(ParseFloatError),
}
216
217impl fmt::Display for ParseError {
218    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219        write!(f, "Statsd parse error: {self:?}")
220    }
221}
222
223vector_lib::impl_event_data_eq!(ParseError);
224
225impl error::Error for ParseError {}
226
227impl From<ParseIntError> for ParseError {
228    fn from(e: ParseIntError) -> ParseError {
229        ParseError::InvalidInteger(e)
230    }
231}
232
233impl From<ParseFloatError> for ParseError {
234    fn from(e: ParseFloatError) -> ParseError {
235        ParseError::InvalidFloat(e)
236    }
237}
238
#[cfg(test)]
mod test {
    use vector_lib::assert_event_data_eq;
    use vector_lib::{event::metric::TagValue, metric_tags};

    use super::{sanitize_key, sanitize_sampling, ParseError, Parser};
    use crate::event::metric::{Metric, MetricKind, MetricValue, StatisticKind};
    use crate::sources::statsd::ConversionUnit;

    /// Sanitizes keys and converts `ms` values to seconds.
    const SANITIZING_PARSER: Parser = Parser::new(true, ConversionUnit::Seconds);
    /// Sanitizes keys and keeps `ms` values in milliseconds.
    const NON_CONVERTING_PARSER: Parser = Parser::new(true, ConversionUnit::Milliseconds);
    /// Keeps keys verbatim and converts `ms` values to seconds.
    const NON_SANITIZING_PARSER: Parser = Parser::new(false, ConversionUnit::Seconds);

    fn parse(packet: &str) -> Result<Metric, ParseError> {
        SANITIZING_PARSER.parse(packet)
    }

    fn parse_non_converting(packet: &str) -> Result<Metric, ParseError> {
        NON_CONVERTING_PARSER.parse(packet)
    }

    fn unsanitized_parse(packet: &str) -> Result<Metric, ParseError> {
        NON_SANITIZING_PARSER.parse(packet)
    }

    /// Expected incremental counter with the given name and value.
    fn counter(name: &str, value: f64) -> Metric {
        Metric::new(
            name,
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
    }

    /// Expected `gaugor` gauge of the given kind and value.
    fn gaugor(kind: MetricKind, value: f64) -> Metric {
        Metric::new("gaugor", kind, MetricValue::Gauge { value })
    }

    /// Expected `glork` distribution holding one sample with a rate of 10.
    fn glork(value: f64, statistic: StatisticKind) -> Metric {
        Metric::new(
            "glork",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![value => 10],
                statistic,
            },
        )
    }

    #[test]
    fn basic_counter() {
        assert_event_data_eq!(parse("foo:1|c"), Ok(counter("foo", 1.0)));
    }

    #[test]
    fn tagged_counter() {
        let expected = counter("foo-however_baz", 1.0).with_tags(Some(metric_tags!(
            "tag1" => TagValue::Bare,
            "tag2" => "value",
        )));
        assert_event_data_eq!(
            parse("foo/how@ever baz:1|c|#tag1,tag2:value"),
            Ok(expected)
        );
    }

    #[test]
    fn tagged_not_sanitized_counter() {
        let expected = counter("foo/bar@baz baz", 1.0).with_tags(Some(metric_tags!(
            "tag1" => TagValue::Bare,
            "tag2" => "value",
        )));
        assert_event_data_eq!(
            unsanitized_parse("foo/bar@baz baz:1|c|#tag1,tag2:value"),
            Ok(expected)
        );
    }

    #[test]
    fn enhanced_tags() {
        let expected = counter("foo", 1.0).with_tags(Some(metric_tags!(
            "tag1" => TagValue::Bare,
            "tag2" => "valueA",
            "tag2" => "valueB",
            "tag3" => "value",
            "tag3" => TagValue::Bare,
            "tag4" => "",
        )));
        assert_event_data_eq!(
            parse("foo:1|c|#tag1,tag2:valueA,tag2:valueB,tag3:value,tag3,tag4:"),
            Ok(expected)
        );
    }

    #[test]
    fn sampled_counter() {
        assert_event_data_eq!(parse("bar:2|c|@0.1"), Ok(counter("bar", 20.0)));
    }

    #[test]
    fn zero_sampled_counter() {
        assert_event_data_eq!(parse("bar:2|c|@0"), Ok(counter("bar", 2.0)));
    }

    #[test]
    fn sampled_timer() {
        assert_event_data_eq!(
            parse("glork:320|ms|@0.1"),
            Ok(glork(0.320, StatisticKind::Histogram))
        );
    }

    #[test]
    fn sampled_timer_non_converting() {
        assert_event_data_eq!(
            parse_non_converting("glork:320|ms|@0.1"),
            Ok(glork(320.0, StatisticKind::Histogram))
        );
    }

    #[test]
    fn sampled_tagged_histogram() {
        let expected = glork(320.0, StatisticKind::Histogram).with_tags(Some(metric_tags!(
            "region" => "us-west1",
            "production" => TagValue::Bare,
            "e" => "",
        )));
        assert_event_data_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(expected)
        );
    }

    #[test]
    fn sampled_distribution() {
        let expected = glork(320.0, StatisticKind::Summary).with_tags(Some(metric_tags!(
            "region" => "us-west1",
            "production" => TagValue::Bare,
            "e" => "",
        )));
        assert_event_data_eq!(
            parse("glork:320|d|@0.1|#region:us-west1,production,e:"),
            Ok(expected)
        );
    }

    #[test]
    fn simple_gauge() {
        assert_event_data_eq!(
            parse("gaugor:333|g"),
            Ok(gaugor(MetricKind::Absolute, 333.0))
        );
    }

    #[test]
    fn signed_gauge() {
        assert_event_data_eq!(
            parse("gaugor:-4|g"),
            Ok(gaugor(MetricKind::Incremental, -4.0))
        );
        assert_event_data_eq!(
            parse("gaugor:+10|g"),
            Ok(gaugor(MetricKind::Incremental, 10.0))
        );
    }

    #[test]
    fn sets() {
        let expected = Metric::new(
            "uniques",
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["765".into()].into_iter().collect(),
            },
        );
        assert_event_data_eq!(parse("uniques:765|s"), Ok(expected));
    }

    #[test]
    fn sanitizing_keys() {
        // (input, sanitize flag, expected key)
        let cases = [
            ("foo/bar/baz", true, "foo-bar-baz"),
            ("foo/bar/baz", false, "foo/bar/baz"),
            ("foo bar  baz", true, "foo_bar_baz"),
            ("foo. @& bar_$!#.baz", true, "foo.__bar_.baz"),
            ("foo. @& bar_$!#.baz", false, "foo. @& bar_$!#.baz"),
        ];
        for (input, sanitize, expected) in cases {
            assert_eq!(expected, sanitize_key(input, sanitize));
        }
    }

    #[test]
    fn sanitizing_sampling() {
        // (input, expected)
        for (input, expected) in [(0.0, 1.0), (2.5, 2.5), (-5.0, -5.0)] {
            assert_eq!(expected, sanitize_sampling(input));
        }
    }
}