// vector_core/event/discriminant.rs

1use std::fmt;
2use std::hash::{Hash, Hasher};
3
4use super::{LogEvent, ObjectMap, Value};
5
// TODO: if `Value` implemented `Eq` and `Hash`, the implementation here would
// be much simpler. The obstacle is the `f64` type. Consider wrapping `f64` in a
// newtype (for example `NormalF64`) that implements `Eq` and `Hash` where that
// is safe, guarded by a `val.is_normal() == true` invariant.
// See also: https://internals.rust-lang.org/t/f32-f64-should-implement-hash/5436/32
12
13/// An event discriminant identifies a distinguishable subset of events.
14/// Intended for dissecting streams of events to sub-streams, for instance to
15/// be able to allocate a buffer per sub-stream.
16/// Implements `PartialEq`, `Eq` and `Hash` to enable use as a `HashMap` key.
17#[derive(Debug, Clone)]
18pub struct Discriminant {
19    values: Vec<Option<Value>>,
20}
21
22impl Discriminant {
23    /// Create a new Discriminant from the `LogEvent` and an ordered slice of
24    /// fields to include into a discriminant value.
25    pub fn from_log_event(event: &LogEvent, discriminant_fields: &[impl AsRef<str>]) -> Self {
26        let values: Vec<Option<Value>> = discriminant_fields
27            .iter()
28            .map(|discriminant_field| {
29                event
30                    .parse_path_and_get_value(discriminant_field.as_ref())
31                    .ok()
32                    .flatten()
33                    .cloned()
34            })
35            .collect();
36        Self { values }
37    }
38}
39
40impl PartialEq for Discriminant {
41    fn eq(&self, other: &Self) -> bool {
42        self.values
43            .iter()
44            .zip(other.values.iter())
45            .all(|(this, other)| match (this, other) {
46                (None, None) => true,
47                (Some(this), Some(other)) => value_eq(this, other),
48                _ => false,
49            })
50    }
51}
52
53impl Eq for Discriminant {}
54
55// Equality check for discriminant purposes.
56fn value_eq(this: &Value, other: &Value) -> bool {
57    match (this, other) {
58        // Trivial.
59        (Value::Bytes(this), Value::Bytes(other)) => this.eq(other),
60        (Value::Boolean(this), Value::Boolean(other)) => this.eq(other),
61        (Value::Integer(this), Value::Integer(other)) => this.eq(other),
62        (Value::Timestamp(this), Value::Timestamp(other)) => this.eq(other),
63        (Value::Null, Value::Null) => true,
64        // Non-trivial.
65        (Value::Float(this), Value::Float(other)) => f64_eq(this.into_inner(), other.into_inner()),
66        (Value::Array(this), Value::Array(other)) => array_eq(this, other),
67        (Value::Object(this), Value::Object(other)) => map_eq(this, other),
68        // Type mismatch.
69        _ => false,
70    }
71}
72
/// Compares two `f64` values for discriminant purposes.
///
/// This differs from IEEE 754 `==` in two ways:
/// - every NaN equals every other NaN, regardless of sign or payload;
/// - `0.0` and `-0.0` are distinct.
///
/// All other values are equal exactly when their bit patterns match.
fn f64_eq(this: f64, other: f64) -> bool {
    match (this.is_nan(), other.is_nan()) {
        (true, true) => true,
        (false, false) => this.to_bits() == other.to_bits(),
        _ => false,
    }
}
88
89fn array_eq(this: &[Value], other: &[Value]) -> bool {
90    if this.len() != other.len() {
91        return false;
92    }
93
94    this.iter()
95        .zip(other.iter())
96        .all(|(first, second)| value_eq(first, second))
97}
98
99fn map_eq(this: &ObjectMap, other: &ObjectMap) -> bool {
100    if this.len() != other.len() {
101        return false;
102    }
103
104    this.iter()
105        .zip(other.iter())
106        .all(|((key1, value1), (key2, value2))| key1 == key2 && value_eq(value1, value2))
107}
108
109impl Hash for Discriminant {
110    fn hash<H: Hasher>(&self, state: &mut H) {
111        for value in &self.values {
112            match value {
113                Some(value) => {
114                    state.write_u8(1);
115                    hash_value(state, value);
116                }
117                None => state.write_u8(0),
118            }
119        }
120    }
121}
122
123// Hashes value for discriminant purposes.
124fn hash_value<H: Hasher>(hasher: &mut H, value: &Value) {
125    match value {
126        // Trivial.
127        Value::Bytes(val) => val.hash(hasher),
128        Value::Regex(val) => val.as_bytes_slice().hash(hasher),
129        Value::Boolean(val) => val.hash(hasher),
130        Value::Integer(val) => val.hash(hasher),
131        Value::Timestamp(val) => val.hash(hasher),
132        // Non-trivial.
133        Value::Float(val) => hash_f64(hasher, val.into_inner()),
134        Value::Array(val) => hash_array(hasher, val),
135        Value::Object(val) => hash_map(hasher, val),
136        Value::Null => hash_null(hasher),
137    }
138}
139
/// Hashes an `f64` for discriminant purposes, consistently with `f64_eq`.
///
/// `f64_eq` treats every NaN as equal to every other NaN, but NaNs can carry
/// different sign bits and payloads (e.g. `f64::NAN` vs `-f64::NAN`). Hashing
/// the raw bytes would give "equal" discriminants different hashes, violating
/// the `Hash`/`Eq` contract. So every NaN is collapsed to the canonical
/// `f64::NAN` first.
///
/// Non-NaN values hash their exact bytes, so `0.0` and `-0.0` (which
/// `f64_eq` treats as distinct) hash differently.
fn hash_f64<H: Hasher>(hasher: &mut H, value: f64) {
    let canonical = if value.is_nan() { f64::NAN } else { value };
    hasher.write(&canonical.to_ne_bytes());
}
144
145fn hash_array<H: Hasher>(hasher: &mut H, array: &[Value]) {
146    for val in array {
147        hash_value(hasher, val);
148    }
149}
150
151fn hash_map<H: Hasher>(hasher: &mut H, map: &ObjectMap) {
152    for (key, val) in map {
153        hasher.write(key.as_bytes());
154        hash_value(hasher, val);
155    }
156}
157
/// Feeds the marker for `Value::Null` into `state`: a single zero byte.
fn hash_null<H: Hasher>(state: &mut H) {
    state.write_u8(0);
}
161
162impl fmt::Display for Discriminant {
163    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
164        for (i, value) in self.values.iter().enumerate() {
165            if i != 0 {
166                write!(fmt, "-")?;
167            }
168            if let Some(value) = value {
169                value.fmt(fmt)?;
170            } else {
171                fmt.write_str("none")?;
172            }
173        }
174        Ok(())
175    }
176}
177
#[cfg(test)]
mod tests {
    use std::collections::{hash_map::DefaultHasher, HashMap};

