vector_common/internal_event/
cached_event.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::{
    collections::HashMap,
    hash::Hash,
    sync::{Arc, RwLock},
};

use derivative::Derivative;

use super::{InternalEventHandle, RegisterInternalEvent};

/// Metrics (eg. `component_sent_event_bytes_total`) may need to emit tags based on
/// values contained within the events. These tags can't be determined in advance.
///
/// Metrics need to be registered and the handle needs to be held onto in order to
/// prevent them from expiring and being dropped (this would result in the counter
/// resetting to zero).
/// `CachedEvent` is used to maintain a store of these registered metrics. When a
/// new event is emitted for a previously unseen set of tags an event is registered
/// and stored in the cache.
#[derive(Derivative)]
#[derivative(Clone(bound = "T: Clone"))]
pub struct RegisteredEventCache<T, Event: RegisterTaggedInternalEvent> {
    fixed_tags: T,
    cache: Arc<
        RwLock<
            HashMap<
                <Event as RegisterTaggedInternalEvent>::Tags,
                <Event as RegisterInternalEvent>::Handle,
            >,
        >,
    >,
}

/// This trait must be implemented by events that emit dynamic tags. `register` must
/// be implemented to register an event based on the set of tags passed.
pub trait RegisterTaggedInternalEvent: RegisterInternalEvent {
    /// The type that will contain the data necessary to extract the tags
    /// that will be used when registering the event.
    type Tags;

    /// The type that contains data necessary to extract the tags that will
    /// be fixed and only need setting up front when the cache is first created.
    type Fixed;

    fn register(fixed: Self::Fixed, tags: Self::Tags) -> <Self as RegisterInternalEvent>::Handle;
}

impl<Event, EventHandle, Data, Tags, FixedTags> RegisteredEventCache<FixedTags, Event>
where
    Data: Sized,
    EventHandle: InternalEventHandle<Data = Data>,
    Tags: Clone + Eq + Hash,
    FixedTags: Clone,
    Event: RegisterInternalEvent<Handle = EventHandle>
        + RegisterTaggedInternalEvent<Tags = Tags, Fixed = FixedTags>,
{
    /// Create a new event cache with a set of fixed tags. These tags are passed to
    /// all registered events.
    pub fn new(fixed_tags: FixedTags) -> Self {
        Self {
            fixed_tags,
            cache: Arc::default(),
        }
    }

    /// Emits the event with the given tags.
    /// It will register the event and store in the cache if this has not already
    /// been done.
    ///
    /// # Panics
    ///
    /// This will panic if the lock is poisoned.
    pub fn emit(&self, tags: &Tags, value: Data) {
        let read = self.cache.read().unwrap();
        if let Some(event) = read.get(tags) {
            event.emit(value);
        } else {
            let event = <Event as RegisterTaggedInternalEvent>::register(
                self.fixed_tags.clone(),
                tags.clone(),
            );
            event.emit(value);

            // Ensure the read lock is dropped so we can write.
            drop(read);
            self.cache.write().unwrap().insert(tags.clone(), event);
        }
    }
}

#[cfg(test)]
mod tests {
    #![allow(unreachable_pub)]
    use metrics::{counter, Counter};

    use super::*;

    crate::registered_event!(
        TestEvent {
            fixed: String,
            dynamic: String,
        } => {
            event: Counter = {
                counter!("test_event_total", "fixed" => self.fixed, "dynamic" => self.dynamic)
            },
        }

        fn emit(&self, count: u64) {
            self.event.increment(count);
        }

        fn register(fixed: String, dynamic: String) {
            crate::internal_event::register(TestEvent {
                fixed,
                dynamic,
            })
        }
    );

    #[test]
    fn test_fixed_tag() {
        let event: RegisteredEventCache<String, TestEvent> =
            RegisteredEventCache::new("fixed".to_string());

        for tag in 1..=5 {
            event.emit(&format!("dynamic{tag}"), tag);
        }
    }
}