vector/internal_events/
file.rs

1use metrics::{counter, gauge};
2use std::borrow::Cow;
3use vector_lib::{
4    configurable::configurable_component,
5    internal_event::{ComponentEventsDropped, InternalEvent, UNINTENTIONAL},
6};
7
8#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
9pub use self::source::*;
10
11use vector_lib::internal_event::{error_stage, error_type};
12
13/// Configuration of internal metrics for file-based components.
14#[configurable_component]
15#[derive(Clone, Debug, PartialEq, Eq, Default)]
16#[serde(deny_unknown_fields)]
17pub struct FileInternalMetricsConfig {
18    /// Whether or not to include the "file" tag on the component's corresponding internal metrics.
19    ///
20    /// This is useful for distinguishing between different files while monitoring. However, the tag's
21    /// cardinality is unbounded.
22    #[serde(default = "crate::serde::default_false")]
23    pub include_file_tag: bool,
24}
25
26#[derive(Debug)]
27pub struct FileOpen {
28    pub count: usize,
29}
30
31impl InternalEvent for FileOpen {
32    fn emit(self) {
33        gauge!("open_files").set(self.count as f64);
34    }
35}
36
37#[derive(Debug)]
38pub struct FileBytesSent<'a> {
39    pub byte_size: usize,
40    pub file: Cow<'a, str>,
41    pub include_file_metric_tag: bool,
42}
43
44impl InternalEvent for FileBytesSent<'_> {
45    fn emit(self) {
46        trace!(
47            message = "Bytes sent.",
48            byte_size = %self.byte_size,
49            protocol = "file",
50            file = %self.file,
51        );
52        if self.include_file_metric_tag {
53            counter!(
54                "component_sent_bytes_total",
55                "protocol" => "file",
56                "file" => self.file.clone().into_owned(),
57            )
58        } else {
59            counter!(
60                "component_sent_bytes_total",
61                "protocol" => "file",
62            )
63        }
64        .increment(self.byte_size as u64);
65    }
66}
67
68#[derive(Debug)]
69pub struct FileIoError<'a, P> {
70    pub error: std::io::Error,
71    pub code: &'static str,
72    pub message: &'static str,
73    pub path: &'a P,
74    pub dropped_events: usize,
75}
76
77impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
78    fn emit(self) {
79        error!(
80            message = %self.message,
81            path = ?self.path,
82            error = %self.error,
83            error_code = %self.code,
84            error_type = error_type::IO_FAILED,
85            stage = error_stage::SENDING,
86            internal_log_rate_limit = true,
87        );
88        counter!(
89            "component_errors_total",
90            "error_code" => self.code,
91            "error_type" => error_type::IO_FAILED,
92            "stage" => error_stage::SENDING,
93        )
94        .increment(1);
95
96        if self.dropped_events > 0 {
97            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
98                count: self.dropped_events,
99                reason: self.message,
100            });
101        }
102    }
103}
104
105#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
106mod source {
107    use std::{io::Error, path::Path, time::Duration};
108
109    use bytes::BytesMut;
110    use metrics::counter;
111    use vector_lib::file_source::FileSourceInternalEvents;
112    use vector_lib::internal_event::{ComponentEventsDropped, INTENTIONAL};
113
114    use super::{FileOpen, InternalEvent};
115    use vector_lib::emit;
116    use vector_lib::{
117        internal_event::{error_stage, error_type},
118        json_size::JsonSize,
119    };
120
121    #[derive(Debug)]
122    pub struct FileBytesReceived<'a> {
123        pub byte_size: usize,
124        pub file: &'a str,
125        pub include_file_metric_tag: bool,
126    }
127
128    impl InternalEvent for FileBytesReceived<'_> {
129        fn emit(self) {
130            trace!(
131                message = "Bytes received.",
132                byte_size = %self.byte_size,
133                protocol = "file",
134                file = %self.file,
135            );
136            if self.include_file_metric_tag {
137                counter!(
138                    "component_received_bytes_total",
139                    "protocol" => "file",
140                    "file" => self.file.to_owned()
141                )
142            } else {
143                counter!(
144                    "component_received_bytes_total",
145                    "protocol" => "file",
146                )
147            }
148            .increment(self.byte_size as u64);
149        }
150    }
151
152    #[derive(Debug)]
153    pub struct FileEventsReceived<'a> {
154        pub count: usize,
155        pub file: &'a str,
156        pub byte_size: JsonSize,
157        pub include_file_metric_tag: bool,
158    }
159
160    impl InternalEvent for FileEventsReceived<'_> {
161        fn emit(self) {
162            trace!(
163                message = "Events received.",
164                count = %self.count,
165                byte_size = %self.byte_size,
166                file = %self.file
167            );
168            if self.include_file_metric_tag {
169                counter!(
170                    "component_received_events_total",
171                    "file" => self.file.to_owned(),
172                )
173                .increment(self.count as u64);
174                counter!(
175                    "component_received_event_bytes_total",
176                    "file" => self.file.to_owned(),
177                )
178                .increment(self.byte_size.get() as u64);
179            } else {
180                counter!("component_received_events_total").increment(self.count as u64);
181                counter!("component_received_event_bytes_total")
182                    .increment(self.byte_size.get() as u64);
183            }
184        }
185    }
186
187    #[derive(Debug)]
188    pub struct FileChecksumFailed<'a> {
189        pub file: &'a Path,
190        pub include_file_metric_tag: bool,
191    }
192
193    impl InternalEvent for FileChecksumFailed<'_> {
194        fn emit(self) {
195            warn!(
196                message = "Currently ignoring file too small to fingerprint.",
197                file = %self.file.display(),
198            );
199            if self.include_file_metric_tag {
200                counter!(
201                    "checksum_errors_total",
202                    "file" => self.file.to_string_lossy().into_owned(),
203                )
204            } else {
205                counter!("checksum_errors_total")
206            }
207            .increment(1);
208        }
209    }
210
211    #[derive(Debug)]
212    pub struct FileFingerprintReadError<'a> {
213        pub file: &'a Path,
214        pub error: Error,
215        pub include_file_metric_tag: bool,
216    }
217
218    impl InternalEvent for FileFingerprintReadError<'_> {
219        fn emit(self) {
220            error!(
221                message = "Failed reading file for fingerprinting.",
222                file = %self.file.display(),
223                error = %self.error,
224                error_code = "reading_fingerprint",
225                error_type = error_type::READER_FAILED,
226                stage = error_stage::RECEIVING,
227                internal_log_rate_limit = true,
228            );
229            if self.include_file_metric_tag {
230                counter!(
231                    "component_errors_total",
232                    "error_code" => "reading_fingerprint",
233                    "error_type" => error_type::READER_FAILED,
234                    "stage" => error_stage::RECEIVING,
235                    "file" => self.file.to_string_lossy().into_owned(),
236                )
237            } else {
238                counter!(
239                    "component_errors_total",
240                    "error_code" => "reading_fingerprint",
241                    "error_type" => error_type::READER_FAILED,
242                    "stage" => error_stage::RECEIVING,
243                )
244            }
245            .increment(1);
246        }
247    }
248
249    const DELETION_FAILED: &str = "deletion_failed";
250
251    #[derive(Debug)]
252    pub struct FileDeleteError<'a> {
253        pub file: &'a Path,
254        pub error: Error,
255        pub include_file_metric_tag: bool,
256    }
257
258    impl InternalEvent for FileDeleteError<'_> {
259        fn emit(self) {
260            error!(
261                message = "Failed in deleting file.",
262                file = %self.file.display(),
263                error = %self.error,
264                error_code = DELETION_FAILED,
265                error_type = error_type::COMMAND_FAILED,
266                stage = error_stage::RECEIVING,
267                internal_log_rate_limit = true,
268            );
269            if self.include_file_metric_tag {
270                counter!(
271                    "component_errors_total",
272                    "file" => self.file.to_string_lossy().into_owned(),
273                    "error_code" => DELETION_FAILED,
274                    "error_type" => error_type::COMMAND_FAILED,
275                    "stage" => error_stage::RECEIVING,
276                )
277            } else {
278                counter!(
279                    "component_errors_total",
280                    "error_code" => DELETION_FAILED,
281                    "error_type" => error_type::COMMAND_FAILED,
282                    "stage" => error_stage::RECEIVING,
283                )
284            }
285            .increment(1);
286        }
287    }
288
289    #[derive(Debug)]
290    pub struct FileDeleted<'a> {
291        pub file: &'a Path,
292        pub include_file_metric_tag: bool,
293    }
294
295    impl InternalEvent for FileDeleted<'_> {
296        fn emit(self) {
297            info!(
298                message = "File deleted.",
299                file = %self.file.display(),
300            );
301            if self.include_file_metric_tag {
302                counter!(
303                    "files_deleted_total",
304                    "file" => self.file.to_string_lossy().into_owned(),
305                )
306            } else {
307                counter!("files_deleted_total")
308            }
309            .increment(1);
310        }
311    }
312
313    #[derive(Debug)]
314    pub struct FileUnwatched<'a> {
315        pub file: &'a Path,
316        pub include_file_metric_tag: bool,
317        pub reached_eof: bool,
318    }
319
320    impl InternalEvent for FileUnwatched<'_> {
321        fn emit(self) {
322            let reached_eof = if self.reached_eof { "true" } else { "false" };
323            info!(
324                message = "Stopped watching file.",
325                file = %self.file.display(),
326                reached_eof
327            );
328            if self.include_file_metric_tag {
329                counter!(
330                    "files_unwatched_total",
331                    "file" => self.file.to_string_lossy().into_owned(),
332                    "reached_eof" => reached_eof,
333                )
334            } else {
335                counter!(
336                    "files_unwatched_total",
337                    "reached_eof" => reached_eof,
338                )
339            }
340            .increment(1);
341        }
342    }
343
344    #[derive(Debug)]
345    struct FileWatchError<'a> {
346        pub file: &'a Path,
347        pub error: Error,
348        pub include_file_metric_tag: bool,
349    }
350
351    impl InternalEvent for FileWatchError<'_> {
352        fn emit(self) {
353            error!(
354                message = "Failed to watch file.",
355                error = %self.error,
356                error_code = "watching",
357                error_type = error_type::COMMAND_FAILED,
358                stage = error_stage::RECEIVING,
359                file = %self.file.display(),
360                internal_log_rate_limit = true,
361            );
362            if self.include_file_metric_tag {
363                counter!(
364                    "component_errors_total",
365                    "error_code" => "watching",
366                    "error_type" => error_type::COMMAND_FAILED,
367                    "stage" => error_stage::RECEIVING,
368                    "file" => self.file.to_string_lossy().into_owned(),
369                )
370            } else {
371                counter!(
372                    "component_errors_total",
373                    "error_code" => "watching",
374                    "error_type" => error_type::COMMAND_FAILED,
375                    "stage" => error_stage::RECEIVING,
376                )
377            }
378            .increment(1);
379        }
380    }
381
382    #[derive(Debug)]
383    pub struct FileResumed<'a> {
384        pub file: &'a Path,
385        pub file_position: u64,
386        pub include_file_metric_tag: bool,
387    }
388
389    impl InternalEvent for FileResumed<'_> {
390        fn emit(self) {
391            info!(
392                message = "Resuming to watch file.",
393                file = %self.file.display(),
394                file_position = %self.file_position
395            );
396            if self.include_file_metric_tag {
397                counter!(
398                    "files_resumed_total",
399                    "file" => self.file.to_string_lossy().into_owned(),
400                )
401            } else {
402                counter!("files_resumed_total")
403            }
404            .increment(1);
405        }
406    }
407
408    #[derive(Debug)]
409    pub struct FileAdded<'a> {
410        pub file: &'a Path,
411        pub include_file_metric_tag: bool,
412    }
413
414    impl InternalEvent for FileAdded<'_> {
415        fn emit(self) {
416            info!(
417                message = "Found new file to watch.",
418                file = %self.file.display(),
419            );
420            if self.include_file_metric_tag {
421                counter!(
422                    "files_added_total",
423                    "file" => self.file.to_string_lossy().into_owned(),
424                )
425            } else {
426                counter!("files_added_total")
427            }
428            .increment(1);
429        }
430    }
431
432    #[derive(Debug)]
433    pub struct FileCheckpointed {
434        pub count: usize,
435        pub duration: Duration,
436    }
437
438    impl InternalEvent for FileCheckpointed {
439        fn emit(self) {
440            debug!(
441                message = "Files checkpointed.",
442                count = %self.count,
443                duration_ms = self.duration.as_millis() as u64,
444            );
445            counter!("checkpoints_total").increment(self.count as u64);
446        }
447    }
448
449    #[derive(Debug)]
450    pub struct FileCheckpointWriteError {
451        pub error: Error,
452    }
453
454    impl InternalEvent for FileCheckpointWriteError {
455        fn emit(self) {
456            error!(
457                message = "Failed writing checkpoints.",
458                error = %self.error,
459                error_code = "writing_checkpoints",
460                error_type = error_type::WRITER_FAILED,
461                stage = error_stage::RECEIVING,
462                internal_log_rate_limit = true,
463            );
464            counter!(
465                "component_errors_total",
466                "error_code" => "writing_checkpoints",
467                "error_type" => error_type::WRITER_FAILED,
468                "stage" => error_stage::RECEIVING,
469            )
470            .increment(1);
471        }
472    }
473
474    #[derive(Debug)]
475    pub struct PathGlobbingError<'a> {
476        pub path: &'a Path,
477        pub error: &'a Error,
478    }
479
480    impl InternalEvent for PathGlobbingError<'_> {
481        fn emit(self) {
482            error!(
483                message = "Failed to glob path.",
484                error = %self.error,
485                error_code = "globbing",
486                error_type = error_type::READER_FAILED,
487                stage = error_stage::RECEIVING,
488                path = %self.path.display(),
489                internal_log_rate_limit = true,
490            );
491            counter!(
492                "component_errors_total",
493                "error_code" => "globbing",
494                "error_type" => error_type::READER_FAILED,
495                "stage" => error_stage::RECEIVING,
496            )
497            .increment(1);
498        }
499    }
500
501    #[derive(Debug)]
502    pub struct FileLineTooBigError<'a> {
503        pub truncated_bytes: &'a BytesMut,
504        pub configured_limit: usize,
505        pub encountered_size_so_far: usize,
506    }
507
508    impl InternalEvent for FileLineTooBigError<'_> {
509        fn emit(self) {
510            error!(
511                message = "Found line that exceeds max_line_bytes; discarding.",
512                truncated_bytes = ?self.truncated_bytes,
513                configured_limit = self.configured_limit,
514                encountered_size_so_far = self.encountered_size_so_far,
515                internal_log_rate_limit = true,
516                error_type = error_type::CONDITION_FAILED,
517                stage = error_stage::RECEIVING,
518            );
519            counter!(
520                "component_errors_total",
521                "error_code" => "reading_line_from_file",
522                "error_type" => error_type::CONDITION_FAILED,
523                "stage" => error_stage::RECEIVING,
524            )
525            .increment(1);
526            emit!(ComponentEventsDropped::<INTENTIONAL> {
527                count: 1,
528                reason: "Found line that exceeds max_line_bytes; discarding.",
529            });
530        }
531    }
532
533    #[derive(Clone)]
534    pub struct FileSourceInternalEventsEmitter {
535        pub include_file_metric_tag: bool,
536    }
537
538    impl FileSourceInternalEvents for FileSourceInternalEventsEmitter {
539        fn emit_file_added(&self, file: &Path) {
540            emit!(FileAdded {
541                file,
542                include_file_metric_tag: self.include_file_metric_tag
543            });
544        }
545
546        fn emit_file_resumed(&self, file: &Path, file_position: u64) {
547            emit!(FileResumed {
548                file,
549                file_position,
550                include_file_metric_tag: self.include_file_metric_tag
551            });
552        }
553
554        fn emit_file_watch_error(&self, file: &Path, error: Error) {
555            emit!(FileWatchError {
556                file,
557                error,
558                include_file_metric_tag: self.include_file_metric_tag
559            });
560        }
561
562        fn emit_file_unwatched(&self, file: &Path, reached_eof: bool) {
563            emit!(FileUnwatched {
564                file,
565                include_file_metric_tag: self.include_file_metric_tag,
566                reached_eof
567            });
568        }
569
570        fn emit_file_deleted(&self, file: &Path) {
571            emit!(FileDeleted {
572                file,
573                include_file_metric_tag: self.include_file_metric_tag
574            });
575        }
576
577        fn emit_file_delete_error(&self, file: &Path, error: Error) {
578            emit!(FileDeleteError {
579                file,
580                error,
581                include_file_metric_tag: self.include_file_metric_tag
582            });
583        }
584
585        fn emit_file_fingerprint_read_error(&self, file: &Path, error: Error) {
586            emit!(FileFingerprintReadError {
587                file,
588                error,
589                include_file_metric_tag: self.include_file_metric_tag
590            });
591        }
592
593        fn emit_file_checksum_failed(&self, file: &Path) {
594            emit!(FileChecksumFailed {
595                file,
596                include_file_metric_tag: self.include_file_metric_tag
597            });
598        }
599
600        fn emit_file_checkpointed(&self, count: usize, duration: Duration) {
601            emit!(FileCheckpointed { count, duration });
602        }
603
604        fn emit_file_checkpoint_write_error(&self, error: Error) {
605            emit!(FileCheckpointWriteError { error });
606        }
607
608        fn emit_files_open(&self, count: usize) {
609            emit!(FileOpen { count });
610        }
611
612        fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
613            emit!(PathGlobbingError { path, error });
614        }
615
616        fn emit_file_line_too_long(
617            &self,
618            truncated_bytes: &bytes::BytesMut,
619            configured_limit: usize,
620            encountered_size_so_far: usize,
621        ) {
622            emit!(FileLineTooBigError {
623                truncated_bytes,
624                configured_limit,
625                encountered_size_so_far
626            });
627        }
628    }
629}