vector/internal_events/
kafka.rs1#![allow(dead_code)] use metrics::{counter, gauge};
4use vector_lib::{
5 internal_event::{InternalEvent, error_stage, error_type},
6 json_size::JsonSize,
7};
8use vrl::path::OwnedTargetPath;
9
10#[derive(Debug)]
11pub struct KafkaBytesReceived<'a> {
12 pub byte_size: usize,
13 pub protocol: &'static str,
14 pub topic: &'a str,
15 pub partition: i32,
16}
17
18impl InternalEvent for KafkaBytesReceived<'_> {
19 fn emit(self) {
20 trace!(
21 message = "Bytes received.",
22 byte_size = %self.byte_size,
23 protocol = %self.protocol,
24 topic = self.topic,
25 partition = %self.partition,
26 );
27 counter!(
28 "component_received_bytes_total",
29 "protocol" => self.protocol,
30 "topic" => self.topic.to_string(),
31 "partition" => self.partition.to_string(),
32 )
33 .increment(self.byte_size as u64);
34 }
35}
36
37#[derive(Debug)]
38pub struct KafkaEventsReceived<'a> {
39 pub byte_size: JsonSize,
40 pub count: usize,
41 pub topic: &'a str,
42 pub partition: i32,
43}
44
45impl InternalEvent for KafkaEventsReceived<'_> {
46 fn emit(self) {
47 trace!(
48 message = "Events received.",
49 count = %self.count,
50 byte_size = %self.byte_size,
51 topic = self.topic,
52 partition = %self.partition,
53 );
54 counter!(
55 "component_received_events_total",
56 "topic" => self.topic.to_string(),
57 "partition" => self.partition.to_string(),
58 )
59 .increment(self.count as u64);
60 counter!(
61 "component_received_event_bytes_total",
62 "topic" => self.topic.to_string(),
63 "partition" => self.partition.to_string(),
64 )
65 .increment(self.byte_size.get() as u64);
66 }
67}
68
69#[derive(Debug)]
70pub struct KafkaOffsetUpdateError {
71 pub error: rdkafka::error::KafkaError,
72}
73
74impl InternalEvent for KafkaOffsetUpdateError {
75 fn emit(self) {
76 error!(
77 message = "Unable to update consumer offset.",
78 error = %self.error,
79 error_code = "kafka_offset_update",
80 error_type = error_type::READER_FAILED,
81 stage = error_stage::SENDING,
82 internal_log_rate_limit = true,
83 );
84 counter!(
85 "component_errors_total",
86 "error_code" => "kafka_offset_update",
87 "error_type" => error_type::READER_FAILED,
88 "stage" => error_stage::SENDING,
89 )
90 .increment(1);
91 }
92}
93
94#[derive(Debug)]
95pub struct KafkaReadError {
96 pub error: rdkafka::error::KafkaError,
97}
98
99impl InternalEvent for KafkaReadError {
100 fn emit(self) {
101 error!(
102 message = "Failed to read message.",
103 error = %self.error,
104 error_code = "reading_message",
105 error_type = error_type::READER_FAILED,
106 stage = error_stage::RECEIVING,
107 internal_log_rate_limit = true,
108 );
109 counter!(
110 "component_errors_total",
111 "error_code" => "reading_message",
112 "error_type" => error_type::READER_FAILED,
113 "stage" => error_stage::RECEIVING,
114 )
115 .increment(1);
116 }
117}
118
119#[derive(Debug)]
120pub struct KafkaStatisticsReceived<'a> {
121 pub statistics: &'a rdkafka::Statistics,
122 pub expose_lag_metrics: bool,
123}
124
125impl InternalEvent for KafkaStatisticsReceived<'_> {
126 fn emit(self) {
127 gauge!("kafka_queue_messages").set(self.statistics.msg_cnt as f64);
128 gauge!("kafka_queue_messages_bytes").set(self.statistics.msg_size as f64);
129 counter!("kafka_requests_total").absolute(self.statistics.tx as u64);
130 counter!("kafka_requests_bytes_total").absolute(self.statistics.tx_bytes as u64);
131 counter!("kafka_responses_total").absolute(self.statistics.rx as u64);
132 counter!("kafka_responses_bytes_total").absolute(self.statistics.rx_bytes as u64);
133 counter!("kafka_produced_messages_total").absolute(self.statistics.txmsgs as u64);
134 counter!("kafka_produced_messages_bytes_total")
135 .absolute(self.statistics.txmsg_bytes as u64);
136 counter!("kafka_consumed_messages_total").absolute(self.statistics.rxmsgs as u64);
137 counter!("kafka_consumed_messages_bytes_total")
138 .absolute(self.statistics.rxmsg_bytes as u64);
139
140 if self.expose_lag_metrics {
141 for (topic_id, topic) in &self.statistics.topics {
142 for (partition_id, partition) in &topic.partitions {
143 gauge!(
144 "kafka_consumer_lag",
145 "topic_id" => topic_id.clone(),
146 "partition_id" => partition_id.to_string(),
147 )
148 .set(partition.consumer_lag as f64);
149 }
150 }
151 }
152 }
153}
154
155pub struct KafkaHeaderExtractionError<'a> {
156 pub header_field: &'a OwnedTargetPath,
157}
158
159impl InternalEvent for KafkaHeaderExtractionError<'_> {
160 fn emit(self) {
161 error!(
162 message = "Failed to extract header. Value should be a map of String -> Bytes.",
163 error_code = "extracting_header",
164 error_type = error_type::PARSER_FAILED,
165 stage = error_stage::RECEIVING,
166 header_field = self.header_field.to_string(),
167 internal_log_rate_limit = true,
168 );
169 counter!(
170 "component_errors_total",
171 "error_code" => "extracting_header",
172 "error_type" => error_type::PARSER_FAILED,
173 "stage" => error_stage::RECEIVING,
174 )
175 .increment(1);
176 }
177}