1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 codecs::{BytesDeserializer, BytesDeserializerConfig},
14 config::{LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 file_source::{
17 file_server::{FileServer, Line, calculate_ignore_before},
18 paths_provider::{Glob, MatchOptions},
19 },
20 file_source_common::{
21 Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22 },
23 finalizer::OrderedFinalizer,
24 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30 SourceSender,
31 config::{
32 DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33 log_schema,
34 },
35 encoding_transcode::{Decoder, Encoder},
36 event::{BatchNotifier, BatchStatus, LogEvent},
37 internal_events::{
38 FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39 FileSourceInternalEventsEmitter, StreamClosedError,
40 },
41 line_agg::{self, LineAgg},
42 serde::bool_or_struct,
43 shutdown::ShutdownSignal,
44};
45
/// Errors raised while validating the `file` source configuration at build time.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The deprecated `message_start_indicator` option could not be compiled as a regex.
    #[snafu(display(
        "message_start_indicator {:?} is not a valid regex: {}",
        indicator,
        source
    ))]
    InvalidMessageStartIndicator {
        /// The pattern as it was supplied in the configuration.
        indicator: String,
        /// The underlying regex compilation error.
        source: regex::Error,
    },
}
58
/// Configuration for the `file` source.
#[serde_as]
#[configurable_component(source("file", "Collect logs from files."))]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileConfig {
    /// Array of file patterns to include. Globbing is supported.
    ///
    /// At least one pattern must be provided.
    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
    pub include: Vec<PathBuf>,

    /// Array of file patterns to exclude. Globbing is supported.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
    pub exclude: Vec<PathBuf>,

    /// Overrides the name of the log field used to add the file path to each event.
    ///
    /// The value is the path of the file the line was read from. Defaults to `file`.
    #[serde(default = "default_file_key")]
    #[configurable(metadata(docs::examples = "path"))]
    pub file_key: OptionalValuePath,

    /// Deprecated switch for reading files from the beginning.
    ///
    /// When set to `true` and `ignore_checkpoints` is not set, checkpoints are ignored.
    #[configurable(
        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
    )]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub start_at_beginning: Option<bool>,

    /// Whether or not to ignore existing checkpoints when determining where to start reading a file.
    ///
    /// When unset, this is `false` unless the deprecated `start_at_beginning` option is `true`.
    #[serde(default)]
    pub ignore_checkpoints: Option<bool>,

    /// The file position to start reading from. Defaults to `beginning`.
    #[serde(default = "default_read_from")]
    #[configurable(derived)]
    pub read_from: ReadFromConfig,

    /// Ignore files older than the specified number of seconds.
    ///
    /// Also accepted under the alias `ignore_older`.
    #[serde(alias = "ignore_older", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
    pub ignore_older_secs: Option<u64>,

    /// The maximum size of a line, in bytes. Defaults to 100 KiB.
    #[serde(default = "default_max_line_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_line_bytes: usize,

    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// When unset, the host key of the global log schema is used.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// The directory used to persist file checkpoint positions.
    ///
    /// When unset, the directory is resolved from the global data directory settings.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Enables adding the file offset to each event and sets the name of the log field used.
    ///
    /// The value is the offset at which the line starts within the file. Unset by default.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "offset"))]
    pub offset_key: Option<OptionalValuePath>,

    /// The minimum cooldown, in milliseconds, applied to file globbing. Defaults to 1000.
    ///
    /// Also accepted under the alias `glob_minimum_cooldown`.
    #[serde(
        alias = "glob_minimum_cooldown",
        default = "default_glob_minimum_cooldown_ms"
    )]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    pub glob_minimum_cooldown_ms: Duration,

    /// The strategy used to uniquely identify files.
    ///
    /// Also accepted under the alias `fingerprinting`.
    #[configurable(derived)]
    #[serde(alias = "fingerprinting", default)]
    fingerprint: FingerprintConfig,

    /// Whether files that are not found should be ignored by the fingerprinter.
    #[serde(default)]
    pub ignore_not_found: bool,

    /// Deprecated regex used for legacy multi-line aggregation.
    ///
    /// The pattern is validated when the source is built and is only consulted when
    /// `multiline` is not set.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub message_start_indicator: Option<String>,

    /// Deprecated timeout used together with `message_start_indicator`. Defaults to 1000.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default = "default_multi_line_timeout")]
    pub multi_line_timeout: u64,

    /// Multiline aggregation configuration.
    ///
    /// When neither this nor `message_start_indicator` is set, lines are not aggregated.
    #[configurable(derived)]
    #[serde(default)]
    pub multiline: Option<MultilineConfig>,

    /// The read budget, in bytes, handed to the file server. Defaults to 2048.
    #[serde(default = "default_max_read_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_read_bytes: usize,

    /// Whether the file server should prioritize the oldest files first.
    #[serde(default)]
    pub oldest_first: bool,

    /// The number of seconds to wait before removing a file.
    ///
    /// Also accepted under the alias `remove_after`. When unset, no removal delay is configured.
    #[serde(alias = "remove_after", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 0))]
    #[configurable(metadata(docs::examples = 5))]
    #[configurable(metadata(docs::examples = 60))]
    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
    pub remove_after_secs: Option<u64>,

    /// String sequence used to separate one file line from another. Defaults to `\n`.
    ///
    /// When `encoding` is set, the delimiter is transcoded from UTF-8 into that charset
    /// before it is handed to the file server.
    #[serde(default = "default_line_delimiter")]
    #[configurable(metadata(docs::examples = "\r\n"))]
    pub line_delimiter: String,

    /// Character set of the files. When set, every line is decoded to UTF-8.
    #[configurable(derived)]
    #[serde(default)]
    pub encoding: Option<EncodingConfig>,

    /// End-to-end acknowledgement settings; accepts either a boolean or a struct.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// Internal metrics settings, such as whether metrics are tagged with the file.
    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// The rotation wait period, in seconds, handed to the file server.
    ///
    /// Configured as `rotate_wait_secs`.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    pub rotate_wait: Duration,
}
247
248fn default_max_line_bytes() -> usize {
249 bytesize::kib(100u64) as usize
250}
251
252fn default_file_key() -> OptionalValuePath {
253 OptionalValuePath::from(owned_value_path!("file"))
254}
255
/// Default for `read_from`: start at the beginning of each file.
const fn default_read_from() -> ReadFromConfig {
    ReadFromConfig::Beginning
}
259
/// Default for `glob_minimum_cooldown_ms`: one second (1000 ms).
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}
263
/// Default for the deprecated `multi_line_timeout` option.
const fn default_multi_line_timeout() -> u64 {
    1_000
}

/// Default for `max_read_bytes`: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
271
/// Default for `line_delimiter`: a single newline.
fn default_line_delimiter() -> String {
    String::from("\n")
}
275
/// Default for `rotate_wait`: half of `u64::MAX` seconds, an extremely large period.
const fn default_rotate_wait() -> Duration {
    // NOTE(review): presumably halved to leave headroom when the duration is added to
    // another time value; confirm against the file server.
    const HALF_MAX_SECS: u64 = u64::MAX / 2;
    Duration::from_secs(HALF_MAX_SECS)
}
279
/// Configuration for how files are uniquely identified.
///
/// This is important for checkpointing when file rotation is used.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
))]
pub enum FingerprintConfig {
    /// Identify files by a checksum computed over their first lines.
    Checksum {
        /// The number of leading (header) bytes ignored when computing the checksum.
        #[serde(default = "default_ignored_header_bytes")]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        ignored_header_bytes: usize,

        /// The number of lines used for computing the checksum.
        #[serde(default = "default_lines")]
        #[configurable(metadata(docs::type_unit = "lines"))]
        lines: usize,
    },

