1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::{sync::oneshot, task::spawn_blocking};
10use tracing::{Instrument, Span};
11use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
12use vector_lib::configurable::configurable_component;
13use vector_lib::file_source::{
14 calculate_ignore_before,
15 paths_provider::glob::{Glob, MatchOptions},
16 Checkpointer, FileFingerprint, FileServer, FingerprintStrategy, Fingerprinter, Line, ReadFrom,
17 ReadFromConfig,
18};
19use vector_lib::finalizer::OrderedFinalizer;
20use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
21use vector_lib::{
22 config::{LegacyKey, LogNamespace},
23 EstimatedJsonEncodedSizeOf,
24};
25use vrl::value::Kind;
26
27use super::util::{EncodingConfig, MultilineConfig};
28use crate::{
29 config::{
30 log_schema, DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
31 SourceOutput,
32 },
33 encoding_transcode::{Decoder, Encoder},
34 event::{BatchNotifier, BatchStatus, LogEvent},
35 internal_events::{
36 FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
37 FileSourceInternalEventsEmitter, StreamClosedError,
38 },
39 line_agg::{self, LineAgg},
40 serde::bool_or_struct,
41 shutdown::ShutdownSignal,
42 SourceSender,
43};
44
/// Errors raised while validating the file source configuration at build time.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The deprecated `message_start_indicator` option could not be compiled as a regex.
    #[snafu(display(
        "message_start_indicator {:?} is not a valid regex: {}",
        indicator,
        source
    ))]
    InvalidMessageStartIndicator {
        /// The pattern exactly as it was supplied in the configuration.
        indicator: String,
        /// The underlying regex compilation error.
        source: regex::Error,
    },
}
57
/// Configuration for the `file` source.
#[serde_as]
#[configurable_component(source("file", "Collect logs from files."))]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileConfig {
    /// Array of file patterns to include.
    ///
    /// At least one pattern must be provided; the source refuses to start with an empty list.
    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
    pub include: Vec<PathBuf>,

    /// Array of file patterns to exclude.
    ///
    /// These are handed to the glob matcher together with `include`.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
    pub exclude: Vec<PathBuf>,

    /// Overrides the name of the log field used to add the file path to each event.
    ///
    /// Defaults to `file`.
    #[serde(default = "default_file_key")]
    #[configurable(metadata(docs::examples = "path"))]
    pub file_key: OptionalValuePath,

    /// Whether or not to start reading from the beginning of a new file.
    ///
    /// Deprecated: a warning is logged whenever this is set. When `true`, it only changes the
    /// default used for `ignore_checkpoints` (to `true`).
    #[configurable(
        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
    )]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub start_at_beginning: Option<bool>,

    /// Whether or not to ignore existing checkpoints when determining where to start reading a
    /// file.
    ///
    /// When unset, this is `true` only if the deprecated `start_at_beginning` is `true`.
    #[serde(default)]
    pub ignore_checkpoints: Option<bool>,

    /// Where to start reading a file from. Defaults to the beginning.
    #[serde(default = "default_read_from")]
    #[configurable(derived)]
    pub read_from: ReadFromConfig,

    /// Age threshold, in seconds, used to compute the cutoff before which files are ignored.
    ///
    /// Also accepted under the name `ignore_older`.
    #[serde(alias = "ignore_older", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
    pub ignore_older_secs: Option<u64>,

    /// The maximum size of a line, in bytes.
    ///
    /// Defaults to 100 KiB. The same limit is given to the fingerprinter as its maximum line
    /// length.
    #[serde(default = "default_max_line_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_line_bytes: usize,

    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// When unset, the globally configured host key from the log schema is used.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// The directory used to persist file checkpoint positions.
    ///
    /// When unset, the directory is resolved from the global configuration.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Enables adding the line's starting offset to each event under the given field name.
    ///
    /// When unset, no legacy offset field is registered.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "offset"))]
    pub offset_key: Option<OptionalValuePath>,

    /// The minimum cooldown, in milliseconds, handed to the file server for glob discovery.
    ///
    /// Defaults to 1000 ms. Also accepted under the name `glob_minimum_cooldown`.
    #[serde(
        alias = "glob_minimum_cooldown",
        default = "default_glob_minimum_cooldown_ms"
    )]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    pub glob_minimum_cooldown_ms: Duration,

    /// How files are fingerprinted. Also accepted under the name `fingerprinting`.
    #[configurable(derived)]
    #[serde(alias = "fingerprinting", default)]
    fingerprint: FingerprintConfig,

    /// Flag passed to the fingerprinter as `ignore_not_found`.
    #[serde(default)]
    pub ignore_not_found: bool,

    /// Regular expression marking the first line of a multi-line message.
    ///
    /// Deprecated in favor of `multiline`; only used when `multiline` is not set. The pattern is
    /// validated as a regex when the source is built.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub message_start_indicator: Option<String>,

    /// Timeout used by the legacy `message_start_indicator` line aggregation.
    ///
    /// Deprecated in favor of `multiline`. Defaults to `1000`.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default = "default_multi_line_timeout")]
    pub multi_line_timeout: u64,

    /// Multiline aggregation configuration.
    ///
    /// When set, it takes precedence over `message_start_indicator`. If unset, and
    /// `message_start_indicator` is also unset, lines are emitted without aggregation.
    #[configurable(derived)]
    #[serde(default)]
    pub multiline: Option<MultilineConfig>,

    /// Read-size limit, in bytes, handed to the file server. Defaults to 2048.
    #[serde(default = "default_max_read_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_read_bytes: usize,

    /// Flag passed to the file server as `oldest_first`.
    #[serde(default)]
    pub oldest_first: bool,

    /// Number of seconds handed to the file server as its `remove_after` duration.
    ///
    /// Also accepted under the name `remove_after`. Unset by default.
    #[serde(alias = "remove_after", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 0))]
    #[configurable(metadata(docs::examples = 5))]
    #[configurable(metadata(docs::examples = 60))]
    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
    pub remove_after_secs: Option<u64>,

    /// String sequence used to separate one file line from another.
    ///
    /// Defaults to `\n`. When `encoding` is set, the delimiter is encoded into that charset
    /// before being handed to the file server.
    #[serde(default = "default_line_delimiter")]
    #[configurable(metadata(docs::examples = "\r\n"))]
    pub line_delimiter: String,

    /// Character set of the watched files; when set, each line is decoded to UTF-8.
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: Option<EncodingConfig>,

    /// End-to-end acknowledgement settings. Accepts either a boolean or a struct.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Per-source override of the log namespace setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Settings for the internal metrics emitted by this source.
    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// Rotation wait duration handed to the file server.
    ///
    /// Configured in seconds under the name `rotate_wait_secs`; defaults to `u64::MAX / 2`
    /// seconds.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    pub rotate_wait: Duration,
}
246
247fn default_max_line_bytes() -> usize {
248 bytesize::kib(100u64) as usize
249}
250
251fn default_file_key() -> OptionalValuePath {
252 OptionalValuePath::from(owned_value_path!("file"))
253}
254
255const fn default_read_from() -> ReadFromConfig {
256 ReadFromConfig::Beginning
257}
258
/// Default for `glob_minimum_cooldown_ms`: one second (1000 ms).
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}
262
/// Default for the deprecated `multi_line_timeout` option.
const fn default_multi_line_timeout() -> u64 {
    const LEGACY_TIMEOUT: u64 = 1_000;
    LEGACY_TIMEOUT
}

/// Default for `max_read_bytes`: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
270
/// Default for `line_delimiter`: a single line feed.
fn default_line_delimiter() -> String {
    String::from("\n")
}
274
/// Default for `rotate_wait`: half of the largest representable number of seconds.
const fn default_rotate_wait() -> Duration {
    const HALF_MAX_SECS: u64 = u64::MAX / 2;
    Duration::from_secs(HALF_MAX_SECS)
}
278
/// Configuration for how files are identified.
///
/// Serialized as an internally tagged enum, with the variant selected by the `strategy` key.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
))]
pub enum FingerprintConfig {
    /// Identify files by a checksum. This is the default strategy.
    Checksum {
        /// Legacy byte count, also accepted as `fingerprint_bytes`.
        ///
        /// Setting it logs a warning: it is only used to convert fingerprints created by
        /// Vector < v0.11.0. When unset, 256 is used.
        #[serde(alias = "fingerprint_bytes")]
        #[configurable(metadata(docs::hidden))]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        bytes: Option<usize>,

        /// The number of header bytes passed to the checksum strategy as ignored. Defaults to 0.
        #[serde(default = "default_ignored_header_bytes")]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        ignored_header_bytes: usize,

        /// The number of lines passed to the checksum strategy. Defaults to 1.
        #[serde(default = "default_lines")]
        #[configurable(metadata(docs::type_unit = "lines"))]
        lines: usize,
    },

