1use std::time::Duration;
2
3use metrics::{Histogram, counter, gauge, histogram};
4use vector_common::NamedInternalEvent;
5use vector_common::{
6 internal_event::{InternalEvent, error_type},
7 registered_event,
8};
9
10#[derive(NamedInternalEvent)]
11pub struct BufferCreated {
12 pub buffer_id: String,
13 pub idx: usize,
14 pub max_size_events: usize,
15 pub max_size_bytes: u64,
16}
17
18impl InternalEvent for BufferCreated {
19 #[expect(clippy::cast_precision_loss)]
20 fn emit(self) {
21 if self.max_size_events != 0 {
22 gauge!(
23 "buffer_max_event_size",
24 "buffer_id" => self.buffer_id.clone(),
25 "stage" => self.idx.to_string(),
26 )
27 .set(self.max_size_events as f64);
28 }
29 if self.max_size_bytes != 0 {
30 gauge!(
31 "buffer_max_byte_size",
32 "buffer_id" => self.buffer_id,
33 "stage" => self.idx.to_string(),
34 )
35 .set(self.max_size_bytes as f64);
36 }
37 }
38}
39
40#[derive(NamedInternalEvent)]
41pub struct BufferEventsReceived {
42 pub buffer_id: String,
43 pub idx: usize,
44 pub count: u64,
45 pub byte_size: u64,
46 pub total_count: u64,
47 pub total_byte_size: u64,
48}
49
50impl InternalEvent for BufferEventsReceived {
51 #[expect(clippy::cast_precision_loss)]
52 fn emit(self) {
53 counter!(
54 "buffer_received_events_total",
55 "buffer_id" => self.buffer_id.clone(),
56 "stage" => self.idx.to_string()
57 )
58 .increment(self.count);
59
60 counter!(
61 "buffer_received_bytes_total",
62 "buffer_id" => self.buffer_id.clone(),
63 "stage" => self.idx.to_string()
64 )
65 .increment(self.byte_size);
66 gauge!(
67 "buffer_events",
68 "buffer_id" => self.buffer_id.clone(),
69 "stage" => self.idx.to_string()
70 )
71 .set(self.total_count as f64);
72 gauge!(
73 "buffer_byte_size",
74 "buffer_id" => self.buffer_id,
75 "stage" => self.idx.to_string()
76 )
77 .set(self.total_byte_size as f64);
78 }
79}
80
81#[derive(NamedInternalEvent)]
82pub struct BufferEventsSent {
83 pub buffer_id: String,
84 pub idx: usize,
85 pub count: u64,
86 pub byte_size: u64,
87 pub total_count: u64,
88 pub total_byte_size: u64,
89}
90
91impl InternalEvent for BufferEventsSent {
92 #[expect(clippy::cast_precision_loss)]
93 fn emit(self) {
94 counter!(
95 "buffer_sent_events_total",
96 "buffer_id" => self.buffer_id.clone(),
97 "stage" => self.idx.to_string()
98 )
99 .increment(self.count);
100 counter!(
101 "buffer_sent_bytes_total",
102 "buffer_id" => self.buffer_id.clone(),
103 "stage" => self.idx.to_string()
104 )
105 .increment(self.byte_size);
106 gauge!(
107 "buffer_events",
108 "buffer_id" => self.buffer_id.clone(),
109 "stage" => self.idx.to_string()
110 )
111 .set(self.total_count as f64);
112 gauge!(
113 "buffer_byte_size",
114 "buffer_id" => self.buffer_id,
115 "stage" => self.idx.to_string()
116 )
117 .set(self.total_byte_size as f64);
118 }
119}
120
121#[derive(NamedInternalEvent)]
122pub struct BufferEventsDropped {
123 pub buffer_id: String,
124 pub idx: usize,
125 pub count: u64,
126 pub byte_size: u64,
127 pub total_count: u64,
128 pub total_byte_size: u64,
129 pub intentional: bool,
130 pub reason: &'static str,
131}
132
133impl InternalEvent for BufferEventsDropped {
134 #[expect(clippy::cast_precision_loss)]
135 fn emit(self) {
136 let intentional_str = if self.intentional { "true" } else { "false" };
137 if self.intentional {
138 debug!(
139 message = "Events dropped.",
140 count = %self.count,
141 byte_size = %self.byte_size,
142 intentional = %intentional_str,
143 reason = %self.reason,
144 buffer_id = %self.buffer_id,
145 stage = %self.idx,
146 );
147 } else {
148 error!(
149 message = "Events dropped.",
150 count = %self.count,
151 byte_size = %self.byte_size,
152 intentional = %intentional_str,
153 reason = %self.reason,
154 buffer_id = %self.buffer_id,
155 stage = %self.idx,
156 );
157 }
158
159 counter!(
160 "buffer_discarded_events_total",
161 "buffer_id" => self.buffer_id.clone(),
162 "stage" => self.idx.to_string(),
163 "intentional" => intentional_str,
164 )
165 .increment(self.count);
166 counter!(
167 "buffer_discarded_bytes_total",
168 "buffer_id" => self.buffer_id.clone(),
169 "stage" => self.idx.to_string(),
170 "intentional" => intentional_str,
171 )
172 .increment(self.byte_size);
173 gauge!(
174 "buffer_events",
175 "buffer_id" => self.buffer_id.clone(),
176 "stage" => self.idx.to_string()
177 )
178 .set(self.total_count as f64);
179 gauge!(
180 "buffer_byte_size",
181 "buffer_id" => self.buffer_id,
182 "stage" => self.idx.to_string()
183 )
184 .set(self.total_byte_size as f64);
185 }
186}
187
188#[derive(NamedInternalEvent)]
189pub struct BufferReadError {
190 pub error_code: &'static str,
191 pub error: String,
192}
193
194impl InternalEvent for BufferReadError {
195 fn emit(self) {
196 error!(
197 message = "Error encountered during buffer read.",
198 error = %self.error,
199 error_code = self.error_code,
200 error_type = error_type::READER_FAILED,
201 stage = "processing",
202 );
203 counter!(
204 "buffer_errors_total", "error_code" => self.error_code,
205 "error_type" => "reader_failed",
206 "stage" => "processing",
207 )
208 .increment(1);
209 }
210}
211
// Registered event that records buffer send durations.
//
// The `stage` field is rendered as a string and attached as the `stage` label
// of the `buffer_send_duration_seconds` histogram.
// NOTE(review): plain `//` comments are used on purpose; `///` doc comments
// would become tokens inside the macro input — confirm the macro accepts them
// before converting.
registered_event! {
    BufferSendDuration {
        stage: usize,
    } => {
        // Histogram handle labelled with this event's stage.
        send_duration: Histogram = histogram!("buffer_send_duration_seconds", "stage" => self.stage.to_string()),
    }

    // Records one sample into the histogram. The metric name implies seconds;
    // the `Duration` conversion happens inside `record` and is not visible
    // here — confirm the unit against the `metrics` crate.
    fn emit(&self, duration: Duration) {
        self.send_duration.record(duration);
    }
}