1use metrics::counter;
2use vector_lib::internal_event::{InternalEvent, INTENTIONAL};
3use vector_lib::{
4 internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL},
5 json_size::JsonSize,
6};
7use vrl::core::Value;
8
9use crate::event::Event;
10
11#[derive(Debug)]
12pub struct KubernetesLogsEventsReceived<'a> {
13 pub file: &'a str,
14 pub byte_size: JsonSize,
15 pub pod_info: Option<KubernetesLogsPodInfo>,
16}
17
18#[derive(Debug)]
19pub struct KubernetesLogsPodInfo {
20 pub name: String,
21 pub namespace: String,
22}
23
24impl InternalEvent for KubernetesLogsEventsReceived<'_> {
25 fn emit(self) {
26 trace!(
27 message = "Events received.",
28 count = 1,
29 byte_size = %self.byte_size,
30 file = %self.file,
31 );
32 match self.pod_info {
33 Some(pod_info) => {
34 let pod_name = pod_info.name;
35 let pod_namespace = pod_info.namespace;
36
37 counter!(
38 "component_received_events_total",
39 "pod_name" => pod_name.clone(),
40 "pod_namespace" => pod_namespace.clone(),
41 )
42 .increment(1);
43 counter!(
44 "component_received_event_bytes_total",
45 "pod_name" => pod_name,
46 "pod_namespace" => pod_namespace,
47 )
48 .increment(self.byte_size.get() as u64);
49 }
50 None => {
51 counter!("component_received_events_total").increment(1);
52 counter!("component_received_event_bytes_total")
53 .increment(self.byte_size.get() as u64);
54 }
55 }
56 }
57}
58
59const ANNOTATION_FAILED: &str = "annotation_failed";
60
61#[derive(Debug)]
62pub struct KubernetesLogsEventAnnotationError<'a> {
63 pub event: &'a Event,
64}
65
66impl InternalEvent for KubernetesLogsEventAnnotationError<'_> {
67 fn emit(self) {
68 error!(
69 message = "Failed to annotate event with pod metadata.",
70 event = ?self.event,
71 error_code = ANNOTATION_FAILED,
72 error_type = error_type::READER_FAILED,
73 stage = error_stage::PROCESSING,
74 internal_log_rate_limit = true,
75 );
76 counter!(
77 "component_errors_total",
78 "error_code" => ANNOTATION_FAILED,
79 "error_type" => error_type::READER_FAILED,
80 "stage" => error_stage::PROCESSING,
81 )
82 .increment(1);
83 }
84}
85
86#[derive(Debug)]
87pub(crate) struct KubernetesLogsEventNamespaceAnnotationError<'a> {
88 pub event: &'a Event,
89}
90
91impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> {
92 fn emit(self) {
93 error!(
94 message = "Failed to annotate event with namespace metadata.",
95 event = ?self.event,
96 error_code = ANNOTATION_FAILED,
97 error_type = error_type::READER_FAILED,
98 stage = error_stage::PROCESSING,
99 internal_log_rate_limit = true,
100 );
101 counter!(
102 "component_errors_total",
103 "error_code" => ANNOTATION_FAILED,
104 "error_type" => error_type::READER_FAILED,
105 "stage" => error_stage::PROCESSING,
106 )
107 .increment(1);
108 counter!("k8s_event_namespace_annotation_failures_total").increment(1);
109 }
110}
111
112#[derive(Debug)]
113pub(crate) struct KubernetesLogsEventNodeAnnotationError<'a> {
114 pub event: &'a Event,
115}
116
117impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> {
118 fn emit(self) {
119 error!(
120 message = "Failed to annotate event with node metadata.",
121 event = ?self.event,
122 error_code = ANNOTATION_FAILED,
123 error_type = error_type::READER_FAILED,
124 stage = error_stage::PROCESSING,
125 internal_log_rate_limit = true,
126 );
127 counter!(
128 "component_errors_total",
129 "error_code" => ANNOTATION_FAILED,
130 "error_type" => error_type::READER_FAILED,
131 "stage" => error_stage::PROCESSING,
132 )
133 .increment(1);
134 counter!("k8s_event_node_annotation_failures_total").increment(1);
135 }
136}
137
138#[derive(Debug)]
139pub struct KubernetesLogsFormatPickerEdgeCase {
140 pub what: &'static str,
141}
142
143impl InternalEvent for KubernetesLogsFormatPickerEdgeCase {
144 fn emit(self) {
145 warn!(
146 message = "Encountered format picker edge case.",
147 what = %self.what,
148 );
149 counter!("k8s_format_picker_edge_cases_total").increment(1);
150 }
151}
152
153#[derive(Debug)]
154pub struct KubernetesLogsDockerFormatParseError<'a> {
155 pub error: &'a dyn std::error::Error,
156}
157
158impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> {
159 fn emit(self) {
160 error!(
161 message = "Failed to parse log line in docker format.",
162 error = %self.error,
163 error_type = error_type::PARSER_FAILED,
164 stage = error_stage::PROCESSING,
165 internal_log_rate_limit = true,
166 );
167 counter!(
168 "component_errors_total",
169 "error_type" => error_type::PARSER_FAILED,
170 "stage" => error_stage::PROCESSING,
171 )
172 .increment(1);
173 counter!("k8s_docker_format_parse_failures_total").increment(1);
174 }
175}
176
177const KUBERNETES_LIFECYCLE: &str = "kubernetes_lifecycle";
178
179#[derive(Debug)]
180pub struct KubernetesLifecycleError<E> {
181 pub message: &'static str,
182 pub error: E,
183 pub count: usize,
184}
185
186impl<E: std::fmt::Display> InternalEvent for KubernetesLifecycleError<E> {
187 fn emit(self) {
188 error!(
189 message = self.message,
190 error = %self.error,
191 error_code = KUBERNETES_LIFECYCLE,
192 error_type = error_type::READER_FAILED,
193 stage = error_stage::PROCESSING,
194 internal_log_rate_limit = true,
195 );
196 counter!(
197 "component_errors_total",
198 "error_code" => KUBERNETES_LIFECYCLE,
199 "error_type" => error_type::READER_FAILED,
200 "stage" => error_stage::PROCESSING,
201 )
202 .increment(1);
203 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
204 count: self.count,
205 reason: self.message,
206 });
207 }
208}
209
210#[derive(Debug)]
211pub struct KubernetesMergedLineTooBigError<'a> {
212 pub event: &'a Value,
213 pub configured_limit: usize,
214 pub encountered_size_so_far: usize,
215}
216
217impl InternalEvent for KubernetesMergedLineTooBigError<'_> {
218 fn emit(self) {
219 error!(
220 message = "Found line that exceeds max_merged_line_bytes; discarding.",
221 event = ?self.event,
222 configured_limit = self.configured_limit,
223 encountered_size_so_far = self.encountered_size_so_far,
224 internal_log_rate_limit = true,
225 error_type = error_type::CONDITION_FAILED,
226 stage = error_stage::RECEIVING,
227 );
228 counter!(
229 "component_errors_total",
230 "error_code" => "reading_line_from_kubernetes_log",
231 "error_type" => error_type::CONDITION_FAILED,
232 "stage" => error_stage::RECEIVING,
233 )
234 .increment(1);
235 emit!(ComponentEventsDropped::<INTENTIONAL> {
236 count: 1,
237 reason: "Found line that exceeds max_merged_line_bytes; discarding.",
238 });
239 }
240}