vector/internal_events/kafka.rs

#![allow(dead_code)] // TODO: requires optional feature compilation

use metrics::{counter, gauge};
use vector_lib::{
    NamedInternalEvent,
    internal_event::{InternalEvent, error_stage, error_type},
    json_size::JsonSize,
};
use vrl::path::OwnedTargetPath;

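/// Emitted when a payload of raw bytes is received from a Kafka topic
/// partition, before the bytes are decoded into events.
///
/// A hypothetical call site (field values are illustrative):
/// `emit!(KafkaBytesReceived { byte_size: 512, protocol: "tcp", topic, partition })`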
#[derive(Debug, NamedInternalEvent)]
pub struct KafkaBytesReceived<'a> {
    pub byte_size: usize,
    pub protocol: &'static str,
    pub topic: &'a str,
    pub partition: i32,
}

impl InternalEvent for KafkaBytesReceived<'_> {
    fn emit(self) {
        trace!(
            message = "Bytes received.",
            byte_size = %self.byte_size,
            protocol = %self.protocol,
            topic = self.topic,
            partition = %self.partition,
        );
        counter!(
            "component_received_bytes_total",
            "protocol" => self.protocol,
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.byte_size as u64);
    }
}

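/// Emitted once received Kafka messages have been decoded into events,
/// carrying the event count and their estimated JSON-encoded byte size.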
#[derive(Debug, NamedInternalEvent)]
pub struct KafkaEventsReceived<'a> {
    pub byte_size: JsonSize,
    pub count: usize,
    pub topic: &'a str,
    pub partition: i32,
}

impl InternalEvent for KafkaEventsReceived<'_> {
    fn emit(self) {
        trace!(
            message = "Events received.",
            count = %self.count,
            byte_size = %self.byte_size,
            topic = self.topic,
            partition = %self.partition,
        );
        counter!(
            "component_received_events_total",
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.count as u64);
        counter!(
            "component_received_event_bytes_total",
            "topic" => self.topic.to_string(),
            "partition" => self.partition.to_string(),
        )
        .increment(self.byte_size.get() as u64);
    }
}

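/// Emitted when updating the consumer offset for a processed message fails.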
#[derive(Debug, NamedInternalEvent)]
pub struct KafkaOffsetUpdateError {
    pub error: rdkafka::error::KafkaError,
}

impl InternalEvent for KafkaOffsetUpdateError {
    fn emit(self) {
        error!(
            message = "Unable to update consumer offset.",
            error = %self.error,
            error_code = "kafka_offset_update",
            error_type = error_type::READER_FAILED,
            stage = error_stage::SENDING,
        );
        counter!(
            "component_errors_total",
            "error_code" => "kafka_offset_update",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::SENDING,
        )
        .increment(1);
    }
}

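/// Emitted when reading a message from Kafka fails.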
#[derive(Debug, NamedInternalEvent)]
pub struct KafkaReadError {
    pub error: rdkafka::error::KafkaError,
}

impl InternalEvent for KafkaReadError {
    fn emit(self) {
        error!(
            message = "Failed to read message.",
            error = %self.error,
            error_code = "reading_message",
            error_type = error_type::READER_FAILED,
            stage = error_stage::RECEIVING,
        );
        counter!(
            "component_errors_total",
            "error_code" => "reading_message",
            "error_type" => error_type::READER_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}

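/// Emitted when librdkafka delivers a statistics snapshot; translates the
/// client-level statistics into Vector's internal Kafka metrics.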
#[derive(Debug, NamedInternalEvent)]
pub struct KafkaStatisticsReceived<'a> {
    pub statistics: &'a rdkafka::Statistics,
    pub expose_lag_metrics: bool,
}

impl InternalEvent for KafkaStatisticsReceived<'_> {
    fn emit(self) {
        gauge!("kafka_queue_messages").set(self.statistics.msg_cnt as f64);
        gauge!("kafka_queue_messages_bytes").set(self.statistics.msg_size as f64);
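        // The librdkafka statistics counters (`tx`, `rx`, `txmsgs`, ...) are
        // cumulative totals for the lifetime of the client, so they are
        // mirrored with `absolute` rather than incremented.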
        counter!("kafka_requests_total").absolute(self.statistics.tx as u64);
        counter!("kafka_requests_bytes_total").absolute(self.statistics.tx_bytes as u64);
        counter!("kafka_responses_total").absolute(self.statistics.rx as u64);
        counter!("kafka_responses_bytes_total").absolute(self.statistics.rx_bytes as u64);
        counter!("kafka_produced_messages_total").absolute(self.statistics.txmsgs as u64);
        counter!("kafka_produced_messages_bytes_total")
            .absolute(self.statistics.txmsg_bytes as u64);
        counter!("kafka_consumed_messages_total").absolute(self.statistics.rxmsgs as u64);
        counter!("kafka_consumed_messages_bytes_total")
            .absolute(self.statistics.rxmsg_bytes as u64);

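        // Consumer lag is reported per topic/partition pair, which can
        // produce high label cardinality on large clusters, so it is gated
        // behind an opt-in flag.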
        if self.expose_lag_metrics {
            for (topic_id, topic) in &self.statistics.topics {
                for (partition_id, partition) in &topic.partitions {
                    gauge!(
                        "kafka_consumer_lag",
                        "topic_id" => topic_id.clone(),
                        "partition_id" => partition_id.to_string(),
                    )
                    .set(partition.consumer_lag as f64);
                }
            }
        }
    }
}

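/// Emitted when the value at the configured header field is not a map of
/// `String -> Bytes` and headers cannot be extracted.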
#[derive(Debug, NamedInternalEvent)]
pub struct KafkaHeaderExtractionError<'a> {
    pub header_field: &'a OwnedTargetPath,
}

impl InternalEvent for KafkaHeaderExtractionError<'_> {
    fn emit(self) {
        error!(
            message = "Failed to extract header. Value should be a map of String -> Bytes.",
            error_code = "extracting_header",
            error_type = error_type::PARSER_FAILED,
            stage = error_stage::RECEIVING,
            header_field = self.header_field.to_string(),
        );
        counter!(
            "component_errors_total",
            "error_code" => "extracting_header",
            "error_type" => error_type::PARSER_FAILED,
            "stage" => error_stage::RECEIVING,
        )
        .increment(1);
    }
}