codecs/encoding/format/
raw_message.rs1use bytes::{BufMut, BytesMut};
2use serde::{Deserialize, Serialize};
3use tokio_util::codec::Encoder;
4use vector_core::{config::DataType, event::Event, schema};
5
6use crate::encoding::format::common::get_serializer_schema_requirement;
7
8#[derive(Debug, Clone, Default, Deserialize, Serialize)]
10pub struct RawMessageSerializerConfig;
11
12impl RawMessageSerializerConfig {
13 pub const fn new() -> Self {
15 Self
16 }
17
18 pub const fn build(&self) -> RawMessageSerializer {
20 RawMessageSerializer
21 }
22
23 pub fn input_type(&self) -> DataType {
25 DataType::Log
26 }
27
28 pub fn schema_requirement(&self) -> schema::Requirement {
30 get_serializer_schema_requirement()
31 }
32}
33
34#[derive(Debug, Clone)]
36pub struct RawMessageSerializer;
37
38impl Encoder<Event> for RawMessageSerializer {
39 type Error = vector_common::Error;
40
41 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
42 let log = event.as_log();
43 if let Some(bytes) = log.get_message().map(|value| value.coerce_to_bytes()) {
44 buffer.put(bytes);
45 }
46 Ok(())
47 }
48}
49
50#[cfg(test)]
51mod tests {
52 use bytes::{Bytes, BytesMut};
53 use vector_core::event::LogEvent;
54
55 use super::*;
56
57 #[test]
58 fn serialize_bytes() {
59 let input = Event::from(LogEvent::from_str_legacy("foo"));
60 let mut serializer = RawMessageSerializer;
61
62 let mut buffer = BytesMut::new();
63 serializer.encode(input, &mut buffer).unwrap();
64
65 assert_eq!(buffer.freeze(), Bytes::from("foo"));
66 }
67}