vector/internal_events/
open.rs

1use std::{
2    hint,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc,
6    },
7};
8
9use metrics::gauge;
10use vector_lib::internal_event::InternalEvent;
11
12#[derive(Debug)]
13pub struct ConnectionOpen {
14    pub count: usize,
15}
16
17impl InternalEvent for ConnectionOpen {
18    fn emit(self) {
19        gauge!("open_connections").set(self.count as f64);
20    }
21}
22
23#[derive(Debug)]
24pub struct EndpointsActive {
25    pub count: usize,
26}
27
28impl InternalEvent for EndpointsActive {
29    fn emit(self) {
30        gauge!("active_endpoints").set(self.count as f64);
31    }
32}
33
/// Shared counter handle.
///
/// Cloning is cheap: every clone points at the same underlying atomic, so all
/// clones observe and modify one common value.
#[derive(Clone, Debug)]
pub struct OpenGauge {
    /// Counter shared between all clones of this handle.
    gauge: Arc<AtomicUsize>,
}
38
39impl OpenGauge {
40    pub fn new() -> Self {
41        OpenGauge {
42            gauge: Arc::default(),
43        }
44    }
45
46    /// Increments and emits value once created.
47    /// Decrements and emits value once dropped.
48    pub fn open<E: Fn(usize)>(self, emitter: E) -> OpenToken<E> {
49        gauge_add(&self.gauge, 1, &emitter);
50        OpenToken {
51            gauge: self.gauge,
52            emitter,
53        }
54    }
55
56    #[cfg(all(feature = "sources-utils-net-unix", unix))]
57    pub fn any_open(&self) -> bool {
58        self.gauge.load(Ordering::Acquire) != 0
59    }
60}
61
62impl Default for OpenGauge {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
/// Guard pairing a shared counter with the callback used to report its value.
///
/// The `E: Fn(usize)` bound is kept on the type itself because a `Drop`
/// implementation must carry exactly the bounds of the type it is written for.
#[must_use = "dropping the token immediately decrements the gauge again"]
pub struct OpenToken<E: Fn(usize)> {
    /// Counter shared with the gauge handle this token was created from.
    gauge: Arc<AtomicUsize>,
    /// Callback that receives counter values.
    emitter: E,
}
72
73impl<E: Fn(usize)> Drop for OpenToken<E> {
74    fn drop(&mut self) {
75        gauge_add(&self.gauge, -1, &self.emitter);
76    }
77}
78
/// Adds `add` to `gauge` and reports the resulting value through `emitter`.
///
/// When gauges are reported from several threads the reports can arrive out of
/// order, leaving a wrong value visible for a prolonged period of time. This
/// function sequences the update and the report so that this gets corrected.
fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
    // Each attempt computes a candidate value, reports it through `emitter`
    // and only then tries to publish it with a compare-and-swap. A failed swap
    // means the attempt is repeated from the value that was actually found,
    // so `emitter` may run several times per call -- worst case `n^2 / 2`
    // times, where `n` is the number of parallel peers -- which is acceptable.
    //
    // This is effectively a spin loop on the `gauge` value. If priority
    // inversion becomes a problem at higher peer counts, a mutex (which
    // participates in the OS scheduler) is the alternative to consider. See
    // https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html
    // for background.
    //
    // The relative order of `emitter` calls across threads is not guaranteed;
    // the intent is that the thread whose swap succeeded last is also the one
    // that most recently reported its value.
    let mut observed = gauge.load(Ordering::Acquire);
    loop {
        let candidate = (observed as isize + add) as usize;
        emitter(candidate);

        // A successful swap releases our write to the gauge; a failed one
        // acquires whichever value is currently stored so we can retry.
        let attempt =
            gauge.compare_exchange_weak(observed, candidate, Ordering::AcqRel, Ordering::Acquire);
        let Err(current) = attempt else {
            return;
        };

        hint::spin_loop();
        observed = current;
    }
}
118
#[cfg(test)]
mod tests {
    use std::{mem::drop, thread};

    use super::*;

    /// Opens and drops tokens on one shared gauge from several threads and
    /// checks that the last reported value is zero once all threads finish.
    ///
    /// If this fails at any run, then the algorithm in `gauge_add` is faulty.
    #[test]
    fn eventually_consistent() {
        let thread_count = 8;
        let rounds = 1000;
        let shared_gauge = OpenGauge::new();
        let last_reported = Arc::new(AtomicUsize::new(0));

        let mut workers = Vec::new();
        for _ in 0..thread_count {
            let gauge = shared_gauge.clone();
            let reported = Arc::clone(&last_reported);
            let worker = thread::spawn(move || {
                let mut work = 0;
                for _ in 0..rounds {
                    let token = gauge
                        .clone()
                        .open(|count| reported.store(count, Ordering::Release));
                    // Do some work while the token is held.
                    for step in 0..100 {
                        work += step;
                    }
                    drop(token);
                }
                work
            });
            workers.push(worker);
        }

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(0, last_reported.load(Ordering::Acquire));
    }
}