1#![allow(dead_code)] use std::borrow::Cow;
4
5use metrics::{counter, gauge};
6use vector_lib::{
7 configurable::configurable_component,
8 internal_event::{
9 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
10 },
11};
12
13#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
14pub use self::source::*;
15
16#[configurable_component]
18#[derive(Clone, Debug, PartialEq, Eq, Default)]
19#[serde(deny_unknown_fields)]
20pub struct FileInternalMetricsConfig {
21 #[serde(default = "crate::serde::default_false")]
26 pub include_file_tag: bool,
27}
28
29#[derive(Debug)]
30pub struct FileOpen {
31 pub count: usize,
32}
33
34impl InternalEvent for FileOpen {
35 fn emit(self) {
36 gauge!("open_files").set(self.count as f64);
37 }
38}
39
40#[derive(Debug)]
41pub struct FileBytesSent<'a> {
42 pub byte_size: usize,
43 pub file: Cow<'a, str>,
44 pub include_file_metric_tag: bool,
45}
46
47impl InternalEvent for FileBytesSent<'_> {
48 fn emit(self) {
49 trace!(
50 message = "Bytes sent.",
51 byte_size = %self.byte_size,
52 protocol = "file",
53 file = %self.file,
54 );
55 if self.include_file_metric_tag {
56 counter!(
57 "component_sent_bytes_total",
58 "protocol" => "file",
59 "file" => self.file.clone().into_owned(),
60 )
61 } else {
62 counter!(
63 "component_sent_bytes_total",
64 "protocol" => "file",
65 )
66 }
67 .increment(self.byte_size as u64);
68 }
69}
70
71#[derive(Debug)]
72pub struct FileIoError<'a, P> {
73 pub error: std::io::Error,
74 pub code: &'static str,
75 pub message: &'static str,
76 pub path: &'a P,
77 pub dropped_events: usize,
78}
79
80impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
81 fn emit(self) {
82 error!(
83 message = %self.message,
84 path = ?self.path,
85 error = %self.error,
86 error_code = %self.code,
87 error_type = error_type::IO_FAILED,
88 stage = error_stage::SENDING,
89 );
90 counter!(
91 "component_errors_total",
92 "error_code" => self.code,
93 "error_type" => error_type::IO_FAILED,
94 "stage" => error_stage::SENDING,
95 )
96 .increment(1);
97
98 if self.dropped_events > 0 {
99 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
100 count: self.dropped_events,
101 reason: self.message,
102 });
103 }
104 }
105}
106
107#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
108mod source {
109 use std::{io::Error, path::Path, time::Duration};
110
111 use bytes::BytesMut;
112 use metrics::counter;
113 use vector_lib::{
114 emit,
115 file_source_common::internal_events::FileSourceInternalEvents,
116 internal_event::{ComponentEventsDropped, INTENTIONAL, error_stage, error_type},
117 json_size::JsonSize,
118 };
119
120 use super::{FileOpen, InternalEvent};
121
122 #[derive(Debug)]
123 pub struct FileBytesReceived<'a> {
124 pub byte_size: usize,
125 pub file: &'a str,
126 pub include_file_metric_tag: bool,
127 }
128
129 impl InternalEvent for FileBytesReceived<'_> {
130 fn emit(self) {
131 trace!(
132 message = "Bytes received.",
133 byte_size = %self.byte_size,
134 protocol = "file",
135 file = %self.file,
136 );
137 if self.include_file_metric_tag {
138 counter!(
139 "component_received_bytes_total",
140 "protocol" => "file",
141 "file" => self.file.to_owned()
142 )
143 } else {
144 counter!(
145 "component_received_bytes_total",
146 "protocol" => "file",
147 )
148 }
149 .increment(self.byte_size as u64);
150 }
151 }
152
153 #[derive(Debug)]
154 pub struct FileEventsReceived<'a> {
155 pub count: usize,
156 pub file: &'a str,
157 pub byte_size: JsonSize,
158 pub include_file_metric_tag: bool,
159 }
160
161 impl InternalEvent for FileEventsReceived<'_> {
162 fn emit(self) {
163 trace!(
164 message = "Events received.",
165 count = %self.count,
166 byte_size = %self.byte_size,
167 file = %self.file
168 );
169 if self.include_file_metric_tag {
170 counter!(
171 "component_received_events_total",
172 "file" => self.file.to_owned(),
173 )
174 .increment(self.count as u64);
175 counter!(
176 "component_received_event_bytes_total",
177 "file" => self.file.to_owned(),
178 )
179 .increment(self.byte_size.get() as u64);
180 } else {
181 counter!("component_received_events_total").increment(self.count as u64);
182 counter!("component_received_event_bytes_total")
183 .increment(self.byte_size.get() as u64);
184 }
185 }
186 }
187
188 #[derive(Debug)]
189 pub struct FileChecksumFailed<'a> {
190 pub file: &'a Path,
191 pub include_file_metric_tag: bool,
192 }
193
194 impl InternalEvent for FileChecksumFailed<'_> {
195 fn emit(self) {
196 warn!(
197 message = "Currently ignoring file too small to fingerprint.",
198 file = %self.file.display(),
199 );
200 if self.include_file_metric_tag {
201 counter!(
202 "checksum_errors_total",
203 "file" => self.file.to_string_lossy().into_owned(),
204 )
205 } else {
206 counter!("checksum_errors_total")
207 }
208 .increment(1);
209 }
210 }
211
212 #[derive(Debug)]
213 pub struct FileFingerprintReadError<'a> {
214 pub file: &'a Path,
215 pub error: Error,
216 pub include_file_metric_tag: bool,
217 }
218
219 impl InternalEvent for FileFingerprintReadError<'_> {
220 fn emit(self) {
221 error!(
222 message = "Failed reading file for fingerprinting.",
223 file = %self.file.display(),
224 error = %self.error,
225 error_code = "reading_fingerprint",
226 error_type = error_type::READER_FAILED,
227 stage = error_stage::RECEIVING,
228 );
229 if self.include_file_metric_tag {
230 counter!(
231 "component_errors_total",
232 "error_code" => "reading_fingerprint",
233 "error_type" => error_type::READER_FAILED,
234 "stage" => error_stage::RECEIVING,
235 "file" => self.file.to_string_lossy().into_owned(),
236 )
237 } else {
238 counter!(
239 "component_errors_total",
240 "error_code" => "reading_fingerprint",
241 "error_type" => error_type::READER_FAILED,
242 "stage" => error_stage::RECEIVING,
243 )
244 }
245 .increment(1);
246 }
247 }
248
249 const DELETION_FAILED: &str = "deletion_failed";
250
251 #[derive(Debug)]
252 pub struct FileDeleteError<'a> {
253 pub file: &'a Path,
254 pub error: Error,
255 pub include_file_metric_tag: bool,
256 }
257
258 impl InternalEvent for FileDeleteError<'_> {
259 fn emit(self) {
260 error!(
261 message = "Failed in deleting file.",
262 file = %self.file.display(),
263 error = %self.error,
264 error_code = DELETION_FAILED,
265 error_type = error_type::COMMAND_FAILED,
266 stage = error_stage::RECEIVING,
267 );
268 if self.include_file_metric_tag {
269 counter!(
270 "component_errors_total",
271 "file" => self.file.to_string_lossy().into_owned(),
272 "error_code" => DELETION_FAILED,
273 "error_type" => error_type::COMMAND_FAILED,
274 "stage" => error_stage::RECEIVING,
275 )
276 } else {
277 counter!(
278 "component_errors_total",
279 "error_code" => DELETION_FAILED,
280 "error_type" => error_type::COMMAND_FAILED,
281 "stage" => error_stage::RECEIVING,
282 )
283 }
284 .increment(1);
285 }
286 }
287
288 #[derive(Debug)]
289 pub struct FileDeleted<'a> {
290 pub file: &'a Path,
291 pub include_file_metric_tag: bool,
292 }
293
294 impl InternalEvent for FileDeleted<'_> {
295 fn emit(self) {
296 info!(
297 message = "File deleted.",
298 file = %self.file.display(),
299 );
300 if self.include_file_metric_tag {
301 counter!(
302 "files_deleted_total",
303 "file" => self.file.to_string_lossy().into_owned(),
304 )
305 } else {
306 counter!("files_deleted_total")
307 }
308 .increment(1);
309 }
310 }
311
312 #[derive(Debug)]
313 pub struct FileUnwatched<'a> {
314 pub file: &'a Path,
315 pub include_file_metric_tag: bool,
316 pub reached_eof: bool,
317 }
318
319 impl InternalEvent for FileUnwatched<'_> {
320 fn emit(self) {
321 let reached_eof = if self.reached_eof { "true" } else { "false" };
322 info!(
323 message = "Stopped watching file.",
324 file = %self.file.display(),
325 reached_eof
326 );
327 if self.include_file_metric_tag {
328 counter!(
329 "files_unwatched_total",
330 "file" => self.file.to_string_lossy().into_owned(),
331 "reached_eof" => reached_eof,
332 )
333 } else {
334 counter!(
335 "files_unwatched_total",
336 "reached_eof" => reached_eof,
337 )
338 }
339 .increment(1);
340 }
341 }
342
343 #[derive(Debug)]
344 struct FileWatchError<'a> {
345 pub file: &'a Path,
346 pub error: Error,
347 pub include_file_metric_tag: bool,
348 }
349
350 impl InternalEvent for FileWatchError<'_> {
351 fn emit(self) {
352 error!(
353 message = "Failed to watch file.",
354 error = %self.error,
355 error_code = "watching",
356 error_type = error_type::COMMAND_FAILED,
357 stage = error_stage::RECEIVING,
358 file = %self.file.display(),
359 );
360 if self.include_file_metric_tag {
361 counter!(
362 "component_errors_total",
363 "error_code" => "watching",
364 "error_type" => error_type::COMMAND_FAILED,
365 "stage" => error_stage::RECEIVING,
366 "file" => self.file.to_string_lossy().into_owned(),
367 )
368 } else {
369 counter!(
370 "component_errors_total",
371 "error_code" => "watching",
372 "error_type" => error_type::COMMAND_FAILED,
373 "stage" => error_stage::RECEIVING,
374 )
375 }
376 .increment(1);
377 }
378 }
379
380 #[derive(Debug)]
381 pub struct FileResumed<'a> {
382 pub file: &'a Path,
383 pub file_position: u64,
384 pub include_file_metric_tag: bool,
385 }
386
387 impl InternalEvent for FileResumed<'_> {
388 fn emit(self) {
389 info!(
390 message = "Resuming to watch file.",
391 file = %self.file.display(),
392 file_position = %self.file_position
393 );
394 if self.include_file_metric_tag {
395 counter!(
396 "files_resumed_total",
397 "file" => self.file.to_string_lossy().into_owned(),
398 )
399 } else {
400 counter!("files_resumed_total")
401 }
402 .increment(1);
403 }
404 }
405
406 #[derive(Debug)]
407 pub struct FileAdded<'a> {
408 pub file: &'a Path,
409 pub include_file_metric_tag: bool,
410 }
411
412 impl InternalEvent for FileAdded<'_> {
413 fn emit(self) {
414 info!(
415 message = "Found new file to watch.",
416 file = %self.file.display(),
417 );
418 if self.include_file_metric_tag {
419 counter!(
420 "files_added_total",
421 "file" => self.file.to_string_lossy().into_owned(),
422 )
423 } else {
424 counter!("files_added_total")
425 }
426 .increment(1);
427 }
428 }
429
430 #[derive(Debug)]
431 pub struct FileCheckpointed {
432 pub count: usize,
433 pub duration: Duration,
434 }
435
436 impl InternalEvent for FileCheckpointed {
437 fn emit(self) {
438 debug!(
439 message = "Files checkpointed.",
440 count = %self.count,
441 duration_ms = self.duration.as_millis() as u64,
442 );
443 counter!("checkpoints_total").increment(self.count as u64);
444 }
445 }
446
447 #[derive(Debug)]
448 pub struct FileCheckpointWriteError {
449 pub error: Error,
450 }
451
452 impl InternalEvent for FileCheckpointWriteError {
453 fn emit(self) {
454 error!(
455 message = "Failed writing checkpoints.",
456 error = %self.error,
457 error_code = "writing_checkpoints",
458 error_type = error_type::WRITER_FAILED,
459 stage = error_stage::RECEIVING,
460 );
461 counter!(
462 "component_errors_total",
463 "error_code" => "writing_checkpoints",
464 "error_type" => error_type::WRITER_FAILED,
465 "stage" => error_stage::RECEIVING,
466 )
467 .increment(1);
468 }
469 }
470
471 #[derive(Debug)]
472 pub struct PathGlobbingError<'a> {
473 pub path: &'a Path,
474 pub error: &'a Error,
475 }
476
477 impl InternalEvent for PathGlobbingError<'_> {
478 fn emit(self) {
479 error!(
480 message = "Failed to glob path.",
481 error = %self.error,
482 error_code = "globbing",
483 error_type = error_type::READER_FAILED,
484 stage = error_stage::RECEIVING,
485 path = %self.path.display(),
486 );
487 counter!(
488 "component_errors_total",
489 "error_code" => "globbing",
490 "error_type" => error_type::READER_FAILED,
491 "stage" => error_stage::RECEIVING,
492 )
493 .increment(1);
494 }
495 }
496
497 #[derive(Debug)]
498 pub struct FileLineTooBigError<'a> {
499 pub truncated_bytes: &'a BytesMut,
500 pub configured_limit: usize,
501 pub encountered_size_so_far: usize,
502 }
503
504 impl InternalEvent for FileLineTooBigError<'_> {
505 fn emit(self) {
506 error!(
507 message = "Found line that exceeds max_line_bytes; discarding.",
508 truncated_bytes = ?self.truncated_bytes,
509 configured_limit = self.configured_limit,
510 encountered_size_so_far = self.encountered_size_so_far,
511 error_type = error_type::CONDITION_FAILED,
512 stage = error_stage::RECEIVING,
513 );
514 counter!(
515 "component_errors_total",
516 "error_code" => "reading_line_from_file",
517 "error_type" => error_type::CONDITION_FAILED,
518 "stage" => error_stage::RECEIVING,
519 )
520 .increment(1);
521 emit!(ComponentEventsDropped::<INTENTIONAL> {
522 count: 1,
523 reason: "Found line that exceeds max_line_bytes; discarding.",
524 });
525 }
526 }
527
528 #[derive(Clone)]
529 pub struct FileSourceInternalEventsEmitter {
530 pub include_file_metric_tag: bool,
531 }
532
533 impl FileSourceInternalEvents for FileSourceInternalEventsEmitter {
534 fn emit_file_added(&self, file: &Path) {
535 emit!(FileAdded {
536 file,
537 include_file_metric_tag: self.include_file_metric_tag
538 });
539 }
540
541 fn emit_file_resumed(&self, file: &Path, file_position: u64) {
542 emit!(FileResumed {
543 file,
544 file_position,
545 include_file_metric_tag: self.include_file_metric_tag
546 });
547 }
548
549 fn emit_file_watch_error(&self, file: &Path, error: Error) {
550 emit!(FileWatchError {
551 file,
552 error,
553 include_file_metric_tag: self.include_file_metric_tag
554 });
555 }
556
557 fn emit_file_unwatched(&self, file: &Path, reached_eof: bool) {
558 emit!(FileUnwatched {
559 file,
560 include_file_metric_tag: self.include_file_metric_tag,
561 reached_eof
562 });
563 }
564
565 fn emit_file_deleted(&self, file: &Path) {
566 emit!(FileDeleted {
567 file,
568 include_file_metric_tag: self.include_file_metric_tag
569 });
570 }
571
572 fn emit_file_delete_error(&self, file: &Path, error: Error) {
573 emit!(FileDeleteError {
574 file,
575 error,
576 include_file_metric_tag: self.include_file_metric_tag
577 });
578 }
579
580 fn emit_file_fingerprint_read_error(&self, file: &Path, error: Error) {
581 emit!(FileFingerprintReadError {
582 file,
583 error,
584 include_file_metric_tag: self.include_file_metric_tag
585 });
586 }
587
588 fn emit_file_checksum_failed(&self, file: &Path) {
589 emit!(FileChecksumFailed {
590 file,
591 include_file_metric_tag: self.include_file_metric_tag
592 });
593 }
594
595 fn emit_file_checkpointed(&self, count: usize, duration: Duration) {
596 emit!(FileCheckpointed { count, duration });
597 }
598
599 fn emit_file_checkpoint_write_error(&self, error: Error) {
600 emit!(FileCheckpointWriteError { error });
601 }
602
603 fn emit_files_open(&self, count: usize) {
604 emit!(FileOpen { count });
605 }
606
607 fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
608 emit!(PathGlobbingError { path, error });
609 }
610
611 fn emit_file_line_too_long(
612 &self,
613 truncated_bytes: &bytes::BytesMut,
614 configured_limit: usize,
615 encountered_size_so_far: usize,
616 ) {
617 emit!(FileLineTooBigError {
618 truncated_bytes,
619 configured_limit,
620 encountered_size_so_far
621 });
622 }
623 }
624}