codecs/decoding/format/
protobuf.rs

1use std::path::PathBuf;
2
3use bytes::Bytes;
4use chrono::Utc;
5use derivative::Derivative;
6use prost_reflect::{DynamicMessage, MessageDescriptor};
7use smallvec::{SmallVec, smallvec};
8use vector_config::configurable_component;
9use vector_core::{
10    config::{DataType, LogNamespace, log_schema},
11    event::{Event, LogEvent, TraceEvent},
12    schema,
13};
14use vrl::{
15    protobuf::{
16        descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
17        parse::{Options, proto_to_value},
18    },
19    value::{Kind, Value},
20};
21
22use super::Deserializer;
23
/// Config used to build a `ProtobufDeserializer`.
#[configurable_component]
#[derive(Debug, Clone, Default)]
pub struct ProtobufDeserializerConfig {
    /// Protobuf-specific decoding options.
    // `default` fills in the default options when the key is missing from the input, and the
    // field is left out of serialized output whenever `vector_core::serde::is_default`
    // returns true for it.
    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
    pub protobuf: ProtobufDeserializerOptions,
}
32
33impl ProtobufDeserializerConfig {
34    /// Build the `ProtobufDeserializer` from this configuration.
35    pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
36        ProtobufDeserializer::try_from(self)
37    }
38
39    /// Return the type of event build by this deserializer.
40    pub fn output_type(&self) -> DataType {
41        DataType::Log
42    }
43
44    /// The schema produced by the deserializer.
45    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
46        match log_namespace {
47            LogNamespace::Legacy => {
48                let mut definition =
49                    schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
50
51                if let Some(timestamp_key) = log_schema().timestamp_key() {
52                    definition = definition.try_with_field(
53                        timestamp_key,
54                        // The protobuf decoder will try to insert a new `timestamp`-type value into the
55                        // "timestamp_key" field, but only if that field doesn't already exist.
56                        Kind::any().or_timestamp(),
57                        Some("timestamp"),
58                    );
59                }
60                definition
61            }
62            LogNamespace::Vector => {
63                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
64            }
65        }
66    }
67}
68
69/// Protobuf-specific decoding options.
70#[configurable_component]
71#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
72#[derivative(Default)]
73pub struct ProtobufDeserializerOptions {
74    /// The path to the protobuf descriptor set file.
75    ///
76    /// This file is the output of `protoc -I <include path> -o <desc output path> <proto>`
77    ///
78    /// You can read more [here](https://buf.build/docs/reference/images/#how-buf-images-work).
79    pub desc_file: PathBuf,
80
81    /// The name of the message type to use for serializing.
82    #[configurable(metadata(docs::examples = "package.Message"))]
83    pub message_type: String,
84}
85
/// Deserializer that builds `Event`s from a byte frame containing protobuf.
#[derive(Debug, Clone)]
pub struct ProtobufDeserializer {
    /// Descriptor of the message type that every incoming frame is decoded as.
    message_descriptor: MessageDescriptor,
    /// Options handed to `proto_to_value` when converting the decoded message into a value.
    options: Options,
}
92
93impl ProtobufDeserializer {
94    /// Creates a new `ProtobufDeserializer`.
95    pub fn new(message_descriptor: MessageDescriptor) -> Self {
96        Self {
97            message_descriptor,
98            options: Default::default(),
99        }
100    }
101
102    /// Creates a new deserializer instance using the descriptor bytes directly.
103    pub fn new_from_bytes(
104        desc_bytes: &[u8],
105        message_type: &str,
106        options: Options,
107    ) -> vector_common::Result<Self> {
108        let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
109        Ok(Self {
110            message_descriptor,
111            options,
112        })
113    }
114}
115
116fn extract_vrl_value(
117    bytes: Bytes,
118    message_descriptor: &MessageDescriptor,
119    options: &Options,
120) -> vector_common::Result<Value> {
121    let dynamic_message = DynamicMessage::decode(message_descriptor.clone(), bytes)
122        .map_err(|error| format!("Error parsing protobuf: {error:?}"))?;
123
124    Ok(proto_to_value(
125        &prost_reflect::Value::Message(dynamic_message),
126        None,
127        options,
128    )?)
129}
130
131impl Deserializer for ProtobufDeserializer {
132    fn parse(
133        &self,
134        bytes: Bytes,
135        log_namespace: LogNamespace,
136    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
137        let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?;
138        let mut event = Event::Log(LogEvent::from(vrl_value));
139
140        let event = match log_namespace {
141            LogNamespace::Vector => event,
142            LogNamespace::Legacy => {
143                let timestamp = Utc::now();
144                if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
145                    let log = event.as_mut_log();
146                    if !log.contains(timestamp_key) {
147                        log.insert(timestamp_key, timestamp);
148                    }
149                }
150                event
151            }
152        };
153
154        Ok(smallvec![event])
155    }
156
157    fn parse_traces(&self, bytes: Bytes) -> vector_common::Result<SmallVec<[Event; 1]>> {
158        let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?;
159        let trace_event = Event::Trace(TraceEvent::from(vrl_value));
160        Ok(smallvec![trace_event])
161    }
162}
163
164impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
165    type Error = vector_common::Error;
166    fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
167        let message_descriptor =
168            get_message_descriptor(&config.protobuf.desc_file, &config.protobuf.message_type)?;
169        Ok(Self::new(message_descriptor))
170    }
171}
172
#[cfg(test)]
mod tests {
    use std::{env, fs, path::PathBuf};

