vector/internal_events/
kubernetes_logs.rs

1use metrics::counter;
2use vector_lib::internal_event::{InternalEvent, INTENTIONAL};
3use vector_lib::{
4    internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL},
5    json_size::JsonSize,
6};
7use vrl::core::Value;
8
9use crate::event::Event;
10
11#[derive(Debug)]
12pub struct KubernetesLogsEventsReceived<'a> {
13    pub file: &'a str,
14    pub byte_size: JsonSize,
15    pub pod_info: Option<KubernetesLogsPodInfo>,
16}
17
18#[derive(Debug)]
19pub struct KubernetesLogsPodInfo {
20    pub name: String,
21    pub namespace: String,
22}
23
24impl InternalEvent for KubernetesLogsEventsReceived<'_> {
25    fn emit(self) {
26        trace!(
27            message = "Events received.",
28            count = 1,
29            byte_size = %self.byte_size,
30            file = %self.file,
31        );
32        match self.pod_info {
33            Some(pod_info) => {
34                let pod_name = pod_info.name;
35                let pod_namespace = pod_info.namespace;
36
37                counter!(
38                    "component_received_events_total",
39                    "pod_name" => pod_name.clone(),
40                    "pod_namespace" => pod_namespace.clone(),
41                )
42                .increment(1);
43                counter!(
44                    "component_received_event_bytes_total",
45                    "pod_name" => pod_name,
46                    "pod_namespace" => pod_namespace,
47                )
48                .increment(self.byte_size.get() as u64);
49            }
50            None => {
51                counter!("component_received_events_total").increment(1);
52                counter!("component_received_event_bytes_total")
53                    .increment(self.byte_size.get() as u64);
54            }
55        }
56    }
57}
58
59const ANNOTATION_FAILED: &str = "annotation_failed";
60
61#[derive(Debug)]
62pub struct KubernetesLogsEventAnnotationError<'a> {
63    pub event: &'a Event,
64}
65
66impl InternalEvent for KubernetesLogsEventAnnotationError<'_> {
67    fn emit(self) {
68        error!(
69            message = "Failed to annotate event with pod metadata.",
70            event = ?self.event,
71            error_code = ANNOTATION_FAILED,
72            error_type = error_type::READER_FAILED,
73            stage = error_stage::PROCESSING,
74            internal_log_rate_limit = true,
75        );
76        counter!(
77            "component_errors_total",
78            "error_code" => ANNOTATION_FAILED,
79            "error_type" => error_type::READER_FAILED,
80            "stage" => error_stage::PROCESSING,
81        )
82        .increment(1);
83    }
84}
85
86#[derive(Debug)]
87pub(crate) struct KubernetesLogsEventNamespaceAnnotationError<'a> {
88    pub event: &'a Event,
89}
90
91impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> {
92    fn emit(self) {
93        error!(
94            message = "Failed to annotate event with namespace metadata.",
95            event = ?self.event,
96            error_code = ANNOTATION_FAILED,
97            error_type = error_type::READER_FAILED,
98            stage = error_stage::PROCESSING,
99            internal_log_rate_limit = true,
100        );
101        counter!(
102            "component_errors_total",
103            "error_code" => ANNOTATION_FAILED,
104            "error_type" => error_type::READER_FAILED,
105            "stage" => error_stage::PROCESSING,
106        )
107        .increment(1);
108        counter!("k8s_event_namespace_annotation_failures_total").increment(1);
109    }
110}
111
112#[derive(Debug)]
113pub(crate) struct KubernetesLogsEventNodeAnnotationError<'a> {
114    pub event: &'a Event,
115}
116
117impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> {
118    fn emit(self) {
119        error!(
120            message = "Failed to annotate event with node metadata.",
121            event = ?self.event,
122            error_code = ANNOTATION_FAILED,
123            error_type = error_type::READER_FAILED,
124            stage = error_stage::PROCESSING,
125            internal_log_rate_limit = true,
126        );
127        counter!(
128            "component_errors_total",
129            "error_code" => ANNOTATION_FAILED,
130            "error_type" => error_type::READER_FAILED,
131            "stage" => error_stage::PROCESSING,
132        )
133        .increment(1);
134        counter!("k8s_event_node_annotation_failures_total").increment(1);
135    }
136}
137
138#[derive(Debug)]
139pub struct KubernetesLogsFormatPickerEdgeCase {
140    pub what: &'static str,
141}
142
143impl InternalEvent for KubernetesLogsFormatPickerEdgeCase {
144    fn emit(self) {
145        warn!(
146            message = "Encountered format picker edge case.",
147            what = %self.what,
148        );
149        counter!("k8s_format_picker_edge_cases_total").increment(1);
150    }
151}
152
153#[derive(Debug)]
154pub struct KubernetesLogsDockerFormatParseError<'a> {
155    pub error: &'a dyn std::error::Error,
156}
157
158impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> {
159    fn emit(self) {
160        error!(
161            message = "Failed to parse log line in docker format.",
162            error = %self.error,
163            error_type = error_type::PARSER_FAILED,
164            stage = error_stage::PROCESSING,
165            internal_log_rate_limit = true,
166        );
167        counter!(
168            "component_errors_total",
169            "error_type" => error_type::PARSER_FAILED,
170            "stage" => error_stage::PROCESSING,
171        )
172        .increment(1);
173        counter!("k8s_docker_format_parse_failures_total").increment(1);
174    }
175}
176
177const KUBERNETES_LIFECYCLE: &str = "kubernetes_lifecycle";
178
179#[derive(Debug)]
180pub struct KubernetesLifecycleError<E> {
181    pub message: &'static str,
182    pub error: E,
183    pub count: usize,
184}
185
186impl<E: std::fmt::Display> InternalEvent for KubernetesLifecycleError<E> {
187    fn emit(self) {
188        error!(
189            message = self.message,
190            error = %self.error,
191            error_code = KUBERNETES_LIFECYCLE,
192            error_type = error_type::READER_FAILED,
193            stage = error_stage::PROCESSING,
194            internal_log_rate_limit = true,
195        );
196        counter!(
197            "component_errors_total",
198            "error_code" => KUBERNETES_LIFECYCLE,
199            "error_type" => error_type::READER_FAILED,
200            "stage" => error_stage::PROCESSING,
201        )
202        .increment(1);
203        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
204            count: self.count,
205            reason: self.message,
206        });
207    }
208}
209
210#[derive(Debug)]
211pub struct KubernetesMergedLineTooBigError<'a> {
212    pub event: &'a Value,
213    pub configured_limit: usize,
214    pub encountered_size_so_far: usize,
215}
216
217impl InternalEvent for KubernetesMergedLineTooBigError<'_> {
218    fn emit(self) {
219        error!(
220            message = "Found line that exceeds max_merged_line_bytes; discarding.",
221            event = ?self.event,
222            configured_limit = self.configured_limit,
223            encountered_size_so_far = self.encountered_size_so_far,
224            internal_log_rate_limit = true,
225            error_type = error_type::CONDITION_FAILED,
226            stage = error_stage::RECEIVING,
227        );
228        counter!(
229            "component_errors_total",
230            "error_code" => "reading_line_from_kubernetes_log",
231            "error_type" => error_type::CONDITION_FAILED,
232            "stage" => error_stage::RECEIVING,
233        )
234        .increment(1);
235        emit!(ComponentEventsDropped::<INTENTIONAL> {
236            count: 1,
237            reason: "Found line that exceeds max_merged_line_bytes; discarding.",
238        });
239    }
240}