vector_buffers/
internal_events.rs

1use std::time::Duration;
2
3use metrics::{Histogram, counter, gauge, histogram};
4use vector_common::{
5    internal_event::{InternalEvent, error_type},
6    registered_event,
7};
8
/// Event emitted to publish the configured size limits of a buffer stage.
///
/// Emitting it sets the `buffer_max_event_size` and `buffer_max_byte_size`
/// gauges; a limit of zero leaves the corresponding gauge untouched.
#[derive(Debug)]
pub struct BufferCreated {
    /// Value of the `buffer_id` label attached to both gauges.
    pub buffer_id: String,
    /// Stage index, rendered as a string into the `stage` label.
    pub idx: usize,
    /// Value for the `buffer_max_event_size` gauge; `0` skips that gauge.
    pub max_size_events: usize,
    /// Value for the `buffer_max_byte_size` gauge; `0` skips that gauge.
    pub max_size_bytes: u64,
}
15
16impl InternalEvent for BufferCreated {
17    #[expect(clippy::cast_precision_loss)]
18    fn emit(self) {
19        if self.max_size_events != 0 {
20            gauge!(
21                "buffer_max_event_size",
22                "buffer_id" => self.buffer_id.clone(),
23                "stage" => self.idx.to_string(),
24            )
25            .set(self.max_size_events as f64);
26        }
27        if self.max_size_bytes != 0 {
28            gauge!(
29                "buffer_max_byte_size",
30                "buffer_id" => self.buffer_id,
31                "stage" => self.idx.to_string(),
32            )
33            .set(self.max_size_bytes as f64);
34        }
35    }
36}
37
/// Event emitted to report events received by a buffer stage.
///
/// Emitting it bumps the `buffer_received_events_total` and
/// `buffer_received_bytes_total` counters and overwrites the `buffer_events`
/// and `buffer_byte_size` gauges.
#[derive(Debug)]
pub struct BufferEventsReceived {
    /// Value of the `buffer_id` label attached to every metric.
    pub buffer_id: String,
    /// Stage index, rendered as a string into the `stage` label.
    pub idx: usize,
    /// Amount added to the `buffer_received_events_total` counter.
    pub count: u64,
    /// Amount added to the `buffer_received_bytes_total` counter.
    pub byte_size: u64,
    /// Value the `buffer_events` gauge is set to.
    pub total_count: u64,
    /// Value the `buffer_byte_size` gauge is set to.
    pub total_byte_size: u64,
}
46
47impl InternalEvent for BufferEventsReceived {
48    #[expect(clippy::cast_precision_loss)]
49    fn emit(self) {
50        counter!(
51            "buffer_received_events_total",
52            "buffer_id" => self.buffer_id.clone(),
53            "stage" => self.idx.to_string()
54        )
55        .increment(self.count);
56
57        counter!(
58            "buffer_received_bytes_total",
59            "buffer_id" => self.buffer_id.clone(),
60            "stage" => self.idx.to_string()
61        )
62        .increment(self.byte_size);
63        gauge!(
64            "buffer_events",
65            "buffer_id" => self.buffer_id.clone(),
66            "stage" => self.idx.to_string()
67        )
68        .set(self.total_count as f64);
69        gauge!(
70            "buffer_byte_size",
71            "buffer_id" => self.buffer_id,
72            "stage" => self.idx.to_string()
73        )
74        .set(self.total_byte_size as f64);
75    }
76}
77
/// Event emitted to report events sent out of a buffer stage.
///
/// Emitting it bumps the `buffer_sent_events_total` and
/// `buffer_sent_bytes_total` counters and overwrites the `buffer_events` and
/// `buffer_byte_size` gauges.
#[derive(Debug)]
pub struct BufferEventsSent {
    /// Value of the `buffer_id` label attached to every metric.
    pub buffer_id: String,
    /// Stage index, rendered as a string into the `stage` label.
    pub idx: usize,
    /// Amount added to the `buffer_sent_events_total` counter.
    pub count: u64,
    /// Amount added to the `buffer_sent_bytes_total` counter.
    pub byte_size: u64,
    /// Value the `buffer_events` gauge is set to.
    pub total_count: u64,
    /// Value the `buffer_byte_size` gauge is set to.
    pub total_byte_size: u64,
}
86
87impl InternalEvent for BufferEventsSent {
88    #[expect(clippy::cast_precision_loss)]
89    fn emit(self) {
90        counter!(
91            "buffer_sent_events_total",
92            "buffer_id" => self.buffer_id.clone(),
93            "stage" => self.idx.to_string()
94        )
95        .increment(self.count);
96        counter!(
97            "buffer_sent_bytes_total",
98            "buffer_id" => self.buffer_id.clone(),
99            "stage" => self.idx.to_string()
100        )
101        .increment(self.byte_size);
102        gauge!(
103            "buffer_events",
104            "buffer_id" => self.buffer_id.clone(),
105            "stage" => self.idx.to_string()
106        )
107        .set(self.total_count as f64);
108        gauge!(
109            "buffer_byte_size",
110            "buffer_id" => self.buffer_id,
111            "stage" => self.idx.to_string()
112        )
113        .set(self.total_byte_size as f64);
114    }
115}
116
/// Event emitted to report events dropped by a buffer stage.
///
/// Emitting it logs an "Events dropped." message (at debug level when
/// `intentional` is true, at error level otherwise), bumps the
/// `buffer_discarded_events_total` and `buffer_discarded_bytes_total`
/// counters, and overwrites the `buffer_events` and `buffer_byte_size` gauges.
#[derive(Debug)]
pub struct BufferEventsDropped {
    /// Value of the `buffer_id` label and log field.
    pub buffer_id: String,
    /// Stage index, rendered into the `stage` label and log field.
    pub idx: usize,
    /// Amount added to the `buffer_discarded_events_total` counter.
    pub count: u64,
    /// Amount added to the `buffer_discarded_bytes_total` counter.
    pub byte_size: u64,
    /// Value the `buffer_events` gauge is set to.
    pub total_count: u64,
    /// Value the `buffer_byte_size` gauge is set to.
    pub total_byte_size: u64,
    /// Selects the log level and the `intentional` label (`"true"`/`"false"`).
    pub intentional: bool,
    /// Text included in the log message as the `reason` field.
    pub reason: &'static str,
}
127
128impl InternalEvent for BufferEventsDropped {
129    #[expect(clippy::cast_precision_loss)]
130    fn emit(self) {
131        let intentional_str = if self.intentional { "true" } else { "false" };
132        if self.intentional {
133            debug!(
134                message = "Events dropped.",
135                count = %self.count,
136                byte_size = %self.byte_size,
137                intentional = %intentional_str,
138                reason = %self.reason,
139                buffer_id = %self.buffer_id,
140                stage = %self.idx,
141            );
142        } else {
143            error!(
144                message = "Events dropped.",
145                count = %self.count,
146                byte_size = %self.byte_size,
147                intentional = %intentional_str,
148                reason = %self.reason,
149                buffer_id = %self.buffer_id,
150                stage = %self.idx,
151            );
152        }
153
154        counter!(
155            "buffer_discarded_events_total",
156            "buffer_id" => self.buffer_id.clone(),
157            "stage" => self.idx.to_string(),
158            "intentional" => intentional_str,
159        )
160        .increment(self.count);
161        counter!(
162            "buffer_discarded_bytes_total",
163            "buffer_id" => self.buffer_id.clone(),
164            "stage" => self.idx.to_string(),
165            "intentional" => intentional_str,
166        )
167        .increment(self.byte_size);
168        gauge!(
169            "buffer_events",
170            "buffer_id" => self.buffer_id.clone(),
171            "stage" => self.idx.to_string()
172        )
173        .set(self.total_count as f64);
174        gauge!(
175            "buffer_byte_size",
176            "buffer_id" => self.buffer_id,
177            "stage" => self.idx.to_string()
178        )
179        .set(self.total_byte_size as f64);
180    }
181}
182
/// Event emitted to report an error encountered during a buffer read.
///
/// Emitting it logs the error and increments the `buffer_errors_total`
/// counter.
#[derive(Debug)]
pub struct BufferReadError {
    /// Value of the `error_code` log field and counter label.
    pub error_code: &'static str,
    /// Error text included in the log message as the `error` field.
    pub error: String,
}
187
188impl InternalEvent for BufferReadError {
189    fn emit(self) {
190        error!(
191            message = "Error encountered during buffer read.",
192            error = %self.error,
193            error_code = self.error_code,
194            error_type = error_type::READER_FAILED,
195            stage = "processing",
196            internal_log_rate_limit = true,
197        );
198        counter!(
199            "buffer_errors_total", "error_code" => self.error_code,
200            "error_type" => "reader_failed",
201            "stage" => "processing",
202        )
203        .increment(1);
204    }
205}
206
207registered_event! {
208    BufferSendDuration {
209        stage: usize,
210    } => {
211        send_duration: Histogram = histogram!("buffer_send_duration_seconds", "stage" => self.stage.to_string()),
212    }
213
214    fn emit(&self, duration: Duration) {
215        self.send_duration.record(duration);
216    }
217}