1use std::path::PathBuf;
2
3use bytes::Bytes;
4use chrono::Utc;
5use derivative::Derivative;
6use prost_reflect::{DynamicMessage, MessageDescriptor};
7use smallvec::{SmallVec, smallvec};
8use vector_config::configurable_component;
9use vector_core::{
10 config::{DataType, LogNamespace, log_schema},
11 event::{Event, LogEvent, TraceEvent},
12 schema,
13};
14use vrl::{
15 protobuf::{
16 descriptor::{get_message_descriptor, get_message_descriptor_from_bytes},
17 parse::{Options, proto_to_value},
18 },
19 value::{Kind, Value},
20};
21
22use super::Deserializer;
23
24#[configurable_component]
26#[derive(Debug, Clone, Default)]
27pub struct ProtobufDeserializerConfig {
28 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
30 pub protobuf: ProtobufDeserializerOptions,
31}
32
33impl ProtobufDeserializerConfig {
34 pub fn build(&self) -> vector_common::Result<ProtobufDeserializer> {
36 ProtobufDeserializer::try_from(self)
37 }
38
39 pub fn output_type(&self) -> DataType {
41 DataType::Log
42 }
43
44 pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
46 match log_namespace {
47 LogNamespace::Legacy => {
48 let mut definition =
49 schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
50
51 if let Some(timestamp_key) = log_schema().timestamp_key() {
52 definition = definition.try_with_field(
53 timestamp_key,
54 Kind::any().or_timestamp(),
57 Some("timestamp"),
58 );
59 }
60 definition
61 }
62 LogNamespace::Vector => {
63 schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
64 }
65 }
66 }
67}
68
69#[configurable_component]
71#[derive(Debug, Clone, PartialEq, Eq, Derivative)]
72#[derivative(Default)]
73pub struct ProtobufDeserializerOptions {
74 pub desc_file: PathBuf,
80
81 #[configurable(metadata(docs::examples = "package.Message"))]
83 pub message_type: String,
84}
85
86#[derive(Debug, Clone)]
88pub struct ProtobufDeserializer {
89 message_descriptor: MessageDescriptor,
90 options: Options,
91}
92
93impl ProtobufDeserializer {
94 pub fn new(message_descriptor: MessageDescriptor) -> Self {
96 Self {
97 message_descriptor,
98 options: Default::default(),
99 }
100 }
101
102 pub fn new_from_bytes(
104 desc_bytes: &[u8],
105 message_type: &str,
106 options: Options,
107 ) -> vector_common::Result<Self> {
108 let message_descriptor = get_message_descriptor_from_bytes(desc_bytes, message_type)?;
109 Ok(Self {
110 message_descriptor,
111 options,
112 })
113 }
114}
115
116fn extract_vrl_value(
117 bytes: Bytes,
118 message_descriptor: &MessageDescriptor,
119 options: &Options,
120) -> vector_common::Result<Value> {
121 let dynamic_message = DynamicMessage::decode(message_descriptor.clone(), bytes)
122 .map_err(|error| format!("Error parsing protobuf: {error:?}"))?;
123
124 Ok(proto_to_value(
125 &prost_reflect::Value::Message(dynamic_message),
126 None,
127 options,
128 )?)
129}
130
131impl Deserializer for ProtobufDeserializer {
132 fn parse(
133 &self,
134 bytes: Bytes,
135 log_namespace: LogNamespace,
136 ) -> vector_common::Result<SmallVec<[Event; 1]>> {
137 let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?;
138 let mut event = Event::Log(LogEvent::from(vrl_value));
139
140 let event = match log_namespace {
141 LogNamespace::Vector => event,
142 LogNamespace::Legacy => {
143 let timestamp = Utc::now();
144 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
145 let log = event.as_mut_log();
146 if !log.contains(timestamp_key) {
147 log.insert(timestamp_key, timestamp);
148 }
149 }
150 event
151 }
152 };
153
154 Ok(smallvec![event])
155 }
156
157 fn parse_traces(&self, bytes: Bytes) -> vector_common::Result<SmallVec<[Event; 1]>> {
158 let vrl_value = extract_vrl_value(bytes, &self.message_descriptor, &self.options)?;
159 let trace_event = Event::Trace(TraceEvent::from(vrl_value));
160 Ok(smallvec![trace_event])
161 }
162}
163
164impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
165 type Error = vector_common::Error;
166 fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
167 let message_descriptor =
168 get_message_descriptor(&config.protobuf.desc_file, &config.protobuf.message_type)?;
169 Ok(Self::new(message_descriptor))
170 }
171}
172
#[cfg(test)]
mod tests {
    use std::{
        env, fs,
        path::{Path, PathBuf},
    };

