1use std::{
2 fmt::Display,
3 io::{self, Write},
4};
5
6use bytes::{BufMut, BytesMut};
7use tokio_util::codec::Encoder;
8use vector_lib::event::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
9
10use crate::{
11 internal_events::StatsdInvalidMetricError,
12 sinks::util::{buffer::metrics::compress_distribution, encode_namespace},
13};
14
/// Marker error type used as the `Encoder::Error` of the StatsD encoder in this file.
///
/// It carries no data. The only way this file constructs it is through the
/// `From<io::Error>` conversion below, which `tokio_util`'s `Encoder` trait requires of its
/// associated error type.
#[derive(Debug)]
pub struct InfallibleIo;

impl From<io::Error> for InfallibleIo {
    /// Converts any I/O error into the marker value, discarding the original error.
    fn from(_error: io::Error) -> Self {
        InfallibleIo
    }
}
29
30#[derive(Debug, Clone)]
31pub(super) struct StatsdEncoder {
32 default_namespace: Option<String>,
33}
34
35impl StatsdEncoder {
36 pub const fn new(default_namespace: Option<String>) -> Self {
38 Self { default_namespace }
39 }
40}
41
42impl<'a> Encoder<&'a Metric> for StatsdEncoder {
43 type Error = InfallibleIo;
44
45 fn encode(&mut self, metric: &'a Metric, buf: &mut BytesMut) -> Result<(), Self::Error> {
46 let namespace = metric.namespace().or(self.default_namespace.as_deref());
47 let name = encode_namespace(namespace, '.', metric.name());
48 let tags = metric.tags().map(encode_tags);
49
50 match metric.value() {
51 MetricValue::Counter { value } => {
52 encode_and_write_single_event(buf, &name, tags.as_deref(), value, "c", None);
53 }
54 MetricValue::Gauge { value } => {
55 match metric.kind() {
56 MetricKind::Incremental => encode_and_write_single_event(
57 buf,
58 &name,
59 tags.as_deref(),
60 format!("{value:+}"),
61 "g",
62 None,
63 ),
64 MetricKind::Absolute => {
65 encode_and_write_single_event(buf, &name, tags.as_deref(), value, "g", None)
66 }
67 };
68 }
69 MetricValue::Distribution { samples, statistic } => {
70 let metric_type = match statistic {
71 StatisticKind::Histogram => "h",
72 StatisticKind::Summary => "d",
73 };
74
75 let mut samples = samples.clone();
82 let compressed_samples = compress_distribution(&mut samples);
83 for sample in compressed_samples {
84 encode_and_write_single_event(
85 buf,
86 &name,
87 tags.as_deref(),
88 sample.value,
89 metric_type,
90 Some(sample.rate),
91 );
92 }
93 }
94 MetricValue::Set { values } => {
95 for val in values {
96 encode_and_write_single_event(buf, &name, tags.as_deref(), val, "s", None);
97 }
98 }
99 _ => {
100 emit!(StatsdInvalidMetricError {
101 value: metric.value(),
102 kind: metric.kind(),
103 });
104
105 return Ok(());
106 }
107 };
108
109 Ok(())
110 }
111}
112
113fn encode_tags(tags: &MetricTags) -> String {
117 let parts: Vec<_> = tags
118 .iter_all()
119 .map(|(name, tag_value)| match tag_value {
120 Some(value) => format!("{name}:{value}"),
121 None => name.to_owned(),
122 })
123 .collect();
124
125 parts.join(",")
127}
128
129fn encode_and_write_single_event<V: Display>(
130 buf: &mut BytesMut,
131 metric_name: &str,
132 metric_tags: Option<&str>,
133 val: V,
134 metric_type: &str,
135 sample_rate: Option<u32>,
136) {
137 let mut writer = buf.writer();
138
139 write!(&mut writer, "{metric_name}:{val}|{metric_type}").unwrap();
140
141 if let Some(sample_rate) = sample_rate {
142 if sample_rate != 1 {
143 write!(&mut writer, "|@{}", 1.0 / f64::from(sample_rate)).unwrap();
144 }
145 };
146
147 if let Some(t) = metric_tags {
148 write!(&mut writer, "|#{t}").unwrap();
149 };
150
151 writeln!(&mut writer).unwrap();
152}
153
#[cfg(test)]
mod tests {
    use vector_lib::{
        event::{metric::TagValue, MetricTags},
        metric_tags,
    };

    use super::encode_tags;

    #[cfg(feature = "sources-statsd")]
    use vector_lib::event::{Metric, MetricKind, MetricValue, StatisticKind};

    /// Encodes `metric` with a `StatsdEncoder` that has no default namespace and returns
    /// the bytes it produced.
    #[cfg(feature = "sources-statsd")]
    fn encode_metric(metric: &Metric) -> bytes::BytesMut {
        use tokio_util::codec::Encoder;

        let mut output = bytes::BytesMut::new();
        super::StatsdEncoder::new(None)
            .encode(metric, &mut output)
            .unwrap();
        output
    }