    /// Identify files by device and inode. Selected with `strategy = "device_and_inode"`.
    #[serde(rename = "device_and_inode")]
    DevInode,
}
326
327impl Default for FingerprintConfig {
328 fn default() -> Self {
329 Self::Checksum {
330 bytes: None,
331 ignored_header_bytes: 0,
332 lines: default_lines(),
333 }
334 }
335}
336
/// Default for `fingerprint.ignored_header_bytes`: no header bytes are ignored.
const fn default_ignored_header_bytes() -> usize {
    const NO_HEADER: usize = 0;
    NO_HEADER
}
340
/// Default for `fingerprint.lines`: a single line.
const fn default_lines() -> usize {
    const SINGLE_LINE: usize = 1;
    SINGLE_LINE
}
344
345impl From<FingerprintConfig> for FingerprintStrategy {
346 fn from(config: FingerprintConfig) -> FingerprintStrategy {
347 match config {
348 FingerprintConfig::Checksum {
349 bytes,
350 ignored_header_bytes,
351 lines,
352 } => {
353 let bytes = match bytes {
354 Some(bytes) => {
355 warn!(message = "The `fingerprint.bytes` option will be used to convert old file fingerprints created by vector < v0.11.0, but are not supported for new file fingerprints. The first line will be used instead.");
356 bytes
357 }
358 None => 256,
359 };
360 FingerprintStrategy::Checksum {
361 bytes,
362 ignored_header_bytes,
363 lines,
364 }
365 }
366 FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
367 }
368 }
369}
370
/// Entry registered with the ordered finalizer for each event when acknowledgements are enabled.
///
/// Once the event's batch is reported as delivered, the checkpoint for `file_id` is updated to
/// `offset`.
#[derive(Debug)]
pub(crate) struct FinalizerEntry {
    /// Fingerprint of the file the line was read from.
    pub(crate) file_id: FileFingerprint,
    /// End offset of the line; this is the position recorded in the checkpoint.
    pub(crate) offset: u64,
}
376
377impl Default for FileConfig {
378 fn default() -> Self {
379 Self {
380 include: vec![PathBuf::from("/var/log/**/*.log")],
381 exclude: vec![],
382 file_key: default_file_key(),
383 start_at_beginning: None,
384 ignore_checkpoints: None,
385 read_from: default_read_from(),
386 ignore_older_secs: None,
387 max_line_bytes: default_max_line_bytes(),
388 fingerprint: FingerprintConfig::default(),
389 ignore_not_found: false,
390 host_key: None,
391 offset_key: None,
392 data_dir: None,
393 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
394 message_start_indicator: None,
395 multi_line_timeout: default_multi_line_timeout(), multiline: None,
397 max_read_bytes: default_max_read_bytes(),
398 oldest_first: false,
399 remove_after_secs: None,
400 line_delimiter: default_line_delimiter(),
401 encoding: None,
402 acknowledgements: Default::default(),
403 log_namespace: None,
404 internal_metrics: Default::default(),
405 rotate_wait: default_rotate_wait(),
406 }
407 }
408}
409
// NOTE(review): presumably generates the example-config implementation for `FileConfig` from its
// `Default` impl (the `generate_config` test exercises it) — confirm against the macro definition.
impl_generate_config_from_default!(FileConfig);
411
412#[async_trait::async_trait]
413#[typetag::serde(name = "file")]
414impl SourceConfig for FileConfig {
415 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
416 let data_dir = cx
421 .globals
422 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
424
425 #[allow(clippy::suspicious_else_formatting)]
427 {
428 if let Some(ref config) = self.multiline {
429 let _: line_agg::Config = config.try_into()?;
430 }
431
432 if let Some(ref indicator) = self.message_start_indicator {
433 Regex::new(indicator)
434 .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
435 }
436 }
437
438 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
439
440 let log_namespace = cx.log_namespace(self.log_namespace);
441
442 Ok(file_source(
443 self,
444 data_dir,
445 cx.shutdown,
446 cx.out,
447 acknowledgements,
448 log_namespace,
449 ))
450 }
451
452 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
453 let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
454 let host_key = self
455 .host_key
456 .clone()
457 .unwrap_or(log_schema().host_key().cloned().into())
458 .path
459 .map(LegacyKey::Overwrite);
460
461 let offset_key = self
462 .offset_key
463 .clone()
464 .and_then(|k| k.path)
465 .map(LegacyKey::Overwrite);
466
467 let schema_definition = BytesDeserializerConfig
468 .schema_definition(global_log_namespace.merge(self.log_namespace))
469 .with_standard_vector_source_metadata()
470 .with_source_metadata(
471 Self::NAME,
472 host_key,
473 &owned_value_path!("host"),
474 Kind::bytes().or_undefined(),
475 Some("host"),
476 )
477 .with_source_metadata(
478 Self::NAME,
479 offset_key,
480 &owned_value_path!("offset"),
481 Kind::integer(),
482 None,
483 )
484 .with_source_metadata(
485 Self::NAME,
486 file_key,
487 &owned_value_path!("path"),
488 Kind::bytes(),
489 None,
490 );
491
492 vec![SourceOutput::new_maybe_logs(
493 DataType::Log,
494 schema_definition,
495 )]
496 }
497
498 fn can_acknowledge(&self) -> bool {
499 true
500 }
501}
502
/// Builds the future that runs the file source.
///
/// The returned future wires three pieces together:
/// - a blocking task running the `FileServer`, which sends batches of lines over a channel;
/// - a stream that decodes, optionally aggregates, and converts those lines into log events;
/// - a spawned task forwarding the events to `out`.
///
/// With `acknowledgements` enabled, checkpoints are only advanced once an event's batch is
/// reported as delivered; otherwise they are advanced as each event is created.
///
/// If `config.include` is empty, an error is logged and the returned future resolves to `Err(())`.
///
/// # Panics
///
/// Panics if the glob provider cannot be built from the configured patterns. Inside the
/// returned future, also panics if the multiline config or `message_start_indicator` is invalid
/// (both are validated in `build`), and the blocking task panics if the file server returns an
/// error.
pub fn file_source(
    config: &FileConfig,
    data_dir: PathBuf,
    shutdown: ShutdownSignal,
    mut out: SourceSender,
    acknowledgements: bool,
    log_namespace: LogNamespace,
) -> super::Source {
    // Without at least one include pattern there is nothing to watch.
    if config.include.is_empty() {
        error!(message = "`include` configuration option must contain at least one file pattern.");
        return Box::pin(future::ready(Err(())));
    }

    // Rebuild every exclude path from its individual components.
    let exclude_patterns = config
        .exclude
        .iter()
        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
        .collect::<Vec<PathBuf>>();
    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
    // Fold the deprecated `start_at_beginning` option into the two options that replaced it.
    let (ignore_checkpoints, read_from) = reconcile_position_options(
        config.start_at_beginning,
        config.ignore_checkpoints,
        Some(config.read_from),
    );

    let emitter = FileSourceInternalEventsEmitter {
        include_file_metric_tag: config.internal_metrics.include_file_tag,
    };

    let paths_provider = Glob::new(
        &config.include,
        &exclude_patterns,
        MatchOptions::default(),
        emitter.clone(),
    )
    .expect("invalid glob patterns");

    let encoding_charset = config.encoding.clone().map(|e| e.charset);

    // The file server receives the delimiter as bytes: when a charset is configured, the
    // delimiter is encoded from UTF-8 into that charset; otherwise its UTF-8 bytes are used.
    let line_delimiter_as_bytes = match encoding_charset {
        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
        None => Bytes::from(config.line_delimiter.clone()),
    };

    let checkpointer = Checkpointer::new(&data_dir);
    let file_server = FileServer {
        paths_provider,
        max_read_bytes: config.max_read_bytes,
        ignore_checkpoints,
        read_from,
        ignore_before,
        max_line_bytes: config.max_line_bytes,
        line_delimiter: line_delimiter_as_bytes,
        data_dir,
        glob_minimum_cooldown,
        fingerprinter: Fingerprinter {
            strategy: config.fingerprint.clone().into(),
            max_line_length: config.max_line_bytes,
            ignore_not_found: config.ignore_not_found,
        },
        oldest_first: config.oldest_first,
        remove_after: config.remove_after_secs.map(Duration::from_secs),
        emitter,
        handle: tokio::runtime::Handle::current(),
        rotate_wait: config.rotate_wait,
    };

    // The host key falls back to the globally configured log schema host key.
    let event_metadata = EventMetadata {
        host_key: config
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path,
        hostname: crate::get_hostname().ok(),
        file_key: config.file_key.clone().path,
        offset_key: config.offset_key.clone().and_then(|k| k.path),
    };

    // Owned copies of the settings that are moved into the async block below.
    let include = config.include.clone();
    let exclude = config.exclude.clone();
    let multiline_config = config.multiline.clone();
    let message_start_indicator = config.message_start_indicator.clone();
    let multi_line_timeout = config.multi_line_timeout;

    let (finalizer, shutdown_checkpointer) = if acknowledgements {
        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);

        // The checkpointer is shut down only after the acknowledgement stream has ended, which
        // is signalled through this oneshot channel.
        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
        let checkpoints = checkpointer.view();
        tokio::spawn(async move {
            while let Some((status, entry)) = ack_stream.next().await {
                // Only delivered batches advance the checkpoint.
                if status == BatchStatus::Delivered {
                    checkpoints.update(entry.file_id, entry.offset);
                }
            }
            send_shutdown.send(())
        });
        (Some(finalizer), shutdown2.map(|_| ()).boxed())
    } else {
        // Without acknowledgements, the checkpointer shuts down on the source's shutdown signal.
        (None, shutdown.clone().map(|_| ()).boxed())
    };

    let checkpoints = checkpointer.view();
    let include_file_metric_tag = config.internal_metrics.include_file_tag;
    Box::pin(async move {
        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);

        let mut encoding_decoder = encoding_charset.map(Decoder::new);

        // The file server sends batches of lines; flatten them into a stream of single lines.
        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
        let rx = rx
            .map(futures::stream::iter)
            .flatten()
            .map(move |mut line| {
                // The byte count is taken from the line as read, before any transcoding.
                emit!(FileBytesReceived {
                    byte_size: line.text.len(),
                    file: &line.filename,
                    include_file_metric_tag,
                });
                // Transcode to UTF-8 only when a charset was configured.
                line.text = match encoding_decoder.as_mut() {
                    Some(d) => d.decode_to_utf8(line.text),
                    None => line.text,
                };
                line
            });

        // `multiline` takes precedence over the deprecated `message_start_indicator`. Both were
        // validated in `build`, hence the unwraps.
        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
            if let Some(ref multiline_config) = multiline_config {
                wrap_with_line_agg(
                    rx,
                    multiline_config.try_into().unwrap(),
                )
            } else if let Some(msi) = message_start_indicator {
                wrap_with_line_agg(
                    rx,
                    line_agg::Config::for_legacy(
                        Regex::new(&msi).unwrap(),
                        multi_line_timeout,
                    ),
                )
            } else {
                Box::new(rx)
            };

        // Capture the current span so the spawned sender task can be instrumented with it.
        let span = Span::current();
        let mut messages = messages.map(move |line| {
            let mut event = create_event(
                line.text,
                line.start_offset,
                &line.filename,
                &event_metadata,
                log_namespace,
                include_file_metric_tag,
            );

            if let Some(finalizer) = &finalizer {
                // Acknowledgements enabled: defer the checkpoint until the batch is finalized.
                let (batch, receiver) = BatchNotifier::new_with_receiver();
                event = event.with_batch_notifier(&batch);
                let entry = FinalizerEntry {
                    file_id: line.file_id,
                    offset: line.end_offset,
                };
                finalizer.add(entry, receiver);
            } else {
                // No acknowledgements: checkpoint as soon as the event is created.
                checkpoints.update(line.file_id, line.end_offset);
            }
            event
        });
        tokio::spawn(async move {
            match out
                .send_event_stream(&mut messages)
                .instrument(span.or_current())
                .await
            {
                Ok(()) => {
                    debug!("Finished sending.");
                }
                Err(_) => {
                    let (count, _) = messages.size_hint();
                    emit!(StreamClosedError { count });
                }
            }
        });

        let span = info_span!("file_server");
        // The file server's `run` is executed on a blocking thread.
        spawn_blocking(move || {
            let _enter = span.enter();
            let result = file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer);
            emit!(FileOpen { count: 0 });
            // A file server error panics here; it then surfaces through the join error that is
            // logged below.
            result.unwrap();
        })
        .map_err(|error| error!(message="File server unexpectedly stopped.", %error))
        .await
    })
}
718
719fn reconcile_position_options(
722 start_at_beginning: Option<bool>,
723 ignore_checkpoints: Option<bool>,
724 read_from: Option<ReadFromConfig>,
725) -> (bool, ReadFrom) {
726 if start_at_beginning.is_some() {
727 warn!(message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead.")
728 }
729
730 match start_at_beginning {
731 Some(true) => (
732 ignore_checkpoints.unwrap_or(true),
733 read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
734 ),
735 _ => (
736 ignore_checkpoints.unwrap_or(false),
737 read_from.map(Into::into).unwrap_or_default(),
738 ),
739 }
740}
741
742fn wrap_with_line_agg(
743 rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
744 config: line_agg::Config,
745) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
746 let logic = line_agg::Logic::new(config);
747 Box::new(
748 LineAgg::new(
749 rx.map(|line| {
750 (
751 line.filename,
752 line.text,
753 (line.file_id, line.start_offset, line.end_offset),
754 )
755 }),
756 logic,
757 )
758 .map(
759 |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
760 text,
761 filename,
762 file_id,
763 start_offset,
764 end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
765 lastline_end_offset
766 }),
767 },
768 ),
769 )
770}
771
/// Per-source metadata applied to every event built by `create_event`.
struct EventMetadata {
    /// Legacy field path under which the hostname is stored, if any.
    host_key: Option<OwnedValuePath>,
    /// Hostname of the machine; when `None`, no host metadata is inserted.
    hostname: Option<String>,
    /// Legacy field path under which the file path is stored, if any.
    file_key: Option<OwnedValuePath>,
    /// Legacy field path under which the line offset is stored, if any.
    offset_key: Option<OwnedValuePath>,
}
778
779fn create_event(
780 line: Bytes,
781 offset: u64,
782 file: &str,
783 meta: &EventMetadata,
784 log_namespace: LogNamespace,
785 include_file_metric_tag: bool,
786) -> LogEvent {
787 let deserializer = BytesDeserializer;
788 let mut event = deserializer.parse_single(line, log_namespace);
789
790 log_namespace.insert_vector_metadata(
791 &mut event,
792 log_schema().source_type_key(),
793 path!("source_type"),
794 Bytes::from_static(FileConfig::NAME.as_bytes()),
795 );
796 log_namespace.insert_vector_metadata(
797 &mut event,
798 log_schema().timestamp_key(),
799 path!("ingest_timestamp"),
800 Utc::now(),
801 );
802
803 let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
804 if let Some(hostname) = &meta.hostname {
806 log_namespace.insert_source_metadata(
807 FileConfig::NAME,
808 &mut event,
809 legacy_host_key,
810 path!("host"),
811 hostname.clone(),
812 );
813 }
814
815 let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
816 log_namespace.insert_source_metadata(
817 FileConfig::NAME,
818 &mut event,
819 legacy_offset_key,
820 path!("offset"),
821 offset,
822 );
823
824 let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
825 log_namespace.insert_source_metadata(
826 FileConfig::NAME,
827 &mut event,
828 legacy_file_key,
829 path!("path"),
830 file,
831 );
832
833 emit!(FileEventsReceived {
834 count: 1,
835 file,
836 byte_size: event.estimated_json_encoded_size_of(),
837 include_file_metric_tag,
838 });
839
840 event
841}
842
843#[cfg(test)]
844mod tests {
845 use std::{
846 collections::HashSet,
847 fs::{self, File},
848 future::Future,
849 io::{Seek, Write},
850 };
851
852 use encoding_rs::UTF_16LE;
853 use similar_asserts::assert_eq;
854 use tempfile::tempdir;
855 use tokio::time::{sleep, timeout, Duration};
856 use vector_lib::schema::Definition;
857 use vrl::value::kind::Collection;
858
859 use super::*;
860 use crate::{
861 config::Config,
862 event::{Event, EventStatus, Value},
863 shutdown::ShutdownSignal,
864 sources::file,
865 test_util::components::{assert_source_compliance, FILE_SOURCE_TAGS},
866 };
867 use vrl::value;
868
    /// Runs the shared config-generation check against `FileConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileConfig>();
    }
