codecs/encoding/format/
raw_message.rs1use bytes::{BufMut, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Encoder;
4use vector_core::{config::DataType, event::Event, schema};
5
6use crate::encoding::format::common::get_serializer_schema_requirement;
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
10pub struct RawMessageSerializerConfig;
11
12impl RawMessageSerializerConfig {
13    pub const fn new() -> Self {
15        Self
16    }
17
18    pub const fn build(&self) -> RawMessageSerializer {
20        RawMessageSerializer
21    }
22
23    pub fn input_type(&self) -> DataType {
25        DataType::Log
26    }
27
28    pub fn schema_requirement(&self) -> schema::Requirement {
30        get_serializer_schema_requirement()
31    }
32}
33
34#[derive(Debug, Clone)]
36pub struct RawMessageSerializer;
37
38impl Encoder<Event> for RawMessageSerializer {
39    type Error = vector_common::Error;
40
41    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
42        let log = event.as_log();
43        if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
44            buffer.put(bytes);
45        }
46        Ok(())
47    }
48}
49
50#[cfg(test)]
51mod tests {
52    use bytes::{Bytes, BytesMut};
53    use vector_core::event::LogEvent;
54
55    use super::*;
56
57    #[test]
58    fn serialize_bytes() {
59        let input = Event::from(LogEvent::from_str_legacy("foo"));
60        let mut serializer = RawMessageSerializer;
61
62        let mut buffer = BytesMut::new();
63        serializer.encode(input, &mut buffer).unwrap();
64
65        assert_eq!(buffer.freeze(), Bytes::from("foo"));
66    }
67}