vector/internal_events/
open.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
use std::{
    hint,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};

use metrics::gauge;
use vector_lib::internal_event::InternalEvent;

#[derive(Debug)]
pub struct ConnectionOpen {
    pub count: usize,
}

impl InternalEvent for ConnectionOpen {
    fn emit(self) {
        gauge!("open_connections").set(self.count as f64);
    }
}

#[derive(Debug)]
pub struct EndpointsActive {
    pub count: usize,
}

impl InternalEvent for EndpointsActive {
    fn emit(self) {
        gauge!("active_endpoints").set(self.count as f64);
    }
}

#[derive(Clone)]
pub struct OpenGauge {
    gauge: Arc<AtomicUsize>,
}

impl OpenGauge {
    pub fn new() -> Self {
        OpenGauge {
            gauge: Arc::default(),
        }
    }

    /// Increments and emits value once created.
    /// Decrements and emits value once dropped.
    pub fn open<E: Fn(usize)>(self, emitter: E) -> OpenToken<E> {
        gauge_add(&self.gauge, 1, &emitter);
        OpenToken {
            gauge: self.gauge,
            emitter,
        }
    }

    #[cfg(all(feature = "sources-utils-net-unix", unix))]
    pub fn any_open(&self) -> bool {
        self.gauge.load(Ordering::Acquire) != 0
    }
}

impl Default for OpenGauge {
    fn default() -> Self {
        Self::new()
    }
}

pub struct OpenToken<E: Fn(usize)> {
    gauge: Arc<AtomicUsize>,
    emitter: E,
}

impl<E: Fn(usize)> Drop for OpenToken<E> {
    fn drop(&mut self) {
        gauge_add(&self.gauge, -1, &self.emitter);
    }
}

/// If reporting gauges from multiple threads, they can end up in a wrong order
/// resulting in having wrong value for a prolonged period of time.
/// This function performs a synchronization procedure that corrects that.
fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
    // The goal of this function is to properly sequence calls to `emitter` from
    // multiple threads. It is possible that `emitter` will be called multiple
    // times -- worst case, `n^2 / 2` times where `n` is the number of parallel
    // peers -- but this is acceptable.
    //
    // The implementation here is a spin lock on the `gauge` value with the
    // critical section being solely for updating the `gauge` value by `add` and
    // calling `emitter`. If we suffer priority inversion at higher peer counts
    // we might consider the use of a mutex, which will participate in the OS's
    // scheduler. See [this
    // post](https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html)
    // for details if you're working on something like that and need background.
    //
    // The order of calls to `emitter` are not guaranteed but it is guaranteed
    // that the most recent holder of the lock will be the most recent caller of
    // `emitter`.
    let mut value = gauge.load(Ordering::Acquire);
    loop {
        let new_value = (value as isize + add) as usize;
        emitter(new_value);
        // Try to update gauge to new value and releasing writes to gauge metric
        // in the process.  Otherwise acquire new writes to gauge metric.
        //
        // When `compare_exchange_weak` returns Ok our `new_value` is now the
        // current value in memory across all CPUs. When the return is Err we
        // retry with the now current value.
        match gauge.compare_exchange_weak(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => break,
            Err(x) => {
                hint::spin_loop();
                value = x;
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::{mem::drop, thread};

    use super::*;

    /// If this fails at any run, then the algorithm in `gauge_add` is faulty.
    #[test]
    fn eventually_consistent() {
        let n = 8;
        let m = 1000;
        let gauge = OpenGauge::new();
        let value = Arc::new(AtomicUsize::new(0));

        let handles = (0..n)
            .map(|_| {
                let gauge = gauge.clone();
                let value = Arc::clone(&value);
                thread::spawn(move || {
                    let mut work = 0;
                    for _ in 0..m {
                        let token = gauge
                            .clone()
                            .open(|count| value.store(count, Ordering::Release));
                        // Do some work
                        for i in 0..100 {
                            work += i;
                        }
                        drop(token);
                    }
                    work
                })
            })
            .collect::<Vec<_>>();

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(0, value.load(Ordering::Acquire));
    }
}