1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 codecs::{BytesDeserializer, BytesDeserializerConfig},
14 config::{LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 file_source::{
17 file_server::{FileServer, Line, calculate_ignore_before},
18 paths_provider::{Glob, MatchOptions},
19 },
20 file_source_common::{
21 Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22 },
23 finalizer::OrderedFinalizer,
24 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30 SourceSender,
31 config::{
32 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33 log_schema,
34 },
35 encoding_transcode::{Decoder, Encoder},
36 event::{BatchNotifier, BatchStatus, LogEvent},
37 internal_events::{
38 FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39 FileSourceInternalEventsEmitter, StreamClosedError,
40 },
41 line_agg::{self, LineAgg},
42 serde::bool_or_struct,
43 shutdown::ShutdownSignal,
44};
45
/// Errors raised while validating the `file` source configuration.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The `message_start_indicator` option could not be compiled as a regex.
    #[snafu(display(
        "message_start_indicator {:?} is not a valid regex: {}",
        indicator,
        source
    ))]
    InvalidMessageStartIndicator {
        /// The pattern string that was rejected.
        indicator: String,
        /// The underlying regex compilation error.
        source: regex::Error,
    },
}
58
/// Configuration for the `file` source.
#[serde_as]
#[configurable_component(source("file", "Collect logs from files."))]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileConfig {
    /// File patterns (globs) to read from.
    ///
    /// At least one pattern must be given.
    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
    pub include: Vec<PathBuf>,

    /// File patterns (globs) to exclude from the `include` matches.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
    pub exclude: Vec<PathBuf>,

    /// Name of the log field that receives the path of the file an event was read from.
    ///
    /// Defaults to `file`.
    #[serde(default = "default_file_key")]
    #[configurable(metadata(docs::examples = "path"))]
    pub file_key: OptionalValuePath,

    /// Legacy switch superseded by `ignore_checkpoints` and `read_from`.
    ///
    /// When `true`, `ignore_checkpoints` defaults to `true` instead of `false`.
    #[configurable(
        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
    )]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub start_at_beginning: Option<bool>,

    /// Whether existing checkpoints are ignored; forwarded to the file server.
    ///
    /// Defaults to `false` unless the deprecated `start_at_beginning` is `true`.
    #[serde(default)]
    pub ignore_checkpoints: Option<bool>,

    /// Position to read from; defaults to the beginning of the file.
    #[serde(default = "default_read_from")]
    #[configurable(derived)]
    pub read_from: ReadFromConfig,

    /// Age threshold, in seconds, from which the "ignore files older than" cutoff is computed.
    ///
    /// Also accepted under the alias `ignore_older`.
    #[serde(alias = "ignore_older", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
    pub ignore_older_secs: Option<u64>,

    /// Maximum line size, in bytes, handed to both the file server and the fingerprinter.
    ///
    /// Defaults to 100 KiB.
    #[serde(default = "default_max_line_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_line_bytes: usize,

    /// Name of the log field that receives the current hostname.
    ///
    /// Falls back to the global log schema's host key when unset.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// Directory used for checkpoint data.
    ///
    /// A per-source subdirectory is resolved (and created) from it when the source is built.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Name of the log field that receives the starting offset of each line.
    ///
    /// No legacy field is written when unset.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "offset"))]
    pub offset_key: Option<OptionalValuePath>,

    /// Minimum glob cooldown, in milliseconds, forwarded to the file server.
    ///
    /// Also accepted under the alias `glob_minimum_cooldown`. Defaults to 1000 ms.
    #[serde(
        alias = "glob_minimum_cooldown",
        default = "default_glob_minimum_cooldown_ms"
    )]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    pub glob_minimum_cooldown_ms: Duration,

    /// Strategy used to uniquely identify files; also accepted under the alias `fingerprinting`.
    #[configurable(derived)]
    #[serde(alias = "fingerprinting", default)]
    fingerprint: FingerprintConfig,

    /// Forwarded to the fingerprinter.
    // NOTE(review): presumably suppresses "not found" errors while fingerprinting — confirm in
    // `Fingerprinter::new`.
    #[serde(default)]
    pub ignore_not_found: bool,

    /// Regex marking the first line of a multi-line message (legacy line aggregation).
    ///
    /// Only consulted when `multiline` is not set.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub message_start_indicator: Option<String>,

    /// Timeout handed to the legacy line aggregator used with `message_start_indicator`.
    // NOTE(review): unit is presumably milliseconds — confirm in `line_agg::Config::for_legacy`.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default = "default_multi_line_timeout")]
    pub multi_line_timeout: u64,

    /// Multi-line aggregation settings; takes precedence over `message_start_indicator`.
    #[configurable(derived)]
    #[serde(default)]
    pub multiline: Option<MultilineConfig>,

    /// Read budget, in bytes, forwarded to the file server. Defaults to 2048.
    #[serde(default = "default_max_read_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_read_bytes: usize,

    /// Forwarded to the file server's `oldest_first` setting.
    #[serde(default)]
    pub oldest_first: bool,

    /// Wait time, in seconds, before a file is removed; no removal is configured when unset.
    ///
    /// Also accepted under the alias `remove_after`.
    #[serde(alias = "remove_after", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 0))]
    #[configurable(metadata(docs::examples = 5))]
    #[configurable(metadata(docs::examples = 60))]
    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
    pub remove_after_secs: Option<u64>,

    /// String sequence used to separate lines. Defaults to `\n`.
    ///
    /// It is re-encoded into the configured `encoding` charset, if any, before use.
    #[serde(default = "default_line_delimiter")]
    #[configurable(metadata(docs::examples = "\r\n"))]
    pub line_delimiter: String,

    /// Character set of the files; when set, every line is transcoded to UTF-8.
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: Option<EncodingConfig>,

    /// End-to-end acknowledgement settings; accepts either a bare boolean or a struct.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Per-source override of the log namespace.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Internal metrics settings (for example, whether a file tag is attached to metrics).
    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// Rotate wait, in seconds, forwarded to the file server; configured as `rotate_wait_secs`.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    pub rotate_wait: Duration,
}
247
248fn default_max_line_bytes() -> usize {
249 bytesize::kib(100u64) as usize
250}
251
252fn default_file_key() -> OptionalValuePath {
253 OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257 ReadFromConfig::Beginning
258}
259
/// Default for `glob_minimum_cooldown_ms`: one second (1000 ms).
const fn default_glob_minimum_cooldown_ms() -> Duration {
    const COOLDOWN_MS: u64 = 1_000;
    Duration::from_millis(COOLDOWN_MS)
}
263
/// Default for the deprecated `multi_line_timeout` option.
const fn default_multi_line_timeout() -> u64 {
    1_000
}

/// Default for `max_read_bytes`: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
271
/// Default for `line_delimiter`: a single line feed.
fn default_line_delimiter() -> String {
    String::from("\n")
}
275
/// Default for `rotate_wait`: half of the largest representable number of seconds.
const fn default_rotate_wait() -> Duration {
    const HALF_MAX_SECS: u64 = u64::MAX / 2;
    Duration::from_secs(HALF_MAX_SECS)
}
279
/// Configuration for how files are identified.
///
/// Serialized with a `strategy` tag selecting the variant.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
))]
pub enum FingerprintConfig {
    /// Identify a file by a checksum over its first lines (maps to
    /// `FingerprintStrategy::FirstLinesChecksum`).
    Checksum {
        /// Number of header bytes to ignore. Defaults to 0.
        // NOTE(review): presumably skipped at the start of the file before checksumming —
        // confirm in the fingerprinter.
        #[serde(default = "default_ignored_header_bytes")]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        ignored_header_bytes: usize,

        /// Number of lines used for the checksum. Defaults to 1.
        #[serde(default = "default_lines")]
        #[configurable(metadata(docs::type_unit = "lines"))]
        lines: usize,
    },

