vector/internal_events/open.rs

use std::{
    hint,
    sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    },
};

use metrics::gauge;
use vector_lib::NamedInternalEvent;
use vector_lib::internal_event::InternalEvent;

#[derive(Debug, NamedInternalEvent)]
pub struct ConnectionOpen {
    pub count: usize,
}

impl InternalEvent for ConnectionOpen {
    fn emit(self) {
        gauge!("open_connections").set(self.count as f64);
    }
}

#[derive(Debug, NamedInternalEvent)]
pub struct EndpointsActive {
    pub count: usize,
}

impl InternalEvent for EndpointsActive {
    fn emit(self) {
        gauge!("active_endpoints").set(self.count as f64);
    }
}
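
// Illustrative only (not part of the original source): consuming either event
// sets its gauge to the carried count, for example:
//
//     ConnectionOpen { count: 3 }.emit();  // `open_connections` becomes 3.0
//     EndpointsActive { count: 2 }.emit(); // `active_endpoints` becomes 2.0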

#[derive(Clone)]
pub struct OpenGauge {
    gauge: Arc<AtomicUsize>,
}

impl OpenGauge {
    pub fn new() -> Self {
        OpenGauge {
            gauge: Arc::default(),
        }
    }

    /// Increments and emits the value once created.
    /// Decrements and emits the value once dropped.
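    ///
    /// A minimal usage sketch (illustrative, not from the original source),
    /// wiring the token to the `ConnectionOpen` event defined above:
    ///
    /// ```ignore
    /// let open = OpenGauge::new();
    /// let token = open.clone().open(|count| ConnectionOpen { count }.emit());
    /// // ... serve the connection ...
    /// drop(token); // decrements the gauge and emits the new value
    /// ```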
    pub fn open<E: Fn(usize)>(self, emitter: E) -> OpenToken<E> {
        gauge_add(&self.gauge, 1, &emitter);
        OpenToken {
            gauge: self.gauge,
            emitter,
        }
    }

    #[cfg(all(feature = "sources-utils-net-unix", unix))]
    pub fn any_open(&self) -> bool {
        self.gauge.load(Ordering::Acquire) != 0
    }
}

impl Default for OpenGauge {
    fn default() -> Self {
        Self::new()
    }
}

pub struct OpenToken<E: Fn(usize)> {
    gauge: Arc<AtomicUsize>,
    emitter: E,
}

impl<E: Fn(usize)> Drop for OpenToken<E> {
    fn drop(&mut self) {
        gauge_add(&self.gauge, -1, &self.emitter);
    }
}

/// When gauges are reported from multiple threads, the updates can land out
/// of order, leaving a wrong value in place for a prolonged period of time.
/// This function performs a synchronization procedure that corrects that.
fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
    // The goal of this function is to properly sequence calls to `emitter` from
    // multiple threads. It is possible that `emitter` will be called multiple
    // times -- worst case, `n^2 / 2` times where `n` is the number of parallel
    // peers -- but this is acceptable.
    //
    // The implementation here is a spin lock on the `gauge` value, with the
    // critical section solely updating the `gauge` value by `add` and
    // calling `emitter`. If we suffer priority inversion at higher peer counts
    // we might consider the use of a mutex, which will participate in the OS's
    // scheduler. See [this
    // post](https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html)
    // for details if you're working on something like that and need background.
    //
    // The order of calls to `emitter` is not guaranteed, but it is guaranteed
    // that the most recent holder of the lock will be the most recent caller of
    // `emitter`.
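    //
    // A concrete interleaving this guards against (hypothetical trace, not
    // from the original source): with a naive `fetch_add` followed by a
    // separate emit, thread A could step the count 0 -> 1 and thread B
    // 1 -> 2, yet B's `emitter(2)` could run before A's `emitter(1)`,
    // leaving 1 as the last reported value while two connections are open.
    // Here, a thread whose CAS fails re-emits with the fresh value, so the
    // last successful updater is also the most recent caller of `emitter`.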
    let mut value = gauge.load(Ordering::Acquire);
    loop {
        let new_value = (value as isize + add) as usize;
        emitter(new_value);
        // Try to update the gauge to the new value, releasing our writes to
        // the gauge metric in the process; otherwise, acquire the writes made
        // by the other thread.
        //
        // When `compare_exchange_weak` returns Ok, our `new_value` is now the
        // current value in memory across all CPUs. When the return is Err, we
        // retry with the now-current value.
        match gauge.compare_exchange_weak(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => break,
            Err(x) => {
                hint::spin_loop();
                value = x;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// If this fails on any run, then the algorithm in `gauge_add` is faulty.
    #[test]
    fn eventually_consistent() {
        let n = 8;
        let m = 1000;
        let gauge = OpenGauge::new();
        let value = Arc::new(AtomicUsize::new(0));

        let handles = (0..n)
            .map(|_| {
                let gauge = gauge.clone();
                let value = Arc::clone(&value);
                thread::spawn(move || {
                    let mut work = 0;
                    for _ in 0..m {
                        let token = gauge
                            .clone()
                            .open(|count| value.store(count, Ordering::Release));
                        // Do some work.
                        for i in 0..100 {
                            work += i;
                        }
                        drop(token);
                    }
                    work
                })
            })
            .collect::<Vec<_>>();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(0, value.load(Ordering::Acquire));
    }
}
161}