vector/sources/statsd/
parser.rs

1use std::{
2    error, fmt,
3    num::{ParseFloatError, ParseIntError},
4    str::Utf8Error,
5    sync::LazyLock,
6};
7
8use regex::Regex;
9
10use crate::{
11    event::metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
12    sources::{statsd::ConversionUnit, util::extract_tag_key_and_value},
13};
14
15static WHITESPACE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\s+").unwrap());
16static NONALPHANUM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap());
17
18#[derive(Clone)]
19pub struct Parser {
20    sanitize: bool,
21    convert_to: ConversionUnit,
22}
23
24impl Parser {
25    pub const fn new(sanitize_keys: bool, convert_to: ConversionUnit) -> Self {
26        Self {
27            sanitize: sanitize_keys,
28            convert_to,
29        }
30    }
31
32    pub fn parse(&self, packet: &str) -> Result<Metric, ParseError> {
33        // https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/#datagram-format
34        let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
35        if key_and_body.len() != 2 {
36            return Err(ParseError::Malformed(
37                "should be key and body with ':' separator",
38            ));
39        }
40        let (key, body) = (key_and_body[0], key_and_body[1]);
41
42        let parts = body.split('|').collect::<Vec<_>>();
43        if parts.len() < 2 {
44            return Err(ParseError::Malformed(
45                "body should have at least two pipe separated components",
46            ));
47        }
48
49        let name = sanitize_key(key, self.sanitize);
50        let metric_type = parts[1];
51
52        // sampling part is optional and comes after metric type part
53        let sampling = parts.get(2).filter(|s| s.starts_with('@'));
54        let sample_rate = if let Some(s) = sampling {
55            1.0 / sanitize_sampling(parse_sampling(s)?)
56        } else {
57            1.0
58        };
59
60        // tags are optional and could be found either after sampling of after metric type part
61        let tags = if sampling.is_none() {
62            parts.get(2)
63        } else {
64            parts.get(3)
65        };
66        let tags = tags.filter(|s| s.starts_with('#'));
67        let tags = tags.map(parse_tags).transpose()?;
68
69        let metric = match metric_type {
70            "c" => {
71                let val: f64 = parts[0].parse()?;
72                Metric::new(
73                    name,
74                    MetricKind::Incremental,
75                    MetricValue::Counter {
76                        value: val * sample_rate,
77                    },
78                )
79                .with_tags(tags)
80            }
81            unit @ "h" | unit @ "ms" | unit @ "d" => {
82                let val: f64 = parts[0].parse()?;
83                let converted_val = match unit {
84                    "ms" => match self.convert_to {
85                        ConversionUnit::Seconds => val / 1000.0,
86                        ConversionUnit::Milliseconds => val,
87                    },
88                    _ => val,
89                };
90                Metric::new(
91                    name,
92                    MetricKind::Incremental,
93                    MetricValue::Distribution {
94                        samples: vector_lib::samples![converted_val => sample_rate as u32],
95                        statistic: convert_to_statistic(unit),
96                    },
97                )
98                .with_tags(tags)
99            }
100            "g" => {
101                let value = if parts[0]
102                    .chars()
103                    .next()
104                    .map(|c| c.is_ascii_digit())
105                    .ok_or(ParseError::Malformed("empty first body component"))?
106                {
107                    parts[0].parse()?
108                } else {
109                    parts[0][1..].parse()?
110                };
111
112                match parse_direction(parts[0])? {
113                    None => Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
114                        .with_tags(tags),
115                    Some(sign) => Metric::new(
116                        name,
117                        MetricKind::Incremental,
118                        MetricValue::Gauge {
119                            value: value * sign,
120                        },
121                    )
122                    .with_tags(tags),
123                }
124            }
125            "s" => Metric::new(
126                name,
127                MetricKind::Incremental,
128                MetricValue::Set {
129                    values: vec![parts[0].into()].into_iter().collect(),
130                },
131            )
132            .with_tags(tags),
133            other => return Err(ParseError::UnknownMetricType(other.into())),
134        };
135        Ok(metric)
136    }
137}
138
139fn parse_sampling(input: &str) -> Result<f64, ParseError> {
140    if !input.starts_with('@') || input.len() < 2 {
141        return Err(ParseError::Malformed(
142            "expected non empty '@'-prefixed sampling component",
143        ));
144    }
145
146    let num: f64 = input[1..].parse()?;
147    if num.is_sign_positive() {
148        Ok(num)
149    } else {
150        Err(ParseError::Malformed("sample rate can't be negative"))
151    }
152}
153
154/// Statsd (and dogstatsd) support bare, single and multi-value tags.
155fn parse_tags(input: &&str) -> Result<MetricTags, ParseError> {
156    if !input.starts_with('#') || input.len() < 2 {
157        return Err(ParseError::Malformed(
158            "expected non empty '#'-prefixed tags component",
159        ));
160    }
161
162    Ok(input[1..]
163        .split(',')
164        .map(extract_tag_key_and_value)
165        .collect())
166}
167
168fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
169    match input
170        .chars()
171        .next()
172        .ok_or(ParseError::Malformed("empty body component"))?
173    {
174        '+' => Ok(Some(1.0)),
175        '-' => Ok(Some(-1.0)),
176        c if c.is_ascii_digit() => Ok(None),
177        _other => Err(ParseError::Malformed("invalid gauge value prefix")),
178    }
179}
180
181fn sanitize_key(key: &str, sanitize: bool) -> String {
182    if !sanitize {
183        key.to_owned()
184    } else {
185        let s = key.replace('/', "'-");
186        let s = WHITESPACE.replace_all(&s, "_");
187        let s = NONALPHANUM.replace_all(&s, "");
188        s.into()
189    }
190}
191
/// Substitutes `1.0` for a sampling rate of zero; every other value is returned unchanged.
fn sanitize_sampling(sampling: f64) -> f64 {
    if sampling != 0.0 {
        return sampling;
    }
    1.0
}
195
196fn convert_to_statistic(unit: &str) -> StatisticKind {
197    match unit {
198        "d" => StatisticKind::Summary,
199        _ => StatisticKind::Histogram,
200    }
201}
202
203#[derive(Debug, PartialEq, Eq)]
204pub enum ParseError {
205    InvalidUtf8(Utf8Error),
206    Malformed(&'static str),
207    UnknownMetricType(String),
208    InvalidInteger(ParseIntError),
209    InvalidFloat(ParseFloatError),
210}
211
212impl fmt::Display for ParseError {
213    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214        write!(f, "Statsd parse error: {self:?}")
215    }
216}
217
218vector_lib::impl_event_data_eq!(ParseError);
219
220impl error::Error for ParseError {}
221
222impl From<ParseIntError> for ParseError {
223    fn from(e: ParseIntError) -> ParseError {
224        ParseError::InvalidInteger(e)
225    }
226}
227
228impl From<ParseFloatError> for ParseError {
229    fn from(e: ParseFloatError) -> ParseError {
230        ParseError::InvalidFloat(e)
231    }
232}
233
#[cfg(test)]
mod test {
    use vector_lib::{assert_event_data_eq, event::metric::TagValue, metric_tags};