    /// Identify files by their device and inode.
    #[serde(rename = "device_and_inode")]
    DevInode,
}
318
319impl Default for FingerprintConfig {
320 fn default() -> Self {
321 Self::Checksum {
322 ignored_header_bytes: 0,
323 lines: default_lines(),
324 }
325 }
326}
327
/// Default for `ignored_header_bytes`: no header bytes are ignored.
const fn default_ignored_header_bytes() -> usize {
    0
}
331
/// Default for the checksum `lines` setting: a single line.
const fn default_lines() -> usize {
    1
}
335
336impl From<FingerprintConfig> for FingerprintStrategy {
337 fn from(config: FingerprintConfig) -> FingerprintStrategy {
338 match config {
339 FingerprintConfig::Checksum {
340 ignored_header_bytes,
341 lines,
342 } => FingerprintStrategy::FirstLinesChecksum {
343 ignored_header_bytes,
344 lines,
345 },
346 FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
347 }
348 }
349}
350
/// Bookkeeping record attached to an in-flight event when acknowledgements are enabled.
///
/// Once the event's batch is reported as delivered, the checkpoint for `file_id` is
/// updated to `offset`.
#[derive(Debug)]
pub(crate) struct FinalizerEntry {
    /// Fingerprint of the file the line was read from.
    pub(crate) file_id: FileFingerprint,
    /// End offset of the line within that file.
    pub(crate) offset: u64,
}
356
357impl Default for FileConfig {
358 fn default() -> Self {
359 Self {
360 include: vec![PathBuf::from("/var/log/**/*.log")],
361 exclude: vec![],
362 file_key: default_file_key(),
363 start_at_beginning: None,
364 ignore_checkpoints: None,
365 read_from: default_read_from(),
366 ignore_older_secs: None,
367 max_line_bytes: default_max_line_bytes(),
368 fingerprint: FingerprintConfig::default(),
369 ignore_not_found: false,
370 host_key: None,
371 offset_key: None,
372 data_dir: None,
373 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
374 message_start_indicator: None,
375 multi_line_timeout: default_multi_line_timeout(), multiline: None,
377 max_read_bytes: default_max_read_bytes(),
378 oldest_first: false,
379 remove_after_secs: None,
380 line_delimiter: default_line_delimiter(),
381 encoding: None,
382 acknowledgements: Default::default(),
383 log_namespace: None,
384 internal_metrics: Default::default(),
385 rotate_wait: default_rotate_wait(),
386 }
387 }
388}
389
// NOTE(review): presumably derives the generated example configuration for this source
// from `FileConfig::default()`; confirm against the macro definition.
impl_generate_config_from_default!(FileConfig);
391
392#[async_trait::async_trait]
393#[typetag::serde(name = "file")]
394impl SourceConfig for FileConfig {
395 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
396 let data_dir = cx
401 .globals
402 .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
404
405 #[allow(clippy::suspicious_else_formatting)]
407 {
408 if let Some(ref config) = self.multiline {
409 let _: line_agg::Config = config.try_into()?;
410 }
411
412 if let Some(ref indicator) = self.message_start_indicator {
413 Regex::new(indicator)
414 .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
415 }
416 }
417
418 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
419
420 let log_namespace = cx.log_namespace(self.log_namespace);
421
422 Ok(file_source(
423 self,
424 data_dir,
425 cx.shutdown,
426 cx.out,
427 acknowledgements,
428 log_namespace,
429 ))
430 }
431
432 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
433 let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
434 let host_key = self
435 .host_key
436 .clone()
437 .unwrap_or(log_schema().host_key().cloned().into())
438 .path
439 .map(LegacyKey::Overwrite);
440
441 let offset_key = self
442 .offset_key
443 .clone()
444 .and_then(|k| k.path)
445 .map(LegacyKey::Overwrite);
446
447 let schema_definition = BytesDeserializerConfig
448 .schema_definition(global_log_namespace.merge(self.log_namespace))
449 .with_standard_vector_source_metadata()
450 .with_source_metadata(
451 Self::NAME,
452 host_key,
453 &owned_value_path!("host"),
454 Kind::bytes().or_undefined(),
455 Some("host"),
456 )
457 .with_source_metadata(
458 Self::NAME,
459 offset_key,
460 &owned_value_path!("offset"),
461 Kind::integer(),
462 None,
463 )
464 .with_source_metadata(
465 Self::NAME,
466 file_key,
467 &owned_value_path!("path"),
468 Kind::bytes(),
469 None,
470 );
471
472 vec![SourceOutput::new_maybe_logs(
473 DataType::Log,
474 schema_definition,
475 )]
476 }
477
478 fn can_acknowledge(&self) -> bool {
479 true
480 }
481}
482
/// Builds the future that runs the `file` source.
///
/// The returned future wires three pieces together:
/// - a blocking task running the `FileServer`, which pushes batches of lines into a channel;
/// - a spawned task that turns those lines into events and sends them to `out`;
/// - when `acknowledgements` is enabled, a spawned task that updates checkpoints as
///   batches are reported delivered.
///
/// If `config.include` is empty, an error is logged and a future that resolves to
/// `Err(())` is returned immediately.
///
/// # Panics
///
/// Panics if `Glob::new` rejects the include/exclude patterns. Inside the returned future,
/// the multiline config conversion and the `message_start_indicator` regex are unwrapped;
/// `FileConfig::build` validates both before calling this function.
pub fn file_source(
    config: &FileConfig,
    data_dir: PathBuf,
    shutdown: ShutdownSignal,
    mut out: SourceSender,
    acknowledgements: bool,
    log_namespace: LogNamespace,
) -> super::Source {
    // Without any include pattern there is nothing to watch; report and bail out.
    if config.include.is_empty() {
        error!(
            message = "`include` configuration option must contain at least one file pattern.",
            internal_log_rate_limit = false
        );
        return Box::pin(future::ready(Err(())));
    }

    // Rebuild each exclude path from its components before handing it to the glob matcher.
    let exclude_patterns = config
        .exclude
        .iter()
        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
        .collect::<Vec<PathBuf>>();
    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
    // Fold the deprecated `start_at_beginning` into the newer position options.
    let (ignore_checkpoints, read_from) = reconcile_position_options(
        config.start_at_beginning,
        config.ignore_checkpoints,
        Some(config.read_from),
    );

    let emitter = FileSourceInternalEventsEmitter {
        include_file_metric_tag: config.internal_metrics.include_file_tag,
    };

    // NOTE(review): invalid glob patterns panic here instead of surfacing as a build
    // error; consider validating them in `FileConfig::build`.
    let paths_provider = Glob::new(
        &config.include,
        &exclude_patterns,
        MatchOptions::default(),
        emitter.clone(),
    )
    .expect("invalid glob patterns");

    let encoding_charset = config.encoding.clone().map(|e| e.charset);

    // The delimiter is configured as UTF-8 text; when a file encoding is set it is
    // transcoded into that charset before being given to the file server.
    let line_delimiter_as_bytes = match encoding_charset {
        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
        None => Bytes::from(config.line_delimiter.clone()),
    };

    let checkpointer = Checkpointer::new(&data_dir);
    let strategy = config.fingerprint.clone().into();

    let file_server = FileServer {
        paths_provider,
        max_read_bytes: config.max_read_bytes,
        ignore_checkpoints,
        read_from,
        ignore_before,
        max_line_bytes: config.max_line_bytes,
        line_delimiter: line_delimiter_as_bytes,
        data_dir,
        glob_minimum_cooldown,
        fingerprinter: Fingerprinter::new(strategy, config.max_line_bytes, config.ignore_not_found),
        oldest_first: config.oldest_first,
        remove_after: config.remove_after_secs.map(Duration::from_secs),
        emitter,
        rotate_wait: config.rotate_wait,
    };

    // Host key falls back to the global log schema when not configured.
    let event_metadata = EventMetadata {
        host_key: config
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path,
        hostname: crate::get_hostname().ok(),
        file_key: config.file_key.clone().path,
        offset_key: config.offset_key.clone().and_then(|k| k.path),
    };

    // Clone everything the `async move` block below needs, since `config` is only borrowed.
    let include = config.include.clone();
    let exclude = config.exclude.clone();
    let multiline_config = config.multiline.clone();
    let message_start_indicator = config.message_start_indicator.clone();
    let multi_line_timeout = config.multi_line_timeout;

    let (finalizer, shutdown_checkpointer) = if acknowledgements {
        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);

        // A dedicated shutdown channel for the checkpoint writer: it fires only after the
        // ack stream has ended, so checkpoints from late acknowledgements are still applied.
        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
        let checkpoints = checkpointer.view();
        tokio::spawn(async move {
            while let Some((status, entry)) = ack_stream.next().await {
                // Only advance the checkpoint for events that were actually delivered.
                if status == BatchStatus::Delivered {
                    checkpoints.update(entry.file_id, entry.offset);
                }
            }
            send_shutdown.send(())
        });
        (Some(finalizer), shutdown2.map(|_| ()).boxed())
    } else {
        // Without acknowledgements the checkpoint writer follows the global shutdown signal.
        (None, shutdown.clone().map(|_| ()).boxed())
    };

    let checkpoints = checkpointer.view();
    let include_file_metric_tag = config.internal_metrics.include_file_tag;
    Box::pin(async move {
        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);

        let mut encoding_decoder = encoding_charset.map(Decoder::new);

        // The file server sends batches of lines; flatten them into a stream of single lines.
        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
        let rx = rx
            .map(futures::stream::iter)
            .flatten()
            .map(move |mut line| {
                emit!(FileBytesReceived {
                    byte_size: line.text.len(),
                    file: &line.filename,
                    include_file_metric_tag,
                });
                // Decode the line to UTF-8 when a file encoding is configured.
                line.text = match encoding_decoder.as_mut() {
                    Some(d) => d.decode_to_utf8(line.text),
                    None => line.text,
                };
                line
            });

        // `multiline` takes precedence over the deprecated `message_start_indicator`;
        // with neither set, lines pass through unaggregated.
        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
            if let Some(ref multiline_config) = multiline_config {
                wrap_with_line_agg(
                    rx,
                    multiline_config.try_into().unwrap(),
                )
            } else if let Some(msi) = message_start_indicator {
                wrap_with_line_agg(
                    rx,
                    line_agg::Config::for_legacy(
                        Regex::new(&msi).unwrap(),
                        multi_line_timeout,
                    ),
                )
            } else {
                Box::new(rx)
            };

        // Capture the current span so the spawned sender task is instrumented with it.
        let span = Span::current();
        let mut messages = messages.map(move |line| {
            let mut event = create_event(
                line.text,
                line.start_offset,
                &line.filename,
                &event_metadata,
                log_namespace,
                include_file_metric_tag,
            );

            if let Some(finalizer) = &finalizer {
                // With acknowledgements, the checkpoint is updated by the ack task once
                // the batch is reported delivered.
                let (batch, receiver) = BatchNotifier::new_with_receiver();
                event = event.with_batch_notifier(&batch);
                let entry = FinalizerEntry {
                    file_id: line.file_id,
                    offset: line.end_offset,
                };
                finalizer.add(entry, receiver);
            } else {
                // Without acknowledgements, the checkpoint advances as soon as the event
                // is created.
                checkpoints.update(line.file_id, line.end_offset);
            }
            event
        });
        tokio::spawn(async move {
            match out
                .send_event_stream(&mut messages)
                .instrument(span.or_current())
                .await
            {
                Ok(()) => {
                    debug!("Finished sending.");
                }
                Err(_) => {
                    let (count, _) = messages.size_hint();
                    emit!(StreamClosedError { count });
                }
            }
        });

        let span = info_span!("file_server");
        // The file server is driven with `block_on` on a blocking thread.
        tokio::task::spawn_blocking(move || {
            let _enter = span.enter();
            let rt = tokio::runtime::Handle::current();
            let result =
                rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
            emit!(FileOpen { count: 0 });
            // A file server error panics here; since this runs inside `spawn_blocking`,
            // the panic surfaces as a join error handled by `map_err` below.
            result.expect("file server exited with an error");
        })
        .map_err(|error| error!(message="File server unexpectedly stopped.", %error, internal_log_rate_limit = false))
        .await
    })
}
701
702fn reconcile_position_options(
705 start_at_beginning: Option<bool>,
706 ignore_checkpoints: Option<bool>,
707 read_from: Option<ReadFromConfig>,
708) -> (bool, ReadFrom) {
709 if start_at_beginning.is_some() {
710 warn!(
711 message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
712 )
713 }
714
715 match start_at_beginning {
716 Some(true) => (
717 ignore_checkpoints.unwrap_or(true),
718 read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
719 ),
720 _ => (
721 ignore_checkpoints.unwrap_or(false),
722 read_from.map(Into::into).unwrap_or_default(),
723 ),
724 }
725}
726
727fn wrap_with_line_agg(
728 rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
729 config: line_agg::Config,
730) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
731 let logic = line_agg::Logic::new(config);
732 Box::new(
733 LineAgg::new(
734 rx.map(|line| {
735 (
736 line.filename,
737 line.text,
738 (line.file_id, line.start_offset, line.end_offset),
739 )
740 }),
741 logic,
742 )
743 .map(
744 |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
745 text,
746 filename,
747 file_id,
748 start_offset,
749 end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
750 lastline_end_offset
751 }),
752 },
753 ),
754 )
755}
756
/// Per-source values used by `create_event` to annotate every event.
struct EventMetadata {
    /// Legacy field path under which the hostname is stored, if any.
    host_key: Option<OwnedValuePath>,
    /// Hostname of the machine, or `None` when it could not be determined.
    hostname: Option<String>,
    /// Legacy field path under which the file path is stored, if any.
    file_key: Option<OwnedValuePath>,
    /// Legacy field path under which the line offset is stored, if any.
    offset_key: Option<OwnedValuePath>,
}
763
764fn create_event(
765 line: Bytes,
766 offset: u64,
767 file: &str,
768 meta: &EventMetadata,
769 log_namespace: LogNamespace,
770 include_file_metric_tag: bool,
771) -> LogEvent {
772 let deserializer = BytesDeserializer;
773 let mut event = deserializer.parse_single(line, log_namespace);
774
775 log_namespace.insert_vector_metadata(
776 &mut event,
777 log_schema().source_type_key(),
778 path!("source_type"),
779 Bytes::from_static(FileConfig::NAME.as_bytes()),
780 );
781 log_namespace.insert_vector_metadata(
782 &mut event,
783 log_schema().timestamp_key(),
784 path!("ingest_timestamp"),
785 Utc::now(),
786 );
787
788 let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
789 if let Some(hostname) = &meta.hostname {
791 log_namespace.insert_source_metadata(
792 FileConfig::NAME,
793 &mut event,
794 legacy_host_key,
795 path!("host"),
796 hostname.clone(),
797 );
798 }
799
800 let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
801 log_namespace.insert_source_metadata(
802 FileConfig::NAME,
803 &mut event,
804 legacy_offset_key,
805 path!("offset"),
806 offset,
807 );
808
809 let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
810 log_namespace.insert_source_metadata(
811 FileConfig::NAME,
812 &mut event,
813 legacy_file_key,
814 path!("path"),
815 file,
816 );
817
818 emit!(FileEventsReceived {
819 count: 1,
820 file,
821 byte_size: event.estimated_json_encoded_size_of(),
822 include_file_metric_tag,
823 });
824
825 event
826}
827
828#[cfg(test)]
829mod tests {
830 use std::{
831 collections::HashSet,
832 fs::{self, File},
833 future::Future,
834 io::{Seek, Write},
835 };
836
837 use encoding_rs::UTF_16LE;
838 use similar_asserts::assert_eq;
839 use tempfile::tempdir;
840 use tokio::time::{Duration, sleep, timeout};
841 use vector_lib::schema::Definition;
842 use vrl::{value, value::kind::Collection};
843
844 use super::*;
845 use crate::{
846 config::Config,
847 event::{Event, EventStatus, Value},
848 shutdown::ShutdownSignal,
849 sources::file,
850 test_util::components::{FILE_SOURCE_TAGS, assert_source_compliance},
851 };
852
    /// Runs the shared generated-config check for `FileConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileConfig>();
    }