873
874 fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
875 file::FileConfig {
876 fingerprint: FingerprintConfig::Checksum {
877 bytes: Some(8),
878 ignored_header_bytes: 0,
879 lines: 1,
880 },
881 data_dir: Some(dir.path().to_path_buf()),
882 glob_minimum_cooldown_ms: Duration::from_millis(100),
883 internal_metrics: FileInternalMetricsConfig {
884 include_file_tag: true,
885 },
886 ..Default::default()
887 }
888 }
889
890 async fn sleep_500_millis() {
891 sleep(Duration::from_millis(500)).await;
892 }
893
    /// Checks TOML deserialization of `FileConfig`: spelling out the defaults yields
    /// `FileConfig::default()`, and the fingerprint, encoding and `read_from` options parse into
    /// the expected values.
    #[test]
    fn parse_config() {
        // Explicitly written default values must produce the default configuration.
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        file_key = "file"
        glob_minimum_cooldown_ms = 1000
        multi_line_timeout = 1000
        max_read_bytes = 2048
        line_delimiter = "\n"
        "#,
        )
        .unwrap();
        assert_eq!(config, FileConfig::default());
        assert_eq!(
            config.fingerprint,
            FingerprintConfig::Checksum {
                bytes: None,
                ignored_header_bytes: 0,
                lines: 1
            }
        );

        // The device/inode strategy is selected with its renamed tag.
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        [fingerprint]
        strategy = "device_and_inode"
        "#,
        )
        .unwrap();
        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);

        // Checksum strategy with explicit values; `lines` falls back to its default.
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        [fingerprint]
        strategy = "checksum"
        bytes = 128
        ignored_header_bytes = 512
        "#,
        )
        .unwrap();
        assert_eq!(
            config.fingerprint,
            FingerprintConfig::Checksum {
                bytes: Some(128),
                ignored_header_bytes: 512,
                lines: 1
            }
        );

        // The charset name is resolved to the corresponding encoding.
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        [encoding]
        charset = "utf-16le"
        "#,
        )
        .unwrap();
        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));

        // Both `read_from` positions.
        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        read_from = "beginning"
        "#,
        )
        .unwrap();
        assert_eq!(config.read_from, ReadFromConfig::Beginning);

        let config: FileConfig = toml::from_str(
            r#"
        include = [ "/var/log/**/*.log" ]
        read_from = "end"
        "#,
        )
        .unwrap();
        assert_eq!(config.read_from, ReadFromConfig::End);
    }
