vector/sinks/statsd/encoder.rs

1use std::{
2    fmt::Display,
3    io::{self, Write},
4};
5
6use bytes::{BufMut, BytesMut};
7use tokio_util::codec::Encoder;
8use vector_lib::event::{Metric, MetricKind, MetricTags, MetricValue, StatisticKind};
9
10use crate::{
11    internal_events::StatsdInvalidMetricError,
12    sinks::util::{buffer::metrics::compress_distribution, encode_namespace},
13};
14
/// Marker error for an encoder that cannot actually fail.
///
/// Encoding a metric into StatsD text never produces an error. However, `Encoder<T>::Error`
/// must be constructible via `From<io::Error>`, which rules out `Infallible`. This zero-sized
/// type satisfies that bound while signalling that no error is ever emitted.
#[derive(Debug)]
pub struct InfallibleIo;

impl From<io::Error> for InfallibleIo {
    /// Discards the I/O error; its contents carry no information for this encoder.
    fn from(_err: io::Error) -> Self {
        InfallibleIo
    }
}
29
30#[derive(Debug, Clone)]
31pub(super) struct StatsdEncoder {
32    default_namespace: Option<String>,
33}
34
35impl StatsdEncoder {
36    /// Creates a new `StatsdEncoder` with the given default namespace, if any.
37    pub const fn new(default_namespace: Option<String>) -> Self {
38        Self { default_namespace }
39    }
40}
41
42impl<'a> Encoder<&'a Metric> for StatsdEncoder {
43    type Error = InfallibleIo;
44
45    fn encode(&mut self, metric: &'a Metric, buf: &mut BytesMut) -> Result<(), Self::Error> {
46        let namespace = metric.namespace().or(self.default_namespace.as_deref());
47        let name = encode_namespace(namespace, '.', metric.name());
48        let tags = metric.tags().map(encode_tags);
49
50        match metric.value() {
51            MetricValue::Counter { value } => {
52                encode_and_write_single_event(buf, &name, tags.as_deref(), value, "c", None);
53            }
54            MetricValue::Gauge { value } => {
55                match metric.kind() {
56                    MetricKind::Incremental => encode_and_write_single_event(
57                        buf,
58                        &name,
59                        tags.as_deref(),
60                        format!("{value:+}"),
61                        "g",
62                        None,
63                    ),
64                    MetricKind::Absolute => {
65                        encode_and_write_single_event(buf, &name, tags.as_deref(), value, "g", None)
66                    }
67                };
68            }
69            MetricValue::Distribution { samples, statistic } => {
70                let metric_type = match statistic {
71                    StatisticKind::Histogram => "h",
72                    StatisticKind::Summary => "d",
73                };
74
75                // TODO: This would actually be good to potentially add a helper combinator for, in the same vein as
76                // `SinkBuilderExt::normalized`, that provides a metric "optimizer" for doing these sorts of things. We
77                // don't actually compress distributions as-is in other metrics sinks unless they use the old-style
78                // approach coupled with `MetricBuffer`. While not every sink would benefit from this -- the
79                // `datadog_metrics` sink always converts distributions to sketches anyways, for example -- a lot of
80                // them could.
81                let mut samples = samples.clone();
82                let compressed_samples = compress_distribution(&mut samples);
83                for sample in compressed_samples {
84                    encode_and_write_single_event(
85                        buf,
86                        &name,
87                        tags.as_deref(),
88                        sample.value,
89                        metric_type,
90                        Some(sample.rate),
91                    );
92                }
93            }
94            MetricValue::Set { values } => {
95                for val in values {
96                    encode_and_write_single_event(buf, &name, tags.as_deref(), val, "s", None);
97                }
98            }
99            _ => {
100                emit!(StatsdInvalidMetricError {
101                    value: metric.value(),
102                    kind: metric.kind(),
103                });
104
105                return Ok(());
106            }
107        };
108
109        Ok(())
110    }
111}
112
113// Note that if multi-valued tags are present, this encoding may change the order from the input
114// event, since the tags with multiple values may not have been grouped together.
115// This is not an issue, but noting as it may be an observed behavior.
116fn encode_tags(tags: &MetricTags) -> String {
117    let parts: Vec<_> = tags
118        .iter_all()
119        .map(|(name, tag_value)| match tag_value {
120            Some(value) => format!("{name}:{value}"),
121            None => name.to_owned(),
122        })
123        .collect();
124
125    // `parts` is already sorted by key because of BTreeMap
126    parts.join(",")
127}
128
129fn encode_and_write_single_event<V: Display>(
130    buf: &mut BytesMut,
131    metric_name: &str,
132    metric_tags: Option<&str>,
133    val: V,
134    metric_type: &str,
135    sample_rate: Option<u32>,
136) {
137    let mut writer = buf.writer();
138
139    write!(&mut writer, "{metric_name}:{val}|{metric_type}").unwrap();
140
141    if let Some(sample_rate) = sample_rate {
142        if sample_rate != 1 {
143            write!(&mut writer, "|@{}", 1.0 / f64::from(sample_rate)).unwrap();
144        }
145    };
146
147    if let Some(t) = metric_tags {
148        write!(&mut writer, "|#{t}").unwrap();
149    };
150
151    writeln!(&mut writer).unwrap();
152}
153
#[cfg(test)]
mod tests {
    use vector_lib::{
        event::{metric::TagValue, Metric, MetricKind, MetricTags, MetricValue},
        metric_tags,
    };