    /// Parses newline-separated StatsD packets back into metrics using the statsd source
    /// parser, panicking on any packet that fails to parse.
    #[cfg(feature = "sources-statsd")]
    fn parse_encoded_metrics(metric: &[u8]) -> Vec<Metric> {
        use crate::sources::statsd::{parser::Parser, ConversionUnit};

        let parser = Parser::new(true, ConversionUnit::Seconds);
        let text = std::str::from_utf8(metric).unwrap().trim();

        let mut parsed = Vec::new();
        for packet in text.split('\n') {
            parsed.push(
                parser
                    .parse(packet)
                    .expect("should not fail to parse statsd packet"),
            );
        }
        parsed
    }

    /// Encodes `metric` and immediately parses the result back.
    #[cfg(feature = "sources-statsd")]
    fn round_trip(metric: &Metric) -> Vec<Metric> {
        parse_encoded_metrics(&encode_metric(metric))
    }

    /// Tag fixture covering a plain tag, a multi-valued tag (including a bare value) and a
    /// bare tag.
    fn tags() -> MetricTags {
        metric_tags!(
            "normal_tag" => "value",
            "multi_value" => "true",
            "multi_value" => "false",
            "multi_value" => TagValue::Bare,
            "bare_tag" => TagValue::Bare,
        )
    }

    #[test]
    fn test_encode_tags() {
        // Compare as sorted lists so the check does not depend on emission order.
        let encoded = encode_tags(&tags());
        let mut actual: Vec<_> = encoded.split(',').collect();
        actual.sort();

        let mut expected: Vec<_> =
            "bare_tag,normal_tag:value,multi_value:true,multi_value:false,multi_value"
                .split(',')
                .collect();
        expected.sort();

        assert_eq!(actual, expected);
    }

    #[test]
    fn tags_order() {
        let ordered: MetricTags = vec![
            ("a", "value"),
            ("b", "value"),
            ("c", "value"),
            ("d", "value"),
            ("e", "value"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        assert_eq!(
            encode_tags(&ordered),
            "a:value,b:value,c:value,d:value,e:value"
        );
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_counter() {
        let input = Metric::new(
            "counter",
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.5 },
        )
        .with_tags(Some(tags()));

        let mut output = round_trip(&input);
        vector_lib::assert_event_data_eq!(input, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_absolute_counter() {
        let input = Metric::new(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.5 },
        );

        // Checked against the raw text instead of being round-tripped.
        // NOTE(review): presumably the parser would not hand back an `Absolute` counter,
        // so a parsed comparison could not succeed; confirm against the parser.
        let encoded = encode_metric(&input);
        assert_eq!("counter:1.5|c\n", std::str::from_utf8(&encoded).unwrap());
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_gauge() {
        let input = Metric::new(
            "gauge",
            MetricKind::Incremental,
            MetricValue::Gauge { value: -1.5 },
        )
        .with_tags(Some(tags()));

        let mut output = round_trip(&input);
        vector_lib::assert_event_data_eq!(input, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_absolute_gauge() {
        let input = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 1.5 },
        )
        .with_tags(Some(tags()));

        let mut output = round_trip(&input);
        vector_lib::assert_event_data_eq!(input, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_distribution() {
        // Builds a tagged incremental histogram named "distribution" from the given samples.
        let histogram = |samples| {
            Metric::new(
                "distribution",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples,
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_tags(Some(tags()))
        };

        let input = histogram(vector_lib::samples![1.5 => 1, 1.5 => 1]);
        // The two identical samples are expected to come back merged into one with rate 2.
        let expected = histogram(vector_lib::samples![1.5 => 2]);

        let mut output = round_trip(&input);
        vector_lib::assert_event_data_eq!(expected, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_distribution_aggregated() {
        // Builds a tagged incremental histogram named "distribution" from the given samples.
        let histogram = |samples| {
            Metric::new(
                "distribution",
                MetricKind::Incremental,
                MetricValue::Distribution {
                    samples,
                    statistic: StatisticKind::Histogram,
                },
            )
            .with_tags(Some(tags()))
        };

        let input = histogram(vector_lib::samples![2.5 => 1, 1.5 => 1, 1.5 => 1]);
        // Expected output order: the merged 1.5 samples first, then the single 2.5 sample.
        let expected1 = histogram(vector_lib::samples![1.5 => 2]);
        let expected2 = histogram(vector_lib::samples![2.5 => 1]);

        let mut output = round_trip(&input);
        vector_lib::assert_event_data_eq!(expected1, output.remove(0));
        vector_lib::assert_event_data_eq!(expected2, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_set() {
        let input = Metric::new(
            "set",
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["abc".to_owned()].into_iter().collect(),
            },
        )
        .with_tags(Some(tags()));

        let mut output = round_trip(&input);
        vector_lib::assert_event_data_eq!(input, output.remove(0));
    }
}