1use std::{
2 error, fmt,
3 num::{ParseFloatError, ParseIntError},
4 str::Utf8Error,
5 sync::LazyLock,
6};
7
8use regex::Regex;
9
10use crate::{
11 event::metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
12 sources::statsd::ConversionUnit,
13 sources::util::extract_tag_key_and_value,
14};
15
16static WHITESPACE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\s+").unwrap());
17static NONALPHANUM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap());
18
19#[derive(Clone)]
20pub struct Parser {
21 sanitize: bool,
22 convert_to: ConversionUnit,
23}
24
25impl Parser {
26 pub const fn new(sanitize_keys: bool, convert_to: ConversionUnit) -> Self {
27 Self {
28 sanitize: sanitize_keys,
29 convert_to,
30 }
31 }
32
33 pub fn parse(&self, packet: &str) -> Result<Metric, ParseError> {
34 let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
36 if key_and_body.len() != 2 {
37 return Err(ParseError::Malformed(
38 "should be key and body with ':' separator",
39 ));
40 }
41 let (key, body) = (key_and_body[0], key_and_body[1]);
42
43 let parts = body.split('|').collect::<Vec<_>>();
44 if parts.len() < 2 {
45 return Err(ParseError::Malformed(
46 "body should have at least two pipe separated components",
47 ));
48 }
49
50 let name = sanitize_key(key, self.sanitize);
51 let metric_type = parts[1];
52
53 let sampling = parts.get(2).filter(|s| s.starts_with('@'));
55 let sample_rate = if let Some(s) = sampling {
56 1.0 / sanitize_sampling(parse_sampling(s)?)
57 } else {
58 1.0
59 };
60
61 let tags = if sampling.is_none() {
63 parts.get(2)
64 } else {
65 parts.get(3)
66 };
67 let tags = tags.filter(|s| s.starts_with('#'));
68 let tags = tags.map(parse_tags).transpose()?;
69
70 let metric = match metric_type {
71 "c" => {
72 let val: f64 = parts[0].parse()?;
73 Metric::new(
74 name,
75 MetricKind::Incremental,
76 MetricValue::Counter {
77 value: val * sample_rate,
78 },
79 )
80 .with_tags(tags)
81 }
82 unit @ "h" | unit @ "ms" | unit @ "d" => {
83 let val: f64 = parts[0].parse()?;
84 let converted_val = match unit {
85 "ms" => match self.convert_to {
86 ConversionUnit::Seconds => val / 1000.0,
87 ConversionUnit::Milliseconds => val,
88 },
89 _ => val,
90 };
91 Metric::new(
92 name,
93 MetricKind::Incremental,
94 MetricValue::Distribution {
95 samples: vector_lib::samples![converted_val => sample_rate as u32],
96 statistic: convert_to_statistic(unit),
97 },
98 )
99 .with_tags(tags)
100 }
101 "g" => {
102 let value = if parts[0]
103 .chars()
104 .next()
105 .map(|c| c.is_ascii_digit())
106 .ok_or(ParseError::Malformed("empty first body component"))?
107 {
108 parts[0].parse()?
109 } else {
110 parts[0][1..].parse()?
111 };
112
113 match parse_direction(parts[0])? {
114 None => Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
115 .with_tags(tags),
116 Some(sign) => Metric::new(
117 name,
118 MetricKind::Incremental,
119 MetricValue::Gauge {
120 value: value * sign,
121 },
122 )
123 .with_tags(tags),
124 }
125 }
126 "s" => Metric::new(
127 name,
128 MetricKind::Incremental,
129 MetricValue::Set {
130 values: vec![parts[0].into()].into_iter().collect(),
131 },
132 )
133 .with_tags(tags),
134 other => return Err(ParseError::UnknownMetricType(other.into())),
135 };
136 Ok(metric)
137 }
138}
139
140fn parse_sampling(input: &str) -> Result<f64, ParseError> {
141 if !input.starts_with('@') || input.len() < 2 {
142 return Err(ParseError::Malformed(
143 "expected non empty '@'-prefixed sampling component",
144 ));
145 }
146
147 let num: f64 = input[1..].parse()?;
148 if num.is_sign_positive() {
149 Ok(num)
150 } else {
151 Err(ParseError::Malformed("sample rate can't be negative"))
152 }
153}
154
155fn parse_tags(input: &&str) -> Result<MetricTags, ParseError> {
157 if !input.starts_with('#') || input.len() < 2 {
158 return Err(ParseError::Malformed(
159 "expected non empty '#'-prefixed tags component",
160 ));
161 }
162
163 Ok(input[1..]
164 .split(',')
165 .map(extract_tag_key_and_value)
166 .collect())
167}
168
169fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
170 match input
171 .chars()
172 .next()
173 .ok_or(ParseError::Malformed("empty body component"))?
174 {
175 '+' => Ok(Some(1.0)),
176 '-' => Ok(Some(-1.0)),
177 c if c.is_ascii_digit() => Ok(None),
178 _other => Err(ParseError::Malformed("invalid gauge value prefix")),
179 }
180}
181
182fn sanitize_key(key: &str, sanitize: bool) -> String {
183 if !sanitize {
184 key.to_owned()
185 } else {
186 let s = key.replace('/', "'-");
187 let s = WHITESPACE.replace_all(&s, "_");
188 let s = NONALPHANUM.replace_all(&s, "");
189 s.into()
190 }
191}
192
/// Maps a sampling rate of exactly zero (including `-0.0`) to `1.0`.
///
/// Any other value, NaN included, is returned unchanged. The parser divides `1.0` by
/// the result, so zero must not get through.
fn sanitize_sampling(sampling: f64) -> f64 {
    if sampling != 0.0 {
        sampling
    } else {
        1.0
    }
}
200
201fn convert_to_statistic(unit: &str) -> StatisticKind {
202 match unit {
203 "d" => StatisticKind::Summary,
204 _ => StatisticKind::Histogram,
205 }
206}
207
/// Errors returned by [`Parser::parse`] and its helper functions.
#[derive(Debug, PartialEq, Eq)]
pub enum ParseError {
    /// Wraps a [`Utf8Error`].
    ///
    /// Not constructed by the parsing code in this file. NOTE(review): presumably
    /// produced by a caller that decodes raw packet bytes; confirm at the call site.
    InvalidUtf8(Utf8Error),
    /// The packet does not have the expected structure; the message names the problem.
    Malformed(&'static str),
    /// The metric type component is not one of `c`, `h`, `ms`, `d`, `g` or `s`.
    UnknownMetricType(String),
    /// Wraps a [`ParseIntError`]; created through the `From<ParseIntError>` impl.
    InvalidInteger(ParseIntError),
    /// A numeric component (value or sample rate) failed to parse as `f64`.
    InvalidFloat(ParseFloatError),
}
216
217impl fmt::Display for ParseError {
218 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
219 write!(f, "Statsd parse error: {self:?}")
220 }
221}
222
223vector_lib::impl_event_data_eq!(ParseError);
224
225impl error::Error for ParseError {}
226
227impl From<ParseIntError> for ParseError {
228 fn from(e: ParseIntError) -> ParseError {
229 ParseError::InvalidInteger(e)
230 }
231}
232
233impl From<ParseFloatError> for ParseError {
234 fn from(e: ParseFloatError) -> ParseError {
235 ParseError::InvalidFloat(e)
236 }
237}
238
#[cfg(test)]
mod test {
    use vector_lib::assert_event_data_eq;
    use vector_lib::{event::metric::TagValue, metric_tags};

