vector/sources/fluent/
message.rs

1use std::{collections::BTreeMap, convert::TryInto};
2
3use chrono::{serde::ts_seconds, DateTime, TimeZone, Utc};
4use ordered_float::NotNan;
5use serde::{Deserialize, Serialize};
6use vector_lib::event::{KeyString, ObjectMap, Value};
7
/// Top-level Fluent forward-protocol message as decoded from msgpack.
///
/// Fluent msgpack messages can be encoded in one of three ways, each with and
/// without options, all using arrays to encode the top-level fields.
///
/// The spec refers to 4 ways, but really CompressedPackedForward is encoded the
/// same as PackedForward, it just has an additional decompression step.
///
/// Not yet handled are the handshake messages.
///
/// The enum is `#[serde(untagged)]`, so serde tries the variants in declaration
/// order and keeps the first one that deserializes successfully. The order of
/// the variants below is therefore part of the decoding behavior.
///
/// <https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#event-modes>
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
pub(super) enum FluentMessage {
    /// A single event: tag, timestamp and record.
    Message(FluentTag, FluentTimestamp, FluentRecord),
    // I attempted to use just one variant for each of these, with and without options, using an
    // `Option` for the last element, but rmp still expected the number of elements to match in
    // that case (it just allows the last element to be `nil`).
    /// A single event followed by an options map.
    MessageWithOptions(
        FluentTag,
        FluentTimestamp,
        FluentRecord,
        FluentMessageOptions,
    ),
    /// A tag followed by a list of already-decoded entries.
    Forward(FluentTag, Vec<FluentEntry>),
    /// Same as `Forward`, followed by an options map.
    ForwardWithOptions(FluentTag, Vec<FluentEntry>, FluentMessageOptions),
    /// A tag followed by an opaque byte buffer; the entries inside are not
    /// decoded by this type.
    PackedForward(FluentTag, serde_bytes::ByteBuf),
    /// Same as `PackedForward`, followed by an options map.
    PackedForwardWithOptions(FluentTag, serde_bytes::ByteBuf, FluentMessageOptions),

