1use metrics::counter;
2use vector_lib::{
3 NamedInternalEvent,
4 internal_event::{
5 ComponentEventsDropped, INTENTIONAL, InternalEvent, UNINTENTIONAL, error_stage, error_type,
6 },
7 json_size::JsonSize,
8};
9use vrl::core::Value;
10
11use crate::event::Event;
12
13#[derive(Debug, NamedInternalEvent)]
14pub struct KubernetesLogsEventsReceived<'a> {
15 pub file: &'a str,
16 pub byte_size: JsonSize,
17 pub pod_info: Option<KubernetesLogsPodInfo>,
18}
19
/// Pod identity used to label the counters emitted by `KubernetesLogsEventsReceived`.
#[derive(Debug)]
pub struct KubernetesLogsPodInfo {
    /// Value of the `pod_name` counter label.
    pub name: String,
    /// Value of the `pod_namespace` counter label.
    pub namespace: String,
}
25
26impl InternalEvent for KubernetesLogsEventsReceived<'_> {
27 fn emit(self) {
28 trace!(
29 message = "Events received.",
30 count = 1,
31 byte_size = %self.byte_size,
32 file = %self.file,
33 );
34 match self.pod_info {
35 Some(pod_info) => {
36 let pod_name = pod_info.name;
37 let pod_namespace = pod_info.namespace;
38
39 counter!(
40 "component_received_events_total",
41 "pod_name" => pod_name.clone(),
42 "pod_namespace" => pod_namespace.clone(),
43 )
44 .increment(1);
45 counter!(
46 "component_received_event_bytes_total",
47 "pod_name" => pod_name,
48 "pod_namespace" => pod_namespace,
49 )
50 .increment(self.byte_size.get() as u64);
51 }
52 None => {
53 counter!("component_received_events_total").increment(1);
54 counter!("component_received_event_bytes_total")
55 .increment(self.byte_size.get() as u64);
56 }
57 }
58 }
59}
60
61const ANNOTATION_FAILED: &str = "annotation_failed";
62
63#[derive(Debug, NamedInternalEvent)]
64pub struct KubernetesLogsEventAnnotationError<'a> {
65 pub event: &'a Event,
66}
67
68impl InternalEvent for KubernetesLogsEventAnnotationError<'_> {
69 fn emit(self) {
70 error!(
71 message = "Failed to annotate event with pod metadata.",
72 event = ?self.event,
73 error_code = ANNOTATION_FAILED,
74 error_type = error_type::READER_FAILED,
75 stage = error_stage::PROCESSING,
76 );
77 counter!(
78 "component_errors_total",
79 "error_code" => ANNOTATION_FAILED,
80 "error_type" => error_type::READER_FAILED,
81 "stage" => error_stage::PROCESSING,
82 )
83 .increment(1);
84 }
85}
86
87#[derive(Debug, NamedInternalEvent)]
88pub(crate) struct KubernetesLogsEventNamespaceAnnotationError<'a> {
89 pub event: &'a Event,
90}
91
92impl InternalEvent for KubernetesLogsEventNamespaceAnnotationError<'_> {
93 fn emit(self) {
94 error!(
95 message = "Failed to annotate event with namespace metadata.",
96 event = ?self.event,
97 error_code = ANNOTATION_FAILED,
98 error_type = error_type::READER_FAILED,
99 stage = error_stage::PROCESSING,
100 );
101 counter!(
102 "component_errors_total",
103 "error_code" => ANNOTATION_FAILED,
104 "error_type" => error_type::READER_FAILED,
105 "stage" => error_stage::PROCESSING,
106 )
107 .increment(1);
108 counter!("k8s_event_namespace_annotation_failures_total").increment(1);
109 }
110}
111
112#[derive(Debug, NamedInternalEvent)]
113pub(crate) struct KubernetesLogsEventNodeAnnotationError<'a> {
114 pub event: &'a Event,
115}
116
117impl InternalEvent for KubernetesLogsEventNodeAnnotationError<'_> {
118 fn emit(self) {
119 error!(
120 message = "Failed to annotate event with node metadata.",
121 event = ?self.event,
122 error_code = ANNOTATION_FAILED,
123 error_type = error_type::READER_FAILED,
124 stage = error_stage::PROCESSING,
125 );
126 counter!(
127 "component_errors_total",
128 "error_code" => ANNOTATION_FAILED,
129 "error_type" => error_type::READER_FAILED,
130 "stage" => error_stage::PROCESSING,
131 )
132 .increment(1);
133 counter!("k8s_event_node_annotation_failures_total").increment(1);
134 }
135}
136
137#[derive(Debug, NamedInternalEvent)]
138pub struct KubernetesLogsFormatPickerEdgeCase {
139 pub what: &'static str,
140}
141
142impl InternalEvent for KubernetesLogsFormatPickerEdgeCase {
143 fn emit(self) {
144 warn!(
145 message = "Encountered format picker edge case.",
146 what = %self.what,
147 );
148 counter!("k8s_format_picker_edge_cases_total").increment(1);
149 }
150}
151
152#[derive(Debug, NamedInternalEvent)]
153pub struct KubernetesLogsDockerFormatParseError<'a> {
154 pub error: &'a dyn std::error::Error,
155}
156
157impl InternalEvent for KubernetesLogsDockerFormatParseError<'_> {
158 fn emit(self) {
159 error!(
160 message = "Failed to parse log line in docker format.",
161 error = %self.error,
162 error_type = error_type::PARSER_FAILED,
163 stage = error_stage::PROCESSING,
164 );
165 counter!(
166 "component_errors_total",
167 "error_type" => error_type::PARSER_FAILED,
168 "stage" => error_stage::PROCESSING,
169 )
170 .increment(1);
171 counter!("k8s_docker_format_parse_failures_total").increment(1);
172 }
173}
174
175const KUBERNETES_LIFECYCLE: &str = "kubernetes_lifecycle";
176
177#[derive(Debug, NamedInternalEvent)]
178pub struct KubernetesLifecycleError<E> {
179 pub message: &'static str,
180 pub error: E,
181 pub count: usize,
182}
183
184impl<E: std::fmt::Display> InternalEvent for KubernetesLifecycleError<E> {
185 fn emit(self) {
186 error!(
187 message = self.message,
188 error = %self.error,
189 error_code = KUBERNETES_LIFECYCLE,
190 error_type = error_type::READER_FAILED,
191 stage = error_stage::PROCESSING,
192 );
193 counter!(
194 "component_errors_total",
195 "error_code" => KUBERNETES_LIFECYCLE,
196 "error_type" => error_type::READER_FAILED,
197 "stage" => error_stage::PROCESSING,
198 )
199 .increment(1);
200 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
201 count: self.count,
202 reason: self.message,
203 });
204 }
205}
206
207#[derive(Debug, NamedInternalEvent)]
208pub struct KubernetesMergedLineTooBigError<'a> {
209 pub event: &'a Value,
210 pub configured_limit: usize,
211 pub encountered_size_so_far: usize,
212}
213
214impl InternalEvent for KubernetesMergedLineTooBigError<'_> {
215 fn emit(self) {
216 error!(
217 message = "Found line that exceeds max_merged_line_bytes; discarding.",
218 event = ?self.event,
219 configured_limit = self.configured_limit,
220 encountered_size_so_far = self.encountered_size_so_far,
221 error_type = error_type::CONDITION_FAILED,
222 stage = error_stage::RECEIVING,
223 );
224 counter!(
225 "component_errors_total",
226 "error_code" => "reading_line_from_kubernetes_log",
227 "error_type" => error_type::CONDITION_FAILED,
228 "stage" => error_stage::RECEIVING,
229 )
230 .increment(1);
231 emit!(ComponentEventsDropped::<INTENTIONAL> {
232 count: 1,
233 reason: "Found line that exceeds max_merged_line_bytes; discarding.",
234 });
235 }
236}