1use std::{
2 error, fmt,
3 num::{ParseFloatError, ParseIntError},
4 str::Utf8Error,
5 sync::LazyLock,
6};
7
8use regex::Regex;
9
10use crate::{
11 event::metric::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind},
12 sources::{statsd::ConversionUnit, util::extract_tag_key_and_value},
13};
14
15static WHITESPACE: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"\s+").unwrap());
16static NONALPHANUM: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"[^a-zA-Z_\-0-9\.]").unwrap());
17
18#[derive(Clone)]
19pub struct Parser {
20 sanitize: bool,
21 convert_to: ConversionUnit,
22}
23
24impl Parser {
25 pub const fn new(sanitize_keys: bool, convert_to: ConversionUnit) -> Self {
26 Self {
27 sanitize: sanitize_keys,
28 convert_to,
29 }
30 }
31
32 pub fn parse(&self, packet: &str) -> Result<Metric, ParseError> {
33 let key_and_body = packet.splitn(2, ':').collect::<Vec<_>>();
35 if key_and_body.len() != 2 {
36 return Err(ParseError::Malformed(
37 "should be key and body with ':' separator",
38 ));
39 }
40 let (key, body) = (key_and_body[0], key_and_body[1]);
41
42 let parts = body.split('|').collect::<Vec<_>>();
43 if parts.len() < 2 {
44 return Err(ParseError::Malformed(
45 "body should have at least two pipe separated components",
46 ));
47 }
48
49 let name = sanitize_key(key, self.sanitize);
50 let metric_type = parts[1];
51
52 let sampling = parts.get(2).filter(|s| s.starts_with('@'));
54 let sample_rate = if let Some(s) = sampling {
55 1.0 / sanitize_sampling(parse_sampling(s)?)
56 } else {
57 1.0
58 };
59
60 let tags = if sampling.is_none() {
62 parts.get(2)
63 } else {
64 parts.get(3)
65 };
66 let tags = tags.filter(|s| s.starts_with('#'));
67 let tags = tags.map(parse_tags).transpose()?;
68
69 let metric = match metric_type {
70 "c" => {
71 let val: f64 = parts[0].parse()?;
72 Metric::new(
73 name,
74 MetricKind::Incremental,
75 MetricValue::Counter {
76 value: val * sample_rate,
77 },
78 )
79 .with_tags(tags)
80 }
81 unit @ "h" | unit @ "ms" | unit @ "d" => {
82 let val: f64 = parts[0].parse()?;
83 let converted_val = match unit {
84 "ms" => match self.convert_to {
85 ConversionUnit::Seconds => val / 1000.0,
86 ConversionUnit::Milliseconds => val,
87 },
88 _ => val,
89 };
90 Metric::new(
91 name,
92 MetricKind::Incremental,
93 MetricValue::Distribution {
94 samples: vector_lib::samples![converted_val => sample_rate as u32],
95 statistic: convert_to_statistic(unit),
96 },
97 )
98 .with_tags(tags)
99 }
100 "g" => {
101 let value = if parts[0]
102 .chars()
103 .next()
104 .map(|c| c.is_ascii_digit())
105 .ok_or(ParseError::Malformed("empty first body component"))?
106 {
107 parts[0].parse()?
108 } else {
109 parts[0][1..].parse()?
110 };
111
112 match parse_direction(parts[0])? {
113 None => Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
114 .with_tags(tags),
115 Some(sign) => Metric::new(
116 name,
117 MetricKind::Incremental,
118 MetricValue::Gauge {
119 value: value * sign,
120 },
121 )
122 .with_tags(tags),
123 }
124 }
125 "s" => Metric::new(
126 name,
127 MetricKind::Incremental,
128 MetricValue::Set {
129 values: vec![parts[0].into()].into_iter().collect(),
130 },
131 )
132 .with_tags(tags),
133 other => return Err(ParseError::UnknownMetricType(other.into())),
134 };
135 Ok(metric)
136 }
137}
138
139fn parse_sampling(input: &str) -> Result<f64, ParseError> {
140 if !input.starts_with('@') || input.len() < 2 {
141 return Err(ParseError::Malformed(
142 "expected non empty '@'-prefixed sampling component",
143 ));
144 }
145
146 let num: f64 = input[1..].parse()?;
147 if num.is_sign_positive() {
148 Ok(num)
149 } else {
150 Err(ParseError::Malformed("sample rate can't be negative"))
151 }
152}
153
154fn parse_tags(input: &&str) -> Result<MetricTags, ParseError> {
156 if !input.starts_with('#') || input.len() < 2 {
157 return Err(ParseError::Malformed(
158 "expected non empty '#'-prefixed tags component",
159 ));
160 }
161
162 Ok(input[1..]
163 .split(',')
164 .map(extract_tag_key_and_value)
165 .collect())
166}
167
168fn parse_direction(input: &str) -> Result<Option<f64>, ParseError> {
169 match input
170 .chars()
171 .next()
172 .ok_or(ParseError::Malformed("empty body component"))?
173 {
174 '+' => Ok(Some(1.0)),
175 '-' => Ok(Some(-1.0)),
176 c if c.is_ascii_digit() => Ok(None),
177 _other => Err(ParseError::Malformed("invalid gauge value prefix")),
178 }
179}
180
181fn sanitize_key(key: &str, sanitize: bool) -> String {
182 if !sanitize {
183 key.to_owned()
184 } else {
185 let s = key.replace('/', "'-");
186 let s = WHITESPACE.replace_all(&s, "_");
187 let s = NONALPHANUM.replace_all(&s, "");
188 s.into()
189 }
190}
191
/// Maps a sample rate of zero (either sign) to `1.0` so the caller never divides by zero;
/// every other value, including negatives and NaN, is returned unchanged.
fn sanitize_sampling(sampling: f64) -> f64 {
    if sampling != 0.0 {
        return sampling;
    }
    1.0
}
195
196fn convert_to_statistic(unit: &str) -> StatisticKind {
197 match unit {
198 "d" => StatisticKind::Summary,
199 _ => StatisticKind::Histogram,
200 }
201}
202
/// Errors reported while turning a statsd packet into a metric.
#[derive(Debug, PartialEq, Eq)]
pub enum ParseError {
    /// The packet is structurally invalid; the message names the violated expectation.
    Malformed(&'static str),
    /// The metric type component is not one the parser handles; carries the offending type.
    UnknownMetricType(String),
    /// Wraps a `Utf8Error`. NOTE(review): not constructed in this file; presumably used by
    /// callers that decode raw packet bytes. Confirm against callers.
    InvalidUtf8(Utf8Error),
    /// Wraps an integer parse failure (produced through `From<ParseIntError>`).
    InvalidInteger(ParseIntError),
    /// Wraps a float parse failure (produced through `From<ParseFloatError>`).
    InvalidFloat(ParseFloatError),
}
211
212impl fmt::Display for ParseError {
213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214 write!(f, "Statsd parse error: {self:?}")
215 }
216}
217
218vector_lib::impl_event_data_eq!(ParseError);
219
220impl error::Error for ParseError {}
221
222impl From<ParseIntError> for ParseError {
223 fn from(e: ParseIntError) -> ParseError {
224 ParseError::InvalidInteger(e)
225 }
226}
227
228impl From<ParseFloatError> for ParseError {
229 fn from(e: ParseFloatError) -> ParseError {
230 ParseError::InvalidFloat(e)
231 }
232}
233
#[cfg(test)]
mod test {
    use vector_lib::{assert_event_data_eq, event::metric::TagValue, metric_tags};