    use super::*;
    use crate::event::LogEvent;

    /// Hashes `value` with a freshly created `DefaultHasher`.
    fn hash_of<T: Hash>(value: T) -> u64 {
        let mut hasher = DefaultHasher::new();
        value.hash(&mut hasher);
        hasher.finish()
    }

    /// Builds the discriminant of `event` over `fields`.
    fn discriminant_of(event: &LogEvent, fields: &[&str]) -> Discriminant {
        Discriminant::from_log_event(event, fields)
    }

    /// Asserts that the two discriminants are equal and hash identically.
    fn assert_same(left: Discriminant, right: Discriminant) {
        assert_eq!(left, right);
        assert_eq!(hash_of(left), hash_of(right));
    }

    /// Asserts that the two discriminants differ and hash differently.
    fn assert_distinct(left: Discriminant, right: Discriminant) {
        assert_ne!(left, right);
        assert_ne!(hash_of(left), hash_of(right));
    }

    #[test]
    fn equal() {
        let mut base = LogEvent::default();
        base.insert("hostname", "localhost");
        base.insert("irrelevant", "not even used");
        let mut variant = base.clone();
        variant.insert("irrelevant", "does not matter if it's different");

        let fields = ["hostname", "container_id"];
        assert_same(discriminant_of(&base, &fields), discriminant_of(&variant, &fields));
    }

    #[test]
    fn not_equal() {
        let mut base = LogEvent::default();
        base.insert("hostname", "localhost");
        base.insert("container_id", "abc");
        let mut variant = base.clone();
        variant.insert("container_id", "def");

        let fields = ["hostname", "container_id"];
        assert_distinct(discriminant_of(&base, &fields), discriminant_of(&variant, &fields));
    }

