1#![allow(dead_code)] use std::borrow::Cow;
4
5use metrics::{counter, gauge};
6use vector_lib::{
7 NamedInternalEvent,
8 configurable::configurable_component,
9 internal_event::{
10 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
11 },
12};
13
14#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
15pub use self::source::*;
16
17#[configurable_component]
19#[derive(Clone, Debug, PartialEq, Eq, Default)]
20#[serde(deny_unknown_fields)]
21pub struct FileInternalMetricsConfig {
22 #[serde(default = "crate::serde::default_false")]
27 pub include_file_tag: bool,
28}
29
30#[derive(Debug, NamedInternalEvent)]
31pub struct FileOpen {
32 pub count: usize,
33}
34
35impl InternalEvent for FileOpen {
36 fn emit(self) {
37 gauge!("open_files").set(self.count as f64);
38 }
39}
40
41#[derive(Debug, NamedInternalEvent)]
42pub struct FileBytesSent<'a> {
43 pub byte_size: usize,
44 pub file: Cow<'a, str>,
45 pub include_file_metric_tag: bool,
46}
47
48impl InternalEvent for FileBytesSent<'_> {
49 fn emit(self) {
50 trace!(
51 message = "Bytes sent.",
52 byte_size = %self.byte_size,
53 protocol = "file",
54 file = %self.file,
55 );
56 if self.include_file_metric_tag {
57 counter!(
58 "component_sent_bytes_total",
59 "protocol" => "file",
60 "file" => self.file.clone().into_owned(),
61 )
62 } else {
63 counter!(
64 "component_sent_bytes_total",
65 "protocol" => "file",
66 )
67 }
68 .increment(self.byte_size as u64);
69 }
70}
71
72#[derive(Debug, NamedInternalEvent)]
73pub struct FileIoError<'a, P> {
74 pub error: std::io::Error,
75 pub code: &'static str,
76 pub message: &'static str,
77 pub path: &'a P,
78 pub dropped_events: usize,
79}
80
81impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
82 fn emit(self) {
83 error!(
84 message = %self.message,
85 path = ?self.path,
86 error = %self.error,
87 error_code = %self.code,
88 error_type = error_type::IO_FAILED,
89 stage = error_stage::SENDING,
90 );
91 counter!(
92 "component_errors_total",
93 "error_code" => self.code,
94 "error_type" => error_type::IO_FAILED,
95 "stage" => error_stage::SENDING,
96 )
97 .increment(1);
98
99 if self.dropped_events > 0 {
100 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
101 count: self.dropped_events,
102 reason: self.message,
103 });
104 }
105 }
106}
107
108#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
109mod source {
110 use std::{io::Error, path::Path, time::Duration};
111
112 use bytes::BytesMut;
113 use metrics::counter;
114 use vector_lib::{
115 NamedInternalEvent, emit,
116 file_source_common::internal_events::FileSourceInternalEvents,
117 internal_event::{ComponentEventsDropped, INTENTIONAL, error_stage, error_type},
118 json_size::JsonSize,
119 };
120
121 use super::{FileOpen, InternalEvent};
122
123 #[derive(Debug, NamedInternalEvent)]
124 pub struct FileBytesReceived<'a> {
125 pub byte_size: usize,
126 pub file: &'a str,
127 pub include_file_metric_tag: bool,
128 }
129
130 impl InternalEvent for FileBytesReceived<'_> {
131 fn emit(self) {
132 trace!(
133 message = "Bytes received.",
134 byte_size = %self.byte_size,
135 protocol = "file",
136 file = %self.file,
137 );
138 if self.include_file_metric_tag {
139 counter!(
140 "component_received_bytes_total",
141 "protocol" => "file",
142 "file" => self.file.to_owned()
143 )
144 } else {
145 counter!(
146 "component_received_bytes_total",
147 "protocol" => "file",
148 )
149 }
150 .increment(self.byte_size as u64);
151 }
152 }
153
154 #[derive(Debug, NamedInternalEvent)]
155 pub struct FileEventsReceived<'a> {
156 pub count: usize,
157 pub file: &'a str,
158 pub byte_size: JsonSize,
159 pub include_file_metric_tag: bool,
160 }
161
162 impl InternalEvent for FileEventsReceived<'_> {
163 fn emit(self) {
164 trace!(
165 message = "Events received.",
166 count = %self.count,
167 byte_size = %self.byte_size,
168 file = %self.file
169 );
170 if self.include_file_metric_tag {
171 counter!(
172 "component_received_events_total",
173 "file" => self.file.to_owned(),
174 )
175 .increment(self.count as u64);
176 counter!(
177 "component_received_event_bytes_total",
178 "file" => self.file.to_owned(),
179 )
180 .increment(self.byte_size.get() as u64);
181 } else {
182 counter!("component_received_events_total").increment(self.count as u64);
183 counter!("component_received_event_bytes_total")
184 .increment(self.byte_size.get() as u64);
185 }
186 }
187 }
188
189 #[derive(Debug, NamedInternalEvent)]
190 pub struct FileChecksumFailed<'a> {
191 pub file: &'a Path,
192 pub include_file_metric_tag: bool,
193 }
194
195 impl InternalEvent for FileChecksumFailed<'_> {
196 fn emit(self) {
197 warn!(
198 message = "Currently ignoring file too small to fingerprint.",
199 file = %self.file.display(),
200 );
201 if self.include_file_metric_tag {
202 counter!(
203 "checksum_errors_total",
204 "file" => self.file.to_string_lossy().into_owned(),
205 )
206 } else {
207 counter!("checksum_errors_total")
208 }
209 .increment(1);
210 }
211 }
212
213 #[derive(Debug, NamedInternalEvent)]
214 pub struct FileFingerprintReadError<'a> {
215 pub file: &'a Path,
216 pub error: Error,
217 pub include_file_metric_tag: bool,
218 }
219
220 impl InternalEvent for FileFingerprintReadError<'_> {
221 fn emit(self) {
222 error!(
223 message = "Failed reading file for fingerprinting.",
224 file = %self.file.display(),
225 error = %self.error,
226 error_code = "reading_fingerprint",
227 error_type = error_type::READER_FAILED,
228 stage = error_stage::RECEIVING,
229 );
230 if self.include_file_metric_tag {
231 counter!(
232 "component_errors_total",
233 "error_code" => "reading_fingerprint",
234 "error_type" => error_type::READER_FAILED,
235 "stage" => error_stage::RECEIVING,
236 "file" => self.file.to_string_lossy().into_owned(),
237 )
238 } else {
239 counter!(
240 "component_errors_total",
241 "error_code" => "reading_fingerprint",
242 "error_type" => error_type::READER_FAILED,
243 "stage" => error_stage::RECEIVING,
244 )
245 }
246 .increment(1);
247 }
248 }
249
250 const DELETION_FAILED: &str = "deletion_failed";
251
252 #[derive(Debug, NamedInternalEvent)]
253 pub struct FileDeleteError<'a> {
254 pub file: &'a Path,
255 pub error: Error,
256 pub include_file_metric_tag: bool,
257 }
258
259 impl InternalEvent for FileDeleteError<'_> {
260 fn emit(self) {
261 error!(
262 message = "Failed in deleting file.",
263 file = %self.file.display(),
264 error = %self.error,
265 error_code = DELETION_FAILED,
266 error_type = error_type::COMMAND_FAILED,
267 stage = error_stage::RECEIVING,
268 );
269 if self.include_file_metric_tag {
270 counter!(
271 "component_errors_total",
272 "file" => self.file.to_string_lossy().into_owned(),
273 "error_code" => DELETION_FAILED,
274 "error_type" => error_type::COMMAND_FAILED,
275 "stage" => error_stage::RECEIVING,
276 )
277 } else {
278 counter!(
279 "component_errors_total",
280 "error_code" => DELETION_FAILED,
281 "error_type" => error_type::COMMAND_FAILED,
282 "stage" => error_stage::RECEIVING,
283 )
284 }
285 .increment(1);
286 }
287 }
288
289 #[derive(Debug, NamedInternalEvent)]
290 pub struct FileDeleted<'a> {
291 pub file: &'a Path,
292 pub include_file_metric_tag: bool,
293 }
294
295 impl InternalEvent for FileDeleted<'_> {
296 fn emit(self) {
297 info!(
298 message = "File deleted.",
299 file = %self.file.display(),
300 );
301 if self.include_file_metric_tag {
302 counter!(
303 "files_deleted_total",
304 "file" => self.file.to_string_lossy().into_owned(),
305 )
306 } else {
307 counter!("files_deleted_total")
308 }
309 .increment(1);
310 }
311 }
312
313 #[derive(Debug, NamedInternalEvent)]
314 pub struct FileUnwatched<'a> {
315 pub file: &'a Path,
316 pub include_file_metric_tag: bool,
317 pub reached_eof: bool,
318 }
319
320 impl InternalEvent for FileUnwatched<'_> {
321 fn emit(self) {
322 let reached_eof = if self.reached_eof { "true" } else { "false" };
323 info!(
324 message = "Stopped watching file.",
325 file = %self.file.display(),
326 reached_eof
327 );
328 if self.include_file_metric_tag {
329 counter!(
330 "files_unwatched_total",
331 "file" => self.file.to_string_lossy().into_owned(),
332 "reached_eof" => reached_eof,
333 )
334 } else {
335 counter!(
336 "files_unwatched_total",
337 "reached_eof" => reached_eof,
338 )
339 }
340 .increment(1);
341 }
342 }
343
344 #[derive(Debug, NamedInternalEvent)]
345 struct FileWatchError<'a> {
346 pub file: &'a Path,
347 pub error: Error,
348 pub include_file_metric_tag: bool,
349 }
350
351 impl InternalEvent for FileWatchError<'_> {
352 fn emit(self) {
353 error!(
354 message = "Failed to watch file.",
355 error = %self.error,
356 error_code = "watching",
357 error_type = error_type::COMMAND_FAILED,
358 stage = error_stage::RECEIVING,
359 file = %self.file.display(),
360 );
361 if self.include_file_metric_tag {
362 counter!(
363 "component_errors_total",
364 "error_code" => "watching",
365 "error_type" => error_type::COMMAND_FAILED,
366 "stage" => error_stage::RECEIVING,
367 "file" => self.file.to_string_lossy().into_owned(),
368 )
369 } else {
370 counter!(
371 "component_errors_total",
372 "error_code" => "watching",
373 "error_type" => error_type::COMMAND_FAILED,
374 "stage" => error_stage::RECEIVING,
375 )
376 }
377 .increment(1);
378 }
379 }
380
381 #[derive(Debug, NamedInternalEvent)]
382 pub struct FileResumed<'a> {
383 pub file: &'a Path,
384 pub file_position: u64,
385 pub include_file_metric_tag: bool,
386 }
387
388 impl InternalEvent for FileResumed<'_> {
389 fn emit(self) {
390 info!(
391 message = "Resuming to watch file.",
392 file = %self.file.display(),
393 file_position = %self.file_position
394 );
395 if self.include_file_metric_tag {
396 counter!(
397 "files_resumed_total",
398 "file" => self.file.to_string_lossy().into_owned(),
399 )
400 } else {
401 counter!("files_resumed_total")
402 }
403 .increment(1);
404 }
405 }
406
407 #[derive(Debug, NamedInternalEvent)]
408 pub struct FileAdded<'a> {
409 pub file: &'a Path,
410 pub include_file_metric_tag: bool,
411 }
412
413 impl InternalEvent for FileAdded<'_> {
414 fn emit(self) {
415 info!(
416 message = "Found new file to watch.",
417 file = %self.file.display(),
418 );
419 if self.include_file_metric_tag {
420 counter!(
421 "files_added_total",
422 "file" => self.file.to_string_lossy().into_owned(),
423 )
424 } else {
425 counter!("files_added_total")
426 }
427 .increment(1);
428 }
429 }
430
431 #[derive(Debug, NamedInternalEvent)]
432 pub struct FileCheckpointed {
433 pub count: usize,
434 pub duration: Duration,
435 }
436
437 impl InternalEvent for FileCheckpointed {
438 fn emit(self) {
439 debug!(
440 message = "Files checkpointed.",
441 count = %self.count,
442 duration_ms = self.duration.as_millis() as u64,
443 );
444 counter!("checkpoints_total").increment(self.count as u64);
445 }
446 }
447
448 #[derive(Debug, NamedInternalEvent)]
449 pub struct FileCheckpointWriteError {
450 pub error: Error,
451 }
452
453 impl InternalEvent for FileCheckpointWriteError {
454 fn emit(self) {
455 error!(
456 message = "Failed writing checkpoints.",
457 error = %self.error,
458 error_code = "writing_checkpoints",
459 error_type = error_type::WRITER_FAILED,
460 stage = error_stage::RECEIVING,
461 );
462 counter!(
463 "component_errors_total",
464 "error_code" => "writing_checkpoints",
465 "error_type" => error_type::WRITER_FAILED,
466 "stage" => error_stage::RECEIVING,
467 )
468 .increment(1);
469 }
470 }
471
472 #[derive(Debug, NamedInternalEvent)]
473 pub struct PathGlobbingError<'a> {
474 pub path: &'a Path,
475 pub error: &'a Error,
476 }
477
478 impl InternalEvent for PathGlobbingError<'_> {
479 fn emit(self) {
480 error!(
481 message = "Failed to glob path.",
482 error = %self.error,
483 error_code = "globbing",
484 error_type = error_type::READER_FAILED,
485 stage = error_stage::RECEIVING,
486 path = %self.path.display(),
487 );
488 counter!(
489 "component_errors_total",
490 "error_code" => "globbing",
491 "error_type" => error_type::READER_FAILED,
492 "stage" => error_stage::RECEIVING,
493 )
494 .increment(1);
495 }
496 }
497
498 #[derive(Debug, NamedInternalEvent)]
499 pub struct FileLineTooBigError<'a> {
500 pub truncated_bytes: &'a BytesMut,
501 pub configured_limit: usize,
502 pub encountered_size_so_far: usize,
503 }
504
505 impl InternalEvent for FileLineTooBigError<'_> {
506 fn emit(self) {
507 error!(
508 message = "Found line that exceeds max_line_bytes; discarding.",
509 truncated_bytes = ?self.truncated_bytes,
510 configured_limit = self.configured_limit,
511 encountered_size_so_far = self.encountered_size_so_far,
512 error_type = error_type::CONDITION_FAILED,
513 stage = error_stage::RECEIVING,
514 );
515 counter!(
516 "component_errors_total",
517 "error_code" => "reading_line_from_file",
518 "error_type" => error_type::CONDITION_FAILED,
519 "stage" => error_stage::RECEIVING,
520 )
521 .increment(1);
522 emit!(ComponentEventsDropped::<INTENTIONAL> {
523 count: 1,
524 reason: "Found line that exceeds max_line_bytes; discarding.",
525 });
526 }
527 }
528
529 #[derive(Clone)]
530 pub struct FileSourceInternalEventsEmitter {
531 pub include_file_metric_tag: bool,
532 }
533
534 impl FileSourceInternalEvents for FileSourceInternalEventsEmitter {
535 fn emit_file_added(&self, file: &Path) {
536 emit!(FileAdded {
537 file,
538 include_file_metric_tag: self.include_file_metric_tag
539 });
540 }
541
542 fn emit_file_resumed(&self, file: &Path, file_position: u64) {
543 emit!(FileResumed {
544 file,
545 file_position,
546 include_file_metric_tag: self.include_file_metric_tag
547 });
548 }
549
550 fn emit_file_watch_error(&self, file: &Path, error: Error) {
551 emit!(FileWatchError {
552 file,
553 error,
554 include_file_metric_tag: self.include_file_metric_tag
555 });
556 }
557
558 fn emit_file_unwatched(&self, file: &Path, reached_eof: bool) {
559 emit!(FileUnwatched {
560 file,
561 include_file_metric_tag: self.include_file_metric_tag,
562 reached_eof
563 });
564 }
565
566 fn emit_file_deleted(&self, file: &Path) {
567 emit!(FileDeleted {
568 file,
569 include_file_metric_tag: self.include_file_metric_tag
570 });
571 }
572
573 fn emit_file_delete_error(&self, file: &Path, error: Error) {
574 emit!(FileDeleteError {
575 file,
576 error,
577 include_file_metric_tag: self.include_file_metric_tag
578 });
579 }
580
581 fn emit_file_fingerprint_read_error(&self, file: &Path, error: Error) {
582 emit!(FileFingerprintReadError {
583 file,
584 error,
585 include_file_metric_tag: self.include_file_metric_tag
586 });
587 }
588
589 fn emit_file_checksum_failed(&self, file: &Path) {
590 emit!(FileChecksumFailed {
591 file,
592 include_file_metric_tag: self.include_file_metric_tag
593 });
594 }
595
596 fn emit_file_checkpointed(&self, count: usize, duration: Duration) {
597 emit!(FileCheckpointed { count, duration });
598 }
599
600 fn emit_file_checkpoint_write_error(&self, error: Error) {
601 emit!(FileCheckpointWriteError { error });
602 }
603
604 fn emit_files_open(&self, count: usize) {
605 emit!(FileOpen { count });
606 }
607
608 fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
609 emit!(PathGlobbingError { path, error });
610 }
611
612 fn emit_file_line_too_long(
613 &self,
614 truncated_bytes: &bytes::BytesMut,
615 configured_limit: usize,
616 encountered_size_so_far: usize,
617 ) {
618 emit!(FileLineTooBigError {
619 truncated_bytes,
620 configured_limit,
621 encountered_size_so_far
622 });
623 }
624 }
625}