vector_buffers/
internal_events.rs

1use std::time::Duration;
2
3use metrics::{Histogram, counter, gauge, histogram};
4use vector_common::NamedInternalEvent;
5use vector_common::{
6    internal_event::{InternalEvent, error_type},
7    registered_event,
8};
9
10#[derive(NamedInternalEvent)]
11pub struct BufferCreated {
12    pub buffer_id: String,
13    pub idx: usize,
14    pub max_size_events: usize,
15    pub max_size_bytes: u64,
16}
17
18impl InternalEvent for BufferCreated {
19    #[expect(clippy::cast_precision_loss)]
20    fn emit(self) {
21        if self.max_size_events != 0 {
22            gauge!(
23                "buffer_max_event_size",
24                "buffer_id" => self.buffer_id.clone(),
25                "stage" => self.idx.to_string(),
26            )
27            .set(self.max_size_events as f64);
28        }
29        if self.max_size_bytes != 0 {
30            gauge!(
31                "buffer_max_byte_size",
32                "buffer_id" => self.buffer_id,
33                "stage" => self.idx.to_string(),
34            )
35            .set(self.max_size_bytes as f64);
36        }
37    }
38}
39
40#[derive(NamedInternalEvent)]
41pub struct BufferEventsReceived {
42    pub buffer_id: String,
43    pub idx: usize,
44    pub count: u64,
45    pub byte_size: u64,
46    pub total_count: u64,
47    pub total_byte_size: u64,
48}
49
50impl InternalEvent for BufferEventsReceived {
51    #[expect(clippy::cast_precision_loss)]
52    fn emit(self) {
53        counter!(
54            "buffer_received_events_total",
55            "buffer_id" => self.buffer_id.clone(),
56            "stage" => self.idx.to_string()
57        )
58        .increment(self.count);
59
60        counter!(
61            "buffer_received_bytes_total",
62            "buffer_id" => self.buffer_id.clone(),
63            "stage" => self.idx.to_string()
64        )
65        .increment(self.byte_size);
66        gauge!(
67            "buffer_events",
68            "buffer_id" => self.buffer_id.clone(),
69            "stage" => self.idx.to_string()
70        )
71        .set(self.total_count as f64);
72        gauge!(
73            "buffer_byte_size",
74            "buffer_id" => self.buffer_id,
75            "stage" => self.idx.to_string()
76        )
77        .set(self.total_byte_size as f64);
78    }
79}
80
81#[derive(NamedInternalEvent)]
82pub struct BufferEventsSent {
83    pub buffer_id: String,
84    pub idx: usize,
85    pub count: u64,
86    pub byte_size: u64,
87    pub total_count: u64,
88    pub total_byte_size: u64,
89}
90
91impl InternalEvent for BufferEventsSent {
92    #[expect(clippy::cast_precision_loss)]
93    fn emit(self) {
94        counter!(
95            "buffer_sent_events_total",
96            "buffer_id" => self.buffer_id.clone(),
97            "stage" => self.idx.to_string()
98        )
99        .increment(self.count);
100        counter!(
101            "buffer_sent_bytes_total",
102            "buffer_id" => self.buffer_id.clone(),
103            "stage" => self.idx.to_string()
104        )
105        .increment(self.byte_size);
106        gauge!(
107            "buffer_events",
108            "buffer_id" => self.buffer_id.clone(),
109            "stage" => self.idx.to_string()
110        )
111        .set(self.total_count as f64);
112        gauge!(
113            "buffer_byte_size",
114            "buffer_id" => self.buffer_id,
115            "stage" => self.idx.to_string()
116        )
117        .set(self.total_byte_size as f64);
118    }
119}
120
121#[derive(NamedInternalEvent)]
122pub struct BufferEventsDropped {
123    pub buffer_id: String,
124    pub idx: usize,
125    pub count: u64,
126    pub byte_size: u64,
127    pub total_count: u64,
128    pub total_byte_size: u64,
129    pub intentional: bool,
130    pub reason: &'static str,
131}
132
133impl InternalEvent for BufferEventsDropped {
134    #[expect(clippy::cast_precision_loss)]
135    fn emit(self) {
136        let intentional_str = if self.intentional { "true" } else { "false" };
137        if self.intentional {
138            debug!(
139                message = "Events dropped.",
140                count = %self.count,
141                byte_size = %self.byte_size,
142                intentional = %intentional_str,
143                reason = %self.reason,
144                buffer_id = %self.buffer_id,
145                stage = %self.idx,
146            );
147        } else {
148            error!(
149                message = "Events dropped.",
150                count = %self.count,
151                byte_size = %self.byte_size,
152                intentional = %intentional_str,
153                reason = %self.reason,
154                buffer_id = %self.buffer_id,
155                stage = %self.idx,
156            );
157        }
158
159        counter!(
160            "buffer_discarded_events_total",
161            "buffer_id" => self.buffer_id.clone(),
162            "stage" => self.idx.to_string(),
163            "intentional" => intentional_str,
164        )
165        .increment(self.count);
166        counter!(
167            "buffer_discarded_bytes_total",
168            "buffer_id" => self.buffer_id.clone(),
169            "stage" => self.idx.to_string(),
170            "intentional" => intentional_str,
171        )
172        .increment(self.byte_size);
173        gauge!(
174            "buffer_events",
175            "buffer_id" => self.buffer_id.clone(),
176            "stage" => self.idx.to_string()
177        )
178        .set(self.total_count as f64);
179        gauge!(
180            "buffer_byte_size",
181            "buffer_id" => self.buffer_id,
182            "stage" => self.idx.to_string()
183        )
184        .set(self.total_byte_size as f64);
185    }
186}
187
188#[derive(NamedInternalEvent)]
189pub struct BufferReadError {
190    pub error_code: &'static str,
191    pub error: String,
192}
193
194impl InternalEvent for BufferReadError {
195    fn emit(self) {
196        error!(
197            message = "Error encountered during buffer read.",
198            error = %self.error,
199            error_code = self.error_code,
200            error_type = error_type::READER_FAILED,
201            stage = "processing",
202        );
203        counter!(
204            "buffer_errors_total", "error_code" => self.error_code,
205            "error_type" => "reader_failed",
206            "stage" => "processing",
207        )
208        .increment(1);
209    }
210}
211
212registered_event! {
213    BufferSendDuration {
214        stage: usize,
215    } => {
216        send_duration: Histogram = histogram!("buffer_send_duration_seconds", "stage" => self.stage.to_string()),
217    }
218
219    fn emit(&self, duration: Duration) {
220        self.send_duration.record(duration);
221    }
222}