vrl/protobuf/
parse.rs

1use crate::compiler::prelude::*;
2use prost_reflect::ReflectMessage;
3#[cfg(any(feature = "enable_system_functions", test))]
4use prost_reflect::{DynamicMessage, MessageDescriptor};
5
6#[derive(Default, Debug, Clone, Eq, PartialEq)]
7pub struct Options {
8    pub use_json_names: bool,
9}
10
11pub fn proto_to_value(
12    prost_reflect_value: &prost_reflect::Value,
13    field_descriptor: Option<&prost_reflect::FieldDescriptor>,
14    options: &Options,
15) -> Result<Value, String> {
16    let vrl_value = match prost_reflect_value {
17        prost_reflect::Value::Bool(v) => Value::from(*v),
18        prost_reflect::Value::I32(v) => Value::from(*v),
19        prost_reflect::Value::I64(v) => Value::from(*v),
20        prost_reflect::Value::U32(v) => Value::from(*v),
21        prost_reflect::Value::U64(v) => Value::from(*v),
22        prost_reflect::Value::F32(v) => {
23            Value::Float(NotNan::new(f64::from(*v)).map_err(|_e| "Float number cannot be Nan")?)
24        }
25        prost_reflect::Value::F64(v) => {
26            Value::Float(NotNan::new(*v).map_err(|_e| "F64 number cannot be Nan")?)
27        }
28        prost_reflect::Value::String(v) => Value::from(v.as_str()),
29        prost_reflect::Value::Bytes(v) => Value::from(v.clone()),
30        prost_reflect::Value::EnumNumber(v) => {
31            if let Some(field_descriptor) = field_descriptor {
32                let kind = field_descriptor.kind();
33                let enum_desc = kind.as_enum().ok_or_else(|| {
34                    format!(
35                        "Internal error while parsing protobuf enum. Field descriptor: {field_descriptor:?}"
36                    )
37                })?;
38                Value::from(
39                    enum_desc
40                        .get_value(*v)
41                        .ok_or_else(|| {
42                            format!("The number {} cannot be in '{}'", v, enum_desc.name())
43                        })?
44                        .name(),
45                )
46            } else {
47                Err("Expected valid field descriptor")?
48            }
49        }
50        prost_reflect::Value::Message(v) => {
51            let mut obj_map = ObjectMap::new();
52            for field_desc in v.descriptor().fields() {
53                if v.has_field(&field_desc) {
54                    let field_value = v.get_field(&field_desc);
55                    let out = proto_to_value(field_value.as_ref(), Some(&field_desc), options)?;
56                    let field_key = if options.use_json_names {
57                        field_desc.json_name()
58                    } else {
59                        field_desc.name()
60                    };
61                    obj_map.insert(field_key.into(), out);
62                }
63            }
64            Value::from(obj_map)
65        }
66        prost_reflect::Value::List(v) => {
67            let vec = v
68                .iter()
69                .map(|o| proto_to_value(o, field_descriptor, options))
70                .collect::<Result<Vec<_>, String>>()?;
71            Value::from(vec)
72        }
73        prost_reflect::Value::Map(v) => {
74            if let Some(field_descriptor) = field_descriptor {
75                let kind = field_descriptor.kind();
76                let message_desc = kind.as_message().ok_or_else(|| {
77                    format!(
78                        "Internal error while parsing protobuf field descriptor: {field_descriptor:?}"
79                    )
80                })?;
81                Value::from(
82                    v.iter()
83                        .map(|kv| {
84                            Ok((
85                                kv.0.as_str()
86                                    .ok_or_else(|| {
87                                        format!(
88                                            "Internal error while parsing protobuf map. Field descriptor: {field_descriptor:?}"
89                                        )
90                                    })?
91                                    .into(),
92                                proto_to_value(kv.1, Some(&message_desc.map_entry_value_field()), options)?,
93                            ))
94                        })
95                        .collect::<std::result::Result<ObjectMap, String>>()?,
96                )
97            } else {
98                Err("Expected valid field descriptor")?
99            }
100        }
101    };
102    Ok(vrl_value)
103}
104
105#[cfg(feature = "enable_system_functions")]
106pub(crate) fn parse_proto(descriptor: &MessageDescriptor, value: Value) -> Resolved {
107    let bytes = value.try_bytes()?;
108
109    let dynamic_message = DynamicMessage::decode(descriptor.clone(), bytes)
110        .map_err(|error| format!("Error parsing protobuf: {error:?}"))?;
111    Ok(proto_to_value(
112        &prost_reflect::Value::Message(dynamic_message),
113        None,
114        &Options::default(),
115    )?)
116}
117
118#[cfg(test)]
119mod tests {
120    use super::*;
121    use crate::protobuf::descriptor::get_message_descriptor;
122    use crate::{owned_value_path, value};
123    use std::path::PathBuf;
124    use std::{env, fs};
125
126    fn test_data_dir() -> PathBuf {
127        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap()).join("tests/data/protobuf")
128    }
129
130    fn read_pb_file(protobuf_bin_message_path: &str) -> String {
131        fs::read_to_string(test_data_dir().join(protobuf_bin_message_path)).unwrap()
132    }
133
134    #[test]
135    fn test_parse_files() {
136        let path = test_data_dir().join("test_protobuf/v1/test_protobuf.desc");
137        let descriptor = get_message_descriptor(&path, "test_protobuf.v1.Person").unwrap();
138        let encoded_value = value!(read_pb_file("test_protobuf/v1/input/person_someone.pb"));
139        let parsed_value = parse_proto(&descriptor, encoded_value);
140        assert!(
141            parsed_value.is_ok(),
142            "Failed to parse proto: {:?}",
143            parsed_value.unwrap_err()
144        );
145        let parsed_value = parsed_value.unwrap();
146        let value = value!({ name: "Someone", phones: [{number: "123-456"}] });
147        assert_eq!(value, parsed_value)
148    }
149
150    #[test]
151    fn test_parse_proto3() {
152        let path = test_data_dir().join("test_protobuf3/v1/test_protobuf3.desc");
153        let descriptor = get_message_descriptor(&path, "test_protobuf3.v1.Person").unwrap();
154        let encoded_value = value!(read_pb_file("test_protobuf3/v1/input/person_someone.pb"));
155        let parsed_value = parse_proto(&descriptor, encoded_value);
156        assert!(
157            parsed_value.is_ok(),
158            "Failed to parse proto: {:?}",
159            parsed_value.unwrap_err()
160        );
161        let parsed_value = parsed_value.unwrap();
162        let value = value!({ name: "Someone",
163                                    phones: [{number: "123-456", type: "PHONE_TYPE_MOBILE"}] });
164        assert_eq!(value, parsed_value)
165    }
166
167    #[test]
168    fn test_proto_to_value_with_json_names() {
169        let path = test_data_dir().join("test_protobuf3/v1/test_protobuf3.desc");
170        let descriptor = get_message_descriptor(&path, "test_protobuf3.v1.Person").unwrap();
171        let encoded_value = value!(read_pb_file("test_protobuf3/v1/input/person_with_job.pb"));
172        let raw_bytes = encoded_value.try_bytes().unwrap();
173        let dynamic_message = DynamicMessage::decode(descriptor.clone(), raw_bytes)
174            .map_err(|error| format!("Error parsing protobuf: {error:?}"))
175            .unwrap();
176        let prost_value = prost_reflect::Value::Message(dynamic_message);
177        let vrl_value = proto_to_value(
178            &prost_value,
179            None,
180            &Options {
181                use_json_names: true,
182            },
183        )
184        .unwrap();
185        assert_eq!(
186            vrl_value.get(&owned_value_path!("jobDescription")),
187            Some(&Value::from("some job"))
188        );
189    }
190}