1use metrics::{counter, gauge};
2use std::borrow::Cow;
3use vector_lib::{
4 configurable::configurable_component,
5 internal_event::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL},
6};
7
8#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
9pub use self::source::*;
10
11use vector_lib::internal_event::{error_stage, error_type};
12
13#[configurable_component]
15#[derive(Clone, Debug, PartialEq, Eq, Default)]
16#[serde(deny_unknown_fields)]
17pub struct FileInternalMetricsConfig {
18 #[serde(default = "crate::serde::default_false")]
23 pub include_file_tag: bool,
24}
25
26#[derive(Debug)]
27pub struct FileOpen {
28 pub count: usize,
29}
30
31impl InternalEvent for FileOpen {
32 fn emit(self) {
33 gauge!("open_files").set(self.count as f64);
34 }
35}
36
37#[derive(Debug)]
38pub struct FileBytesSent<'a> {
39 pub byte_size: usize,
40 pub file: Cow<'a, str>,
41 pub include_file_metric_tag: bool,
42}
43
44impl InternalEvent for FileBytesSent<'_> {
45 fn emit(self) {
46 trace!(
47 message = "Bytes sent.",
48 byte_size = %self.byte_size,
49 protocol = "file",
50 file = %self.file,
51 );
52 if self.include_file_metric_tag {
53 counter!(
54 "component_sent_bytes_total",
55 "protocol" => "file",
56 "file" => self.file.clone().into_owned(),
57 )
58 } else {
59 counter!(
60 "component_sent_bytes_total",
61 "protocol" => "file",
62 )
63 }
64 .increment(self.byte_size as u64);
65 }
66}
67
68#[derive(Debug)]
69pub struct FileIoError<'a, P> {
70 pub error: std::io::Error,
71 pub code: &'static str,
72 pub message: &'static str,
73 pub path: &'a P,
74 pub dropped_events: usize,
75}
76
77impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
78 fn emit(self) {
79 error!(
80 message = %self.message,
81 path = ?self.path,
82 error = %self.error,
83 error_code = %self.code,
84 error_type = error_type::IO_FAILED,
85 stage = error_stage::SENDING,
86 internal_log_rate_limit = true,
87 );
88 counter!(
89 "component_errors_total",
90 "error_code" => self.code,
91 "error_type" => error_type::IO_FAILED,
92 "stage" => error_stage::SENDING,
93 )
94 .increment(1);
95
96 if self.dropped_events > 0 {
97 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
98 count: self.dropped_events,
99 reason: self.message,
100 });
101 }
102 }
103}
104
105#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
106mod source {
107 use std::{io::Error, path::Path, time::Duration};
108
109 use bytes::BytesMut;
110 use metrics::counter;
111 use vector_lib::file_source::FileSourceInternalEvents;
112 use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL};
113
114 use super::{FileOpen, InternalEvent};
115 use vector_lib::emit;
116 use vector_lib::{
117 internal_event::{error_stage, error_type},
118 json_size::JsonSize,
119 };
120
121 #[derive(Debug)]
122 pub struct FileBytesReceived<'a> {
123 pub byte_size: usize,
124 pub file: &'a str,
125 pub include_file_metric_tag: bool,
126 }
127
128 impl InternalEvent for FileBytesReceived<'_> {
129 fn emit(self) {
130 trace!(
131 message = "Bytes received.",
132 byte_size = %self.byte_size,
133 protocol = "file",
134 file = %self.file,
135 );
136 if self.include_file_metric_tag {
137 counter!(
138 "component_received_bytes_total",
139 "protocol" => "file",
140 "file" => self.file.to_owned()
141 )
142 } else {
143 counter!(
144 "component_received_bytes_total",
145 "protocol" => "file",
146 )
147 }
148 .increment(self.byte_size as u64);
149 }
150 }
151
152 #[derive(Debug)]
153 pub struct FileEventsReceived<'a> {
154 pub count: usize,
155 pub file: &'a str,
156 pub byte_size: JsonSize,
157 pub include_file_metric_tag: bool,
158 }
159
160 impl InternalEvent for FileEventsReceived<'_> {
161 fn emit(self) {
162 trace!(
163 message = "Events received.",
164 count = %self.count,
165 byte_size = %self.byte_size,
166 file = %self.file
167 );
168 if self.include_file_metric_tag {
169 counter!(
170 "component_received_events_total",
171 "file" => self.file.to_owned(),
172 )
173 .increment(self.count as u64);
174 counter!(
175 "component_received_event_bytes_total",
176 "file" => self.file.to_owned(),
177 )
178 .increment(self.byte_size.get() as u64);
179 } else {
180 counter!("component_received_events_total").increment(self.count as u64);
181 counter!("component_received_event_bytes_total")
182 .increment(self.byte_size.get() as u64);
183 }
184 }
185 }
186
187 #[derive(Debug)]
188 pub struct FileChecksumFailed<'a> {
189 pub file: &'a Path,
190 pub include_file_metric_tag: bool,
191 }
192
193 impl InternalEvent for FileChecksumFailed<'_> {
194 fn emit(self) {
195 warn!(
196 message = "Currently ignoring file too small to fingerprint.",
197 file = %self.file.display(),
198 );
199 if self.include_file_metric_tag {
200 counter!(
201 "checksum_errors_total",
202 "file" => self.file.to_string_lossy().into_owned(),
203 )
204 } else {
205 counter!("checksum_errors_total")
206 }
207 .increment(1);
208 }
209 }
210
211 #[derive(Debug)]
212 pub struct FileFingerprintReadError<'a> {
213 pub file: &'a Path,
214 pub error: Error,
215 pub include_file_metric_tag: bool,
216 }
217
218 impl InternalEvent for FileFingerprintReadError<'_> {
219 fn emit(self) {
220 error!(
221 message = "Failed reading file for fingerprinting.",
222 file = %self.file.display(),
223 error = %self.error,
224 error_code = "reading_fingerprint",
225 error_type = error_type::READER_FAILED,
226 stage = error_stage::RECEIVING,
227 internal_log_rate_limit = true,
228 );
229 if self.include_file_metric_tag {
230 counter!(
231 "component_errors_total",
232 "error_code" => "reading_fingerprint",
233 "error_type" => error_type::READER_FAILED,
234 "stage" => error_stage::RECEIVING,
235 "file" => self.file.to_string_lossy().into_owned(),
236 )
237 } else {
238 counter!(
239 "component_errors_total",
240 "error_code" => "reading_fingerprint",
241 "error_type" => error_type::READER_FAILED,
242 "stage" => error_stage::RECEIVING,
243 )
244 }
245 .increment(1);
246 }
247 }
248
249 const DELETION_FAILED: &str = "deletion_failed";
250
251 #[derive(Debug)]
252 pub struct FileDeleteError<'a> {
253 pub file: &'a Path,
254 pub error: Error,
255 pub include_file_metric_tag: bool,
256 }
257
258 impl InternalEvent for FileDeleteError<'_> {
259 fn emit(self) {
260 error!(
261 message = "Failed in deleting file.",
262 file = %self.file.display(),
263 error = %self.error,
264 error_code = DELETION_FAILED,
265 error_type = error_type::COMMAND_FAILED,
266 stage = error_stage::RECEIVING,
267 internal_log_rate_limit = true,
268 );
269 if self.include_file_metric_tag {
270 counter!(
271 "component_errors_total",
272 "file" => self.file.to_string_lossy().into_owned(),
273 "error_code" => DELETION_FAILED,
274 "error_type" => error_type::COMMAND_FAILED,
275 "stage" => error_stage::RECEIVING,
276 )
277 } else {
278 counter!(
279 "component_errors_total",
280 "error_code" => DELETION_FAILED,
281 "error_type" => error_type::COMMAND_FAILED,
282 "stage" => error_stage::RECEIVING,
283 )
284 }
285 .increment(1);
286 }
287 }
288
289 #[derive(Debug)]
290 pub struct FileDeleted<'a> {
291 pub file: &'a Path,
292 pub include_file_metric_tag: bool,
293 }
294
295 impl InternalEvent for FileDeleted<'_> {
296 fn emit(self) {
297 info!(
298 message = "File deleted.",
299 file = %self.file.display(),
300 );
301 if self.include_file_metric_tag {
302 counter!(
303 "files_deleted_total",
304 "file" => self.file.to_string_lossy().into_owned(),
305 )
306 } else {
307 counter!("files_deleted_total")
308 }
309 .increment(1);
310 }
311 }
312
313 #[derive(Debug)]
314 pub struct FileUnwatched<'a> {
315 pub file: &'a Path,
316 pub include_file_metric_tag: bool,
317 pub reached_eof: bool,
318 }
319
320 impl InternalEvent for FileUnwatched<'_> {
321 fn emit(self) {
322 let reached_eof = if self.reached_eof { "true" } else { "false" };
323 info!(
324 message = "Stopped watching file.",
325 file = %self.file.display(),
326 reached_eof
327 );
328 if self.include_file_metric_tag {
329 counter!(
330 "files_unwatched_total",
331 "file" => self.file.to_string_lossy().into_owned(),
332 "reached_eof" => reached_eof,
333 )
334 } else {
335 counter!(
336 "files_unwatched_total",
337 "reached_eof" => reached_eof,
338 )
339 }
340 .increment(1);
341 }
342 }
343
344 #[derive(Debug)]
345 struct FileWatchError<'a> {
346 pub file: &'a Path,
347 pub error: Error,
348 pub include_file_metric_tag: bool,
349 }
350
351 impl InternalEvent for FileWatchError<'_> {
352 fn emit(self) {
353 error!(
354 message = "Failed to watch file.",
355 error = %self.error,
356 error_code = "watching",
357 error_type = error_type::COMMAND_FAILED,
358 stage = error_stage::RECEIVING,
359 file = %self.file.display(),
360 internal_log_rate_limit = true,
361 );
362 if self.include_file_metric_tag {
363 counter!(
364 "component_errors_total",
365 "error_code" => "watching",
366 "error_type" => error_type::COMMAND_FAILED,
367 "stage" => error_stage::RECEIVING,
368 "file" => self.file.to_string_lossy().into_owned(),
369 )
370 } else {
371 counter!(
372 "component_errors_total",
373 "error_code" => "watching",
374 "error_type" => error_type::COMMAND_FAILED,
375 "stage" => error_stage::RECEIVING,
376 )
377 }
378 .increment(1);
379 }
380 }
381
382 #[derive(Debug)]
383 pub struct FileResumed<'a> {
384 pub file: &'a Path,
385 pub file_position: u64,
386 pub include_file_metric_tag: bool,
387 }
388
389 impl InternalEvent for FileResumed<'_> {
390 fn emit(self) {
391 info!(
392 message = "Resuming to watch file.",
393 file = %self.file.display(),
394 file_position = %self.file_position
395 );
396 if self.include_file_metric_tag {
397 counter!(
398 "files_resumed_total",
399 "file" => self.file.to_string_lossy().into_owned(),
400 )
401 } else {
402 counter!("files_resumed_total")
403 }
404 .increment(1);
405 }
406 }
407
408 #[derive(Debug)]
409 pub struct FileAdded<'a> {
410 pub file: &'a Path,
411 pub include_file_metric_tag: bool,
412 }
413
414 impl InternalEvent for FileAdded<'_> {
415 fn emit(self) {
416 info!(
417 message = "Found new file to watch.",
418 file = %self.file.display(),
419 );
420 if self.include_file_metric_tag {
421 counter!(
422 "files_added_total",
423 "file" => self.file.to_string_lossy().into_owned(),
424 )
425 } else {
426 counter!("files_added_total")
427 }
428 .increment(1);
429 }
430 }
431
432 #[derive(Debug)]
433 pub struct FileCheckpointed {
434 pub count: usize,
435 pub duration: Duration,
436 }
437
438 impl InternalEvent for FileCheckpointed {
439 fn emit(self) {
440 debug!(
441 message = "Files checkpointed.",
442 count = %self.count,
443 duration_ms = self.duration.as_millis() as u64,
444 );
445 counter!("checkpoints_total").increment(self.count as u64);
446 }
447 }
448
449 #[derive(Debug)]
450 pub struct FileCheckpointWriteError {
451 pub error: Error,
452 }
453
454 impl InternalEvent for FileCheckpointWriteError {
455 fn emit(self) {
456 error!(
457 message = "Failed writing checkpoints.",
458 error = %self.error,
459 error_code = "writing_checkpoints",
460 error_type = error_type::WRITER_FAILED,
461 stage = error_stage::RECEIVING,
462 internal_log_rate_limit = true,
463 );
464 counter!(
465 "component_errors_total",
466 "error_code" => "writing_checkpoints",
467 "error_type" => error_type::WRITER_FAILED,
468 "stage" => error_stage::RECEIVING,
469 )
470 .increment(1);
471 }
472 }
473
474 #[derive(Debug)]
475 pub struct PathGlobbingError<'a> {
476 pub path: &'a Path,
477 pub error: &'a Error,
478 }
479
480 impl InternalEvent for PathGlobbingError<'_> {
481 fn emit(self) {
482 error!(
483 message = "Failed to glob path.",
484 error = %self.error,
485 error_code = "globbing",
486 error_type = error_type::READER_FAILED,
487 stage = error_stage::RECEIVING,
488 path = %self.path.display(),
489 internal_log_rate_limit = true,
490 );
491 counter!(
492 "component_errors_total",
493 "error_code" => "globbing",
494 "error_type" => error_type::READER_FAILED,
495 "stage" => error_stage::RECEIVING,
496 )
497 .increment(1);
498 }
499 }
500
501 #[derive(Debug)]
502 pub struct FileLineTooBigError<'a> {
503 pub truncated_bytes: &'a BytesMut,
504 pub configured_limit: usize,
505 pub encountered_size_so_far: usize,
506 }
507
508 impl InternalEvent for FileLineTooBigError<'_> {
509 fn emit(self) {
510 error!(
511 message = "Found line that exceeds max_line_bytes; discarding.",
512 truncated_bytes = ?self.truncated_bytes,
513 configured_limit = self.configured_limit,
514 encountered_size_so_far = self.encountered_size_so_far,
515 internal_log_rate_limit = true,
516 error_type = error_type::CONDITION_FAILED,
517 stage = error_stage::RECEIVING,
518 );
519 counter!(
520 "component_errors_total",
521 "error_code" => "reading_line_from_file",
522 "error_type" => error_type::CONDITION_FAILED,
523 "stage" => error_stage::RECEIVING,
524 )
525 .increment(1);
526 emit!(ComponentEventsDropped::<INTENTIONAL> {
527 count: 1,
528 reason: "Found line that exceeds max_line_bytes; discarding.",
529 });
530 }
531 }
532
533 #[derive(Clone)]
534 pub struct FileSourceInternalEventsEmitter {
535 pub include_file_metric_tag: bool,
536 }
537
538 impl FileSourceInternalEvents for FileSourceInternalEventsEmitter {
539 fn emit_file_added(&self, file: &Path) {
540 emit!(FileAdded {
541 file,
542 include_file_metric_tag: self.include_file_metric_tag
543 });
544 }
545
546 fn emit_file_resumed(&self, file: &Path, file_position: u64) {
547 emit!(FileResumed {
548 file,
549 file_position,
550 include_file_metric_tag: self.include_file_metric_tag
551 });
552 }
553
554 fn emit_file_watch_error(&self, file: &Path, error: Error) {
555 emit!(FileWatchError {
556 file,
557 error,
558 include_file_metric_tag: self.include_file_metric_tag
559 });
560 }
561
562 fn emit_file_unwatched(&self, file: &Path, reached_eof: bool) {
563 emit!(FileUnwatched {
564 file,
565 include_file_metric_tag: self.include_file_metric_tag,
566 reached_eof
567 });
568 }
569
570 fn emit_file_deleted(&self, file: &Path) {
571 emit!(FileDeleted {
572 file,
573 include_file_metric_tag: self.include_file_metric_tag
574 });
575 }
576
577 fn emit_file_delete_error(&self, file: &Path, error: Error) {
578 emit!(FileDeleteError {
579 file,
580 error,
581 include_file_metric_tag: self.include_file_metric_tag
582 });
583 }
584
585 fn emit_file_fingerprint_read_error(&self, file: &Path, error: Error) {
586 emit!(FileFingerprintReadError {
587 file,
588 error,
589 include_file_metric_tag: self.include_file_metric_tag
590 });
591 }
592
593 fn emit_file_checksum_failed(&self, file: &Path) {
594 emit!(FileChecksumFailed {
595 file,
596 include_file_metric_tag: self.include_file_metric_tag
597 });
598 }
599
600 fn emit_file_checkpointed(&self, count: usize, duration: Duration) {
601 emit!(FileCheckpointed { count, duration });
602 }
603
604 fn emit_file_checkpoint_write_error(&self, error: Error) {
605 emit!(FileCheckpointWriteError { error });
606 }
607
608 fn emit_files_open(&self, count: usize) {
609 emit!(FileOpen { count });
610 }
611
612 fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
613 emit!(PathGlobbingError { path, error });
614 }
615
616 fn emit_file_line_too_long(
617 &self,
618 truncated_bytes: &bytes::BytesMut,
619 configured_limit: usize,
620 encountered_size_so_far: usize,
621 ) {
622 emit!(FileLineTooBigError {
623 truncated_bytes,
624 configured_limit,
625 encountered_size_so_far
626 });
627 }
628 }
629}