codecs/decoding/format/
protobuf.rs

1use std::path::PathBuf;
2
3use bytes::Bytes;
4use chrono::Utc;
5use derivative::Derivative;
6use prost_reflect::{DynamicMessage, MessageDescriptor};
7use smallvec::{SmallVec, smallvec};
8use vector_config::configurable_component;
9use vector_core::{
10    config::{DataType, LogNamespace, log_schema},
11    event::{Event, LogEvent},
12    schema,
13};
14use vrl::{
15    protobuf::{
16        descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
17        parse::{Options, proto_to_value},
18    },
19    value::{Kind, Value},
20};
21
22use super::Deserializer;
23
24/// Config used to build a `ProtobufDeserializer`.
25#[configurable_component]
26#[derive(Debug, Clone, Default)]
27pub struct ProtobufDeserializerConfig {
28    /// Protobuf-specific decoding options.
29    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
30    pub protobuf: ProtobufDeserializerOptions,
31}
32
33impl ProtobufDeserializerConfig {
34    /// Build the `ProtobufDeserializer` from this configuration.
35    pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
36        ProtobufDeserializer::try_from(self)
37    }
38
39    /// Return the type of event build by this deserializer.
40    pub fn output_type(&self) -> DataType {
41        DataType::Log
42    }
43
44    /// The schema produced by the deserializer.
45    pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
46        match log_namespace {
47            LogNamespace::Legacy => {
48                let mut definition =
49                    schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
50
51                if let Some(timestamp_key) = log_schema().timestamp_key() {
52                    definition = definition.try_with_field(
53                        timestamp_key,
54                        // The protobuf decoder will try to insert a new `timestamp`-type value into the
55                        // "timestamp_key" field, but only if that field doesn't already exist.
56                        Kind::any().or_timestamp(),
57                        Some("timestamp"),
58                    );
59                }
60                definition
61            }
62            LogNamespace::Vector => {
63                schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
64            }
65        }
66    }
67}
68
69/// Protobuf-specific decoding options.
70#[configurable_component]
71#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
72#[derivative(Default)]
73pub struct ProtobufDeserializerOptions {
74    /// The path to the protobuf descriptor set file.
75    ///
76    /// This file is the output of `protoc -I <include path> -o <desc output path> <proto>`.
77    ///
78    /// You can read more [here](https://buf.build/docs/reference/images/#how-buf-images-work).
79    pub desc_file: PathBuf,
80
81    /// The name of the message type to use for serializing.
82    #[configurable(metadata(docs::examples = "package.Message"))]
83    pub message_type: String,
84
85    /// Use JSON field names (camelCase) instead of protobuf field names (snake_case).
86    ///
87    /// When enabled, the deserializer will output fields using their JSON names as defined
88    /// in the `.proto` file (e.g., `jobDescription` instead of `job_description`).
89    ///
90    /// This is useful when working with data that needs to be converted to JSON or
91    /// when interfacing with systems that use JSON naming conventions.
92    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
93    pub use_json_names: bool,
94}
95
96/// Deserializer that builds `Event`s from a byte frame containing protobuf.
97#[derive(Debug, Clone)]
98pub struct ProtobufDeserializer {
99    message_descriptor: MessageDescriptor,
100    options: Options,
101}
102
103impl ProtobufDeserializer {
104    /// Creates a new `ProtobufDeserializer`.
105    pub fn new(message_descriptor: MessageDescriptor) -> Self {
106        Self {
107            message_descriptor,
108            options: Default::default(),
109        }
110    }
111
112    /// Creates a new deserializer instance using the descriptor bytes directly.
113    pub fn new_from_bytes(
114        desc_bytes: &[u8],
115        message_type: &str,
116        options: Options,
117    ) -> vector_common::Result<Self> {
118        let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
119        Ok(Self {
120            message_descriptor,
121            options,
122        })
123    }
124}
125
126fn extract_vrl_value(
127    bytes: Bytes,
128    message_descriptor: &MessageDescriptor,
129    options: &Options,
130) -> vector_common::Result<Value> {
131    let dynamic_message = DynamicMessage::decode(message_descriptor.clone(), bytes)
132        .map_err(|error| format!("Error parsing protobuf: {error:?}"))?;
133
134    Ok(proto_to_value(
135        &prost_reflect::Value::Message(dynamic_message),
136        None,
137        options,
138    )?)
139}
140
141impl Deserializer for ProtobufDeserializer {
142    fn parse(
143        &self,
144        bytes: Bytes,
145        log_namespace: LogNamespace,
146    ) -> vector_common::Result<SmallVec<[Event; 1]>> {
147        let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?;
148        let mut event = Event::Log(LogEvent::from(vrl_value));
149
150        let event = match log_namespace {
151            LogNamespace::Vector => event,
152            LogNamespace::Legacy => {
153                let timestamp = Utc::now();
154                if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
155                    let log = event.as_mut_log();
156                    if !log.contains(timestamp_key) {
157                        log.insert(timestamp_key, timestamp);
158                    }
159                }
160                event
161            }
162        };
163
164        Ok(smallvec![event])
165    }
166}
167
168impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
169    type Error = vector_common::Error;
170    fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
171        let message_descriptor =
172            get_message_descriptor(&config.protobuf.desc_file, &config.protobuf.message_type)?;
173        Ok(Self {
174            message_descriptor,
175            options: Options {
176                use_json_names: config.protobuf.use_json_names,
177            },
178        })
179    }
180}
181
#[cfg(test)]
mod tests {
    use std::{env, fs, path::PathBuf};

