codecs/encoding/format/
raw_message.rs1use crate::encoding::format::common::get_serializer_schema_requirement;
2use bytes::{BufMut, BytesMut};
3use serde::{Deserialize, Serialize};
4use tokio_util::codec::Encoder;
5use vector_core::{config::DataType, event::Event, schema};
6
7#[derive(Debug, Clone, Default, Deserialize, Serialize)]
9pub struct RawMessageSerializerConfig;
10
11impl RawMessageSerializerConfig {
12 pub const fn new() -> Self {
14 Self
15 }
16
17 pub const fn build(&self) -> RawMessageSerializer {
19 RawMessageSerializer
20 }
21
22 pub fn input_type(&self) -> DataType {
24 DataType::Log
25 }
26
27 pub fn schema_requirement(&self) -> schema::Requirement {
29 get_serializer_schema_requirement()
30 }
31}
32
/// Serializer that writes a log event's message field to the output buffer
/// as raw bytes.
#[derive(Debug, Clone)]
pub struct RawMessageSerializer;
36
37impl Encoder<Event> for RawMessageSerializer {
38 type Error = vector_common::Error;
39
40 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
41 let log = event.as_log();
42 if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
43 buffer.put(bytes);
44 }
45 Ok(())
46 }
47}
48
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};
    use vector_core::event::LogEvent;

    use super::*;

    /// Encoding a log event built from the string "foo" must emit exactly
    /// the bytes `foo`.
    #[test]
    fn serialize_bytes() {
        let event = Event::from(LogEvent::from_str_legacy("foo"));
        let mut serializer = RawMessageSerializer;
        let mut output = BytesMut::new();

        serializer.encode(event, &mut output).unwrap();

        assert_eq!(output.freeze(), Bytes::from("foo"));
    }
}