codecs/decoding/format/
bytes.rs

1use bytes::Bytes;
2use lookup::OwnedTargetPath;
3use serde::{Deserialize, Serialize};
4use smallvec::{SmallVec, smallvec};
5use vector_core::{
6    config::{DataType, LogNamespace, log_schema},
7    event::{Event, LogEvent},
8    schema,
9    schema::meaning,
10};
11use vrl::value::Kind;
12
13use super::Deserializer;
14
15/// Config used to build a `BytesDeserializer`.
16#[derive(Debug, Clone, Default, Deserialize, Serialize)]
17pub struct BytesDeserializerConfig;
18
19impl BytesDeserializerConfig {
20    /// Creates a new `BytesDeserializerConfig`.
21    pub const fn new() -> Self {
22        Self
23    }
24
25    /// Build the `BytesDeserializer` from this configuration.
26    pub fn build(&self) -> BytesDeserializer {
27        BytesDeserializer
28    }
29
30    /// Return the type of event build by this deserializer.
31    pub fn output_type(&self) -> DataType {
32        DataType::Log
33    }
34
35    /// The schema produced by the deserializer.
36    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
37        match log_namespace {
38            LogNamespace::Legacy => {
39                let definition = schema::Definition::empty_legacy_namespace();
40                if let Some(message_key) = log_schema().message_key() {
41                    return definition.with_event_field(
42                        message_key,
43                        Kind::bytes(),
44                        Some(meaning::MESSAGE),
45                    );
46                }
47                definition
48            }
49            LogNamespace::Vector => {
50                schema::Definition::new_with_default_metadata(Kind::bytes(), [log_namespace])
51                    .with_meaning(OwnedTargetPath::event_root(), "message")
52            }
53        }
54    }
55}
56
/// Deserializer that turns raw bytes into an `Event`.
///
/// It acts as the no-op choice for input where no further decoding has been
/// specified: the bytes are carried over into the event unchanged.
#[derive(Debug, Clone)]
pub struct BytesDeserializer;
63
64impl BytesDeserializer {
65    /// Deserializes the given bytes, which will always produce a single `LogEvent`.
66    pub fn parse_single(&self, bytes: Bytes, log_namespace: LogNamespace) -> LogEvent {
67        match log_namespace {
68            LogNamespace::Vector => log_namespace.new_log_from_data(bytes),
69            LogNamespace::Legacy => {
70                let mut log = LogEvent::default();
71                log.maybe_insert(log_schema().message_key_target_path(), bytes);
72                log
73            }
74        }
75    }
76}
77
78impl Deserializer for BytesDeserializer {
79    fn parse(
80        &self,
81        bytes: Bytes,
82        log_namespace: LogNamespace,
83    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
84        let log = self.parse_single(bytes, log_namespace);
85        Ok(smallvec![log.into()])
86    }
87}
88
#[cfg(test)]
mod tests {
    use vrl::value::Value;

    use super::*;

    /// In the legacy namespace the payload must be readable as the message.
    #[test]
    fn deserialize_bytes_legacy_namespace() {
        let payload = Bytes::from("foo");

        let mut produced = BytesDeserializer
            .parse(payload, LogNamespace::Legacy)
            .unwrap()
            .into_iter();

        let first = produced.next().unwrap();
        assert_eq!(first.as_log().get_message().unwrap(), &Value::from("foo"));

        // Exactly one event is expected.
        assert!(produced.next().is_none());
    }

    /// In the Vector namespace the payload must sit at the event root.
    #[test]
    fn deserialize_bytes_vector_namespace() {
        let payload = Bytes::from("foo");

        let produced = BytesDeserializer
            .parse(payload, LogNamespace::Vector)
            .unwrap();
        assert_eq!(produced.len(), 1);

        let root = produced[0].as_log().get(".").unwrap();
        assert_eq!(root, &Value::from("foo"));
    }
}