codecs/decoding/format/
bytes.rs1use bytes::Bytes;
2use lookup::OwnedTargetPath;
3use serde::{Deserialize, Serialize};
4use smallvec::{smallvec, SmallVec};
5use vector_core::config::LogNamespace;
6use vector_core::schema::meaning;
7use vector_core::{
8 config::{log_schema, DataType},
9 event::{Event, LogEvent},
10 schema,
11};
12use vrl::value::Kind;
13
14use super::Deserializer;
15
16#[derive(Debug, Clone, Default, Deserialize, Serialize)]
18pub struct BytesDeserializerConfig;
19
20impl BytesDeserializerConfig {
21 pub const fn new() -> Self {
23 Self
24 }
25
26 pub fn build(&self) -> BytesDeserializer {
28 BytesDeserializer
29 }
30
31 pub fn output_type(&self) -> DataType {
33 DataType::Log
34 }
35
36 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
38 match log_namespace {
39 LogNamespace::Legacy => {
40 let definition = schema::Definition::empty_legacy_namespace();
41 if let Some(message_key) = log_schema().message_key() {
42 return definition.with_event_field(
43 message_key,
44 Kind::bytes(),
45 Some(meaning::MESSAGE),
46 );
47 }
48 definition
49 }
50 LogNamespace::Vector => {
51 schema::Definition::new_with_default_metadata(Kind::bytes(), [log_namespace])
52 .with_meaning(OwnedTargetPath::event_root(), "message")
53 }
54 }
55 }
56}
57
58#[derive(Debug, Clone)]
63pub struct BytesDeserializer;
64
65impl BytesDeserializer {
66 pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent {
68 match log_namespace {
69 LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
70 LogNamespace::Legacy => {
71 let mut log = LogEvent::default();
72 log.maybe_insert(log_schema().message_key_target_path(), bytes);
73 log
74 }
75 }
76 }
77}
78
79impl Deserializer for BytesDeserializer {
80 fn parse(
81 &self,
82 bytes: Bytes,
83 log_namespace: LogNamespace,
84 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
85 let log = self.parse_single(bytes, log_namespace);
86 Ok(smallvec![log.into()])
87 }
88}
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93 use vrl::value::Value;
94
95 #[test]
96 fn deserialize_bytes_legacy_namespace() {
97 let input = Bytes::from("foo");
98 let deserializer = BytesDeserializer;
99
100 let events = deserializer.parse(input, LogNamespace::Legacy).unwrap();
101 let mut events = events.into_iter();
102
103 {
104 let event = events.next().unwrap();
105 let log = event.as_log();
106 assert_eq!(*log.get_message().unwrap(), "foo".into());
107 }
108
109 assert_eq!(events.next(), None);
110 }
111
112 #[test]
113 fn deserialize_bytes_vector_namespace() {
114 let input = Bytes::from("foo");
115 let deserializer = BytesDeserializer;
116
117 let events = deserializer.parse(input, LogNamespace::Vector).unwrap();
118 assert_eq!(events.len(), 1);
119
120 assert_eq!(events[0].as_log().get(".").unwrap(), &Value::from("foo"));
121 }
122}