codecs/encoding/format/
text.rs

1use bytes::{BufMut, BytesMut};
2use tokio_util::codec::Encoder;
3use vector_config_macros::configurable_component;
4use vector_core::{config::DataType, event::Event, schema};
5
6use crate::{MetricTagValues, encoding::format::common::get_serializer_schema_requirement};
7
8/// Config used to build a `TextSerializer`.
9#[configurable_component]
10#[derive(Debug, Clone, Default)]
11pub struct TextSerializerConfig {
12    /// Controls how metric tag values are encoded.
13    ///
14    /// When set to `single`, only the last non-bare value of tags are displayed with the
15    /// metric.  When set to `full`, all metric tags are exposed as separate assignments.
16    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
17    pub metric_tag_values: MetricTagValues,
18}
19
20impl TextSerializerConfig {
21    /// Creates a new `TextSerializerConfig`.
22    pub const fn new(metric_tag_values: MetricTagValues) -> Self {
23        Self { metric_tag_values }
24    }
25
26    /// Build the `TextSerializer` from this configuration.
27    pub const fn build(&self) -> TextSerializer {
28        TextSerializer::new(self.metric_tag_values)
29    }
30
31    /// The data type of events that are accepted by `TextSerializer`.
32    pub fn input_type(&self) -> DataType {
33        DataType::Log | DataType::Metric
34    }
35
36    /// The schema required by the serializer.
37    pub fn schema_requirement(&self) -> schema::Requirement {
38        get_serializer_schema_requirement()
39    }
40}
41
42/// Serializer that converts a log to bytes by extracting the message key, or converts a metric
43/// to bytes by calling its `Display` implementation.
44///
45/// This serializer exists to emulate the behavior of the `StandardEncoding::Text` for backwards
46/// compatibility, until it is phased out completely.
47#[derive(Debug, Clone)]
48pub struct TextSerializer {
49    metric_tag_values: MetricTagValues,
50}
51
52impl TextSerializer {
53    /// Creates a new `TextSerializer`.
54    pub const fn new(metric_tag_values: MetricTagValues) -> Self {
55        Self { metric_tag_values }
56    }
57}
58
59impl Encoder<Event> for TextSerializer {
60    type Error = vector_common::Error;
61
62    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
63        match event {
64            Event::Log(log) => {
65                if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
66                    buffer.put(bytes);
67                }
68            }
69            Event::Metric(mut metric) => {
70                if self.metric_tag_values == MetricTagValues::Single {
71                    metric.reduce_tags_to_single();
72                }
73                let bytes = metric.to_string();
74                buffer.put(bytes.as_ref());
75            }
76            Event::Trace(_) => {}
77        };
78
79        Ok(())
80    }
81}
82
83#[cfg(test)]
84mod tests {
85    use bytes::{Bytes, BytesMut};
86    use vector_core::{
87        event::{LogEvent, Metric, MetricKind, MetricValue},
88        metric_tags,
89    };
90
91    use super::*;
92
93    #[test]
94    fn serialize_log() {
95        let buffer = serialize(
96            TextSerializerConfig::default(),
97            Event::from(LogEvent::from_str_legacy("foo")),
98        );
99        assert_eq!(buffer, Bytes::from("foo"));
100    }
101
102    #[test]
103    fn serialize_metric() {
104        let buffer = serialize(
105            TextSerializerConfig::default(),
106            Event::Metric(Metric::new(
107                "users",
108                MetricKind::Incremental,
109                MetricValue::Set {
110                    values: vec!["bob".into()].into_iter().collect(),
111                },
112            )),
113        );
114        assert_eq!(buffer, Bytes::from("users{} + bob"));
115    }
116
117    #[test]
118    fn serialize_metric_tags_full() {
119        let buffer = serialize(
120            TextSerializerConfig {
121                metric_tag_values: MetricTagValues::Full,
122            },
123            metric2(),
124        );
125        assert_eq!(
126            buffer,
127            Bytes::from(r#"counter{a="first",a,a="second"} + 1"#)
128        );
129    }
130
131    #[test]
132    fn serialize_metric_tags_single() {
133        let buffer = serialize(
134            TextSerializerConfig {
135                metric_tag_values: MetricTagValues::Single,
136            },
137            metric2(),
138        );
139        assert_eq!(buffer, Bytes::from(r#"counter{a="second"} + 1"#));
140    }
141
142    fn metric2() -> Event {
143        Event::Metric(
144            Metric::new(
145                "counter",
146                MetricKind::Incremental,
147                MetricValue::Counter { value: 1.0 },
148            )
149            .with_tags(Some(metric_tags! (
150                "a" => "first",
151                "a" => None,
152                "a" => "second",
153            ))),
154        )
155    }
156
157    fn serialize(config: TextSerializerConfig, input: Event) -> Bytes {
158        let mut buffer = BytesMut::new();
159        config.build().encode(input, &mut buffer).unwrap();
160        buffer.freeze()
161    }
162}