vector/sinks/statsd/encoder.rs

1use std::{
2    fmt::Display,
3    io::{self, Write},
4};
5
6use bytes::{BufMut, BytesMut};
7use tokio_util::codec::Encoder;
8use vector_lib::event::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
9
10use crate::{
11    internal_events::StatsdInvalidMetricError,
12    sinks::util::{buffer::metrics::compress_distribution, encode_namespace},
13};
14
/// Marker error type for the StatsD encoder, whose encoding step never fails.
///
/// `Encoder<T>` requires its associated error type to be constructible via `From<io::Error>`,
/// which rules out `std::convert::Infallible`. `InfallibleIo` satisfies that bound while still
/// signalling "this encoder emits no errors": any `io::Error` converted into it is discarded.
#[derive(Debug)]
pub struct InfallibleIo;

impl From<io::Error> for InfallibleIo {
    /// Drops the incoming I/O error; the marker carries no payload.
    fn from(_discarded: io::Error) -> Self {
        InfallibleIo
    }
}
29
30#[derive(Debug, Clone)]
31pub(super) struct StatsdEncoder {
32    default_namespace: Option<String>,
33}
34
35impl StatsdEncoder {
36    /// Creates a new `StatsdEncoder` with the given default namespace, if any.
37    pub const fn new(default_namespace: Option<String>) -> Self {
38        Self { default_namespace }
39    }
40}
41
42impl<'a> Encoder<&'a Metric> for StatsdEncoder {
43    type Error = InfallibleIo;
44
45    fn encode(&mut self, metric: &'a Metric, buf: &mut BytesMut) -> Result<(), Self::Error> {
46        let namespace = metric.namespace().or(self.default_namespace.as_deref());
47        let name = encode_namespace(namespace, '.', metric.name());
48        let tags = metric.tags().map(encode_tags);
49
50        match metric.value() {
51            MetricValue::Counter { value } => {
52                encode_and_write_single_event(buf, &name, tags.as_deref(), value, "c", None);
53            }
54            MetricValue::Gauge { value } => {
55                match metric.kind() {
56                    MetricKind::Incremental => encode_and_write_single_event(
57                        buf,
58                        &name,
59                        tags.as_deref(),
60                        format!("{value:+}"),
61                        "g",
62                        None,
63                    ),
64                    MetricKind::Absolute => {
65                        encode_and_write_single_event(buf, &name, tags.as_deref(), value, "g", None)
66                    }
67                };
68            }
69            MetricValue::Distribution { samples, statistic } => {
70                let metric_type = match statistic {
71                    StatisticKind::Histogram => "h",
72                    StatisticKind::Summary => "d",
73                };
74
75                // TODO: This would actually be good to potentially add a helper combinator for, in the same vein as
76                // `SinkBuilderExt::normalized`, that provides a metric "optimizer" for doing these sorts of things. We
77                // don't actually compress distributions as-is in other metrics sinks unless they use the old-style
78                // approach coupled with `MetricBuffer`. While not every sink would benefit from this -- the
79                // `datadog_metrics` sink always converts distributions to sketches anyways, for example -- a lot of
80                // them could.
81                let mut samples = samples.clone();
82                let compressed_samples = compress_distribution(&mut samples);
83                for sample in compressed_samples {
84                    encode_and_write_single_event(
85                        buf,
86                        &name,
87                        tags.as_deref(),
88                        sample.value,
89                        metric_type,
90                        Some(sample.rate),
91                    );
92                }
93            }
94            MetricValue::Set { values } => {
95                for val in values {
96                    encode_and_write_single_event(buf, &name, tags.as_deref(), val, "s", None);
97                }
98            }
99            _ => {
100                emit!(StatsdInvalidMetricError {
101                    value: metric.value(),
102                    kind: metric.kind(),
103                });
104
105                return Ok(());
106            }
107        };
108
109        Ok(())
110    }
111}
112
113// Note that if multi-valued tags are present, this encoding may change the order from the input
114// event, since the tags with multiple values may not have been grouped together.
115// This is not an issue, but noting as it may be an observed behavior.
116fn encode_tags(tags: &MetricTags) -> String {
117    let parts: Vec<_> = tags
118        .iter_all()
119        .map(|(name, tag_value)| match tag_value {
120            Some(value) => format!("{name}:{value}"),
121            None => name.to_owned(),
122        })
123        .collect();
124
125    // `parts` is already sorted by key because of BTreeMap
126    parts.join(",")
127}
128
129fn encode_and_write_single_event<V: Display>(
130    buf: &mut BytesMut,
131    metric_name: &str,
132    metric_tags: Option<&str>,
133    val: V,
134    metric_type: &str,
135    sample_rate: Option<u32>,
136) {
137    let mut writer = buf.writer();
138
139    write!(&mut writer, "{metric_name}:{val}|{metric_type}").unwrap();
140
141    if let Some(sample_rate) = sample_rate
142        && sample_rate != 1
143    {
144        write!(&mut writer, "|@{}", 1.0 / f64::from(sample_rate)).unwrap();
145    };
146
147    if let Some(t) = metric_tags {
148        write!(&mut writer, "|#{t}").unwrap();
149    };
150
151    writeln!(&mut writer).unwrap();
152}
153
#[cfg(test)]
mod tests {
    #[cfg(feature = "sources-statsd")]
    use vector_lib::event::{Metric, MetricKind, MetricValue, StatisticKind};
    use vector_lib::{
        event::{MetricTags, metric::TagValue},
        metric_tags,
    };