    // should be last as it'll match any other message
    /// Catch-all that accepts any msgpack value.
    Heartbeat(rmpv::Value), // should be Nil if heartbeat
}
38
/// Server options sent by client.
///
/// Every field is optional on the wire: `#[serde(default)]` combined with the
/// derived `Default` means keys that are absent deserialize as `None`.
///
/// <https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#option>
#[derive(Default, Debug, Deserialize, Serialize)]
#[serde(default)]
pub(super) struct FluentMessageOptions {
    pub(super) size: Option<u64>, // client provided hint for the number of entries
    pub(super) chunk: Option<String>, // client provided chunk identifier for acks
    // NOTE(review): the original note only says "this one is required if present";
    // presumably the value names the compression applied to the payload and must be
    // honored whenever it is set. Confirm against the spec linked above.
    pub(super) compressed: Option<String>, // this one is required if present
}
49
/// Fluent entry consisting of timestamp and record.
///
/// This is a tuple struct, so it (de)serializes as a two-element sequence:
/// field `.0` is the timestamp and field `.1` is the record.
///
/// <https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#forward-mode>
#[derive(Debug, Deserialize, Serialize)]
pub(super) struct FluentEntry(pub(super) FluentTimestamp, pub(super) FluentRecord);
55
/// Fluent record is just key/value pairs.
///
/// Keys are strings and values are raw msgpack values wrapped in
/// [`FluentValue`]; the `BTreeMap` keeps the pairs ordered by key.
pub(super) type FluentRecord = BTreeMap<String, FluentValue>;
58
/// Fluent message tag.
///
/// Plain alias for `String`; it is the first element of every non-heartbeat
/// [`FluentMessage`] variant.
pub(super) type FluentTag = String;
61
62/// Custom decoder for Fluent's EventTime msgpack extension.
63///
64/// <https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#eventtime-ext-format>
65#[derive(Clone, Debug, PartialEq, Serialize)]
66pub(super) struct FluentEventTime(DateTime<Utc>);
67
68impl<'de> serde::de::Deserialize<'de> for FluentEventTime {
69    fn deserialize<D>(deserializer: D) -> Result<FluentEventTime, D::Error>
70    where
71        D: serde::Deserializer<'de>,
72    {
73        struct FluentEventTimeVisitor;
74
75        impl<'de> serde::de::Visitor<'de> for FluentEventTimeVisitor {
76            type Value = FluentEventTime;
77
78            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79                formatter.write_str("fluent timestamp extension")
80            }
81
82            fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
83            where
84                D: serde::de::Deserializer<'de>,
85            {
86                deserializer.deserialize_tuple(2, self)
87            }
88
89            fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
90            where
91                A: serde::de::SeqAccess<'de>,
92            {
93                let tag: u32 = seq
94                    .next_element()?
95                    .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
96
97                if tag != 0 {
98                    return Err(serde::de::Error::custom(format!(
99                        "expected extension type 0 for fluent timestamp, got {tag}"
100                    )));
101                }
102
103                let bytes: serde_bytes::ByteBuf = seq
104                    .next_element()?
105                    .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
106
107                if bytes.len() != 8 {
108                    return Err(serde::de::Error::custom(format!(
109                        "expected exactly 8 bytes for binary encoded fluent timestamp, got {}",
110                        bytes.len()
111                    )));
112                }
113
114                // length checked right above
115                let seconds = u32::from_be_bytes(bytes[..4].try_into().expect("exactly 4 bytes"));
116                let nanoseconds =
117                    u32::from_be_bytes(bytes[4..].try_into().expect("exactly 4 bytes"));
118
119                Ok(FluentEventTime(
120                    Utc.timestamp_opt(seconds.into(), nanoseconds)
121                        .single()
122                        .expect("invalid timestamp"),
123                ))
124            }
125        }
126
127        deserializer.deserialize_any(FluentEventTimeVisitor)
128    }
129}
130
/// Value for fluent record key.
///
/// Used mostly just to implement value conversion.
///
/// Thin newtype around `rmpv::Value`; the inner field is private to this
/// module, so other modules build one through `From<rmpv::Value>`.
#[derive(Debug, Deserialize, PartialEq, Serialize)]
pub(super) struct FluentValue(rmpv::Value);
136
137impl From<rmpv::Value> for FluentValue {
138    fn from(value: rmpv::Value) -> Self {
139        Self(value)
140    }
141}
142
143impl From<FluentValue> for Value {
144    fn from(value: FluentValue) -> Self {
145        match value.0 {
146            rmpv::Value::Nil => Value::Null,
147            rmpv::Value::Boolean(b) => Value::Boolean(b),
148            rmpv::Value::Integer(i) => i
149                .as_i64()
150                .map(Value::Integer)
151                // unwrap large numbers to string similar to how
152                // `From<serde_json::Value> for Value` handles it
153                .unwrap_or_else(|| Value::Bytes(i.to_string().into())),
154            rmpv::Value::F32(f) => {
155                // serde_json converts NaN to Null, so we model that behavior here since this is non-fallible
156                NotNan::new(f as f64)
157                    .map(Value::Float)
158                    .unwrap_or(Value::Null)
159            }
160            rmpv::Value::F64(f) => {
161                // serde_json converts NaN to Null, so we model that behavior here since this is non-fallible
162                NotNan::new(f).map(Value::Float).unwrap_or(Value::Null)
163            }
164            rmpv::Value::String(s) => Value::Bytes(s.into_bytes().into()),
165            rmpv::Value::Binary(bytes) => Value::Bytes(bytes.into()),
166            rmpv::Value::Array(values) => Value::Array(
167                values
168                    .into_iter()
169                    .map(|value| Value::from(FluentValue(value)))
170                    .collect(),
171            ),
172            rmpv::Value::Map(values) => {
173                // Per
174                // <https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#message-modes>
175                // we should expect that keys are always stringy. Ultimately a
176                // lot hinges on what
177                // <https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1#grammar>
178                // defines 'object' as.
179                //
180                // The current implementation will SILENTLY DROP non-stringy keys.
181                Value::Object(
182                    values
183                        .into_iter()
184                        .filter_map(|(key, value)| {
185                            key.as_str()
186                                .map(|k| (k.into(), Value::from(FluentValue(value))))
187                        })
188                        .collect(),
189                )
190            }
191            rmpv::Value::Ext(code, bytes) => {
192                let mut fields = ObjectMap::new();
193                fields.insert(
194                    KeyString::from("msgpack_extension_code"),
195                    Value::Integer(code.into()),
196                );
197                fields.insert(KeyString::from("bytes"), Value::Bytes(bytes.into()));
198                Value::Object(fields)
199            }
200        }
201    }
202}
203
/// Fluent message timestamp.
///
/// Message timestamps can be a unix timestamp or EventTime messagepack ext.
///
/// The enum is `#[serde(untagged)]`, so deserialization tries `Unix` first and
/// only falls back to `Ext` when that fails.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(untagged)]
pub(super) enum FluentTimestamp {
    /// Whole seconds since the Unix epoch, (de)serialized through chrono's
    /// `ts_seconds` helper.
    #[serde(with = "ts_seconds")]
    Unix(DateTime<Utc>),
    /// Timestamp carried by the EventTime msgpack extension.
    Ext(FluentEventTime),
}
214
215impl From<FluentTimestamp> for Value {
216    fn from(timestamp: FluentTimestamp) -> Self {
217        match timestamp {
218            FluentTimestamp::Unix(timestamp) | FluentTimestamp::Ext(FluentEventTime(timestamp)) => {
219                Value::Timestamp(timestamp)
220            }
221        }
222    }
223}
224
#[cfg(test)]
mod test {
    use std::collections::BTreeMap;

