codecs/decoding/format/
bytes.rs

1use bytes::Bytes;
2use lookup::OwnedTargetPath;
3use serde::{Deserialize, Serialize};
4use smallvec::{smallvec, SmallVec};
5use vector_core::config::LogNamespace;
6use vector_core::schema::meaning;
7use vector_core::{
8    config::{log_schema, DataType},
9    event::{Event, LogEvent},
10    schema,
11};
12use vrl::value::Kind;
13
14use super::Deserializer;
15
16/// Config used to build a `BytesDeserializer`.
17#[derive(Debug, Clone, Default, Deserialize, Serialize)]
18pub struct BytesDeserializerConfig;
19
20impl BytesDeserializerConfig {
21    /// Creates a new `BytesDeserializerConfig`.
22    pub const fn new() -> Self {
23        Self
24    }
25
26    /// Build the `BytesDeserializer` from this configuration.
27    pub fn build(&self) -> BytesDeserializer {
28        BytesDeserializer
29    }
30
31    /// Return the type of event build by this deserializer.
32    pub fn output_type(&self) -> DataType {
33        DataType::Log
34    }
35
36    /// The schema produced by the deserializer.
37    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
38        match log_namespace {
39            LogNamespace::Legacy => {
40                let definition = schema::Definition::empty_legacy_namespace();
41                if let Some(message_key) = log_schema().message_key() {
42                    return definition.with_event_field(
43                        message_key,
44                        Kind::bytes(),
45                        Some(meaning::MESSAGE),
46                    );
47                }
48                definition
49            }
50            LogNamespace::Vector => {
51                schema::Definition::new_with_default_metadata(Kind::bytes(), [log_namespace])
52                    .with_meaning(OwnedTargetPath::event_root(), "message")
53            }
54        }
55    }
56}
57
58/// Deserializer that converts bytes to an `Event`.
59///
60/// This deserializer can be considered as the no-op action for input where no
61/// further decoding has been specified.
62#[derive(Debug, Clone)]
63pub struct BytesDeserializer;
64
65impl BytesDeserializer {
66    /// Deserializes the given bytes, which will always produce a single `LogEvent`.
67    pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent {
68        match log_namespace {
69            LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
70            LogNamespace::Legacy => {
71                let mut log = LogEvent::default();
72                log.maybe_insert(log_schema().message_key_target_path(), bytes);
73                log
74            }
75        }
76    }
77}
78
79impl Deserializer for BytesDeserializer {
80    fn parse(
81        &self,
82        bytes: Bytes,
83        log_namespace: LogNamespace,
84    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
85        let log = self.parse_single(bytes, log_namespace);
86        Ok(smallvec![log.into()])
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use vrl::value::Value;

    /// In the legacy namespace the payload is found under the message field,
    /// and exactly one event is emitted.
    #[test]
    fn deserialize_bytes_legacy_namespace() {
        let mut parsed = BytesDeserializer
            .parse(Bytes::from("foo"), LogNamespace::Legacy)
            .unwrap()
            .into_iter();

        let first = parsed.next().unwrap();
        assert_eq!(first.as_log().get_message(), Some(&Value::from("foo")));

        // No further events may follow the first one.
        assert!(parsed.next().is_none());
    }

    /// In the Vector namespace the payload is the event root itself.
    #[test]
    fn deserialize_bytes_vector_namespace() {
        let parsed = BytesDeserializer
            .parse(Bytes::from("foo"), LogNamespace::Vector)
            .unwrap();

        assert_eq!(parsed.len(), 1);
        assert_eq!(parsed[0].as_log().get("."), Some(&Value::from("foo")));
    }
}