    #[test]
    fn field_order() {
        let mut ab = LogEvent::default();
        ab.insert("a", "a");
        ab.insert("b", "b");
        let mut ba = LogEvent::default();
        ba.insert("b", "b");
        ba.insert("a", "a");

        let fields = ["a", "b"];
        assert_same(discriminant_of(&ab, &fields), discriminant_of(&ba, &fields));
    }

    #[test]
    fn map_values_key_order() {
        let mut ab = LogEvent::default();
        ab.insert("nested.a", "a");
        ab.insert("nested.b", "b");
        let mut ba = LogEvent::default();
        ba.insert("nested.b", "b");
        ba.insert("nested.a", "a");

        let fields = ["nested"];
        assert_same(discriminant_of(&ab, &fields), discriminant_of(&ba, &fields));
    }

    #[test]
    fn array_values_insertion_order() {
        let mut forward = LogEvent::default();
        forward.insert("array[0]", "a");
        forward.insert("array[1]", "b");
        let mut backward = LogEvent::default();
        backward.insert("array[1]", "b");
        backward.insert("array[0]", "a");

        let fields = ["array"];
        assert_same(
            discriminant_of(&forward, &fields),
            discriminant_of(&backward, &fields),
        );
    }

    #[test]
    fn map_values_matter_1() {
        // `nested` is a `Value::Map` in one event and absent from the other.
        let mut with_map = LogEvent::default();
        with_map.insert("nested.a", "a");
        let empty = LogEvent::default();

        let fields = ["nested"];
        assert_distinct(discriminant_of(&with_map, &fields), discriminant_of(&empty, &fields));
    }

    #[test]
    fn map_values_matter_2() {
        // `nested` is a `Value::Map` in one event and a string in the other.
        let mut with_map = LogEvent::default();
        with_map.insert("nested.a", "a");
        let mut with_string = LogEvent::default();
        with_string.insert("nested", "x");

        let fields = ["nested"];
        assert_distinct(
            discriminant_of(&with_map, &fields),
            discriminant_of(&with_string, &fields),
        );
    }

    #[test]
    fn with_hash_map() {
        // NOTE(review): clippy presumably flags `Discriminant` as a mutable key
        // type because `Value` contains types with interior mutability. Keys are
        // never mutated while stored in this map, so the lint is silenced.
        #[allow(clippy::mutable_key_type)]
        let mut counts: HashMap<Discriminant, usize> = HashMap::new();
        let fields = ["hostname", "container_id"];

        let mut stream_1 = LogEvent::default();
        stream_1.insert("hostname", "a.test");
        stream_1.insert("container_id", "abc");

        let mut stream_2 = LogEvent::default();
        stream_2.insert("hostname", "b.test");
        stream_2.insert("container_id", "def");

        // Neither `hostname` nor `container_id` is set.
        let stream_3 = LogEvent::default();

        // Records `event` and returns how many events sharing its
        // discriminant were recorded before it.
        let mut record = |event: LogEvent| {
            *counts
                .entry(discriminant_of(&event, &fields))
                .and_modify(|count| *count += 1)
                .or_insert(0)
        };

        let mut event = stream_1.clone();
        event.insert("message", "a");
        assert_eq!(record(event), 0);

        let mut event = stream_1.clone();
        event.insert("message", "b");
        event.insert("irrelevant", "c");
        assert_eq!(record(event), 1);

        let mut event = stream_2.clone();
        event.insert("message", "d");
        assert_eq!(record(event), 0);

        let mut event = stream_2.clone();
        event.insert("message", "e");
        event.insert("irrelevant", "d");
        assert_eq!(record(event), 1);

        let mut event = stream_3.clone();
        event.insert("message", "f");
        assert_eq!(record(event), 0);

        let mut event = stream_3.clone();
        event.insert("message", "g");
        event.insert("irrelevant", "d");
        assert_eq!(record(event), 1);

        // Every stream has now been recorded twice before.
        assert_eq!(record(stream_1), 2);
        assert_eq!(record(stream_2), 2);
        assert_eq!(record(stream_3), 2);
    }

    #[test]
    fn test_display() {
        let mut event = LogEvent::default();
        event.insert("hostname", "localhost");
        event.insert("container_id", 1);

        let complete = discriminant_of(&event, &["hostname", "container_id"]);
        assert_eq!(complete.to_string(), "\"localhost\"-1");

        let partial = discriminant_of(&event, &["hostname", "service"]);
        assert_eq!(partial.to_string(), "\"localhost\"-none");
    }
}