974
975 #[test]
976 fn resolve_data_dir() {
977 let global_dir = tempdir().unwrap();
978 let local_dir = tempdir().unwrap();
979
980 let mut config = Config::default();
981 config.global.data_dir = global_dir.keep().into();
982
983 let res = config
985 .global
986 .resolve_and_validate_data_dir(test_default_file_config(&local_dir).data_dir.as_ref())
987 .unwrap();
988 assert_eq!(res, local_dir.path());
989
990 let res = config.global.resolve_and_validate_data_dir(None).unwrap();
992 assert_eq!(res, config.global.data_dir.unwrap());
993 }
994
    /// In the Vector namespace the event root is the message itself, and `source_type`,
    /// `ingest_timestamp`, `host`, `offset` and `path` are exposed as metadata fields.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let definitions = FileConfig::default()
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                    .with_meaning(OwnedTargetPath::event_root(), "message")
                    .with_metadata_field(
                        &owned_value_path!("vector", "source_type"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "ingest_timestamp"),
                        Kind::timestamp(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("file", "host"),
                        Kind::bytes().or_undefined(),
                        Some("host")
                    )
                    .with_metadata_field(
                        &owned_value_path!("file", "offset"),
                        Kind::integer(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
            )
        )
    }
1031
    /// In the Legacy namespace the same information is exposed as event fields. With the
    /// default configuration (no `offset_key`), `offset` is expected to be undefined.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let definitions = FileConfig::default()
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(
                    Kind::object(Collection::empty()),
                    [LogNamespace::Legacy]
                )
                .with_event_field(
                    &owned_value_path!("message"),
                    Kind::bytes(),
                    Some("message")
                )
                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
                .with_event_field(
                    &owned_value_path!("host"),
                    Kind::bytes().or_undefined(),
                    Some("host")
                )
                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
            )
        )
    }
1063
1064 #[test]
1065 fn create_event_legacy_namespace() {
1066 let line = Bytes::from("hello world");
1067 let file = "some_file.rs";
1068 let offset: u64 = 0;
1069
1070 let meta = EventMetadata {
1071 host_key: Some(owned_value_path!("host")),
1072 hostname: Some("Some.Machine".to_string()),
1073 file_key: Some(owned_value_path!("file")),
1074 offset_key: Some(owned_value_path!("offset")),
1075 };
1076 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1077
1078 assert_eq!(log["file"], "some_file.rs".into());
1079 assert_eq!(log["host"], "Some.Machine".into());
1080 assert_eq!(log["offset"], 0.into());
1081 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1082 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1083 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1084 }
1085
1086 #[test]
1087 fn create_event_custom_fields_legacy_namespace() {
1088 let line = Bytes::from("hello world");
1089 let file = "some_file.rs";
1090 let offset: u64 = 0;
1091
1092 let meta = EventMetadata {
1093 host_key: Some(owned_value_path!("hostname")),
1094 hostname: Some("Some.Machine".to_string()),
1095 file_key: Some(owned_value_path!("file_path")),
1096 offset_key: Some(owned_value_path!("off")),
1097 };
1098 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1099
1100 assert_eq!(log["file_path"], "some_file.rs".into());
1101 assert_eq!(log["hostname"], "Some.Machine".into());
1102 assert_eq!(log["off"], 0.into());
1103 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1104 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1105 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1106 }
1107
1108 #[test]
1109 fn create_event_vector_namespace() {
1110 let line = Bytes::from("hello world");
1111 let file = "some_file.rs";
1112 let offset: u64 = 0;
1113
1114 let meta = EventMetadata {
1115 host_key: Some(owned_value_path!("ignored")),
1116 hostname: Some("Some.Machine".to_string()),
1117 file_key: Some(owned_value_path!("ignored")),
1118 offset_key: Some(owned_value_path!("ignored")),
1119 };
1120 let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1121
1122 assert_eq!(log.value(), &value!("hello world"));
1123
1124 assert_eq!(
1125 log.metadata()
1126 .value()
1127 .get(path!("vector", "source_type"))
1128 .unwrap(),
1129 &value!("file")
1130 );
1131 assert!(log
1132 .metadata()
1133 .value()
1134 .get(path!("vector", "ingest_timestamp"))
1135 .unwrap()
1136 .is_timestamp());
1137
1138 assert_eq!(
1139 log.metadata()
1140 .value()
1141 .get(path!(FileConfig::NAME, "host"))
1142 .unwrap(),
1143 &value!("Some.Machine")
1144 );
1145 assert_eq!(
1146 log.metadata()
1147 .value()
1148 .get(path!(FileConfig::NAME, "offset"))
1149 .unwrap(),
1150 &value!(0)
1151 );
1152 assert_eq!(
1153 log.metadata()
1154 .value()
1155 .get(path!(FileConfig::NAME, "path"))
1156 .unwrap(),
1157 &value!("some_file.rs")
1158 );
1159 }
1160
    /// Writes `n` lines to each of two files while the source runs and checks that every line
    /// arrives, in per-file order, tagged with the path of the file it came from.
    #[tokio::test]
    async fn file_happy_path() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path1 = dir.path().join("file1");
        let path2 = dir.path().join("file2");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file1 = File::create(&path1).unwrap();
            let mut file2 = File::create(&path2).unwrap();

            // Pause before writing — presumably so the source discovers the empty files first.
            sleep_500_millis().await;

            for i in 0..n {
                writeln!(&mut file1, "hello {i}").unwrap();
                writeln!(&mut file2, "goodbye {i}").unwrap();
            }

            // Pause again before the closure returns, leaving time for the lines to be read.
            sleep_500_millis().await;
        })
        .await;

        // Independent counters: lines of the two files may interleave, but each file's own
        // lines must arrive in order.
        let mut hello_i = 0;
        let mut goodbye_i = 0;

        for event in received {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            if line.starts_with("hello") {
                assert_eq!(line, format!("hello {}", hello_i));
                assert_eq!(
                    event.as_log()["file"].to_string_lossy(),
                    path1.to_str().unwrap()
                );
                hello_i += 1;
            } else {
                assert_eq!(line, format!("goodbye {}", goodbye_i));
                assert_eq!(
                    event.as_log()["file"].to_string_lossy(),
                    path2.to_str().unwrap()
                );
                goodbye_i += 1;
            }
        }
        assert_eq!(hello_i, n);
        assert_eq!(goodbye_i, n);
    }
1214
    /// Empty lines are emitted as events too: one non-empty line followed by `n` empty lines
    /// must yield `n + 1` events.
    #[tokio::test]
    async fn file_read_empty_lines() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file = File::create(&path).unwrap();

            // Pause before writing — presumably so the source discovers the empty file first.
            sleep_500_millis().await;

            // A non-empty first line, as its text says, for checkpointing.
            writeln!(&mut file, "line for checkpointing").unwrap();
            for _i in 0..n {
                writeln!(&mut file).unwrap();
            }

            // Pause again before the closure returns, leaving time for the lines to be read.
            sleep_500_millis().await;
        })
        .await;

        assert_eq!(received.len(), n + 1);
    }