    use approx::assert_relative_eq;
    use quickcheck::quickcheck;
    use vrl::value::{ObjectMap, Value};

    use crate::sources::fluent::message::FluentValue;

    /// Runs the conversion under test on a raw msgpack value.
    fn convert(raw: rmpv::Value) -> Value {
        Value::from(FluentValue(raw))
    }

    // Scalars: booleans, integers and floats.
    quickcheck! {
        fn from_bool(input: bool) -> () {
            let converted = convert(rmpv::Value::Boolean(input));
            assert_eq!(converted, Value::Boolean(input));
        }

        fn from_i64(input: i64) -> () {
            let converted = convert(rmpv::Value::Integer(rmpv::Integer::from(input)));
            assert_eq!(converted, Value::Integer(input));
        }

        fn from_u64(input: u64) -> () {
            // Anything that does not fit in an i64 is expected as its decimal string.
            let expected = if input > i64::MAX as u64 {
                Value::Bytes(input.to_string().into())
            } else {
                Value::Integer(input as i64)
            };
            let converted = convert(rmpv::Value::Integer(rmpv::Integer::from(input)));
            assert_eq!(converted, expected);
        }

        fn from_f32(input: f32) -> () {
            let converted = convert(rmpv::Value::F32(input));
            if input.is_nan() {
                assert_eq!(converted, Value::Null);
            } else {
                assert_relative_eq!(input as f64, converted.as_float().unwrap().into_inner());
            }
        }

        fn from_f64(input: f64) -> () {
            let converted = convert(rmpv::Value::F64(input));
            if input.is_nan() {
                assert_eq!(converted, Value::Null);
            } else {
                assert_relative_eq!(input, converted.as_float().unwrap().into_inner());
            }
        }
    }

    // Strings and raw binary both end up as bytes.
    quickcheck! {
        fn from_string(input: String) -> () {
            let raw = rmpv::Value::String(rmpv::Utf8String::from(input.clone()));
            assert_eq!(convert(raw), Value::Bytes(input.into_bytes().into()));
        }

        fn from_binary(input: Vec<u8>) -> () {
            let raw = rmpv::Value::Binary(input.clone());
            assert_eq!(convert(raw), Value::Bytes(input.into()));
        }
    }

    // Containers: arrays and maps.
    quickcheck! {
        fn from_i64_array(input: Vec<i64>) -> () {
            let raw = rmpv::Value::Array(input.iter().copied().map(rmpv::Value::from).collect());
            let expected = Value::Array(input.into_iter().map(Value::Integer).collect());
            assert_eq!(convert(raw), expected);
        }

        fn from_map(input: Vec<(String, i64)>) -> () {
            let pairs: Vec<(rmpv::Value, rmpv::Value)> = input
                .iter()
                .map(|(key, number)| {
                    (
                        rmpv::Value::String(rmpv::Utf8String::from(key.clone())),
                        rmpv::Value::Integer(rmpv::Integer::from(*number)),
                    )
                })
                .collect();

            let expected_fields: ObjectMap = input
                .into_iter()
                .map(|(key, number)| (key.into(), Value::Integer(number)))
                .collect();

            assert_eq!(convert(rmpv::Value::Map(pairs)), Value::Object(expected_fields));
        }

        fn from_nonstring_key_map(input: Vec<(i64, i64)>) -> () {
            // Every key here is an integer, so every entry is dropped and the
            // result is an empty object. Such maps violate the fluent protocol
            // and we prefer to silently drop keys rather than crash the process.
            let pairs: Vec<(rmpv::Value, rmpv::Value)> = input
                .into_iter()
                .map(|(key, number)| {
                    (
                        rmpv::Value::Integer(rmpv::Integer::from(key)),
                        rmpv::Value::Integer(rmpv::Integer::from(number)),
                    )
                })
                .collect();

            assert_eq!(convert(rmpv::Value::Map(pairs)), Value::Object(BTreeMap::new()));
        }
    }

    #[test]
    fn from_nil() {
        assert_eq!(convert(rmpv::Value::Nil), Value::Null);
    }

    // Extension values are exposed as an object holding the code and the payload.
    quickcheck! {
        fn from_ext(code: i8, bytes: Vec<u8>) -> () {
            let mut expected_fields = ObjectMap::new();
            expected_fields.insert("bytes".into(), Value::Bytes(bytes.clone().into()));
            expected_fields.insert("msgpack_extension_code".into(), Value::Integer(code.into()));

            assert_eq!(convert(rmpv::Value::Ext(code, bytes)), Value::Object(expected_fields));
        }
    }
}