    use super::{ParseError, Parser, sanitize_key, sanitize_sampling};
    use crate::{
        event::metric::{Metric, MetricKind, MetricValue, StatisticKind},
        sources::statsd::ConversionUnit,
    };

    // Sanitizes metric names and converts `ms` timers to seconds.
    const SANITIZING_PARSER: Parser = Parser::new(true, ConversionUnit::Seconds);
    fn parse(packet: &str) -> Result<Metric, ParseError> {
        SANITIZING_PARSER.parse(packet)
    }

    // Sanitizes metric names but leaves `ms` timers in milliseconds.
    const NON_CONVERTING_PARSER: Parser = Parser::new(true, ConversionUnit::Milliseconds);
    fn parse_non_converting(packet: &str) -> Result<Metric, ParseError> {
        NON_CONVERTING_PARSER.parse(packet)
    }

    // Keeps metric names exactly as received.
    const NON_SANITIZING_PARSER: Parser = Parser::new(false, ConversionUnit::Seconds);
    fn unsanitized_parse(packet: &str) -> Result<Metric, ParseError> {
        NON_SANITIZING_PARSER.parse(packet)
    }

    #[test]
    fn basic_counter() {
        assert_event_data_eq!(
            parse("foo:1|c"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )),
        );
    }

    #[test]
    fn tagged_counter() {
        assert_event_data_eq!(
            parse("foo/how@ever baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo-however_baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }

    #[test]
    fn tagged_not_sanitized_counter() {
        assert_event_data_eq!(
            unsanitized_parse("foo/bar@baz baz:1|c|#tag1,tag2:value"),
            Ok(Metric::new(
                "foo/bar@baz baz",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "value",
            )))),
        );
    }

    #[test]
    fn enhanced_tags() {
        assert_event_data_eq!(
            parse("foo:1|c|#tag1,tag2:valueA,tag2:valueB,tag3:value,tag3,tag4:"),
            Ok(Metric::new(
                "foo",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(Some(metric_tags!(
                "tag1" => TagValue::Bare,
                "tag2" => "valueA",
                "tag2" => "valueB",
                "tag3" => "value",
                "tag3" => TagValue::Bare,
                "tag4" => "",
            )))),
        );
    }

    #[test]
    fn sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0.1"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 20.0 },
            )),
        );
    }

    #[test]
    fn zero_sampled_counter() {
        assert_event_data_eq!(
            parse("bar:2|c|@0"),
            Ok(Metric::new(
                "bar",
                MetricKind::Incremental,
                MetricValue::Counter { value: 2.0 },
            )),
        );
    }

    #[test]
    fn sampled_timer() {
        assert_event_data_eq!(
            parse("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![0.320 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn sampled_timer_non_converting() {
        assert_event_data_eq!(
            parse_non_converting("glork:320|ms|@0.1"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )),
        );
    }

    #[test]
    fn sampled_tagged_histogram() {
        assert_event_data_eq!(
            parse("glork:320|h|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Histogram
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }

    #[test]
    fn sampled_distribution() {
        assert_event_data_eq!(
            parse("glork:320|d|@0.1|#region:us-west1,production,e:"),
            Ok(Metric::new(
                "glork",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples: vector_lib::samples![320.0 => 10],
                    statistic: StatisticKind::Summary
                },
            )
            .with_tags(Some(metric_tags!(
                "region" => "us-west1",
                "production" => TagValue::Bare,
                "e" => "",
            )))),
        );
    }

    #[test]
    fn simple_gauge() {
        assert_event_data_eq!(
            parse("gaugor:333|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 333.0 },
            )),
        );
    }

    #[test]
    fn signed_gauge() {
        assert_event_data_eq!(
            parse("gaugor:-4|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: -4.0 },
            )),
        );
        assert_event_data_eq!(
            parse("gaugor:+10|g"),
            Ok(Metric::new(
                "gaugor",
                MetricKind::Incremental,
                MetricValue::Gauge { value: 10.0 },
            )),
        );
    }

    #[test]
    fn sets() {
        assert_event_data_eq!(
            parse("uniques:765|s"),
            Ok(Metric::new(
                "uniques",
                MetricKind::Incremental,
                MetricValue::Set {
                    values: vec!["765".into()].into_iter().collect()
                },
            )),
        );
    }

    #[test]
    fn sanitizing_keys() {
        assert_eq!("foo-bar-baz", sanitize_key("foo/bar/baz", true));
        assert_eq!("foo/bar/baz", sanitize_key("foo/bar/baz", false));
        assert_eq!("foo_bar_baz", sanitize_key("foo bar baz", true));
        assert_eq!("foo.__bar_.baz", sanitize_key("foo. @& bar_$!#.baz", true));
        assert_eq!(
            "foo. @& bar_$!#.baz",
            sanitize_key("foo. @& bar_$!#.baz", false)
        );
    }

    #[test]
    fn sanitizing_sampling() {
        assert_eq!(1.0, sanitize_sampling(0.0));
        assert_eq!(2.5, sanitize_sampling(2.5));
        assert_eq!(-5.0, sanitize_sampling(-5.0));
    }

    // Error paths: each malformed packet must be rejected with the specific error.

    #[test]
    fn missing_key_separator() {
        assert_eq!(
            parse("foo").err(),
            Some(ParseError::Malformed(
                "should be key and body with ':' separator"
            )),
        );
    }

    #[test]
    fn missing_metric_type() {
        assert_eq!(
            parse("foo:1").err(),
            Some(ParseError::Malformed(
                "body should have at least two pipe separated components"
            )),
        );
    }

    #[test]
    fn unknown_metric_type() {
        assert_eq!(
            parse("foo:1|x").err(),
            Some(ParseError::UnknownMetricType("x".into())),
        );
    }

    #[test]
    fn invalid_counter_value() {
        let expected = "abc".parse::<f64>().unwrap_err();
        assert_eq!(
            parse("foo:abc|c").err(),
            Some(ParseError::InvalidFloat(expected)),
        );
    }

    #[test]
    fn negative_sample_rate() {
        assert_eq!(
            parse("foo:1|c|@-0.5").err(),
            Some(ParseError::Malformed("sample rate can't be negative")),
        );
    }

    #[test]
    fn empty_sample_rate() {
        assert_eq!(
            parse("foo:1|c|@").err(),
            Some(ParseError::Malformed(
                "expected non empty '@'-prefixed sampling component"
            )),
        );
    }

    #[test]
    fn empty_tags() {
        assert_eq!(
            parse("foo:1|c|#").err(),
            Some(ParseError::Malformed(
                "expected non empty '#'-prefixed tags component"
            )),
        );
    }

    #[test]
    fn empty_gauge_value() {
        assert_eq!(
            parse("foo:|g").err(),
            Some(ParseError::Malformed("empty first body component")),
        );
    }

    #[test]
    fn invalid_gauge_prefix() {
        assert_eq!(
            parse("foo:a5|g").err(),
            Some(ParseError::Malformed("invalid gauge value prefix")),
        );
    }
}