vector/internal_events/
file.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use std::borrow::Cow;
4
5use metrics::{counter, gauge};
6use vector_lib::{
7    configurable::configurable_component,
8    internal_event::{
9        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
10    },
11};
12
13#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
14pub use self::source::*;
15
// NOTE(review): the doc comments below are presumably picked up by
// `#[configurable_component]` for generated configuration docs — confirm
// before rewording them.
16/// Configuration of internal metrics for file-based components.
17#[configurable_component]
18#[derive(Clone, Debug, PartialEq, Eq, Default)]
19#[serde(deny_unknown_fields)]
20pub struct FileInternalMetricsConfig {
21    /// Whether or not to include the "file" tag on the component's corresponding internal metrics.
22    ///
23    /// This is useful for distinguishing between different files while monitoring. However, the tag's
24    /// cardinality is unbounded.
    // When the key is absent from the input, serde obtains the value by calling
    // `crate::serde::default_false`.
25    #[serde(default = "crate::serde::default_false")]
26    pub include_file_tag: bool,
27}
28
29#[derive(Debug)]
30pub struct FileOpen {
31    pub count: usize,
32}
33
34impl InternalEvent for FileOpen {
35    fn emit(self) {
36        gauge!("open_files").set(self.count as f64);
37    }
38}
39
40#[derive(Debug)]
41pub struct FileBytesSent<'a> {
42    pub byte_size: usize,
43    pub file: Cow<'a, str>,
44    pub include_file_metric_tag: bool,
45}
46
47impl InternalEvent for FileBytesSent<'_> {
48    fn emit(self) {
49        trace!(
50            message = "Bytes sent.",
51            byte_size = %self.byte_size,
52            protocol = "file",
53            file = %self.file,
54        );
55        if self.include_file_metric_tag {
56            counter!(
57                "component_sent_bytes_total",
58                "protocol" => "file",
59                "file" => self.file.clone().into_owned(),
60            )
61        } else {
62            counter!(
63                "component_sent_bytes_total",
64                "protocol" => "file",
65            )
66        }
67        .increment(self.byte_size as u64);
68    }
69}
70
71#[derive(Debug)]
72pub struct FileIoError<'a, P> {
73    pub error: std::io::Error,
74    pub code: &'static str,
75    pub message: &'static str,
76    pub path: &'a P,
77    pub dropped_events: usize,
78}
79
80impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
81    fn emit(self) {
82        error!(
83            message = %self.message,
84            path = ?self.path,
85            error = %self.error,
86            error_code = %self.code,
87            error_type = error_type::IO_FAILED,
88            stage = error_stage::SENDING,
89        );
90        counter!(
91            "component_errors_total",
92            "error_code" => self.code,
93            "error_type" => error_type::IO_FAILED,
94            "stage" => error_stage::SENDING,
95        )
96        .increment(1);
97
98        if self.dropped_events > 0 {
99            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
100                count: self.dropped_events,
101                reason: self.message,
102            });
103        }
104    }
105}
106
107#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
108mod source {
109    use std::{io::Error, path::Path, time::Duration};
110
111    use bytes::BytesMut;
112    use metrics::counter;
113    use vector_lib::{
114        emit,
115        file_source_common::internal_events::FileSourceInternalEvents,
116        internal_event::{ComponentEventsDropped, INTENTIONAL, error_stage, error_type},
117        json_size::JsonSize,
118    };
119
120    use super::{FileOpen, InternalEvent};
121
122    #[derive(Debug)]
123    pub struct FileBytesReceived<'a> {
124        pub byte_size: usize,
125        pub file: &'a str,
126        pub include_file_metric_tag: bool,
127    }
128
129    impl InternalEvent for FileBytesReceived<'_> {
130        fn emit(self) {
131            trace!(
132                message = "Bytes received.",
133                byte_size = %self.byte_size,
134                protocol = "file",
135                file = %self.file,
136            );
137            if self.include_file_metric_tag {
138                counter!(
139                    "component_received_bytes_total",
140                    "protocol" => "file",
141                    "file" => self.file.to_owned()
142                )
143            } else {
144                counter!(
145                    "component_received_bytes_total",
146                    "protocol" => "file",
147                )
148            }
149            .increment(self.byte_size as u64);
150        }
151    }
152
153    #[derive(Debug)]
154    pub struct FileEventsReceived<'a> {
155        pub count: usize,
156        pub file: &'a str,
157        pub byte_size: JsonSize,
158        pub include_file_metric_tag: bool,
159    }
160
161    impl InternalEvent for FileEventsReceived<'_> {
162        fn emit(self) {
163            trace!(
164                message = "Events received.",
165                count = %self.count,
166                byte_size = %self.byte_size,
167                file = %self.file
168            );
169            if self.include_file_metric_tag {
170                counter!(
171                    "component_received_events_total",
172                    "file" => self.file.to_owned(),
173                )
174                .increment(self.count as u64);
175                counter!(
176                    "component_received_event_bytes_total",
177                    "file" => self.file.to_owned(),
178                )
179                .increment(self.byte_size.get() as u64);
180            } else {
181                counter!("component_received_events_total").increment(self.count as u64);
182                counter!("component_received_event_bytes_total")
183                    .increment(self.byte_size.get() as u64);
184            }
185        }
186    }
187
188    #[derive(Debug)]
189    pub struct FileChecksumFailed<'a> {
190        pub file: &'a Path,
191        pub include_file_metric_tag: bool,
192    }
193
194    impl InternalEvent for FileChecksumFailed<'_> {
195        fn emit(self) {
196            warn!(
197                message = "Currently ignoring file too small to fingerprint.",
198                file = %self.file.display(),
199            );
200            if self.include_file_metric_tag {
201                counter!(
202                    "checksum_errors_total",
203                    "file" => self.file.to_string_lossy().into_owned(),
204                )
205            } else {
206                counter!("checksum_errors_total")
207            }
208            .increment(1);
209        }
210    }
211
212    #[derive(Debug)]
213    pub struct FileFingerprintReadError<'a> {
214        pub file: &'a Path,
215        pub error: Error,
216        pub include_file_metric_tag: bool,
217    }
218
219    impl InternalEvent for FileFingerprintReadError<'_> {
220        fn emit(self) {
221            error!(
222                message = "Failed reading file for fingerprinting.",
223                file = %self.file.display(),
224                error = %self.error,
225                error_code = "reading_fingerprint",
226                error_type = error_type::READER_FAILED,
227                stage = error_stage::RECEIVING,
228            );
229            if self.include_file_metric_tag {
230                counter!(
231                    "component_errors_total",
232                    "error_code" => "reading_fingerprint",
233                    "error_type" => error_type::READER_FAILED,
234                    "stage" => error_stage::RECEIVING,
235                    "file" => self.file.to_string_lossy().into_owned(),
236                )
237            } else {
238                counter!(
239                    "component_errors_total",
240                    "error_code" => "reading_fingerprint",
241                    "error_type" => error_type::READER_FAILED,
242                    "stage" => error_stage::RECEIVING,
243                )
244            }
245            .increment(1);
246        }
247    }
248
249    const DELETION_FAILED: &str = "deletion_failed";
250
251    #[derive(Debug)]
252    pub struct FileDeleteError<'a> {
253        pub file: &'a Path,
254        pub error: Error,
255        pub include_file_metric_tag: bool,
256    }
257
258    impl InternalEvent for FileDeleteError<'_> {
259        fn emit(self) {
260            error!(
261                message = "Failed in deleting file.",
262                file = %self.file.display(),
263                error = %self.error,
264                error_code = DELETION_FAILED,
265                error_type = error_type::COMMAND_FAILED,
266                stage = error_stage::RECEIVING,
267            );
268            if self.include_file_metric_tag {
269                counter!(
270                    "component_errors_total",
271                    "file" => self.file.to_string_lossy().into_owned(),
272                    "error_code" => DELETION_FAILED,
273                    "error_type" => error_type::COMMAND_FAILED,
274                    "stage" => error_stage::RECEIVING,
275                )
276            } else {
277                counter!(
278                    "component_errors_total",
279                    "error_code" => DELETION_FAILED,
280                    "error_type" => error_type::COMMAND_FAILED,
281                    "stage" => error_stage::RECEIVING,
282                )
283            }
284            .increment(1);
285        }
286    }
287
288    #[derive(Debug)]
289    pub struct FileDeleted<'a> {
290        pub file: &'a Path,
291        pub include_file_metric_tag: bool,
292    }
293
294    impl InternalEvent for FileDeleted<'_> {
295        fn emit(self) {
296            info!(
297                message = "File deleted.",
298                file = %self.file.display(),
299            );
300            if self.include_file_metric_tag {
301                counter!(
302                    "files_deleted_total",
303                    "file" => self.file.to_string_lossy().into_owned(),
304                )
305            } else {
306                counter!("files_deleted_total")
307            }
308            .increment(1);
309        }
310    }
311
312    #[derive(Debug)]
313    pub struct FileUnwatched<'a> {
314        pub file: &'a Path,
315        pub include_file_metric_tag: bool,
316        pub reached_eof: bool,
317    }
318
319    impl InternalEvent for FileUnwatched<'_> {
320        fn emit(self) {
321            let reached_eof = if self.reached_eof { "true" } else { "false" };
322            info!(
323                message = "Stopped watching file.",
324                file = %self.file.display(),
325                reached_eof
326            );
327            if self.include_file_metric_tag {
328                counter!(
329                    "files_unwatched_total",
330                    "file" => self.file.to_string_lossy().into_owned(),
331                    "reached_eof" => reached_eof,
332                )
333            } else {
334                counter!(
335                    "files_unwatched_total",
336                    "reached_eof" => reached_eof,
337                )
338            }
339            .increment(1);
340        }
341    }
342
343    #[derive(Debug)]
344    struct FileWatchError<'a> {
345        pub file: &'a Path,
346        pub error: Error,
347        pub include_file_metric_tag: bool,
348    }
349
350    impl InternalEvent for FileWatchError<'_> {
351        fn emit(self) {
352            error!(
353                message = "Failed to watch file.",
354                error = %self.error,
355                error_code = "watching",
356                error_type = error_type::COMMAND_FAILED,
357                stage = error_stage::RECEIVING,
358                file = %self.file.display(),
359            );
360            if self.include_file_metric_tag {
361                counter!(
362                    "component_errors_total",
363                    "error_code" => "watching",
364                    "error_type" => error_type::COMMAND_FAILED,
365                    "stage" => error_stage::RECEIVING,
366                    "file" => self.file.to_string_lossy().into_owned(),
367                )
368            } else {
369                counter!(
370                    "component_errors_total",
371                    "error_code" => "watching",
372                    "error_type" => error_type::COMMAND_FAILED,
373                    "stage" => error_stage::RECEIVING,
374                )
375            }
376            .increment(1);
377        }
378    }
379
380    #[derive(Debug)]
381    pub struct FileResumed<'a> {
382        pub file: &'a Path,
383        pub file_position: u64,
384        pub include_file_metric_tag: bool,
385    }
386
387    impl InternalEvent for FileResumed<'_> {
388        fn emit(self) {
389            info!(
390                message = "Resuming to watch file.",
391                file = %self.file.display(),
392                file_position = %self.file_position
393            );
394            if self.include_file_metric_tag {
395                counter!(
396                    "files_resumed_total",
397                    "file" => self.file.to_string_lossy().into_owned(),
398                )
399            } else {
400                counter!("files_resumed_total")
401            }
402            .increment(1);
403        }
404    }
405
406    #[derive(Debug)]
407    pub struct FileAdded<'a> {
408        pub file: &'a Path,
409        pub include_file_metric_tag: bool,
410    }
411
412    impl InternalEvent for FileAdded<'_> {
413        fn emit(self) {
414            info!(
415                message = "Found new file to watch.",
416                file = %self.file.display(),
417            );
418            if self.include_file_metric_tag {
419                counter!(
420                    "files_added_total",
421                    "file" => self.file.to_string_lossy().into_owned(),
422                )
423            } else {
424                counter!("files_added_total")
425            }
426            .increment(1);
427        }
428    }
429
430    #[derive(Debug)]
431    pub struct FileCheckpointed {
432        pub count: usize,
433        pub duration: Duration,
434    }
435
436    impl InternalEvent for FileCheckpointed {
437        fn emit(self) {
438            debug!(
439                message = "Files checkpointed.",
440                count = %self.count,
441                duration_ms = self.duration.as_millis() as u64,
442            );
443            counter!("checkpoints_total").increment(self.count as u64);
444        }
445    }
446
447    #[derive(Debug)]
448    pub struct FileCheckpointWriteError {
449        pub error: Error,
450    }
451
452    impl InternalEvent for FileCheckpointWriteError {
453        fn emit(self) {
454            error!(
455                message = "Failed writing checkpoints.",
456                error = %self.error,
457                error_code = "writing_checkpoints",
458                error_type = error_type::WRITER_FAILED,
459                stage = error_stage::RECEIVING,
460            );
461            counter!(
462                "component_errors_total",
463                "error_code" => "writing_checkpoints",
464                "error_type" => error_type::WRITER_FAILED,
465                "stage" => error_stage::RECEIVING,
466            )
467            .increment(1);
468        }
469    }
470
471    #[derive(Debug)]
472    pub struct PathGlobbingError<'a> {
473        pub path: &'a Path,
474        pub error: &'a Error,
475    }
476
477    impl InternalEvent for PathGlobbingError<'_> {
478        fn emit(self) {
479            error!(
480                message = "Failed to glob path.",
481                error = %self.error,
482                error_code = "globbing",
483                error_type = error_type::READER_FAILED,
484                stage = error_stage::RECEIVING,
485                path = %self.path.display(),
486            );
487            counter!(
488                "component_errors_total",
489                "error_code" => "globbing",
490                "error_type" => error_type::READER_FAILED,
491                "stage" => error_stage::RECEIVING,
492            )
493            .increment(1);
494        }
495    }
496
497    #[derive(Debug)]
498    pub struct FileLineTooBigError<'a> {
499        pub truncated_bytes: &'a BytesMut,
500        pub configured_limit: usize,
501        pub encountered_size_so_far: usize,
502    }
503
504    impl InternalEvent for FileLineTooBigError<'_> {
505        fn emit(self) {
506            error!(
507                message = "Found line that exceeds max_line_bytes; discarding.",
508                truncated_bytes = ?self.truncated_bytes,
509                configured_limit = self.configured_limit,
510                encountered_size_so_far = self.encountered_size_so_far,
511                error_type = error_type::CONDITION_FAILED,
512                stage = error_stage::RECEIVING,
513            );
514            counter!(
515                "component_errors_total",
516                "error_code" => "reading_line_from_file",
517                "error_type" => error_type::CONDITION_FAILED,
518                "stage" => error_stage::RECEIVING,
519            )
520            .increment(1);
521            emit!(ComponentEventsDropped::<INTENTIONAL> {
522                count: 1,
523                reason: "Found line that exceeds max_line_bytes; discarding.",
524            });
525        }
526    }
527
528    #[derive(Clone)]
529    pub struct FileSourceInternalEventsEmitter {
530        pub include_file_metric_tag: bool,
531    }
532
533    impl FileSourceInternalEvents for FileSourceInternalEventsEmitter {
534        fn emit_file_added(&self, file: &Path) {
535            emit!(FileAdded {
536                file,
537                include_file_metric_tag: self.include_file_metric_tag
538            });
539        }
540
541        fn emit_file_resumed(&self, file: &Path, file_position: u64) {
542            emit!(FileResumed {
543                file,
544                file_position,
545                include_file_metric_tag: self.include_file_metric_tag
546            });
547        }
548
549        fn emit_file_watch_error(&self, file: &Path, error: Error) {
550            emit!(FileWatchError {
551                file,
552                error,
553                include_file_metric_tag: self.include_file_metric_tag
554            });
555        }
556
557        fn emit_file_unwatched(&self, file: &Path, reached_eof: bool) {
558            emit!(FileUnwatched {
559                file,
560                include_file_metric_tag: self.include_file_metric_tag,
561                reached_eof
562            });
563        }
564
565        fn emit_file_deleted(&self, file: &Path) {
566            emit!(FileDeleted {
567                file,
568                include_file_metric_tag: self.include_file_metric_tag
569            });
570        }
571
572        fn emit_file_delete_error(&self, file: &Path, error: Error) {
573            emit!(FileDeleteError {
574                file,
575                error,
576                include_file_metric_tag: self.include_file_metric_tag
577            });
578        }
579
580        fn emit_file_fingerprint_read_error(&self, file: &Path, error: Error) {
581            emit!(FileFingerprintReadError {
582                file,
583                error,
584                include_file_metric_tag: self.include_file_metric_tag
585            });
586        }
587
588        fn emit_file_checksum_failed(&self, file: &Path) {
589            emit!(FileChecksumFailed {
590                file,
591                include_file_metric_tag: self.include_file_metric_tag
592            });
593        }
594
595        fn emit_file_checkpointed(&self, count: usize, duration: Duration) {
596            emit!(FileCheckpointed { count, duration });
597        }
598
599        fn emit_file_checkpoint_write_error(&self, error: Error) {
600            emit!(FileCheckpointWriteError { error });
601        }
602
603        fn emit_files_open(&self, count: usize) {
604            emit!(FileOpen { count });
605        }
606
607        fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
608            emit!(PathGlobbingError { path, error });
609        }
610
611        fn emit_file_line_too_long(
612            &self,
613            truncated_bytes: &bytes::BytesMut,
614            configured_limit: usize,
615            encountered_size_so_far: usize,
616        ) {
617            emit!(FileLineTooBigError {
618                truncated_bytes,
619                configured_limit,
620                encountered_size_so_far
621            });
622        }
623    }
624}