1244
    /// Truncating a file and writing to it again: the lines written before the truncation are
    /// expected first, followed by the lines written afterwards, all tagged with the same path.
    #[tokio::test]
    async fn file_truncate() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let path = dir.path().join("file");
        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file = File::create(&path).unwrap();

            // Pause before writing — presumably so the source discovers the empty file first.
            sleep_500_millis().await;

            for i in 0..n {
                writeln!(&mut file, "pretrunc {i}").unwrap();
            }

            // Pause so the pre-truncation lines can be read before the file is emptied.
            sleep_500_millis().await;

            // Truncate to zero length and rewind the write position to the start.
            file.set_len(0).unwrap();
            file.seek(std::io::SeekFrom::Start(0)).unwrap();

            // Pause so the truncation can be noticed before new content is written.
            sleep_500_millis().await;

            for i in 0..n {
                writeln!(&mut file, "posttrunc {i}").unwrap();
            }

            // Pause again before the closure returns, leaving time for the lines to be read.
            sleep_500_millis().await;
        })
        .await;

        // The first `n` events are expected to be the pre-truncation lines; after that the
        // counter restarts for the post-truncation lines.
        let mut i = 0;
        let mut pre_trunc = true;

        for event in received {
            assert_eq!(
                event.as_log()["file"].to_string_lossy(),
                path.to_str().unwrap()
            );

            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();

            if pre_trunc {
                assert_eq!(line, format!("pretrunc {}", i));
            } else {
                assert_eq!(line, format!("posttrunc {}", i));
            }

            i += 1;
            if i == n {
                i = 0;
                pre_trunc = false;
            }
        }
    }
