1use bytes::{Buf, BufMut};
2use enumflags2::{bitflags, BitFlags, FromBitsError};
3use prost::Message;
4use snafu::Snafu;
5use vector_buffers::encoding::{AsMetadata, Encodable};
6
7use super::{proto, Event, EventArray};
8
9#[derive(Debug, Snafu)]
10pub enum EncodeError {
11 #[snafu(display("the provided buffer was too small to fully encode this item"))]
12 BufferTooSmall,
13}
14
15#[derive(Debug, Snafu)]
16pub enum DecodeError {
17 #[snafu(display(
18 "the provided buffer could not be decoded as a valid Protocol Buffers payload"
19 ))]
20 InvalidProtobufPayload,
21 #[snafu(display("unsupported encoding metadata for this context"))]
22 UnsupportedEncodingMetadata,
23}
24#[bitflags]
31#[repr(u32)]
32#[derive(Copy, Clone, Debug, PartialEq, Eq)]
33pub enum EventEncodableMetadataFlags {
34 DiskBufferV1CompatibilityMode = 0b1,
40}
41
42#[derive(Clone, Copy, Debug, PartialEq, Eq)]
43pub struct EventEncodableMetadata(BitFlags<EventEncodableMetadataFlags>);
44
45impl EventEncodableMetadata {
46 fn contains(self, flag: EventEncodableMetadataFlags) -> bool {
47 self.0.contains(flag)
48 }
49}
50
51impl From<EventEncodableMetadataFlags> for EventEncodableMetadata {
52 fn from(flag: EventEncodableMetadataFlags) -> Self {
53 Self(BitFlags::from(flag))
54 }
55}
56
57impl From<BitFlags<EventEncodableMetadataFlags>> for EventEncodableMetadata {
58 fn from(flags: BitFlags<EventEncodableMetadataFlags>) -> Self {
59 Self(flags)
60 }
61}
62
63impl TryFrom<u32> for EventEncodableMetadata {
64 type Error = FromBitsError<EventEncodableMetadataFlags>;
65
66 fn try_from(value: u32) -> Result<Self, Self::Error> {
67 BitFlags::try_from(value).map(Self)
68 }
69}
70
71impl AsMetadata for EventEncodableMetadata {
72 fn into_u32(self) -> u32 {
73 self.0.bits()
74 }
75
76 fn from_u32(value: u32) -> Option<Self> {
77 EventEncodableMetadata::try_from(value).ok()
78 }
79}
80
81impl Encodable for EventArray {
82 type Metadata = EventEncodableMetadata;
83 type EncodeError = EncodeError;
84 type DecodeError = DecodeError;
85
86 fn get_metadata() -> Self::Metadata {
87 EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode.into()
88 }
89
90 fn can_decode(metadata: Self::Metadata) -> bool {
91 metadata.contains(EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode)
92 }
93
94 fn encode<B>(self, buffer: &mut B) -> Result<(), Self::EncodeError>
95 where
96 B: BufMut,
97 {
98 proto::EventArray::from(self)
99 .encode(buffer)
100 .map_err(|_| EncodeError::BufferTooSmall)
101 }
102
103 fn decode<B>(metadata: Self::Metadata, buffer: B) -> Result<Self, Self::DecodeError>
104 where
105 B: Buf + Clone,
106 {
107 if metadata.contains(EventEncodableMetadataFlags::DiskBufferV1CompatibilityMode) {
108 proto::EventArray::decode(buffer.clone())
109 .map(Into::into)
110 .or_else(|_| {
111 proto::EventWrapper::decode(buffer)
112 .map(|pe| EventArray::from(Event::from(pe)))
113 .map_err(|_| DecodeError::InvalidProtobufPayload)
114 })
115 } else {
116 Err(DecodeError::UnsupportedEncodingMetadata)
117 }
118 }
119}