vector_core/event/
ser.rs

1use bytes::{Buf, BufMut};
2use enumflags2::{bitflags, BitFlags, FromBitsError};
3use prost::Message;
4use snafu::Snafu;
5use vector_buffers::encoding::{AsMetadata, Encodable};
6
7use super::{proto, Event, EventArray};
8
9#[derive(Debug, Snafu)]
10pub enum EncodeError {
11    #[snafu(display("the provided buffer was too small to fully encode this item"))]
12    BufferTooSmall,
13}
14
15#[derive(Debug, Snafu)]
16pub enum DecodeError {
17    #[snafu(display(
18        "the provided buffer could not be decoded as a valid Protocol Buffers payload"
19    ))]
20    InvalidProtobufPayload,
21    #[snafu(display("unsupported encoding metadata for this context"))]
22    UnsupportedEncodingMetadata,
23}
24/// Flags for describing the encoding scheme used by our primary event types that flow through buffers.
25///
26/// # Stability
27///
28/// This enumeration should never have any flags removed, only added.  This ensures that previously
29/// used flags cannot have their meaning changed/repurposed after-the-fact.
30#[bitflags]
31#[repr(u32)]
32#[derive(Copy, Clone, Debug, PartialEq, Eq)]
33pub enum EventEncodableMetadataFlags {
34    /// Chained encoding scheme that first tries to decode as `EventArray` and then as `Event`, as a
35    /// way to support gracefully migrating existing v1-based disk buffers to the new
36    /// `EventArray`-based architecture.
37    ///
38    /// All encoding uses the `EventArray` variant, however.
39    DiskBufferV1CompatibilityMode = 0b1,
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq)]
43pub struct EventEncodableMetadata(BitFlags<EventEncodableMetadataFlags>);
44
45impl EventEncodableMetadata {
46    fn contains(self, flag: EventEncodableMetadataFlags) -> bool {
47        self.0.contains(flag)
48    }
49}
50
51impl From<EventEncodableMetadataFlags> for EventEncodableMetadata {
52    fn from(flag: EventEncodableMetadataFlags) -> Self {
53        Self(BitFlags::from(flag))
54    }
55}
56
57impl From<BitFlags<EventEncodableMetadataFlags>> for EventEncodableMetadata {
58    fn from(flags: BitFlags<EventEncodableMetadataFlags>) -> Self {
59        Self(flags)
60    }
61}
62
63impl TryFrom<u32> for EventEncodableMetadata {
64    type Error = FromBitsError<EventEncodableMetadataFlags>;
65
66    fn try_from(value: u32) -> Result<Self, Self::Error> {
67        BitFlags::try_from(value).map(Self)
68    }
69}
70
71impl AsMetadata for EventEncodableMetadata {
72    fn into_u32(self) -> u32 {
73        self.0.bits()
74    }
75
76    fn from_u32(value: u32) -> Option<Self> {
77        EventEncodableMetadata::try_from(value).ok()
78    }
79}
80
81impl Encodable for EventArray {
82    type Metadata = EventEncodableMetadata;
83    type EncodeError = EncodeError;
84    type DecodeError = DecodeError;
85
86    fn get_metadata() -> Self::Metadata {
87        EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode.into()
88    }
89
90    fn can_decode(metadata: Self::Metadata) -> bool {
91        metadata.contains(EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode)
92    }
93
94    fn encode<B>(self, buffer: &mut B) -> Result<(), Self::EncodeError>
95    where
96        B: BufMut,
97    {
98        proto::EventArray::from(self)
99            .encode(buffer)
100            .map_err(|_| EncodeError::BufferTooSmall)
101    }
102
103    fn decode<B>(metadata: Self::Metadata, buffer: B) -> Result<Self, Self::DecodeError>
104    where
105        B: Buf + Clone,
106    {
107        if metadata.contains(EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode) {
108            proto::EventArray::decode(buffer.clone())
109                .map(Into::into)
110                .or_else(|_| {
111                    proto::EventWrapper::decode(buffer)
112                        .map(|pe| EventArray::from(Event::from(pe)))
113                        .map_err(|_| DecodeError::InvalidProtobufPayload)
114                })
115        } else {
116            Err(DecodeError::UnsupportedEncodingMetadata)
117        }
118    }
119}