    use super::{ParseError, Parser, sanitize_key, sanitize_sampling};
    use crate::{
        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
        sources::statsd::ConversionUnit,
    };

    // Parser fixtures: sanitizing + seconds, sanitizing + milliseconds, raw keys + seconds.
    const SANITIZING_PARSER: Parser = Parser::new(true, ConversionUnit::Seconds);
    const NON_CONVERTING_PARSER: Parser = Parser::new(true, ConversionUnit::Milliseconds);
    const NON_SANITIZING_PARSER: Parser = Parser::new(false, ConversionUnit::Seconds);

    /// Parses with key sanitizing enabled and `ms` values converted to seconds.
    fn parse(packet: &str) -> Result<Metric, ParseError> {
        SANITIZING_PARSER.parse(packet)
    }

    /// Parses with key sanitizing enabled and `ms` values left in milliseconds.
    fn parse_non_converting(packet: &str) -> Result<Metric, ParseError> {
        NON_CONVERTING_PARSER.parse(packet)
    }

    /// Parses with key sanitizing disabled.
    fn unsanitized_parse(packet: &str) -> Result<Metric, ParseError> {
        NON_SANITIZING_PARSER.parse(packet)
    }

    /// Expected incremental counter without tags.
    fn counter(name: &str, value: f64) -> Metric {
        Metric::new(
            name,
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
    }

    /// Expected single-sample incremental distribution named `glork`, without tags.
    fn glork(value: f64, rate: u32, statistic: StatisticKind) -> Metric {
        Metric::new(
            "glork",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![value => rate],
                statistic,
            },
        )
    }

