1use std::{
2 fmt::Display,
3 io::{self, Write},
4};
5
6use bytes::{BufMut, BytesMut};
7use tokio_util::codec::Encoder;
8use vector_lib::event::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
9
10use crate::{
11 internal_events::StatsdInvalidMetricError,
12 sinks::util::{buffer::metrics::compress_distribution, encode_namespace},
13};
14
15#[derive(Debug)]
22pub struct InfallibleIo;
23
24impl From<io::Error> for InfallibleIo {
25 fn from(_: io::Error) -> Self {
26 Self
27 }
28}
29
30#[derive(Debug, Clone)]
31pub(super) struct StatsdEncoder {
32 default_namespace: Option<String>,
33}
34
35impl StatsdEncoder {
36 pub const fn new(default_namespace: Option<String>) -> Self {
38 Self { default_namespace }
39 }
40}
41
42impl<'a> Encoder<&'a Metric> for StatsdEncoder {
43 type Error = InfallibleIo;
44
45 fn encode(&mut self, metric: &'a Metric, buf: &mut BytesMut) -> Result<(), Self::Error> {
46 let namespace = metric.namespace().or(self.default_namespace.as_deref());
47 let name = encode_namespace(namespace, '.', metric.name());
48 let tags = metric.tags().map(encode_tags);
49
50 match metric.value() {
51 MetricValue::Counter { value } => {
52 encode_and_write_single_event(buf, &name, tags.as_deref(), value, "c", None);
53 }
54 MetricValue::Gauge { value } => {
55 match metric.kind() {
56 MetricKind::Incremental => encode_and_write_single_event(
57 buf,
58 &name,
59 tags.as_deref(),
60 format!("{value:+}"),
61 "g",
62 None,
63 ),
64 MetricKind::Absolute => {
65 encode_and_write_single_event(buf, &name, tags.as_deref(), value, "g", None)
66 }
67 };
68 }
69 MetricValue::Distribution { samples, statistic } => {
70 let metric_type = match statistic {
71 StatisticKind::Histogram => "h",
72 StatisticKind::Summary => "d",
73 };
74
75 let mut samples = samples.clone();
82 let compressed_samples = compress_distribution(&mut samples);
83 for sample in compressed_samples {
84 encode_and_write_single_event(
85 buf,
86 &name,
87 tags.as_deref(),
88 sample.value,
89 metric_type,
90 Some(sample.rate),
91 );
92 }
93 }
94 MetricValue::Set { values } => {
95 for val in values {
96 encode_and_write_single_event(buf, &name, tags.as_deref(), val, "s", None);
97 }
98 }
99 _ => {
100 emit!(StatsdInvalidMetricError {
101 value: metric.value(),
102 kind: metric.kind(),
103 });
104
105 return Ok(());
106 }
107 };
108
109 Ok(())
110 }
111}
112
113fn encode_tags(tags: &MetricTags) -> String {
117 let parts: Vec<_> = tags
118 .iter_all()
119 .map(|(name, tag_value)| match tag_value {
120 Some(value) => format!("{name}:{value}"),
121 None => name.to_owned(),
122 })
123 .collect();
124
125 parts.join(",")
127}
128
129fn encode_and_write_single_event<V: Display>(
130 buf: &mut BytesMut,
131 metric_name: &str,
132 metric_tags: Option<&str>,
133 val: V,
134 metric_type: &str,
135 sample_rate: Option<u32>,
136) {
137 let mut writer = buf.writer();
138
139 write!(&mut writer, "{metric_name}:{val}|{metric_type}").unwrap();
140
141 if let Some(sample_rate) = sample_rate
142 && sample_rate != 1
143 {
144 write!(&mut writer, "|@{}", 1.0 / f64::from(sample_rate)).unwrap();
145 };
146
147 if let Some(t) = metric_tags {
148 write!(&mut writer, "|#{t}").unwrap();
149 };
150
151 writeln!(&mut writer).unwrap();
152}
153
154#[cfg(test)]
155mod tests {
156 #[cfg(feature = "sources-statsd")]
157 use vector_lib::event::{Metric, MetricKind, MetricValue, StatisticKind};
158 use vector_lib::{
159 event::{MetricTags, metric::TagValue},
160 metric_tags,
161 };
162
163 use super::encode_tags;
164
165 #[cfg(feature = "sources-statsd")]
166 fn encode_metric(metric: &Metric) -> bytes::BytesMut {
167 use tokio_util::codec::Encoder;
168
169 let mut encoder = super::StatsdEncoder {
170 default_namespace: None,
171 };
172 let mut frame = bytes::BytesMut::new();
173 encoder.encode(metric, &mut frame).unwrap();
174 frame
175 }
176
177 #[cfg(feature = "sources-statsd")]
178 fn parse_encoded_metrics(metric: &[u8]) -> Vec<Metric> {
179 use crate::sources::statsd::{ConversionUnit, parser::Parser};
180 let statsd_parser = Parser::new(true, ConversionUnit::Seconds);
181
182 let s = std::str::from_utf8(metric).unwrap().trim();
183 s.split('\n')
184 .map(|packet| {
185 statsd_parser
186 .parse(packet)
187 .expect("should not fail to parse statsd packet")
188 })
189 .collect()
190 }
191
192 fn tags() -> MetricTags {
193 metric_tags!(
194 "normal_tag" => "value",
195 "multi_value" => "true",
196 "multi_value" => "false",
197 "multi_value" => TagValue::Bare,
198 "bare_tag" => TagValue::Bare,
199 )
200 }
201
202 #[test]
203 fn test_encode_tags() {
204 let actual = encode_tags(&tags());
205 let mut actual = actual.split(',').collect::<Vec<_>>();
206 actual.sort();
207
208 let mut expected =
209 "bare_tag,normal_tag:value,multi_value:true,multi_value:false,multi_value"
210 .split(',')
211 .collect::<Vec<_>>();
212 expected.sort();
213
214 assert_eq!(actual, expected);
215 }
216
217 #[test]
218 fn tags_order() {
219 assert_eq!(
220 &encode_tags(
221 &vec![
222 ("a", "value"),
223 ("b", "value"),
224 ("c", "value"),
225 ("d", "value"),
226 ("e", "value"),
227 ]
228 .into_iter()
229 .map(|(k, v)| (k.to_owned(), v.to_owned()))
230 .collect()
231 ),
232 "a:value,b:value,c:value,d:value,e:value"
233 );
234 }
235
236 #[cfg(feature = "sources-statsd")]
237 #[test]
238 fn test_encode_counter() {
239 let input = Metric::new(
240 "counter",
241 MetricKind::Incremental,
242 MetricValue::Counter { value: 1.5 },
243 )
244 .with_tags(Some(tags()));
245
246 let frame = encode_metric(&input);
247 let mut output = parse_encoded_metrics(&frame);
248 vector_lib::assert_event_data_eq!(input, output.remove(0));
249 }
250
251 #[cfg(feature = "sources-statsd")]
252 #[test]
253 fn test_encode_absolute_counter() {
254 let input = Metric::new(
255 "counter",
256 MetricKind::Absolute,
257 MetricValue::Counter { value: 1.5 },
258 );
259
260 let frame = encode_metric(&input);
261 assert_eq!("counter:1.5|c\n", std::str::from_utf8(&frame).unwrap());
264 }
265
266 #[cfg(feature = "sources-statsd")]
267 #[test]
268 fn test_encode_gauge() {
269 let input = Metric::new(
270 "gauge",
271 MetricKind::Incremental,
272 MetricValue::Gauge { value: -1.5 },
273 )
274 .with_tags(Some(tags()));
275
276 let frame = encode_metric(&input);
277 let mut output = parse_encoded_metrics(&frame);
278 vector_lib::assert_event_data_eq!(input, output.remove(0));
279 }
280
281 #[cfg(feature = "sources-statsd")]
282 #[test]
283 fn test_encode_absolute_gauge() {
284 let input = Metric::new(
285 "gauge",
286 MetricKind::Absolute,
287 MetricValue::Gauge { value: 1.5 },
288 )
289 .with_tags(Some(tags()));
290
291 let frame = encode_metric(&input);
292 let mut output = parse_encoded_metrics(&frame);
293 vector_lib::assert_event_data_eq!(input, output.remove(0));
294 }
295
296 #[cfg(feature = "sources-statsd")]
297 #[test]
298 fn test_encode_distribution() {
299 let input = Metric::new(
300 "distribution",
301 MetricKind::Incremental,
302 MetricValue::Distribution {
303 samples: vector_lib::samples![1.5 => 1, 1.5 => 1],
304 statistic: StatisticKind::Histogram,
305 },
306 )
307 .with_tags(Some(tags()));
308
309 let expected = Metric::new(
310 "distribution",
311 MetricKind::Incremental,
312 MetricValue::Distribution {
313 samples: vector_lib::samples![1.5 => 2],
314 statistic: StatisticKind::Histogram,
315 },
316 )
317 .with_tags(Some(tags()));
318
319 let frame = encode_metric(&input);
320 let mut output = parse_encoded_metrics(&frame);
321 vector_lib::assert_event_data_eq!(expected, output.remove(0));
322 }
323
324 #[cfg(feature = "sources-statsd")]
325 #[test]
326 fn test_encode_distribution_aggregated() {
327 let input = Metric::new(
328 "distribution",
329 MetricKind::Incremental,
330 MetricValue::Distribution {
331 samples: vector_lib::samples![2.5 => 1, 1.5 => 1, 1.5 => 1],
332 statistic: StatisticKind::Histogram,
333 },
334 )
335 .with_tags(Some(tags()));
336
337 let expected1 = Metric::new(
338 "distribution",
339 MetricKind::Incremental,
340 MetricValue::Distribution {
341 samples: vector_lib::samples![1.5 => 2],
342 statistic: StatisticKind::Histogram,
343 },
344 )
345 .with_tags(Some(tags()));
346 let expected2 = Metric::new(
347 "distribution",
348 MetricKind::Incremental,
349 MetricValue::Distribution {
350 samples: vector_lib::samples![2.5 => 1],
351 statistic: StatisticKind::Histogram,
352 },
353 )
354 .with_tags(Some(tags()));
355
356 let frame = encode_metric(&input);
357 let mut output = parse_encoded_metrics(&frame);
358 vector_lib::assert_event_data_eq!(expected1, output.remove(0));
359 vector_lib::assert_event_data_eq!(expected2, output.remove(0));
360 }
361
362 #[cfg(feature = "sources-statsd")]
363 #[test]
364 fn test_encode_set() {
365 let input = Metric::new(
366 "set",
367 MetricKind::Incremental,
368 MetricValue::Set {
369 values: vec!["abc".to_owned()].into_iter().collect(),
370 },
371 )
372 .with_tags(Some(tags()));
373
374 let frame = encode_metric(&input);
375 let mut output = parse_encoded_metrics(&frame);
376 vector_lib::assert_event_data_eq!(input, output.remove(0));
377 }
378}