vector/internal_events/
file.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use std::borrow::Cow;
4
5use vector_lib::{
6    NamedInternalEvent,
7    configurable::configurable_component,
8    counter, gauge,
9    internal_event::{
10        ComponentEventsDropped, CounterName, GaugeName, InternalEvent, UNINTENTIONAL, error_stage,
11        error_type,
12    },
13};
14
15#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
16pub use self::source::*;
17
// NOTE(review): `#[configurable_component]` presumably surfaces the doc comments below as the
// user-facing description of this option — confirm before rewording them.
/// Configuration of internal metrics for file-based components.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[serde(deny_unknown_fields)]
pub struct FileInternalMetricsConfig {
    /// Whether or not to include the "file" tag on the component's corresponding internal metrics.
    ///
    /// This is useful for distinguishing between different files while monitoring. However, the tag's
    /// cardinality is unbounded.
    // The derived `Default` yields `false` for this field; `default_false` presumably returns
    // the same value when the key is absent during deserialization — verify against
    // `crate::serde`.
    #[serde(default = "crate::serde::default_false")]
    pub include_file_tag: bool,
}
30
31#[derive(Debug, NamedInternalEvent)]
32pub struct FileOpen {
33    pub count: usize,
34}
35
36impl InternalEvent for FileOpen {
37    fn emit(self) {
38        gauge!(GaugeName::OpenFiles).set(self.count as f64);
39    }
40}
41
42#[derive(Debug, NamedInternalEvent)]
43pub struct FileBytesSent<'a> {
44    pub byte_size: usize,
45    pub file: Cow<'a, str>,
46    pub include_file_metric_tag: bool,
47}
48
49impl InternalEvent for FileBytesSent<'_> {
50    fn emit(self) {
51        trace!(
52            message = "Bytes sent.",
53            byte_size = %self.byte_size,
54            protocol = "file",
55            file = %self.file,
56        );
57        if self.include_file_metric_tag {
58            counter!(
59                CounterName::ComponentSentBytesTotal,
60                "protocol" => "file",
61                "file" => self.file.clone().into_owned(),
62            )
63        } else {
64            counter!(
65                CounterName::ComponentSentBytesTotal,
66                "protocol" => "file",
67            )
68        }
69        .increment(self.byte_size as u64);
70    }
71}
72
73#[derive(Debug, NamedInternalEvent)]
74pub struct FileIoError<'a, P> {
75    pub error: std::io::Error,
76    pub code: &'static str,
77    pub message: &'static str,
78    pub path: &'a P,
79    pub dropped_events: usize,
80}
81
82impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
83    fn emit(self) {
84        error!(
85            message = %self.message,
86            path = ?self.path,
87            error = %self.error,
88            error_code = %self.code,
89            error_type = error_type::IO_FAILED,
90            stage = error_stage::SENDING,
91        );
92        counter!(
93            CounterName::ComponentErrorsTotal,
94            "error_code" => self.code,
95            "error_type" => error_type::IO_FAILED,
96            "stage" => error_stage::SENDING,
97        )
98        .increment(1);
99
100        if self.dropped_events > 0 {
101            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
102                count: self.dropped_events,
103                reason: self.message,
104            });
105        }
106    }
107}
108
109#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
110mod source {
111    use std::{io::Error, path::Path, time::Duration};
112
113    use bytes::BytesMut;
114    use vector_lib::{
115        NamedInternalEvent, counter, emit,
116        file_source_common::internal_events::FileSourceInternalEvents,
117        internal_event::{
118            ComponentEventsDropped, CounterName, INTENTIONAL, error_stage, error_type,
119        },
120        json_size::JsonSize,
121    };
122
123    use super::{FileOpen, InternalEvent};
124
125    #[derive(Debug, NamedInternalEvent)]
126    pub struct FileBytesReceived<'a> {
127        pub byte_size: usize,
128        pub file: &'a str,
129        pub include_file_metric_tag: bool,
130    }
131
132    impl InternalEvent for FileBytesReceived<'_> {
133        fn emit(self) {
134            trace!(
135                message = "Bytes received.",
136                byte_size = %self.byte_size,
137                protocol = "file",
138                file = %self.file,
139            );
140            if self.include_file_metric_tag {
141                counter!(
142                    CounterName::ComponentReceivedBytesTotal,
143                    "protocol" => "file",
144                    "file" => self.file.to_owned()
145                )
146            } else {
147                counter!(
148                    CounterName::ComponentReceivedBytesTotal,
149                    "protocol" => "file",
150                )
151            }
152            .increment(self.byte_size as u64);
153        }
154    }
155
156    #[derive(Debug, NamedInternalEvent)]
157    pub struct FileEventsReceived<'a> {
158        pub count: usize,
159        pub file: &'a str,
160        pub byte_size: JsonSize,
161        pub include_file_metric_tag: bool,
162    }
163
164    impl InternalEvent for FileEventsReceived<'_> {
165        fn emit(self) {
166            trace!(
167                message = "Events received.",
168                count = %self.count,
169                byte_size = %self.byte_size,
170                file = %self.file
171            );
172            if self.include_file_metric_tag {
173                counter!(
174                    CounterName::ComponentReceivedEventsTotal,
175                    "file" => self.file.to_owned(),
176                )
177                .increment(self.count as u64);
178                counter!(
179                    CounterName::ComponentReceivedEventBytesTotal,
180                    "file" => self.file.to_owned(),
181                )
182                .increment(self.byte_size.get() as u64);
183            } else {
184                counter!(CounterName::ComponentReceivedEventsTotal).increment(self.count as u64);
185                counter!(CounterName::ComponentReceivedEventBytesTotal)
186                    .increment(self.byte_size.get() as u64);
187            }
188        }
189    }
190
191    #[derive(Debug, NamedInternalEvent)]
192    pub struct FileChecksumFailed<'a> {
193        pub file: &'a Path,
194        pub include_file_metric_tag: bool,
195    }
196
197    impl InternalEvent for FileChecksumFailed<'_> {
198        fn emit(self) {
199            warn!(
200                message = "Currently ignoring file too small to fingerprint.",
201                file = %self.file.display(),
202            );
203            if self.include_file_metric_tag {
204                counter!(
205                    CounterName::ChecksumErrorsTotal,
206                    "file" => self.file.to_string_lossy().into_owned(),
207                )
208            } else {
209                counter!(CounterName::ChecksumErrorsTotal)
210            }
211            .increment(1);
212        }
213    }
214
215    #[derive(Debug, NamedInternalEvent)]
216    pub struct FileFingerprintReadError<'a> {
217        pub file: &'a Path,
218        pub error: Error,
219        pub include_file_metric_tag: bool,
220    }
221
222    impl InternalEvent for FileFingerprintReadError<'_> {
223        fn emit(self) {
224            error!(
225                message = "Failed reading file for fingerprinting.",
226                file = %self.file.display(),
227                error = %self.error,
228                error_code = "reading_fingerprint",
229                error_type = error_type::READER_FAILED,
230                stage = error_stage::RECEIVING,
231            );
232            if self.include_file_metric_tag {
233                counter!(
234                    CounterName::ComponentErrorsTotal,
235                    "error_code" => "reading_fingerprint",
236                    "error_type" => error_type::READER_FAILED,
237                    "stage" => error_stage::RECEIVING,
238                    "file" => self.file.to_string_lossy().into_owned(),
239                )
240            } else {
241                counter!(
242                    CounterName::ComponentErrorsTotal,
243                    "error_code" => "reading_fingerprint",
244                    "error_type" => error_type::READER_FAILED,
245                    "stage" => error_stage::RECEIVING,
246                )
247            }
248            .increment(1);
249        }
250    }
251
252    const DELETION_FAILED: &str = "deletion_failed";
253
254    #[derive(Debug, NamedInternalEvent)]
255    pub struct FileDeleteError<'a> {
256        pub file: &'a Path,
257        pub error: Error,
258        pub include_file_metric_tag: bool,
259    }
260
261    impl InternalEvent for FileDeleteError<'_> {
262        fn emit(self) {
263            error!(
264                message = "Failed in deleting file.",
265                file = %self.file.display(),
266                error = %self.error,
267                error_code = DELETION_FAILED,
268                error_type = error_type::COMMAND_FAILED,
269                stage = error_stage::RECEIVING,
270            );
271            if self.include_file_metric_tag {
272                counter!(
273                    CounterName::ComponentErrorsTotal,
274                    "file" => self.file.to_string_lossy().into_owned(),
275                    "error_code" => DELETION_FAILED,
276                    "error_type" => error_type::COMMAND_FAILED,
277                    "stage" => error_stage::RECEIVING,
278                )
279            } else {
280                counter!(
281                    CounterName::ComponentErrorsTotal,
282                    "error_code" => DELETION_FAILED,
283                    "error_type" => error_type::COMMAND_FAILED,
284                    "stage" => error_stage::RECEIVING,
285                )
286            }
287            .increment(1);
288        }
289    }
290
291    #[derive(Debug, NamedInternalEvent)]
292    pub struct FileDeleted<'a> {
293        pub file: &'a Path,
294        pub include_file_metric_tag: bool,
295    }
296
297    impl InternalEvent for FileDeleted<'_> {
298        fn emit(self) {
299            info!(
300                message = "File deleted.",
301                file = %self.file.display(),
302            );
303            if self.include_file_metric_tag {
304                counter!(
305                    CounterName::FilesDeletedTotal,
306                    "file" => self.file.to_string_lossy().into_owned(),
307                )
308            } else {
309                counter!(CounterName::FilesDeletedTotal)
310            }
311            .increment(1);
312        }
313    }
314
315    #[derive(Debug, NamedInternalEvent)]
316    pub struct FileUnwatched<'a> {
317        pub file: &'a Path,
318        pub include_file_metric_tag: bool,
319        pub reached_eof: bool,
320    }
321
322    impl InternalEvent for FileUnwatched<'_> {
323        fn emit(self) {
324            let reached_eof = if self.reached_eof { "true" } else { "false" };
325            info!(
326                message = "Stopped watching file.",
327                file = %self.file.display(),
328                reached_eof
329            );
330            if self.include_file_metric_tag {
331                counter!(
332                    CounterName::FilesUnwatchedTotal,
333                    "file" => self.file.to_string_lossy().into_owned(),
334                    "reached_eof" => reached_eof,
335                )
336            } else {
337                counter!(
338                    CounterName::FilesUnwatchedTotal,
339                    "reached_eof" => reached_eof,
340                )
341            }
342            .increment(1);
343        }
344    }
345
346    #[derive(Debug, NamedInternalEvent)]
347    struct FileWatchError<'a> {
348        pub file: &'a Path,
349        pub error: Error,
350        pub include_file_metric_tag: bool,
351    }
352
353    impl InternalEvent for FileWatchError<'_> {
354        fn emit(self) {
355            error!(
356                message = "Failed to watch file.",
357                error = %self.error,
358                error_code = "watching",
359                error_type = error_type::COMMAND_FAILED,
360                stage = error_stage::RECEIVING,
361                file = %self.file.display(),
362            );
363            if self.include_file_metric_tag {
364                counter!(
365                    CounterName::ComponentErrorsTotal,
366                    "error_code" => "watching",
367                    "error_type" => error_type::COMMAND_FAILED,
368                    "stage" => error_stage::RECEIVING,
369                    "file" => self.file.to_string_lossy().into_owned(),
370                )
371            } else {
372                counter!(
373                    CounterName::ComponentErrorsTotal,
374                    "error_code" => "watching",
375                    "error_type" => error_type::COMMAND_FAILED,
376                    "stage" => error_stage::RECEIVING,
377                )
378            }
379            .increment(1);
380        }
381    }
382
383    #[derive(Debug, NamedInternalEvent)]
384    pub struct FileResumed<'a> {
385        pub file: &'a Path,
386        pub file_position: u64,
387        pub include_file_metric_tag: bool,
388    }
389
390    impl InternalEvent for FileResumed<'_> {
391        fn emit(self) {
392            info!(
393                message = "Resuming to watch file.",
394                file = %self.file.display(),
395                file_position = %self.file_position
396            );
397            if self.include_file_metric_tag {
398                counter!(
399                    CounterName::FilesResumedTotal,
400                    "file" => self.file.to_string_lossy().into_owned(),
401                )
402            } else {
403                counter!(CounterName::FilesResumedTotal)
404            }
405            .increment(1);
406        }
407    }
408
409    #[derive(Debug, NamedInternalEvent)]
410    pub struct FileAdded<'a> {
411        pub file: &'a Path,
412        pub include_file_metric_tag: bool,
413    }
414
415    impl InternalEvent for FileAdded<'_> {
416        fn emit(self) {
417            info!(
418                message = "Found new file to watch.",
419                file = %self.file.display(),
420            );
421            if self.include_file_metric_tag {
422                counter!(
423                    CounterName::FilesAddedTotal,
424                    "file" => self.file.to_string_lossy().into_owned(),
425                )
426            } else {
427                counter!(CounterName::FilesAddedTotal)
428            }
429            .increment(1);
430        }
431    }
432
433    #[derive(Debug, NamedInternalEvent)]
434    pub struct FileCheckpointed {
435        pub count: usize,
436        pub duration: Duration,
437    }
438
439    impl InternalEvent for FileCheckpointed {
440        fn emit(self) {
441            debug!(
442                message = "Files checkpointed.",
443                count = %self.count,
444                duration_ms = self.duration.as_millis() as u64,
445            );
446            counter!(CounterName::CheckpointsTotal).increment(self.count as u64);
447        }
448    }
449
450    #[derive(Debug, NamedInternalEvent)]
451    pub struct FileCheckpointWriteError {
452        pub error: Error,
453    }
454
455    impl InternalEvent for FileCheckpointWriteError {
456        fn emit(self) {
457            error!(
458                message = "Failed writing checkpoints.",
459                error = %self.error,
460                error_code = "writing_checkpoints",
461                error_type = error_type::WRITER_FAILED,
462                stage = error_stage::RECEIVING,
463            );
464            counter!(
465                CounterName::ComponentErrorsTotal,
466                "error_code" => "writing_checkpoints",
467                "error_type" => error_type::WRITER_FAILED,
468                "stage" => error_stage::RECEIVING,
469            )
470            .increment(1);
471        }
472    }
473
474    #[derive(Debug, NamedInternalEvent)]
475    pub struct PathGlobbingError<'a> {
476        pub path: &'a Path,
477        pub error: &'a Error,
478    }
479
480    impl InternalEvent for PathGlobbingError<'_> {
481        fn emit(self) {
482            error!(
483                message = "Failed to glob path.",
484                error = %self.error,
485                error_code = "globbing",
486                error_type = error_type::READER_FAILED,
487                stage = error_stage::RECEIVING,
488                path = %self.path.display(),
489            );
490            counter!(
491                CounterName::ComponentErrorsTotal,
492                "error_code" => "globbing",
493                "error_type" => error_type::READER_FAILED,
494                "stage" => error_stage::RECEIVING,
495            )
496            .increment(1);
497        }
498    }
499
500    #[derive(Debug, NamedInternalEvent)]
501    pub struct FileLineTooBigError<'a> {
502        pub truncated_bytes: &'a BytesMut,
503        pub configured_limit: usize,
504        pub encountered_size_so_far: usize,
505    }
506
507    impl InternalEvent for FileLineTooBigError<'_> {
508        fn emit(self) {
509            error!(
510                message = "Found line that exceeds max_line_bytes; discarding.",
511                truncated_bytes = ?self.truncated_bytes,
512                configured_limit = self.configured_limit,
513                encountered_size_so_far = self.encountered_size_so_far,
514                error_type = error_type::CONDITION_FAILED,
515                stage = error_stage::RECEIVING,
516            );
517            counter!(
518                CounterName::ComponentErrorsTotal,
519                "error_code" => "reading_line_from_file",
520                "error_type" => error_type::CONDITION_FAILED,
521                "stage" => error_stage::RECEIVING,
522            )
523            .increment(1);
524            emit!(ComponentEventsDropped::<INTENTIONAL> {
525                count: 1,
526                reason: "Found line that exceeds max_line_bytes; discarding.",
527            });
528        }
529    }
530
531    #[derive(Clone)]
532    pub struct FileSourceInternalEventsEmitter {
533        pub include_file_metric_tag: bool,
534    }
535
536    impl FileSourceInternalEvents for FileSourceInternalEventsEmitter {
537        fn emit_file_added(&self, file: &Path) {
538            emit!(FileAdded {
539                file,
540                include_file_metric_tag: self.include_file_metric_tag
541            });
542        }
543
544        fn emit_file_resumed(&self, file: &Path, file_position: u64) {
545            emit!(FileResumed {
546                file,
547                file_position,
548                include_file_metric_tag: self.include_file_metric_tag
549            });
550        }
551
552        fn emit_file_watch_error(&self, file: &Path, error: Error) {
553            emit!(FileWatchError {
554                file,
555                error,
556                include_file_metric_tag: self.include_file_metric_tag
557            });
558        }
559
560        fn emit_file_unwatched(&self, file: &Path, reached_eof: bool) {
561            emit!(FileUnwatched {
562                file,
563                include_file_metric_tag: self.include_file_metric_tag,
564                reached_eof
565            });
566        }
567
568        fn emit_file_deleted(&self, file: &Path) {
569            emit!(FileDeleted {
570                file,
571                include_file_metric_tag: self.include_file_metric_tag
572            });
573        }
574
575        fn emit_file_delete_error(&self, file: &Path, error: Error) {
576            emit!(FileDeleteError {
577                file,
578                error,
579                include_file_metric_tag: self.include_file_metric_tag
580            });
581        }
582
583        fn emit_file_fingerprint_read_error(&self, file: &Path, error: Error) {
584            emit!(FileFingerprintReadError {
585                file,
586                error,
587                include_file_metric_tag: self.include_file_metric_tag
588            });
589        }
590
591        fn emit_file_checksum_failed(&self, file: &Path) {
592            emit!(FileChecksumFailed {
593                file,
594                include_file_metric_tag: self.include_file_metric_tag
595            });
596        }
597
598        fn emit_file_checkpointed(&self, count: usize, duration: Duration) {
599            emit!(FileCheckpointed { count, duration });
600        }
601
602        fn emit_file_checkpoint_write_error(&self, error: Error) {
603            emit!(FileCheckpointWriteError { error });
604        }
605
606        fn emit_files_open(&self, count: usize) {
607            emit!(FileOpen { count });
608        }
609
610        fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
611            emit!(PathGlobbingError { path, error });
612        }
613
614        fn emit_file_line_too_long(
615            &self,
616            truncated_bytes: &bytes::BytesMut,
617            configured_limit: usize,
618            encountered_size_so_far: usize,
619        ) {
620            emit!(FileLineTooBigError {
621                truncated_bytes,
622                configured_limit,
623                encountered_size_so_far
624            });
625        }
626    }
627}