    use vector_core::config::log_schema;

    use super::*;

    /// Directory containing the protobuf fixtures (`protos/*.desc`, `pbs/*.pb`).
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Reads a binary protobuf fixture as raw bytes.
    ///
    /// Encoded protobuf is arbitrary binary data, so it must not be loaded with
    /// `fs::read_to_string`, which fails on any byte sequence that is not valid UTF-8.
    fn read_fixture(path: PathBuf) -> Vec<u8> {
        fs::read(path).unwrap()
    }

    /// Builds a config pointing at `desc_file` / `message_type` with default options.
    fn config_for(desc_file: PathBuf, message_type: &str) -> ProtobufDeserializerConfig {
        ProtobufDeserializerConfig {
            protobuf: ProtobufDeserializerOptions {
                desc_file,
                message_type: message_type.to_string(),
                use_json_names: false,
            },
        }
    }

    /// Decodes `protobuf_bin_message` in both log namespaces, runs `validate_log` on the
    /// single resulting event, and checks a timestamp is added only in the legacy namespace.
    fn parse_and_validate(
        protobuf_bin_message: Vec<u8>,
        protobuf_desc_path: PathBuf,
        message_type: &str,
        validate_log: fn(&LogEvent),
    ) {
        let input = Bytes::from(protobuf_bin_message);
        let message_descriptor = get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            let mut events = events.into_iter();

            {
                let event = events.next().unwrap();
                let log = event.as_log();
                validate_log(log);
                assert_eq!(
                    log.get(log_schema().timestamp_key_target_path().unwrap())
                        .is_some(),
                    namespace == LogNamespace::Legacy
                );
            }

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_protobuf() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "123456"
            );
        };

        parse_and_validate(
            read_fixture(protobuf_bin_message_path),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_protobuf3() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone3.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf3.desc");
        let message_type = "test_protobuf3.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "1234"
            );
            assert_eq!(
                log["data"].as_object().unwrap()["data_phone"],
                "HOME".into()
            );
        };

        parse_and_validate(
            read_fixture(protobuf_bin_message_path),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_empty_buffer() {
        let protobuf_bin_message = Vec::new();
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            // No field will be set.
            assert!(!log.contains("name"));
            assert!(!log.contains("id"));
            assert!(!log.contains("email"));
            assert!(!log.contains("phones"));
        };

        parse_and_validate(
            protobuf_bin_message,
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_error_invalid_protobuf() {
        let input = Bytes::from("{ foo");
        let message_descriptor = get_message_descriptor(
            &test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.Person",
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }

    #[test]
    fn build_error_missing_desc_file() {
        let config = config_for(
            test_data_dir().join("protos/does_not_exist.desc"),
            "test_protobuf.Person",
        );
        assert!(config.build().is_err());
    }

    #[test]
    fn build_error_unknown_message_type() {
        let config = config_for(
            test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.DoesNotExist",
        );
        assert!(config.build().is_err());
    }
}