vector/internal_events/
kafka.rs1#![allow(dead_code)] use metrics::{counter, gauge};
4use vector_lib::{
5 internal_event::{InternalEvent, error_stage, error_type},
6 json_size::JsonSize,
7};
8use vrl::path::OwnedTargetPath;
9
10#[derive(Debug)]
11pub struct KafkaBytesReceived<'a> {
12 pub byte_size: usize,
13 pub protocol: &'static str,
14 pub topic: &'a str,
15 pub partition: i32,
16}
17
18impl InternalEvent for KafkaBytesReceived<'_> {
19 fn emit(self) {
20 trace!(
21 message = "Bytes received.",
22 byte_size = %self.byte_size,
23 protocol = %self.protocol,
24 topic = self.topic,
25 partition = %self.partition,
26 );
27 counter!(
28 "component_received_bytes_total",
29 "protocol" => self.protocol,
30 "topic" => self.topic.to_string(),
31 "partition" => self.partition.to_string(),
32 )
33 .increment(self.byte_size as u64);
34 }
35}
36
37#[derive(Debug)]
38pub struct KafkaEventsReceived<'a> {
39 pub byte_size: JsonSize,
40 pub count: usize,
41 pub topic: &'a str,
42 pub partition: i32,
43}
44
45impl InternalEvent for KafkaEventsReceived<'_> {
46 fn emit(self) {
47 trace!(
48 message = "Events received.",
49 count = %self.count,
50 byte_size = %self.byte_size,
51 topic = self.topic,
52 partition = %self.partition,
53 );
54 counter!(
55 "component_received_events_total",
56 "topic" => self.topic.to_string(),
57 "partition" => self.partition.to_string(),
58 )
59 .increment(self.count as u64);
60 counter!(
61 "component_received_event_bytes_total",
62 "topic" => self.topic.to_string(),
63 "partition" => self.partition.to_string(),
64 )
65 .increment(self.byte_size.get() as u64);
66 }
67}
68
69#[derive(Debug)]
70pub struct KafkaOffsetUpdateError {
71 pub error: rdkafka::error::KafkaError,
72}
73
74impl InternalEvent for KafkaOffsetUpdateError {
75 fn emit(self) {
76 error!(
77 message = "Unable to update consumer offset.",
78 error = %self.error,
79 error_code = "kafka_offset_update",
80 error_type = error_type::READER_FAILED,
81 stage = error_stage::SENDING,
82 );
83 counter!(
84 "component_errors_total",
85 "error_code" => "kafka_offset_update",
86 "error_type" => error_type::READER_FAILED,
87 "stage" => error_stage::SENDING,
88 )
89 .increment(1);
90 }
91}
92
93#[derive(Debug)]
94pub struct KafkaReadError {
95 pub error: rdkafka::error::KafkaError,
96}
97
98impl InternalEvent for KafkaReadError {
99 fn emit(self) {
100 error!(
101 message = "Failed to read message.",
102 error = %self.error,
103 error_code = "reading_message",
104 error_type = error_type::READER_FAILED,
105 stage = error_stage::RECEIVING,
106 );
107 counter!(
108 "component_errors_total",
109 "error_code" => "reading_message",
110 "error_type" => error_type::READER_FAILED,
111 "stage" => error_stage::RECEIVING,
112 )
113 .increment(1);
114 }
115}
116
117#[derive(Debug)]
118pub struct KafkaStatisticsReceived<'a> {
119 pub statistics: &'a rdkafka::Statistics,
120 pub expose_lag_metrics: bool,
121}
122
123impl InternalEvent for KafkaStatisticsReceived<'_> {
124 fn emit(self) {
125 gauge!("kafka_queue_messages").set(self.statistics.msg_cnt as f64);
126 gauge!("kafka_queue_messages_bytes").set(self.statistics.msg_size as f64);
127 counter!("kafka_requests_total").absolute(self.statistics.tx as u64);
128 counter!("kafka_requests_bytes_total").absolute(self.statistics.tx_bytes as u64);
129 counter!("kafka_responses_total").absolute(self.statistics.rx as u64);
130 counter!("kafka_responses_bytes_total").absolute(self.statistics.rx_bytes as u64);
131 counter!("kafka_produced_messages_total").absolute(self.statistics.txmsgs as u64);
132 counter!("kafka_produced_messages_bytes_total")
133 .absolute(self.statistics.txmsg_bytes as u64);
134 counter!("kafka_consumed_messages_total").absolute(self.statistics.rxmsgs as u64);
135 counter!("kafka_consumed_messages_bytes_total")
136 .absolute(self.statistics.rxmsg_bytes as u64);
137
138 if self.expose_lag_metrics {
139 for (topic_id, topic) in &self.statistics.topics {
140 for (partition_id, partition) in &topic.partitions {
141 gauge!(
142 "kafka_consumer_lag",
143 "topic_id" => topic_id.clone(),
144 "partition_id" => partition_id.to_string(),
145 )
146 .set(partition.consumer_lag as f64);
147 }
148 }
149 }
150 }
151}
152
153pub struct KafkaHeaderExtractionError<'a> {
154 pub header_field: &'a OwnedTargetPath,
155}
156
157impl InternalEvent for KafkaHeaderExtractionError<'_> {
158 fn emit(self) {
159 error!(
160 message = "Failed to extract header. Value should be a map of String -> Bytes.",
161 error_code = "extracting_header",
162 error_type = error_type::PARSER_FAILED,
163 stage = error_stage::RECEIVING,
164 header_field = self.header_field.to_string(),
165 );
166 counter!(
167 "component_errors_total",
168 "error_code" => "extracting_header",
169 "error_type" => error_type::PARSER_FAILED,
170 "stage" => error_stage::RECEIVING,
171 )
172 .increment(1);
173 }
174}