1use std::{collections::BTreeMap, convert::TryInto};
2
3use chrono::{serde::ts_seconds, DateTime, TimeZone, Utc};
4use ordered_float::NotNan;
5use serde::{Deserialize, Serialize};
6use vector_lib::event::{KeyString, ObjectMap, Value};
7
8#[derive(Debug, Deserialize, Serialize)]
18#[serde(untagged)]
19pub(super) enum FluentMessage {
20 Message(FluentTag, FluentTimestamp, FluentRecord),
21 MessageWithOptions(
25 FluentTag,
26 FluentTimestamp,
27 FluentRecord,
28 FluentMessageOptions,
29 ),
30 Forward(FluentTag, Vec<FluentEntry>),
31 ForwardWithOptions(FluentTag, Vec<FluentEntry>, FluentMessageOptions),
32 PackedForward(FluentTag, serde_bytes::ByteBuf),
33 PackedForwardWithOptions(FluentTag, serde_bytes::ByteBuf, FluentMessageOptions),
34
35 Heartbeat(rmpv::Value), }
38
39#[derive(Default, Debug, Deserialize, Serialize)]
43#[serde(default)]
44pub(super) struct FluentMessageOptions {
45 pub(super) size: Option<u64>, pub(super) chunk: Option<String>, pub(super) compressed: Option<String>, }
49
50#[derive(Debug, Deserialize, Serialize)]
54pub(super) struct FluentEntry(pub(super) FluentTimestamp, pub(super) FluentRecord);
55
56pub(super) type FluentRecord = BTreeMap<String, FluentValue>;
58
59pub(super) type FluentTag = String;
61
62#[derive(Clone, Debug, PartialEq, Serialize)]
66pub(super) struct FluentEventTime(DateTime<Utc>);
67
68impl<'de> serde::de::Deserialize<'de> for FluentEventTime {
69 fn deserialize<D>(deserializer: D) -> Result<FluentEventTime, D::Error>
70 where
71 D: serde::Deserializer<'de>,
72 {
73 struct FluentEventTimeVisitor;
74
75 impl<'de> serde::de::Visitor<'de> for FluentEventTimeVisitor {
76 type Value = FluentEventTime;
77
78 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
79 formatter.write_str("fluent timestamp extension")
80 }
81
82 fn visit_newtype_struct<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
83 where
84 D: serde::de::Deserializer<'de>,
85 {
86 deserializer.deserialize_tuple(2, self)
87 }
88
89 fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
90 where
91 A: serde::de::SeqAccess<'de>,
92 {
93 let tag: u32 = seq
94 .next_element()?
95 .ok_or_else(|| serde::de::Error::invalid_length(0, &self))?;
96
97 if tag != 0 {
98 return Err(serde::de::Error::custom(format!(
99 "expected extension type 0 for fluent timestamp, got {tag}"
100 )));
101 }
102
103 let bytes: serde_bytes::ByteBuf = seq
104 .next_element()?
105 .ok_or_else(|| serde::de::Error::invalid_length(1, &self))?;
106
107 if bytes.len() != 8 {
108 return Err(serde::de::Error::custom(format!(
109 "expected exactly 8 bytes for binary encoded fluent timestamp, got {}",
110 bytes.len()
111 )));
112 }
113
114 let seconds = u32::from_be_bytes(bytes[..4].try_into().expect("exactly 4 bytes"));
116 let nanoseconds =
117 u32::from_be_bytes(bytes[4..].try_into().expect("exactly 4 bytes"));
118
119 Ok(FluentEventTime(
120 Utc.timestamp_opt(seconds.into(), nanoseconds)
121 .single()
122 .expect("invalid timestamp"),
123 ))
124 }
125 }
126
127 deserializer.deserialize_any(FluentEventTimeVisitor)
128 }
129}
130
131#[derive(Debug, Deserialize, PartialEq, Serialize)]
135pub(super) struct FluentValue(rmpv::Value);
136
137impl From<rmpv::Value> for FluentValue {
138 fn from(value: rmpv::Value) -> Self {
139 Self(value)
140 }
141}
142
143impl From<FluentValue> for Value {
144 fn from(value: FluentValue) -> Self {
145 match value.0 {
146 rmpv::Value::Nil => Value::Null,
147 rmpv::Value::Boolean(b) => Value::Boolean(b),
148 rmpv::Value::Integer(i) => i
149 .as_i64()
150 .map(Value::Integer)
151 .unwrap_or_else(|| Value::Bytes(i.to_string().into())),
154 rmpv::Value::F32(f) => {
155 NotNan::new(f as f64)
157 .map(Value::Float)
158 .unwrap_or(Value::Null)
159 }
160 rmpv::Value::F64(f) => {
161 NotNan::new(f).map(Value::Float).unwrap_or(Value::Null)
163 }
164 rmpv::Value::String(s) => Value::Bytes(s.into_bytes().into()),
165 rmpv::Value::Binary(bytes) => Value::Bytes(bytes.into()),
166 rmpv::Value::Array(values) => Value::Array(
167 values
168 .into_iter()
169 .map(|value| Value::from(FluentValue(value)))
170 .collect(),
171 ),
172 rmpv::Value::Map(values) => {
173 Value::Object(
182 values
183 .into_iter()
184 .filter_map(|(key, value)| {
185 key.as_str()
186 .map(|k| (k.into(), Value::from(FluentValue(value))))
187 })
188 .collect(),
189 )
190 }
191 rmpv::Value::Ext(code, bytes) => {
192 let mut fields = ObjectMap::new();
193 fields.insert(
194 KeyString::from("msgpack_extension_code"),
195 Value::Integer(code.into()),
196 );
197 fields.insert(KeyString::from("bytes"), Value::Bytes(bytes.into()));
198 Value::Object(fields)
199 }
200 }
201 }
202}
203
204#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
208#[serde(untagged)]
209pub(super) enum FluentTimestamp {
210 #[serde(with = "ts_seconds")]
211 Unix(DateTime<Utc>),
212 Ext(FluentEventTime),
213}
214
215impl From<FluentTimestamp> for Value {
216 fn from(timestamp: FluentTimestamp) -> Self {
217 match timestamp {
218 FluentTimestamp::Unix(timestamp) | FluentTimestamp::Ext(FluentEventTime(timestamp)) => {
219 Value::Timestamp(timestamp)
220 }
221 }
222 }
223}
224
225#[cfg(test)]
226mod test {
227 use std::collections::BTreeMap;
228
229 use approx::assert_relative_eq;
230 use quickcheck::quickcheck;
231 use vrl::value::{ObjectMap, Value};
232
233 use crate::sources::fluent::message::FluentValue;
234
235 quickcheck! {
236 fn from_bool(input: bool) -> () {
237 assert_eq!(Value::from(FluentValue(rmpv::Value::Boolean(input))),
238 Value::Boolean(input))
239 }
240 }
241
242 quickcheck! {
243 fn from_i64(input: i64) -> () {
244 assert_eq!(Value::from(FluentValue(rmpv::Value::Integer(rmpv::Integer::from(input)))),
245 Value::Integer(input))
246 }
247 }
248
249 quickcheck! {
250 fn from_u64(input: u64) -> () {
251 if input > i64::MAX as u64 {
252 assert_eq!(Value::from(FluentValue(rmpv::Value::Integer(rmpv::Integer::from(input)))),
253 Value::Bytes(input.to_string().into()))
254 } else {
255 assert_eq!(Value::from(FluentValue(rmpv::Value::Integer(rmpv::Integer::from(input)))),
256 Value::Integer(input as i64))
257 }
258 }
259 }
260
261 quickcheck! {
262 fn from_f32(input: f32) -> () {
263 let val = Value::from(FluentValue(rmpv::Value::F32(input)));
264 if input.is_nan() {
265 assert_eq!(val, Value::Null);
266 } else {
267 assert_relative_eq!(input as f64, val.as_float().unwrap().into_inner());
268 }
269 }
270 }
271
272 quickcheck! {
273 fn from_f64(input: f64) -> () {
274 let val = Value::from(FluentValue(rmpv::Value::F64(input)));
275 if input.is_nan() {
276 assert_eq!(val, Value::Null);
277 } else {
278 assert_relative_eq!(input, val.as_float().unwrap().into_inner());
279 }
280 }
281 }
282
283 quickcheck! {
284 fn from_string(input: String) -> () {
285 assert_eq!(Value::from(FluentValue(rmpv::Value::String(rmpv::Utf8String::from(input.clone())))),
286 Value::Bytes(input.into_bytes().into()))
287 }
288 }
289
290 quickcheck! {
291 fn from_binary(input: Vec<u8>) -> () {
292 assert_eq!(Value::from(FluentValue(rmpv::Value::Binary(input.clone()))),
293 Value::Bytes(input.into()))
294 }
295 }
296
297 quickcheck! {
298 fn from_i64_array(input: Vec<i64>) -> () {
299 let actual: rmpv::Value = rmpv::Value::Array(input.iter().map(|i| rmpv::Value::from(*i)).collect());
300 let expected: Value = Value::Array(input.iter().map(|i| Value::Integer(*i)).collect());
301 assert_eq!(Value::from(FluentValue(actual)), expected);
302 }
303 }
304
305 quickcheck! {
306 fn from_map(input: Vec<(String, i64)>) -> () {
307 let key_fn = |k| { rmpv::Value::String(rmpv::Utf8String::from(k)) };
308 let val_fn = |k| { rmpv::Value::Integer(rmpv::Integer::from(k)) };
309 let actual_inner: Vec<(rmpv::Value, rmpv::Value)> = input.clone().into_iter().map(|(k,v)| (key_fn(k), val_fn(v))).collect();
310 let actual = rmpv::Value::Map(actual_inner);
311
312 let mut expected_inner = ObjectMap::new();
313 for (k,v) in input.into_iter() {
314 expected_inner.insert(k.into(), Value::Integer(v));
315 }
316 let expected = Value::Object(expected_inner);
317
318 assert_eq!(Value::from(FluentValue(actual)), expected);
319 }
320 }
321
322 quickcheck! {
323 fn from_nonstring_key_map(input: Vec<(i64, i64)>) -> () {
324 let key_fn = |k| { rmpv::Value::Integer(rmpv::Integer::from(k)) };
329 let val_fn = |k| { rmpv::Value::Integer(rmpv::Integer::from(k)) };
330 let actual_inner: Vec<(rmpv::Value, rmpv::Value)> = input.into_iter().map(|(k,v)| (key_fn(k), val_fn(v))).collect();
331 let actual = rmpv::Value::Map(actual_inner);
332
333 let expected = Value::Object(BTreeMap::new());
334
335 assert_eq!(Value::from(FluentValue(actual)), expected);
336 }
337 }
338
339 #[test]
340 fn from_nil() {
341 assert_eq!(Value::from(FluentValue(rmpv::Value::Nil)), Value::Null);
342 }
343
344 quickcheck! {
345 fn from_ext(code: i8, bytes: Vec<u8>) -> () {
346 let actual = rmpv::Value::Ext(code, bytes.clone());
347
348 let mut inner = ObjectMap::new();
349 inner.insert("msgpack_extension_code".into(), Value::Integer(code.into()));
350 inner.insert("bytes".into(), Value::Bytes(bytes.into()));
351 let expected = Value::Object(inner);
352
353 assert_eq!(Value::from(FluentValue(actual)), expected);
354 }
355 }
356}