857
858 fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
859 file::FileConfig {
860 fingerprint: FingerprintConfig::Checksum {
861 ignored_header_bytes: 0,
862 lines: 1,
863 },
864 data_dir: Some(dir.path().to_path_buf()),
865 glob_minimum_cooldown_ms: Duration::from_millis(100),
866 internal_metrics: FileInternalMetricsConfig {
867 include_file_tag: true,
868 },
869 ..Default::default()
870 }
871 }
872
    /// Pauses the current task for 500 ms; the file tests use it between file operations.
    async fn sleep_500_millis() {
        sleep(Duration::from_millis(500)).await;
    }
876
    /// Checks TOML deserialization of `FileConfig`: explicit defaults equal
    /// `FileConfig::default()`, and the fingerprint, encoding and `read_from` options parse
    /// to the expected values.
    #[test]
    fn parse_config() {
        // Spelling out the default values must produce exactly the default config.
        let config: FileConfig = toml::from_str(
            r#"
 include = [ "/var/log/**/*.log" ]
 file_key = "file"
 glob_minimum_cooldown_ms = 1000
 multi_line_timeout = 1000
 max_read_bytes = 2048
 line_delimiter = "\n"
 "#,
        )
        .unwrap();
        assert_eq!(config, FileConfig::default());
        assert_eq!(
            config.fingerprint,
            FingerprintConfig::Checksum {
                ignored_header_bytes: 0,
                lines: 1
            }
        );

        // The device/inode strategy is selected by its renamed tag.
        let config: FileConfig = toml::from_str(
            r#"
 include = [ "/var/log/**/*.log" ]
 [fingerprint]
 strategy = "device_and_inode"
 "#,
        )
        .unwrap();
        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);

        // An extra `bytes` key is accepted and has no effect on the parsed value.
        let config: FileConfig = toml::from_str(
            r#"
 include = [ "/var/log/**/*.log" ]
 [fingerprint]
 strategy = "checksum"
 bytes = 128
 ignored_header_bytes = 512
 "#,
        )
        .unwrap();
        assert_eq!(
            config.fingerprint,
            FingerprintConfig::Checksum {
                ignored_header_bytes: 512,
                lines: 1
            }
        );

        let config: FileConfig = toml::from_str(
            r#"
 include = [ "/var/log/**/*.log" ]
 [encoding]
 charset = "utf-16le"
 "#,
        )
        .unwrap();
        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));

        let config: FileConfig = toml::from_str(
            r#"
 include = [ "/var/log/**/*.log" ]
 read_from = "beginning"
 "#,
        )
        .unwrap();
        assert_eq!(config.read_from, ReadFromConfig::Beginning);

        let config: FileConfig = toml::from_str(
            r#"
 include = [ "/var/log/**/*.log" ]
 read_from = "end"
 "#,
        )
        .unwrap();
        assert_eq!(config.read_from, ReadFromConfig::End);
    }
