codecs/encoding/format/
text.rs

1use crate::encoding::format::common::get_serializer_schema_requirement;
2use bytes::{BufMut, BytesMut};
3use tokio_util::codec::Encoder;
4use vector_config_macros::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7use crate::MetricTagValues;
8
9/// Config used to build a `TextSerializer`.
10#[configurable_component]
11#[derive(Debug, Clone, Default)]
12pub struct TextSerializerConfig {
13    /// Controls how metric tag values are encoded.
14    ///
15    /// When set to `single`, only the last non-bare value of tags are displayed with the
16    /// metric.  When set to `full`, all metric tags are exposed as separate assignments.
17    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
18    pub metric_tag_values: MetricTagValues,
19}
20
21impl TextSerializerConfig {
22    /// Creates a new `TextSerializerConfig`.
23    pub const fn new(metric_tag_values: MetricTagValues) -> Self {
24        Self { metric_tag_values }
25    }
26
27    /// Build the `TextSerializer` from this configuration.
28    pub const fn build(&self) -> TextSerializer {
29        TextSerializer::new(self.metric_tag_values)
30    }
31
32    /// The data type of events that are accepted by `TextSerializer`.
33    pub fn input_type(&self) -> DataType {
34        DataType::Log | DataType::Metric
35    }
36
37    /// The schema required by the serializer.
38    pub fn schema_requirement(&self) -> schema::Requirement {
39        get_serializer_schema_requirement()
40    }
41}
42
43/// Serializer that converts a log to bytes by extracting the message key, or converts a metric
44/// to bytes by calling its `Display` implementation.
45///
46/// This serializer exists to emulate the behavior of the `StandardEncoding::Text` for backwards
47/// compatibility, until it is phased out completely.
48#[derive(Debug, Clone)]
49pub struct TextSerializer {
50    metric_tag_values: MetricTagValues,
51}
52
53impl TextSerializer {
54    /// Creates a new `TextSerializer`.
55    pub const fn new(metric_tag_values: MetricTagValues) -> Self {
56        Self { metric_tag_values }
57    }
58}
59
60impl Encoder<Event> for TextSerializer {
61    type Error = vector_common::Error;
62
63    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
64        match event {
65            Event::Log(log) => {
66                if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
67                    buffer.put(bytes);
68                }
69            }
70            Event::Metric(mut metric) => {
71                if self.metric_tag_values == MetricTagValues::Single {
72                    metric.reduce_tags_to_single();
73                }
74                let bytes = metric.to_string();
75                buffer.put(bytes.as_ref());
76            }
77            Event::Trace(_) => {}
78        };
79
80        Ok(())
81    }
82}
83
84#[cfg(test)]
85mod tests {
86    use bytes::{Bytes, BytesMut};
87    use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue};
88    use vector_core::metric_tags;
89
90    use super::*;
91
92    #[test]
93    fn serialize_log() {
94        let buffer = serialize(
95            TextSerializerConfig::default(),
96            Event::from(LogEvent::from_str_legacy("foo")),
97        );
98        assert_eq!(buffer, Bytes::from("foo"));
99    }
100
101    #[test]
102    fn serialize_metric() {
103        let buffer = serialize(
104            TextSerializerConfig::default(),
105            Event::Metric(Metric::new(
106                "users",
107                MetricKind::Incremental,
108                MetricValue::Set {
109                    values: vec!["bob".into()].into_iter().collect(),
110                },
111            )),
112        );
113        assert_eq!(buffer, Bytes::from("users{} + bob"));
114    }
115
116    #[test]
117    fn serialize_metric_tags_full() {
118        let buffer = serialize(
119            TextSerializerConfig {
120                metric_tag_values: MetricTagValues::Full,
121            },
122            metric2(),
123        );
124        assert_eq!(
125            buffer,
126            Bytes::from(r#"counter{a="first",a,a="second"} + 1"#)
127        );
128    }
129
130    #[test]
131    fn serialize_metric_tags_single() {
132        let buffer = serialize(
133            TextSerializerConfig {
134                metric_tag_values: MetricTagValues::Single,
135            },
136            metric2(),
137        );
138        assert_eq!(buffer, Bytes::from(r#"counter{a="second"} + 1"#));
139    }
140
141    fn metric2() -> Event {
142        Event::Metric(
143            Metric::new(
144                "counter",
145                MetricKind::Incremental,
146                MetricValue::Counter { value: 1.0 },
147            )
148            .with_tags(Some(metric_tags! (
149                "a" => "first",
150                "a" => None,
151                "a" => "second",
152            ))),
153        )
154    }
155
156    fn serialize(config: TextSerializerConfig, input: Event) -> Bytes {
157        let mut buffer = BytesMut::new();
158        config.build().encode(input, &mut buffer).unwrap();
159        buffer.freeze()
160    }
161}