vector/internal_events/
kubernetes_logs.rs

1use metrics::counter;
2use vector_lib::{
3    internal_event::{
4        ComponentEventsDropped, INTENTIONAL, InternalEvent, UNINTENTIONAL, error_stage, error_type,
5    },
6    json_size::JsonSize,
7};
8use vrl::core::Value;
9
10use crate::event::Event;
11
12#[derive(Debug)]
13pub struct KubernetesLogsEventsReceived<'a> {
14    pub file: &'a str,
15    pub byte_size: JsonSize,
16    pub pod_info: Option<KubernetesLogsPodInfo>,
17}
18
/// Pod identity attached to [`KubernetesLogsEventsReceived`] so its counters
/// can be labelled per pod.
#[derive(Debug)]
pub struct KubernetesLogsPodInfo {
    /// Used as the `pod_name` counter label.
    pub name: String,
    /// Used as the `pod_namespace` counter label.
    pub namespace: String,
}
24
25impl InternalEvent for KubernetesLogsEventsReceived<'_> {
26    fn emit(self) {
27        trace!(
28            message = "Events received.",
29            count = 1,
30            byte_size = %self.byte_size,
31            file = %self.file,
32        );
33        match self.pod_info {
34            Some(pod_info) => {
35                let pod_name = pod_info.name;
36                let pod_namespace = pod_info.namespace;
37
38                counter!(
39                    "component_received_events_total",
40                    "pod_name" => pod_name.clone(),
41                    "pod_namespace" => pod_namespace.clone(),
42                )
43                .increment(1);
44                counter!(
45                    "component_received_event_bytes_total",
46                    "pod_name" => pod_name,
47                    "pod_namespace" => pod_namespace,
48                )
49                .increment(self.byte_size.get() as u64);
50            }
51            None => {
52                counter!("component_received_events_total").increment(1);
53                counter!("component_received_event_bytes_total")
54                    .increment(self.byte_size.get() as u64);
55            }
56        }
57    }
58}
59
/// `error_code` value shared by the annotation error events in this file,
/// used both in their error logs and as a `component_errors_total` label.
const ANNOTATION_FAILED: &'static str = "annotation_failed";
61
62#[derive(Debug)]
63pub struct KubernetesLogsEventAnnotationError<'a> {
64    pub event: &'a Event,
65}
66
67impl InternalEvent for KubernetesLogsEventAnnotationError<'_> {
68    fn emit(self) {
69        error!(
70            message = "Failed to annotate event with pod metadata.",
71            event = ?self.event,
72            error_code = ANNOTATION_FAILED,
73            error_type = error_type::READER_FAILED,
74            stage = error_stage::PROCESSING,
75            internal_log_rate_limit = true,
76        );
77        counter!(
78            "component_errors_total",
79            "error_code" => ANNOTATION_FAILED,
80            "error_type" => error_type::READER_FAILED,
81            "stage" => error_stage::PROCESSING,
82        )
83        .increment(1);
84    }
85}
86
87#[derive(Debug)]
88pub(crate) struct KubernetesLogsEventNamespaceAnnotationError<'a> {
89    pub event: &'a Event,
90}
91
92impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> {
93    fn emit(self) {
94        error!(
95            message = "Failed to annotate event with namespace metadata.",
96            event = ?self.event,
97            error_code = ANNOTATION_FAILED,
98            error_type = error_type::READER_FAILED,
99            stage = error_stage::PROCESSING,
100            internal_log_rate_limit = true,
101        );
102        counter!(
103            "component_errors_total",
104            "error_code" => ANNOTATION_FAILED,
105            "error_type" => error_type::READER_FAILED,
106            "stage" => error_stage::PROCESSING,
107        )
108        .increment(1);
109        counter!("k8s_event_namespace_annotation_failures_total").increment(1);
110    }
111}
112
113#[derive(Debug)]
114pub(crate) struct KubernetesLogsEventNodeAnnotationError<'a> {
115    pub event: &'a Event,
116}
117
118impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> {
119    fn emit(self) {
120        error!(
121            message = "Failed to annotate event with node metadata.",
122            event = ?self.event,
123            error_code = ANNOTATION_FAILED,
124            error_type = error_type::READER_FAILED,
125            stage = error_stage::PROCESSING,
126            internal_log_rate_limit = true,
127        );
128        counter!(
129            "component_errors_total",
130            "error_code" => ANNOTATION_FAILED,
131            "error_type" => error_type::READER_FAILED,
132            "stage" => error_stage::PROCESSING,
133        )
134        .increment(1);
135        counter!("k8s_event_node_annotation_failures_total").increment(1);
136    }
137}
138
139#[derive(Debug)]
140pub struct KubernetesLogsFormatPickerEdgeCase {
141    pub what: &'static str,
142}
143
144impl InternalEvent for KubernetesLogsFormatPickerEdgeCase {
145    fn emit(self) {
146        warn!(
147            message = "Encountered format picker edge case.",
148            what = %self.what,
149        );
150        counter!("k8s_format_picker_edge_cases_total").increment(1);
151    }
152}
153
154#[derive(Debug)]
155pub struct KubernetesLogsDockerFormatParseError<'a> {
156    pub error: &'a dyn std::error::Error,
157}
158
159impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> {
160    fn emit(self) {
161        error!(
162            message = "Failed to parse log line in docker format.",
163            error = %self.error,
164            error_type = error_type::PARSER_FAILED,
165            stage = error_stage::PROCESSING,
166            internal_log_rate_limit = true,
167        );
168        counter!(
169            "component_errors_total",
170            "error_type" => error_type::PARSER_FAILED,
171            "stage" => error_stage::PROCESSING,
172        )
173        .increment(1);
174        counter!("k8s_docker_format_parse_failures_total").increment(1);
175    }
176}
177
/// `error_code` value used by [`KubernetesLifecycleError`] in its error log
/// and as a `component_errors_total` label.
const KUBERNETES_LIFECYCLE: &'static str = "kubernetes_lifecycle";
179
180#[derive(Debug)]
181pub struct KubernetesLifecycleError<E> {
182    pub message: &'static str,
183    pub error: E,
184    pub count: usize,
185}
186
187impl<E: std::fmt::Display> InternalEvent for KubernetesLifecycleError<E> {
188    fn emit(self) {
189        error!(
190            message = self.message,
191            error = %self.error,
192            error_code = KUBERNETES_LIFECYCLE,
193            error_type = error_type::READER_FAILED,
194            stage = error_stage::PROCESSING,
195            internal_log_rate_limit = true,
196        );
197        counter!(
198            "component_errors_total",
199            "error_code" => KUBERNETES_LIFECYCLE,
200            "error_type" => error_type::READER_FAILED,
201            "stage" => error_stage::PROCESSING,
202        )
203        .increment(1);
204        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
205            count: self.count,
206            reason: self.message,
207        });
208    }
209}
210
211#[derive(Debug)]
212pub struct KubernetesMergedLineTooBigError<'a> {
213    pub event: &'a Value,
214    pub configured_limit: usize,
215    pub encountered_size_so_far: usize,
216}
217
218impl InternalEvent for KubernetesMergedLineTooBigError<'_> {
219    fn emit(self) {
220        error!(
221            message = "Found line that exceeds max_merged_line_bytes; discarding.",
222            event = ?self.event,
223            configured_limit = self.configured_limit,
224            encountered_size_so_far = self.encountered_size_so_far,
225            internal_log_rate_limit = true,
226            error_type = error_type::CONDITION_FAILED,
227            stage = error_stage::RECEIVING,
228        );
229        counter!(
230            "component_errors_total",
231            "error_code" => "reading_line_from_kubernetes_log",
232            "error_type" => error_type::CONDITION_FAILED,
233            "stage" => error_stage::RECEIVING,
234        )
235        .increment(1);
236        emit!(ComponentEventsDropped::<INTENTIONAL> {
237            count: 1,
238            reason: "Found line that exceeds max_merged_line_bytes; discarding.",
239        });
240    }
241}