1use std::path::PathBuf;
2
3use bytes::Bytes;
4use chrono::Utc;
5use derivative::Derivative;
6use prost_reflect::{DynamicMessage, MessageDescriptor};
7use smallvec::{SmallVec, smallvec};
8use vector_config::configurable_component;
9use vector_core::{
10 config::{DataType, LogNamespace, log_schema},
11 event::{Event, LogEvent},
12 schema,
13};
14use vrl::{
15 protobuf::{
16 descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
17 parse::{Options, proto_to_value},
18 },
19 value::{Kind, Value},
20};
21
22use super::Deserializer;
23
24#[configurable_component]
26#[derive(Debug, Clone, Default)]
27pub struct ProtobufDeserializerConfig {
28 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
30 pub protobuf: ProtobufDeserializerOptions,
31}
32
33impl ProtobufDeserializerConfig {
34 pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
36 ProtobufDeserializer::try_from(self)
37 }
38
39 pub fn output_type(&self) -> DataType {
41 DataType::Log
42 }
43
44 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
46 match log_namespace {
47 LogNamespace::Legacy => {
48 let mut definition =
49 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
50
51 if let Some(timestamp_key) = log_schema().timestamp_key() {
52 definition = definition.try_with_field(
53 timestamp_key,
54 Kind::any().or_timestamp(),
57 Some("timestamp"),
58 );
59 }
60 definition
61 }
62 LogNamespace::Vector => {
63 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
64 }
65 }
66 }
67}
68
69#[configurable_component]
71#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
72#[derivative(Default)]
73pub struct ProtobufDeserializerOptions {
74 pub desc_file: PathBuf,
80
81 #[configurable(metadata(docs::examples = "package.Message"))]
83 pub message_type: String,
84
85 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
93 pub use_json_names: bool,
94}
95
96#[derive(Debug, Clone)]
98pub struct ProtobufDeserializer {
99 message_descriptor: MessageDescriptor,
100 options: Options,
101}
102
103impl ProtobufDeserializer {
104 pub fn new(message_descriptor: MessageDescriptor) -> Self {
106 Self {
107 message_descriptor,
108 options: Default::default(),
109 }
110 }
111
112 pub fn new_from_bytes(
114 desc_bytes: &[u8],
115 message_type: &str,
116 options: Options,
117 ) -> vector_common::Result<Self> {
118 let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
119 Ok(Self {
120 message_descriptor,
121 options,
122 })
123 }
124}
125
126fn extract_vrl_value(
127 bytes: Bytes,
128 message_descriptor: &MessageDescriptor,
129 options: &Options,
130) -> vector_common::Result<Value> {
131 let dynamic_message = DynamicMessage::decode(message_descriptor.clone(), bytes)
132 .map_err(|error| format!("Error parsing protobuf: {error:?}"))?;
133
134 Ok(proto_to_value(
135 &prost_reflect::Value::Message(dynamic_message),
136 None,
137 options,
138 )?)
139}
140
141impl Deserializer for ProtobufDeserializer {
142 fn parse(
143 &self,
144 bytes: Bytes,
145 log_namespace: LogNamespace,
146 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
147 let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?;
148 let mut event = Event::Log(LogEvent::from(vrl_value));
149
150 let event = match log_namespace {
151 LogNamespace::Vector => event,
152 LogNamespace::Legacy => {
153 let timestamp = Utc::now();
154 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
155 let log = event.as_mut_log();
156 if !log.contains(timestamp_key) {
157 log.insert(timestamp_key, timestamp);
158 }
159 }
160 event
161 }
162 };
163
164 Ok(smallvec![event])
165 }
166}
167
168impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
169 type Error = vector_common::Error;
170 fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
171 let message_descriptor =
172 get_message_descriptor(&config.protobuf.desc_file, &config.protobuf.message_type)?;
173 Ok(Self {
174 message_descriptor,
175 options: Options {
176 use_json_names: config.protobuf.use_json_names,
177 },
178 })
179 }
180}
181
#[cfg(test)]
mod tests {
    use std::{env, fs, path::PathBuf};

    use vector_core::config::log_schema;

    use super::*;

    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Reads a fixture below the protobuf test-data directory as raw bytes.
    ///
    /// Protobuf wire data is arbitrary binary, so it must not go through `read_to_string`:
    /// that fails as soon as the encoding contains a non-UTF-8 byte sequence.
    fn read_fixture(relative_path: &str) -> Bytes {
        Bytes::from(fs::read(test_data_dir().join(relative_path)).unwrap())
    }

    /// Decodes `input` in both log namespaces, checks the resulting single event with
    /// `validate_log`, and verifies the timestamp field is only added in the legacy namespace.
    fn parse_and_validate(
        input: Bytes,
        protobuf_desc_path: PathBuf,
        message_type: &str,
        validate_log: fn(&LogEvent),
    ) {
        let message_descriptor = get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            // Cloning `Bytes` only bumps a reference count.
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            let mut events = events.into_iter();

            {
                let event = events.next().unwrap();
                let log = event.as_log();
                validate_log(log);
                assert_eq!(
                    log.get(log_schema().timestamp_key_target_path().unwrap())
                        .is_some(),
                    namespace == LogNamespace::Legacy
                );
            }

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_protobuf() {
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "123456"
            );
        };

        parse_and_validate(
            read_fixture("pbs/person_someone.pb"),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_protobuf3() {
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf3.desc");
        let message_type = "test_protobuf3.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "1234"
            );
            assert_eq!(
                log["data"].as_object().unwrap()["data_phone"],
                "HOME".into()
            );
        };

        parse_and_validate(
            read_fixture("pbs/person_someone3.pb"),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_empty_buffer() {
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert!(!log.contains("name"));
            assert!(!log.contains("id"));
            assert!(!log.contains("email"));
            assert!(!log.contains("phones"));
        };

        parse_and_validate(Bytes::new(), protobuf_desc_path, message_type, validate_log);
    }

    #[test]
    fn deserialize_protobuf_from_descriptor_bytes() {
        // Exercises `new_from_bytes`, which the other tests do not cover.
        let desc_bytes = read_fixture("protos/test_protobuf.desc");
        let deserializer = ProtobufDeserializer::new_from_bytes(
            &desc_bytes,
            "test_protobuf.Person",
            Options::default(),
        )
        .unwrap();

        let events = deserializer
            .parse(read_fixture("pbs/person_someone.pb"), LogNamespace::Vector)
            .unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0].as_log()["name"], "someone".into());
    }

    #[test]
    fn deserialize_error_invalid_protobuf() {
        let input = Bytes::from("{ foo");
        let message_descriptor = get_message_descriptor(
            &test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.Person",
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }
}