codecs/decoding/format/
protobuf.rs1use std::path::PathBuf;
2
3use bytes::Bytes;
4use chrono::Utc;
5use derivative::Derivative;
6use prost_reflect::{DynamicMessage, MessageDescriptor};
7use smallvec::{smallvec, SmallVec};
8use vector_config::configurable_component;
9use vector_core::event::LogEvent;
10use vector_core::{
11 config::{log_schema, DataType, LogNamespace},
12 event::Event,
13 schema,
14};
15use vrl::value::Kind;
16
17use super::Deserializer;
18
19#[configurable_component]
21#[derive(Debug, Clone, Default)]
22pub struct ProtobufDeserializerConfig {
23 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
25 pub protobuf: ProtobufDeserializerOptions,
26}
27
28impl ProtobufDeserializerConfig {
29 pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
31 ProtobufDeserializer::try_from(self)
32 }
33
34 pub fn output_type(&self) -> DataType {
36 DataType::Log
37 }
38
39 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
41 match log_namespace {
42 LogNamespace::Legacy => {
43 let mut definition =
44 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
45
46 if let Some(timestamp_key) = log_schema().timestamp_key() {
47 definition = definition.try_with_field(
48 timestamp_key,
49 Kind::any().or_timestamp(),
52 Some("timestamp"),
53 );
54 }
55 definition
56 }
57 LogNamespace::Vector => {
58 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
59 }
60 }
61 }
62}
63
64#[configurable_component]
66#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
67#[derivative(Default)]
68pub struct ProtobufDeserializerOptions {
69 pub desc_file: PathBuf,
75
76 #[configurable(metadata(docs::examples = "package.Message"))]
78 pub message_type: String,
79}
80
81#[derive(Debug, Clone)]
83pub struct ProtobufDeserializer {
84 message_descriptor: MessageDescriptor,
85}
86
87impl ProtobufDeserializer {
88 pub fn new(message_descriptor: MessageDescriptor) -> Self {
90 Self { message_descriptor }
91 }
92}
93
94impl Deserializer for ProtobufDeserializer {
95 fn parse(
96 &self,
97 bytes: Bytes,
98 log_namespace: LogNamespace,
99 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
100 let dynamic_message = DynamicMessage::decode(self.message_descriptor.clone(), bytes)
101 .map_err(|error| format!("Error parsing protobuf: {error:?}"))?;
102
103 let proto_vrl =
104 vrl::protobuf::proto_to_value(&prost_reflect::Value::Message(dynamic_message), None)?;
105 let mut event = Event::Log(LogEvent::from(proto_vrl));
106 let event = match log_namespace {
107 LogNamespace::Vector => event,
108 LogNamespace::Legacy => {
109 let timestamp = Utc::now();
110 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
111 let log = event.as_mut_log();
112 if !log.contains(timestamp_key) {
113 log.insert(timestamp_key, timestamp);
114 }
115 }
116 event
117 }
118 };
119
120 Ok(smallvec![event])
121 }
122}
123
124impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
125 type Error = vector_common::Error;
126 fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
127 let message_descriptor = vrl::protobuf::get_message_descriptor(
128 &config.protobuf.desc_file,
129 &config.protobuf.message_type,
130 )?;
131 Ok(Self::new(message_descriptor))
132 }
133}
134
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::{env, fs};
    use vector_core::config::log_schema;

    use super::*;

    /// Directory holding the protobuf fixtures, resolved from `CARGO_MANIFEST_DIR`.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Parses `protobuf_bin_message` under both log namespaces and checks each result.
    ///
    /// For every namespace this asserts that exactly one event is produced,
    /// runs `validate_log` against it, and verifies that the timestamp field
    /// is present only in the `Legacy` namespace.
    ///
    /// The message is taken as raw bytes rather than a `String`: encoded
    /// protobuf is binary data and is not guaranteed to be valid UTF-8.
    fn parse_and_validate(
        protobuf_bin_message: Vec<u8>,
        protobuf_desc_path: PathBuf,
        message_type: &str,
        validate_log: fn(&LogEvent),
    ) {
        let input = Bytes::from(protobuf_bin_message);
        let message_descriptor =
            vrl::protobuf::get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            let mut events = events.into_iter();

            {
                let event = events.next().unwrap();
                let log = event.as_log();
                validate_log(log);
                assert_eq!(
                    log.get(log_schema().timestamp_key_target_path().unwrap())
                        .is_some(),
                    namespace == LogNamespace::Legacy
                );
            }

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_protobuf() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "123456"
            );
        };

        parse_and_validate(
            // Read the fixture as bytes; `read_to_string` would reject non-UTF-8 payloads.
            fs::read(protobuf_bin_message_path).unwrap(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_protobuf3() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone3.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf3.desc");
        let message_type = "test_protobuf3.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "1234"
            );
            assert_eq!(
                log["data"].as_object().unwrap()["data_phone"],
                "HOME".into()
            );
        };

        parse_and_validate(
            // Read the fixture as bytes; `read_to_string` would reject non-UTF-8 payloads.
            fs::read(protobuf_bin_message_path).unwrap(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_empty_buffer() {
        let protobuf_bin_message = Vec::new();
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            // An empty buffer decodes successfully but populates none of these fields.
            assert!(!log.contains("name"));
            assert!(!log.contains("id"));
            assert!(!log.contains("email"));
            assert!(!log.contains("phones"));
        };

        parse_and_validate(
            protobuf_bin_message,
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_error_invalid_protobuf() {
        let input = Bytes::from("{ foo");
        let message_descriptor = vrl::protobuf::get_message_descriptor(
            &test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.Person",
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }
}