// codecs/decoding/format/protobuf.rs

1use std::path::PathBuf;
2
3use bytes::Bytes;
4use chrono::Utc;
5use derivative::Derivative;
6use prost_reflect::{DynamicMessage, MessageDescriptor};
7use smallvec::{smallvec, SmallVec};
8use vector_config::configurable_component;
9use vector_core::event::LogEvent;
10use vector_core::{
11    config::{log_schema, DataType, LogNamespace},
12    event::Event,
13    schema,
14};
15use vrl::value::Kind;
16
17use super::Deserializer;
18
/// Config used to build a `ProtobufDeserializer`.
// Construct the deserializer itself with `ProtobufDeserializerConfig::build`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ProtobufDeserializerConfig {
    /// Protobuf-specific decoding options.
    // `serde(default)` allows the `protobuf` table to be omitted, in which case the derived
    // `ProtobufDeserializerOptions::default()` (empty `desc_file` and `message_type`) is used.
    // `skip_serializing_if` drops the field from serialized output whenever
    // `vector_core::serde::is_default` returns true for it.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub protobuf: ProtobufDeserializerOptions,
}
27
28impl ProtobufDeserializerConfig {
29    /// Build the `ProtobufDeserializer` from this configuration.
30    pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
31        ProtobufDeserializer::try_from(self)
32    }
33
34    /// Return the type of event build by this deserializer.
35    pub fn output_type(&self) -> DataType {
36        DataType::Log
37    }
38
39    /// The schema produced by the deserializer.
40    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
41        match log_namespace {
42            LogNamespace::Legacy => {
43                let mut definition =
44                    schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
45
46                if let Some(timestamp_key) = log_schema().timestamp_key() {
47                    definition = definition.try_with_field(
48                        timestamp_key,
49                        // The protobuf decoder will try to insert a new `timestamp`-type value into the
50                        // "timestamp_key" field, but only if that field doesn't already exist.
51                        Kind::any().or_timestamp(),
52                        Some("timestamp"),
53                    );
54                }
55                definition
56            }
57            LogNamespace::Vector => {
58                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
59            }
60        }
61    }
62}
63
64/// Protobuf-specific decoding options.
65#[configurable_component]
66#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
67#[derivative(Default)]
68pub struct ProtobufDeserializerOptions {
69    /// The path to the protobuf descriptor set file.
70    ///
71    /// This file is the output of `protoc -I <include path> -o <desc output path> <proto>`
72    ///
73    /// You can read more [here](https://buf.build/docs/reference/images/#how-buf-images-work).
74    pub desc_file: PathBuf,
75
76    /// The name of the message type to use for serializing.
77    #[configurable(metadata(docs::examples = "package.Message"))]
78    pub message_type: String,
79}
80
81/// Deserializer that builds `Event`s from a byte frame containing protobuf.
82#[derive(Debug, Clone)]
83pub struct ProtobufDeserializer {
84    message_descriptor: MessageDescriptor,
85}
86
87impl ProtobufDeserializer {
88    /// Creates a new `ProtobufDeserializer`.
89    pub fn new(message_descriptor: MessageDescriptor) -> Self {
90        Self { message_descriptor }
91    }
92}
93
94impl Deserializer for ProtobufDeserializer {
95    fn parse(
96        &self,
97        bytes: Bytes,
98        log_namespace: LogNamespace,
99    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
100        let dynamic_message = DynamicMessage::decode(self.message_descriptor.clone(), bytes)
101            .map_err(|error| format!("Error parsing protobuf: {error:?}"))?;
102
103        let proto_vrl =
104            vrl::protobuf::proto_to_value(&prost_reflect::Value::Message(dynamic_message), None)?;
105        let mut event = Event::Log(LogEvent::from(proto_vrl));
106        let event = match log_namespace {
107            LogNamespace::Vector => event,
108            LogNamespace::Legacy => {
109                let timestamp = Utc::now();
110                if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
111                    let log = event.as_mut_log();
112                    if !log.contains(timestamp_key) {
113                        log.insert(timestamp_key, timestamp);
114                    }
115                }
116                event
117            }
118        };
119
120        Ok(smallvec![event])
121    }
122}
123
124impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
125    type Error = vector_common::Error;
126    fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
127        let message_descriptor = vrl::protobuf::get_message_descriptor(
128            &config.protobuf.desc_file,
129            &config.protobuf.message_type,
130        )?;
131        Ok(Self::new(message_descriptor))
132    }
133}
134
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::{env, fs};
    use vector_core::config::log_schema;

    use super::*;

    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Reads a binary protobuf fixture as raw bytes.
    ///
    /// Encoded protobuf is arbitrary binary data, so it is read with `fs::read`;
    /// `fs::read_to_string` would reject any fixture that is not valid UTF-8.
    fn read_protobuf_bin_message(path: PathBuf) -> Bytes {
        Bytes::from(fs::read(path).unwrap())
    }

    /// Builds a deserializer config for the given descriptor file and message type.
    fn deserializer_config(desc_file: PathBuf, message_type: &str) -> ProtobufDeserializerConfig {
        ProtobufDeserializerConfig {
            protobuf: ProtobufDeserializerOptions {
                desc_file,
                message_type: message_type.to_string(),
            },
        }
    }

    /// Parses `input` in both log namespaces and checks the following:
    /// - exactly one event is produced;
    /// - `validate_log` accepts it;
    /// - a timestamp is present only in the legacy namespace.
    fn parse_and_validate(
        input: Bytes,
        protobuf_desc_path: PathBuf,
        message_type: &str,
        validate_log: fn(&LogEvent),
    ) {
        let message_descriptor =
            vrl::protobuf::get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            let mut events = events.into_iter();

            {
                let event = events.next().unwrap();
                let log = event.as_log();
                validate_log(log);
                assert_eq!(
                    log.get(log_schema().timestamp_key_target_path().unwrap())
                        .is_some(),
                    namespace == LogNamespace::Legacy
                );
            }

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_protobuf() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "123456"
            );
        };

        parse_and_validate(
            read_protobuf_bin_message(protobuf_bin_message_path),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_protobuf3() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone3.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf3.desc");
        let message_type = "test_protobuf3.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "1234"
            );
            assert_eq!(
                log["data"].as_object().unwrap()["data_phone"],
                "HOME".into()
            );
        };

        parse_and_validate(
            read_protobuf_bin_message(protobuf_bin_message_path),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_empty_buffer() {
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            // No field will be set.
            assert!(!log.contains("name"));
            assert!(!log.contains("id"));
            assert!(!log.contains("email"));
            assert!(!log.contains("phones"));
        };

        parse_and_validate(Bytes::new(), protobuf_desc_path, message_type, validate_log);
    }

    #[test]
    fn deserialize_error_invalid_protobuf() {
        let input = Bytes::from("{ foo");
        let message_descriptor = vrl::protobuf::get_message_descriptor(
            &test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.Person",
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }

    #[test]
    fn build_error_missing_desc_file() {
        let config = deserializer_config(
            test_data_dir().join("protos/does_not_exist.desc"),
            "test_protobuf.Person",
        );
        assert!(config.build().is_err());
    }

    #[test]
    fn build_error_unknown_message_type() {
        let config = deserializer_config(
            test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.DoesNotExist",
        );
        assert!(config.build().is_err());
    }

    #[test]
    fn build_error_default_config() {
        // The default options have an empty descriptor path and message type.
        assert!(ProtobufDeserializerConfig::default().build().is_err());
    }
}