codecs/encoding/format/
text.rs1use bytes::{BufMut, BytesMut};
2use tokio_util::codec::Encoder;
3use vector_config_macros::configurable_component;
4use vector_core::{config::DataType, event::Event, schema};
5
6use crate::{MetricTagValues, encoding::format::common::get_serializer_schema_requirement};
7
8#[configurable_component]
10#[derive(Debug, Clone, Default)]
11pub struct TextSerializerConfig {
12 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
17 pub metric_tag_values: MetricTagValues,
18}
19
20impl TextSerializerConfig {
21 pub const fn new(metric_tag_values: MetricTagValues) -> Self {
23 Self { metric_tag_values }
24 }
25
26 pub const fn build(&self) -> TextSerializer {
28 TextSerializer::new(self.metric_tag_values)
29 }
30
31 pub fn input_type(&self) -> DataType {
33 DataType::Log | DataType::Metric
34 }
35
36 pub fn schema_requirement(&self) -> schema::Requirement {
38 get_serializer_schema_requirement()
39 }
40}
41
42#[derive(Debug, Clone)]
48pub struct TextSerializer {
49 metric_tag_values: MetricTagValues,
50}
51
52impl TextSerializer {
53 pub const fn new(metric_tag_values: MetricTagValues) -> Self {
55 Self { metric_tag_values }
56 }
57}
58
59impl Encoder<Event> for TextSerializer {
60 type Error = vector_common::Error;
61
62 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
63 match event {
64 Event::Log(log) => {
65 if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
66 buffer.put(bytes);
67 }
68 }
69 Event::Metric(mut metric) => {
70 if self.metric_tag_values == MetricTagValues::Single {
71 metric.reduce_tags_to_single();
72 }
73 let bytes = metric.to_string();
74 buffer.put(bytes.as_ref());
75 }
76 Event::Trace(_) => {}
77 };
78
79 Ok(())
80 }
81}
82
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};
    use vector_core::{
        event::{LogEvent, Metric, MetricKind, MetricValue},
        metric_tags,
    };

    use super::*;

    /// Encodes `input` with a serializer built from `config` and returns the bytes written.
    fn serialize(config: TextSerializerConfig, input: Event) -> Bytes {
        let mut serializer = config.build();
        let mut output = BytesMut::new();
        serializer.encode(input, &mut output).unwrap();
        output.freeze()
    }

    /// An incremental counter whose tag `a` carries two values and one bare entry.
    fn metric2() -> Event {
        let tags = metric_tags!(
            "a" => "first",
            "a" => None,
            "a" => "second",
        );
        let counter = Metric::new(
            "counter",
            MetricKind::Incremental,
            MetricValue::Counter { value: 1.0 },
        )
        .with_tags(Some(tags));

        Event::Metric(counter)
    }

    // A log event is serialized as its message.
    #[test]
    fn serialize_log() {
        let event = Event::from(LogEvent::from_str_legacy("foo"));
        let encoded = serialize(TextSerializerConfig::default(), event);
        assert_eq!(encoded, Bytes::from("foo"));
    }

    // A metric event is serialized using its string rendering.
    #[test]
    fn serialize_metric() {
        let users = MetricValue::Set {
            values: vec!["bob".into()].into_iter().collect(),
        };
        let event = Event::Metric(Metric::new("users", MetricKind::Incremental, users));
        let encoded = serialize(TextSerializerConfig::default(), event);
        assert_eq!(encoded, Bytes::from("users{} + bob"));
    }

    // With `Full`, every value of the tag is rendered, including the bare one.
    #[test]
    fn serialize_metric_tags_full() {
        let config = TextSerializerConfig {
            metric_tag_values: MetricTagValues::Full,
        };
        let encoded = serialize(config, metric2());
        assert_eq!(
            encoded,
            Bytes::from(r#"counter{a="first",a,a="second"} + 1"#)
        );
    }

    // With `Single`, the tag is reduced to one value before rendering.
    #[test]
    fn serialize_metric_tags_single() {
        let config = TextSerializerConfig {
            metric_tag_values: MetricTagValues::Single,
        };
        let encoded = serialize(config, metric2());
        assert_eq!(encoded, Bytes::from(r#"counter{a="second"} + 1"#));
    }
}