vector_core/event/
discriminant.rs

1use std::{
2    fmt,
3    hash::{Hash, Hasher},
4};
5
6use super::{LogEvent, ObjectMap, Value};
7
8// TODO: if we had `Value` implement `Eq` and `Hash`, the implementation here
9// would be much easier. The issue is with `f64` type. We should consider using
10// a newtype for `f64` there that'd implement `Eq` and `Hash` if it's safe, for
11// example `NormalF64`, and guard the values with `val.is_normal() == true`
12// invariant.
13// See also: https://internals.rust-lang.org/t/f32-f64-should-implement-hash/5436/32
14
15/// An event discriminant identifies a distinguishable subset of events.
16/// Intended for dissecting streams of events to sub-streams, for instance to
17/// be able to allocate a buffer per sub-stream.
18/// Implements `PartialEq`, `Eq` and `Hash` to enable use as a `HashMap` key.
19#[derive(Debug, Clone)]
20pub struct Discriminant {
21    values: Vec<Option<Value>>,
22}
23
24impl Discriminant {
25    /// Create a new Discriminant from the `LogEvent` and an ordered slice of
26    /// fields to include into a discriminant value.
27    pub fn from_log_event(event: &LogEvent, discriminant_fields: &[impl AsRef<str>]) -> Self {
28        let values: Vec<Option<Value>> = discriminant_fields
29            .iter()
30            .map(|discriminant_field| {
31                event
32                    .parse_path_and_get_value(discriminant_field.as_ref())
33                    .ok()
34                    .flatten()
35                    .cloned()
36            })
37            .collect();
38        Self { values }
39    }
40}
41
42impl PartialEq for Discriminant {
43    fn eq(&self, other: &Self) -> bool {
44        self.values
45            .iter()
46            .zip(other.values.iter())
47            .all(|(this, other)| match (this, other) {
48                (None, None) => true,
49                (Some(this), Some(other)) => value_eq(this, other),
50                _ => false,
51            })
52    }
53}
54
55impl Eq for Discriminant {}
56
57// Equality check for discriminant purposes.
58fn value_eq(this: &Value, other: &Value) -> bool {
59    match (this, other) {
60        // Trivial.
61        (Value::Bytes(this), Value::Bytes(other)) => this.eq(other),
62        (Value::Boolean(this), Value::Boolean(other)) => this.eq(other),
63        (Value::Integer(this), Value::Integer(other)) => this.eq(other),
64        (Value::Timestamp(this), Value::Timestamp(other)) => this.eq(other),
65        (Value::Null, Value::Null) => true,
66        // Non-trivial.
67        (Value::Float(this), Value::Float(other)) => f64_eq(this.into_inner(), other.into_inner()),
68        (Value::Array(this), Value::Array(other)) => array_eq(this, other),
69        (Value::Object(this), Value::Object(other)) => map_eq(this, other),
70        // Type mismatch.
71        _ => false,
72    }
73}
74
// Float comparison tailored to discriminants: any NaN equals any other NaN,
// and values that compare equal numerically but differ in sign (`0.0` vs
// `-0.0`) are told apart.
fn f64_eq(this: f64, other: f64) -> bool {
    match (this.is_nan(), other.is_nan()) {
        // Both sides are NaN: treated as the same discriminant value.
        (true, true) => true,
        // Neither side is NaN: numeric equality plus matching sign bit.
        (false, false) => {
            this == other && this.is_sign_negative() == other.is_sign_negative()
        }
        // Exactly one side is NaN.
        _ => false,
    }
}
90
91fn array_eq(this: &[Value], other: &[Value]) -> bool {
92    if this.len() != other.len() {
93        return false;
94    }
95
96    this.iter()
97        .zip(other.iter())
98        .all(|(first, second)| value_eq(first, second))
99}
100
101fn map_eq(this: &ObjectMap, other: &ObjectMap) -> bool {
102    if this.len() != other.len() {
103        return false;
104    }
105
106    this.iter()
107        .zip(other.iter())
108        .all(|((key1, value1), (key2, value2))| key1 == key2 && value_eq(value1, value2))
109}
110
111impl Hash for Discriminant {
112    fn hash<H: Hasher>(&self, state: &mut H) {
113        for value in &self.values {
114            match value {
115                Some(value) => {
116                    state.write_u8(1);
117                    hash_value(state, value);
118                }
119                None => state.write_u8(0),
120            }
121        }
122    }
123}
124
125// Hashes value for discriminant purposes.
126fn hash_value<H: Hasher>(hasher: &mut H, value: &Value) {
127    match value {
128        // Trivial.
129        Value::Bytes(val) => val.hash(hasher),
130        Value::Regex(val) => val.as_bytes_slice().hash(hasher),
131        Value::Boolean(val) => val.hash(hasher),
132        Value::Integer(val) => val.hash(hasher),
133        Value::Timestamp(val) => val.hash(hasher),
134        // Non-trivial.
135        Value::Float(val) => hash_f64(hasher, val.into_inner()),
136        Value::Array(val) => hash_array(hasher, val),
137        Value::Object(val) => hash_map(hasher, val),
138        Value::Null => hash_null(hasher),
139    }
140}
141
// Does f64 hashing that is suitable for discriminant purposes.
//
// The hash is taken over the raw bytes of the float, so `0.0` and `-0.0`
// hash differently, matching `f64_eq`, which tells them apart.
fn hash_f64<H: Hasher>(hasher: &mut H, value: f64) {
    // `f64_eq` treats every NaN as equal to every other NaN, but NaNs come in
    // many bit patterns (sign bit, payload). Collapse them to one canonical
    // NaN so that values which compare equal always hash alike.
    let canonical = if value.is_nan() { f64::NAN } else { value };
    hasher.write(&canonical.to_ne_bytes());
}
146
147fn hash_array<H: Hasher>(hasher: &mut H, array: &[Value]) {
148    for val in array {
149        hash_value(hasher, val);
150    }
151}
152
153fn hash_map<H: Hasher>(hasher: &mut H, map: &ObjectMap) {
154    for (key, val) in map {
155        hasher.write(key.as_bytes());
156        hash_value(hasher, val);
157    }
158}
159
// Hashes the null value by writing a single fixed marker byte.
fn hash_null<H: Hasher>(hasher: &mut H) {
    const NULL_MARKER: u8 = 0;
    hasher.write_u8(NULL_MARKER);
}
163
164impl fmt::Display for Discriminant {
165    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
166        for (i, value) in self.values.iter().enumerate() {
167            if i != 0 {
168                write!(fmt, "-")?;
169            }
170            if let Some(value) = value {
171                value.fmt(fmt)?;
172            } else {
173                fmt.write_str("none")?;
174            }
175        }
176        Ok(())
177    }
178}
179
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, hash_map::DefaultHasher};

    use super::*;
    use crate::event::LogEvent;

    /// Feeds `item` through a fresh `DefaultHasher` and returns the digest.
    fn hash<H: Hash>(item: H) -> u64 {
        let mut state = DefaultHasher::new();
        item.hash(&mut state);
        state.finish()
    }

    /// Creates a `LogEvent` and inserts every `(path, value)` pair in order.
    fn event_with(entries: &[(&'static str, &'static str)]) -> LogEvent {
        let mut event = LogEvent::default();
        for &(path, value) in entries {
            event.insert(path, value);
        }
        event
    }

    /// Derives the discriminants of both events over the same `fields`.
    fn discriminate(
        first: &LogEvent,
        second: &LogEvent,
        fields: &[&str],
    ) -> (Discriminant, Discriminant) {
        (
            Discriminant::from_log_event(first, fields),
            Discriminant::from_log_event(second, fields),
        )
    }

    /// Events that differ only in a field outside the discriminant are equal.
    #[test]
    fn equal() {
        let first = event_with(&[("hostname", "localhost"), ("irrelevant", "not even used")]);
        let mut second = first.clone();
        second.insert("irrelevant", "does not matter if it's different");

        let (left, right) = discriminate(&first, &second, &["hostname", "container_id"]);

        assert_eq!(left, right);
        assert_eq!(hash(left), hash(right));
    }

    /// Events that differ in a discriminant field are not equal.
    #[test]
    fn not_equal() {
        let first = event_with(&[("hostname", "localhost"), ("container_id", "abc")]);
        let mut second = first.clone();
        second.insert("container_id", "def");

        let (left, right) = discriminate(&first, &second, &["hostname", "container_id"]);

        assert_ne!(left, right);
        assert_ne!(hash(left), hash(right));
    }

    /// The order in which top-level fields were inserted is irrelevant.
    #[test]
    fn field_order() {
        let first = event_with(&[("a", "a"), ("b", "b")]);
        let second = event_with(&[("b", "b"), ("a", "a")]);

        let (left, right) = discriminate(&first, &second, &["a", "b"]);

        assert_eq!(left, right);
        assert_eq!(hash(left), hash(right));
    }

    /// The order in which keys of a nested object were inserted is irrelevant.
    #[test]
    fn map_values_key_order() {
        let first = event_with(&[("nested.a", "a"), ("nested.b", "b")]);
        let second = event_with(&[("nested.b", "b"), ("nested.a", "a")]);

        let (left, right) = discriminate(&first, &second, &["nested"]);

        assert_eq!(left, right);
        assert_eq!(hash(left), hash(right));
    }

    /// The order in which array indices were filled in is irrelevant.
    #[test]
    fn array_values_insertion_order() {
        let first = event_with(&[("array[0]", "a"), ("array[1]", "b")]);
        let second = event_with(&[("array[1]", "b"), ("array[0]", "a")]);

        let (left, right) = discriminate(&first, &second, &["array"]);

        assert_eq!(left, right);
        assert_eq!(hash(left), hash(right));
    }

    /// A nested object differs from a missing field.
    #[test]
    fn map_values_matter_1() {
        // Here `nested` holds an object with a single key.
        let first = event_with(&[("nested.a", "a")]);
        // Empty event: `nested` is absent.
        let second = LogEvent::default();

        let (left, right) = discriminate(&first, &second, &["nested"]);

        assert_ne!(left, right);
        assert_ne!(hash(left), hash(right));
    }

    /// A nested object differs from a plain string stored under the same key.
    #[test]
    fn map_values_matter_2() {
        // Here `nested` holds an object with a single key.
        let first = event_with(&[("nested.a", "a")]);
        // Here `nested` holds a plain string value instead of an object.
        let second = event_with(&[("nested", "x")]);

        let (left, right) = discriminate(&first, &second, &["nested"]);

        assert_ne!(left, right);
        assert_ne!(hash(left), hash(right));
    }

    /// Discriminants work as `HashMap` keys: events of one stream land in the
    /// same entry regardless of fields outside the discriminant.
    #[test]
    fn with_hash_map() {
        #[allow(clippy::mutable_key_type)]
        let mut counters: HashMap<Discriminant, usize> = HashMap::new();

        let stream_1 = event_with(&[("hostname", "a.test"), ("container_id", "abc")]);
        let stream_2 = event_with(&[("hostname", "b.test"), ("container_id", "def")]);
        // Carries neither `hostname` nor `container_id`.
        let stream_3 = LogEvent::default();

        let fields = ["hostname", "container_id"];

        // Yields 0 the first time a discriminant is seen, and the incremented
        // counter on every later sighting.
        let mut process_event = |event: LogEvent| {
            let key = Discriminant::from_log_event(&event, &fields);
            *counters.entry(key).and_modify(|seen| *seen += 1).or_insert(0)
        };

        // (base event, extra fields to insert, expected counter value)
        let steps: [(&LogEvent, &[(&'static str, &'static str)], usize); 6] = [
            (&stream_1, &[("message", "a")], 0),
            (&stream_1, &[("message", "b"), ("irrelevant", "c")], 1),
            (&stream_2, &[("message", "d")], 0),
            (&stream_2, &[("message", "e"), ("irrelevant", "d")], 1),
            (&stream_3, &[("message", "f")], 0),
            (&stream_3, &[("message", "g"), ("irrelevant", "d")], 1),
        ];

        for (base, extras, expected) in steps {
            let mut event = base.clone();
            for &(path, value) in extras {
                event.insert(path, value);
            }
            assert_eq!(process_event(event), expected);
        }

        // Now assert the amount of events processed per discriminant.
        assert_eq!(process_event(stream_1), 2);
        assert_eq!(process_event(stream_2), 2);
        assert_eq!(process_event(stream_3), 2);
    }

    /// `Display` joins the slots with `-` and prints `none` for missing fields.
    #[test]
    fn test_display() {
        let mut event = LogEvent::default();
        event.insert("hostname", "localhost");
        event.insert("container_id", 1);

        let rendered =
            Discriminant::from_log_event(&event, &["hostname", "container_id"]).to_string();
        assert_eq!(rendered, "\"localhost\"-1");

        let rendered = Discriminant::from_log_event(&event, &["hostname", "service"]).to_string();
        assert_eq!(rendered, "\"localhost\"-none");
    }
}