vector/internal_events/open.rs
use std::{
    hint,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use metrics::gauge;
use vector_lib::internal_event::InternalEvent;

/// Reports the current number of open connections via the
/// `open_connections` gauge.
#[derive(Debug)]
pub struct ConnectionOpen {
    pub count: usize,
}

impl InternalEvent for ConnectionOpen {
    fn emit(self) {
        gauge!("open_connections").set(self.count as f64);
    }
}

/// Reports the current number of active endpoints via the
/// `active_endpoints` gauge.
#[derive(Debug)]
pub struct EndpointsActive {
    pub count: usize,
}

impl InternalEvent for EndpointsActive {
    fn emit(self) {
        gauge!("active_endpoints").set(self.count as f64);
    }
}

/// A cloneable, shared counter of currently open handles. Every change to
/// the count is reported through a caller-supplied emitter.
#[derive(Clone)]
pub struct OpenGauge {
    gauge: Arc<AtomicUsize>,
}

impl OpenGauge {
    pub fn new() -> Self {
        OpenGauge {
            gauge: Arc::default(),
        }
    }

    /// Increments the count and emits the new value immediately.
    /// Decrements the count and emits the new value once the returned token
    /// is dropped.
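    ///
    /// # Example
    ///
    /// An illustrative sketch; the `println!` closure stands in for a real
    /// emitter, such as one wrapping `ConnectionOpen`:
    ///
    /// ```ignore
    /// let open = OpenGauge::new();
    /// let token = open.clone().open(|count| println!("open: {count}"));
    /// // ... hold `token` while the underlying resource is open ...
    /// drop(token); // emits the decremented count
    /// ```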
    pub fn open<E: Fn(usize)>(self, emitter: E) -> OpenToken<E> {
        gauge_add(&self.gauge, 1, &emitter);
        OpenToken {
            gauge: self.gauge,
            emitter,
        }
    }

    /// Returns `true` if any handle is currently open.
    #[cfg(all(feature = "sources-utils-net-unix", unix))]
    pub fn any_open(&self) -> bool {
        self.gauge.load(Ordering::Acquire) != 0
    }
}

impl Default for OpenGauge {
    fn default() -> Self {
        Self::new()
    }
}

/// Guard returned by [`OpenGauge::open`]. Decrements the gauge and re-emits
/// the count when dropped.
pub struct OpenToken<E: Fn(usize)> {
    gauge: Arc<AtomicUsize>,
    emitter: E,
}

impl<E: Fn(usize)> Drop for OpenToken<E> {
    fn drop(&mut self) {
        gauge_add(&self.gauge, -1, &self.emitter);
    }
}

/// If gauges are reported from multiple threads, the updates can arrive out
/// of order, leaving the gauge with a stale value for a prolonged period of
/// time. This function performs a synchronization procedure that corrects
/// that.
fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
    // The goal of this function is to properly sequence calls to `emitter` from
    // multiple threads. It is possible that `emitter` will be called multiple
    // times -- worst case, `n^2 / 2` times where `n` is the number of parallel
    // peers -- but this is acceptable.
    //
    // The implementation here is a spin lock on the `gauge` value with the
    // critical section being solely for updating the `gauge` value by `add` and
    // calling `emitter`. If we suffer priority inversion at higher peer counts
    // we might consider the use of a mutex, which will participate in the OS's
    // scheduler. See [this
    // post](https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html)
    // for details if you're working on something like that and need background.
    //
    // The order of calls to `emitter` is not guaranteed, but it is guaranteed
    // that the most recent holder of the lock will be the most recent caller of
    // `emitter`.
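    //
    // As an illustrative (hypothetical) interleaving of two incrementing
    // threads A and B: both load 0 and both emit 1; A's CAS of 0 -> 1
    // succeeds; B's CAS fails, so B reloads 1, emits 2, and its CAS of
    // 1 -> 2 succeeds. `emitter` fired three times in total, but the last
    // successful CAS holder reported the final value, 2.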
    let mut value = gauge.load(Ordering::Acquire);
    loop {
        let new_value = (value as isize + add) as usize;
        emitter(new_value);
        // Try to update the gauge to the new value, releasing our writes to
        // the gauge metric in the process. On failure, acquire the writes
        // made by other threads.
        //
        // When `compare_exchange_weak` returns Ok our `new_value` is now the
        // current value in memory across all CPUs. When the return is Err we
        // retry with the now-current value.
        match gauge.compare_exchange_weak(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => break,
            Err(x) => {
                hint::spin_loop();
                value = x;
            }
        }
    }
}
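
// Hypothetical call-site sketch (not part of this module): a server accept
// loop might pair `OpenGauge` with `ConnectionOpen`, assuming an `emit`
// helper that dispatches `InternalEvent`s:
//
//     let open = OpenGauge::new();
//     loop {
//         let conn = accept();
//         let token = open.clone().open(|count| emit(ConnectionOpen { count }));
//         spawn_connection_handler(conn, token); // token dropped when done
//     }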

#[cfg(test)]
mod tests {
    use std::{mem::drop, thread};

    use super::*;

    /// If this fails at any run, then the algorithm in `gauge_add` is faulty.
    #[test]
    fn eventually_consistent() {
        let n = 8;
        let m = 1000;
        let gauge = OpenGauge::new();
        let value = Arc::new(AtomicUsize::new(0));

        let handles = (0..n)
            .map(|_| {
                let gauge = gauge.clone();
                let value = Arc::clone(&value);
                thread::spawn(move || {
                    let mut work = 0;
                    for _ in 0..m {
                        let token = gauge
                            .clone()
                            .open(|count| value.store(count, Ordering::Release));
                        // Do some work
                        for i in 0..100 {
                            work += i;
                        }
                        drop(token);
                    }
                    work
                })
            })
            .collect::<Vec<_>>();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(0, value.load(Ordering::Acquire));
    }
}
160}