vector/internal_events/
open.rs1use std::{
2 hint,
3 sync::{
4 Arc,
5 atomic::{AtomicUsize, Ordering},
6 },
7};
8
9use metrics::gauge;
10use vector_lib::NamedInternalEvent;
11use vector_lib::internal_event::InternalEvent;
12
13#[derive(Debug, NamedInternalEvent)]
14pub struct ConnectionOpen {
15 pub count: usize,
16}
17
18impl InternalEvent for ConnectionOpen {
19 fn emit(self) {
20 gauge!("open_connections").set(self.count as f64);
21 }
22}
23
24#[derive(Debug, NamedInternalEvent)]
25pub struct EndpointsActive {
26 pub count: usize,
27}
28
29impl InternalEvent for EndpointsActive {
30 fn emit(self) {
31 gauge!("active_endpoints").set(self.count as f64);
32 }
33}
34
35#[derive(Clone)]
36pub struct OpenGauge {
37 gauge: Arc<AtomicUsize>,
38}
39
40impl OpenGauge {
41 pub fn new() -> Self {
42 OpenGauge {
43 gauge: Arc::default(),
44 }
45 }
46
47 pub fn open<E: Fn(usize)>(self, emitter: E) -> OpenToken<E> {
50 gauge_add(&self.gauge, 1, &emitter);
51 OpenToken {
52 gauge: self.gauge,
53 emitter,
54 }
55 }
56
57 #[cfg(all(feature = "sources-utils-net-unix", unix))]
58 pub fn any_open(&self) -> bool {
59 self.gauge.load(Ordering::Acquire) != 0
60 }
61}
62
63impl Default for OpenGauge {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69pub struct OpenToken<E: Fn(usize)> {
70 gauge: Arc<AtomicUsize>,
71 emitter: E,
72}
73
74impl<E: Fn(usize)> Drop for OpenToken<E> {
75 fn drop(&mut self) {
76 gauge_add(&self.gauge, -1, &self.emitter);
77 }
78}
79
/// Adds the signed delta `add` to `gauge` and reports the candidate value
/// through `emitter`.
///
/// The update is a lock-free compare-and-swap loop. `emitter` is invoked on
/// every attempt, before the swap, so it may run more than once per call:
/// once for each attempt that loses a race (or fails spuriously) and once for
/// the attempt that finally succeeds.
///
/// NOTE(review): calling `emitter` before the swap looks deliberate,
/// presumably so the last value emitted across threads matches the final
/// counter value. Confirm before reordering these statements.
///
/// The addition wraps: subtracting from zero yields `usize::MAX`, and no
/// combination of inputs panics, even in overflow-checked builds.
fn gauge_add(gauge: &AtomicUsize, add: isize, emitter: impl Fn(usize)) {
    let mut value = gauge.load(Ordering::Acquire);
    loop {
        // Two's-complement wrapping add of a signed delta; same result as the
        // cast-based form but without the overflow-check panic near isize::MAX.
        let new_value = value.wrapping_add_signed(add);
        emitter(new_value);
        match gauge.compare_exchange_weak(value, new_value, Ordering::AcqRel, Ordering::Acquire) {
            Ok(_) => break,
            Err(x) => {
                // Another thread changed the counter (or the weak CAS failed
                // spuriously); retry from the value that was actually observed.
                hint::spin_loop();
                value = x;
            }
        }
    }
}
119
#[cfg(test)]
mod tests {
    use std::thread;

    use super::*;

    /// Opens and drops tokens from several threads at once and checks that,
    /// after every token has been dropped, the last emitted value is zero.
    #[test]
    fn eventually_consistent() {
        const THREADS: usize = 8;
        const ROUNDS: usize = 1000;

        let gauge = OpenGauge::new();
        // Stands in for the reported metric: each emit overwrites it.
        let value = Arc::new(AtomicUsize::new(0));

        let mut workers = Vec::with_capacity(THREADS);
        for _ in 0..THREADS {
            let gauge = gauge.clone();
            let value = Arc::clone(&value);
            workers.push(thread::spawn(move || {
                let mut work = 0;
                for _ in 0..ROUNDS {
                    let token = gauge
                        .clone()
                        .open(|count| value.store(count, Ordering::Release));
                    // Filler work while the token is held.
                    work += (0..100).sum::<i32>();
                    drop(token);
                }
                // Returned so the filler work has an observable result.
                work
            }));
        }

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(0, value.load(Ordering::Acquire));
    }
}