codecs/encoding/format/
raw_message.rs

1use crate::encoding::format::common::get_serializer_schema_requirement;
2use bytes::{BufMut, BytesMut};
3use serde::{Deserialize, Serialize};
4use tokio_util::codec::Encoder;
5use vector_core::{config::DataType, event::Event, schema};
6
7/// Config used to build a `RawMessageSerializer`.
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct RawMessageSerializerConfig;
10
11impl RawMessageSerializerConfig {
12    /// Creates a new `RawMessageSerializerConfig`.
13    pub const fn new() -> Self {
14        Self
15    }
16
17    /// Build the `RawMessageSerializer` from this configuration.
18    pub const fn build(&self) -> RawMessageSerializer {
19        RawMessageSerializer
20    }
21
22    /// The data type of events that are accepted by `RawMessageSerializer`.
23    pub fn input_type(&self) -> DataType {
24        DataType::Log
25    }
26
27    /// The schema required by the serializer.
28    pub fn schema_requirement(&self) -> schema::Requirement {
29        get_serializer_schema_requirement()
30    }
31}
32
/// Serializer that turns an `Event` into bytes by pulling out its message field.
///
/// The type is a stateless unit struct; cloning it yields an equivalent serializer.
#[derive(Clone, Debug)]
pub struct RawMessageSerializer;
36
37impl Encoder<Event> for RawMessageSerializer {
38    type Error = vector_common::Error;
39
40    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
41        let log = event.as_log();
42        if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
43            buffer.put(bytes);
44        }
45        Ok(())
46    }
47}
48
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};
    use vector_core::event::LogEvent;

    use super::*;

    /// A log event whose message is `"foo"` must be encoded as exactly those bytes.
    #[test]
    fn serialize_bytes() {
        let mut output = BytesMut::new();
        let mut encoder = RawMessageSerializerConfig::new().build();
        let event = Event::from(LogEvent::from_str_legacy("foo"));

        encoder.encode(event, &mut output).unwrap();

        // Freeze the buffer so it can be compared against an immutable `Bytes` value.
        let expected = Bytes::from("foo");
        assert_eq!(output.freeze(), expected);
    }
}