vector/internal_events/kafka.rs

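//! Internal telemetry events for Vector's Kafka components.
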
#![allow(dead_code)] // TODO requires optional feature compilation

use metrics::{counter, gauge};
use vector_lib::{
    internal_event::{InternalEvent, error_stage, error_type},
    json_size::JsonSize,
};
use vrl::path::OwnedTargetPath;

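/// Emitted when raw bytes are received from a Kafka topic partition;
/// feeds the `component_received_bytes_total` counter.
///
/// A minimal sketch of a call site, assuming the crate's `emit!` macro and an
/// rdkafka `Message` in scope (names here are illustrative, not prescriptive):
///
/// ```ignore
/// emit!(KafkaBytesReceived {
///     byte_size: message.payload().map_or(0, |p| p.len()),
///     protocol: "tcp",
///     topic: message.topic(),
///     partition: message.partition(),
/// });
/// ```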
#[derive(Debug)]
pub struct KafkaBytesReceived<'a> {
    pub byte_size: usize,
    pub protocol: &'static str,
    pub topic: &'a str,
    pub partition: i32,
}

impl InternalEvent for KafkaBytesReceived<'_> {
    fn emit(self) {
        trace!(
            message = "Bytes received.",
            byte_size = %self.byte_size,
            protocol = %self.protocol,
            topic = self.topic,
            partition = %self.partition,
        );
        counter!(
            "component_received_bytes_total",
            "protocol" => self.protocol,
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.byte_size as u64);
    }
}

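/// Emitted after received Kafka messages are decoded into events; feeds the
/// `component_received_events_total` and `component_received_event_bytes_total`
/// counters.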
#[derive(Debug)]
pub struct KafkaEventsReceived<'a> {
    pub byte_size: JsonSize,
    pub count: usize,
    pub topic: &'a str,
    pub partition: i32,
}

impl InternalEvent for KafkaEventsReceived<'_> {
    fn emit(self) {
        trace!(
            message = "Events received.",
            count = %self.count,
            byte_size = %self.byte_size,
            topic = self.topic,
            partition = %self.partition,
        );
        counter!(
            "component_received_events_total",
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.count as u64);
        counter!(
            "component_received_event_bytes_total",
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.byte_size.get() as u64);
    }
}

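/// Emitted when updating the consumer offset fails; increments
/// `component_errors_total` with error code `kafka_offset_update`.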
#[derive(Debug)]
pub struct KafkaOffsetUpdateError {
    pub error: rdkafka::error::KafkaError,
}

impl InternalEvent for KafkaOffsetUpdateError {
    fn emit(self) {
        error!(
            message = "Unable to update consumer offset.",
            error = %self.error,
            error_code = "kafka_offset_update",
            error_type = error_type::READER_FAILED,
            stage = error_stage::SENDING,
        );
        counter!(
            "component_errors_total",
            "error_code" => "kafka_offset_update",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::SENDING,
        )
        .increment(1);
    }
}

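/// Emitted when reading a message from Kafka fails; increments
/// `component_errors_total` with error code `reading_message`.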
#[derive(Debug)]
pub struct KafkaReadError {
    pub error: rdkafka::error::KafkaError,
}

impl InternalEvent for KafkaReadError {
    fn emit(self) {
        error!(
            message = "Failed to read message.",
            error = %self.error,
            error_code = "reading_message",
            error_type = error_type::READER_FAILED,
            stage = error_stage::RECEIVING,
        );
        counter!(
            "component_errors_total",
            "error_code" => "reading_message",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}

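/// Emitted when a statistics snapshot arrives from librdkafka's periodic
/// statistics callback; translated into the `kafka_*` gauges and counters
/// below.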
#[derive(Debug)]
pub struct KafkaStatisticsReceived<'a> {
    pub statistics: &'a rdkafka::Statistics,
    pub expose_lag_metrics: bool,
}

impl InternalEvent for KafkaStatisticsReceived<'_> {
    fn emit(self) {
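        // librdkafka statistics are cumulative totals for the lifetime of the
        // client, so counters are set with `absolute` rather than incremented.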
        gauge!("kafka_queue_messages").set(self.statistics.msg_cnt as f64);
        gauge!("kafka_queue_messages_bytes").set(self.statistics.msg_size as f64);
        counter!("kafka_requests_total").absolute(self.statistics.tx as u64);
        counter!("kafka_requests_bytes_total").absolute(self.statistics.tx_bytes as u64);
        counter!("kafka_responses_total").absolute(self.statistics.rx as u64);
        counter!("kafka_responses_bytes_total").absolute(self.statistics.rx_bytes as u64);
        counter!("kafka_produced_messages_total").absolute(self.statistics.txmsgs as u64);
        counter!("kafka_produced_messages_bytes_total")
            .absolute(self.statistics.txmsg_bytes as u64);
        counter!("kafka_consumed_messages_total").absolute(self.statistics.rxmsgs as u64);
        counter!("kafka_consumed_messages_bytes_total")
            .absolute(self.statistics.rxmsg_bytes as u64);

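        // Per-partition consumer lag gauges are gated behind the
        // `expose_lag_metrics` flag.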
        if self.expose_lag_metrics {
            for (topic_id, topic) in &self.statistics.topics {
                for (partition_id, partition) in &topic.partitions {
                    gauge!(
                        "kafka_consumer_lag",
                        "topic_id" => topic_id.clone(),
                        "partition_id" => partition_id.to_string(),
                    )
                    .set(partition.consumer_lag as f64);
                }
            }
        }
    }
}

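/// Emitted when a header cannot be extracted from an event field; the value
/// is expected to be a map of String -> Bytes. Increments
/// `component_errors_total` with error code `extracting_header`.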
pub struct KafkaHeaderExtractionError<'a> {
    pub header_field: &'a OwnedTargetPath,
}

impl InternalEvent for KafkaHeaderExtractionError<'_> {
    fn emit(self) {
        error!(
            message = "Failed to extract header. Value should be a map of String -> Bytes.",
            error_code = "extracting_header",
            error_type = error_type::PARSER_FAILED,
            stage = error_stage::RECEIVING,
            header_field = self.header_field.to_string(),
        );
        counter!(
            "component_errors_total",
            "error_code" => "extracting_header",
            "error_type" => error_type::PARSER_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}