1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 codecs::{BytesDeserializer, BytesDeserializerConfig},
14 config::{LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 file_source::{
17 file_server::{FileServer, Line, calculate_ignore_before},
18 paths_provider::{Glob, MatchOptions},
19 },
20 file_source_common::{
21 Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22 },
23 finalizer::OrderedFinalizer,
24 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30 SourceSender,
31 config::{
32 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33 log_schema,
34 },
35 encoding_transcode::{Decoder, Encoder},
36 event::{BatchNotifier, BatchStatus, LogEvent},
37 internal_events::{
38 FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39 FileSourceInternalEventsEmitter, StreamClosedError,
40 },
41 line_agg::{self, LineAgg},
42 serde::bool_or_struct,
43 shutdown::ShutdownSignal,
44};
45
46#[derive(Debug, Snafu)]
47enum BuildError {
48 #[snafu(display(
49 "message_start_indicator {:?} is not a valid regex: {}",
50 indicator,
51 source
52 ))]
53 InvalidMessageStartIndicator {
54 indicator: String,
55 source: regex::Error,
56 },
57}
58
59#[serde_as]
61#[configurable_component(source("file", "Collect logs from files."))]
62#[derive(Clone, Debug, PartialEq, Eq)]
63#[serde(deny_unknown_fields)]
64pub struct FileConfig {
65 #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
67 pub include: Vec<PathBuf>,
68
69 #[serde(default)]
75 #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
76 pub exclude: Vec<PathBuf>,
77
78 #[serde(default = "default_file_key")]
84 #[configurable(metadata(docs::examples = "path"))]
85 pub file_key: OptionalValuePath,
86
87 #[configurable(
89 deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
90 )]
91 #[configurable(metadata(docs::hidden))]
92 #[serde(default)]
93 pub start_at_beginning: Option<bool>,
94
95 #[serde(default)]
99 pub ignore_checkpoints: Option<bool>,
100
101 #[serde(default = "default_read_from")]
102 #[configurable(derived)]
103 pub read_from: ReadFromConfig,
104
105 #[serde(alias = "ignore_older", default)]
107 #[configurable(metadata(docs::type_unit = "seconds"))]
108 #[configurable(metadata(docs::examples = 600))]
109 #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
110 pub ignore_older_secs: Option<u64>,
111
112 #[serde(default = "default_max_line_bytes")]
116 #[configurable(metadata(docs::type_unit = "bytes"))]
117 pub max_line_bytes: usize,
118
119 #[configurable(metadata(docs::examples = "hostname"))]
127 pub host_key: Option<OptionalValuePath>,
128
129 #[serde(default)]
138 #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
139 #[configurable(metadata(docs::human_name = "Data Directory"))]
140 pub data_dir: Option<PathBuf>,
141
142 #[serde(default)]
148 #[configurable(metadata(docs::examples = "offset"))]
149 pub offset_key: Option<OptionalValuePath>,
150
151 #[serde(
157 alias = "glob_minimum_cooldown",
158 default = "default_glob_minimum_cooldown_ms"
159 )]
160 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
161 #[configurable(metadata(docs::type_unit = "milliseconds"))]
162 #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
163 pub glob_minimum_cooldown_ms: Duration,
164
165 #[configurable(derived)]
166 #[serde(alias = "fingerprinting", default)]
167 fingerprint: FingerprintConfig,
168
169 #[serde(default)]
173 pub ignore_not_found: bool,
174
175 #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
177 #[configurable(metadata(docs::hidden))]
178 #[serde(default)]
179 pub message_start_indicator: Option<String>,
180
181 #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
183 #[configurable(metadata(docs::hidden))]
184 #[serde(default = "default_multi_line_timeout")]
185 pub multi_line_timeout: u64,
186
187 #[configurable(derived)]
191 #[serde(default)]
192 pub multiline: Option<MultilineConfig>,
193
194 #[serde(default = "default_max_read_bytes")]
200 #[configurable(metadata(docs::type_unit = "bytes"))]
201 pub max_read_bytes: usize,
202
203 #[serde(default)]
205 pub oldest_first: bool,
206
207 #[serde(alias = "remove_after", default)]
211 #[configurable(metadata(docs::type_unit = "seconds"))]
212 #[configurable(metadata(docs::examples = 0))]
213 #[configurable(metadata(docs::examples = 5))]
214 #[configurable(metadata(docs::examples = 60))]
215 #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
216 pub remove_after_secs: Option<u64>,
217
218 #[serde(default = "default_line_delimiter")]
220 #[configurable(metadata(docs::examples = "\r\n"))]
221 pub line_delimiter: String,
222
223 #[configurable(derived)]
224 #[serde(default)]
225 pub encoding: Option<EncodingConfig>,
226
227 #[configurable(derived)]
228 #[serde(default, deserialize_with = "bool_or_struct")]
229 acknowledgements: SourceAcknowledgementsConfig,
230
231 #[configurable(metadata(docs::hidden))]
233 #[serde(default)]
234 log_namespace: Option<bool>,
235
236 #[configurable(derived)]
237 #[serde(default)]
238 internal_metrics: FileInternalMetricsConfig,
239
240 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
243 #[configurable(metadata(docs::type_unit = "seconds"))]
244 #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
245 pub rotate_wait: Duration,
246}
247
248fn default_max_line_bytes() -> usize {
249 bytesize::kib(100u64) as usize
250}
251
252fn default_file_key() -> OptionalValuePath {
253 OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257 ReadFromConfig::Beginning
258}
259
260const fn default_glob_minimum_cooldown_ms() -> Duration {
261 Duration::from_millis(1000)
262}
263
264const fn default_multi_line_timeout() -> u64 {
265 1000
266} const fn default_max_read_bytes() -> usize {
269 2048
270}
271
272fn default_line_delimiter() -> String {
273 "\n".to_string()
274}
275
276const fn default_rotate_wait() -> Duration {
277 Duration::from_secs(u64::MAX / 2)
278}
279
280#[configurable_component]
284#[derive(Clone, Debug, PartialEq, Eq)]
285#[serde(tag = "strategy", rename_all = "snake_case")]
286#[configurable(metadata(
287 docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
288))]
289pub enum FingerprintConfig {
290 Checksum {
292 #[serde(alias = "fingerprint_bytes")]
297 #[configurable(metadata(docs::hidden))]
298 #[configurable(metadata(docs::type_unit = "bytes"))]
299 bytes: Option<usize>,
300
301 #[serde(default = "default_ignored_header_bytes")]
307 #[configurable(metadata(docs::type_unit = "bytes"))]
308 ignored_header_bytes: usize,
309
310 #[serde(default = "default_lines")]
317 #[configurable(metadata(docs::type_unit = "lines"))]
318 lines: usize,
319 },
320
321 #[serde(rename = "device_and_inode")]
325 DevInode,
326}
327
328impl Default for FingerprintConfig {
329 fn default() -> Self {
330 Self::Checksum {
331 bytes: None,
332 ignored_header_bytes: 0,
333 lines: default_lines(),
334 }
335 }
336}
337
338const fn default_ignored_header_bytes() -> usize {
339 0
340}
341
342const fn default_lines() -> usize {
343 1
344}
345
346impl From<FingerprintConfig> for FingerprintStrategy {
347 fn from(config: FingerprintConfig) -> FingerprintStrategy {
348 match config {
349 FingerprintConfig::Checksum {
350 bytes,
351 ignored_header_bytes,
352 lines,
353 } => {
354 let bytes = match bytes {
355 Some(bytes) => {
356 warn!(
357 message = "The `fingerprint.bytes` option will be used to convert old file fingerprints created by vector < v0.11.0, but are not supported for new file fingerprints. The first line will be used instead."
358 );
359 bytes
360 }
361 None => 256,
362 };
363 FingerprintStrategy::Checksum {
364 bytes,
365 ignored_header_bytes,
366 lines,
367 }
368 }
369 FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
370 }
371 }
372}
373
374#[derive(Debug)]
375pub(crate) struct FinalizerEntry {
376 pub(crate) file_id: FileFingerprint,
377 pub(crate) offset: u64,
378}
379
380impl Default for FileConfig {
381 fn default() -> Self {
382 Self {
383 include: vec![PathBuf::from("/var/log/**/*.log")],
384 exclude: vec![],
385 file_key: default_file_key(),
386 start_at_beginning: None,
387 ignore_checkpoints: None,
388 read_from: default_read_from(),
389 ignore_older_secs: None,
390 max_line_bytes: default_max_line_bytes(),
391 fingerprint: FingerprintConfig::default(),
392 ignore_not_found: false,
393 host_key: None,
394 offset_key: None,
395 data_dir: None,
396 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
397 message_start_indicator: None,
398 multi_line_timeout: default_multi_line_timeout(), multiline: None,
400 max_read_bytes: default_max_read_bytes(),
401 oldest_first: false,
402 remove_after_secs: None,
403 line_delimiter: default_line_delimiter(),
404 encoding: None,
405 acknowledgements: Default::default(),
406 log_namespace: None,
407 internal_metrics: Default::default(),
408 rotate_wait: default_rotate_wait(),
409 }
410 }
411}
412
413impl_generate_config_from_default!(FileConfig);
414
415#[async_trait::async_trait]
416#[typetag::serde(name = "file")]
417impl SourceConfig for FileConfig {
418 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
419 let data_dir = cx
424 .globals
425 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
427
428 #[allow(clippy::suspicious_else_formatting)]
430 {
431 if let Some(ref config) = self.multiline {
432 let _: line_agg::Config = config.try_into()?;
433 }
434
435 if let Some(ref indicator) = self.message_start_indicator {
436 Regex::new(indicator)
437 .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
438 }
439 }
440
441 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
442
443 let log_namespace = cx.log_namespace(self.log_namespace);
444
445 Ok(file_source(
446 self,
447 data_dir,
448 cx.shutdown,
449 cx.out,
450 acknowledgements,
451 log_namespace,
452 ))
453 }
454
455 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
456 let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
457 let host_key = self
458 .host_key
459 .clone()
460 .unwrap_or(log_schema().host_key().cloned().into())
461 .path
462 .map(LegacyKey::Overwrite);
463
464 let offset_key = self
465 .offset_key
466 .clone()
467 .and_then(|k| k.path)
468 .map(LegacyKey::Overwrite);
469
470 let schema_definition = BytesDeserializerConfig
471 .schema_definition(global_log_namespace.merge(self.log_namespace))
472 .with_standard_vector_source_metadata()
473 .with_source_metadata(
474 Self::NAME,
475 host_key,
476 &owned_value_path!("host"),
477 Kind::bytes().or_undefined(),
478 Some("host"),
479 )
480 .with_source_metadata(
481 Self::NAME,
482 offset_key,
483 &owned_value_path!("offset"),
484 Kind::integer(),
485 None,
486 )
487 .with_source_metadata(
488 Self::NAME,
489 file_key,
490 &owned_value_path!("path"),
491 Kind::bytes(),
492 None,
493 );
494
495 vec![SourceOutput::new_maybe_logs(
496 DataType::Log,
497 schema_definition,
498 )]
499 }
500
501 fn can_acknowledge(&self) -> bool {
502 true
503 }
504}
505
506pub fn file_source(
507 config: &FileConfig,
508 data_dir: PathBuf,
509 shutdown: ShutdownSignal,
510 mut out: SourceSender,
511 acknowledgements: bool,
512 log_namespace: LogNamespace,
513) -> super::Source {
514 if config.include.is_empty() {
516 error!(message = "`include` configuration option must contain at least one file pattern.");
517 return Box::pin(future::ready(Err(())));
518 }
519
520 let exclude_patterns = config
521 .exclude
522 .iter()
523 .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
524 .collect::<Vec<PathBuf>>();
525 let ignore_before = calculate_ignore_before(config.ignore_older_secs);
526 let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
527 let (ignore_checkpoints, read_from) = reconcile_position_options(
528 config.start_at_beginning,
529 config.ignore_checkpoints,
530 Some(config.read_from),
531 );
532
533 let emitter = FileSourceInternalEventsEmitter {
534 include_file_metric_tag: config.internal_metrics.include_file_tag,
535 };
536
537 let paths_provider = Glob::new(
538 &config.include,
539 &exclude_patterns,
540 MatchOptions::default(),
541 emitter.clone(),
542 )
543 .expect("invalid glob patterns");
544
545 let encoding_charset = config.encoding.clone().map(|e| e.charset);
546
547 let line_delimiter_as_bytes = match encoding_charset {
550 Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
551 None => Bytes::from(config.line_delimiter.clone()),
552 };
553
554 let checkpointer = Checkpointer::new(&data_dir);
555 let file_server = FileServer {
556 paths_provider,
557 max_read_bytes: config.max_read_bytes,
558 ignore_checkpoints,
559 read_from,
560 ignore_before,
561 max_line_bytes: config.max_line_bytes,
562 line_delimiter: line_delimiter_as_bytes,
563 data_dir,
564 glob_minimum_cooldown,
565 fingerprinter: Fingerprinter {
566 strategy: config.fingerprint.clone().into(),
567 max_line_length: config.max_line_bytes,
568 ignore_not_found: config.ignore_not_found,
569 },
570 oldest_first: config.oldest_first,
571 remove_after: config.remove_after_secs.map(Duration::from_secs),
572 emitter,
573 rotate_wait: config.rotate_wait,
574 };
575
576 let event_metadata = EventMetadata {
577 host_key: config
578 .host_key
579 .clone()
580 .unwrap_or(log_schema().host_key().cloned().into())
581 .path,
582 hostname: crate::get_hostname().ok(),
583 file_key: config.file_key.clone().path,
584 offset_key: config.offset_key.clone().and_then(|k| k.path),
585 };
586
587 let include = config.include.clone();
588 let exclude = config.exclude.clone();
589 let multiline_config = config.multiline.clone();
590 let message_start_indicator = config.message_start_indicator.clone();
591 let multi_line_timeout = config.multi_line_timeout;
592
593 let (finalizer, shutdown_checkpointer) = if acknowledgements {
594 let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
598
599 let (send_shutdown, shutdown2) = oneshot::channel::<()>();
604 let checkpoints = checkpointer.view();
605 tokio::spawn(async move {
606 while let Some((status, entry)) = ack_stream.next().await {
607 if status == BatchStatus::Delivered {
608 checkpoints.update(entry.file_id, entry.offset);
609 }
610 }
611 send_shutdown.send(())
612 });
613 (Some(finalizer), shutdown2.map(|_| ()).boxed())
614 } else {
615 (None, shutdown.clone().map(|_| ()).boxed())
618 };
619
620 let checkpoints = checkpointer.view();
621 let include_file_metric_tag = config.internal_metrics.include_file_tag;
622 Box::pin(async move {
623 info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
624
625 let mut encoding_decoder = encoding_charset.map(Decoder::new);
626
627 let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
629 let rx = rx
630 .map(futures::stream::iter)
631 .flatten()
632 .map(move |mut line| {
633 emit!(FileBytesReceived {
634 byte_size: line.text.len(),
635 file: &line.filename,
636 include_file_metric_tag,
637 });
638 line.text = match encoding_decoder.as_mut() {
640 Some(d) => d.decode_to_utf8(line.text),
641 None => line.text,
642 };
643 line
644 });
645
646 let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
647 if let Some(ref multiline_config) = multiline_config {
648 wrap_with_line_agg(
649 rx,
650 multiline_config.try_into().unwrap(), )
652 } else if let Some(msi) = message_start_indicator {
653 wrap_with_line_agg(
654 rx,
655 line_agg::Config::for_legacy(
656 Regex::new(&msi).unwrap(), multi_line_timeout,
658 ),
659 )
660 } else {
661 Box::new(rx)
662 };
663
664 let span = Span::current();
667 let mut messages = messages.map(move |line| {
668 let mut event = create_event(
669 line.text,
670 line.start_offset,
671 &line.filename,
672 &event_metadata,
673 log_namespace,
674 include_file_metric_tag,
675 );
676
677 if let Some(finalizer) = &finalizer {
678 let (batch, receiver) = BatchNotifier::new_with_receiver();
679 event = event.with_batch_notifier(&batch);
680 let entry = FinalizerEntry {
681 file_id: line.file_id,
682 offset: line.end_offset,
683 };
684 finalizer.add(entry, receiver);
685 } else {
686 checkpoints.update(line.file_id, line.end_offset);
687 }
688 event
689 });
690 tokio::spawn(async move {
691 match out
692 .send_event_stream(&mut messages)
693 .instrument(span.or_current())
694 .await
695 {
696 Ok(()) => {
697 debug!("Finished sending.");
698 }
699 Err(_) => {
700 let (count, _) = messages.size_hint();
701 emit!(StreamClosedError { count });
702 }
703 }
704 });
705
706 let span = info_span!("file_server");
707 tokio::task::spawn_blocking(move || {
708 let _enter = span.enter();
709 let rt = tokio::runtime::Handle::current();
710 let result =
711 rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
712 emit!(FileOpen { count: 0 });
713 result.expect("file server exited with an error");
717 })
718 .map_err(|error| error!(message="File server unexpectedly stopped.", %error))
719 .await
720 })
721}
722
723fn reconcile_position_options(
726 start_at_beginning: Option<bool>,
727 ignore_checkpoints: Option<bool>,
728 read_from: Option<ReadFromConfig>,
729) -> (bool, ReadFrom) {
730 if start_at_beginning.is_some() {
731 warn!(
732 message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
733 )
734 }
735
736 match start_at_beginning {
737 Some(true) => (
738 ignore_checkpoints.unwrap_or(true),
739 read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
740 ),
741 _ => (
742 ignore_checkpoints.unwrap_or(false),
743 read_from.map(Into::into).unwrap_or_default(),
744 ),
745 }
746}
747
748fn wrap_with_line_agg(
749 rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
750 config: line_agg::Config,
751) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
752 let logic = line_agg::Logic::new(config);
753 Box::new(
754 LineAgg::new(
755 rx.map(|line| {
756 (
757 line.filename,
758 line.text,
759 (line.file_id, line.start_offset, line.end_offset),
760 )
761 }),
762 logic,
763 )
764 .map(
765 |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
766 text,
767 filename,
768 file_id,
769 start_offset,
770 end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
771 lastline_end_offset
772 }),
773 },
774 ),
775 )
776}
777
778struct EventMetadata {
779 host_key: Option<OwnedValuePath>,
780 hostname: Option<String>,
781 file_key: Option<OwnedValuePath>,
782 offset_key: Option<OwnedValuePath>,
783}
784
785fn create_event(
786 line: Bytes,
787 offset: u64,
788 file: &str,
789 meta: &EventMetadata,
790 log_namespace: LogNamespace,
791 include_file_metric_tag: bool,
792) -> LogEvent {
793 let deserializer = BytesDeserializer;
794 let mut event = deserializer.parse_single(line, log_namespace);
795
796 log_namespace.insert_vector_metadata(
797 &mut event,
798 log_schema().source_type_key(),
799 path!("source_type"),
800 Bytes::from_static(FileConfig::NAME.as_bytes()),
801 );
802 log_namespace.insert_vector_metadata(
803 &mut event,
804 log_schema().timestamp_key(),
805 path!("ingest_timestamp"),
806 Utc::now(),
807 );
808
809 let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
810 if let Some(hostname) = &meta.hostname {
812 log_namespace.insert_source_metadata(
813 FileConfig::NAME,
814 &mut event,
815 legacy_host_key,
816 path!("host"),
817 hostname.clone(),
818 );
819 }
820
821 let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
822 log_namespace.insert_source_metadata(
823 FileConfig::NAME,
824 &mut event,
825 legacy_offset_key,
826 path!("offset"),
827 offset,
828 );
829
830 let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
831 log_namespace.insert_source_metadata(
832 FileConfig::NAME,
833 &mut event,
834 legacy_file_key,
835 path!("path"),
836 file,
837 );
838
839 emit!(FileEventsReceived {
840 count: 1,
841 file,
842 byte_size: event.estimated_json_encoded_size_of(),
843 include_file_metric_tag,
844 });
845
846 event
847}
848
849#[cfg(test)]
850mod tests {
851 use std::{
852 collections::HashSet,
853 fs::{self, File},
854 future::Future,
855 io::{Seek, Write},
856 };
857
858 use encoding_rs::UTF_16LE;
859 use similar_asserts::assert_eq;
860 use tempfile::tempdir;
861 use tokio::time::{Duration, sleep, timeout};
862 use vector_lib::schema::Definition;
863 use vrl::{value, value::kind::Collection};
864
865 use super::*;
866 use crate::{
867 config::Config,
868 event::{Event, EventStatus, Value},
869 shutdown::ShutdownSignal,
870 sources::file,
871 test_util::components::{FILE_SOURCE_TAGS, assert_source_compliance},
872 };
873
874 #[test]
875 fn generate_config() {
876 crate::test_util::test_generate_config::<FileConfig>();
877 }
878
879 fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
880 file::FileConfig {
881 fingerprint: FingerprintConfig::Checksum {
882 bytes: Some(8),
883 ignored_header_bytes: 0,
884 lines: 1,
885 },
886 data_dir: Some(dir.path().to_path_buf()),
887 glob_minimum_cooldown_ms: Duration::from_millis(100),
888 internal_metrics: FileInternalMetricsConfig {
889 include_file_tag: true,
890 },
891 ..Default::default()
892 }
893 }
894
895 async fn sleep_500_millis() {
896 sleep(Duration::from_millis(500)).await;
897 }
898
899 #[test]
900 fn parse_config() {
901 let config: FileConfig = toml::from_str(
902 r#"
903 include = [ "/var/log/**/*.log" ]
904 file_key = "file"
905 glob_minimum_cooldown_ms = 1000
906 multi_line_timeout = 1000
907 max_read_bytes = 2048
908 line_delimiter = "\n"
909 "#,
910 )
911 .unwrap();
912 assert_eq!(config, FileConfig::default());
913 assert_eq!(
914 config.fingerprint,
915 FingerprintConfig::Checksum {
916 bytes: None,
917 ignored_header_bytes: 0,
918 lines: 1
919 }
920 );
921
922 let config: FileConfig = toml::from_str(
923 r#"
924 include = [ "/var/log/**/*.log" ]
925 [fingerprint]
926 strategy = "device_and_inode"
927 "#,
928 )
929 .unwrap();
930 assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
931
932 let config: FileConfig = toml::from_str(
933 r#"
934 include = [ "/var/log/**/*.log" ]
935 [fingerprint]
936 strategy = "checksum"
937 bytes = 128
938 ignored_header_bytes = 512
939 "#,
940 )
941 .unwrap();
942 assert_eq!(
943 config.fingerprint,
944 FingerprintConfig::Checksum {
945 bytes: Some(128),
946 ignored_header_bytes: 512,
947 lines: 1
948 }
949 );
950
951 let config: FileConfig = toml::from_str(
952 r#"
953 include = [ "/var/log/**/*.log" ]
954 [encoding]
955 charset = "utf-16le"
956 "#,
957 )
958 .unwrap();
959 assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
960
961 let config: FileConfig = toml::from_str(
962 r#"
963 include = [ "/var/log/**/*.log" ]
964 read_from = "beginning"
965 "#,
966 )
967 .unwrap();
968 assert_eq!(config.read_from, ReadFromConfig::Beginning);
969
970 let config: FileConfig = toml::from_str(
971 r#"
972 include = [ "/var/log/**/*.log" ]
973 read_from = "end"
974 "#,
975 )
976 .unwrap();
977 assert_eq!(config.read_from, ReadFromConfig::End);
978 }
979
980 #[test]
981 fn resolve_data_dir() {
982 let global_dir = tempdir().unwrap();
983 let local_dir = tempdir().unwrap();
984
985 let mut config = Config::default();
986 config.global.data_dir = global_dir.keep().into();
987
988 let res = config
990 .global
991 .resolve_and_validate_data_dir(test_default_file_config(&local_dir).data_dir.as_ref())
992 .unwrap();
993 assert_eq!(res, local_dir.path());
994
995 let res = config.global.resolve_and_validate_data_dir(None).unwrap();
997 assert_eq!(res, config.global.data_dir.unwrap());
998 }
999
1000 #[test]
1001 fn output_schema_definition_vector_namespace() {
1002 let definitions = FileConfig::default()
1003 .outputs(LogNamespace::Vector)
1004 .remove(0)
1005 .schema_definition(true);
1006
1007 assert_eq!(
1008 definitions,
1009 Some(
1010 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1011 .with_meaning(OwnedTargetPath::event_root(), "message")
1012 .with_metadata_field(
1013 &owned_value_path!("vector", "source_type"),
1014 Kind::bytes(),
1015 None
1016 )
1017 .with_metadata_field(
1018 &owned_value_path!("vector", "ingest_timestamp"),
1019 Kind::timestamp(),
1020 None
1021 )
1022 .with_metadata_field(
1023 &owned_value_path!("file", "host"),
1024 Kind::bytes().or_undefined(),
1025 Some("host")
1026 )
1027 .with_metadata_field(
1028 &owned_value_path!("file", "offset"),
1029 Kind::integer(),
1030 None
1031 )
1032 .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1033 )
1034 )
1035 }
1036
1037 #[test]
1038 fn output_schema_definition_legacy_namespace() {
1039 let definitions = FileConfig::default()
1040 .outputs(LogNamespace::Legacy)
1041 .remove(0)
1042 .schema_definition(true);
1043
1044 assert_eq!(
1045 definitions,
1046 Some(
1047 Definition::new_with_default_metadata(
1048 Kind::object(Collection::empty()),
1049 [LogNamespace::Legacy]
1050 )
1051 .with_event_field(
1052 &owned_value_path!("message"),
1053 Kind::bytes(),
1054 Some("message")
1055 )
1056 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1057 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1058 .with_event_field(
1059 &owned_value_path!("host"),
1060 Kind::bytes().or_undefined(),
1061 Some("host")
1062 )
1063 .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1064 .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1065 )
1066 )
1067 }
1068
1069 #[test]
1070 fn create_event_legacy_namespace() {
1071 let line = Bytes::from("hello world");
1072 let file = "some_file.rs";
1073 let offset: u64 = 0;
1074
1075 let meta = EventMetadata {
1076 host_key: Some(owned_value_path!("host")),
1077 hostname: Some("Some.Machine".to_string()),
1078 file_key: Some(owned_value_path!("file")),
1079 offset_key: Some(owned_value_path!("offset")),
1080 };
1081 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1082
1083 assert_eq!(log["file"], "some_file.rs".into());
1084 assert_eq!(log["host"], "Some.Machine".into());
1085 assert_eq!(log["offset"], 0.into());
1086 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1087 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1088 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1089 }
1090
1091 #[test]
1092 fn create_event_custom_fields_legacy_namespace() {
1093 let line = Bytes::from("hello world");
1094 let file = "some_file.rs";
1095 let offset: u64 = 0;
1096
1097 let meta = EventMetadata {
1098 host_key: Some(owned_value_path!("hostname")),
1099 hostname: Some("Some.Machine".to_string()),
1100 file_key: Some(owned_value_path!("file_path")),
1101 offset_key: Some(owned_value_path!("off")),
1102 };
1103 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1104
1105 assert_eq!(log["file_path"], "some_file.rs".into());
1106 assert_eq!(log["hostname"], "Some.Machine".into());
1107 assert_eq!(log["off"], 0.into());
1108 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1109 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1110 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1111 }
1112
1113 #[test]
1114 fn create_event_vector_namespace() {
1115 let line = Bytes::from("hello world");
1116 let file = "some_file.rs";
1117 let offset: u64 = 0;
1118
1119 let meta = EventMetadata {
1120 host_key: Some(owned_value_path!("ignored")),
1121 hostname: Some("Some.Machine".to_string()),
1122 file_key: Some(owned_value_path!("ignored")),
1123 offset_key: Some(owned_value_path!("ignored")),
1124 };
1125 let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1126
1127 assert_eq!(log.value(), &value!("hello world"));
1128
1129 assert_eq!(
1130 log.metadata()
1131 .value()
1132 .get(path!("vector", "source_type"))
1133 .unwrap(),
1134 &value!("file")
1135 );
1136 assert!(
1137 log.metadata()
1138 .value()
1139 .get(path!("vector", "ingest_timestamp"))
1140 .unwrap()
1141 .is_timestamp()
1142 );
1143
1144 assert_eq!(
1145 log.metadata()
1146 .value()
1147 .get(path!(FileConfig::NAME, "host"))
1148 .unwrap(),
1149 &value!("Some.Machine")
1150 );
1151 assert_eq!(
1152 log.metadata()
1153 .value()
1154 .get(path!(FileConfig::NAME, "offset"))
1155 .unwrap(),
1156 &value!(0)
1157 );
1158 assert_eq!(
1159 log.metadata()
1160 .value()
1161 .get(path!(FileConfig::NAME, "path"))
1162 .unwrap(),
1163 &value!("some_file.rs")
1164 );
1165 }
1166
1167 #[tokio::test]
1168 async fn file_happy_path() {
1169 let n = 5;
1170
1171 let dir = tempdir().unwrap();
1172 let config = file::FileConfig {
1173 include: vec![dir.path().join("*")],
1174 ..test_default_file_config(&dir)
1175 };
1176
1177 let path1 = dir.path().join("file1");
1178 let path2 = dir.path().join("file2");
1179
1180 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1181 let mut file1 = File::create(&path1).unwrap();
1182 let mut file2 = File::create(&path2).unwrap();
1183
1184 sleep_500_millis().await; for i in 0..n {
1187 writeln!(&mut file1, "hello {i}").unwrap();
1188 writeln!(&mut file2, "goodbye {i}").unwrap();
1189 }
1190
1191 sleep_500_millis().await;
1192 })
1193 .await;
1194
1195 let mut hello_i = 0;
1196 let mut goodbye_i = 0;
1197
1198 for event in received {
1199 let line =
1200 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1201 if line.starts_with("hello") {
1202 assert_eq!(line, format!("hello {}", hello_i));
1203 assert_eq!(
1204 event.as_log()["file"].to_string_lossy(),
1205 path1.to_str().unwrap()
1206 );
1207 hello_i += 1;
1208 } else {
1209 assert_eq!(line, format!("goodbye {}", goodbye_i));
1210 assert_eq!(
1211 event.as_log()["file"].to_string_lossy(),
1212 path2.to_str().unwrap()
1213 );
1214 goodbye_i += 1;
1215 }
1216 }
1217 assert_eq!(hello_i, n);
1218 assert_eq!(goodbye_i, n);
1219 }
1220
1221 #[tokio::test]
1223 async fn file_read_empty_lines() {
1224 let n = 5;
1225
1226 let dir = tempdir().unwrap();
1227 let config = file::FileConfig {
1228 include: vec![dir.path().join("*")],
1229 ..test_default_file_config(&dir)
1230 };
1231
1232 let path = dir.path().join("file");
1233
1234 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1235 let mut file = File::create(&path).unwrap();
1236
1237 sleep_500_millis().await; writeln!(&mut file, "line for checkpointing").unwrap();
1240 for _i in 0..n {
1241 writeln!(&mut file).unwrap();
1242 }
1243
1244 sleep_500_millis().await;
1245 })
1246 .await;
1247
1248 assert_eq!(received.len(), n + 1);
1249 }
1250
1251 #[tokio::test]
1252 async fn file_truncate() {
1253 let n = 5;
1254
1255 let dir = tempdir().unwrap();
1256 let config = file::FileConfig {
1257 include: vec![dir.path().join("*")],
1258 ..test_default_file_config(&dir)
1259 };
1260 let path = dir.path().join("file");
1261 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1262 let mut file = File::create(&path).unwrap();
1263
1264 sleep_500_millis().await; for i in 0..n {
1267 writeln!(&mut file, "pretrunc {i}").unwrap();
1268 }
1269
1270 sleep_500_millis().await; file.set_len(0).unwrap();
1273 file.seek(std::io::SeekFrom::Start(0)).unwrap();
1274
1275 sleep_500_millis().await; for i in 0..n {
1278 writeln!(&mut file, "posttrunc {i}").unwrap();
1279 }
1280
1281 sleep_500_millis().await;
1282 })
1283 .await;
1284
1285 let mut i = 0;
1286 let mut pre_trunc = true;
1287
1288 for event in received {
1289 assert_eq!(
1290 event.as_log()["file"].to_string_lossy(),
1291 path.to_str().unwrap()
1292 );
1293
1294 let line =
1295 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1296
1297 if pre_trunc {
1298 assert_eq!(line, format!("pretrunc {}", i));
1299 } else {
1300 assert_eq!(line, format!("posttrunc {}", i));
1301 }
1302
1303 i += 1;
1304 if i == n {
1305 i = 0;
1306 pre_trunc = false;
1307 }
1308 }
1309 }
1310
1311 #[tokio::test]
1312 async fn file_rotate() {
1313 let n = 5;
1314
1315 let dir = tempdir().unwrap();
1316 let config = file::FileConfig {
1317 include: vec![dir.path().join("*")],
1318 ..test_default_file_config(&dir)
1319 };
1320
1321 let path = dir.path().join("file");
1322 let archive_path = dir.path().join("file");
1323 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1324 let mut file = File::create(&path).unwrap();
1325
1326 sleep_500_millis().await; for i in 0..n {
1329 writeln!(&mut file, "prerot {i}").unwrap();
1330 }
1331
1332 sleep_500_millis().await; fs::rename(&path, archive_path).expect("could not rename");
1335 let mut file = File::create(&path).unwrap();
1336
1337 sleep_500_millis().await; for i in 0..n {
1340 writeln!(&mut file, "postrot {i}").unwrap();
1341 }
1342
1343 sleep_500_millis().await;
1344 })
1345 .await;
1346
1347 let mut i = 0;
1348 let mut pre_rot = true;
1349
1350 for event in received {
1351 assert_eq!(
1352 event.as_log()["file"].to_string_lossy(),
1353 path.to_str().unwrap()
1354 );
1355
1356 let line =
1357 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1358
1359 if pre_rot {
1360 assert_eq!(line, format!("prerot {}", i));
1361 } else {
1362 assert_eq!(line, format!("postrot {}", i));
1363 }
1364
1365 i += 1;
1366 if i == n {
1367 i = 0;
1368 pre_rot = false;
1369 }
1370 }
1371 }
1372
1373 #[tokio::test]
1374 async fn file_multiple_paths() {
1375 let n = 5;
1376
1377 let dir = tempdir().unwrap();
1378 let config = file::FileConfig {
1379 include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1380 exclude: vec![dir.path().join("a.*.txt")],
1381 ..test_default_file_config(&dir)
1382 };
1383
1384 let path1 = dir.path().join("a.txt");
1385 let path2 = dir.path().join("b.txt");
1386 let path3 = dir.path().join("a.log");
1387 let path4 = dir.path().join("a.ignore.txt");
1388 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1389 let mut file1 = File::create(&path1).unwrap();
1390 let mut file2 = File::create(&path2).unwrap();
1391 let mut file3 = File::create(&path3).unwrap();
1392 let mut file4 = File::create(&path4).unwrap();
1393
1394 sleep_500_millis().await; for i in 0..n {
1397 writeln!(&mut file1, "1 {i}").unwrap();
1398 writeln!(&mut file2, "2 {i}").unwrap();
1399 writeln!(&mut file3, "3 {i}").unwrap();
1400 writeln!(&mut file4, "4 {i}").unwrap();
1401 }
1402
1403 sleep_500_millis().await;
1404 })
1405 .await;
1406
1407 let mut is = [0; 3];
1408
1409 for event in received {
1410 let line =
1411 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1412 let mut split = line.split(' ');
1413 let file = split.next().unwrap().parse::<usize>().unwrap();
1414 assert_ne!(file, 4);
1415 let i = split.next().unwrap().parse::<usize>().unwrap();
1416
1417 assert_eq!(is[file - 1], i);
1418 is[file - 1] += 1;
1419 }
1420
1421 assert_eq!(is, [n as usize; 3]);
1422 }
1423
1424 #[tokio::test]
1425 async fn file_exclude_paths() {
1426 let n = 5;
1427
1428 let dir = tempdir().unwrap();
1429 let config = file::FileConfig {
1430 include: vec![dir.path().join("a//b/*.log.*")],
1431 exclude: vec![dir.path().join("a//b/test.log.*")],
1432 ..test_default_file_config(&dir)
1433 };
1434
1435 let path1 = dir.path().join("a//b/a.log.1");
1436 let path2 = dir.path().join("a//b/test.log.1");
1437 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1438 std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1439 let mut file1 = File::create(&path1).unwrap();
1440 let mut file2 = File::create(&path2).unwrap();
1441
1442 sleep_500_millis().await; for i in 0..n {
1445 writeln!(&mut file1, "1 {i}").unwrap();
1446 writeln!(&mut file2, "2 {i}").unwrap();
1447 }
1448
1449 sleep_500_millis().await;
1450 })
1451 .await;
1452
1453 let mut is = [0; 1];
1454
1455 for event in received {
1456 let line =
1457 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1458 let mut split = line.split(' ');
1459 let file = split.next().unwrap().parse::<usize>().unwrap();
1460 assert_ne!(file, 4);
1461 let i = split.next().unwrap().parse::<usize>().unwrap();
1462
1463 assert_eq!(is[file - 1], i);
1464 is[file - 1] += 1;
1465 }
1466
1467 assert_eq!(is, [n as usize; 1]);
1468 }
1469
1470 #[tokio::test]
1471 async fn file_key_acknowledged() {
1472 file_key(Acks).await
1473 }
1474
1475 #[tokio::test]
1476 async fn file_key_no_acknowledge() {
1477 file_key(NoAcks).await
1478 }
1479
1480 async fn file_key(acks: AckingMode) {
1481 {
1483 let dir = tempdir().unwrap();
1484 let config = file::FileConfig {
1485 include: vec![dir.path().join("*")],
1486 ..test_default_file_config(&dir)
1487 };
1488
1489 let path = dir.path().join("file");
1490 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1491 let mut file = File::create(&path).unwrap();
1492
1493 sleep_500_millis().await;
1494
1495 writeln!(&mut file, "hello there").unwrap();
1496
1497 sleep_500_millis().await;
1498 })
1499 .await;
1500
1501 assert_eq!(received.len(), 1);
1502 assert_eq!(
1503 received[0].as_log()["file"].to_string_lossy(),
1504 path.to_str().unwrap()
1505 );
1506 }
1507
1508 {
1510 let dir = tempdir().unwrap();
1511 let config = file::FileConfig {
1512 include: vec![dir.path().join("*")],
1513 file_key: OptionalValuePath::from(owned_value_path!("source")),
1514 ..test_default_file_config(&dir)
1515 };
1516
1517 let path = dir.path().join("file");
1518 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1519 let mut file = File::create(&path).unwrap();
1520
1521 sleep_500_millis().await;
1522
1523 writeln!(&mut file, "hello there").unwrap();
1524
1525 sleep_500_millis().await;
1526 })
1527 .await;
1528
1529 assert_eq!(received.len(), 1);
1530 assert_eq!(
1531 received[0].as_log()["source"].to_string_lossy(),
1532 path.to_str().unwrap()
1533 );
1534 }
1535
1536 {
1538 let dir = tempdir().unwrap();
1539 let config = file::FileConfig {
1540 include: vec![dir.path().join("*")],
1541 ..test_default_file_config(&dir)
1542 };
1543
1544 let path = dir.path().join("file");
1545 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1546 let mut file = File::create(&path).unwrap();
1547
1548 sleep_500_millis().await;
1549
1550 writeln!(&mut file, "hello there").unwrap();
1551
1552 sleep_500_millis().await;
1553 })
1554 .await;
1555
1556 assert_eq!(received.len(), 1);
1557 assert_eq!(
1558 received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1559 vec![
1560 default_file_key()
1561 .path
1562 .expect("file key to exist")
1563 .to_string()
1564 .into(),
1565 log_schema().host_key().unwrap().to_string().into(),
1566 log_schema().message_key().unwrap().to_string().into(),
1567 log_schema().timestamp_key().unwrap().to_string().into(),
1568 log_schema().source_type_key().unwrap().to_string().into()
1569 ]
1570 .into_iter()
1571 .collect::<HashSet<_>>()
1572 );
1573 }
1574 }
1575
1576 #[cfg(target_os = "linux")] #[tokio::test]
1578 async fn file_start_position_server_restart_acknowledged() {
1579 file_start_position_server_restart(Acks).await
1580 }
1581
1582 #[cfg(target_os = "linux")] #[tokio::test]
1584 async fn file_start_position_server_restart_no_acknowledge() {
1585 file_start_position_server_restart(NoAcks).await
1586 }
1587
1588 #[cfg(target_os = "linux")] async fn file_start_position_server_restart(acking: AckingMode) {
1590 let dir = tempdir().unwrap();
1591 let config = file::FileConfig {
1592 include: vec![dir.path().join("*")],
1593 ..test_default_file_config(&dir)
1594 };
1595
1596 let path = dir.path().join("file");
1597 let mut file = File::create(&path).unwrap();
1598 writeln!(&mut file, "zeroth line").unwrap();
1599 sleep_500_millis().await;
1600
1601 {
1603 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1604 sleep_500_millis().await;
1605 writeln!(&mut file, "first line").unwrap();
1606 sleep_500_millis().await;
1607 })
1608 .await;
1609
1610 let lines = extract_messages_string(received);
1611 assert_eq!(lines, vec!["zeroth line", "first line"]);
1612 }
1613 {
1615 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1616 sleep_500_millis().await;
1617 writeln!(&mut file, "second line").unwrap();
1618 sleep_500_millis().await;
1619 })
1620 .await;
1621
1622 let lines = extract_messages_string(received);
1623 assert_eq!(lines, vec!["second line"]);
1624 }
1625 {
1627 let config = file::FileConfig {
1628 include: vec![dir.path().join("*")],
1629 ignore_checkpoints: Some(true),
1630 read_from: ReadFromConfig::Beginning,
1631 ..test_default_file_config(&dir)
1632 };
1633 let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1634 sleep_500_millis().await;
1635 writeln!(&mut file, "third line").unwrap();
1636 sleep_500_millis().await;
1637 })
1638 .await;
1639
1640 let lines = extract_messages_string(received);
1641 assert_eq!(
1642 lines,
1643 vec!["zeroth line", "first line", "second line", "third line"]
1644 );
1645 }
1646 }
1647
1648 #[tokio::test]
1649 async fn file_start_position_server_restart_unfinalized() {
1650 let dir = tempdir().unwrap();
1651 let config = file::FileConfig {
1652 include: vec![dir.path().join("*")],
1653 ..test_default_file_config(&dir)
1654 };
1655
1656 let path = dir.path().join("file");
1657 let mut file = File::create(&path).unwrap();
1658 writeln!(&mut file, "the line").unwrap();
1659 sleep_500_millis().await;
1660
1661 let received = run_file_source(
1663 &config,
1664 false,
1665 Unfinalized,
1666 LogNamespace::Legacy,
1667 sleep_500_millis(),
1668 )
1669 .await;
1670 let lines = extract_messages_string(received);
1671 assert_eq!(lines, vec!["the line"]);
1672
1673 let received = run_file_source(
1675 &config,
1676 false,
1677 Unfinalized,
1678 LogNamespace::Legacy,
1679 sleep_500_millis(),
1680 )
1681 .await;
1682 let lines = extract_messages_string(received);
1683 assert_eq!(lines, vec!["the line"]);
1684 }
1685
1686 #[tokio::test]
1687 async fn file_duplicate_processing_after_restart() {
1688 let dir = tempdir().unwrap();
1689 let config = file::FileConfig {
1690 include: vec![dir.path().join("*")],
1691 ..test_default_file_config(&dir)
1692 };
1693
1694 let path = dir.path().join("file");
1695 let mut file = File::create(&path).unwrap();
1696
1697 let line_count = 4000;
1698 for i in 0..line_count {
1699 writeln!(&mut file, "Here's a line for you: {i}").unwrap();
1700 }
1701 file.flush().unwrap();
1702 sleep_500_millis().await;
1703
1704 let received = run_file_source(
1706 &config,
1707 true,
1708 Acks,
1709 LogNamespace::Legacy,
1710 sleep_500_millis(),
1712 )
1713 .await;
1714 let lines = extract_messages_string(received);
1715
1716 assert!(lines.len() < line_count);
1719
1720 let received = run_file_source(
1722 &config,
1723 true,
1724 Acks,
1725 LogNamespace::Legacy,
1726 sleep(Duration::from_secs(5)),
1727 )
1728 .await;
1729 let lines2 = extract_messages_string(received);
1730
1731 assert_eq!(lines.len() + lines2.len(), line_count);
1733 }
1734
1735 #[tokio::test]
1736 async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1737 file_start_position_server_restart_with_file_rotation(Acks).await
1738 }
1739
1740 #[tokio::test]
1741 async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1742 file_start_position_server_restart_with_file_rotation(NoAcks).await
1743 }
1744
1745 async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1746 let dir = tempdir().unwrap();
1747 let config = file::FileConfig {
1748 include: vec![dir.path().join("*")],
1749 ..test_default_file_config(&dir)
1750 };
1751
1752 let path = dir.path().join("file");
1753 let path_for_old_file = dir.path().join("file.old");
1754 {
1756 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1757 let mut file = File::create(&path).unwrap();
1758 sleep_500_millis().await;
1759 writeln!(&mut file, "first line").unwrap();
1760 sleep_500_millis().await;
1761 })
1762 .await;
1763
1764 let lines = extract_messages_string(received);
1765 assert_eq!(lines, vec!["first line"]);
1766 }
1767 fs::rename(&path, &path_for_old_file).expect("could not rename");
1769 {
1772 let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1773 let mut file = File::create(&path).unwrap();
1774 sleep_500_millis().await;
1775 writeln!(&mut file, "second line").unwrap();
1776 sleep_500_millis().await;
1777 })
1778 .await;
1779
1780 let lines = extract_messages_string(received);
1781 assert_eq!(lines, vec!["second line"]);
1782 }
1783 }
1784
1785 #[cfg(unix)] #[tokio::test]
1787 async fn file_start_position_ignore_old_files() {
1788 use std::{
1789 os::unix::io::AsRawFd,
1790 time::{Duration, SystemTime},
1791 };
1792
1793 let dir = tempdir().unwrap();
1794 let config = file::FileConfig {
1795 include: vec![dir.path().join("*")],
1796 ignore_older_secs: Some(5),
1797 ..test_default_file_config(&dir)
1798 };
1799
1800 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1801 let before_path = dir.path().join("before");
1802 let mut before_file = File::create(&before_path).unwrap();
1803 let after_path = dir.path().join("after");
1804 let mut after_file = File::create(&after_path).unwrap();
1805
1806 writeln!(&mut before_file, "first line").unwrap(); writeln!(&mut after_file, "_first line").unwrap(); {
1810 let before = SystemTime::now() - Duration::from_secs(8);
1812 let after = SystemTime::now() - Duration::from_secs(2);
1813
1814 let before_time = libc::timeval {
1815 tv_sec: before
1816 .duration_since(SystemTime::UNIX_EPOCH)
1817 .unwrap()
1818 .as_secs() as _,
1819 tv_usec: 0,
1820 };
1821 let before_times = [before_time, before_time];
1822
1823 let after_time = libc::timeval {
1824 tv_sec: after
1825 .duration_since(SystemTime::UNIX_EPOCH)
1826 .unwrap()
1827 .as_secs() as _,
1828 tv_usec: 0,
1829 };
1830 let after_times = [after_time, after_time];
1831
1832 unsafe {
1833 libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1834 libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1835 }
1836 }
1837
1838 sleep_500_millis().await;
1839 writeln!(&mut before_file, "second line").unwrap();
1840 writeln!(&mut after_file, "_second line").unwrap();
1841
1842 sleep_500_millis().await;
1843 })
1844 .await;
1845
1846 let before_lines = received
1847 .iter()
1848 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1849 .map(|event| {
1850 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1851 })
1852 .collect::<Vec<_>>();
1853 let after_lines = received
1854 .iter()
1855 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1856 .map(|event| {
1857 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1858 })
1859 .collect::<Vec<_>>();
1860 assert_eq!(before_lines, vec!["second line"]);
1861 assert_eq!(after_lines, vec!["_first line", "_second line"]);
1862 }
1863
1864 #[tokio::test]
1865 async fn file_max_line_bytes() {
1866 let dir = tempdir().unwrap();
1867 let config = file::FileConfig {
1868 include: vec![dir.path().join("*")],
1869 max_line_bytes: 10,
1870 ..test_default_file_config(&dir)
1871 };
1872
1873 let path = dir.path().join("file");
1874 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1875 let mut file = File::create(&path).unwrap();
1876
1877 sleep_500_millis().await; writeln!(&mut file, "short").unwrap();
1880 writeln!(&mut file, "this is too long").unwrap();
1881 writeln!(&mut file, "11 eleven11").unwrap();
1882 let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1883 writeln!(&mut file, "{super_long}").unwrap();
1884 writeln!(&mut file, "exactly 10").unwrap();
1885 writeln!(&mut file, "it can end on a line that's too long").unwrap();
1886
1887 sleep_500_millis().await;
1888 sleep_500_millis().await;
1889
1890 writeln!(&mut file, "and then continue").unwrap();
1891 writeln!(&mut file, "last short").unwrap();
1892
1893 sleep_500_millis().await;
1894 sleep_500_millis().await;
1895 }).await;
1896
1897 let received = extract_messages_value(received);
1898
1899 assert_eq!(
1900 received,
1901 vec!["short".into(), "exactly 10".into(), "last short".into()]
1902 );
1903 }
1904
1905 #[tokio::test]
1906 async fn test_multi_line_aggregation_legacy() {
1907 let dir = tempdir().unwrap();
1908 let config = file::FileConfig {
1909 include: vec![dir.path().join("*")],
1910 message_start_indicator: Some("INFO".into()),
1911 multi_line_timeout: 25, ..test_default_file_config(&dir)
1913 };
1914
1915 let path = dir.path().join("file");
1916 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1917 let mut file = File::create(&path).unwrap();
1918
1919 sleep_500_millis().await; writeln!(&mut file, "leftover foo").unwrap();
1922 writeln!(&mut file, "INFO hello").unwrap();
1923 writeln!(&mut file, "INFO goodbye").unwrap();
1924 writeln!(&mut file, "part of goodbye").unwrap();
1925
1926 sleep_500_millis().await;
1927
1928 writeln!(&mut file, "INFO hi again").unwrap();
1929 writeln!(&mut file, "and some more").unwrap();
1930 writeln!(&mut file, "INFO hello").unwrap();
1931
1932 sleep_500_millis().await;
1933
1934 writeln!(&mut file, "too slow").unwrap();
1935 writeln!(&mut file, "INFO doesn't have").unwrap();
1936 writeln!(&mut file, "to be INFO in").unwrap();
1937 writeln!(&mut file, "the middle").unwrap();
1938
1939 sleep_500_millis().await;
1940 })
1941 .await;
1942
1943 let received = extract_messages_value(received);
1944
1945 assert_eq!(
1946 received,
1947 vec![
1948 "leftover foo".into(),
1949 "INFO hello".into(),
1950 "INFO goodbye\npart of goodbye".into(),
1951 "INFO hi again\nand some more".into(),
1952 "INFO hello".into(),
1953 "too slow".into(),
1954 "INFO doesn't have".into(),
1955 "to be INFO in\nthe middle".into(),
1956 ]
1957 );
1958 }
1959
1960 #[tokio::test]
1961 async fn test_multi_line_aggregation() {
1962 let dir = tempdir().unwrap();
1963 let config = file::FileConfig {
1964 include: vec![dir.path().join("*")],
1965 multiline: Some(MultilineConfig {
1966 start_pattern: "INFO".to_owned(),
1967 condition_pattern: "INFO".to_owned(),
1968 mode: line_agg::Mode::HaltBefore,
1969 timeout_ms: Duration::from_millis(25), }),
1971 ..test_default_file_config(&dir)
1972 };
1973
1974 let path = dir.path().join("file");
1975 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1976 let mut file = File::create(&path).unwrap();
1977
1978 sleep_500_millis().await; writeln!(&mut file, "leftover foo").unwrap();
1981 writeln!(&mut file, "INFO hello").unwrap();
1982 writeln!(&mut file, "INFO goodbye").unwrap();
1983 writeln!(&mut file, "part of goodbye").unwrap();
1984
1985 sleep_500_millis().await;
1986
1987 writeln!(&mut file, "INFO hi again").unwrap();
1988 writeln!(&mut file, "and some more").unwrap();
1989 writeln!(&mut file, "INFO hello").unwrap();
1990
1991 sleep_500_millis().await;
1992
1993 writeln!(&mut file, "too slow").unwrap();
1994 writeln!(&mut file, "INFO doesn't have").unwrap();
1995 writeln!(&mut file, "to be INFO in").unwrap();
1996 writeln!(&mut file, "the middle").unwrap();
1997
1998 sleep_500_millis().await;
1999 })
2000 .await;
2001
2002 let received = extract_messages_value(received);
2003
2004 assert_eq!(
2005 received,
2006 vec![
2007 "leftover foo".into(),
2008 "INFO hello".into(),
2009 "INFO goodbye\npart of goodbye".into(),
2010 "INFO hi again\nand some more".into(),
2011 "INFO hello".into(),
2012 "too slow".into(),
2013 "INFO doesn't have".into(),
2014 "to be INFO in\nthe middle".into(),
2015 ]
2016 );
2017 }
2018
2019 #[tokio::test]
2020 async fn test_multi_line_checkpointing() {
2021 let dir = tempdir().unwrap();
2022 let config = file::FileConfig {
2023 include: vec![dir.path().join("*")],
2024 offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2025 multiline: Some(MultilineConfig {
2026 start_pattern: "INFO".to_owned(),
2027 condition_pattern: "INFO".to_owned(),
2028 mode: line_agg::Mode::HaltBefore,
2029 timeout_ms: Duration::from_millis(25), }),
2031 ..test_default_file_config(&dir)
2032 };
2033
2034 let path = dir.path().join("file");
2035 let mut file = File::create(&path).unwrap();
2036
2037 writeln!(&mut file, "INFO hello").unwrap();
2038 writeln!(&mut file, "part of hello").unwrap();
2039
2040 let received = run_file_source(
2042 &config,
2043 false,
2044 Acks,
2045 LogNamespace::Legacy,
2046 sleep_500_millis(),
2047 )
2048 .await;
2049
2050 assert_eq!(received[0].as_log()["offset"], 0.into());
2051
2052 let lines = extract_messages_string(received);
2053 assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2054
2055 let received_after_restart =
2057 run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2058 writeln!(&mut file, "INFO goodbye").unwrap();
2059 })
2060 .await;
2061 assert_eq!(
2062 received_after_restart[0].as_log()["offset"],
2063 (lines[0].len() + 1).into()
2064 );
2065 let lines = extract_messages_string(received_after_restart);
2066 assert_eq!(lines, vec!["INFO goodbye"]);
2067 }
2068
2069 #[tokio::test]
2070 async fn test_fair_reads() {
2071 let dir = tempdir().unwrap();
2072 let config = file::FileConfig {
2073 include: vec![dir.path().join("*")],
2074 max_read_bytes: 1,
2075 oldest_first: false,
2076 ..test_default_file_config(&dir)
2077 };
2078
2079 let older_path = dir.path().join("z_older_file");
2080 let mut older = File::create(&older_path).unwrap();
2081
2082 sleep_500_millis().await;
2083
2084 let newer_path = dir.path().join("a_newer_file");
2085 let mut newer = File::create(&newer_path).unwrap();
2086
2087 writeln!(&mut older, "hello i am the old file").unwrap();
2088 writeln!(&mut older, "i have been around a while").unwrap();
2089 writeln!(&mut older, "you can read newer files at the same time").unwrap();
2090
2091 writeln!(&mut newer, "and i am the new file").unwrap();
2092 writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2093 writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2094
2095 sleep_500_millis().await;
2096
2097 let received = run_file_source(
2098 &config,
2099 false,
2100 NoAcks,
2101 LogNamespace::Legacy,
2102 sleep_500_millis(),
2103 )
2104 .await;
2105
2106 let received = extract_messages_value(received);
2107
2108 assert_eq!(
2109 received,
2110 vec![
2111 "hello i am the old file".into(),
2112 "and i am the new file".into(),
2113 "i have been around a while".into(),
2114 "this should be interleaved with the old one".into(),
2115 "you can read newer files at the same time".into(),
2116 "which is fine because we want fairness".into(),
2117 ]
2118 );
2119 }
2120
2121 #[tokio::test]
2122 async fn test_oldest_first() {
2123 let dir = tempdir().unwrap();
2124 let config = file::FileConfig {
2125 include: vec![dir.path().join("*")],
2126 max_read_bytes: 1,
2127 oldest_first: true,
2128 ..test_default_file_config(&dir)
2129 };
2130
2131 let older_path = dir.path().join("z_older_file");
2132 let mut older = File::create(&older_path).unwrap();
2133
2134 sleep_500_millis().await;
2135
2136 let newer_path = dir.path().join("a_newer_file");
2137 let mut newer = File::create(&newer_path).unwrap();
2138
2139 writeln!(&mut older, "hello i am the old file").unwrap();
2140 writeln!(&mut older, "i have been around a while").unwrap();
2141 writeln!(&mut older, "you should definitely read all of me first").unwrap();
2142
2143 writeln!(&mut newer, "i'm new").unwrap();
2144 writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2145 writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2146
2147 sleep_500_millis().await;
2148
2149 let received = run_file_source(
2150 &config,
2151 false,
2152 NoAcks,
2153 LogNamespace::Legacy,
2154 sleep_500_millis(),
2155 )
2156 .await;
2157
2158 let received = extract_messages_value(received);
2159
2160 assert_eq!(
2161 received,
2162 vec![
2163 "hello i am the old file".into(),
2164 "i have been around a while".into(),
2165 "you should definitely read all of me first".into(),
2166 "i'm new".into(),
2167 "hopefully you read all the old stuff first".into(),
2168 "because otherwise i'm not going to make sense".into(),
2169 ]
2170 );
2171 }
2172
2173 #[cfg(not(target_os = "macos"))]
2175 #[tokio::test]
2176 async fn test_split_reads() {
2177 let dir = tempdir().unwrap();
2178 let config = file::FileConfig {
2179 include: vec![dir.path().join("*")],
2180 max_read_bytes: 1,
2181 ..test_default_file_config(&dir)
2182 };
2183
2184 let path = dir.path().join("file");
2185 let mut file = File::create(&path).unwrap();
2186
2187 writeln!(&mut file, "hello i am a normal line").unwrap();
2188
2189 sleep_500_millis().await;
2190
2191 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2192 sleep_500_millis().await;
2193
2194 write!(&mut file, "i am not a full line").unwrap();
2195
2196 sleep_500_millis().await;
2198
2199 writeln!(&mut file, " until now").unwrap();
2200
2201 sleep_500_millis().await;
2202 })
2203 .await;
2204
2205 let received = extract_messages_value(received);
2206
2207 assert_eq!(
2208 received,
2209 vec![
2210 "hello i am a normal line".into(),
2211 "i am not a full line until now".into(),
2212 ]
2213 );
2214 }
2215
2216 #[tokio::test]
2217 async fn test_gzipped_file() {
2218 let dir = tempdir().unwrap();
2219 let config = file::FileConfig {
2220 include: vec![PathBuf::from("tests/data/gzipped.log")],
2221 max_line_bytes: 100,
2228 ..test_default_file_config(&dir)
2229 };
2230
2231 let received = run_file_source(
2232 &config,
2233 false,
2234 NoAcks,
2235 LogNamespace::Legacy,
2236 sleep_500_millis(),
2237 )
2238 .await;
2239
2240 let received = extract_messages_value(received);
2241
2242 assert_eq!(
2243 received,
2244 vec![
2245 "this is a simple file".into(),
2246 "i have been compressed".into(),
2247 "in order to make me smaller".into(),
2248 "but you can still read me".into(),
2249 "hooray".into(),
2250 ]
2251 );
2252 }
2253
2254 #[tokio::test]
2255 async fn test_non_utf8_encoded_file() {
2256 let dir = tempdir().unwrap();
2257 let config = file::FileConfig {
2258 include: vec![PathBuf::from("tests/data/utf-16le.log")],
2259 encoding: Some(EncodingConfig { charset: UTF_16LE }),
2260 ..test_default_file_config(&dir)
2261 };
2262
2263 let received = run_file_source(
2264 &config,
2265 false,
2266 NoAcks,
2267 LogNamespace::Legacy,
2268 sleep_500_millis(),
2269 )
2270 .await;
2271
2272 let received = extract_messages_value(received);
2273
2274 assert_eq!(
2275 received,
2276 vec![
2277 "hello i am a file".into(),
2278 "i can unicode".into(),
2279 "but i do so in 16 bits".into(),
2280 "and when i byte".into(),
2281 "i become little-endian".into(),
2282 ]
2283 );
2284 }
2285
2286 #[tokio::test]
2287 async fn test_non_default_line_delimiter() {
2288 let dir = tempdir().unwrap();
2289 let config = file::FileConfig {
2290 include: vec![dir.path().join("*")],
2291 line_delimiter: "\r\n".to_string(),
2292 ..test_default_file_config(&dir)
2293 };
2294
2295 let path = dir.path().join("file");
2296 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2297 let mut file = File::create(&path).unwrap();
2298
2299 sleep_500_millis().await; write!(&mut file, "hello i am a line\r\n").unwrap();
2302 write!(&mut file, "and i am too\r\n").unwrap();
2303 write!(&mut file, "CRLF is how we end\r\n").unwrap();
2304 write!(&mut file, "please treat us well\r\n").unwrap();
2305
2306 sleep_500_millis().await;
2307 })
2308 .await;
2309
2310 let received = extract_messages_value(received);
2311
2312 assert_eq!(
2313 received,
2314 vec![
2315 "hello i am a line".into(),
2316 "and i am too".into(),
2317 "CRLF is how we end".into(),
2318 "please treat us well".into()
2319 ]
2320 );
2321 }
2322
2323 #[tokio::test]
2324 async fn remove_file() {
2325 let n = 5;
2326 let remove_after_secs = 1;
2327
2328 let dir = tempdir().unwrap();
2329 let config = file::FileConfig {
2330 include: vec![dir.path().join("*")],
2331 remove_after_secs: Some(remove_after_secs),
2332 ..test_default_file_config(&dir)
2333 };
2334
2335 let path = dir.path().join("file");
2336 let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2337 let mut file = File::create(&path).unwrap();
2338
2339 sleep_500_millis().await; for i in 0..n {
2342 writeln!(&mut file, "{i}").unwrap();
2343 }
2344 drop(file);
2345
2346 for _ in 0..10 {
2347 sleep(Duration::from_secs(remove_after_secs + 1)).await;
2349
2350 if File::open(&path).is_err() {
2351 break;
2352 }
2353 }
2354 })
2355 .await;
2356
2357 assert_eq!(received.len(), n);
2358
2359 match File::open(&path) {
2360 Ok(_) => panic!("File wasn't removed"),
2361 Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2362 }
2363 }
2364
2365 #[derive(Clone, Copy, Eq, PartialEq)]
2366 enum AckingMode {
2367 NoAcks, Unfinalized, Acks, }
2371 use AckingMode::*;
2372 use vector_lib::lookup::OwnedTargetPath;
2373
2374 async fn run_file_source(
2375 config: &FileConfig,
2376 wait_shutdown: bool,
2377 acking_mode: AckingMode,
2378 log_namespace: LogNamespace,
2379 inner: impl Future<Output = ()>,
2380 ) -> Vec<Event> {
2381 assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2382 let (tx, rx) = if acking_mode == Acks {
2383 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2384 (tx, rx.boxed())
2385 } else {
2386 let (tx, rx) = SourceSender::new_test();
2387 (tx, rx.boxed())
2388 };
2389
2390 let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2391 let data_dir = config.data_dir.clone().unwrap();
2392 let acks = !matches!(acking_mode, NoAcks);
2393
2394 tokio::spawn(file::file_source(
2395 config,
2396 data_dir,
2397 shutdown,
2398 tx,
2399 acks,
2400 log_namespace,
2401 ));
2402
2403 inner.await;
2404
2405 drop(trigger_shutdown);
2406
2407 let result = if acking_mode == Unfinalized {
2408 rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2409 .collect::<Vec<_>>()
2410 .await
2411 } else {
2412 timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2413 .await
2414 .expect(
2415 "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2416 )
2417 };
2418 if wait_shutdown {
2419 shutdown_done.await;
2420 }
2421
2422 result
2423 })
2424 .await
2425 }
2426
2427 fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2428 received
2429 .into_iter()
2430 .map(Event::into_log)
2431 .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2432 .collect()
2433 }
2434
2435 fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2436 received
2437 .into_iter()
2438 .map(Event::into_log)
2439 .map(|log| log.get_message().unwrap().clone())
2440 .collect()
2441 }
2442}