955
    /// A source-level `data_dir` wins over the global one; without it the global
    /// directory is used.
    #[test]
    fn resolve_data_dir() {
        let global_dir = tempdir().unwrap();
        let local_dir = tempdir().unwrap();

        let mut config = Config::default();
        config.global.data_dir = global_dir.keep().into();

        // Source-specific directory provided: it is the one returned.
        let res = config
            .global
            .resolve_and_validate_data_dir(test_default_file_config(&local_dir).data_dir.as_ref())
            .unwrap();
        assert_eq!(res, local_dir.path());

        // No source-specific directory: fall back to the global one.
        let res = config.global.resolve_and_validate_data_dir(None).unwrap();
        assert_eq!(res, config.global.data_dir.unwrap());
    }
975
    /// In the Vector namespace the event root is the message bytes and all source fields
    /// are metadata under `vector.*` and `file.*`.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let definitions = FileConfig::default()
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                    .with_meaning(OwnedTargetPath::event_root(), "message")
                    .with_metadata_field(
                        &owned_value_path!("vector", "source_type"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "ingest_timestamp"),
                        Kind::timestamp(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("file", "host"),
                        Kind::bytes().or_undefined(),
                        Some("host")
                    )
                    .with_metadata_field(
                        &owned_value_path!("file", "offset"),
                        Kind::integer(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
            )
        )
    }
1012
    /// In the Legacy namespace the source fields live on the event itself. `offset` is
    /// expected to be undefined because the default config sets no `offset_key`.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let definitions = FileConfig::default()
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(
                    Kind::object(Collection::empty()),
                    [LogNamespace::Legacy]
                )
                .with_event_field(
                    &owned_value_path!("message"),
                    Kind::bytes(),
                    Some("message")
                )
                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
                .with_event_field(
                    &owned_value_path!("host"),
                    Kind::bytes().or_undefined(),
                    Some("host")
                )
                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
            )
        )
    }
1044
1045 #[test]
1046 fn create_event_legacy_namespace() {
1047 let line = Bytes::from("hello world");
1048 let file = "some_file.rs";
1049 let offset: u64 = 0;
1050
1051 let meta = EventMetadata {
1052 host_key: Some(owned_value_path!("host")),
1053 hostname: Some("Some.Machine".to_string()),
1054 file_key: Some(owned_value_path!("file")),
1055 offset_key: Some(owned_value_path!("offset")),
1056 };
1057 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1058
1059 assert_eq!(log["file"], "some_file.rs".into());
1060 assert_eq!(log["host"], "Some.Machine".into());
1061 assert_eq!(log["offset"], 0.into());
1062 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1063 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1064 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1065 }
1066
1067 #[test]
1068 fn create_event_custom_fields_legacy_namespace() {
1069 let line = Bytes::from("hello world");
1070 let file = "some_file.rs";
1071 let offset: u64 = 0;
1072
1073 let meta = EventMetadata {
1074 host_key: Some(owned_value_path!("hostname")),
1075 hostname: Some("Some.Machine".to_string()),
1076 file_key: Some(owned_value_path!("file_path")),
1077 offset_key: Some(owned_value_path!("off")),
1078 };
1079 let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1080
1081 assert_eq!(log["file_path"], "some_file.rs".into());
1082 assert_eq!(log["hostname"], "Some.Machine".into());
1083 assert_eq!(log["off"], 0.into());
1084 assert_eq!(*log.get_message().unwrap(), "hello world".into());
1085 assert_eq!(*log.get_source_type().unwrap(), "file".into());
1086 assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1087 }
1088
1089 #[test]
1090 fn create_event_vector_namespace() {
1091 let line = Bytes::from("hello world");
1092 let file = "some_file.rs";
1093 let offset: u64 = 0;
1094
1095 let meta = EventMetadata {
1096 host_key: Some(owned_value_path!("ignored")),
1097 hostname: Some("Some.Machine".to_string()),
1098 file_key: Some(owned_value_path!("ignored")),
1099 offset_key: Some(owned_value_path!("ignored")),
1100 };
1101 let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1102
1103 assert_eq!(log.value(), &value!("hello world"));
1104
1105 assert_eq!(
1106 log.metadata()
1107 .value()
1108 .get(path!("vector", "source_type"))
1109 .unwrap(),
1110 &value!("file")
1111 );
1112 assert!(
1113 log.metadata()
1114 .value()
1115 .get(path!("vector", "ingest_timestamp"))
1116 .unwrap()
1117 .is_timestamp()
1118 );
1119
1120 assert_eq!(
1121 log.metadata()
1122 .value()
1123 .get(path!(FileConfig::NAME, "host"))
1124 .unwrap(),
1125 &value!("Some.Machine")
1126 );
1127 assert_eq!(
1128 log.metadata()
1129 .value()
1130 .get(path!(FileConfig::NAME, "offset"))
1131 .unwrap(),
1132 &value!(0)
1133 );
1134 assert_eq!(
1135 log.metadata()
1136 .value()
1137 .get(path!(FileConfig::NAME, "path"))
1138 .unwrap(),
1139 &value!("some_file.rs")
1140 );
1141 }
1142
    /// Writes `n` lines to each of two files and checks that every line arrives in
    /// per-file order, tagged with the path of the file it came from.
    #[tokio::test]
    async fn file_happy_path() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path1 = dir.path().join("file1");
        let path2 = dir.path().join("file2");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file1 = File::create(&path1).unwrap();
            let mut file2 = File::create(&path2).unwrap();

            // Presumably gives the source time to notice the still-empty files first.
            sleep_500_millis().await;
            for i in 0..n {
                writeln!(&mut file1, "hello {i}").unwrap();
                writeln!(&mut file2, "goodbye {i}").unwrap();
            }

            sleep_500_millis().await;
        })
        .await;

        // Per-file counters: lines of the two files may interleave, but each file's own
        // lines must arrive in order.
        let mut hello_i = 0;
        let mut goodbye_i = 0;

        for event in received {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            if line.starts_with("hello") {
                assert_eq!(line, format!("hello {}", hello_i));
                assert_eq!(
                    event.as_log()["file"].to_string_lossy(),
                    path1.to_str().unwrap()
                );
                hello_i += 1;
            } else {
                assert_eq!(line, format!("goodbye {}", goodbye_i));
                assert_eq!(
                    event.as_log()["file"].to_string_lossy(),
                    path2.to_str().unwrap()
                );
                goodbye_i += 1;
            }
        }
        assert_eq!(hello_i, n);
        assert_eq!(goodbye_i, n);
    }
1196
    /// Empty lines are not dropped: one non-empty line followed by `n` empty lines yields
    /// `n + 1` events.
    #[tokio::test]
    async fn file_read_empty_lines() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file = File::create(&path).unwrap();

            sleep_500_millis().await;
            // A non-empty first line gives the checksum fingerprint something to work with.
            writeln!(&mut file, "line for checkpointing").unwrap();
            for _i in 0..n {
                writeln!(&mut file).unwrap();
            }

            sleep_500_millis().await;
        })
        .await;

        assert_eq!(received.len(), n + 1);
    }
1226
    /// Truncating a watched file and writing new content yields the pre-truncation lines
    /// followed by the post-truncation lines, all attributed to the same path.
    #[tokio::test]
    async fn file_truncate() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let path = dir.path().join("file");
        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file = File::create(&path).unwrap();

            sleep_500_millis().await;
            for i in 0..n {
                writeln!(&mut file, "pretrunc {i}").unwrap();
            }

            sleep_500_millis().await;
            // Truncate in place and rewind the handle so new writes start at offset zero.
            file.set_len(0).unwrap();
            file.seek(std::io::SeekFrom::Start(0)).unwrap();

            sleep_500_millis().await;
            for i in 0..n {
                writeln!(&mut file, "posttrunc {i}").unwrap();
            }

            sleep_500_millis().await;
        })
        .await;

        let mut i = 0;
        let mut pre_trunc = true;

        for event in received {
            assert_eq!(
                event.as_log()["file"].to_string_lossy(),
                path.to_str().unwrap()
            );

            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();

            if pre_trunc {
                assert_eq!(line, format!("pretrunc {}", i));
            } else {
                assert_eq!(line, format!("posttrunc {}", i));
            }

            // After the first `n` lines, switch to expecting the post-truncation lines.
            i += 1;
            if i == n {
                i = 0;
                pre_trunc = false;
            }
        }
    }
