vector_common/internal_event/
cached_event.rs

1use std::{
2    collections::HashMap,
3    hash::Hash,
4    sync::{Arc, RwLock},
5};
6
7use derivative::Derivative;
8
9use super::{InternalEventHandle, RegisterInternalEvent};
10
11/// Metrics (eg. `component_sent_event_bytes_total`) may need to emit tags based on
12/// values contained within the events. These tags can't be determined in advance.
13///
14/// Metrics need to be registered and the handle needs to be held onto in order to
15/// prevent them from expiring and being dropped (this would result in the counter
16/// resetting to zero).
17/// `CachedEvent` is used to maintain a store of these registered metrics. When a
18/// new event is emitted for a previously unseen set of tags an event is registered
19/// and stored in the cache.
20#[derive(Derivative)]
21#[derivative(Clone(bound = "T: Clone"))]
22pub struct RegisteredEventCache<T, Event: RegisterTaggedInternalEvent> {
23    fixed_tags: T,
24    cache: Arc<
25        RwLock<
26            HashMap<
27                <Event as RegisterTaggedInternalEvent>::Tags,
28                <Event as RegisterInternalEvent>::Handle,
29            >,
30        >,
31    >,
32}
33
34/// This trait must be implemented by events that emit dynamic tags. `register` must
35/// be implemented to register an event based on the set of tags passed.
36pub trait RegisterTaggedInternalEvent: RegisterInternalEvent {
37    /// The type that will contain the data necessary to extract the tags
38    /// that will be used when registering the event.
39    type Tags;
40
41    /// The type that contains data necessary to extract the tags that will
42    /// be fixed and only need setting up front when the cache is first created.
43    type Fixed;
44
45    fn register(fixed: Self::Fixed, tags: Self::Tags) -> <Self as RegisterInternalEvent>::Handle;
46}
47
48impl<Event, EventHandle, Data, Tags, FixedTags> RegisteredEventCache<FixedTags, Event>
49where
50    Data: Sized,
51    EventHandle: InternalEventHandle<Data = Data>,
52    Tags: Clone + Eq + Hash,
53    FixedTags: Clone,
54    Event: RegisterInternalEvent<Handle = EventHandle>
55        + RegisterTaggedInternalEvent<Tags = Tags, Fixed = FixedTags>,
56{
57    /// Create a new event cache with a set of fixed tags. These tags are passed to
58    /// all registered events.
59    pub fn new(fixed_tags: FixedTags) -> Self {
60        Self {
61            fixed_tags,
62            cache: Arc::default(),
63        }
64    }
65
66    /// Emits the event with the given tags.
67    /// It will register the event and store in the cache if this has not already
68    /// been done.
69    ///
70    /// # Panics
71    ///
72    /// This will panic if the lock is poisoned.
73    pub fn emit(&self, tags: &Tags, value: Data) {
74        let read = self.cache.read().unwrap();
75        if let Some(event) = read.get(tags) {
76            event.emit(value);
77        } else {
78            let event = <Event as RegisterTaggedInternalEvent>::register(
79                self.fixed_tags.clone(),
80                tags.clone(),
81            );
82            event.emit(value);
83
84            // Ensure the read lock is dropped so we can write.
85            drop(read);
86            self.cache.write().unwrap().insert(tags.clone(), event);
87        }
88    }
89}
90
#[cfg(test)]
mod tests {
    // NOTE(review): presumably required because `registered_event!` expands to
    // `pub` items that cannot be reached from outside this private module —
    // confirm against the macro definition.
    #![allow(unreachable_pub)]
    use metrics::{counter, Counter};

    use super::*;

    // A minimal tagged event: one fixed tag and one dynamic tag on a counter.
    crate::registered_event!(
        TestEvent {
            fixed: String,
            dynamic: String,
        } => {
            event: Counter = {
                counter!("test_event_total", "fixed" => self.fixed, "dynamic" => self.dynamic)
            },
        }

        fn emit(&self, count: u64) {
            self.event.increment(count);
        }

        fn register(fixed: String, dynamic: String) {
            crate::internal_event::register(TestEvent {
                fixed,
                dynamic,
            })
        }
    );

    /// Emitting for distinct dynamic tags registers one cache entry per tag,
    /// and emitting again for the same tags adds no further entries.
    #[test]
    fn test_fixed_tag() {
        let event: RegisteredEventCache<String, TestEvent> =
            RegisteredEventCache::new("fixed".to_string());

        for tag in 1..=5 {
            event.emit(&format!("dynamic{tag}"), tag);
        }
        // Every previously unseen tag must have been stored exactly once.
        assert_eq!(event.cache.read().unwrap().len(), 5);

        // Tags that are already cached must not create new entries.
        for tag in 1..=5 {
            event.emit(&format!("dynamic{tag}"), tag);
        }
        assert_eq!(event.cache.read().unwrap().len(), 5);
    }
}