    use vector_core::config::log_schema;

    use super::*;

    /// Directory holding the protobuf fixtures, relative to the crate manifest.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Parses `protobuf_bin_message` under both log namespaces and runs `validate_log`
    /// on the single resulting log event.
    ///
    /// The payload is accepted as anything convertible into `Bytes` so that raw binary
    /// fixtures can be passed without first being forced through a UTF-8 `String`.
    fn parse_and_validate(
        protobuf_bin_message: impl Into<Bytes>,
        protobuf_desc_path: PathBuf,
        message_type: &str,
        validate_log: fn(&LogEvent),
    ) {
        let input: Bytes = protobuf_bin_message.into();
        let message_descriptor = get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            let mut events = events.into_iter();

            {
                let event = events.next().unwrap();
                let log = event.as_log();
                validate_log(log);
                // The timestamp is only injected in the legacy namespace.
                assert_eq!(
                    log.get(log_schema().timestamp_key_target_path().unwrap())
                        .is_some(),
                    namespace == LogNamespace::Legacy
                );
            }

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_protobuf() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "123456"
            );
        };

        parse_and_validate(
            // Protobuf payloads are binary: read raw bytes, not a UTF-8 string.
            fs::read(protobuf_bin_message_path).unwrap(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_protobuf3() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone3.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf3.desc");
        let message_type = "test_protobuf3.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "1234"
            );
            assert_eq!(
                log["data"].as_object().unwrap()["data_phone"],
                "HOME".into()
            );
        };

        parse_and_validate(
            // Protobuf payloads are binary: read raw bytes, not a UTF-8 string.
            fs::read(protobuf_bin_message_path).unwrap(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_empty_buffer() {
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            // No field will be set.
            assert!(!log.contains("name"));
            assert!(!log.contains("id"));
            assert!(!log.contains("email"));
            assert!(!log.contains("phones"));
        };

        parse_and_validate(
            Bytes::new(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_error_invalid_protobuf() {
        let input = Bytes::from("{ foo");
        let message_descriptor = get_message_descriptor(
            &test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.Person",
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }

    #[test]
    fn build_error_missing_desc_file() {
        let config = ProtobufDeserializerConfig {
            protobuf: ProtobufDeserializerOptions {
                desc_file: test_data_dir().join("protos/does_not_exist.desc"),
                message_type: "test_protobuf.Person".to_string(),
            },
        };

        assert!(config.build().is_err());
    }

    #[test]
    fn build_error_invalid_message_type() {
        let config = ProtobufDeserializerConfig {
            protobuf: ProtobufDeserializerOptions {
                desc_file: test_data_dir().join("protos/test_protobuf.desc"),
                message_type: "test_protobuf.DoesNotExist".to_string(),
            },
        };

        assert!(config.build().is_err());
    }
}