    use vector_core::config::log_schema;

    use super::*;

    /// Directory holding the protobuf fixtures (encoded messages and descriptor sets).
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
    }

    /// Reads a fixture file as raw bytes.
    ///
    /// Encoded protobuf is arbitrary binary data and is not guaranteed to be valid UTF-8,
    /// so fixtures must not be loaded with `fs::read_to_string`.
    fn read_fixture(path: &Path) -> Bytes {
        let contents = fs::read(path)
            .unwrap_or_else(|error| panic!("failed to read {}: {error}", path.display()));
        Bytes::from(contents)
    }

    /// Parses `input` in both log namespaces. It checks that exactly one event is produced,
    /// that it passes `validate_log`, and that a timestamp is present only in the legacy
    /// namespace.
    fn parse_and_validate(
        input: Bytes,
        protobuf_desc_path: PathBuf,
        message_type: &str,
        validate_log: fn(&LogEvent),
    ) {
        let message_descriptor = get_message_descriptor(&protobuf_desc_path, message_type).unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            let mut events = events.into_iter();

            {
                let event = events.next().unwrap();
                let log = event.as_log();
                validate_log(log);
                assert_eq!(
                    log.get(log_schema().timestamp_key_target_path().unwrap())
                        .is_some(),
                    namespace == LogNamespace::Legacy
                );
            }

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_protobuf() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "123456"
            );
        };

        parse_and_validate(
            read_fixture(&protobuf_bin_message_path),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_protobuf3() {
        let protobuf_bin_message_path = test_data_dir().join("pbs/person_someone3.pb");
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf3.desc");
        let message_type = "test_protobuf3.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "1234"
            );
            assert_eq!(
                log["data"].as_object().unwrap()["data_phone"],
                "HOME".into()
            );
        };

        parse_and_validate(
            read_fixture(&protobuf_bin_message_path),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_empty_buffer() {
        let protobuf_desc_path = test_data_dir().join("protos/test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert!(!log.contains("name"));
            assert!(!log.contains("id"));
            assert!(!log.contains("email"));
            assert!(!log.contains("phones"));
        };

        parse_and_validate(Bytes::new(), protobuf_desc_path, message_type, validate_log);
    }

    #[test]
    fn deserialize_protobuf_from_descriptor_bytes() {
        let desc_bytes = fs::read(test_data_dir().join("protos/test_protobuf.desc")).unwrap();
        let deserializer = ProtobufDeserializer::new_from_bytes(
            &desc_bytes,
            "test_protobuf.Person",
            Options::default(),
        )
        .unwrap();

        let input = read_fixture(&test_data_dir().join("pbs/person_someone.pb"));
        let events = deserializer.parse(input, LogNamespace::Vector).unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].as_log()["name"], "someone".into());
    }

    #[test]
    fn new_from_bytes_rejects_unknown_message_type() {
        let desc_bytes = fs::read(test_data_dir().join("protos/test_protobuf.desc")).unwrap();

        assert!(
            ProtobufDeserializer::new_from_bytes(
                &desc_bytes,
                "test_protobuf.DoesNotExist",
                Options::default(),
            )
            .is_err()
        );
    }

    #[test]
    fn deserialize_error_invalid_protobuf() {
        let input = Bytes::from("{ foo");
        let message_descriptor = get_message_descriptor(
            &test_data_dir().join("protos/test_protobuf.desc"),
            "test_protobuf.Person",
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }
}