    /// Expected gauge named `gaugor`, without tags.
    fn gaugor(kind: MetricKind, value: f64) -> Metric {
        Metric::new("gaugor", kind, MetricValue::Gauge { value })
    }

    #[test]
    fn basic_counter() {
        assert_event_data_eq!(parse("foo:1|c"), Ok(counter("foo", 1.0)));
    }

    #[test]
    fn tagged_counter() {
        assert_event_data_eq!(
            parse("foo/how@ever baz:1|c|#tag1,tag2:value"),
            Ok(counter("foo-however_baz", 1.0).with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            ))))
        );
    }

    #[test]
    fn tagged_not_sanitized_counter() {
        assert_event_data_eq!(
            unsanitized_parse("foo/bar@baz baz:1|c|#tag1,tag2:value"),
            Ok(counter("foo/bar@baz baz", 1.0).with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            ))))
        );
    }

    #[test]
    fn enhanced_tags() {
        assert_event_data_eq!(
            parse("foo:1|c|#tag1,tag2:valueA,tag2:valueB,tag3:value,tag3,tag4:"),
            Ok(counter("foo", 1.0).with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "valueA",
                "tag2" => "valueB",
                "tag3" => "value",
                "tag3" => TagValue::Bare,
                "tag4" => "",
            ))))
        );
    }

    #[test]
    fn sampled_counter() {
        assert_event_data_eq!(parse("bar:2|c|@0.1"), Ok(counter("bar", 20.0)));
    }

    #[test]
    fn zero_sampled_counter() {
        assert_event_data_eq!(parse("bar:2|c|@0"), Ok(counter("bar", 2.0)));
    }

    #[test]
    fn sampled_timer() {
        assert_event_data_eq!(
            parse("glork:320|ms|@0.1"),
            Ok(glork(0.320, 10, StatisticKind::Histogram))
        );
    }

    #[test]
    fn sampled_timer_non_converting() {
        assert_event_data_eq!(
            parse_non_converting("glork:320|ms|@0.1"),
            Ok(glork(320.0, 10, StatisticKind::Histogram))
        );
    }

    #[test]
    fn sampled_tagged_histogram() {
        assert_event_data_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(
                glork(320.0, 10, StatisticKind::Histogram).with_tags(Some(metric_tags!(
                    "region" => "us-west1",
                    "production" => TagValue::Bare,
                    "e" => "",
                )))
            )
        );
    }

    #[test]
    fn sampled_distribution() {
        assert_event_data_eq!(
            parse("glork:320|d|@0.1|#region:us-west1,production,e:"),
            Ok(
                glork(320.0, 10, StatisticKind::Summary).with_tags(Some(metric_tags!(
                    "region" => "us-west1",
                    "production" => TagValue::Bare,
                    "e" => "",
                )))
            )
        );
    }

    #[test]
    fn simple_gauge() {
        assert_event_data_eq!(
            parse("gaugor:333|g"),
            Ok(gaugor(MetricKind::Absolute, 333.0))
        );
    }

    #[test]
    fn signed_gauge() {
        assert_event_data_eq!(
            parse("gaugor:-4|g"),
            Ok(gaugor(MetricKind::Incremental, -4.0))
        );
        assert_event_data_eq!(
            parse("gaugor:+10|g"),
            Ok(gaugor(MetricKind::Incremental, 10.0))
        );
    }

    #[test]
    fn sets() {
        assert_event_data_eq!(
            parse("uniques:765|s"),
            Ok(Metric::new(
                "uniques",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["765".into()].into_iter().collect()
                },
            ))
        );
    }

    #[test]
    fn sanitizing_keys() {
        // (input, sanitize flag, expected key)
        let cases = [
            ("foo/bar/baz", true, "foo-bar-baz"),
            ("foo/bar/baz", false, "foo/bar/baz"),
            ("foo bar  baz", true, "foo_bar_baz"),
            ("foo. @& bar_$!#.baz", true, "foo.__bar_.baz"),
            ("foo. @& bar_$!#.baz", false, "foo. @& bar_$!#.baz"),
        ];
        for (input, sanitize, expected) in cases {
            assert_eq!(expected, sanitize_key(input, sanitize));
        }
    }

    #[test]
    fn sanitizing_sampling() {
        // (input rate, expected rate)
        for (input, expected) in [(0.0, 1.0), (2.5, 2.5), (-5.0, -5.0)] {
            assert_eq!(expected, sanitize_sampling(input));
        }
    }
}
481}