    /// Identify a file by device and inode; configured as `device_and_inode`.
    #[serde(rename = "device_and_inode")]
    DevInode,
}
318
319impl Default for FingerprintConfig {
320 fn default() -> Self {
321 Self::Checksum {
322 ignored_header_bytes: 0,
323 lines: default_lines(),
324 }
325 }
326}
327
/// Default for `ignored_header_bytes`: no header bytes are ignored.
const fn default_ignored_header_bytes() -> usize {
    const NO_HEADER: usize = 0;
    NO_HEADER
}
331
/// Default for the checksum `lines` setting: a single line.
const fn default_lines() -> usize {
    const SINGLE_LINE: usize = 1;
    SINGLE_LINE
}
335
336impl From<FingerprintConfig> for FingerprintStrategy {
337 fn from(config: FingerprintConfig) -> FingerprintStrategy {
338 match config {
339 FingerprintConfig::Checksum {
340 ignored_header_bytes,
341 lines,
342 } => FingerprintStrategy::FirstLinesChecksum {
343 ignored_header_bytes,
344 lines,
345 },
346 FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
347 }
348 }
349}
350
/// Checkpoint data queued with the finalizer for a single emitted event.
///
/// Once the matching batch is reported as delivered, `offset` is recorded as the
/// checkpoint for `file_id`.
#[derive(Debug)]
pub(crate) struct FinalizerEntry {
    /// Fingerprint of the file the line was read from.
    pub(crate) file_id: FileFingerprint,
    /// End offset of the line; becomes the checkpoint position after delivery.
    pub(crate) offset: u64,
}
356
357impl Default for FileConfig {
358 fn default() -> Self {
359 Self {
360 include: vec![PathBuf::from("/var/log/**/*.log")],
361 exclude: vec![],
362 file_key: default_file_key(),
363 start_at_beginning: None,
364 ignore_checkpoints: None,
365 read_from: default_read_from(),
366 ignore_older_secs: None,
367 max_line_bytes: default_max_line_bytes(),
368 fingerprint: FingerprintConfig::default(),
369 ignore_not_found: false,
370 host_key: None,
371 offset_key: None,
372 data_dir: None,
373 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
374 message_start_indicator: None,
375 multi_line_timeout: default_multi_line_timeout(), multiline: None,
377 max_read_bytes: default_max_read_bytes(),
378 oldest_first: false,
379 remove_after_secs: None,
380 line_delimiter: default_line_delimiter(),
381 encoding: None,
382 acknowledgements: Default::default(),
383 log_namespace: None,
384 internal_metrics: Default::default(),
385 rotate_wait: default_rotate_wait(),
386 }
387 }
388}
389
// NOTE(review): presumably derives the `GenerateConfig` implementation for `FileConfig`
// from its `Default` impl — confirm against the macro definition.
impl_generate_config_from_default!(FileConfig);
391
392#[async_trait::async_trait]
393#[typetag::serde(name = "file")]
394impl SourceConfig for FileConfig {
395 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
396 let data_dir = cx
401 .globals
402 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
404
405 #[allow(clippy::suspicious_else_formatting)]
407 {
408 if let Some(ref config) = self.multiline {
409 let _: line_agg::Config = config.try_into()?;
410 }
411
412 if let Some(ref indicator) = self.message_start_indicator {
413 Regex::new(indicator)
414 .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
415 }
416 }
417
418 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
419
420 let log_namespace = cx.log_namespace(self.log_namespace);
421
422 Ok(file_source(
423 self,
424 data_dir,
425 cx.shutdown,
426 cx.out,
427 acknowledgements,
428 log_namespace,
429 ))
430 }
431
432 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
433 let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
434 let host_key = self
435 .host_key
436 .clone()
437 .unwrap_or(log_schema().host_key().cloned().into())
438 .path
439 .map(LegacyKey::Overwrite);
440
441 let offset_key = self
442 .offset_key
443 .clone()
444 .and_then(|k| k.path)
445 .map(LegacyKey::Overwrite);
446
447 let schema_definition = BytesDeserializerConfig
448 .schema_definition(global_log_namespace.merge(self.log_namespace))
449 .with_standard_vector_source_metadata()
450 .with_source_metadata(
451 Self::NAME,
452 host_key,
453 &owned_value_path!("host"),
454 Kind::bytes().or_undefined(),
455 Some("host"),
456 )
457 .with_source_metadata(
458 Self::NAME,
459 offset_key,
460 &owned_value_path!("offset"),
461 Kind::integer(),
462 None,
463 )
464 .with_source_metadata(
465 Self::NAME,
466 file_key,
467 &owned_value_path!("path"),
468 Kind::bytes(),
469 None,
470 );
471
472 vec![SourceOutput::new_maybe_logs(
473 DataType::Log,
474 schema_definition,
475 )]
476 }
477
478 fn can_acknowledge(&self) -> bool {
479 true
480 }
481}
482
483pub fn file_source(
484 config: &FileConfig,
485 data_dir: PathBuf,
486 shutdown: ShutdownSignal,
487 mut out: SourceSender,
488 acknowledgements: bool,
489 log_namespace: LogNamespace,
490) -> super::Source {
491 if config.include.is_empty() {
493 error!(
494 message = "`include` configuration option must contain at least one file pattern.",
495 internal_log_rate_limit = false
496 );
497 return Box::pin(future::ready(Err(())));
498 }
499
500 let exclude_patterns = config
501 .exclude
502 .iter()
503 .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
504 .collect::<Vec<PathBuf>>();
505 let ignore_before = calculate_ignore_before(config.ignore_older_secs);
506 let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
507 let (ignore_checkpoints, read_from) = reconcile_position_options(
508 config.start_at_beginning,
509 config.ignore_checkpoints,
510 Some(config.read_from),
511 );
512
513 let emitter = FileSourceInternalEventsEmitter {
514 include_file_metric_tag: config.internal_metrics.include_file_tag,
515 };
516
517 let paths_provider = Glob::new(
518 &config.include,
519 &exclude_patterns,
520 MatchOptions::default(),
521 emitter.clone(),
522 )
523 .expect("invalid glob patterns");
524
525 let encoding_charset = config.encoding.clone().map(|e| e.charset);
526
527 let line_delimiter_as_bytes = match encoding_charset {
530 Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
531 None => Bytes::from(config.line_delimiter.clone()),
532 };
533
534 let checkpointer = Checkpointer::new(&data_dir);
535 let strategy = config.fingerprint.clone().into();
536
537 let file_server = FileServer {
538 paths_provider,
539 max_read_bytes: config.max_read_bytes,
540 ignore_checkpoints,
541 read_from,
542 ignore_before,
543 max_line_bytes: config.max_line_bytes,
544 line_delimiter: line_delimiter_as_bytes,
545 data_dir,
546 glob_minimum_cooldown,
547 fingerprinter: Fingerprinter::new(strategy, config.max_line_bytes, config.ignore_not_found),
548 oldest_first: config.oldest_first,
549 remove_after: config.remove_after_secs.map(Duration::from_secs),
550 emitter,
551 rotate_wait: config.rotate_wait,
552 };
553
554 let event_metadata = EventMetadata {
555 host_key: config
556 .host_key
557 .clone()
558 .unwrap_or(log_schema().host_key().cloned().into())
559 .path,
560 hostname: crate::get_hostname().ok(),
561 file_key: config.file_key.clone().path,
562 offset_key: config.offset_key.clone().and_then(|k| k.path),
563 };
564
565 let include = config.include.clone();
566 let exclude = config.exclude.clone();
567 let multiline_config = config.multiline.clone();
568 let message_start_indicator = config.message_start_indicator.clone();
569 let multi_line_timeout = config.multi_line_timeout;
570
571 let (finalizer, shutdown_checkpointer) = if acknowledgements {
572 let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
576
577 let (send_shutdown, shutdown2) = oneshot::channel::<()>();
582 let checkpoints = checkpointer.view();
583 tokio::spawn(async move {
584 while let Some((status, entry)) = ack_stream.next().await {
585 if status == BatchStatus::Delivered {
586 checkpoints.update(entry.file_id, entry.offset);
587 }
588 }
589 send_shutdown.send(())
590 });
591 (Some(finalizer), shutdown2.map(|_| ()).boxed())
592 } else {
593 (None, shutdown.clone().map(|_| ()).boxed())
596 };
597
598 let checkpoints = checkpointer.view();
599 let include_file_metric_tag = config.internal_metrics.include_file_tag;
600 Box::pin(async move {
601 info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
602
603 let mut encoding_decoder = encoding_charset.map(Decoder::new);
604
605 let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
607 let rx = rx
608 .map(futures::stream::iter)
609 .flatten()
610 .map(move |mut line| {
611 emit!(FileBytesReceived {
612 byte_size: line.text.len(),
613 file: &line.filename,
614 include_file_metric_tag,
615 });
616 line.text = match encoding_decoder.as_mut() {
618 Some(d) => d.decode_to_utf8(line.text),
619 None => line.text,
620 };
621 line
622 });
623
624 let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
625 if let Some(ref multiline_config) = multiline_config {
626 wrap_with_line_agg(
627 rx,
628 multiline_config.try_into().unwrap(), )
630 } else if let Some(msi) = message_start_indicator {
631 wrap_with_line_agg(
632 rx,
633 line_agg::Config::for_legacy(
634 Regex::new(&msi).unwrap(), multi_line_timeout,
636 ),
637 )
638 } else {
639 Box::new(rx)
640 };
641
642 let span = Span::current();
645 let mut messages = messages.map(move |line| {
646 let mut event = create_event(
647 line.text,
648 line.start_offset,
649 &line.filename,
650 &event_metadata,
651 log_namespace,
652 include_file_metric_tag,
653 );
654
655 if let Some(finalizer) = &finalizer {
656 let (batch, receiver) = BatchNotifier::new_with_receiver();
657 event = event.with_batch_notifier(&batch);
658 let entry = FinalizerEntry {
659 file_id: line.file_id,
660 offset: line.end_offset,
661 };
662 finalizer.add(entry, receiver);
664 } else {
665 checkpoints.update(line.file_id, line.end_offset);
666 }
667 event
668 });
669 tokio::spawn(async move {
670 match out
671 .send_event_stream(&mut messages)
672 .instrument(span.or_current())
673 .await
674 {
675 Ok(()) => {
676 debug!("Finished sending.");
677 }
678 Err(_) => {
679 let (count, _) = messages.size_hint();
680 emit!(StreamClosedError { count });
681 }
682 }
683 });
684
685 let span = info_span!("file_server");
686 tokio::task::spawn_blocking(move || {
687 let _enter = span.enter();
688 let rt = tokio::runtime::Handle::current();
689 let result =
690 rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
691 emit!(FileOpen { count: 0 });
692 result.expect("file server exited with an error");
696 })
697 .map_err(|error| error!(message="File server unexpectedly stopped.", %error, internal_log_rate_limit = false))
698 .await
699 })
700}
701
702fn reconcile_position_options(
705 start_at_beginning: Option<bool>,
706 ignore_checkpoints: Option<bool>,
707 read_from: Option<ReadFromConfig>,
708) -> (bool, ReadFrom) {
709 if start_at_beginning.is_some() {
710 warn!(
711 message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
712 )
713 }
714
715 match start_at_beginning {
716 Some(true) => (
717 ignore_checkpoints.unwrap_or(true),
718 read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
719 ),
720 _ => (
721 ignore_checkpoints.unwrap_or(false),
722 read_from.map(Into::into).unwrap_or_default(),
723 ),
724 }
725}
726
727fn wrap_with_line_agg(
728 rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
729 config: line_agg::Config,
730) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
731 let logic = line_agg::Logic::new(config);
732 Box::new(
733 LineAgg::new(
734 rx.map(|line| {
735 (
736 line.filename,
737 line.text,
738 (line.file_id, line.start_offset, line.end_offset),
739 )
740 }),
741 logic,
742 )
743 .map(
744 |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
745 text,
746 filename,
747 file_id,
748 start_offset,
749 end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
750 lastline_end_offset
751 }),
752 },
753 ),
754 )
755}
756
/// Per-source values used by `create_event` to annotate every event.
struct EventMetadata {
    /// Legacy field that receives the hostname, if any.
    host_key: Option<OwnedValuePath>,
    /// Hostname of the machine; when `None`, no host metadata is inserted.
    hostname: Option<String>,
    /// Legacy field that receives the file path, if any.
    file_key: Option<OwnedValuePath>,
    /// Legacy field that receives the line offset, if any.
    offset_key: Option<OwnedValuePath>,
}
763
764fn create_event(
765 line: Bytes,
766 offset: u64,
767 file: &str,
768 meta: &EventMetadata,
769 log_namespace: LogNamespace,
770 include_file_metric_tag: bool,
771) -> LogEvent {
772 let deserializer = BytesDeserializer;
773 let mut event = deserializer.parse_single(line, log_namespace);
774
775 log_namespace.insert_vector_metadata(
776 &mut event,
777 log_schema().source_type_key(),
778 path!("source_type"),
779 Bytes::from_static(FileConfig::NAME.as_bytes()),
780 );
781 log_namespace.insert_vector_metadata(
782 &mut event,
783 log_schema().timestamp_key(),
784 path!("ingest_timestamp"),
785 Utc::now(),
786 );
787
788 let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
789 if let Some(hostname) = &meta.hostname {
791 log_namespace.insert_source_metadata(
792 FileConfig::NAME,
793 &mut event,
794 legacy_host_key,
795 path!("host"),
796 hostname.clone(),
797 );
798 }
799
800 let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
801 log_namespace.insert_source_metadata(
802 FileConfig::NAME,
803 &mut event,
804 legacy_offset_key,
805 path!("offset"),
806 offset,
807 );
808
809 let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
810 log_namespace.insert_source_metadata(
811 FileConfig::NAME,
812 &mut event,
813 legacy_file_key,
814 path!("path"),
815 file,
816 );
817
818 emit!(FileEventsReceived {
819 count: 1,
820 file,
821 byte_size: event.estimated_json_encoded_size_of(),
822 include_file_metric_tag,
823 });
824
825 event
826}
827
828#[cfg(test)]
829mod tests {
830 use std::{
831 collections::HashSet,
832 fs::{self, File},
833 future::Future,
834 io::{Seek, Write},
835 };
836
837 use encoding_rs::UTF_16LE;
838 use similar_asserts::assert_eq;
839 use tempfile::tempdir;
840 use tokio::time::{Duration, sleep, timeout};
841 use vector_lib::schema::Definition;
842 use vrl::{value, value::kind::Collection};
843
844 use super::*;
845 use crate::{
846 config::Config,
847 event::{Event, EventStatus, Value},
848 shutdown::ShutdownSignal,
849 sources::file,
850 test_util::components::{FILE_SOURCE_TAGS, assert_source_compliance},
851 };
852
/// The generated example configuration for `FileConfig` must pass the shared check.
#[test]
fn generate_config() {
    use crate::test_util::test_generate_config;

    test_generate_config::<FileConfig>();
}
857
858 fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
859 file::FileConfig {
860 fingerprint: FingerprintConfig::Checksum {
861 ignored_header_bytes: 0,
862 lines: 1,
863 },
864 data_dir: Some(dir.path().to_path_buf()),
865 glob_minimum_cooldown_ms: Duration::from_millis(100),
866 internal_metrics: FileInternalMetricsConfig {
867 include_file_tag: true,
868 },
869 ..Default::default()
870 }
871 }
872
873 async fn sleep_500_millis() {
874 sleep(Duration::from_millis(500)).await;
875 }
876
/// Checks TOML parsing of defaults, fingerprint strategies, encoding and `read_from`.
#[test]
fn parse_config() {
    fn parse(toml_src: &str) -> FileConfig {
        toml::from_str(toml_src).unwrap()
    }

    // Explicitly spelling out the defaults must yield the default configuration.
    let defaults = parse(
        r#"
        include = [ "/var/log/**/*.log" ]
        file_key = "file"
        glob_minimum_cooldown_ms = 1000
        multi_line_timeout = 1000
        max_read_bytes = 2048
        line_delimiter = "\n"
    "#,
    );
    assert_eq!(defaults, FileConfig::default());
    assert_eq!(
        defaults.fingerprint,
        FingerprintConfig::Checksum {
            ignored_header_bytes: 0,
            lines: 1
        }
    );

    let dev_inode = parse(
        r#"
        include = [ "/var/log/**/*.log" ]
        [fingerprint]
        strategy = "device_and_inode"
        "#,
    );
    assert_eq!(dev_inode.fingerprint, FingerprintConfig::DevInode);

    let checksum = parse(
        r#"
        include = [ "/var/log/**/*.log" ]
        [fingerprint]
        strategy = "checksum"
        bytes = 128
        ignored_header_bytes = 512
        "#,
    );
    assert_eq!(
        checksum.fingerprint,
        FingerprintConfig::Checksum {
            ignored_header_bytes: 512,
            lines: 1
        }
    );

    let utf16 = parse(
        r#"
        include = [ "/var/log/**/*.log" ]
        [encoding]
        charset = "utf-16le"
        "#,
    );
    assert_eq!(utf16.encoding, Some(EncodingConfig { charset: UTF_16LE }));

    let from_beginning = parse(
        r#"
        include = [ "/var/log/**/*.log" ]
        read_from = "beginning"
        "#,
    );
    assert_eq!(from_beginning.read_from, ReadFromConfig::Beginning);

    let from_end = parse(
        r#"
        include = [ "/var/log/**/*.log" ]
        read_from = "end"
        "#,
    );
    assert_eq!(from_end.read_from, ReadFromConfig::End);
}
955
/// A source-local `data_dir` wins over the global one; without it the global one is used.
#[test]
fn resolve_data_dir() {
    let global_dir = tempdir().unwrap();
    let local_dir = tempdir().unwrap();

    let mut config = Config::default();
    config.global.data_dir = global_dir.keep().into();

    // Source-local directory is preferred when present.
    let local_config = test_default_file_config(&local_dir);
    let resolved_local = config
        .global
        .resolve_and_validate_data_dir(local_config.data_dir.as_ref())
        .unwrap();
    assert_eq!(resolved_local, local_dir.path());

    // Falls back to the global directory otherwise.
    let resolved_global = config.global.resolve_and_validate_data_dir(None).unwrap();
    assert_eq!(resolved_global, config.global.data_dir.unwrap());
}
975
/// Schema advertised by the default configuration under the Vector log namespace.
#[test]
fn output_schema_definition_vector_namespace() {
    let mut outputs = FileConfig::default().outputs(LogNamespace::Vector);
    let actual = outputs.remove(0).schema_definition(true);

    let expected = Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
        .with_meaning(OwnedTargetPath::event_root(), "message")
        .with_metadata_field(
            &owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("file", "host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        )
        .with_metadata_field(
            &owned_value_path!("file", "offset"),
            Kind::integer(),
            None,
        )
        .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None);

    assert_eq!(actual, Some(expected));
}
1012
/// Schema advertised by the default configuration under the legacy log namespace.
#[test]
fn output_schema_definition_legacy_namespace() {
    let mut outputs = FileConfig::default().outputs(LogNamespace::Legacy);
    let actual = outputs.remove(0).schema_definition(true);

    let expected = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    )
    .with_event_field(
        &owned_value_path!("message"),
        Kind::bytes(),
        Some("message"),
    )
    .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
    .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
    .with_event_field(
        &owned_value_path!("host"),
        Kind::bytes().or_undefined(),
        Some("host"),
    )
    .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
    .with_event_field(&owned_value_path!("file"), Kind::bytes(), None);

    assert_eq!(actual, Some(expected));
}
1044
/// With the conventional key names, the legacy namespace writes metadata as event fields.
#[test]
fn create_event_legacy_namespace() {
    let meta = EventMetadata {
        host_key: Some(owned_value_path!("host")),
        hostname: Some("Some.Machine".to_string()),
        file_key: Some(owned_value_path!("file")),
        offset_key: Some(owned_value_path!("offset")),
    };
    let log = create_event(
        Bytes::from("hello world"),
        0,
        "some_file.rs",
        &meta,
        LogNamespace::Legacy,
        false,
    );

    assert_eq!(log["file"], "some_file.rs".into());
    assert_eq!(log["host"], "Some.Machine".into());
    assert_eq!(log["offset"], 0.into());
    assert_eq!(*log.get_message().unwrap(), "hello world".into());
    assert_eq!(*log.get_source_type().unwrap(), "file".into());

    let timestamp_key = log_schema().timestamp_key().unwrap().to_string();
    assert!(log[timestamp_key].is_timestamp());
}
1066
/// Custom key names are honoured for the host, file and offset fields in the legacy namespace.
#[test]
fn create_event_custom_fields_legacy_namespace() {
    let meta = EventMetadata {
        host_key: Some(owned_value_path!("hostname")),
        hostname: Some("Some.Machine".to_string()),
        file_key: Some(owned_value_path!("file_path")),
        offset_key: Some(owned_value_path!("off")),
    };
    let log = create_event(
        Bytes::from("hello world"),
        0,
        "some_file.rs",
        &meta,
        LogNamespace::Legacy,
        false,
    );

    assert_eq!(log["file_path"], "some_file.rs".into());
    assert_eq!(log["hostname"], "Some.Machine".into());
    assert_eq!(log["off"], 0.into());
    assert_eq!(*log.get_message().unwrap(), "hello world".into());
    assert_eq!(*log.get_source_type().unwrap(), "file".into());

    let timestamp_key = log_schema().timestamp_key().unwrap().to_string();
    assert!(log[timestamp_key].is_timestamp());
}
1088
/// Under the Vector namespace the line is the event value and all annotations live in
/// event metadata; the legacy key names are not used for lookups.
#[test]
fn create_event_vector_namespace() {
    let meta = EventMetadata {
        host_key: Some(owned_value_path!("ignored")),
        hostname: Some("Some.Machine".to_string()),
        file_key: Some(owned_value_path!("ignored")),
        offset_key: Some(owned_value_path!("ignored")),
    };
    let log = create_event(
        Bytes::from("hello world"),
        0,
        "some_file.rs",
        &meta,
        LogNamespace::Vector,
        false,
    );

    assert_eq!(log.value(), &value!("hello world"));

    let metadata = log.metadata().value();

    assert_eq!(
        metadata.get(path!("vector", "source_type")).unwrap(),
        &value!("file")
    );
    assert!(
        metadata
            .get(path!("vector", "ingest_timestamp"))
            .unwrap()
            .is_timestamp()
    );

    assert_eq!(
        metadata.get(path!(FileConfig::NAME, "host")).unwrap(),
        &value!("Some.Machine")
    );
    assert_eq!(
        metadata.get(path!(FileConfig::NAME, "offset")).unwrap(),
        &value!(0)
    );
    assert_eq!(
        metadata.get(path!(FileConfig::NAME, "path")).unwrap(),
        &value!("some_file.rs")
    );
}
1142
1143 #[tokio::test]
1144 async fn file_happy_path() {
1145 let n = 5;
1146
1147 let dir = tempdir().unwrap();
1148 let config = file::FileConfig {
1149 include: vec![dir.path().join("*")],
1150 ..test_default_file_config(&dir)
1151 };
1152
1153 let path1 = dir.path().join("file1");
1154 let path2 = dir.path().join("file2");
1155
1156 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1157 let mut file1 = File::create(&path1).unwrap();
1158 let mut file2 = File::create(&path2).unwrap();
1159
1160 for i in 0..n {
1161 writeln!(&mut file1, "hello {i}").unwrap();
1162 writeln!(&mut file2, "goodbye {i}").unwrap();
1163 }
1164
1165 file1.flush().unwrap();
1166 file2.flush().unwrap();
1167
1168 sleep_500_millis().await;
1169 })
1170 .await;
1171
1172 let mut hello_i = 0;
1173 let mut goodbye_i = 0;
1174
1175 for event in received {
1176 let line =
1177 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1178 if line.starts_with("hello") {
1179 assert_eq!(line, format!("hello {}", hello_i));
1180 assert_eq!(
1181 event.as_log()["file"].to_string_lossy(),
1182 path1.to_str().unwrap()
1183 );
1184 hello_i += 1;
1185 } else {
1186 assert_eq!(line, format!("goodbye {}", goodbye_i));
1187 assert_eq!(
1188 event.as_log()["file"].to_string_lossy(),
1189 path2.to_str().unwrap()
1190 );
1191 goodbye_i += 1;
1192 }
1193 }
1194 assert_eq!(hello_i, n);
1195 assert_eq!(goodbye_i, n);
1196 }
1197
1198 #[tokio::test]
1200 async fn file_read_empty_lines() {
1201 let n = 5;
1202
1203 let dir = tempdir().unwrap();
1204 let config = file::FileConfig {
1205 include: vec![dir.path().join("*")],
1206 ..test_default_file_config(&dir)
1207 };
1208
1209 let path = dir.path().join("file");
1210
1211 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1212 let mut file = File::create(&path).unwrap();
1213
1214 writeln!(&mut file, "line for checkpointing").unwrap();
1215 for _i in 0..n {
1216 writeln!(&mut file).unwrap();
1217 }
1218 file.flush().unwrap();
1219
1220 sleep_500_millis().await;
1221 })
1222 .await;
1223
1224 assert_eq!(received.len(), n + 1);
1225 }
1226
1227 #[tokio::test]
1228 async fn file_truncate() {
1229 let n = 5;
1230
1231 let dir = tempdir().unwrap();
1232 let config = file::FileConfig {
1233 include: vec![dir.path().join("*")],
1234 ..test_default_file_config(&dir)
1235 };
1236 let path = dir.path().join("file");
1237 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1238 let mut file = File::create(&path).unwrap();
1239
1240 for i in 0..n {
1241 writeln!(&mut file, "pretrunc {i}").unwrap();
1242 }
1243
1244 file.flush().unwrap();
1245 sleep_500_millis().await; file.set_len(0).unwrap();
1248 file.seek(std::io::SeekFrom::Start(0)).unwrap();
1249
1250 file.sync_all().unwrap();
1251 sleep_500_millis().await; for i in 0..n {
1254 writeln!(&mut file, "posttrunc {i}").unwrap();
1255 }
1256
1257 file.flush().unwrap();
1258 sleep_500_millis().await;
1259 })
1260 .await;
1261
1262 let mut i = 0;
1263 let mut pre_trunc = true;
1264
1265 for event in received {
1266 assert_eq!(
1267 event.as_log()["file"].to_string_lossy(),
1268 path.to_str().unwrap()
1269 );
1270
1271 let line =
1272 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1273
1274 if pre_trunc {
1275 assert_eq!(line, format!("pretrunc {}", i));
1276 } else {
1277 assert_eq!(line, format!("posttrunc {}", i));
1278 }
1279
1280 i += 1;
1281 if i == n {
1282 i = 0;
1283 pre_trunc = false;
1284 }
1285 }
1286 }
1287
1288 #[tokio::test]
1289 async fn file_rotate() {
1290 let n = 5;
1291
1292 let dir = tempdir().unwrap();
1293 let config = file::FileConfig {
1294 include: vec![dir.path().join("*")],
1295 ..test_default_file_config(&dir)
1296 };
1297
1298 let path = dir.path().join("file");
1299 let archive_path = dir.path().join("file");
1300 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1301 let mut file = File::create(&path).unwrap();
1302
1303 for i in 0..n {
1304 writeln!(&mut file, "prerot {i}").unwrap();
1305 }
1306
1307 file.flush().unwrap();
1308 sleep_500_millis().await; fs::rename(&path, archive_path).expect("could not rename");
1311 file.sync_all().unwrap();
1312
1313 let mut file = File::create(&path).unwrap();
1314
1315 file.sync_all().unwrap();
1316 sleep_500_millis().await; for i in 0..n {
1319 writeln!(&mut file, "postrot {i}").unwrap();
1320 }
1321
1322 file.flush().unwrap();
1323 sleep_500_millis().await;
1324 })
1325 .await;
1326
1327 let mut i = 0;
1328 let mut pre_rot = true;
1329
1330 for event in received {
1331 assert_eq!(
1332 event.as_log()["file"].to_string_lossy(),
1333 path.to_str().unwrap()
1334 );
1335
1336 let line =
1337 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1338
1339 if pre_rot {
1340 assert_eq!(line, format!("prerot {}", i));
1341 } else {
1342 assert_eq!(line, format!("postrot {}", i));
1343 }
1344
1345 i += 1;
1346 if i == n {
1347 i = 0;
1348 pre_rot = false;
1349 }
1350 }
1351 }
1352
1353 #[tokio::test]
1354 async fn file_multiple_paths() {
1355 let n = 5;
1356
1357 let dir = tempdir().unwrap();
1358 let config = file::FileConfig {
1359 include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1360 exclude: vec![dir.path().join("a.*.txt")],
1361 ..test_default_file_config(&dir)
1362 };
1363
1364 let path1 = dir.path().join("a.txt");
1365 let path2 = dir.path().join("b.txt");
1366 let path3 = dir.path().join("a.log");
1367 let path4 = dir.path().join("a.ignore.txt");
1368 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1369 let mut file1 = File::create(&path1).unwrap();
1370 let mut file2 = File::create(&path2).unwrap();
1371 let mut file3 = File::create(&path3).unwrap();
1372 let mut file4 = File::create(&path4).unwrap();
1373
1374 for i in 0..n {
1375 writeln!(&mut file1, "1 {i}").unwrap();
1376 writeln!(&mut file2, "2 {i}").unwrap();
1377 writeln!(&mut file3, "3 {i}").unwrap();
1378 writeln!(&mut file4, "4 {i}").unwrap();
1379 }
1380 file1.flush().unwrap();
1381 file2.flush().unwrap();
1382 file3.flush().unwrap();
1383 file4.flush().unwrap();
1384
1385 sleep_500_millis().await;
1386 })
1387 .await;
1388
1389 let mut is = [0; 3];
1390
1391 for event in received {
1392 let line =
1393 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1394 let mut split = line.split(' ');
1395 let file = split.next().unwrap().parse::<usize>().unwrap();
1396 assert_ne!(file, 4);
1397 let i = split.next().unwrap().parse::<usize>().unwrap();
1398
1399 assert_eq!(is[file - 1], i);
1400 is[file - 1] += 1;
1401 }
1402
1403 assert_eq!(is, [n as usize; 3]);
1404 }
1405
1406 #[tokio::test]
1407 async fn file_exclude_paths() {
1408 let n = 5;
1409
1410 let dir = tempdir().unwrap();
1411 let config = file::FileConfig {
1412 include: vec![dir.path().join("a//b/*.log.*")],
1413 exclude: vec![dir.path().join("a//b/test.log.*")],
1414 ..test_default_file_config(&dir)
1415 };
1416
1417 let path1 = dir.path().join("a//b/a.log.1");
1418 let path2 = dir.path().join("a//b/test.log.1");
1419 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1420 std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1421 let mut file1 = File::create(&path1).unwrap();
1422 let mut file2 = File::create(&path2).unwrap();
1423
1424 for i in 0..n {
1425 writeln!(&mut file1, "1 {i}").unwrap();
1426 writeln!(&mut file2, "2 {i}").unwrap();
1427 }
1428
1429 file1.flush().unwrap();
1430 file2.flush().unwrap();
1431 sleep_500_millis().await;
1432 })
1433 .await;
1434
1435 let mut is = [0; 1];
1436
1437 for event in received {
1438 let line =
1439 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1440 let mut split = line.split(' ');
1441 let file = split.next().unwrap().parse::<usize>().unwrap();
1442 assert_ne!(file, 4);
1443 let i = split.next().unwrap().parse::<usize>().unwrap();
1444
1445 assert_eq!(is[file - 1], i);
1446 is[file - 1] += 1;
1447 }
1448
1449 assert_eq!(is, [n as usize; 1]);
1450 }
1451
1452 #[tokio::test]
1453 async fn file_key_acknowledged() {
1454 file_key(Acks).await
1455 }
1456
1457 #[tokio::test]
1458 async fn file_key_no_acknowledge() {
1459 file_key(NoAcks).await
1460 }
1461
1462 async fn file_key(acks: AckingMode) {
1463 {
1465 let dir = tempdir().unwrap();
1466 let config = file::FileConfig {
1467 include: vec![dir.path().join("*")],
1468 ..test_default_file_config(&dir)
1469 };
1470
1471 let path = dir.path().join("file");
1472 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1473 let mut file = File::create(&path).unwrap();
1474
1475 writeln!(&mut file, "hello there").unwrap();
1476 file.flush().unwrap();
1477
1478 sleep_500_millis().await;
1479 })
1480 .await;
1481
1482 assert_eq!(received.len(), 1);
1483 assert_eq!(
1484 received[0].as_log()["file"].to_string_lossy(),
1485 path.to_str().unwrap()
1486 );
1487 }
1488
1489 {
1491 let dir = tempdir().unwrap();
1492 let config = file::FileConfig {
1493 include: vec![dir.path().join("*")],
1494 file_key: OptionalValuePath::from(owned_value_path!("source")),
1495 ..test_default_file_config(&dir)
1496 };
1497
1498 let path = dir.path().join("file");
1499 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1500 let mut file = File::create(&path).unwrap();
1501
1502 writeln!(&mut file, "hello there").unwrap();
1503 file.flush().unwrap();
1504
1505 sleep_500_millis().await;
1506 })
1507 .await;
1508
1509 assert_eq!(received.len(), 1);
1510 assert_eq!(
1511 received[0].as_log()["source"].to_string_lossy(),
1512 path.to_str().unwrap()
1513 );
1514 }
1515
1516 {
1518 let dir = tempdir().unwrap();
1519 let config = file::FileConfig {
1520 include: vec![dir.path().join("*")],
1521 ..test_default_file_config(&dir)
1522 };
1523
1524 let path = dir.path().join("file");
1525 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1526 let mut file = File::create(&path).unwrap();
1527
1528 writeln!(&mut file, "hello there").unwrap();
1529
1530 file.flush().unwrap();
1531 sleep_500_millis().await;
1532 })
1533 .await;
1534
1535 assert_eq!(received.len(), 1);
1536 assert_eq!(
1537 received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1538 vec![
1539 default_file_key()
1540 .path
1541 .expect("file key to exist")
1542 .to_string()
1543 .into(),
1544 log_schema().host_key().unwrap().to_string().into(),
1545 log_schema().message_key().unwrap().to_string().into(),
1546 log_schema().timestamp_key().unwrap().to_string().into(),
1547 log_schema().source_type_key().unwrap().to_string().into()
1548 ]
1549 .into_iter()
1550 .collect::<HashSet<_>>()
1551 );
1552 }
1553 }
1554
1555 #[tokio::test]
1556 async fn file_start_position_server_restart_acknowledged() {
1557 file_start_position_server_restart(Acks).await
1558 }
1559
1560 #[tokio::test]
1561 async fn file_start_position_server_restart_no_acknowledge() {
1562 file_start_position_server_restart(NoAcks).await
1563 }
1564
1565 async fn file_start_position_server_restart(acking: AckingMode) {
1566 let dir = tempdir().unwrap();
1567 let config = file::FileConfig {
1568 include: vec![dir.path().join("*")],
1569 ..test_default_file_config(&dir)
1570 };
1571
1572 let path = dir.path().join("file");
1573 let mut file = File::create(&path).unwrap();
1574 writeln!(&mut file, "zeroth line").unwrap();
1575 file.flush().unwrap();
1576
1577 {
1579 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1580 sleep_500_millis().await;
1581 writeln!(&mut file, "first line").unwrap();
1582 file.flush().unwrap();
1583 sleep_500_millis().await;
1584 })
1585 .await;
1586
1587 let lines = extract_messages_string(received);
1588 assert_eq!(lines, vec!["zeroth line", "first line"]);
1589 }
1590 {
1592 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1593 sleep_500_millis().await;
1594 writeln!(&mut file, "second line").unwrap();
1595 file.flush().unwrap();
1596 sleep_500_millis().await;
1597 })
1598 .await;
1599
1600 let lines = extract_messages_string(received);
1601 assert_eq!(lines, vec!["second line"]);
1602 }
1603 {
1605 let config = file::FileConfig {
1606 include: vec![dir.path().join("*")],
1607 ignore_checkpoints: Some(true),
1608 read_from: ReadFromConfig::Beginning,
1609 ..test_default_file_config(&dir)
1610 };
1611 let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1612 sleep_500_millis().await;
1613 writeln!(&mut file, "third line").unwrap();
1614 file.flush().unwrap();
1615 sleep_500_millis().await;
1616 })
1617 .await;
1618
1619 let lines = extract_messages_string(received);
1620 assert_eq!(
1621 lines,
1622 vec!["zeroth line", "first line", "second line", "third line"]
1623 );
1624 }
1625 }
1626
1627 #[tokio::test]
1628 async fn file_start_position_server_restart_unfinalized() {
1629 let dir = tempdir().unwrap();
1630 let config = file::FileConfig {
1631 include: vec![dir.path().join("*")],
1632 ..test_default_file_config(&dir)
1633 };
1634
1635 let path = dir.path().join("file");
1636 let mut file = File::create(&path).unwrap();
1637 writeln!(&mut file, "the line").unwrap();
1638 file.flush().unwrap();
1639
1640 let received = run_file_source(
1642 &config,
1643 false,
1644 Unfinalized,
1645 LogNamespace::Legacy,
1646 sleep(Duration::from_secs(5)),
1647 )
1648 .await;
1649 let lines = extract_messages_string(received);
1650 assert_eq!(lines, vec!["the line"]);
1651
1652 let received = run_file_source(
1654 &config,
1655 false,
1656 Unfinalized,
1657 LogNamespace::Legacy,
1658 sleep(Duration::from_secs(5)),
1659 )
1660 .await;
1661 let lines = extract_messages_string(received);
1662 assert_eq!(lines, vec!["the line"]);
1663 }
1664
1665 #[tokio::test]
1666 async fn file_duplicate_processing_after_restart() {
1667 let dir = tempdir().unwrap();
1668 let config = file::FileConfig {
1669 include: vec![dir.path().join("*")],
1670 ..test_default_file_config(&dir)
1671 };
1672
1673 let path = dir.path().join("file");
1674 let mut file = File::create(&path).unwrap();
1675
1676 let line_count = 4000;
1677 for i in 0..line_count {
1678 writeln!(&mut file, "Here's a line for you: {i}").unwrap();
1679 }
1680 file.flush().unwrap();
1681
1682 let received = run_file_source(
1684 &config,
1685 true,
1686 Acks,
1687 LogNamespace::Legacy,
1688 sleep_500_millis(),
1690 )
1691 .await;
1692 let lines = extract_messages_string(received);
1693
1694 assert!(lines.len() < line_count);
1697
1698 let received = run_file_source(
1700 &config,
1701 true,
1702 Acks,
1703 LogNamespace::Legacy,
1704 sleep(Duration::from_secs(5)),
1705 )
1706 .await;
1707 let lines2 = extract_messages_string(received);
1708
1709 assert_eq!(lines.len() + lines2.len(), line_count);
1711 }
1712
1713 #[tokio::test]
1714 async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1715 file_start_position_server_restart_with_file_rotation(Acks).await
1716 }
1717
1718 #[tokio::test]
1719 async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1720 file_start_position_server_restart_with_file_rotation(NoAcks).await
1721 }
1722
1723 async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1724 let dir = tempdir().unwrap();
1725 let config = file::FileConfig {
1726 include: vec![dir.path().join("*")],
1727 ..test_default_file_config(&dir)
1728 };
1729
1730 let path = dir.path().join("file");
1731 let path_for_old_file = dir.path().join("file.old");
1732 {
1734 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1735 let mut file = File::create(&path).unwrap();
1736 writeln!(&mut file, "first line").unwrap();
1737 file.flush().unwrap();
1738 sleep_500_millis().await;
1739 })
1740 .await;
1741
1742 let lines = extract_messages_string(received);
1743 assert_eq!(lines, vec!["first line"]);
1744 }
1745 fs::rename(&path, &path_for_old_file).expect("could not rename");
1747 {
1750 let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1751 let mut file = File::create(&path).unwrap();
1752 writeln!(&mut file, "second line").unwrap();
1753 file.flush().unwrap();
1754 sleep_500_millis().await;
1755 })
1756 .await;
1757
1758 let lines = extract_messages_string(received);
1759 assert_eq!(lines, vec!["second line"]);
1760 }
1761 }
1762
1763 #[cfg(unix)] #[tokio::test]
1765 async fn file_start_position_ignore_old_files() {
1766 use std::{
1767 os::unix::io::AsRawFd,
1768 time::{Duration, SystemTime},
1769 };
1770
1771 let dir = tempdir().unwrap();
1772 let config = file::FileConfig {
1773 include: vec![dir.path().join("*")],
1774 ignore_older_secs: Some(5),
1775 ..test_default_file_config(&dir)
1776 };
1777
1778 let before_path = dir.path().join("before");
1779 let mut before_file = File::create(&before_path).unwrap();
1780 let after_path = dir.path().join("after");
1781 let mut after_file = File::create(&after_path).unwrap();
1782
1783 writeln!(&mut before_file, "first line").unwrap(); writeln!(&mut after_file, "_first line").unwrap(); {
1787 let before = SystemTime::now() - Duration::from_secs(8);
1789 let after = SystemTime::now() - Duration::from_secs(2);
1790
1791 let before_time = libc::timeval {
1792 tv_sec: before
1793 .duration_since(SystemTime::UNIX_EPOCH)
1794 .unwrap()
1795 .as_secs() as _,
1796 tv_usec: 0,
1797 };
1798 let before_times = [before_time, before_time];
1799
1800 let after_time = libc::timeval {
1801 tv_sec: after
1802 .duration_since(SystemTime::UNIX_EPOCH)
1803 .unwrap()
1804 .as_secs() as _,
1805 tv_usec: 0,
1806 };
1807 let after_times = [after_time, after_time];
1808
1809 unsafe {
1810 libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1811 libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1812 }
1813 }
1814
1815 before_file.sync_all().unwrap();
1816 after_file.sync_all().unwrap();
1817
1818 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1819 sleep_500_millis().await;
1820 writeln!(&mut before_file, "second line").unwrap();
1821 writeln!(&mut after_file, "_second line").unwrap();
1822
1823 before_file.flush().unwrap();
1824 after_file.flush().unwrap();
1825 sleep_500_millis().await;
1826 })
1827 .await;
1828
1829 let before_lines = received
1830 .iter()
1831 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1832 .map(|event| {
1833 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1834 })
1835 .collect::<Vec<_>>();
1836 let after_lines = received
1837 .iter()
1838 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1839 .map(|event| {
1840 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1841 })
1842 .collect::<Vec<_>>();
1843 assert_eq!(before_lines, vec!["second line"]);
1844 assert_eq!(after_lines, vec!["_first line", "_second line"]);
1845 }
1846
1847 #[tokio::test]
1848 async fn file_max_line_bytes() {
1849 let dir = tempdir().unwrap();
1850 let config = file::FileConfig {
1851 include: vec![dir.path().join("*")],
1852 max_line_bytes: 10,
1853 ..test_default_file_config(&dir)
1854 };
1855
1856 let path = dir.path().join("file");
1857 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1858 let mut file = File::create(&path).unwrap();
1859
1860 writeln!(&mut file, "short").unwrap();
1861 writeln!(&mut file, "this is too long").unwrap();
1862 writeln!(&mut file, "11 eleven11").unwrap();
1863 let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1864 writeln!(&mut file, "{super_long}").unwrap();
1865 writeln!(&mut file, "exactly 10").unwrap();
1866 writeln!(&mut file, "it can end on a line that's too long").unwrap();
1867
1868 file.flush().unwrap();
1869 sleep_500_millis().await;
1870 sleep_500_millis().await;
1871
1872 writeln!(&mut file, "and then continue").unwrap();
1873 writeln!(&mut file, "last short").unwrap();
1874 file.flush().unwrap();
1875
1876 sleep_500_millis().await;
1877 sleep_500_millis().await;
1878 }).await;
1879
1880 let received = extract_messages_value(received);
1881
1882 assert_eq!(
1883 received,
1884 vec!["short".into(), "exactly 10".into(), "last short".into()]
1885 );
1886 }
1887
1888 #[tokio::test]
1889 async fn test_multi_line_aggregation_legacy() {
1890 let dir = tempdir().unwrap();
1891 let config = file::FileConfig {
1892 include: vec![dir.path().join("*")],
1893 message_start_indicator: Some("INFO".into()),
1894 multi_line_timeout: 25, ..test_default_file_config(&dir)
1896 };
1897
1898 let path = dir.path().join("file");
1899 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1900 let mut file = File::create(&path).unwrap();
1901
1902 writeln!(&mut file, "leftover foo").unwrap();
1903 writeln!(&mut file, "INFO hello").unwrap();
1904 writeln!(&mut file, "INFO goodbye").unwrap();
1905 writeln!(&mut file, "part of goodbye").unwrap();
1906
1907 file.flush().unwrap();
1908 sleep_500_millis().await;
1909
1910 writeln!(&mut file, "INFO hi again").unwrap();
1911 writeln!(&mut file, "and some more").unwrap();
1912 writeln!(&mut file, "INFO hello").unwrap();
1913
1914 file.flush().unwrap();
1915 sleep_500_millis().await;
1916
1917 writeln!(&mut file, "too slow").unwrap();
1918 writeln!(&mut file, "INFO doesn't have").unwrap();
1919 writeln!(&mut file, "to be INFO in").unwrap();
1920 writeln!(&mut file, "the middle").unwrap();
1921
1922 file.flush().unwrap();
1923 sleep_500_millis().await;
1924 })
1925 .await;
1926
1927 let received = extract_messages_value(received);
1928
1929 assert_eq!(
1930 received,
1931 vec![
1932 "leftover foo".into(),
1933 "INFO hello".into(),
1934 "INFO goodbye\npart of goodbye".into(),
1935 "INFO hi again\nand some more".into(),
1936 "INFO hello".into(),
1937 "too slow".into(),
1938 "INFO doesn't have".into(),
1939 "to be INFO in\nthe middle".into(),
1940 ]
1941 );
1942 }
1943
1944 #[tokio::test]
1945 async fn test_multi_line_aggregation() {
1946 let dir = tempdir().unwrap();
1947 let config = file::FileConfig {
1948 include: vec![dir.path().join("*")],
1949 multiline: Some(MultilineConfig {
1950 start_pattern: "INFO".to_owned(),
1951 condition_pattern: "INFO".to_owned(),
1952 mode: line_agg::Mode::HaltBefore,
1953 timeout_ms: Duration::from_millis(25), }),
1955 ..test_default_file_config(&dir)
1956 };
1957
1958 let path = dir.path().join("file");
1959 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1960 let mut file = File::create(&path).unwrap();
1961
1962 writeln!(&mut file, "leftover foo").unwrap();
1963 writeln!(&mut file, "INFO hello").unwrap();
1964 writeln!(&mut file, "INFO goodbye").unwrap();
1965 writeln!(&mut file, "part of goodbye").unwrap();
1966
1967 file.flush().unwrap();
1968 sleep_500_millis().await;
1969
1970 writeln!(&mut file, "INFO hi again").unwrap();
1971 writeln!(&mut file, "and some more").unwrap();
1972 writeln!(&mut file, "INFO hello").unwrap();
1973
1974 file.flush().unwrap();
1975 sleep_500_millis().await;
1976
1977 writeln!(&mut file, "too slow").unwrap();
1978 writeln!(&mut file, "INFO doesn't have").unwrap();
1979 writeln!(&mut file, "to be INFO in").unwrap();
1980 writeln!(&mut file, "the middle").unwrap();
1981
1982 file.flush().unwrap();
1983 sleep_500_millis().await;
1984 })
1985 .await;
1986
1987 let received = extract_messages_value(received);
1988
1989 assert_eq!(
1990 received,
1991 vec![
1992 "leftover foo".into(),
1993 "INFO hello".into(),
1994 "INFO goodbye\npart of goodbye".into(),
1995 "INFO hi again\nand some more".into(),
1996 "INFO hello".into(),
1997 "too slow".into(),
1998 "INFO doesn't have".into(),
1999 "to be INFO in\nthe middle".into(),
2000 ]
2001 );
2002 }
2003
2004 #[tokio::test]
2005 async fn test_multi_line_checkpointing() {
2006 let dir = tempdir().unwrap();
2007 let config = file::FileConfig {
2008 include: vec![dir.path().join("*")],
2009 offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2010 multiline: Some(MultilineConfig {
2011 start_pattern: "INFO".to_owned(),
2012 condition_pattern: "INFO".to_owned(),
2013 mode: line_agg::Mode::HaltBefore,
2014 timeout_ms: Duration::from_millis(25), }),
2016 ..test_default_file_config(&dir)
2017 };
2018
2019 let path = dir.path().join("file");
2020 let mut file = File::create(&path).unwrap();
2021
2022 writeln!(&mut file, "INFO hello").unwrap();
2023 writeln!(&mut file, "part of hello").unwrap();
2024
2025 file.sync_all().unwrap();
2026
2027 let received = run_file_source(
2029 &config,
2030 false,
2031 Acks,
2032 LogNamespace::Legacy,
2033 sleep_500_millis(),
2034 )
2035 .await;
2036
2037 assert_eq!(received[0].as_log()["offset"], 0.into());
2038
2039 let lines = extract_messages_string(received);
2040 assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2041
2042 let received_after_restart =
2044 run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2045 writeln!(&mut file, "INFO goodbye").unwrap();
2046 file.flush().unwrap();
2047 sleep_500_millis().await;
2048 })
2049 .await;
2050 assert_eq!(
2051 received_after_restart[0].as_log()["offset"],
2052 (lines[0].len() + 1).into()
2053 );
2054 let lines = extract_messages_string(received_after_restart);
2055 assert_eq!(lines, vec!["INFO goodbye"]);
2056 }
2057
2058 #[tokio::test]
2059 async fn test_fair_reads() {
2060 let dir = tempdir().unwrap();
2061 let config = file::FileConfig {
2062 include: vec![dir.path().join("*")],
2063 max_read_bytes: 1,
2064 oldest_first: false,
2065 ..test_default_file_config(&dir)
2066 };
2067
2068 let older_path = dir.path().join("z_older_file");
2069 let mut older = File::create(&older_path).unwrap();
2070
2071 writeln!(&mut older, "hello i am the old file").unwrap();
2072 writeln!(&mut older, "i have been around a while").unwrap();
2073 writeln!(&mut older, "you can read newer files at the same time").unwrap();
2074 older.sync_all().unwrap();
2075
2076 let newer_path = dir.path().join("a_newer_file");
2077 let mut newer = File::create(&newer_path).unwrap();
2078
2079 writeln!(&mut newer, "and i am the new file").unwrap();
2080 writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2081 writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2082 newer.sync_all().unwrap();
2083
2084 let received = run_file_source(
2085 &config,
2086 false,
2087 NoAcks,
2088 LogNamespace::Legacy,
2089 sleep_500_millis(),
2090 )
2091 .await;
2092
2093 let received = extract_messages_value(received);
2094
2095 let old_first = vec![
2096 "hello i am the old file".into(),
2097 "and i am the new file".into(),
2098 "i have been around a while".into(),
2099 "this should be interleaved with the old one".into(),
2100 "you can read newer files at the same time".into(),
2101 "which is fine because we want fairness".into(),
2102 ];
2103 let new_first: Vec<_> = old_first
2104 .chunks(2)
2105 .flat_map(|chunk| chunk.iter().rev().cloned().collect::<Vec<_>>())
2106 .collect();
2107
2108 if received[0] == old_first[0] {
2109 assert_eq!(received, old_first);
2110 } else {
2111 assert_eq!(received, new_first);
2112 }
2113 }
2114
2115 #[tokio::test]
2116 async fn test_oldest_first() {
2117 let dir = tempdir().unwrap();
2118 let config = file::FileConfig {
2119 include: vec![dir.path().join("*")],
2120 max_read_bytes: 1,
2121 oldest_first: true,
2122 ..test_default_file_config(&dir)
2123 };
2124
2125 let older_path = dir.path().join("z_older_file");
2126 let mut older = File::create(&older_path).unwrap();
2127 older.sync_all().unwrap();
2128
2129 sleep_500_millis().await;
2131
2132 let newer_path = dir.path().join("a_newer_file");
2133 let mut newer = File::create(&newer_path).unwrap();
2134 newer.sync_all().unwrap();
2135
2136 writeln!(&mut older, "hello i am the old file").unwrap();
2137 writeln!(&mut older, "i have been around a while").unwrap();
2138 writeln!(&mut older, "you should definitely read all of me first").unwrap();
2139 older.flush().unwrap();
2140
2141 writeln!(&mut newer, "i'm new").unwrap();
2142 writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2143 writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2144 newer.flush().unwrap();
2145
2146 let received = run_file_source(
2147 &config,
2148 false,
2149 NoAcks,
2150 LogNamespace::Legacy,
2151 sleep_500_millis(),
2152 )
2153 .await;
2154
2155 let received = extract_messages_value(received);
2156
2157 assert_eq!(
2158 received,
2159 vec![
2160 "hello i am the old file".into(),
2161 "i have been around a while".into(),
2162 "you should definitely read all of me first".into(),
2163 "i'm new".into(),
2164 "hopefully you read all the old stuff first".into(),
2165 "because otherwise i'm not going to make sense".into(),
2166 ]
2167 );
2168 }
2169
2170 #[tokio::test]
2171 async fn test_split_reads() {
2172 let dir = tempdir().unwrap();
2173 let config = file::FileConfig {
2174 include: vec![dir.path().join("*")],
2175 max_read_bytes: 1,
2176 ..test_default_file_config(&dir)
2177 };
2178
2179 let path = dir.path().join("file");
2180 let mut file = File::create(&path).unwrap();
2181
2182 writeln!(&mut file, "hello i am a normal line").unwrap();
2183 file.sync_all().unwrap();
2184
2185 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2186 sleep_500_millis().await;
2187
2188 write!(&mut file, "i am not a full line").unwrap();
2189
2190 file.flush().unwrap();
2191 sleep_500_millis().await;
2193
2194 writeln!(&mut file, " until now").unwrap();
2195
2196 file.flush().unwrap();
2197 sleep_500_millis().await;
2198 })
2199 .await;
2200
2201 let received = extract_messages_value(received);
2202
2203 assert_eq!(
2204 received,
2205 vec![
2206 "hello i am a normal line".into(),
2207 "i am not a full line until now".into(),
2208 ]
2209 );
2210 }
2211
2212 #[tokio::test]
2213 async fn test_gzipped_file() {
2214 let dir = tempdir().unwrap();
2215 let config = file::FileConfig {
2216 include: vec![PathBuf::from("tests/data/gzipped.log")],
2217 max_line_bytes: 100,
2224 ..test_default_file_config(&dir)
2225 };
2226
2227 let received = run_file_source(
2228 &config,
2229 false,
2230 NoAcks,
2231 LogNamespace::Legacy,
2232 sleep_500_millis(),
2233 )
2234 .await;
2235
2236 let received = extract_messages_value(received);
2237
2238 assert_eq!(
2239 received,
2240 vec![
2241 "this is a simple file".into(),
2242 "i have been compressed".into(),
2243 "in order to make me smaller".into(),
2244 "but you can still read me".into(),
2245 "hooray".into(),
2246 ]
2247 );
2248 }
2249
2250 #[tokio::test]
2251 async fn test_non_utf8_encoded_file() {
2252 let dir = tempdir().unwrap();
2253 let config = file::FileConfig {
2254 include: vec![PathBuf::from("tests/data/utf-16le.log")],
2255 encoding: Some(EncodingConfig { charset: UTF_16LE }),
2256 ..test_default_file_config(&dir)
2257 };
2258
2259 let received = run_file_source(
2260 &config,
2261 false,
2262 NoAcks,
2263 LogNamespace::Legacy,
2264 sleep_500_millis(),
2265 )
2266 .await;
2267
2268 let received = extract_messages_value(received);
2269
2270 assert_eq!(
2271 received,
2272 vec![
2273 "hello i am a file".into(),
2274 "i can unicode".into(),
2275 "but i do so in 16 bits".into(),
2276 "and when i byte".into(),
2277 "i become little-endian".into(),
2278 ]
2279 );
2280 }
2281
2282 #[tokio::test]
2283 async fn test_non_default_line_delimiter() {
2284 let dir = tempdir().unwrap();
2285 let config = file::FileConfig {
2286 include: vec![dir.path().join("*")],
2287 line_delimiter: "\r\n".to_string(),
2288 ..test_default_file_config(&dir)
2289 };
2290
2291 let path = dir.path().join("file");
2292 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2293 let mut file = File::create(&path).unwrap();
2294
2295 write!(&mut file, "hello i am a line\r\n").unwrap();
2296 write!(&mut file, "and i am too\r\n").unwrap();
2297 write!(&mut file, "CRLF is how we end\r\n").unwrap();
2298 write!(&mut file, "please treat us well\r\n").unwrap();
2299
2300 file.flush().unwrap();
2301 sleep_500_millis().await;
2302 })
2303 .await;
2304
2305 let received = extract_messages_value(received);
2306
2307 assert_eq!(
2308 received,
2309 vec![
2310 "hello i am a line".into(),
2311 "and i am too".into(),
2312 "CRLF is how we end".into(),
2313 "please treat us well".into()
2314 ]
2315 );
2316 }
2317
2318 #[tokio::test]
2322 async fn test_multi_char_delimiter_split_across_buffer_boundary() {
2323 let dir = tempdir().unwrap();
2324 let config = file::FileConfig {
2325 include: vec![dir.path().join("*")],
2326 line_delimiter: "\r\n".to_string(),
2327 ..test_default_file_config(&dir)
2328 };
2329
2330 let path = dir.path().join("file");
2331 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2332 let mut file = File::create(&path).unwrap();
2333
2334 sleep_500_millis().await;
2335
2336 let buffer_size = 8192;
2343
2344 let event1_prefix = "Event 1: ";
2346 let padding1_len = buffer_size - event1_prefix.len() - 1; write!(&mut file, "{}", event1_prefix).unwrap();
2348 file.write_all(&vec![b'X'; padding1_len]).unwrap();
2349 write!(&mut file, "\r\n").unwrap(); let event2_prefix = "Event 2: ";
2353 let padding2_len = buffer_size - event2_prefix.len() - 1;
2354 write!(&mut file, "{}", event2_prefix).unwrap();
2355 file.write_all(&vec![b'Y'; padding2_len]).unwrap();
2356 write!(&mut file, "\r\n").unwrap(); write!(&mut file, "Event 3: Final\r\n").unwrap();
2360
2361 sleep_500_millis().await;
2362 })
2363 .await;
2364
2365 let messages = extract_messages_value(received);
2366
2367 assert_eq!(
2369 messages.len(),
2370 3,
2371 "Should receive exactly 3 separate events (bug would merge them)"
2372 );
2373
2374 let msg0 = messages[0].to_string_lossy();
2376 let msg1 = messages[1].to_string_lossy();
2377 let msg2 = messages[2].to_string_lossy();
2378
2379 assert!(
2380 msg0.starts_with("Event 1: "),
2381 "First event should start with 'Event 1: ', got: {}",
2382 msg0
2383 );
2384 assert!(
2385 msg1.starts_with("Event 2: "),
2386 "Second event should start with 'Event 2: ', got: {}",
2387 msg1
2388 );
2389 assert_eq!(msg2, "Event 3: Final");
2390
2391 for (i, msg) in messages.iter().enumerate() {
2393 let msg_str = msg.to_string_lossy();
2394 assert!(
2395 !msg_str.contains('\r'),
2396 "Event {} should not contain embedded \\r",
2397 i
2398 );
2399 assert!(
2400 !msg_str.contains('\n'),
2401 "Event {} should not contain embedded \\n",
2402 i
2403 );
2404 }
2405 }
2406
2407 #[tokio::test]
2408 async fn remove_file() {
2409 let n = 5;
2410 let remove_after_secs = 1;
2411
2412 let dir = tempdir().unwrap();
2413 let config = file::FileConfig {
2414 include: vec![dir.path().join("*")],
2415 remove_after_secs: Some(remove_after_secs),
2416 ..test_default_file_config(&dir)
2417 };
2418
2419 let path = dir.path().join("file");
2420 let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2421 let mut file = File::create(&path).unwrap();
2422
2423 for i in 0..n {
2424 writeln!(&mut file, "{i}").unwrap();
2425 }
2426 file.flush().unwrap();
2427 drop(file);
2428
2429 for _ in 0..10 {
2430 sleep(Duration::from_secs(remove_after_secs + 1)).await;
2432
2433 if File::open(&path).is_err() {
2434 break;
2435 }
2436 }
2437 })
2438 .await;
2439
2440 assert_eq!(received.len(), n);
2441
2442 match File::open(&path) {
2443 Ok(_) => panic!("File wasn't removed"),
2444 Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2445 }
2446 }
2447
/// Selects how `run_file_source` wires acknowledgements between the source
/// under test and the receiving test channel.
#[derive(Clone, Copy, PartialEq, Eq)]
enum AckingMode {
    /// The source is started with acknowledgements disabled and sends into a
    /// plain `SourceSender::new_test` channel.
    NoAcks,
    /// The source is started with acknowledgements enabled but still sends
    /// into a plain `SourceSender::new_test` channel; collection stops after
    /// a fixed window instead of waiting for the channel to close
    /// (presumably because events are never finalized in this mode).
    Unfinalized,
    /// The source is started with acknowledgements enabled and sends into a
    /// `SourceSender::new_test_finalize(EventStatus::Delivered)` channel.
    Acks,
}
2454 use AckingMode::*;
2455 use vector_lib::lookup::OwnedTargetPath;
2456
2457 async fn run_file_source(
2458 config: &FileConfig,
2459 wait_shutdown: bool,
2460 acking_mode: AckingMode,
2461 log_namespace: LogNamespace,
2462 inner: impl Future<Output = ()>,
2463 ) -> Vec<Event> {
2464 assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2465 let (tx, rx) = if acking_mode == Acks {
2466 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2467 (tx, rx.boxed())
2468 } else {
2469 let (tx, rx) = SourceSender::new_test();
2470 (tx, rx.boxed())
2471 };
2472
2473 let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2474 let data_dir = config.data_dir.clone().unwrap();
2475 let acks = !matches!(acking_mode, NoAcks);
2476
2477 tokio::spawn(file::file_source(
2478 config,
2479 data_dir,
2480 shutdown,
2481 tx,
2482 acks,
2483 log_namespace,
2484 ));
2485
2486 inner.await;
2487
2488 drop(trigger_shutdown);
2489
2490 let result = if acking_mode == Unfinalized {
2491 rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2492 .collect::<Vec<_>>()
2493 .await
2494 } else {
2495 timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2496 .await
2497 .expect(
2498 "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2499 )
2500 };
2501 if wait_shutdown {
2502 shutdown_done.await;
2503 }
2504
2505 result
2506 })
2507 .await
2508 }
2509
2510 fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2511 received
2512 .into_iter()
2513 .map(Event::into_log)
2514 .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2515 .collect()
2516 }
2517
2518 fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2519 received
2520 .into_iter()
2521 .map(Event::into_log)
2522 .map(|log| log.get_message().unwrap().clone())
2523 .collect()
2524 }
2525}