codecs/decoding/format/
bytes.rs1use bytes::Bytes;
2use lookup::OwnedTargetPath;
3use serde::{Deserialize, Serialize};
4use smallvec::{SmallVec, smallvec};
5use vector_core::{
6 config::{DataType, LogNamespace, log_schema},
7 event::{Event, LogEvent},
8 schema,
9 schema::meaning,
10};
11use vrl::value::Kind;
12
13use super::Deserializer;
14
15#[derive(Debug, Clone, Default, Deserialize, Serialize)]
17pub struct BytesDeserializerConfig;
18
19impl BytesDeserializerConfig {
20 pub const fn new() -> Self {
22 Self
23 }
24
25 pub fn build(&self) -> BytesDeserializer {
27 BytesDeserializer
28 }
29
30 pub fn output_type(&self) -> DataType {
32 DataType::Log
33 }
34
35 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
37 match log_namespace {
38 LogNamespace::Legacy => {
39 let definition = schema::Definition::empty_legacy_namespace();
40 if let Some(message_key) = log_schema().message_key() {
41 return definition.with_event_field(
42 message_key,
43 Kind::bytes(),
44 Some(meaning::MESSAGE),
45 );
46 }
47 definition
48 }
49 LogNamespace::Vector => {
50 schema::Definition::new_with_default_metadata(Kind::bytes(), [log_namespace])
51 .with_meaning(OwnedTargetPath::event_root(), "message")
52 }
53 }
54 }
55}
56
/// Deserializer that wraps the incoming bytes in a log event.
///
/// The payload is stored as received; no further decoding is applied to it.
#[derive(Clone, Debug)]
pub struct BytesDeserializer;
63
64impl BytesDeserializer {
65 pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent {
67 match log_namespace {
68 LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
69 LogNamespace::Legacy => {
70 let mut log = LogEvent::default();
71 log.maybe_insert(log_schema().message_key_target_path(), bytes);
72 log
73 }
74 }
75 }
76}
77
78impl Deserializer for BytesDeserializer {
79 fn parse(
80 &self,
81 bytes: Bytes,
82 log_namespace: LogNamespace,
83 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
84 let log = self.parse_single(bytes, log_namespace);
85 Ok(smallvec![log.into()])
86 }
87}
88
#[cfg(test)]
mod tests {
    use vrl::value::Value;

    use super::*;

    /// In the legacy namespace the payload must come back as the log's
    /// message, and exactly one event must be produced.
    #[test]
    fn deserialize_bytes_legacy_namespace() {
        let payload = Bytes::from("foo");

        let mut parsed = BytesDeserializer
            .parse(payload, LogNamespace::Legacy)
            .unwrap()
            .into_iter();

        let first = parsed.next().unwrap();
        assert_eq!(*first.as_log().get_message().unwrap(), "foo".into());

        // Nothing may follow the first event.
        assert_eq!(parsed.next(), None);
    }

    /// In the Vector namespace the payload must be the value at the event
    /// root, and exactly one event must be produced.
    #[test]
    fn deserialize_bytes_vector_namespace() {
        let payload = Bytes::from("foo");

        let parsed = BytesDeserializer
            .parse(payload, LogNamespace::Vector)
            .unwrap();
        assert_eq!(parsed.len(), 1);

        let root = parsed[0].as_log().get(".").unwrap();
        assert_eq!(root, &Value::from("foo"));
    }
}