codecs/encoding/format/
text.rs1use crate::encoding::format::common::get_serializer_schema_requirement;
2use bytes::{BufMut, BytesMut};
3use tokio_util::codec::Encoder;
4use vector_config_macros::configurable_component;
5use vector_core::{config::DataType, event::Event, schema};
6
7use crate::MetricTagValues;
8
9#[configurable_component]
11#[derive(Debug, Clone, Default)]
12pub struct TextSerializerConfig {
13 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
18 pub metric_tag_values: MetricTagValues,
19}
20
21impl TextSerializerConfig {
22 pub const fn new(metric_tag_values: MetricTagValues) -> Self {
24 Self { metric_tag_values }
25 }
26
27 pub const fn build(&self) -> TextSerializer {
29 TextSerializer::new(self.metric_tag_values)
30 }
31
32 pub fn input_type(&self) -> DataType {
34 DataType::Log | DataType::Metric
35 }
36
37 pub fn schema_requirement(&self) -> schema::Requirement {
39 get_serializer_schema_requirement()
40 }
41}
42
43#[derive(Debug, Clone)]
49pub struct TextSerializer {
50 metric_tag_values: MetricTagValues,
51}
52
53impl TextSerializer {
54 pub const fn new(metric_tag_values: MetricTagValues) -> Self {
56 Self { metric_tag_values }
57 }
58}
59
60impl Encoder<Event> for TextSerializer {
61 type Error = vector_common::Error;
62
63 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
64 match event {
65 Event::Log(log) => {
66 if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
67 buffer.put(bytes);
68 }
69 }
70 Event::Metric(mut metric) => {
71 if self.metric_tag_values == MetricTagValues::Single {
72 metric.reduce_tags_to_single();
73 }
74 let bytes = metric.to_string();
75 buffer.put(bytes.as_ref());
76 }
77 Event::Trace(_) => {}
78 };
79
80 Ok(())
81 }
82}
83
84#[cfg(test)]
85mod tests {
86 use bytes::{Bytes, BytesMut};
87 use vector_core::event::{LogEvent, Metric, MetricKind, MetricValue};
88 use vector_core::metric_tags;
89
90 use super::*;
91
92 #[test]
93 fn serialize_log() {
94 let buffer = serialize(
95 TextSerializerConfig::default(),
96 Event::from(LogEvent::from_str_legacy("foo")),
97 );
98 assert_eq!(buffer, Bytes::from("foo"));
99 }
100
101 #[test]
102 fn serialize_metric() {
103 let buffer = serialize(
104 TextSerializerConfig::default(),
105 Event::Metric(Metric::new(
106 "users",
107 MetricKind::Incremental,
108 MetricValue::Set {
109 values: vec!["bob".into()].into_iter().collect(),
110 },
111 )),
112 );
113 assert_eq!(buffer, Bytes::from("users{} + bob"));
114 }
115
116 #[test]
117 fn serialize_metric_tags_full() {
118 let buffer = serialize(
119 TextSerializerConfig {
120 metric_tag_values: MetricTagValues::Full,
121 },
122 metric2(),
123 );
124 assert_eq!(
125 buffer,
126 Bytes::from(r#"counter{a="first",a,a="second"} + 1"#)
127 );
128 }
129
130 #[test]
131 fn serialize_metric_tags_single() {
132 let buffer = serialize(
133 TextSerializerConfig {
134 metric_tag_values: MetricTagValues::Single,
135 },
136 metric2(),
137 );
138 assert_eq!(buffer, Bytes::from(r#"counter{a="second"} + 1"#));
139 }
140
141 fn metric2() -> Event {
142 Event::Metric(
143 Metric::new(
144 "counter",
145 MetricKind::Incremental,
146 MetricValue::Counter { value: 1.0 },
147 )
148 .with_tags(Some(metric_tags! (
149 "a" => "first",
150 "a" => None,
151 "a" => "second",
152 ))),
153 )
154 }
155
156 fn serialize(config: TextSerializerConfig, input: Event) -> Bytes {
157 let mut buffer = BytesMut::new();
158 config.build().encode(input, &mut buffer).unwrap();
159 buffer.freeze()
160 }
161}