vector/internal_events/
kubernetes_logs.rs

1use metrics::counter;
2use vector_lib::{
3    internal_event::{
4        ComponentEventsDropped, INTENTIONAL, InternalEvent, UNINTENTIONAL, error_stage, error_type,
5    },
6    json_size::JsonSize,
7};
8use vrl::core::Value;
9
10use crate::event::Event;
11
12#[derive(Debug)]
13pub struct KubernetesLogsEventsReceived<'a> {
14    pub file: &'a str,
15    pub byte_size: JsonSize,
16    pub pod_info: Option<KubernetesLogsPodInfo>,
17}
18
19#[derive(Debug)]
20pub struct KubernetesLogsPodInfo {
21    pub name: String,
22    pub namespace: String,
23}
24
25impl InternalEvent for KubernetesLogsEventsReceived<'_> {
26    fn emit(self) {
27        trace!(
28            message = "Events received.",
29            count = 1,
30            byte_size = %self.byte_size,
31            file = %self.file,
32        );
33        match self.pod_info {
34            Some(pod_info) => {
35                let pod_name = pod_info.name;
36                let pod_namespace = pod_info.namespace;
37
38                counter!(
39                    "component_received_events_total",
40                    "pod_name" => pod_name.clone(),
41                    "pod_namespace" => pod_namespace.clone(),
42                )
43                .increment(1);
44                counter!(
45                    "component_received_event_bytes_total",
46                    "pod_name" => pod_name,
47                    "pod_namespace" => pod_namespace,
48                )
49                .increment(self.byte_size.get() as u64);
50            }
51            None => {
52                counter!("component_received_events_total").increment(1);
53                counter!("component_received_event_bytes_total")
54                    .increment(self.byte_size.get() as u64);
55            }
56        }
57    }
58}
59
60const ANNOTATION_FAILED: &str = "annotation_failed";
61
62#[derive(Debug)]
63pub struct KubernetesLogsEventAnnotationError<'a> {
64    pub event: &'a Event,
65}
66
67impl InternalEvent for KubernetesLogsEventAnnotationError<'_> {
68    fn emit(self) {
69        error!(
70            message = "Failed to annotate event with pod metadata.",
71            event = ?self.event,
72            error_code = ANNOTATION_FAILED,
73            error_type = error_type::READER_FAILED,
74            stage = error_stage::PROCESSING,
75        );
76        counter!(
77            "component_errors_total",
78            "error_code" => ANNOTATION_FAILED,
79            "error_type" => error_type::READER_FAILED,
80            "stage" => error_stage::PROCESSING,
81        )
82        .increment(1);
83    }
84}
85
86#[derive(Debug)]
87pub(crate) struct KubernetesLogsEventNamespaceAnnotationError<'a> {
88    pub event: &'a Event,
89}
90
91impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> {
92    fn emit(self) {
93        error!(
94            message = "Failed to annotate event with namespace metadata.",
95            event = ?self.event,
96            error_code = ANNOTATION_FAILED,
97            error_type = error_type::READER_FAILED,
98            stage = error_stage::PROCESSING,
99        );
100        counter!(
101            "component_errors_total",
102            "error_code" => ANNOTATION_FAILED,
103            "error_type" => error_type::READER_FAILED,
104            "stage" => error_stage::PROCESSING,
105        )
106        .increment(1);
107        counter!("k8s_event_namespace_annotation_failures_total").increment(1);
108    }
109}
110
111#[derive(Debug)]
112pub(crate) struct KubernetesLogsEventNodeAnnotationError<'a> {
113    pub event: &'a Event,
114}
115
116impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> {
117    fn emit(self) {
118        error!(
119            message = "Failed to annotate event with node metadata.",
120            event = ?self.event,
121            error_code = ANNOTATION_FAILED,
122            error_type = error_type::READER_FAILED,
123            stage = error_stage::PROCESSING,
124        );
125        counter!(
126            "component_errors_total",
127            "error_code" => ANNOTATION_FAILED,
128            "error_type" => error_type::READER_FAILED,
129            "stage" => error_stage::PROCESSING,
130        )
131        .increment(1);
132        counter!("k8s_event_node_annotation_failures_total").increment(1);
133    }
134}
135
136#[derive(Debug)]
137pub struct KubernetesLogsFormatPickerEdgeCase {
138    pub what: &'static str,
139}
140
141impl InternalEvent for KubernetesLogsFormatPickerEdgeCase {
142    fn emit(self) {
143        warn!(
144            message = "Encountered format picker edge case.",
145            what = %self.what,
146        );
147        counter!("k8s_format_picker_edge_cases_total").increment(1);
148    }
149}
150
151#[derive(Debug)]
152pub struct KubernetesLogsDockerFormatParseError<'a> {
153    pub error: &'a dyn std::error::Error,
154}
155
156impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> {
157    fn emit(self) {
158        error!(
159            message = "Failed to parse log line in docker format.",
160            error = %self.error,
161            error_type = error_type::PARSER_FAILED,
162            stage = error_stage::PROCESSING,
163        );
164        counter!(
165            "component_errors_total",
166            "error_type" => error_type::PARSER_FAILED,
167            "stage" => error_stage::PROCESSING,
168        )
169        .increment(1);
170        counter!("k8s_docker_format_parse_failures_total").increment(1);
171    }
172}
173
174const KUBERNETES_LIFECYCLE: &str = "kubernetes_lifecycle";
175
176#[derive(Debug)]
177pub struct KubernetesLifecycleError<E> {
178    pub message: &'static str,
179    pub error: E,
180    pub count: usize,
181}
182
183impl<E: std::fmt::Display> InternalEvent for KubernetesLifecycleError<E> {
184    fn emit(self) {
185        error!(
186            message = self.message,
187            error = %self.error,
188            error_code = KUBERNETES_LIFECYCLE,
189            error_type = error_type::READER_FAILED,
190            stage = error_stage::PROCESSING,
191        );
192        counter!(
193            "component_errors_total",
194            "error_code" => KUBERNETES_LIFECYCLE,
195            "error_type" => error_type::READER_FAILED,
196            "stage" => error_stage::PROCESSING,
197        )
198        .increment(1);
199        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
200            count: self.count,
201            reason: self.message,
202        });
203    }
204}
205
206#[derive(Debug)]
207pub struct KubernetesMergedLineTooBigError<'a> {
208    pub event: &'a Value,
209    pub configured_limit: usize,
210    pub encountered_size_so_far: usize,
211}
212
213impl InternalEvent for KubernetesMergedLineTooBigError<'_> {
214    fn emit(self) {
215        error!(
216            message = "Found line that exceeds max_merged_line_bytes; discarding.",
217            event = ?self.event,
218            configured_limit = self.configured_limit,
219            encountered_size_so_far = self.encountered_size_so_far,
220            error_type = error_type::CONDITION_FAILED,
221            stage = error_stage::RECEIVING,
222        );
223        counter!(
224            "component_errors_total",
225            "error_code" => "reading_line_from_kubernetes_log",
226            "error_type" => error_type::CONDITION_FAILED,
227            "stage" => error_stage::RECEIVING,
228        )
229        .increment(1);
230        emit!(ComponentEventsDropped::<INTENTIONAL> {
231            count: 1,
232            reason: "Found line that exceeds max_merged_line_bytes; discarding.",
233        });
234    }
235}