vector/internal_events/
kubernetes_logs.rs

1use metrics::counter;
2use vector_lib::{
3    NamedInternalEvent,
4    internal_event::{
5        ComponentEventsDropped, INTENTIONAL, InternalEvent, UNINTENTIONAL, error_stage, error_type,
6    },
7    json_size::JsonSize,
8};
9use vrl::core::Value;
10
11use crate::event::Event;
12
13#[derive(Debug, NamedInternalEvent)]
14pub struct KubernetesLogsEventsReceived<'a> {
15    pub file: &'a str,
16    pub byte_size: JsonSize,
17    pub pod_info: Option<KubernetesLogsPodInfo>,
18}
19
20#[derive(Debug)]
21pub struct KubernetesLogsPodInfo {
22    pub name: String,
23    pub namespace: String,
24}
25
26impl InternalEvent for KubernetesLogsEventsReceived<'_> {
27    fn emit(self) {
28        trace!(
29            message = "Events received.",
30            count = 1,
31            byte_size = %self.byte_size,
32            file = %self.file,
33        );
34        match self.pod_info {
35            Some(pod_info) => {
36                let pod_name = pod_info.name;
37                let pod_namespace = pod_info.namespace;
38
39                counter!(
40                    "component_received_events_total",
41                    "pod_name" => pod_name.clone(),
42                    "pod_namespace" => pod_namespace.clone(),
43                )
44                .increment(1);
45                counter!(
46                    "component_received_event_bytes_total",
47                    "pod_name" => pod_name,
48                    "pod_namespace" => pod_namespace,
49                )
50                .increment(self.byte_size.get() as u64);
51            }
52            None => {
53                counter!("component_received_events_total").increment(1);
54                counter!("component_received_event_bytes_total")
55                    .increment(self.byte_size.get() as u64);
56            }
57        }
58    }
59}
60
61const ANNOTATION_FAILED: &str = "annotation_failed";
62
63#[derive(Debug, NamedInternalEvent)]
64pub struct KubernetesLogsEventAnnotationError<'a> {
65    pub event: &'a Event,
66}
67
68impl InternalEvent for KubernetesLogsEventAnnotationError<'_> {
69    fn emit(self) {
70        error!(
71            message = "Failed to annotate event with pod metadata.",
72            event = ?self.event,
73            error_code = ANNOTATION_FAILED,
74            error_type = error_type::READER_FAILED,
75            stage = error_stage::PROCESSING,
76        );
77        counter!(
78            "component_errors_total",
79            "error_code" => ANNOTATION_FAILED,
80            "error_type" => error_type::READER_FAILED,
81            "stage" => error_stage::PROCESSING,
82        )
83        .increment(1);
84    }
85}
86
87#[derive(Debug, NamedInternalEvent)]
88pub(crate) struct KubernetesLogsEventNamespaceAnnotationError<'a> {
89    pub event: &'a Event,
90}
91
92impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> {
93    fn emit(self) {
94        error!(
95            message = "Failed to annotate event with namespace metadata.",
96            event = ?self.event,
97            error_code = ANNOTATION_FAILED,
98            error_type = error_type::READER_FAILED,
99            stage = error_stage::PROCESSING,
100        );
101        counter!(
102            "component_errors_total",
103            "error_code" => ANNOTATION_FAILED,
104            "error_type" => error_type::READER_FAILED,
105            "stage" => error_stage::PROCESSING,
106        )
107        .increment(1);
108        counter!("k8s_event_namespace_annotation_failures_total").increment(1);
109    }
110}
111
112#[derive(Debug, NamedInternalEvent)]
113pub(crate) struct KubernetesLogsEventNodeAnnotationError<'a> {
114    pub event: &'a Event,
115}
116
117impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> {
118    fn emit(self) {
119        error!(
120            message = "Failed to annotate event with node metadata.",
121            event = ?self.event,
122            error_code = ANNOTATION_FAILED,
123            error_type = error_type::READER_FAILED,
124            stage = error_stage::PROCESSING,
125        );
126        counter!(
127            "component_errors_total",
128            "error_code" => ANNOTATION_FAILED,
129            "error_type" => error_type::READER_FAILED,
130            "stage" => error_stage::PROCESSING,
131        )
132        .increment(1);
133        counter!("k8s_event_node_annotation_failures_total").increment(1);
134    }
135}
136
137#[derive(Debug, NamedInternalEvent)]
138pub struct KubernetesLogsFormatPickerEdgeCase {
139    pub what: &'static str,
140}
141
142impl InternalEvent for KubernetesLogsFormatPickerEdgeCase {
143    fn emit(self) {
144        warn!(
145            message = "Encountered format picker edge case.",
146            what = %self.what,
147        );
148        counter!("k8s_format_picker_edge_cases_total").increment(1);
149    }
150}
151
152#[derive(Debug, NamedInternalEvent)]
153pub struct KubernetesLogsDockerFormatParseError<'a> {
154    pub error: &'a dyn std::error::Error,
155}
156
157impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> {
158    fn emit(self) {
159        error!(
160            message = "Failed to parse log line in docker format.",
161            error = %self.error,
162            error_type = error_type::PARSER_FAILED,
163            stage = error_stage::PROCESSING,
164        );
165        counter!(
166            "component_errors_total",
167            "error_type" => error_type::PARSER_FAILED,
168            "stage" => error_stage::PROCESSING,
169        )
170        .increment(1);
171        counter!("k8s_docker_format_parse_failures_total").increment(1);
172    }
173}
174
175const KUBERNETES_LIFECYCLE: &str = "kubernetes_lifecycle";
176
177#[derive(Debug, NamedInternalEvent)]
178pub struct KubernetesLifecycleError<E> {
179    pub message: &'static str,
180    pub error: E,
181    pub count: usize,
182}
183
184impl<E: std::fmt::Display> InternalEvent for KubernetesLifecycleError<E> {
185    fn emit(self) {
186        error!(
187            message = self.message,
188            error = %self.error,
189            error_code = KUBERNETES_LIFECYCLE,
190            error_type = error_type::READER_FAILED,
191            stage = error_stage::PROCESSING,
192        );
193        counter!(
194            "component_errors_total",
195            "error_code" => KUBERNETES_LIFECYCLE,
196            "error_type" => error_type::READER_FAILED,
197            "stage" => error_stage::PROCESSING,
198        )
199        .increment(1);
200        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
201            count: self.count,
202            reason: self.message,
203        });
204    }
205}
206
207#[derive(Debug, NamedInternalEvent)]
208pub struct KubernetesMergedLineTooBigError<'a> {
209    pub event: &'a Value,
210    pub configured_limit: usize,
211    pub encountered_size_so_far: usize,
212}
213
214impl InternalEvent for KubernetesMergedLineTooBigError<'_> {
215    fn emit(self) {
216        error!(
217            message = "Found line that exceeds max_merged_line_bytes; discarding.",
218            event = ?self.event,
219            configured_limit = self.configured_limit,
220            encountered_size_so_far = self.encountered_size_so_far,
221            error_type = error_type::CONDITION_FAILED,
222            stage = error_stage::RECEIVING,
223        );
224        counter!(
225            "component_errors_total",
226            "error_code" => "reading_line_from_kubernetes_log",
227            "error_type" => error_type::CONDITION_FAILED,
228            "stage" => error_stage::RECEIVING,
229        )
230        .increment(1);
231        emit!(ComponentEventsDropped::<INTENTIONAL> {
232            count: 1,
233            reason: "Found line that exceeds max_merged_line_bytes; discarding.",
234        });
235    }
236}