vector/internal_events/
file.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use std::borrow::Cow;
4
5use metrics::{counter, gauge};
6use vector_lib::{
7    NamedInternalEvent,
8    configurable::configurable_component,
9    internal_event::{
10        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
11    },
12};
13
14#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
15pub use self::source::*;
16
17/// Configuration of internal metrics for file-based components.
18#[configurable_component]
19#[derive(Clone, Debug, PartialEq, Eq, Default)]
20#[serde(deny_unknown_fields)]
21pub struct FileInternalMetricsConfig {
22    /// Whether or not to include the "file" tag on the component's corresponding internal metrics.
23    ///
24    /// This is useful for distinguishing between different files while monitoring. However, the tag's
25    /// cardinality is unbounded.
26    #[serde(default = "crate::serde::default_false")]
27    pub include_file_tag: bool,
28}
29
30#[derive(Debug, NamedInternalEvent)]
31pub struct FileOpen {
32    pub count: usize,
33}
34
35impl InternalEvent for FileOpen {
36    fn emit(self) {
37        gauge!("open_files").set(self.count as f64);
38    }
39}
40
41#[derive(Debug, NamedInternalEvent)]
42pub struct FileBytesSent<'a> {
43    pub byte_size: usize,
44    pub file: Cow<'a, str>,
45    pub include_file_metric_tag: bool,
46}
47
48impl InternalEvent for FileBytesSent<'_> {
49    fn emit(self) {
50        trace!(
51            message = "Bytes sent.",
52            byte_size = %self.byte_size,
53            protocol = "file",
54            file = %self.file,
55        );
56        if self.include_file_metric_tag {
57            counter!(
58                "component_sent_bytes_total",
59                "protocol" => "file",
60                "file" => self.file.clone().into_owned(),
61            )
62        } else {
63            counter!(
64                "component_sent_bytes_total",
65                "protocol" => "file",
66            )
67        }
68        .increment(self.byte_size as u64);
69    }
70}
71
72#[derive(Debug, NamedInternalEvent)]
73pub struct FileIoError<'a, P> {
74    pub error: std::io::Error,
75    pub code: &'static str,
76    pub message: &'static str,
77    pub path: &'a P,
78    pub dropped_events: usize,
79}
80
81impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
82    fn emit(self) {
83        error!(
84            message = %self.message,
85            path = ?self.path,
86            error = %self.error,
87            error_code = %self.code,
88            error_type = error_type::IO_FAILED,
89            stage = error_stage::SENDING,
90        );
91        counter!(
92            "component_errors_total",
93            "error_code" => self.code,
94            "error_type" => error_type::IO_FAILED,
95            "stage" => error_stage::SENDING,
96        )
97        .increment(1);
98
99        if self.dropped_events > 0 {
100            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
101                count: self.dropped_events,
102                reason: self.message,
103            });
104        }
105    }
106}
107
108#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
109mod source {
110    use std::{io::Error, path::Path, time::Duration};
111
112    use bytes::BytesMut;
113    use metrics::counter;
114    use vector_lib::{
115        NamedInternalEvent, emit,
116        file_source_common::internal_events::FileSourceInternalEvents,
117        internal_event::{ComponentEventsDropped, INTENTIONAL, error_stage, error_type},
118        json_size::JsonSize,
119    };
120
121    use super::{FileOpen, InternalEvent};
122
123    #[derive(Debug, NamedInternalEvent)]
124    pub struct FileBytesReceived<'a> {
125        pub byte_size: usize,
126        pub file: &'a str,
127        pub include_file_metric_tag: bool,
128    }
129
130    impl InternalEvent for FileBytesReceived<'_> {
131        fn emit(self) {
132            trace!(
133                message = "Bytes received.",
134                byte_size = %self.byte_size,
135                protocol = "file",
136                file = %self.file,
137            );
138            if self.include_file_metric_tag {
139                counter!(
140                    "component_received_bytes_total",
141                    "protocol" => "file",
142                    "file" => self.file.to_owned()
143                )
144            } else {
145                counter!(
146                    "component_received_bytes_total",
147                    "protocol" => "file",
148                )
149            }
150            .increment(self.byte_size as u64);
151        }
152    }
153
154    #[derive(Debug, NamedInternalEvent)]
155    pub struct FileEventsReceived<'a> {
156        pub count: usize,
157        pub file: &'a str,
158        pub byte_size: JsonSize,
159        pub include_file_metric_tag: bool,
160    }
161
162    impl InternalEvent for FileEventsReceived<'_> {
163        fn emit(self) {
164            trace!(
165                message = "Events received.",
166                count = %self.count,
167                byte_size = %self.byte_size,
168                file = %self.file
169            );
170            if self.include_file_metric_tag {
171                counter!(
172                    "component_received_events_total",
173                    "file" => self.file.to_owned(),
174                )
175                .increment(self.count as u64);
176                counter!(
177                    "component_received_event_bytes_total",
178                    "file" => self.file.to_owned(),
179                )
180                .increment(self.byte_size.get() as u64);
181            } else {
182                counter!("component_received_events_total").increment(self.count as u64);
183                counter!("component_received_event_bytes_total")
184                    .increment(self.byte_size.get() as u64);
185            }
186        }
187    }
188
189    #[derive(Debug, NamedInternalEvent)]
190    pub struct FileChecksumFailed<'a> {
191        pub file: &'a Path,
192        pub include_file_metric_tag: bool,
193    }
194
195    impl InternalEvent for FileChecksumFailed<'_> {
196        fn emit(self) {
197            warn!(
198                message = "Currently ignoring file too small to fingerprint.",
199                file = %self.file.display(),
200            );
201            if self.include_file_metric_tag {
202                counter!(
203                    "checksum_errors_total",
204                    "file" => self.file.to_string_lossy().into_owned(),
205                )
206            } else {
207                counter!("checksum_errors_total")
208            }
209            .increment(1);
210        }
211    }
212
213    #[derive(Debug, NamedInternalEvent)]
214    pub struct FileFingerprintReadError<'a> {
215        pub file: &'a Path,
216        pub error: Error,
217        pub include_file_metric_tag: bool,
218    }
219
220    impl InternalEvent for FileFingerprintReadError<'_> {
221        fn emit(self) {
222            error!(
223                message = "Failed reading file for fingerprinting.",
224                file = %self.file.display(),
225                error = %self.error,
226                error_code = "reading_fingerprint",
227                error_type = error_type::READER_FAILED,
228                stage = error_stage::RECEIVING,
229            );
230            if self.include_file_metric_tag {
231                counter!(
232                    "component_errors_total",
233                    "error_code" => "reading_fingerprint",
234                    "error_type" => error_type::READER_FAILED,
235                    "stage" => error_stage::RECEIVING,
236                    "file" => self.file.to_string_lossy().into_owned(),
237                )
238            } else {
239                counter!(
240                    "component_errors_total",
241                    "error_code" => "reading_fingerprint",
242                    "error_type" => error_type::READER_FAILED,
243                    "stage" => error_stage::RECEIVING,
244                )
245            }
246            .increment(1);
247        }
248    }
249
250    const DELETION_FAILED: &str = "deletion_failed";
251
252    #[derive(Debug, NamedInternalEvent)]
253    pub struct FileDeleteError<'a> {
254        pub file: &'a Path,
255        pub error: Error,
256        pub include_file_metric_tag: bool,
257    }
258
259    impl InternalEvent for FileDeleteError<'_> {
260        fn emit(self) {
261            error!(
262                message = "Failed in deleting file.",
263                file = %self.file.display(),
264                error = %self.error,
265                error_code = DELETION_FAILED,
266                error_type = error_type::COMMAND_FAILED,
267                stage = error_stage::RECEIVING,
268            );
269            if self.include_file_metric_tag {
270                counter!(
271                    "component_errors_total",
272                    "file" => self.file.to_string_lossy().into_owned(),
273                    "error_code" => DELETION_FAILED,
274                    "error_type" => error_type::COMMAND_FAILED,
275                    "stage" => error_stage::RECEIVING,
276                )
277            } else {
278                counter!(
279                    "component_errors_total",
280                    "error_code" => DELETION_FAILED,
281                    "error_type" => error_type::COMMAND_FAILED,
282                    "stage" => error_stage::RECEIVING,
283                )
284            }
285            .increment(1);
286        }
287    }
288
289    #[derive(Debug, NamedInternalEvent)]
290    pub struct FileDeleted<'a> {
291        pub file: &'a Path,
292        pub include_file_metric_tag: bool,
293    }
294
295    impl InternalEvent for FileDeleted<'_> {
296        fn emit(self) {
297            info!(
298                message = "File deleted.",
299                file = %self.file.display(),
300            );
301            if self.include_file_metric_tag {
302                counter!(
303                    "files_deleted_total",
304                    "file" => self.file.to_string_lossy().into_owned(),
305                )
306            } else {
307                counter!("files_deleted_total")
308            }
309            .increment(1);
310        }
311    }
312
313    #[derive(Debug, NamedInternalEvent)]
314    pub struct FileUnwatched<'a> {
315        pub file: &'a Path,
316        pub include_file_metric_tag: bool,
317        pub reached_eof: bool,
318    }
319
320    impl InternalEvent for FileUnwatched<'_> {
321        fn emit(self) {
322            let reached_eof = if self.reached_eof { "true" } else { "false" };
323            info!(
324                message = "Stopped watching file.",
325                file = %self.file.display(),
326                reached_eof
327            );
328            if self.include_file_metric_tag {
329                counter!(
330                    "files_unwatched_total",
331                    "file" => self.file.to_string_lossy().into_owned(),
332                    "reached_eof" => reached_eof,
333                )
334            } else {
335                counter!(
336                    "files_unwatched_total",
337                    "reached_eof" => reached_eof,
338                )
339            }
340            .increment(1);
341        }
342    }
343
344    #[derive(Debug, NamedInternalEvent)]
345    struct FileWatchError<'a> {
346        pub file: &'a Path,
347        pub error: Error,
348        pub include_file_metric_tag: bool,
349    }
350
351    impl InternalEvent for FileWatchError<'_> {
352        fn emit(self) {
353            error!(
354                message = "Failed to watch file.",
355                error = %self.error,
356                error_code = "watching",
357                error_type = error_type::COMMAND_FAILED,
358                stage = error_stage::RECEIVING,
359                file = %self.file.display(),
360            );
361            if self.include_file_metric_tag {
362                counter!(
363                    "component_errors_total",
364                    "error_code" => "watching",
365                    "error_type" => error_type::COMMAND_FAILED,
366                    "stage" => error_stage::RECEIVING,
367                    "file" => self.file.to_string_lossy().into_owned(),
368                )
369            } else {
370                counter!(
371                    "component_errors_total",
372                    "error_code" => "watching",
373                    "error_type" => error_type::COMMAND_FAILED,
374                    "stage" => error_stage::RECEIVING,
375                )
376            }
377            .increment(1);
378        }
379    }
380
381    #[derive(Debug, NamedInternalEvent)]
382    pub struct FileResumed<'a> {
383        pub file: &'a Path,
384        pub file_position: u64,
385        pub include_file_metric_tag: bool,
386    }
387
388    impl InternalEvent for FileResumed<'_> {
389        fn emit(self) {
390            info!(
391                message = "Resuming to watch file.",
392                file = %self.file.display(),
393                file_position = %self.file_position
394            );
395            if self.include_file_metric_tag {
396                counter!(
397                    "files_resumed_total",
398                    "file" => self.file.to_string_lossy().into_owned(),
399                )
400            } else {
401                counter!("files_resumed_total")
402            }
403            .increment(1);
404        }
405    }
406
407    #[derive(Debug, NamedInternalEvent)]
408    pub struct FileAdded<'a> {
409        pub file: &'a Path,
410        pub include_file_metric_tag: bool,
411    }
412
413    impl InternalEvent for FileAdded<'_> {
414        fn emit(self) {
415            info!(
416                message = "Found new file to watch.",
417                file = %self.file.display(),
418            );
419            if self.include_file_metric_tag {
420                counter!(
421                    "files_added_total",
422                    "file" => self.file.to_string_lossy().into_owned(),
423                )
424            } else {
425                counter!("files_added_total")
426            }
427            .increment(1);
428        }
429    }
430
431    #[derive(Debug, NamedInternalEvent)]
432    pub struct FileCheckpointed {
433        pub count: usize,
434        pub duration: Duration,
435    }
436
437    impl InternalEvent for FileCheckpointed {
438        fn emit(self) {
439            debug!(
440                message = "Files checkpointed.",
441                count = %self.count,
442                duration_ms = self.duration.as_millis() as u64,
443            );
444            counter!("checkpoints_total").increment(self.count as u64);
445        }
446    }
447
448    #[derive(Debug, NamedInternalEvent)]
449    pub struct FileCheckpointWriteError {
450        pub error: Error,
451    }
452
453    impl InternalEvent for FileCheckpointWriteError {
454        fn emit(self) {
455            error!(
456                message = "Failed writing checkpoints.",
457                error = %self.error,
458                error_code = "writing_checkpoints",
459                error_type = error_type::WRITER_FAILED,
460                stage = error_stage::RECEIVING,
461            );
462            counter!(
463                "component_errors_total",
464                "error_code" => "writing_checkpoints",
465                "error_type" => error_type::WRITER_FAILED,
466                "stage" => error_stage::RECEIVING,
467            )
468            .increment(1);
469        }
470    }
471
472    #[derive(Debug, NamedInternalEvent)]
473    pub struct PathGlobbingError<'a> {
474        pub path: &'a Path,
475        pub error: &'a Error,
476    }
477
478    impl InternalEvent for PathGlobbingError<'_> {
479        fn emit(self) {
480            error!(
481                message = "Failed to glob path.",
482                error = %self.error,
483                error_code = "globbing",
484                error_type = error_type::READER_FAILED,
485                stage = error_stage::RECEIVING,
486                path = %self.path.display(),
487            );
488            counter!(
489                "component_errors_total",
490                "error_code" => "globbing",
491                "error_type" => error_type::READER_FAILED,
492                "stage" => error_stage::RECEIVING,
493            )
494            .increment(1);
495        }
496    }
497
498    #[derive(Debug, NamedInternalEvent)]
499    pub struct FileLineTooBigError<'a> {
500        pub truncated_bytes: &'a BytesMut,
501        pub configured_limit: usize,
502        pub encountered_size_so_far: usize,
503    }
504
505    impl InternalEvent for FileLineTooBigError<'_> {
506        fn emit(self) {
507            error!(
508                message = "Found line that exceeds max_line_bytes; discarding.",
509                truncated_bytes = ?self.truncated_bytes,
510                configured_limit = self.configured_limit,
511                encountered_size_so_far = self.encountered_size_so_far,
512                error_type = error_type::CONDITION_FAILED,
513                stage = error_stage::RECEIVING,
514            );
515            counter!(
516                "component_errors_total",
517                "error_code" => "reading_line_from_file",
518                "error_type" => error_type::CONDITION_FAILED,
519                "stage" => error_stage::RECEIVING,
520            )
521            .increment(1);
522            emit!(ComponentEventsDropped::<INTENTIONAL> {
523                count: 1,
524                reason: "Found line that exceeds max_line_bytes; discarding.",
525            });
526        }
527    }
528
529    #[derive(Clone)]
530    pub struct FileSourceInternalEventsEmitter {
531        pub include_file_metric_tag: bool,
532    }
533
534    impl FileSourceInternalEvents for FileSourceInternalEventsEmitter {
535        fn emit_file_added(&self, file: &Path) {
536            emit!(FileAdded {
537                file,
538                include_file_metric_tag: self.include_file_metric_tag
539            });
540        }
541
542        fn emit_file_resumed(&self, file: &Path, file_position: u64) {
543            emit!(FileResumed {
544                file,
545                file_position,
546                include_file_metric_tag: self.include_file_metric_tag
547            });
548        }
549
550        fn emit_file_watch_error(&self, file: &Path, error: Error) {
551            emit!(FileWatchError {
552                file,
553                error,
554                include_file_metric_tag: self.include_file_metric_tag
555            });
556        }
557
558        fn emit_file_unwatched(&self, file: &Path, reached_eof: bool) {
559            emit!(FileUnwatched {
560                file,
561                include_file_metric_tag: self.include_file_metric_tag,
562                reached_eof
563            });
564        }
565
566        fn emit_file_deleted(&self, file: &Path) {
567            emit!(FileDeleted {
568                file,
569                include_file_metric_tag: self.include_file_metric_tag
570            });
571        }
572
573        fn emit_file_delete_error(&self, file: &Path, error: Error) {
574            emit!(FileDeleteError {
575                file,
576                error,
577                include_file_metric_tag: self.include_file_metric_tag
578            });
579        }
580
581        fn emit_file_fingerprint_read_error(&self, file: &Path, error: Error) {
582            emit!(FileFingerprintReadError {
583                file,
584                error,
585                include_file_metric_tag: self.include_file_metric_tag
586            });
587        }
588
589        fn emit_file_checksum_failed(&self, file: &Path) {
590            emit!(FileChecksumFailed {
591                file,
592                include_file_metric_tag: self.include_file_metric_tag
593            });
594        }
595
596        fn emit_file_checkpointed(&self, count: usize, duration: Duration) {
597            emit!(FileCheckpointed { count, duration });
598        }
599
600        fn emit_file_checkpoint_write_error(&self, error: Error) {
601            emit!(FileCheckpointWriteError { error });
602        }
603
604        fn emit_files_open(&self, count: usize) {
605            emit!(FileOpen { count });
606        }
607
608        fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
609            emit!(PathGlobbingError { path, error });
610        }
611
612        fn emit_file_line_too_long(
613            &self,
614            truncated_bytes: &bytes::BytesMut,
615            configured_limit: usize,
616            encountered_size_so_far: usize,
617        ) {
618            emit!(FileLineTooBigError {
619                truncated_bytes,
620                configured_limit,
621                encountered_size_so_far
622            });
623        }
624    }
625}