    use super::{sanitize_key, sanitize_sampling, ParseError, Parser};
    use crate::event::metric::{Metric, MetricKind, MetricValue, StatisticKind};
    use crate::sources::statsd::ConversionUnit;

    const SANITIZING_PARSER: Parser = Parser::new(true, ConversionUnit::Seconds);
    fn parse(packet: &str) -> Result<Metric, ParseError> {
        SANITIZING_PARSER.parse(packet)
    }

    const NON_CONVERTING_PARSER: Parser = Parser::new(true, ConversionUnit::Milliseconds);
    fn parse_non_converting(packet: &str) -> Result<Metric, ParseError> {
        NON_CONVERTING_PARSER.parse(packet)
    }

    const NON_SANITIZING_PARSER: Parser = Parser::new(false, ConversionUnit::Seconds);
    fn unsanitized_parse(packet: &str) -> Result<Metric, ParseError> {
        NON_SANITIZING_PARSER.parse(packet)
    }

    /// Returns the error (if any) from parsing `packet` with the sanitizing parser.
    fn parse_err(packet: &str) -> Option<ParseError> {
        parse(packet).err()
    }

    #[test]
    fn basic_counter() {
        assert_event_data_eq!(
            parse("foo:1|c"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )),
        );
    }

    #[test]
    fn tagged_counter() {
        assert_event_data_eq!(
            parse("foo/how@ever baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo-however_baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }

    #[test]
    fn tagged_not_sanitized_counter() {
        assert_event_data_eq!(
            unsanitized_parse("foo/bar@baz baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo/bar@baz baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }

    #[test]
    fn enhanced_tags() {
        assert_event_data_eq!(
            parse("foo:1|c|#tag1,tag2:valueA,tag2:valueB,tag3:value,tag3,tag4:"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "valueA",
                "tag2" => "valueB",
                "tag3" => "value",
                "tag3" => TagValue::Bare,
                "tag4" => "",
            )))),
        );
    }

    #[test]
    fn sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0.1"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 20.0 },
            )),
        );
    }

    #[test]
    fn zero_sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 2.0 },
            )),
        );
    }

    #[test]
    fn sampled_timer() {
        assert_event_data_eq!(
            parse("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![0.320 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn sampled_timer_non_converting() {
        assert_event_data_eq!(
            parse_non_converting("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn sampled_tagged_histogram() {
        assert_event_data_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }

    #[test]
    fn sampled_distribution() {
        assert_event_data_eq!(
            parse("glork:320|d|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Summary
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }

    #[test]
    fn simple_gauge() {
        assert_event_data_eq!(
            parse("gaugor:333|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 333.0 },
            )),
        );
    }

    #[test]
    fn signed_gauge() {
        assert_event_data_eq!(
            parse("gaugor:-4|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: -4.0 },
            )),
        );
        assert_event_data_eq!(
            parse("gaugor:+10|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: 10.0 },
            )),
        );
    }

    #[test]
    fn sets() {
        assert_event_data_eq!(
            parse("uniques:765|s"),
            Ok(Metric::new(
                "uniques",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["765".into()].into_iter().collect()
                },
            )),
        );
    }

    #[test]
    fn sanitizing_keys() {
        assert_eq!("foo-bar-baz", sanitize_key("foo/bar/baz", true));
        assert_eq!("foo/bar/baz", sanitize_key("foo/bar/baz", false));
        assert_eq!("foo_bar_baz", sanitize_key("foo bar baz", true));
        assert_eq!("foo.__bar_.baz", sanitize_key("foo. @& bar_$!#.baz", true));
        assert_eq!(
            "foo. @& bar_$!#.baz",
            sanitize_key("foo. @& bar_$!#.baz", false)
        );
    }

    #[test]
    fn sanitizing_sampling() {
        assert_eq!(1.0, sanitize_sampling(0.0));
        assert_eq!(2.5, sanitize_sampling(2.5));
        assert_eq!(-5.0, sanitize_sampling(-5.0));
    }

    #[test]
    fn missing_key_separator() {
        assert_eq!(
            parse_err("foo"),
            Some(ParseError::Malformed(
                "should be key and body with ':' separator"
            )),
        );
    }

    #[test]
    fn missing_metric_type() {
        assert_eq!(
            parse_err("foo:1"),
            Some(ParseError::Malformed(
                "body should have at least two pipe separated components"
            )),
        );
    }

    #[test]
    fn unknown_metric_type() {
        assert_eq!(
            parse_err("foo:1|x"),
            Some(ParseError::UnknownMetricType("x".to_string())),
        );
    }

    #[test]
    fn non_numeric_counter() {
        assert!(matches!(
            parse_err("foo:abc|c"),
            Some(ParseError::InvalidFloat(_))
        ));
    }

    #[test]
    fn negative_sample_rate() {
        assert_eq!(
            parse_err("foo:1|c|@-1"),
            Some(ParseError::Malformed("sample rate can't be negative")),
        );
    }

    #[test]
    fn empty_sample_rate() {
        assert_eq!(
            parse_err("foo:1|c|@"),
            Some(ParseError::Malformed(
                "expected non empty '@'-prefixed sampling component"
            )),
        );
    }

    #[test]
    fn empty_tags() {
        assert_eq!(
            parse_err("foo:1|c|#"),
            Some(ParseError::Malformed(
                "expected non empty '#'-prefixed tags component"
            )),
        );
    }

    #[test]
    fn empty_gauge_value() {
        assert_eq!(
            parse_err("foo:|g"),
            Some(ParseError::Malformed("empty first body component")),
        );
    }

    #[test]
    fn invalid_gauge_prefix() {
        assert_eq!(
            parse_err("foo:x5|g"),
            Some(ParseError::Malformed("invalid gauge value prefix")),
        );
    }
}