1286
/// Writes `n` lines, renames the file to `archive_path`, re-creates it at
/// `path`, writes `n` more lines, and checks that every event carries `path`
/// in its `file` field and that messages arrive in write order.
#[tokio::test]
async fn file_rotate() {
    let n = 5;

    let dir = tempdir().unwrap();
    let config = file::FileConfig {
        include: vec![dir.path().join("*")],
        ..test_default_file_config(&dir)
    };

    let path = dir.path().join("file");
    // NOTE(review): this is the same path as `path`, so the rename below
    // targets the file itself and only the following `File::create` changes
    // anything. Confirm whether a distinct archive name was intended.
    let archive_path = dir.path().join("file");
    let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
        let mut file = File::create(&path).unwrap();

        sleep_500_millis().await;
        for i in 0..n {
            writeln!(&mut file, "prerot {i}").unwrap();
        }

        sleep_500_millis().await;
        fs::rename(&path, archive_path).expect("could not rename");
        // `File::create` truncates an existing file, so new writes start from an empty file.
        let mut file = File::create(&path).unwrap();

        sleep_500_millis().await;
        for i in 0..n {
            writeln!(&mut file, "postrot {i}").unwrap();
        }

        sleep_500_millis().await;
    })
    .await;

    // `i` is the expected line index within the current phase; `pre_rot`
    // flips to false after the first `n` events.
    let mut i = 0;
    let mut pre_rot = true;

    for event in received {
        assert_eq!(
            event.as_log()["file"].to_string_lossy(),
            path.to_str().unwrap()
        );

        let line =
            event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();

        if pre_rot {
            assert_eq!(line, format!("prerot {}", i));
        } else {
            assert_eq!(line, format!("postrot {}", i));
        }

        i += 1;
        if i == n {
            i = 0;
            pre_rot = false;
        }
    }
}
1348
1349 #[tokio::test]
1350 async fn file_multiple_paths() {
1351 let n = 5;
1352
1353 let dir = tempdir().unwrap();
1354 let config = file::FileConfig {
1355 include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1356 exclude: vec![dir.path().join("a.*.txt")],
1357 ..test_default_file_config(&dir)
1358 };
1359
1360 let path1 = dir.path().join("a.txt");
1361 let path2 = dir.path().join("b.txt");
1362 let path3 = dir.path().join("a.log");
1363 let path4 = dir.path().join("a.ignore.txt");
1364 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1365 let mut file1 = File::create(&path1).unwrap();
1366 let mut file2 = File::create(&path2).unwrap();
1367 let mut file3 = File::create(&path3).unwrap();
1368 let mut file4 = File::create(&path4).unwrap();
1369
1370 sleep_500_millis().await; for i in 0..n {
1373 writeln!(&mut file1, "1 {i}").unwrap();
1374 writeln!(&mut file2, "2 {i}").unwrap();
1375 writeln!(&mut file3, "3 {i}").unwrap();
1376 writeln!(&mut file4, "4 {i}").unwrap();
1377 }
1378
1379 sleep_500_millis().await;
1380 })
1381 .await;
1382
1383 let mut is = [0; 3];
1384
1385 for event in received {
1386 let line =
1387 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1388 let mut split = line.split(' ');
1389 let file = split.next().unwrap().parse::<usize>().unwrap();
1390 assert_ne!(file, 4);
1391 let i = split.next().unwrap().parse::<usize>().unwrap();
1392
1393 assert_eq!(is[file - 1], i);
1394 is[file - 1] += 1;
1395 }
1396
1397 assert_eq!(is, [n as usize; 3]);
1398 }
1399
1400 #[tokio::test]
1401 async fn file_exclude_paths() {
1402 let n = 5;
1403
1404 let dir = tempdir().unwrap();
1405 let config = file::FileConfig {
1406 include: vec![dir.path().join("a//b/*.log.*")],
1407 exclude: vec![dir.path().join("a//b/test.log.*")],
1408 ..test_default_file_config(&dir)
1409 };
1410
1411 let path1 = dir.path().join("a//b/a.log.1");
1412 let path2 = dir.path().join("a//b/test.log.1");
1413 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1414 std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1415 let mut file1 = File::create(&path1).unwrap();
1416 let mut file2 = File::create(&path2).unwrap();
1417
1418 sleep_500_millis().await; for i in 0..n {
1421 writeln!(&mut file1, "1 {i}").unwrap();
1422 writeln!(&mut file2, "2 {i}").unwrap();
1423 }
1424
1425 sleep_500_millis().await;
1426 })
1427 .await;
1428
1429 let mut is = [0; 1];
1430
1431 for event in received {
1432 let line =
1433 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1434 let mut split = line.split(' ');
1435 let file = split.next().unwrap().parse::<usize>().unwrap();
1436 assert_ne!(file, 4);
1437 let i = split.next().unwrap().parse::<usize>().unwrap();
1438
1439 assert_eq!(is[file - 1], i);
1440 is[file - 1] += 1;
1441 }
1442
1443 assert_eq!(is, [n as usize; 1]);
1444 }
1445
1446 #[tokio::test]
1447 async fn file_key_acknowledged() {
1448 file_key(Acks).await
1449 }
1450
1451 #[tokio::test]
1452 async fn file_key_no_acknowledge() {
1453 file_key(NoAcks).await
1454 }
1455
1456 async fn file_key(acks: AckingMode) {
1457 {
1459 let dir = tempdir().unwrap();
1460 let config = file::FileConfig {
1461 include: vec![dir.path().join("*")],
1462 ..test_default_file_config(&dir)
1463 };
1464
1465 let path = dir.path().join("file");
1466 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1467 let mut file = File::create(&path).unwrap();
1468
1469 sleep_500_millis().await;
1470
1471 writeln!(&mut file, "hello there").unwrap();
1472
1473 sleep_500_millis().await;
1474 })
1475 .await;
1476
1477 assert_eq!(received.len(), 1);
1478 assert_eq!(
1479 received[0].as_log()["file"].to_string_lossy(),
1480 path.to_str().unwrap()
1481 );
1482 }
1483
1484 {
1486 let dir = tempdir().unwrap();
1487 let config = file::FileConfig {
1488 include: vec![dir.path().join("*")],
1489 file_key: OptionalValuePath::from(owned_value_path!("source")),
1490 ..test_default_file_config(&dir)
1491 };
1492
1493 let path = dir.path().join("file");
1494 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1495 let mut file = File::create(&path).unwrap();
1496
1497 sleep_500_millis().await;
1498
1499 writeln!(&mut file, "hello there").unwrap();
1500
1501 sleep_500_millis().await;
1502 })
1503 .await;
1504
1505 assert_eq!(received.len(), 1);
1506 assert_eq!(
1507 received[0].as_log()["source"].to_string_lossy(),
1508 path.to_str().unwrap()
1509 );
1510 }
1511
1512 {
1514 let dir = tempdir().unwrap();
1515 let config = file::FileConfig {
1516 include: vec![dir.path().join("*")],
1517 ..test_default_file_config(&dir)
1518 };
1519
1520 let path = dir.path().join("file");
1521 let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1522 let mut file = File::create(&path).unwrap();
1523
1524 sleep_500_millis().await;
1525
1526 writeln!(&mut file, "hello there").unwrap();
1527
1528 sleep_500_millis().await;
1529 })
1530 .await;
1531
1532 assert_eq!(received.len(), 1);
1533 assert_eq!(
1534 received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1535 vec![
1536 default_file_key()
1537 .path
1538 .expect("file key to exist")
1539 .to_string()
1540 .into(),
1541 log_schema().host_key().unwrap().to_string().into(),
1542 log_schema().message_key().unwrap().to_string().into(),
1543 log_schema().timestamp_key().unwrap().to_string().into(),
1544 log_schema().source_type_key().unwrap().to_string().into()
1545 ]
1546 .into_iter()
1547 .collect::<HashSet<_>>()
1548 );
1549 }
1550 }
1551
1552 #[cfg(target_os = "linux")] #[tokio::test]
1554 async fn file_start_position_server_restart_acknowledged() {
1555 file_start_position_server_restart(Acks).await
1556 }
1557
1558 #[cfg(target_os = "linux")] #[tokio::test]
1560 async fn file_start_position_server_restart_no_acknowledge() {
1561 file_start_position_server_restart(NoAcks).await
1562 }
1563
1564 #[cfg(target_os = "linux")] async fn file_start_position_server_restart(acking: AckingMode) {
1566 let dir = tempdir().unwrap();
1567 let config = file::FileConfig {
1568 include: vec![dir.path().join("*")],
1569 ..test_default_file_config(&dir)
1570 };
1571
1572 let path = dir.path().join("file");
1573 let mut file = File::create(&path).unwrap();
1574 writeln!(&mut file, "zeroth line").unwrap();
1575 sleep_500_millis().await;
1576
1577 {
1579 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1580 sleep_500_millis().await;
1581 writeln!(&mut file, "first line").unwrap();
1582 sleep_500_millis().await;
1583 })
1584 .await;
1585
1586 let lines = extract_messages_string(received);
1587 assert_eq!(lines, vec!["zeroth line", "first line"]);
1588 }
1589 {
1591 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1592 sleep_500_millis().await;
1593 writeln!(&mut file, "second line").unwrap();
1594 sleep_500_millis().await;
1595 })
1596 .await;
1597
1598 let lines = extract_messages_string(received);
1599 assert_eq!(lines, vec!["second line"]);
1600 }
1601 {
1603 let config = file::FileConfig {
1604 include: vec![dir.path().join("*")],
1605 ignore_checkpoints: Some(true),
1606 read_from: ReadFromConfig::Beginning,
1607 ..test_default_file_config(&dir)
1608 };
1609 let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1610 sleep_500_millis().await;
1611 writeln!(&mut file, "third line").unwrap();
1612 sleep_500_millis().await;
1613 })
1614 .await;
1615
1616 let lines = extract_messages_string(received);
1617 assert_eq!(
1618 lines,
1619 vec!["zeroth line", "first line", "second line", "third line"]
1620 );
1621 }
1622 }
1623
1624 #[tokio::test]
1625 async fn file_start_position_server_restart_unfinalized() {
1626 let dir = tempdir().unwrap();
1627 let config = file::FileConfig {
1628 include: vec![dir.path().join("*")],
1629 ..test_default_file_config(&dir)
1630 };
1631
1632 let path = dir.path().join("file");
1633 let mut file = File::create(&path).unwrap();
1634 writeln!(&mut file, "the line").unwrap();
1635 file.flush().unwrap();
1636
1637 let received = run_file_source(
1639 &config,
1640 false,
1641 Unfinalized,
1642 LogNamespace::Legacy,
1643 sleep(Duration::from_secs(5)),
1644 )
1645 .await;
1646 let lines = extract_messages_string(received);
1647 assert_eq!(lines, vec!["the line"]);
1648
1649 let received = run_file_source(
1651 &config,
1652 false,
1653 Unfinalized,
1654 LogNamespace::Legacy,
1655 sleep(Duration::from_secs(5)),
1656 )
1657 .await;
1658 let lines = extract_messages_string(received);
1659 assert_eq!(lines, vec!["the line"]);
1660 }
1661
1662 #[tokio::test]
1663 async fn file_duplicate_processing_after_restart() {
1664 let dir = tempdir().unwrap();
1665 let config = file::FileConfig {
1666 include: vec![dir.path().join("*")],
1667 ..test_default_file_config(&dir)
1668 };
1669
1670 let path = dir.path().join("file");
1671 let mut file = File::create(&path).unwrap();
1672
1673 let line_count = 4000;
1674 for i in 0..line_count {
1675 writeln!(&mut file, "Here's a line for you: {i}").unwrap();
1676 }
1677 file.flush().unwrap();
1678 sleep_500_millis().await;
1679
1680 let received = run_file_source(
1682 &config,
1683 true,
1684 Acks,
1685 LogNamespace::Legacy,
1686 sleep_500_millis(),
1688 )
1689 .await;
1690 let lines = extract_messages_string(received);
1691
1692 assert!(lines.len() < line_count);
1695
1696 let received = run_file_source(
1698 &config,
1699 true,
1700 Acks,
1701 LogNamespace::Legacy,
1702 sleep(Duration::from_secs(5)),
1703 )
1704 .await;
1705 let lines2 = extract_messages_string(received);
1706
1707 assert_eq!(lines.len() + lines2.len(), line_count);
1709 }
1710
1711 #[tokio::test]
1712 async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1713 file_start_position_server_restart_with_file_rotation(Acks).await
1714 }
1715
1716 #[tokio::test]
1717 async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1718 file_start_position_server_restart_with_file_rotation(NoAcks).await
1719 }
1720
1721 async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1722 let dir = tempdir().unwrap();
1723 let config = file::FileConfig {
1724 include: vec![dir.path().join("*")],
1725 ..test_default_file_config(&dir)
1726 };
1727
1728 let path = dir.path().join("file");
1729 let path_for_old_file = dir.path().join("file.old");
1730 {
1732 let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1733 let mut file = File::create(&path).unwrap();
1734 sleep_500_millis().await;
1735 writeln!(&mut file, "first line").unwrap();
1736 sleep_500_millis().await;
1737 })
1738 .await;
1739
1740 let lines = extract_messages_string(received);
1741 assert_eq!(lines, vec!["first line"]);
1742 }
1743 fs::rename(&path, &path_for_old_file).expect("could not rename");
1745 {
1748 let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1749 let mut file = File::create(&path).unwrap();
1750 sleep_500_millis().await;
1751 writeln!(&mut file, "second line").unwrap();
1752 sleep_500_millis().await;
1753 })
1754 .await;
1755
1756 let lines = extract_messages_string(received);
1757 assert_eq!(lines, vec!["second line"]);
1758 }
1759 }
1760
1761 #[cfg(unix)] #[tokio::test]
1763 async fn file_start_position_ignore_old_files() {
1764 use std::{
1765 os::unix::io::AsRawFd,
1766 time::{Duration, SystemTime},
1767 };
1768
1769 let dir = tempdir().unwrap();
1770 let config = file::FileConfig {
1771 include: vec![dir.path().join("*")],
1772 ignore_older_secs: Some(5),
1773 ..test_default_file_config(&dir)
1774 };
1775
1776 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1777 let before_path = dir.path().join("before");
1778 let mut before_file = File::create(&before_path).unwrap();
1779 let after_path = dir.path().join("after");
1780 let mut after_file = File::create(&after_path).unwrap();
1781
1782 writeln!(&mut before_file, "first line").unwrap(); writeln!(&mut after_file, "_first line").unwrap(); {
1786 let before = SystemTime::now() - Duration::from_secs(8);
1788 let after = SystemTime::now() - Duration::from_secs(2);
1789
1790 let before_time = libc::timeval {
1791 tv_sec: before
1792 .duration_since(SystemTime::UNIX_EPOCH)
1793 .unwrap()
1794 .as_secs() as _,
1795 tv_usec: 0,
1796 };
1797 let before_times = [before_time, before_time];
1798
1799 let after_time = libc::timeval {
1800 tv_sec: after
1801 .duration_since(SystemTime::UNIX_EPOCH)
1802 .unwrap()
1803 .as_secs() as _,
1804 tv_usec: 0,
1805 };
1806 let after_times = [after_time, after_time];
1807
1808 unsafe {
1809 libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1810 libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1811 }
1812 }
1813
1814 sleep_500_millis().await;
1815 writeln!(&mut before_file, "second line").unwrap();
1816 writeln!(&mut after_file, "_second line").unwrap();
1817
1818 sleep_500_millis().await;
1819 })
1820 .await;
1821
1822 let before_lines = received
1823 .iter()
1824 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1825 .map(|event| {
1826 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1827 })
1828 .collect::<Vec<_>>();
1829 let after_lines = received
1830 .iter()
1831 .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1832 .map(|event| {
1833 event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1834 })
1835 .collect::<Vec<_>>();
1836 assert_eq!(before_lines, vec!["second line"]);
1837 assert_eq!(after_lines, vec!["_first line", "_second line"]);
1838 }
1839
/// With `max_line_bytes: 10`, writes a mix of short and over-long lines
/// (including one very large line) in two batches and expects only the lines
/// of at most 10 bytes to be delivered: `short`, `exactly 10`, `last short`.
#[tokio::test]
async fn file_max_line_bytes() {
    let dir = tempdir().unwrap();
    let config = file::FileConfig {
        include: vec![dir.path().join("*")],
        max_line_bytes: 10,
        ..test_default_file_config(&dir)
    };

    let path = dir.path().join("file");
    let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
        let mut file = File::create(&path).unwrap();

        sleep_500_millis().await;

        writeln!(&mut file, "short").unwrap();
        writeln!(&mut file, "this is too long").unwrap();
        // 11 bytes: one over the limit.
        writeln!(&mut file, "11 eleven11").unwrap();
        // A single line built by repeating a long sentence 10000 times.
        let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
        writeln!(&mut file, "{super_long}").unwrap();
        // Exactly at the limit.
        writeln!(&mut file, "exactly 10").unwrap();
        // The first batch ends with an over-long line.
        writeln!(&mut file, "it can end on a line that's too long").unwrap();

        sleep_500_millis().await;
        sleep_500_millis().await;

        // Second batch, written after a pause.
        writeln!(&mut file, "and then continue").unwrap();
        writeln!(&mut file, "last short").unwrap();

        sleep_500_millis().await;
        sleep_500_millis().await;
    }).await;

    let received = extract_messages_value(received);

    assert_eq!(
        received,
        vec!["short".into(), "exactly 10".into(), "last short".into()]
    );
}
1880
/// Exercises multi-line aggregation configured through the
/// `message_start_indicator` / `multi_line_timeout` fields.
///
/// Lines are written in three batches separated by pauses. The expected
/// output shows lines without `INFO` joined onto the preceding message with
/// `\n`, and a message that is still open when a pause begins being emitted on
/// its own.
#[tokio::test]
async fn test_multi_line_aggregation_legacy() {
    let dir = tempdir().unwrap();
    let config = file::FileConfig {
        include: vec![dir.path().join("*")],
        message_start_indicator: Some("INFO".into()),
        // NOTE(review): presumably milliseconds, i.e. well below the 500 ms
        // pauses used between batches. Confirm against `FileConfig`.
        multi_line_timeout: 25,
        ..test_default_file_config(&dir)
    };

    let path = dir.path().join("file");
    let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
        let mut file = File::create(&path).unwrap();

        sleep_500_millis().await;

        // Batch 1.
        writeln!(&mut file, "leftover foo").unwrap();
        writeln!(&mut file, "INFO hello").unwrap();
        writeln!(&mut file, "INFO goodbye").unwrap();
        writeln!(&mut file, "part of goodbye").unwrap();

        sleep_500_millis().await;

        // Batch 2.
        writeln!(&mut file, "INFO hi again").unwrap();
        writeln!(&mut file, "and some more").unwrap();
        writeln!(&mut file, "INFO hello").unwrap();

        sleep_500_millis().await;

        // Batch 3: "too slow" arrives after the pause and is expected as its
        // own message rather than as part of the preceding "INFO hello".
        writeln!(&mut file, "too slow").unwrap();
        writeln!(&mut file, "INFO doesn't have").unwrap();
        writeln!(&mut file, "to be INFO in").unwrap();
        writeln!(&mut file, "the middle").unwrap();

        sleep_500_millis().await;
    })
    .await;

    let received = extract_messages_value(received);

    assert_eq!(
        received,
        vec![
            "leftover foo".into(),
            "INFO hello".into(),
            "INFO goodbye\npart of goodbye".into(),
            "INFO hi again\nand some more".into(),
            "INFO hello".into(),
            "too slow".into(),
            "INFO doesn't have".into(),
            "to be INFO in\nthe middle".into(),
        ]
    );
}
1935
/// Exercises multi-line aggregation configured through `MultilineConfig`
/// (`HaltBefore` mode, `INFO` as both start and condition pattern, 25 ms
/// timeout).
///
/// Uses the same three batches of writes and the same expected output as
/// `test_multi_line_aggregation_legacy`.
#[tokio::test]
async fn test_multi_line_aggregation() {
    let dir = tempdir().unwrap();
    let config = file::FileConfig {
        include: vec![dir.path().join("*")],
        multiline: Some(MultilineConfig {
            start_pattern: "INFO".to_owned(),
            condition_pattern: "INFO".to_owned(),
            mode: line_agg::Mode::HaltBefore,
            // Well below the 500 ms pauses used between batches.
            timeout_ms: Duration::from_millis(25),
        }),
        ..test_default_file_config(&dir)
    };

    let path = dir.path().join("file");
    let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
        let mut file = File::create(&path).unwrap();

        sleep_500_millis().await;

        // Batch 1.
        writeln!(&mut file, "leftover foo").unwrap();
        writeln!(&mut file, "INFO hello").unwrap();
        writeln!(&mut file, "INFO goodbye").unwrap();
        writeln!(&mut file, "part of goodbye").unwrap();

        sleep_500_millis().await;

        // Batch 2.
        writeln!(&mut file, "INFO hi again").unwrap();
        writeln!(&mut file, "and some more").unwrap();
        writeln!(&mut file, "INFO hello").unwrap();

        sleep_500_millis().await;

        // Batch 3: "too slow" arrives after the pause and is expected as its
        // own message rather than as part of the preceding "INFO hello".
        writeln!(&mut file, "too slow").unwrap();
        writeln!(&mut file, "INFO doesn't have").unwrap();
        writeln!(&mut file, "to be INFO in").unwrap();
        writeln!(&mut file, "the middle").unwrap();

        sleep_500_millis().await;
    })
    .await;

    let received = extract_messages_value(received);

    assert_eq!(
        received,
        vec![
            "leftover foo".into(),
            "INFO hello".into(),
            "INFO goodbye\npart of goodbye".into(),
            "INFO hi again\nand some more".into(),
            "INFO hello".into(),
            "too slow".into(),
            "INFO doesn't have".into(),
            "to be INFO in\nthe middle".into(),
        ]
    );
}
1994
1995 #[tokio::test]
1996 async fn test_multi_line_checkpointing() {
1997 let dir = tempdir().unwrap();
1998 let config = file::FileConfig {
1999 include: vec![dir.path().join("*")],
2000 offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2001 multiline: Some(MultilineConfig {
2002 start_pattern: "INFO".to_owned(),
2003 condition_pattern: "INFO".to_owned(),
2004 mode: line_agg::Mode::HaltBefore,
2005 timeout_ms: Duration::from_millis(25), }),
2007 ..test_default_file_config(&dir)
2008 };
2009
2010 let path = dir.path().join("file");
2011 let mut file = File::create(&path).unwrap();
2012
2013 writeln!(&mut file, "INFO hello").unwrap();
2014 writeln!(&mut file, "part of hello").unwrap();
2015
2016 let received = run_file_source(
2018 &config,
2019 false,
2020 Acks,
2021 LogNamespace::Legacy,
2022 sleep_500_millis(),
2023 )
2024 .await;
2025
2026 assert_eq!(received[0].as_log()["offset"], 0.into());
2027
2028 let lines = extract_messages_string(received);
2029 assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2030
2031 let received_after_restart =
2033 run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2034 writeln!(&mut file, "INFO goodbye").unwrap();
2035 })
2036 .await;
2037 assert_eq!(
2038 received_after_restart[0].as_log()["offset"],
2039 (lines[0].len() + 1).into()
2040 );
2041 let lines = extract_messages_string(received_after_restart);
2042 assert_eq!(lines, vec!["INFO goodbye"]);
2043 }
2044
2045 #[tokio::test]
2046 async fn test_fair_reads() {
2047 let dir = tempdir().unwrap();
2048 let config = file::FileConfig {
2049 include: vec![dir.path().join("*")],
2050 max_read_bytes: 1,
2051 oldest_first: false,
2052 ..test_default_file_config(&dir)
2053 };
2054
2055 let older_path = dir.path().join("z_older_file");
2056 let mut older = File::create(&older_path).unwrap();
2057
2058 sleep_500_millis().await;
2059
2060 let newer_path = dir.path().join("a_newer_file");
2061 let mut newer = File::create(&newer_path).unwrap();
2062
2063 writeln!(&mut older, "hello i am the old file").unwrap();
2064 writeln!(&mut older, "i have been around a while").unwrap();
2065 writeln!(&mut older, "you can read newer files at the same time").unwrap();
2066
2067 writeln!(&mut newer, "and i am the new file").unwrap();
2068 writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2069 writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2070
2071 sleep_500_millis().await;
2072
2073 let received = run_file_source(
2074 &config,
2075 false,
2076 NoAcks,
2077 LogNamespace::Legacy,
2078 sleep_500_millis(),
2079 )
2080 .await;
2081
2082 let received = extract_messages_value(received);
2083
2084 assert_eq!(
2085 received,
2086 vec![
2087 "hello i am the old file".into(),
2088 "and i am the new file".into(),
2089 "i have been around a while".into(),
2090 "this should be interleaved with the old one".into(),
2091 "you can read newer files at the same time".into(),
2092 "which is fine because we want fairness".into(),
2093 ]
2094 );
2095 }
2096
2097 #[tokio::test]
2098 async fn test_oldest_first() {
2099 let dir = tempdir().unwrap();
2100 let config = file::FileConfig {
2101 include: vec![dir.path().join("*")],
2102 max_read_bytes: 1,
2103 oldest_first: true,
2104 ..test_default_file_config(&dir)
2105 };
2106
2107 let older_path = dir.path().join("z_older_file");
2108 let mut older = File::create(&older_path).unwrap();
2109
2110 sleep_500_millis().await;
2111
2112 let newer_path = dir.path().join("a_newer_file");
2113 let mut newer = File::create(&newer_path).unwrap();
2114
2115 writeln!(&mut older, "hello i am the old file").unwrap();
2116 writeln!(&mut older, "i have been around a while").unwrap();
2117 writeln!(&mut older, "you should definitely read all of me first").unwrap();
2118
2119 writeln!(&mut newer, "i'm new").unwrap();
2120 writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2121 writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2122
2123 sleep_500_millis().await;
2124
2125 let received = run_file_source(
2126 &config,
2127 false,
2128 NoAcks,
2129 LogNamespace::Legacy,
2130 sleep_500_millis(),
2131 )
2132 .await;
2133
2134 let received = extract_messages_value(received);
2135
2136 assert_eq!(
2137 received,
2138 vec![
2139 "hello i am the old file".into(),
2140 "i have been around a while".into(),
2141 "you should definitely read all of me first".into(),
2142 "i'm new".into(),
2143 "hopefully you read all the old stuff first".into(),
2144 "because otherwise i'm not going to make sense".into(),
2145 ]
2146 );
2147 }
2148
2149 #[cfg(not(target_os = "macos"))]
2151 #[tokio::test]
2152 async fn test_split_reads() {
2153 let dir = tempdir().unwrap();
2154 let config = file::FileConfig {
2155 include: vec![dir.path().join("*")],
2156 max_read_bytes: 1,
2157 ..test_default_file_config(&dir)
2158 };
2159
2160 let path = dir.path().join("file");
2161 let mut file = File::create(&path).unwrap();
2162
2163 writeln!(&mut file, "hello i am a normal line").unwrap();
2164
2165 sleep_500_millis().await;
2166
2167 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2168 sleep_500_millis().await;
2169
2170 write!(&mut file, "i am not a full line").unwrap();
2171
2172 sleep_500_millis().await;
2174
2175 writeln!(&mut file, " until now").unwrap();
2176
2177 sleep_500_millis().await;
2178 })
2179 .await;
2180
2181 let received = extract_messages_value(received);
2182
2183 assert_eq!(
2184 received,
2185 vec![
2186 "hello i am a normal line".into(),
2187 "i am not a full line until now".into(),
2188 ]
2189 );
2190 }
2191
2192 #[tokio::test]
2193 async fn test_gzipped_file() {
2194 let dir = tempdir().unwrap();
2195 let config = file::FileConfig {
2196 include: vec![PathBuf::from("tests/data/gzipped.log")],
2197 max_line_bytes: 100,
2204 ..test_default_file_config(&dir)
2205 };
2206
2207 let received = run_file_source(
2208 &config,
2209 false,
2210 NoAcks,
2211 LogNamespace::Legacy,
2212 sleep_500_millis(),
2213 )
2214 .await;
2215
2216 let received = extract_messages_value(received);
2217
2218 assert_eq!(
2219 received,
2220 vec![
2221 "this is a simple file".into(),
2222 "i have been compressed".into(),
2223 "in order to make me smaller".into(),
2224 "but you can still read me".into(),
2225 "hooray".into(),
2226 ]
2227 );
2228 }
2229
2230 #[tokio::test]
2231 async fn test_non_utf8_encoded_file() {
2232 let dir = tempdir().unwrap();
2233 let config = file::FileConfig {
2234 include: vec![PathBuf::from("tests/data/utf-16le.log")],
2235 encoding: Some(EncodingConfig { charset: UTF_16LE }),
2236 ..test_default_file_config(&dir)
2237 };
2238
2239 let received = run_file_source(
2240 &config,
2241 false,
2242 NoAcks,
2243 LogNamespace::Legacy,
2244 sleep_500_millis(),
2245 )
2246 .await;
2247
2248 let received = extract_messages_value(received);
2249
2250 assert_eq!(
2251 received,
2252 vec![
2253 "hello i am a file".into(),
2254 "i can unicode".into(),
2255 "but i do so in 16 bits".into(),
2256 "and when i byte".into(),
2257 "i become little-endian".into(),
2258 ]
2259 );
2260 }
2261
2262 #[tokio::test]
2263 async fn test_non_default_line_delimiter() {
2264 let dir = tempdir().unwrap();
2265 let config = file::FileConfig {
2266 include: vec![dir.path().join("*")],
2267 line_delimiter: "\r\n".to_string(),
2268 ..test_default_file_config(&dir)
2269 };
2270
2271 let path = dir.path().join("file");
2272 let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2273 let mut file = File::create(&path).unwrap();
2274
2275 sleep_500_millis().await; write!(&mut file, "hello i am a line\r\n").unwrap();
2278 write!(&mut file, "and i am too\r\n").unwrap();
2279 write!(&mut file, "CRLF is how we end\r\n").unwrap();
2280 write!(&mut file, "please treat us well\r\n").unwrap();
2281
2282 sleep_500_millis().await;
2283 })
2284 .await;
2285
2286 let received = extract_messages_value(received);
2287
2288 assert_eq!(
2289 received,
2290 vec![
2291 "hello i am a line".into(),
2292 "and i am too".into(),
2293 "CRLF is how we end".into(),
2294 "please treat us well".into()
2295 ]
2296 );
2297 }
2298
2299 #[tokio::test]
2300 async fn remove_file() {
2301 let n = 5;
2302 let remove_after_secs = 1;
2303
2304 let dir = tempdir().unwrap();
2305 let config = file::FileConfig {
2306 include: vec![dir.path().join("*")],
2307 remove_after_secs: Some(remove_after_secs),
2308 ..test_default_file_config(&dir)
2309 };
2310
2311 let path = dir.path().join("file");
2312 let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2313 let mut file = File::create(&path).unwrap();
2314
2315 sleep_500_millis().await; for i in 0..n {
2318 writeln!(&mut file, "{i}").unwrap();
2319 }
2320 drop(file);
2321
2322 for _ in 0..10 {
2323 sleep(Duration::from_secs(remove_after_secs + 1)).await;
2325
2326 if File::open(&path).is_err() {
2327 break;
2328 }
2329 }
2330 })
2331 .await;
2332
2333 assert_eq!(received.len(), n);
2334
2335 match File::open(&path) {
2336 Ok(_) => panic!("File wasn't removed"),
2337 Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2338 }
2339 }
2340
/// Selects how `run_file_source` wires up acknowledgements for a test run.
#[derive(Clone, Copy, Eq, PartialEq)]
enum AckingMode {
    /// The source is started with acknowledgements disabled and a plain test
    /// sender.
    NoAcks,
    /// The source is started with acknowledgements enabled but still uses the
    /// plain test sender; results are collected for a fixed five seconds
    /// instead of waiting for the channel to close.
    Unfinalized,
    /// The source is started with acknowledgements enabled and a sender
    /// created with `new_test_finalize(EventStatus::Delivered)`.
    Acks,
}
2347 use AckingMode::*;
2348 use vector_lib::lookup::OwnedTargetPath;
2349
2350 async fn run_file_source(
2351 config: &FileConfig,
2352 wait_shutdown: bool,
2353 acking_mode: AckingMode,
2354 log_namespace: LogNamespace,
2355 inner: impl Future<Output = ()>,
2356 ) -> Vec<Event> {
2357 assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2358 let (tx, rx) = if acking_mode == Acks {
2359 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2360 (tx, rx.boxed())
2361 } else {
2362 let (tx, rx) = SourceSender::new_test();
2363 (tx, rx.boxed())
2364 };
2365
2366 let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2367 let data_dir = config.data_dir.clone().unwrap();
2368 let acks = !matches!(acking_mode, NoAcks);
2369
2370 tokio::spawn(file::file_source(
2371 config,
2372 data_dir,
2373 shutdown,
2374 tx,
2375 acks,
2376 log_namespace,
2377 ));
2378
2379 inner.await;
2380
2381 drop(trigger_shutdown);
2382
2383 let result = if acking_mode == Unfinalized {
2384 rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2385 .collect::<Vec<_>>()
2386 .await
2387 } else {
2388 timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2389 .await
2390 .expect(
2391 "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2392 )
2393 };
2394 if wait_shutdown {
2395 shutdown_done.await;
2396 }
2397
2398 result
2399 })
2400 .await
2401 }
2402
2403 fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2404 received
2405 .into_iter()
2406 .map(Event::into_log)
2407 .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2408 .collect()
2409 }
2410
2411 fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2412 received
2413 .into_iter()
2414 .map(Event::into_log)
2415 .map(|log| log.get_message().unwrap().clone())
2416 .collect()
2417 }
2418}