codecs/decoding/format/
native.rs

1use bytes::Bytes;
2use prost::Message;
3use serde::{Deserialize, Serialize};
4use smallvec::{SmallVec, smallvec};
5use vector_core::{
6    config::{DataType, LogNamespace},
7    event::{Event, EventArray, EventContainer, proto},
8    schema,
9};
10use vrl::value::Kind;
11
12use super::Deserializer;
13
14/// Config used to build a `NativeDeserializer`.
15#[derive(Debug, Clone, Default, Deserialize, Serialize)]
16pub struct NativeDeserializerConfig;
17
18impl NativeDeserializerConfig {
19    /// Build the `NativeDeserializer` from this configuration.
20    pub fn build(&self) -> NativeDeserializer {
21        NativeDeserializer
22    }
23
24    /// Return the type of event build by this deserializer.
25    pub fn output_type(&self) -> DataType {
26        DataType::all_bits()
27    }
28
29    /// The schema produced by the deserializer.
30    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
31        match log_namespace {
32            LogNamespace::Legacy => schema::Definition::empty_legacy_namespace(),
33            LogNamespace::Vector => {
34                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
35            }
36        }
37    }
38}
39
/// Deserializer that turns a byte frame holding Vector's native protobuf format into `Event`s.
///
/// The type is a stateless unit struct.
#[derive(Clone, Debug, Default)]
pub struct NativeDeserializer;
43
44impl Deserializer for NativeDeserializer {
45    fn parse(
46        &self,
47        bytes: Bytes,
48        // LogNamespace is ignored because Vector owns the data format being consumed and as such there
49        // is no need to change the fields of the event.
50        _log_namespace: LogNamespace,
51    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
52        if bytes.is_empty() {
53            Ok(smallvec![])
54        } else {
55            let event_array = EventArray::from(proto::EventArray::decode(bytes)?);
56            Ok(event_array.into_events().collect())
57        }
58    }
59}