    use super::encode_tags;

    /// Encodes `metric` with a `StatsdEncoder` that has no default namespace.
    #[cfg(feature = "sources-statsd")]
    fn encode_metric(metric: &Metric) -> bytes::BytesMut {
        use tokio_util::codec::Encoder;

        let mut frame = bytes::BytesMut::new();
        super::StatsdEncoder::new(None)
            .encode(metric, &mut frame)
            .unwrap();
        frame
    }

    /// Parses each newline-separated packet of `frame` with the `statsd` source's parser.
    #[cfg(feature = "sources-statsd")]
    fn parse_encoded_metrics(frame: &[u8]) -> Vec<Metric> {
        use crate::sources::statsd::{ConversionUnit, parser::Parser};

        let parser = Parser::new(true, ConversionUnit::Seconds);
        let text = std::str::from_utf8(frame).unwrap();

        let mut metrics = Vec::new();
        for packet in text.trim().split('\n') {
            metrics.push(
                parser
                    .parse(packet)
                    .expect("should not fail to parse statsd packet"),
            );
        }
        metrics
    }

    /// Encodes `input`, re-parses the frame, and compares the leading decoded metrics with
    /// `expected`, in order.
    #[cfg(feature = "sources-statsd")]
    fn assert_encodes_to(input: &Metric, expected: Vec<Metric>) {
        let frame = encode_metric(input);
        let mut decoded = parse_encoded_metrics(&frame);
        for want in expected {
            vector_lib::assert_event_data_eq!(want, decoded.remove(0));
        }
    }

    /// Builds a metric carrying the shared test tags.
    #[cfg(feature = "sources-statsd")]
    fn tagged(name: &str, kind: MetricKind, value: MetricValue) -> Metric {
        Metric::new(name, kind, value).with_tags(Some(tags()))
    }

    fn tags() -> MetricTags {
        metric_tags!(
            "normal_tag" => "value",
            "multi_value" => "true",
            "multi_value" => "false",
            "multi_value" => TagValue::Bare,
            "bare_tag" => TagValue::Bare,
        )
    }

    #[test]
    fn test_encode_tags() {
        let encoded = encode_tags(&tags());
        let mut actual: Vec<&str> = encoded.split(',').collect();
        actual.sort_unstable();

        let mut expected = vec![
            "bare_tag",
            "normal_tag:value",
            "multi_value:true",
            "multi_value:false",
            "multi_value",
        ];
        expected.sort_unstable();

        assert_eq!(actual, expected);
    }

    #[test]
    fn tags_order() {
        let tags: MetricTags = ["a", "b", "c", "d", "e"]
            .into_iter()
            .map(|key| (key.to_owned(), "value".to_owned()))
            .collect();

        assert_eq!(
            encode_tags(&tags),
            "a:value,b:value,c:value,d:value,e:value"
        );
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_counter() {
        let input = tagged(
            "counter",
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.5 },
        );
        assert_encodes_to(&input, vec![input.clone()]);
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_absolute_counter() {
        let input = Metric::new(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.5 },
        );

        // The statsd parser always yields Incremental counters, so compare the raw text
        // instead of round-tripping.
        let frame = encode_metric(&input);
        assert_eq!("counter:1.5|c\n", std::str::from_utf8(&frame).unwrap());
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_gauge() {
        let input = tagged(
            "gauge",
            MetricKind::Incremental,
            MetricValue::Gauge { value: -1.5 },
        );
        assert_encodes_to(&input, vec![input.clone()]);
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_absolute_gauge() {
        let input = tagged(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 1.5 },
        );
        assert_encodes_to(&input, vec![input.clone()]);
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_distribution() {
        let input = tagged(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.5 => 1, 1.5 => 1],
                statistic: StatisticKind::Histogram,
            },
        );
        // Duplicate samples are compressed into one sample with a summed rate.
        let expected = tagged(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.5 => 2],
                statistic: StatisticKind::Histogram,
            },
        );

        assert_encodes_to(&input, vec![expected]);
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_distribution_aggregated() {
        let input = tagged(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![2.5 => 1, 1.5 => 1, 1.5 => 1],
                statistic: StatisticKind::Histogram,
            },
        );
        let first = tagged(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.5 => 2],
                statistic: StatisticKind::Histogram,
            },
        );
        let second = tagged(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![2.5 => 1],
                statistic: StatisticKind::Histogram,
            },
        );

        assert_encodes_to(&input, vec![first, second]);
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_set() {
        let input = tagged(
            "set",
            MetricKind::Incremental,
            MetricValue::Set {
                values: std::iter::once("abc".to_owned()).collect(),
            },
        );
        assert_encodes_to(&input, vec![input.clone()]);
    }
}