vector/internal_events/
kafka.rs1#![allow(dead_code)] use metrics::{counter, gauge};
4use vector_lib::{
5 NamedInternalEvent,
6 internal_event::{InternalEvent, error_stage, error_type},
7 json_size::JsonSize,
8};
9use vrl::path::OwnedTargetPath;
10
11#[derive(Debug, NamedInternalEvent)]
12pub struct KafkaBytesReceived<'a> {
13 pub byte_size: usize,
14 pub protocol: &'static str,
15 pub topic: &'a str,
16 pub partition: i32,
17}
18
19impl InternalEvent for KafkaBytesReceived<'_> {
20 fn emit(self) {
21 trace!(
22 message = "Bytes received.",
23 byte_size = %self.byte_size,
24 protocol = %self.protocol,
25 topic = self.topic,
26 partition = %self.partition,
27 );
28 counter!(
29 "component_received_bytes_total",
30 "protocol" => self.protocol,
31 "topic" => self.topic.to_string(),
32 "partition" => self.partition.to_string(),
33 )
34 .increment(self.byte_size as u64);
35 }
36}
37
38#[derive(Debug, NamedInternalEvent)]
39pub struct KafkaEventsReceived<'a> {
40 pub byte_size: JsonSize,
41 pub count: usize,
42 pub topic: &'a str,
43 pub partition: i32,
44}
45
46impl InternalEvent for KafkaEventsReceived<'_> {
47 fn emit(self) {
48 trace!(
49 message = "Events received.",
50 count = %self.count,
51 byte_size = %self.byte_size,
52 topic = self.topic,
53 partition = %self.partition,
54 );
55 counter!(
56 "component_received_events_total",
57 "topic" => self.topic.to_string(),
58 "partition" => self.partition.to_string(),
59 )
60 .increment(self.count as u64);
61 counter!(
62 "component_received_event_bytes_total",
63 "topic" => self.topic.to_string(),
64 "partition" => self.partition.to_string(),
65 )
66 .increment(self.byte_size.get() as u64);
67 }
68}
69
70#[derive(Debug, NamedInternalEvent)]
71pub struct KafkaOffsetUpdateError {
72 pub error: rdkafka::error::KafkaError,
73}
74
75impl InternalEvent for KafkaOffsetUpdateError {
76 fn emit(self) {
77 error!(
78 message = "Unable to update consumer offset.",
79 error = %self.error,
80 error_code = "kafka_offset_update",
81 error_type = error_type::READER_FAILED,
82 stage = error_stage::SENDING,
83 );
84 counter!(
85 "component_errors_total",
86 "error_code" => "kafka_offset_update",
87 "error_type" => error_type::READER_FAILED,
88 "stage" => error_stage::SENDING,
89 )
90 .increment(1);
91 }
92}
93
94#[derive(Debug, NamedInternalEvent)]
95pub struct KafkaReadError {
96 pub error: rdkafka::error::KafkaError,
97}
98
99impl InternalEvent for KafkaReadError {
100 fn emit(self) {
101 error!(
102 message = "Failed to read message.",
103 error = %self.error,
104 error_code = "reading_message",
105 error_type = error_type::READER_FAILED,
106 stage = error_stage::RECEIVING,
107 );
108 counter!(
109 "component_errors_total",
110 "error_code" => "reading_message",
111 "error_type" => error_type::READER_FAILED,
112 "stage" => error_stage::RECEIVING,
113 )
114 .increment(1);
115 }
116}
117
118#[derive(Debug, NamedInternalEvent)]
119pub struct KafkaStatisticsReceived<'a> {
120 pub statistics: &'a rdkafka::Statistics,
121 pub expose_lag_metrics: bool,
122}
123
124impl InternalEvent for KafkaStatisticsReceived<'_> {
125 fn emit(self) {
126 gauge!("kafka_queue_messages").set(self.statistics.msg_cnt as f64);
127 gauge!("kafka_queue_messages_bytes").set(self.statistics.msg_size as f64);
128 counter!("kafka_requests_total").absolute(self.statistics.tx as u64);
129 counter!("kafka_requests_bytes_total").absolute(self.statistics.tx_bytes as u64);
130 counter!("kafka_responses_total").absolute(self.statistics.rx as u64);
131 counter!("kafka_responses_bytes_total").absolute(self.statistics.rx_bytes as u64);
132 counter!("kafka_produced_messages_total").absolute(self.statistics.txmsgs as u64);
133 counter!("kafka_produced_messages_bytes_total")
134 .absolute(self.statistics.txmsg_bytes as u64);
135 counter!("kafka_consumed_messages_total").absolute(self.statistics.rxmsgs as u64);
136 counter!("kafka_consumed_messages_bytes_total")
137 .absolute(self.statistics.rxmsg_bytes as u64);
138
139 if self.expose_lag_metrics {
140 for (topic_id, topic) in &self.statistics.topics {
141 for (partition_id, partition) in &topic.partitions {
142 gauge!(
143 "kafka_consumer_lag",
144 "topic_id" => topic_id.clone(),
145 "partition_id" => partition_id.to_string(),
146 )
147 .set(partition.consumer_lag as f64);
148 }
149 }
150 }
151 }
152}
153
154#[derive(NamedInternalEvent)]
155pub struct KafkaHeaderExtractionError<'a> {
156 pub header_field: &'a OwnedTargetPath,
157}
158
159impl InternalEvent for KafkaHeaderExtractionError<'_> {
160 fn emit(self) {
161 error!(
162 message = "Failed to extract header. Value should be a map of String -> Bytes.",
163 error_code = "extracting_header",
164 error_type = error_type::PARSER_FAILED,
165 stage = error_stage::RECEIVING,
166 header_field = self.header_field.to_string(),
167 );
168 counter!(
169 "component_errors_total",
170 "error_code" => "extracting_header",
171 "error_type" => error_type::PARSER_FAILED,
172 "stage" => error_stage::RECEIVING,
173 )
174 .increment(1);
175 }
176}