1304
1305 #[tokio::test]
1306 async fn file_rotate() {
1307 let n = 5;
1308
1309 let dir = tempdir().unwrap();
1310 let config = file::FileConfig {
1311 include: vec![dir.path().join("*")],
1312 ..test_default_file_config(&dir)
1313 };
1314
1315 let path = dir.path().join("file");
1316 let archive_path = dir.path().join("file");
1317 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1318 let mut file = File::create(&path).unwrap();
1319
1320 sleep_500_millis().await; for i in 0..n {
1323 writeln!(&mut file, "prerot {i}").unwrap();
1324 }
1325
1326 sleep_500_millis().await; fs::rename(&path, archive_path).expect("could not rename");
1329 let mut file = File::create(&path).unwrap();
1330
1331 sleep_500_millis().await; for i in 0..n {
1334 writeln!(&mut file, "postrot {i}").unwrap();
1335 }
1336
1337 sleep_500_millis().await;
1338 })
1339 .await;
1340
1341 let mut i = 0;
1342 let mut pre_rot = true;
1343
1344 for event in received {
1345 assert_eq!(
1346 event.as_log()["file"].to_string_lossy(),
1347 path.to_str().unwrap()
1348 );
1349
1350 let line =
1351 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1352
1353 if pre_rot {
1354 assert_eq!(line, format!("prerot {}", i));
1355 } else {
1356 assert_eq!(line, format!("postrot {}", i));
1357 }
1358
1359 i += 1;
1360 if i == n {
1361 i = 0;
1362 pre_rot = false;
1363 }
1364 }
1365 }
1366
1367 #[tokio::test]
1368 async fn file_multiple_paths() {
1369 let n = 5;
1370
1371 let dir = tempdir().unwrap();
1372 let config = file::FileConfig {
1373 include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1374 exclude: vec![dir.path().join("a.*.txt")],
1375 ..test_default_file_config(&dir)
1376 };
1377
1378 let path1 = dir.path().join("a.txt");
1379 let path2 = dir.path().join("b.txt");
1380 let path3 = dir.path().join("a.log");
1381 let path4 = dir.path().join("a.ignore.txt");
1382 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1383 let mut file1 = File::create(&path1).unwrap();
1384 let mut file2 = File::create(&path2).unwrap();
1385 let mut file3 = File::create(&path3).unwrap();
1386 let mut file4 = File::create(&path4).unwrap();
1387
1388 sleep_500_millis().await; for i in 0..n {
1391 writeln!(&mut file1, "1 {i}").unwrap();
1392 writeln!(&mut file2, "2 {i}").unwrap();
1393 writeln!(&mut file3, "3 {i}").unwrap();
1394 writeln!(&mut file4, "4 {i}").unwrap();
1395 }
1396
1397 sleep_500_millis().await;
1398 })
1399 .await;
1400
1401 let mut is = [0; 3];
1402
1403 for event in received {
1404 let line =
1405 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1406 let mut split = line.split(' ');
1407 let file = split.next().unwrap().parse::<usize>().unwrap();
1408 assert_ne!(file, 4);
1409 let i = split.next().unwrap().parse::<usize>().unwrap();
1410
1411 assert_eq!(is[file - 1], i);
1412 is[file - 1] += 1;
1413 }
1414
1415 assert_eq!(is, [n as usize; 3]);
1416 }
1417
1418 #[tokio::test]
1419 async fn file_exclude_paths() {
1420 let n = 5;
1421
1422 let dir = tempdir().unwrap();
1423 let config = file::FileConfig {
1424 include: vec![dir.path().join("a//b/*.log.*")],
1425 exclude: vec![dir.path().join("a//b/test.log.*")],
1426 ..test_default_file_config(&dir)
1427 };
1428
1429 let path1 = dir.path().join("a//b/a.log.1");
1430 let path2 = dir.path().join("a//b/test.log.1");
1431 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1432 std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1433 let mut file1 = File::create(&path1).unwrap();
1434 let mut file2 = File::create(&path2).unwrap();
1435
1436 sleep_500_millis().await; for i in 0..n {
1439 writeln!(&mut file1, "1 {i}").unwrap();
1440 writeln!(&mut file2, "2 {i}").unwrap();
1441 }
1442
1443 sleep_500_millis().await;
1444 })
1445 .await;
1446
1447 let mut is = [0; 1];
1448
1449 for event in received {
1450 let line =
1451 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1452 let mut split = line.split(' ');
1453 let file = split.next().unwrap().parse::<usize>().unwrap();
1454 assert_ne!(file, 4);
1455 let i = split.next().unwrap().parse::<usize>().unwrap();
1456
1457 assert_eq!(is[file - 1], i);
1458 is[file - 1] += 1;
1459 }
1460
1461 assert_eq!(is, [n as usize; 1]);
1462 }
1463
1464 #[tokio::test]
1465 async fn file_key_acknowledged() {
1466 file_key(Acks).await
1467 }
1468
1469 #[tokio::test]
1470 async fn file_key_no_acknowledge() {
1471 file_key(NoAcks).await
1472 }
1473
1474 async fn file_key(acks: AckingMode) {
1475 {
1477 let dir = tempdir().unwrap();
1478 let config = file::FileConfig {
1479 include: vec![dir.path().join("*")],
1480 ..test_default_file_config(&dir)
1481 };
1482
1483 let path = dir.path().join("file");
1484 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1485 let mut file = File::create(&path).unwrap();
1486
1487 sleep_500_millis().await;
1488
1489 writeln!(&mut file, "hello there").unwrap();
1490
1491 sleep_500_millis().await;
1492 })
1493 .await;
1494
1495 assert_eq!(received.len(), 1);
1496 assert_eq!(
1497 received[0].as_log()["file"].to_string_lossy(),
1498 path.to_str().unwrap()
1499 );
1500 }
1501
1502 {
1504 let dir = tempdir().unwrap();
1505 let config = file::FileConfig {
1506 include: vec![dir.path().join("*")],
1507 file_key: OptionalValuePath::from(owned_value_path!("source")),
1508 ..test_default_file_config(&dir)
1509 };
1510
1511 let path = dir.path().join("file");
1512 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1513 let mut file = File::create(&path).unwrap();
1514
1515 sleep_500_millis().await;
1516
1517 writeln!(&mut file, "hello there").unwrap();
1518
1519 sleep_500_millis().await;
1520 })
1521 .await;
1522
1523 assert_eq!(received.len(), 1);
1524 assert_eq!(
1525 received[0].as_log()["source"].to_string_lossy(),
1526 path.to_str().unwrap()
1527 );
1528 }
1529
1530 {
1532 let dir = tempdir().unwrap();
1533 let config = file::FileConfig {
1534 include: vec![dir.path().join("*")],
1535 ..test_default_file_config(&dir)
1536 };
1537
1538 let path = dir.path().join("file");
1539 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1540 let mut file = File::create(&path).unwrap();
1541
1542 sleep_500_millis().await;
1543
1544 writeln!(&mut file, "hello there").unwrap();
1545
1546 sleep_500_millis().await;
1547 })
1548 .await;
1549
1550 assert_eq!(received.len(), 1);
1551 assert_eq!(
1552 received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1553 vec![
1554 default_file_key()
1555 .path
1556 .expect("file key to exist")
1557 .to_string()
1558 .into(),
1559 log_schema().host_key().unwrap().to_string().into(),
1560 log_schema().message_key().unwrap().to_string().into(),
1561 log_schema().timestamp_key().unwrap().to_string().into(),
1562 log_schema().source_type_key().unwrap().to_string().into()
1563 ]
1564 .into_iter()
1565 .collect::<HashSet<_>>()
1566 );
1567 }
1568 }
1569
1570 #[cfg(target_os = "linux")] #[tokio::test]
1572 async fn file_start_position_server_restart_acknowledged() {
1573 file_start_position_server_restart(Acks).await
1574 }
1575
1576 #[cfg(target_os = "linux")] #[tokio::test]
1578 async fn file_start_position_server_restart_no_acknowledge() {
1579 file_start_position_server_restart(NoAcks).await
1580 }
1581
1582 #[cfg(target_os = "linux")] async fn file_start_position_server_restart(acking: AckingMode) {
1584 let dir = tempdir().unwrap();
1585 let config = file::FileConfig {
1586 include: vec![dir.path().join("*")],
1587 ..test_default_file_config(&dir)
1588 };
1589
1590 let path = dir.path().join("file");
1591 let mut file = File::create(&path).unwrap();
1592 writeln!(&mut file, "zeroth line").unwrap();
1593 sleep_500_millis().await;
1594
1595 {
1597 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1598 sleep_500_millis().await;
1599 writeln!(&mut file, "first line").unwrap();
1600 sleep_500_millis().await;
1601 })
1602 .await;
1603
1604 let lines = extract_messages_string(received);
1605 assert_eq!(lines, vec!["zeroth line", "first line"]);
1606 }
1607 {
1609 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1610 sleep_500_millis().await;
1611 writeln!(&mut file, "second line").unwrap();
1612 sleep_500_millis().await;
1613 })
1614 .await;
1615
1616 let lines = extract_messages_string(received);
1617 assert_eq!(lines, vec!["second line"]);
1618 }
1619 {
1621 let config = file::FileConfig {
1622 include: vec![dir.path().join("*")],
1623 ignore_checkpoints: Some(true),
1624 read_from: ReadFromConfig::Beginning,
1625 ..test_default_file_config(&dir)
1626 };
1627 let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1628 sleep_500_millis().await;
1629 writeln!(&mut file, "third line").unwrap();
1630 sleep_500_millis().await;
1631 })
1632 .await;
1633
1634 let lines = extract_messages_string(received);
1635 assert_eq!(
1636 lines,
1637 vec!["zeroth line", "first line", "second line", "third line"]
1638 );
1639 }
1640 }
1641
1642 #[tokio::test]
1643 async fn file_start_position_server_restart_unfinalized() {
1644 let dir = tempdir().unwrap();
1645 let config = file::FileConfig {
1646 include: vec![dir.path().join("*")],
1647 ..test_default_file_config(&dir)
1648 };
1649
1650 let path = dir.path().join("file");
1651 let mut file = File::create(&path).unwrap();
1652 writeln!(&mut file, "the line").unwrap();
1653 sleep_500_millis().await;
1654
1655 let received = run_file_source(
1657 &config,
1658 false,
1659 Unfinalized,
1660 LogNamespace::Legacy,
1661 sleep_500_millis(),
1662 )
1663 .await;
1664 let lines = extract_messages_string(received);
1665 assert_eq!(lines, vec!["the line"]);
1666
1667 let received = run_file_source(
1669 &config,
1670 false,
1671 Unfinalized,
1672 LogNamespace::Legacy,
1673 sleep_500_millis(),
1674 )
1675 .await;
1676 let lines = extract_messages_string(received);
1677 assert_eq!(lines, vec!["the line"]);
1678 }
1679
1680 #[tokio::test]
1681 async fn file_duplicate_processing_after_restart() {
1682 let dir = tempdir().unwrap();
1683 let config = file::FileConfig {
1684 include: vec![dir.path().join("*")],
1685 ..test_default_file_config(&dir)
1686 };
1687
1688 let path = dir.path().join("file");
1689 let mut file = File::create(&path).unwrap();
1690
1691 let line_count = 4000;
1692 for i in 0..line_count {
1693 writeln!(&mut file, "Here's a line for you: {i}").unwrap();
1694 }
1695 sleep_500_millis().await;
1696
1697 let received = run_file_source(
1699 &config,
1700 true,
1701 Acks,
1702 LogNamespace::Legacy,
1703 sleep_500_millis(),
1705 )
1706 .await;
1707 let lines = extract_messages_string(received);
1708
1709 assert!(lines.len() < line_count);
1712
1713 let received = run_file_source(
1715 &config,
1716 true,
1717 Acks,
1718 LogNamespace::Legacy,
1719 sleep(Duration::from_secs(5)),
1720 )
1721 .await;
1722 let lines2 = extract_messages_string(received);
1723
1724 assert_eq!(lines.len() + lines2.len(), line_count);
1726 }
1727
1728 #[tokio::test]
1729 async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1730 file_start_position_server_restart_with_file_rotation(Acks).await
1731 }
1732
1733 #[tokio::test]
1734 async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1735 file_start_position_server_restart_with_file_rotation(NoAcks).await
1736 }
1737
1738 async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1739 let dir = tempdir().unwrap();
1740 let config = file::FileConfig {
1741 include: vec![dir.path().join("*")],
1742 ..test_default_file_config(&dir)
1743 };
1744
1745 let path = dir.path().join("file");
1746 let path_for_old_file = dir.path().join("file.old");
1747 {
1749 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1750 let mut file = File::create(&path).unwrap();
1751 sleep_500_millis().await;
1752 writeln!(&mut file, "first line").unwrap();
1753 sleep_500_millis().await;
1754 })
1755 .await;
1756
1757 let lines = extract_messages_string(received);
1758 assert_eq!(lines, vec!["first line"]);
1759 }
1760 fs::rename(&path, &path_for_old_file).expect("could not rename");
1762 {
1765 let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1766 let mut file = File::create(&path).unwrap();
1767 sleep_500_millis().await;
1768 writeln!(&mut file, "second line").unwrap();
1769 sleep_500_millis().await;
1770 })
1771 .await;
1772
1773 let lines = extract_messages_string(received);
1774 assert_eq!(lines, vec!["second line"]);
1775 }
1776 }
1777
1778 #[cfg(unix)] #[tokio::test]
1780 async fn file_start_position_ignore_old_files() {
1781 use std::{
1782 os::unix::io::AsRawFd,
1783 time::{Duration, SystemTime},
1784 };
1785
1786 let dir = tempdir().unwrap();
1787 let config = file::FileConfig {
1788 include: vec![dir.path().join("*")],
1789 ignore_older_secs: Some(5),
1790 ..test_default_file_config(&dir)
1791 };
1792
1793 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1794 let before_path = dir.path().join("before");
1795 let mut before_file = File::create(&before_path).unwrap();
1796 let after_path = dir.path().join("after");
1797 let mut after_file = File::create(&after_path).unwrap();
1798
1799 writeln!(&mut before_file, "first line").unwrap(); writeln!(&mut after_file, "_first line").unwrap(); {
1803 let before = SystemTime::now() - Duration::from_secs(8);
1805 let after = SystemTime::now() - Duration::from_secs(2);
1806
1807 let before_time = libc::timeval {
1808 tv_sec: before
1809 .duration_since(SystemTime::UNIX_EPOCH)
1810 .unwrap()
1811 .as_secs() as _,
1812 tv_usec: 0,
1813 };
1814 let before_times = [before_time, before_time];
1815
1816 let after_time = libc::timeval {
1817 tv_sec: after
1818 .duration_since(SystemTime::UNIX_EPOCH)
1819 .unwrap()
1820 .as_secs() as _,
1821 tv_usec: 0,
1822 };
1823 let after_times = [after_time, after_time];
1824
1825 unsafe {
1826 libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1827 libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1828 }
1829 }
1830
1831 sleep_500_millis().await;
1832 writeln!(&mut before_file, "second line").unwrap();
1833 writeln!(&mut after_file, "_second line").unwrap();
1834
1835 sleep_500_millis().await;
1836 })
1837 .await;
1838
1839 let before_lines = received
1840 .iter()
1841 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1842 .map(|event| {
1843 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1844 })
1845 .collect::<Vec<_>>();
1846 let after_lines = received
1847 .iter()
1848 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1849 .map(|event| {
1850 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1851 })
1852 .collect::<Vec<_>>();
1853 assert_eq!(before_lines, vec!["second line"]);
1854 assert_eq!(after_lines, vec!["_first line", "_second line"]);
1855 }
1856
1857 #[tokio::test]
1858 async fn file_max_line_bytes() {
1859 let dir = tempdir().unwrap();
1860 let config = file::FileConfig {
1861 include: vec![dir.path().join("*")],
1862 max_line_bytes: 10,
1863 ..test_default_file_config(&dir)
1864 };
1865
1866 let path = dir.path().join("file");
1867 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1868 let mut file = File::create(&path).unwrap();
1869
1870 sleep_500_millis().await; writeln!(&mut file, "short").unwrap();
1873 writeln!(&mut file, "this is too long").unwrap();
1874 writeln!(&mut file, "11 eleven11").unwrap();
1875 let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1876 writeln!(&mut file, "{super_long}").unwrap();
1877 writeln!(&mut file, "exactly 10").unwrap();
1878 writeln!(&mut file, "it can end on a line that's too long").unwrap();
1879
1880 sleep_500_millis().await;
1881 sleep_500_millis().await;
1882
1883 writeln!(&mut file, "and then continue").unwrap();
1884 writeln!(&mut file, "last short").unwrap();
1885
1886 sleep_500_millis().await;
1887 sleep_500_millis().await;
1888 }).await;
1889
1890 let received = extract_messages_value(received);
1891
1892 assert_eq!(
1893 received,
1894 vec!["short".into(), "exactly 10".into(), "last short".into()]
1895 );
1896 }
1897
1898 #[tokio::test]
1899 async fn test_multi_line_aggregation_legacy() {
1900 let dir = tempdir().unwrap();
1901 let config = file::FileConfig {
1902 include: vec![dir.path().join("*")],
1903 message_start_indicator: Some("INFO".into()),
1904 multi_line_timeout: 25, ..test_default_file_config(&dir)
1906 };
1907
1908 let path = dir.path().join("file");
1909 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1910 let mut file = File::create(&path).unwrap();
1911
1912 sleep_500_millis().await; writeln!(&mut file, "leftover foo").unwrap();
1915 writeln!(&mut file, "INFO hello").unwrap();
1916 writeln!(&mut file, "INFO goodbye").unwrap();
1917 writeln!(&mut file, "part of goodbye").unwrap();
1918
1919 sleep_500_millis().await;
1920
1921 writeln!(&mut file, "INFO hi again").unwrap();
1922 writeln!(&mut file, "and some more").unwrap();
1923 writeln!(&mut file, "INFO hello").unwrap();
1924
1925 sleep_500_millis().await;
1926
1927 writeln!(&mut file, "too slow").unwrap();
1928 writeln!(&mut file, "INFO doesn't have").unwrap();
1929 writeln!(&mut file, "to be INFO in").unwrap();
1930 writeln!(&mut file, "the middle").unwrap();
1931
1932 sleep_500_millis().await;
1933 })
1934 .await;
1935
1936 let received = extract_messages_value(received);
1937
1938 assert_eq!(
1939 received,
1940 vec![
1941 "leftover foo".into(),
1942 "INFO hello".into(),
1943 "INFO goodbye\npart of goodbye".into(),
1944 "INFO hi again\nand some more".into(),
1945 "INFO hello".into(),
1946 "too slow".into(),
1947 "INFO doesn't have".into(),
1948 "to be INFO in\nthe middle".into(),
1949 ]
1950 );
1951 }
1952
1953 #[tokio::test]
1954 async fn test_multi_line_aggregation() {
1955 let dir = tempdir().unwrap();
1956 let config = file::FileConfig {
1957 include: vec![dir.path().join("*")],
1958 multiline: Some(MultilineConfig {
1959 start_pattern: "INFO".to_owned(),
1960 condition_pattern: "INFO".to_owned(),
1961 mode: line_agg::Mode::HaltBefore,
1962 timeout_ms: Duration::from_millis(25), }),
1964 ..test_default_file_config(&dir)
1965 };
1966
1967 let path = dir.path().join("file");
1968 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1969 let mut file = File::create(&path).unwrap();
1970
1971 sleep_500_millis().await; writeln!(&mut file, "leftover foo").unwrap();
1974 writeln!(&mut file, "INFO hello").unwrap();
1975 writeln!(&mut file, "INFO goodbye").unwrap();
1976 writeln!(&mut file, "part of goodbye").unwrap();
1977
1978 sleep_500_millis().await;
1979
1980 writeln!(&mut file, "INFO hi again").unwrap();
1981 writeln!(&mut file, "and some more").unwrap();
1982 writeln!(&mut file, "INFO hello").unwrap();
1983
1984 sleep_500_millis().await;
1985
1986 writeln!(&mut file, "too slow").unwrap();
1987 writeln!(&mut file, "INFO doesn't have").unwrap();
1988 writeln!(&mut file, "to be INFO in").unwrap();
1989 writeln!(&mut file, "the middle").unwrap();
1990
1991 sleep_500_millis().await;
1992 })
1993 .await;
1994
1995 let received = extract_messages_value(received);
1996
1997 assert_eq!(
1998 received,
1999 vec![
2000 "leftover foo".into(),
2001 "INFO hello".into(),
2002 "INFO goodbye\npart of goodbye".into(),
2003 "INFO hi again\nand some more".into(),
2004 "INFO hello".into(),
2005 "too slow".into(),
2006 "INFO doesn't have".into(),
2007 "to be INFO in\nthe middle".into(),
2008 ]
2009 );
2010 }
2011
2012 #[tokio::test]
2013 async fn test_multi_line_checkpointing() {
2014 let dir = tempdir().unwrap();
2015 let config = file::FileConfig {
2016 include: vec![dir.path().join("*")],
2017 offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2018 multiline: Some(MultilineConfig {
2019 start_pattern: "INFO".to_owned(),
2020 condition_pattern: "INFO".to_owned(),
2021 mode: line_agg::Mode::HaltBefore,
2022 timeout_ms: Duration::from_millis(25), }),
2024 ..test_default_file_config(&dir)
2025 };
2026
2027 let path = dir.path().join("file");
2028 let mut file = File::create(&path).unwrap();
2029
2030 writeln!(&mut file, "INFO hello").unwrap();
2031 writeln!(&mut file, "part of hello").unwrap();
2032
2033 let received = run_file_source(
2035 &config,
2036 false,
2037 Acks,
2038 LogNamespace::Legacy,
2039 sleep_500_millis(),
2040 )
2041 .await;
2042
2043 assert_eq!(received[0].as_log()["offset"], 0.into());
2044
2045 let lines = extract_messages_string(received);
2046 assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2047
2048 let received_after_restart =
2050 run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2051 writeln!(&mut file, "INFO goodbye").unwrap();
2052 })
2053 .await;
2054 assert_eq!(
2055 received_after_restart[0].as_log()["offset"],
2056 (lines[0].len() + 1).into()
2057 );
2058 let lines = extract_messages_string(received_after_restart);
2059 assert_eq!(lines, vec!["INFO goodbye"]);
2060 }
2061
2062 #[tokio::test]
2063 async fn test_fair_reads() {
2064 let dir = tempdir().unwrap();
2065 let config = file::FileConfig {
2066 include: vec![dir.path().join("*")],
2067 max_read_bytes: 1,
2068 oldest_first: false,
2069 ..test_default_file_config(&dir)
2070 };
2071
2072 let older_path = dir.path().join("z_older_file");
2073 let mut older = File::create(&older_path).unwrap();
2074
2075 sleep_500_millis().await;
2076
2077 let newer_path = dir.path().join("a_newer_file");
2078 let mut newer = File::create(&newer_path).unwrap();
2079
2080 writeln!(&mut older, "hello i am the old file").unwrap();
2081 writeln!(&mut older, "i have been around a while").unwrap();
2082 writeln!(&mut older, "you can read newer files at the same time").unwrap();
2083
2084 writeln!(&mut newer, "and i am the new file").unwrap();
2085 writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2086 writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2087
2088 sleep_500_millis().await;
2089
2090 let received = run_file_source(
2091 &config,
2092 false,
2093 NoAcks,
2094 LogNamespace::Legacy,
2095 sleep_500_millis(),
2096 )
2097 .await;
2098
2099 let received = extract_messages_value(received);
2100
2101 assert_eq!(
2102 received,
2103 vec![
2104 "hello i am the old file".into(),
2105 "and i am the new file".into(),
2106 "i have been around a while".into(),
2107 "this should be interleaved with the old one".into(),
2108 "you can read newer files at the same time".into(),
2109 "which is fine because we want fairness".into(),
2110 ]
2111 );
2112 }
2113
2114 #[tokio::test]
2115 async fn test_oldest_first() {
2116 let dir = tempdir().unwrap();
2117 let config = file::FileConfig {
2118 include: vec![dir.path().join("*")],
2119 max_read_bytes: 1,
2120 oldest_first: true,
2121 ..test_default_file_config(&dir)
2122 };
2123
2124 let older_path = dir.path().join("z_older_file");
2125 let mut older = File::create(&older_path).unwrap();
2126
2127 sleep_500_millis().await;
2128
2129 let newer_path = dir.path().join("a_newer_file");
2130 let mut newer = File::create(&newer_path).unwrap();
2131
2132 writeln!(&mut older, "hello i am the old file").unwrap();
2133 writeln!(&mut older, "i have been around a while").unwrap();
2134 writeln!(&mut older, "you should definitely read all of me first").unwrap();
2135
2136 writeln!(&mut newer, "i'm new").unwrap();
2137 writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2138 writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2139
2140 sleep_500_millis().await;
2141
2142 let received = run_file_source(
2143 &config,
2144 false,
2145 NoAcks,
2146 LogNamespace::Legacy,
2147 sleep_500_millis(),
2148 )
2149 .await;
2150
2151 let received = extract_messages_value(received);
2152
2153 assert_eq!(
2154 received,
2155 vec![
2156 "hello i am the old file".into(),
2157 "i have been around a while".into(),
2158 "you should definitely read all of me first".into(),
2159 "i'm new".into(),
2160 "hopefully you read all the old stuff first".into(),
2161 "because otherwise i'm not going to make sense".into(),
2162 ]
2163 );
2164 }
2165
2166 #[cfg(not(target_os = "macos"))]
2168 #[tokio::test]
2169 async fn test_split_reads() {
2170 let dir = tempdir().unwrap();
2171 let config = file::FileConfig {
2172 include: vec![dir.path().join("*")],
2173 max_read_bytes: 1,
2174 ..test_default_file_config(&dir)
2175 };
2176
2177 let path = dir.path().join("file");
2178 let mut file = File::create(&path).unwrap();
2179
2180 writeln!(&mut file, "hello i am a normal line").unwrap();
2181
2182 sleep_500_millis().await;
2183
2184 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2185 sleep_500_millis().await;
2186
2187 write!(&mut file, "i am not a full line").unwrap();
2188
2189 sleep_500_millis().await;
2191
2192 writeln!(&mut file, " until now").unwrap();
2193
2194 sleep_500_millis().await;
2195 })
2196 .await;
2197
2198 let received = extract_messages_value(received);
2199
2200 assert_eq!(
2201 received,
2202 vec![
2203 "hello i am a normal line".into(),
2204 "i am not a full line until now".into(),
2205 ]
2206 );
2207 }
2208
2209 #[tokio::test]
2210 async fn test_gzipped_file() {
2211 let dir = tempdir().unwrap();
2212 let config = file::FileConfig {
2213 include: vec![PathBuf::from("tests/data/gzipped.log")],
2214 max_line_bytes: 100,
2221 ..test_default_file_config(&dir)
2222 };
2223
2224 let received = run_file_source(
2225 &config,
2226 false,
2227 NoAcks,
2228 LogNamespace::Legacy,
2229 sleep_500_millis(),
2230 )
2231 .await;
2232
2233 let received = extract_messages_value(received);
2234
2235 assert_eq!(
2236 received,
2237 vec![
2238 "this is a simple file".into(),
2239 "i have been compressed".into(),
2240 "in order to make me smaller".into(),
2241 "but you can still read me".into(),
2242 "hooray".into(),
2243 ]
2244 );
2245 }
2246
2247 #[tokio::test]
2248 async fn test_non_utf8_encoded_file() {
2249 let dir = tempdir().unwrap();
2250 let config = file::FileConfig {
2251 include: vec![PathBuf::from("tests/data/utf-16le.log")],
2252 encoding: Some(EncodingConfig { charset: UTF_16LE }),
2253 ..test_default_file_config(&dir)
2254 };
2255
2256 let received = run_file_source(
2257 &config,
2258 false,
2259 NoAcks,
2260 LogNamespace::Legacy,
2261 sleep_500_millis(),
2262 )
2263 .await;
2264
2265 let received = extract_messages_value(received);
2266
2267 assert_eq!(
2268 received,
2269 vec![
2270 "hello i am a file".into(),
2271 "i can unicode".into(),
2272 "but i do so in 16 bits".into(),
2273 "and when i byte".into(),
2274 "i become little-endian".into(),
2275 ]
2276 );
2277 }
2278
2279 #[tokio::test]
2280 async fn test_non_default_line_delimiter() {
2281 let dir = tempdir().unwrap();
2282 let config = file::FileConfig {
2283 include: vec![dir.path().join("*")],
2284 line_delimiter: "\r\n".to_string(),
2285 ..test_default_file_config(&dir)
2286 };
2287
2288 let path = dir.path().join("file");
2289 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2290 let mut file = File::create(&path).unwrap();
2291
2292 sleep_500_millis().await; write!(&mut file, "hello i am a line\r\n").unwrap();
2295 write!(&mut file, "and i am too\r\n").unwrap();
2296 write!(&mut file, "CRLF is how we end\r\n").unwrap();
2297 write!(&mut file, "please treat us well\r\n").unwrap();
2298
2299 sleep_500_millis().await;
2300 })
2301 .await;
2302
2303 let received = extract_messages_value(received);
2304
2305 assert_eq!(
2306 received,
2307 vec![
2308 "hello i am a line".into(),
2309 "and i am too".into(),
2310 "CRLF is how we end".into(),
2311 "please treat us well".into()
2312 ]
2313 );
2314 }
2315
2316 #[tokio::test]
2317 async fn remove_file() {
2318 let n = 5;
2319 let remove_after_secs = 1;
2320
2321 let dir = tempdir().unwrap();
2322 let config = file::FileConfig {
2323 include: vec![dir.path().join("*")],
2324 remove_after_secs: Some(remove_after_secs),
2325 ..test_default_file_config(&dir)
2326 };
2327
2328 let path = dir.path().join("file");
2329 let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2330 let mut file = File::create(&path).unwrap();
2331
2332 sleep_500_millis().await; for i in 0..n {
2335 writeln!(&mut file, "{i}").unwrap();
2336 }
2337 drop(file);
2338
2339 for _ in 0..10 {
2340 sleep(Duration::from_secs(remove_after_secs + 1)).await;
2342
2343 if File::open(&path).is_err() {
2344 break;
2345 }
2346 }
2347 })
2348 .await;
2349
2350 assert_eq!(received.len(), n);
2351
2352 match File::open(&path) {
2353 Ok(_) => panic!("File wasn't removed"),
2354 Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2355 }
2356 }
2357
/// How `run_file_source` wires acknowledgements for a test run.
#[derive(Clone, Copy, PartialEq, Eq)]
enum AckingMode {
    /// The source runs with acknowledgements disabled.
    NoAcks,
    /// Acknowledgements are enabled, but the plain test sender is used, so
    /// received events are not finalized.
    Unfinalized,
    /// Acknowledgements are enabled and the finalizing test sender marks
    /// events as delivered.
    Acks,
}
2364 use vector_lib::lookup::OwnedTargetPath;
2365 use AckingMode::*;
2366
2367 async fn run_file_source(
2368 config: &FileConfig,
2369 wait_shutdown: bool,
2370 acking_mode: AckingMode,
2371 log_namespace: LogNamespace,
2372 inner: impl Future<Output = ()>,
2373 ) -> Vec<Event> {
2374 assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2375 let (tx, rx) = if acking_mode == Acks {
2376 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2377 (tx, rx.boxed())
2378 } else {
2379 let (tx, rx) = SourceSender::new_test();
2380 (tx, rx.boxed())
2381 };
2382
2383 let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2384 let data_dir = config.data_dir.clone().unwrap();
2385 let acks = !matches!(acking_mode, NoAcks);
2386
2387 tokio::spawn(file::file_source(
2388 config,
2389 data_dir,
2390 shutdown,
2391 tx,
2392 acks,
2393 log_namespace,
2394 ));
2395
2396 inner.await;
2397
2398 drop(trigger_shutdown);
2399
2400 let result = if acking_mode == Unfinalized {
2401 rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2402 .collect::<Vec<_>>()
2403 .await
2404 } else {
2405 timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2406 .await
2407 .expect(
2408 "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2409 )
2410 };
2411 if wait_shutdown {
2412 shutdown_done.await;
2413 }
2414
2415 result
2416 })
2417 .await
2418 }
2419
2420 fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2421 received
2422 .into_iter()
2423 .map(Event::into_log)
2424 .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2425 .collect()
2426 }
2427
2428 fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2429 received
2430 .into_iter()
2431 .map(Event::into_log)
2432 .map(|log| log.get_message().unwrap().clone())
2433 .collect()
2434 }
2435}