vector_buffers/
internal_events.rs

1use std::time::Duration;
2
3use metrics::{Histogram, counter, gauge, histogram};
4use vector_common::NamedInternalEvent;
5use vector_common::{
6    internal_event::{InternalEvent, error_type},
7    registered_event,
8};
9
10#[derive(NamedInternalEvent)]
11pub struct BufferCreated {
12    pub buffer_id: String,
13    pub idx: usize,
14    pub max_size_events: usize,
15    pub max_size_bytes: u64,
16}
17
18impl InternalEvent for BufferCreated {
19    #[expect(clippy::cast_precision_loss)]
20    fn emit(self) {
21        let stage = self.idx.to_string();
22        if self.max_size_events != 0 {
23            gauge!(
24                "buffer_max_size_events",
25                "buffer_id" => self.buffer_id.clone(),
26                "stage" => stage.clone(),
27            )
28            .set(self.max_size_events as f64);
29            // DEPRECATED: buffer-bytes-events-metrics
30            gauge!(
31                "buffer_max_event_size",
32                "buffer_id" => self.buffer_id.clone(),
33                "stage" => stage.clone(),
34            )
35            .set(self.max_size_events as f64);
36        }
37        if self.max_size_bytes != 0 {
38            gauge!(
39                "buffer_max_size_bytes",
40                "buffer_id" => self.buffer_id.clone(),
41                "stage" => stage.clone(),
42            )
43            .set(self.max_size_bytes as f64);
44            // DEPRECATED: buffer-bytes-events-metrics
45            gauge!(
46                "buffer_max_byte_size",
47                "buffer_id" => self.buffer_id,
48                "stage" => stage,
49            )
50            .set(self.max_size_bytes as f64);
51        }
52    }
53}
54
55#[derive(NamedInternalEvent)]
56pub struct BufferEventsReceived {
57    pub buffer_id: String,
58    pub idx: usize,
59    pub count: u64,
60    pub byte_size: u64,
61    pub total_count: u64,
62    pub total_byte_size: u64,
63}
64
65impl InternalEvent for BufferEventsReceived {
66    #[expect(clippy::cast_precision_loss)]
67    fn emit(self) {
68        counter!(
69            "buffer_received_events_total",
70            "buffer_id" => self.buffer_id.clone(),
71            "stage" => self.idx.to_string()
72        )
73        .increment(self.count);
74
75        counter!(
76            "buffer_received_bytes_total",
77            "buffer_id" => self.buffer_id.clone(),
78            "stage" => self.idx.to_string()
79        )
80        .increment(self.byte_size);
81        // DEPRECATED: buffer-bytes-events-metrics
82        gauge!(
83            "buffer_events",
84            "buffer_id" => self.buffer_id.clone(),
85            "stage" => self.idx.to_string()
86        )
87        .set(self.total_count as f64);
88        gauge!(
89            "buffer_size_events",
90            "buffer_id" => self.buffer_id.clone(),
91            "stage" => self.idx.to_string()
92        )
93        .set(self.total_count as f64);
94        gauge!(
95            "buffer_size_bytes",
96            "buffer_id" => self.buffer_id.clone(),
97            "stage" => self.idx.to_string()
98        )
99        .set(self.total_byte_size as f64);
100        // DEPRECATED: buffer-bytes-events-metrics
101        gauge!(
102            "buffer_byte_size",
103            "buffer_id" => self.buffer_id,
104            "stage" => self.idx.to_string()
105        )
106        .set(self.total_byte_size as f64);
107    }
108}
109
110#[derive(NamedInternalEvent)]
111pub struct BufferEventsSent {
112    pub buffer_id: String,
113    pub idx: usize,
114    pub count: u64,
115    pub byte_size: u64,
116    pub total_count: u64,
117    pub total_byte_size: u64,
118}
119
120impl InternalEvent for BufferEventsSent {
121    #[expect(clippy::cast_precision_loss)]
122    fn emit(self) {
123        counter!(
124            "buffer_sent_events_total",
125            "buffer_id" => self.buffer_id.clone(),
126            "stage" => self.idx.to_string()
127        )
128        .increment(self.count);
129        counter!(
130            "buffer_sent_bytes_total",
131            "buffer_id" => self.buffer_id.clone(),
132            "stage" => self.idx.to_string()
133        )
134        .increment(self.byte_size);
135        // DEPRECATED: buffer-bytes-events-metrics
136        gauge!(
137            "buffer_events",
138            "buffer_id" => self.buffer_id.clone(),
139            "stage" => self.idx.to_string()
140        )
141        .set(self.total_count as f64);
142        gauge!(
143            "buffer_size_events",
144            "buffer_id" => self.buffer_id.clone(),
145            "stage" => self.idx.to_string()
146        )
147        .set(self.total_count as f64);
148        gauge!(
149            "buffer_size_bytes",
150            "buffer_id" => self.buffer_id.clone(),
151            "stage" => self.idx.to_string()
152        )
153        .set(self.total_byte_size as f64);
154        // DEPRECATED: buffer-bytes-events-metrics
155        gauge!(
156            "buffer_byte_size",
157            "buffer_id" => self.buffer_id,
158            "stage" => self.idx.to_string()
159        )
160        .set(self.total_byte_size as f64);
161    }
162}
163
164#[derive(NamedInternalEvent)]
165pub struct BufferEventsDropped {
166    pub buffer_id: String,
167    pub idx: usize,
168    pub count: u64,
169    pub byte_size: u64,
170    pub total_count: u64,
171    pub total_byte_size: u64,
172    pub intentional: bool,
173    pub reason: &'static str,
174}
175
176impl InternalEvent for BufferEventsDropped {
177    #[expect(clippy::cast_precision_loss)]
178    fn emit(self) {
179        let intentional_str = if self.intentional { "true" } else { "false" };
180        if self.intentional {
181            debug!(
182                message = "Events dropped.",
183                count = %self.count,
184                byte_size = %self.byte_size,
185                intentional = %intentional_str,
186                reason = %self.reason,
187                buffer_id = %self.buffer_id,
188                stage = %self.idx,
189            );
190        } else {
191            error!(
192                message = "Events dropped.",
193                count = %self.count,
194                byte_size = %self.byte_size,
195                intentional = %intentional_str,
196                reason = %self.reason,
197                buffer_id = %self.buffer_id,
198                stage = %self.idx,
199            );
200        }
201
202        counter!(
203            "buffer_discarded_events_total",
204            "buffer_id" => self.buffer_id.clone(),
205            "stage" => self.idx.to_string(),
206            "intentional" => intentional_str,
207        )
208        .increment(self.count);
209        counter!(
210            "buffer_discarded_bytes_total",
211            "buffer_id" => self.buffer_id.clone(),
212            "stage" => self.idx.to_string(),
213            "intentional" => intentional_str,
214        )
215        .increment(self.byte_size);
216        // DEPRECATED: buffer-bytes-events-metrics
217        gauge!(
218            "buffer_events",
219            "buffer_id" => self.buffer_id.clone(),
220            "stage" => self.idx.to_string()
221        )
222        .set(self.total_count as f64);
223        gauge!(
224            "buffer_size_events",
225            "buffer_id" => self.buffer_id.clone(),
226            "stage" => self.idx.to_string()
227        )
228        .set(self.total_count as f64);
229        gauge!(
230            "buffer_size_bytes",
231            "buffer_id" => self.buffer_id.clone(),
232            "stage" => self.idx.to_string()
233        )
234        .set(self.total_byte_size as f64);
235        // DEPRECATED: buffer-bytes-events-metrics
236        gauge!(
237            "buffer_byte_size",
238            "buffer_id" => self.buffer_id,
239            "stage" => self.idx.to_string()
240        )
241        .set(self.total_byte_size as f64);
242    }
243}
244
245#[derive(NamedInternalEvent)]
246pub struct BufferReadError {
247    pub error_code: &'static str,
248    pub error: String,
249}
250
251impl InternalEvent for BufferReadError {
252    fn emit(self) {
253        error!(
254            message = "Error encountered during buffer read.",
255            error = %self.error,
256            error_code = self.error_code,
257            error_type = error_type::READER_FAILED,
258            stage = "processing",
259        );
260        counter!(
261            "buffer_errors_total", "error_code" => self.error_code,
262            "error_type" => "reader_failed",
263            "stage" => "processing",
264        )
265        .increment(1);
266    }
267}
268
269registered_event! {
270    BufferSendDuration {
271        stage: usize,
272    } => {
273        send_duration: Histogram = histogram!("buffer_send_duration_seconds", "stage" => self.stage.to_string()),
274    }
275
276    fn emit(&self, duration: Duration) {
277        self.send_duration.record(duration);
278    }
279}