    // Only the round-trip distribution tests need this, and they are feature-gated.
    #[cfg(feature = "sources-statsd")]
    use vector_lib::event::StatisticKind;

    use super::encode_tags;

    /// Encodes `metric` with an encoder that has no default namespace and returns the raw
    /// bytes produced. This does not depend on the statsd source, so it is not feature-gated.
    fn encode_metric(metric: &Metric) -> bytes::BytesMut {
        use tokio_util::codec::Encoder;

        let mut encoder = super::StatsdEncoder {
            default_namespace: None,
        };
        let mut frame = bytes::BytesMut::new();
        encoder.encode(metric, &mut frame).unwrap();
        frame
    }

    /// Parses encoded StatsD lines back into metrics using the statsd source's parser, so
    /// tests can check that encoding round-trips.
    #[cfg(feature = "sources-statsd")]
    fn parse_encoded_metrics(metric: &[u8]) -> Vec<Metric> {
        use crate::sources::statsd::{parser::Parser, ConversionUnit};
        let statsd_parser = Parser::new(true, ConversionUnit::Seconds);

        let s = std::str::from_utf8(metric).unwrap().trim();
        s.split('\n')
            .map(|packet| {
                statsd_parser
                    .parse(packet)
                    .expect("should not fail to parse statsd packet")
            })
            .collect()
    }

    /// Tag set that covers plain, bare and multi-valued tags.
    fn tags() -> MetricTags {
        metric_tags!(
            "normal_tag" => "value",
            "multi_value" => "true",
            "multi_value" => "false",
            "multi_value" => TagValue::Bare,
            "bare_tag" => TagValue::Bare,
        )
    }

    #[test]
    fn test_encode_tags() {
        let actual = encode_tags(&tags());
        let mut actual = actual.split(',').collect::<Vec<_>>();
        actual.sort();

        let mut expected =
            "bare_tag,normal_tag:value,multi_value:true,multi_value:false,multi_value"
                .split(',')
                .collect::<Vec<_>>();
        expected.sort();

        assert_eq!(actual, expected);
    }

    #[test]
    fn tags_order() {
        let tags: MetricTags = [
            ("a", "value"),
            ("b", "value"),
            ("c", "value"),
            ("d", "value"),
            ("e", "value"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .collect();

        assert_eq!(
            encode_tags(&tags),
            "a:value,b:value,c:value,d:value,e:value"
        );
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_counter() {
        let input = Metric::new(
            "counter",
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.5 },
        )
        .with_tags(Some(tags()));

        let frame = encode_metric(&input);
        let mut output = parse_encoded_metrics(&frame);
        vector_lib::assert_event_data_eq!(input, output.remove(0));
    }

    // Compares raw output only, so it runs even without the statsd source feature.
    #[test]
    fn test_encode_absolute_counter() {
        let input = Metric::new(
            "counter",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.5 },
        );

        let frame = encode_metric(&input);
        // The statsd parser will parse the counter as Incremental,
        // so we can't compare it with the parsed value.
        assert_eq!("counter:1.5|c\n", std::str::from_utf8(&frame).unwrap());
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_gauge() {
        let input = Metric::new(
            "gauge",
            MetricKind::Incremental,
            MetricValue::Gauge { value: -1.5 },
        )
        .with_tags(Some(tags()));

        let frame = encode_metric(&input);
        let mut output = parse_encoded_metrics(&frame);
        vector_lib::assert_event_data_eq!(input, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_absolute_gauge() {
        let input = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 1.5 },
        )
        .with_tags(Some(tags()));

        let frame = encode_metric(&input);
        let mut output = parse_encoded_metrics(&frame);
        vector_lib::assert_event_data_eq!(input, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_distribution() {
        let input = Metric::new(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.5 => 1, 1.5 => 1],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_tags(Some(tags()));

        let expected = Metric::new(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.5 => 2],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_tags(Some(tags()));

        let frame = encode_metric(&input);
        let mut output = parse_encoded_metrics(&frame);
        vector_lib::assert_event_data_eq!(expected, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_distribution_aggregated() {
        let input = Metric::new(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![2.5 => 1, 1.5 => 1, 1.5 => 1],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_tags(Some(tags()));

        let expected1 = Metric::new(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![1.5 => 2],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_tags(Some(tags()));
        let expected2 = Metric::new(
            "distribution",
            MetricKind::Incremental,
            MetricValue::Distribution {
                samples: vector_lib::samples![2.5 => 1],
                statistic: StatisticKind::Histogram,
            },
        )
        .with_tags(Some(tags()));

        let frame = encode_metric(&input);
        let mut output = parse_encoded_metrics(&frame);
        vector_lib::assert_event_data_eq!(expected1, output.remove(0));
        vector_lib::assert_event_data_eq!(expected2, output.remove(0));
    }

    #[cfg(feature = "sources-statsd")]
    #[test]
    fn test_encode_set() {
        let input = Metric::new(
            "set",
            MetricKind::Incremental,
            MetricValue::Set {
                values: vec!["abc".to_owned()].into_iter().collect(),
            },
        )
        .with_tags(Some(tags()));

        let frame = encode_metric(&input);
        let mut output = parse_encoded_metrics(&frame);
        vector_lib::assert_event_data_eq!(input, output.remove(0));
    }
}