// vector/internal_events/kafka.rs

#![allow(dead_code)] // TODO requires optional feature compilation

use metrics::{counter, gauge};
use vector_lib::{
    internal_event::{InternalEvent, error_stage, error_type},
    json_size::JsonSize,
};
use vrl::path::OwnedTargetPath;

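/// Emitted when the Kafka source reads raw bytes from a topic partition.
/// Increments `component_received_bytes_total`, labeled by protocol, topic,
/// and partition.
///
/// A minimal sketch of a call site, assuming Vector's `emit!` helper macro
/// and an rdkafka message in scope (illustrative only, not the actual
/// consumer code):
///
/// ```ignore
/// emit!(KafkaBytesReceived {
///     byte_size: message.payload().map_or(0, |p| p.len()),
///     protocol: "tcp",
///     topic: message.topic(),
///     partition: message.partition(),
/// });
/// ```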
#[derive(Debug)]
pub struct KafkaBytesReceived<'a> {
    pub byte_size: usize,
    pub protocol: &'static str,
    pub topic: &'a str,
    pub partition: i32,
}

impl InternalEvent for KafkaBytesReceived<'_> {
    fn emit(self) {
        trace!(
            message = "Bytes received.",
            byte_size = %self.byte_size,
            protocol = %self.protocol,
            topic = self.topic,
            partition = %self.partition,
        );
        counter!(
            "component_received_bytes_total",
            "protocol" => self.protocol,
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.byte_size as u64);
    }
}

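/// Emitted once received Kafka messages have been decoded into events.
/// Increments `component_received_events_total` (event count) and
/// `component_received_event_bytes_total` (estimated JSON byte size), both
/// labeled by topic and partition.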
#[derive(Debug)]
pub struct KafkaEventsReceived<'a> {
    pub byte_size: JsonSize,
    pub count: usize,
    pub topic: &'a str,
    pub partition: i32,
}

impl InternalEvent for KafkaEventsReceived<'_> {
    fn emit(self) {
        trace!(
            message = "Events received.",
            count = %self.count,
            byte_size = %self.byte_size,
            topic = self.topic,
            partition = %self.partition,
        );
        counter!(
            "component_received_events_total",
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.count as u64);
        counter!(
            "component_received_event_bytes_total",
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.byte_size.get() as u64);
    }
}

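/// Emitted when the consumer offset cannot be updated.
/// Increments `component_errors_total` with `error_code = "kafka_offset_update"`.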
#[derive(Debug)]
pub struct KafkaOffsetUpdateError {
    pub error: rdkafka::error::KafkaError,
}

impl InternalEvent for KafkaOffsetUpdateError {
    fn emit(self) {
        error!(
            message = "Unable to update consumer offset.",
            error = %self.error,
            error_code = "kafka_offset_update",
            error_type = error_type::READER_FAILED,
            stage = error_stage::SENDING,
            internal_log_rate_limit = true,
        );
        counter!(
            "component_errors_total",
            "error_code" => "kafka_offset_update",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::SENDING,
        )
        .increment(1);
    }
}

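/// Emitted when reading a message from Kafka fails.
/// Increments `component_errors_total` with `error_code = "reading_message"`.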
#[derive(Debug)]
pub struct KafkaReadError {
    pub error: rdkafka::error::KafkaError,
}

impl InternalEvent for KafkaReadError {
    fn emit(self) {
        error!(
            message = "Failed to read message.",
            error = %self.error,
            error_code = "reading_message",
            error_type = error_type::READER_FAILED,
            stage = error_stage::RECEIVING,
            internal_log_rate_limit = true,
        );
        counter!(
            "component_errors_total",
            "error_code" => "reading_message",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}

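/// Emitted for each statistics snapshot delivered by librdkafka (controlled
/// by its `statistics.interval.ms` setting). Queue depths are exported as
/// gauges; the remaining counters use `absolute` because the snapshot values
/// are already cumulative totals, not per-interval deltas.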
#[derive(Debug)]
pub struct KafkaStatisticsReceived<'a> {
    pub statistics: &'a rdkafka::Statistics,
    pub expose_lag_metrics: bool,
}

impl InternalEvent for KafkaStatisticsReceived<'_> {
    fn emit(self) {
        gauge!("kafka_queue_messages").set(self.statistics.msg_cnt as f64);
        gauge!("kafka_queue_messages_bytes").set(self.statistics.msg_size as f64);
        counter!("kafka_requests_total").absolute(self.statistics.tx as u64);
        counter!("kafka_requests_bytes_total").absolute(self.statistics.tx_bytes as u64);
        counter!("kafka_responses_total").absolute(self.statistics.rx as u64);
        counter!("kafka_responses_bytes_total").absolute(self.statistics.rx_bytes as u64);
        counter!("kafka_produced_messages_total").absolute(self.statistics.txmsgs as u64);
        counter!("kafka_produced_messages_bytes_total")
            .absolute(self.statistics.txmsg_bytes as u64);
        counter!("kafka_consumed_messages_total").absolute(self.statistics.rxmsgs as u64);
        counter!("kafka_consumed_messages_bytes_total")
            .absolute(self.statistics.rxmsg_bytes as u64);

        if self.expose_lag_metrics {
            for (topic_id, topic) in &self.statistics.topics {
                for (partition_id, partition) in &topic.partitions {
                    gauge!(
                        "kafka_consumer_lag",
                        "topic_id" => topic_id.clone(),
                        "partition_id" => partition_id.to_string(),
                    )
                    .set(partition.consumer_lag as f64);
                }
            }
        }
    }
}

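/// Emitted when a message header cannot be decoded into the expected
/// map-of-string-to-bytes shape. Increments `component_errors_total` with
/// `error_code = "extracting_header"`.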
pub struct KafkaHeaderExtractionError<'a> {
    pub header_field: &'a OwnedTargetPath,
}

impl InternalEvent for KafkaHeaderExtractionError<'_> {
    fn emit(self) {
        error!(
            message = "Failed to extract header. Value should be a map of String -> Bytes.",
            error_code = "extracting_header",
            error_type = error_type::PARSER_FAILED,
            stage = error_stage::RECEIVING,
            header_field = self.header_field.to_string(),
            internal_log_rate_limit = true,
        );
        counter!(
            "component_errors_total",
            "error_code" => "extracting_header",
            "error_type" => error_type::PARSER_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}