1#![allow(dead_code)] use std::borrow::Cow;
4
5use vector_lib::{
6 NamedInternalEvent,
7 configurable::configurable_component,
8 counter, gauge,
9 internal_event::{
10 ComponentEventsDropped, CounterName, GaugeName, InternalEvent, UNINTENTIONAL, error_stage,
11 error_type,
12 },
13};
14
15#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
16pub use self::source::*;
17
/// Internal-metrics settings for file-based components.
///
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`).
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[serde(deny_unknown_fields)]
pub struct FileInternalMetricsConfig {
    /// Whether to include the `file` tag on the component's internal metrics.
    ///
    /// When omitted, the value comes from `crate::serde::default_false`.
    // NOTE(review): presumably this drives the `include_file_metric_tag` flag carried by the
    // events in this module, in which case every distinct file path becomes its own metric
    // series. The wiring is not visible in this file — confirm against the callers.
    #[serde(default = "crate::serde::default_false")]
    pub include_file_tag: bool,
}
30
31#[derive(Debug, NamedInternalEvent)]
32pub struct FileOpen {
33 pub count: usize,
34}
35
36impl InternalEvent for FileOpen {
37 fn emit(self) {
38 gauge!(GaugeName::OpenFiles).set(self.count as f64);
39 }
40}
41
42#[derive(Debug, NamedInternalEvent)]
43pub struct FileBytesSent<'a> {
44 pub byte_size: usize,
45 pub file: Cow<'a, str>,
46 pub include_file_metric_tag: bool,
47}
48
49impl InternalEvent for FileBytesSent<'_> {
50 fn emit(self) {
51 trace!(
52 message = "Bytes sent.",
53 byte_size = %self.byte_size,
54 protocol = "file",
55 file = %self.file,
56 );
57 if self.include_file_metric_tag {
58 counter!(
59 CounterName::ComponentSentBytesTotal,
60 "protocol" => "file",
61 "file" => self.file.clone().into_owned(),
62 )
63 } else {
64 counter!(
65 CounterName::ComponentSentBytesTotal,
66 "protocol" => "file",
67 )
68 }
69 .increment(self.byte_size as u64);
70 }
71}
72
73#[derive(Debug, NamedInternalEvent)]
74pub struct FileIoError<'a, P> {
75 pub error: std::io::Error,
76 pub code: &'static str,
77 pub message: &'static str,
78 pub path: &'a P,
79 pub dropped_events: usize,
80}
81
82impl<P: std::fmt::Debug> InternalEvent for FileIoError<'_, P> {
83 fn emit(self) {
84 error!(
85 message = %self.message,
86 path = ?self.path,
87 error = %self.error,
88 error_code = %self.code,
89 error_type = error_type::IO_FAILED,
90 stage = error_stage::SENDING,
91 );
92 counter!(
93 CounterName::ComponentErrorsTotal,
94 "error_code" => self.code,
95 "error_type" => error_type::IO_FAILED,
96 "stage" => error_stage::SENDING,
97 )
98 .increment(1);
99
100 if self.dropped_events > 0 {
101 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
102 count: self.dropped_events,
103 reason: self.message,
104 });
105 }
106 }
107}
108
109#[cfg(any(feature = "sources-file", feature = "sources-kubernetes_logs"))]
110mod source {
111 use std::{io::Error, path::Path, time::Duration};
112
113 use bytes::BytesMut;
114 use vector_lib::{
115 NamedInternalEvent, counter, emit,
116 file_source_common::internal_events::FileSourceInternalEvents,
117 internal_event::{
118 ComponentEventsDropped, CounterName, INTENTIONAL, error_stage, error_type,
119 },
120 json_size::JsonSize,
121 };
122
123 use super::{FileOpen, InternalEvent};
124
125 #[derive(Debug, NamedInternalEvent)]
126 pub struct FileBytesReceived<'a> {
127 pub byte_size: usize,
128 pub file: &'a str,
129 pub include_file_metric_tag: bool,
130 }
131
132 impl InternalEvent for FileBytesReceived<'_> {
133 fn emit(self) {
134 trace!(
135 message = "Bytes received.",
136 byte_size = %self.byte_size,
137 protocol = "file",
138 file = %self.file,
139 );
140 if self.include_file_metric_tag {
141 counter!(
142 CounterName::ComponentReceivedBytesTotal,
143 "protocol" => "file",
144 "file" => self.file.to_owned()
145 )
146 } else {
147 counter!(
148 CounterName::ComponentReceivedBytesTotal,
149 "protocol" => "file",
150 )
151 }
152 .increment(self.byte_size as u64);
153 }
154 }
155
156 #[derive(Debug, NamedInternalEvent)]
157 pub struct FileEventsReceived<'a> {
158 pub count: usize,
159 pub file: &'a str,
160 pub byte_size: JsonSize,
161 pub include_file_metric_tag: bool,
162 }
163
164 impl InternalEvent for FileEventsReceived<'_> {
165 fn emit(self) {
166 trace!(
167 message = "Events received.",
168 count = %self.count,
169 byte_size = %self.byte_size,
170 file = %self.file
171 );
172 if self.include_file_metric_tag {
173 counter!(
174 CounterName::ComponentReceivedEventsTotal,
175 "file" => self.file.to_owned(),
176 )
177 .increment(self.count as u64);
178 counter!(
179 CounterName::ComponentReceivedEventBytesTotal,
180 "file" => self.file.to_owned(),
181 )
182 .increment(self.byte_size.get() as u64);
183 } else {
184 counter!(CounterName::ComponentReceivedEventsTotal).increment(self.count as u64);
185 counter!(CounterName::ComponentReceivedEventBytesTotal)
186 .increment(self.byte_size.get() as u64);
187 }
188 }
189 }
190
191 #[derive(Debug, NamedInternalEvent)]
192 pub struct FileChecksumFailed<'a> {
193 pub file: &'a Path,
194 pub include_file_metric_tag: bool,
195 }
196
197 impl InternalEvent for FileChecksumFailed<'_> {
198 fn emit(self) {
199 warn!(
200 message = "Currently ignoring file too small to fingerprint.",
201 file = %self.file.display(),
202 );
203 if self.include_file_metric_tag {
204 counter!(
205 CounterName::ChecksumErrorsTotal,
206 "file" => self.file.to_string_lossy().into_owned(),
207 )
208 } else {
209 counter!(CounterName::ChecksumErrorsTotal)
210 }
211 .increment(1);
212 }
213 }
214
215 #[derive(Debug, NamedInternalEvent)]
216 pub struct FileFingerprintReadError<'a> {
217 pub file: &'a Path,
218 pub error: Error,
219 pub include_file_metric_tag: bool,
220 }
221
222 impl InternalEvent for FileFingerprintReadError<'_> {
223 fn emit(self) {
224 error!(
225 message = "Failed reading file for fingerprinting.",
226 file = %self.file.display(),
227 error = %self.error,
228 error_code = "reading_fingerprint",
229 error_type = error_type::READER_FAILED,
230 stage = error_stage::RECEIVING,
231 );
232 if self.include_file_metric_tag {
233 counter!(
234 CounterName::ComponentErrorsTotal,
235 "error_code" => "reading_fingerprint",
236 "error_type" => error_type::READER_FAILED,
237 "stage" => error_stage::RECEIVING,
238 "file" => self.file.to_string_lossy().into_owned(),
239 )
240 } else {
241 counter!(
242 CounterName::ComponentErrorsTotal,
243 "error_code" => "reading_fingerprint",
244 "error_type" => error_type::READER_FAILED,
245 "stage" => error_stage::RECEIVING,
246 )
247 }
248 .increment(1);
249 }
250 }
251
252 const DELETION_FAILED: &str = "deletion_failed";
253
254 #[derive(Debug, NamedInternalEvent)]
255 pub struct FileDeleteError<'a> {
256 pub file: &'a Path,
257 pub error: Error,
258 pub include_file_metric_tag: bool,
259 }
260
261 impl InternalEvent for FileDeleteError<'_> {
262 fn emit(self) {
263 error!(
264 message = "Failed in deleting file.",
265 file = %self.file.display(),
266 error = %self.error,
267 error_code = DELETION_FAILED,
268 error_type = error_type::COMMAND_FAILED,
269 stage = error_stage::RECEIVING,
270 );
271 if self.include_file_metric_tag {
272 counter!(
273 CounterName::ComponentErrorsTotal,
274 "file" => self.file.to_string_lossy().into_owned(),
275 "error_code" => DELETION_FAILED,
276 "error_type" => error_type::COMMAND_FAILED,
277 "stage" => error_stage::RECEIVING,
278 )
279 } else {
280 counter!(
281 CounterName::ComponentErrorsTotal,
282 "error_code" => DELETION_FAILED,
283 "error_type" => error_type::COMMAND_FAILED,
284 "stage" => error_stage::RECEIVING,
285 )
286 }
287 .increment(1);
288 }
289 }
290
291 #[derive(Debug, NamedInternalEvent)]
292 pub struct FileDeleted<'a> {
293 pub file: &'a Path,
294 pub include_file_metric_tag: bool,
295 }
296
297 impl InternalEvent for FileDeleted<'_> {
298 fn emit(self) {
299 info!(
300 message = "File deleted.",
301 file = %self.file.display(),
302 );
303 if self.include_file_metric_tag {
304 counter!(
305 CounterName::FilesDeletedTotal,
306 "file" => self.file.to_string_lossy().into_owned(),
307 )
308 } else {
309 counter!(CounterName::FilesDeletedTotal)
310 }
311 .increment(1);
312 }
313 }
314
315 #[derive(Debug, NamedInternalEvent)]
316 pub struct FileUnwatched<'a> {
317 pub file: &'a Path,
318 pub include_file_metric_tag: bool,
319 pub reached_eof: bool,
320 }
321
322 impl InternalEvent for FileUnwatched<'_> {
323 fn emit(self) {
324 let reached_eof = if self.reached_eof { "true" } else { "false" };
325 info!(
326 message = "Stopped watching file.",
327 file = %self.file.display(),
328 reached_eof
329 );
330 if self.include_file_metric_tag {
331 counter!(
332 CounterName::FilesUnwatchedTotal,
333 "file" => self.file.to_string_lossy().into_owned(),
334 "reached_eof" => reached_eof,
335 )
336 } else {
337 counter!(
338 CounterName::FilesUnwatchedTotal,
339 "reached_eof" => reached_eof,
340 )
341 }
342 .increment(1);
343 }
344 }
345
346 #[derive(Debug, NamedInternalEvent)]
347 struct FileWatchError<'a> {
348 pub file: &'a Path,
349 pub error: Error,
350 pub include_file_metric_tag: bool,
351 }
352
353 impl InternalEvent for FileWatchError<'_> {
354 fn emit(self) {
355 error!(
356 message = "Failed to watch file.",
357 error = %self.error,
358 error_code = "watching",
359 error_type = error_type::COMMAND_FAILED,
360 stage = error_stage::RECEIVING,
361 file = %self.file.display(),
362 );
363 if self.include_file_metric_tag {
364 counter!(
365 CounterName::ComponentErrorsTotal,
366 "error_code" => "watching",
367 "error_type" => error_type::COMMAND_FAILED,
368 "stage" => error_stage::RECEIVING,
369 "file" => self.file.to_string_lossy().into_owned(),
370 )
371 } else {
372 counter!(
373 CounterName::ComponentErrorsTotal,
374 "error_code" => "watching",
375 "error_type" => error_type::COMMAND_FAILED,
376 "stage" => error_stage::RECEIVING,
377 )
378 }
379 .increment(1);
380 }
381 }
382
383 #[derive(Debug, NamedInternalEvent)]
384 pub struct FileResumed<'a> {
385 pub file: &'a Path,
386 pub file_position: u64,
387 pub include_file_metric_tag: bool,
388 }
389
390 impl InternalEvent for FileResumed<'_> {
391 fn emit(self) {
392 info!(
393 message = "Resuming to watch file.",
394 file = %self.file.display(),
395 file_position = %self.file_position
396 );
397 if self.include_file_metric_tag {
398 counter!(
399 CounterName::FilesResumedTotal,
400 "file" => self.file.to_string_lossy().into_owned(),
401 )
402 } else {
403 counter!(CounterName::FilesResumedTotal)
404 }
405 .increment(1);
406 }
407 }
408
409 #[derive(Debug, NamedInternalEvent)]
410 pub struct FileAdded<'a> {
411 pub file: &'a Path,
412 pub include_file_metric_tag: bool,
413 }
414
415 impl InternalEvent for FileAdded<'_> {
416 fn emit(self) {
417 info!(
418 message = "Found new file to watch.",
419 file = %self.file.display(),
420 );
421 if self.include_file_metric_tag {
422 counter!(
423 CounterName::FilesAddedTotal,
424 "file" => self.file.to_string_lossy().into_owned(),
425 )
426 } else {
427 counter!(CounterName::FilesAddedTotal)
428 }
429 .increment(1);
430 }
431 }
432
433 #[derive(Debug, NamedInternalEvent)]
434 pub struct FileCheckpointed {
435 pub count: usize,
436 pub duration: Duration,
437 }
438
439 impl InternalEvent for FileCheckpointed {
440 fn emit(self) {
441 debug!(
442 message = "Files checkpointed.",
443 count = %self.count,
444 duration_ms = self.duration.as_millis() as u64,
445 );
446 counter!(CounterName::CheckpointsTotal).increment(self.count as u64);
447 }
448 }
449
450 #[derive(Debug, NamedInternalEvent)]
451 pub struct FileCheckpointWriteError {
452 pub error: Error,
453 }
454
455 impl InternalEvent for FileCheckpointWriteError {
456 fn emit(self) {
457 error!(
458 message = "Failed writing checkpoints.",
459 error = %self.error,
460 error_code = "writing_checkpoints",
461 error_type = error_type::WRITER_FAILED,
462 stage = error_stage::RECEIVING,
463 );
464 counter!(
465 CounterName::ComponentErrorsTotal,
466 "error_code" => "writing_checkpoints",
467 "error_type" => error_type::WRITER_FAILED,
468 "stage" => error_stage::RECEIVING,
469 )
470 .increment(1);
471 }
472 }
473
474 #[derive(Debug, NamedInternalEvent)]
475 pub struct PathGlobbingError<'a> {
476 pub path: &'a Path,
477 pub error: &'a Error,
478 }
479
480 impl InternalEvent for PathGlobbingError<'_> {
481 fn emit(self) {
482 error!(
483 message = "Failed to glob path.",
484 error = %self.error,
485 error_code = "globbing",
486 error_type = error_type::READER_FAILED,
487 stage = error_stage::RECEIVING,
488 path = %self.path.display(),
489 );
490 counter!(
491 CounterName::ComponentErrorsTotal,
492 "error_code" => "globbing",
493 "error_type" => error_type::READER_FAILED,
494 "stage" => error_stage::RECEIVING,
495 )
496 .increment(1);
497 }
498 }
499
500 #[derive(Debug, NamedInternalEvent)]
501 pub struct FileLineTooBigError<'a> {
502 pub truncated_bytes: &'a BytesMut,
503 pub configured_limit: usize,
504 pub encountered_size_so_far: usize,
505 }
506
507 impl InternalEvent for FileLineTooBigError<'_> {
508 fn emit(self) {
509 error!(
510 message = "Found line that exceeds max_line_bytes; discarding.",
511 truncated_bytes = ?self.truncated_bytes,
512 configured_limit = self.configured_limit,
513 encountered_size_so_far = self.encountered_size_so_far,
514 error_type = error_type::CONDITION_FAILED,
515 stage = error_stage::RECEIVING,
516 );
517 counter!(
518 CounterName::ComponentErrorsTotal,
519 "error_code" => "reading_line_from_file",
520 "error_type" => error_type::CONDITION_FAILED,
521 "stage" => error_stage::RECEIVING,
522 )
523 .increment(1);
524 emit!(ComponentEventsDropped::<INTENTIONAL> {
525 count: 1,
526 reason: "Found line that exceeds max_line_bytes; discarding.",
527 });
528 }
529 }
530
/// Implements `FileSourceInternalEvents` by emitting the events defined in this module.
///
/// Derives `Debug` so the public type can be logged and embedded in `Debug`-deriving types.
#[derive(Clone, Debug)]
pub struct FileSourceInternalEventsEmitter {
    /// Copied into every per-file event to decide whether its metric carries a `file` tag.
    pub include_file_metric_tag: bool,
}
535
536 impl FileSourceInternalEvents for FileSourceInternalEventsEmitter {
537 fn emit_file_added(&self, file: &Path) {
538 emit!(FileAdded {
539 file,
540 include_file_metric_tag: self.include_file_metric_tag
541 });
542 }
543
544 fn emit_file_resumed(&self, file: &Path, file_position: u64) {
545 emit!(FileResumed {
546 file,
547 file_position,
548 include_file_metric_tag: self.include_file_metric_tag
549 });
550 }
551
552 fn emit_file_watch_error(&self, file: &Path, error: Error) {
553 emit!(FileWatchError {
554 file,
555 error,
556 include_file_metric_tag: self.include_file_metric_tag
557 });
558 }
559
560 fn emit_file_unwatched(&self, file: &Path, reached_eof: bool) {
561 emit!(FileUnwatched {
562 file,
563 include_file_metric_tag: self.include_file_metric_tag,
564 reached_eof
565 });
566 }
567
568 fn emit_file_deleted(&self, file: &Path) {
569 emit!(FileDeleted {
570 file,
571 include_file_metric_tag: self.include_file_metric_tag
572 });
573 }
574
575 fn emit_file_delete_error(&self, file: &Path, error: Error) {
576 emit!(FileDeleteError {
577 file,
578 error,
579 include_file_metric_tag: self.include_file_metric_tag
580 });
581 }
582
583 fn emit_file_fingerprint_read_error(&self, file: &Path, error: Error) {
584 emit!(FileFingerprintReadError {
585 file,
586 error,
587 include_file_metric_tag: self.include_file_metric_tag
588 });
589 }
590
591 fn emit_file_checksum_failed(&self, file: &Path) {
592 emit!(FileChecksumFailed {
593 file,
594 include_file_metric_tag: self.include_file_metric_tag
595 });
596 }
597
598 fn emit_file_checkpointed(&self, count: usize, duration: Duration) {
599 emit!(FileCheckpointed { count, duration });
600 }
601
602 fn emit_file_checkpoint_write_error(&self, error: Error) {
603 emit!(FileCheckpointWriteError { error });
604 }
605
606 fn emit_files_open(&self, count: usize) {
607 emit!(FileOpen { count });
608 }
609
610 fn emit_path_globbing_failed(&self, path: &Path, error: &Error) {
611 emit!(PathGlobbingError { path, error });
612 }
613
614 fn emit_file_line_too_long(
615 &self,
616 truncated_bytes: &bytes::BytesMut,
617 configured_limit: usize,
618 encountered_size_so_far: usize,
619 ) {
620 emit!(FileLineTooBigError {
621 truncated_bytes,
622 configured_limit,
623 encountered_size_so_far
624 });
625 }
626 }
627}