vector/sources/
file.rs

1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    codecs::{BytesDeserializer, BytesDeserializerConfig},
14    config::{LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    file_source::{
17        file_server::{FileServer, Line, calculate_ignore_before},
18        paths_provider::{Glob, MatchOptions},
19    },
20    file_source_common::{
21        Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22    },
23    finalizer::OrderedFinalizer,
24    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30    SourceSender,
31    config::{
32        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33        log_schema,
34    },
35    encoding_transcode::{Decoder, Encoder},
36    event::{BatchNotifier, BatchStatus, LogEvent},
37    internal_events::{
38        FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39        FileSourceInternalEventsEmitter, StreamClosedError,
40    },
41    line_agg::{self, LineAgg},
42    serde::bool_or_struct,
43    shutdown::ShutdownSignal,
44};
45
/// Errors detected while validating a `file` source configuration in `FileConfig::build`.
#[derive(Debug, Snafu)]
enum BuildError {
    // Raised when the deprecated `message_start_indicator` option does not compile as a regex.
    #[snafu(display(
        "message_start_indicator {:?} is not a valid regex: {}",
        indicator,
        source
    ))]
    InvalidMessageStartIndicator {
        // The offending pattern, exactly as configured.
        indicator: String,
        // The underlying regex compilation error.
        source: regex::Error,
    },
}
58
/// Configuration for the `file` source.
#[serde_as]
#[configurable_component(source("file", "Collect logs from files."))]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileConfig {
    /// Array of file patterns to include. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
    // Required (no serde default); `file_source` refuses to start when this is empty.
    pub include: Vec<PathBuf>,

    /// Array of file patterns to exclude. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
    ///
    /// Takes precedence over the `include` option. Note: The `exclude` patterns are applied _after_ the attempt to glob everything
    /// in `include`. This means that all files are first matched by `include` and then filtered by the `exclude`
    /// patterns. This can be impactful if `include` contains directories with contents that are not accessible.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
    pub exclude: Vec<PathBuf>,

    /// Overrides the name of the log field used to add the file path to each event.
    ///
    /// The value is the full path to the file from which the event was read.
    ///
    /// Set to `""` to suppress this key.
    #[serde(default = "default_file_key")]
    #[configurable(metadata(docs::examples = "path"))]
    pub file_key: OptionalValuePath,

    /// Whether or not to start reading from the beginning of a new file.
    #[configurable(
        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
    )]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    // Folded into `ignore_checkpoints`/`read_from` by `reconcile_position_options`.
    pub start_at_beginning: Option<bool>,

    /// Whether or not to ignore existing checkpoints when determining where to start reading a file.
    ///
    /// Checkpoints are still written normally.
    #[serde(default)]
    pub ignore_checkpoints: Option<bool>,

    #[serde(default = "default_read_from")]
    #[configurable(derived)]
    pub read_from: ReadFromConfig,

    /// Ignore files with a data modification date older than the specified number of seconds.
    #[serde(alias = "ignore_older", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
    // Turned into a cutoff by `calculate_ignore_before` when the source is built.
    pub ignore_older_secs: Option<u64>,

    /// The maximum size of a line before it is discarded.
    ///
    /// This protects against malformed lines or tailing incorrect files.
    #[serde(default = "default_max_line_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_line_bytes: usize,

    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// Set to `""` to suppress this key.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    #[configurable(metadata(docs::examples = "hostname"))]
    // `None` falls back to `log_schema().host_key()` when outputs and events are built.
    pub host_key: Option<OptionalValuePath>,

    /// The directory used to persist file checkpoint positions.
    ///
    /// By default, the [global `data_dir` option][global_data_dir] is used.
    /// Make sure the running user has write permissions to this directory.
    ///
    /// If this directory is specified, then Vector will attempt to create it.
    ///
    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Enables adding the file offset to each event and sets the name of the log field used.
    ///
    /// The value is the byte offset of the start of the line within the file.
    ///
    /// Off by default, the offset is only added to the event if this is set.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "offset"))]
    pub offset_key: Option<OptionalValuePath>,

    /// The delay between file discovery calls.
    ///
    /// This controls the interval at which files are searched. A higher value results in greater
    /// chances of some short-lived files being missed between searches, but a lower value increases
    /// the performance impact of file discovery.
    #[serde(
        alias = "glob_minimum_cooldown",
        default = "default_glob_minimum_cooldown_ms"
    )]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    // Stored as a `Duration`; (de)serialized as an integer number of milliseconds.
    pub glob_minimum_cooldown_ms: Duration,

    #[configurable(derived)]
    #[serde(alias = "fingerprinting", default)]
    fingerprint: FingerprintConfig,

    /// Ignore missing files when fingerprinting.
    ///
    /// This may be useful when used with source directories containing dangling symlinks.
    #[serde(default)]
    pub ignore_not_found: bool,

    /// String value used to identify the start of a multi-line message.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    // Validated as a regex in `build`; only used when `multiline` is not set.
    pub message_start_indicator: Option<String>,

    /// How long to wait for more data when aggregating a multi-line message, in milliseconds.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default = "default_multi_line_timeout")]
    // Only paired with `message_start_indicator`.
    pub multi_line_timeout: u64,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    #[serde(default)]
    pub multiline: Option<MultilineConfig>,

    /// Max amount of bytes to read from a single file before switching over to the next file.
    /// **Note:** This does not apply when `oldest_first` is `true`.
    ///
    /// This allows distributing the reads more or less evenly across
    /// the files.
    #[serde(default = "default_max_read_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_read_bytes: usize,

    /// Instead of balancing read capacity fairly across all watched files, prioritize draining the oldest files before moving on to read data from more recent files.
    #[serde(default)]
    pub oldest_first: bool,

    /// After reaching EOF, the number of seconds to wait before removing the file, unless new data is written.
    ///
    /// If not specified, files are not removed.
    #[serde(alias = "remove_after", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 0))]
    #[configurable(metadata(docs::examples = 5))]
    #[configurable(metadata(docs::examples = 60))]
    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
    pub remove_after_secs: Option<u64>,

    /// String sequence used to separate one file line from another.
    #[serde(default = "default_line_delimiter")]
    #[configurable(metadata(docs::examples = "\r\n"))]
    // Re-encoded into `encoding.charset` (when set) before being used as a byte delimiter.
    pub line_delimiter: String,

    #[configurable(derived)]
    #[serde(default)]
    pub encoding: Option<EncodingConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// How long to keep an open handle to a rotated log file.
    /// The default value represents "no limit".
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    // The default is `u64::MAX / 2` seconds (see `default_rotate_wait`).
    pub rotate_wait: Duration,
}
247
/// Serde default for `max_line_bytes`: 100 KiB (100 * 1024 bytes).
fn default_max_line_bytes() -> usize {
    const KIB: usize = 1024;
    100 * KIB
}
251
252fn default_file_key() -> OptionalValuePath {
253    OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257    ReadFromConfig::Beginning
258}
259
/// Serde default for `glob_minimum_cooldown_ms`: one second between file discovery passes.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}

/// Serde default for the deprecated `multi_line_timeout`, in milliseconds.
const fn default_multi_line_timeout() -> u64 {
    1_000
}

/// Serde default for `max_read_bytes`: 2 KiB read from one file before moving to the next.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}

/// Serde default for `line_delimiter`: a single newline.
fn default_line_delimiter() -> String {
    String::from("\n")
}

/// Serde default for `rotate_wait`, documented as "no limit".
///
/// Half of the `u64` second range is used rather than `u64::MAX`; presumably this leaves
/// headroom so adding the duration to a timestamp cannot overflow.
const fn default_rotate_wait() -> Duration {
    Duration::from_secs(u64::MAX >> 1)
}
279
/// Configuration for how files should be identified.
///
/// This is important for `checkpointing` when file rotation is used.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
))]
pub enum FingerprintConfig {
    /// Read lines from the beginning of the file and compute a checksum over them.
    Checksum {
        /// Maximum number of bytes to use, from the lines that are read, for generating the checksum.
        ///
        // TODO: Should we properly expose this in the documentation? There could definitely be value in allowing more
        // bytes to be used for the checksum generation, but we should commit to exposing it rather than hiding it.
        #[serde(alias = "fingerprint_bytes")]
        #[configurable(metadata(docs::hidden))]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        // No serde default: an omitted value is `None`, and the conversion into
        // `FingerprintStrategy` substitutes 256 bytes in that case.
        bytes: Option<usize>,

        /// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
        /// If the file is compressed, the number of bytes refers to the header in the uncompressed content. Only
        /// gzip is supported at this time.
        ///
        /// This can be helpful if all files share a common header that should be skipped.
        #[serde(default = "default_ignored_header_bytes")]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        ignored_header_bytes: usize,

        /// The number of lines to read for generating the checksum.
        ///
        /// The number of lines is determined from the uncompressed content if the file is compressed. Only
        /// gzip is supported at this time.
        ///
        /// If the file has fewer lines than this, it won’t be read at all.
        #[serde(default = "default_lines")]
        #[configurable(metadata(docs::type_unit = "lines"))]
        lines: usize,
    },

    /// Use the [device and inode][inode] as the identifier.
    ///
    /// [inode]: https://en.wikipedia.org/wiki/Inode
    #[serde(rename = "device_and_inode")]
    DevInode,
}
327
328impl Default for FingerprintConfig {
329    fn default() -> Self {
330        Self::Checksum {
331            bytes: None,
332            ignored_header_bytes: 0,
333            lines: default_lines(),
334        }
335    }
336}
337
/// Serde default for `fingerprint.ignored_header_bytes`: no header bytes are skipped.
const fn default_ignored_header_bytes() -> usize {
    0_usize
}

/// Serde default for `fingerprint.lines`: checksum a single line.
const fn default_lines() -> usize {
    1_usize
}
345
346impl From<FingerprintConfig> for FingerprintStrategy {
347    fn from(config: FingerprintConfig) -> FingerprintStrategy {
348        match config {
349            FingerprintConfig::Checksum {
350                bytes,
351                ignored_header_bytes,
352                lines,
353            } => {
354                let bytes = match bytes {
355                    Some(bytes) => {
356                        warn!(
357                            message = "The `fingerprint.bytes` option will be used to convert old file fingerprints created by vector < v0.11.0, but are not supported for new file fingerprints. The first line will be used instead."
358                        );
359                        bytes
360                    }
361                    None => 256,
362                };
363                FingerprintStrategy::Checksum {
364                    bytes,
365                    ignored_header_bytes,
366                    lines,
367                }
368            }
369            FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
370        }
371    }
372}
373
/// A pending checkpoint update registered with the `OrderedFinalizer` when end-to-end
/// acknowledgements are enabled.
///
/// It is written to the checkpointer only after the batch containing the corresponding event
/// reports `BatchStatus::Delivered`.
#[derive(Debug)]
pub(crate) struct FinalizerEntry {
    /// Fingerprint of the file the line was read from.
    pub(crate) file_id: FileFingerprint,
    /// Offset to checkpoint: the line's end offset (for an aggregated multi-line message,
    /// the end offset of its last line).
    pub(crate) offset: u64,
}
379
380impl Default for FileConfig {
381    fn default() -> Self {
382        Self {
383            include: vec![PathBuf::from("/var/log/**/*.log")],
384            exclude: vec![],
385            file_key: default_file_key(),
386            start_at_beginning: None,
387            ignore_checkpoints: None,
388            read_from: default_read_from(),
389            ignore_older_secs: None,
390            max_line_bytes: default_max_line_bytes(),
391            fingerprint: FingerprintConfig::default(),
392            ignore_not_found: false,
393            host_key: None,
394            offset_key: None,
395            data_dir: None,
396            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
397            message_start_indicator: None,
398            multi_line_timeout: default_multi_line_timeout(), // millis
399            multiline: None,
400            max_read_bytes: default_max_read_bytes(),
401            oldest_first: false,
402            remove_after_secs: None,
403            line_delimiter: default_line_delimiter(),
404            encoding: None,
405            acknowledgements: Default::default(),
406            log_namespace: None,
407            internal_metrics: Default::default(),
408            rotate_wait: default_rotate_wait(),
409        }
410    }
411}
412
// Implements config generation for `FileConfig` from its `Default` impl. The macro is defined
// elsewhere in the crate; presumably this backs `vector generate` and the `generate_config` test.
impl_generate_config_from_default!(FileConfig);
414
415#[async_trait::async_trait]
416#[typetag::serde(name = "file")]
417impl SourceConfig for FileConfig {
418    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
419        // add the source name as a subdir, so that multiple sources can
420        // operate within the same given data_dir (e.g. the global one)
421        // without the file servers' checkpointers interfering with each
422        // other
423        let data_dir = cx
424            .globals
425            // source are only global, name can be used for subdir
426            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
427
428        // Clippy rule, because async_trait?
429        #[allow(clippy::suspicious_else_formatting)]
430        {
431            if let Some(ref config) = self.multiline {
432                let _: line_agg::Config = config.try_into()?;
433            }
434
435            if let Some(ref indicator) = self.message_start_indicator {
436                Regex::new(indicator)
437                    .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
438            }
439        }
440
441        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
442
443        let log_namespace = cx.log_namespace(self.log_namespace);
444
445        Ok(file_source(
446            self,
447            data_dir,
448            cx.shutdown,
449            cx.out,
450            acknowledgements,
451            log_namespace,
452        ))
453    }
454
455    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
456        let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
457        let host_key = self
458            .host_key
459            .clone()
460            .unwrap_or(log_schema().host_key().cloned().into())
461            .path
462            .map(LegacyKey::Overwrite);
463
464        let offset_key = self
465            .offset_key
466            .clone()
467            .and_then(|k| k.path)
468            .map(LegacyKey::Overwrite);
469
470        let schema_definition = BytesDeserializerConfig
471            .schema_definition(global_log_namespace.merge(self.log_namespace))
472            .with_standard_vector_source_metadata()
473            .with_source_metadata(
474                Self::NAME,
475                host_key,
476                &owned_value_path!("host"),
477                Kind::bytes().or_undefined(),
478                Some("host"),
479            )
480            .with_source_metadata(
481                Self::NAME,
482                offset_key,
483                &owned_value_path!("offset"),
484                Kind::integer(),
485                None,
486            )
487            .with_source_metadata(
488                Self::NAME,
489                file_key,
490                &owned_value_path!("path"),
491                Kind::bytes(),
492                None,
493            );
494
495        vec![SourceOutput::new_maybe_logs(
496            DataType::Log,
497            schema_definition,
498        )]
499    }
500
501    fn can_acknowledge(&self) -> bool {
502        true
503    }
504}
505
/// Builds the future that runs the `file` source.
///
/// # Arguments
///
/// * `config` - the source configuration. Its `multiline` and `message_start_indicator`
///   settings are unwrapped here and must already have been validated (see `FileConfig::build`).
/// * `data_dir` - directory in which the checkpointer persists file positions.
/// * `shutdown` - global shutdown signal handed to the file server.
/// * `out` - sender that receives the produced log events.
/// * `acknowledgements` - when `true`, a line's checkpoint advances only after its batch
///   reports `BatchStatus::Delivered`. When `false`, it advances as soon as the event is
///   created.
/// * `log_namespace` - namespace used when building events.
///
/// # Returns
///
/// A boxed future that resolves to `Err(())` immediately if `include` is empty. Otherwise
/// it runs the file server on a blocking-pool thread and resolves when that thread
/// finishes. If the thread panics or is cancelled, the failure is logged and mapped to
/// `Err(())`.
///
/// # Panics
///
/// Panics (before the future is returned) if `Glob::new` rejects the include/exclude patterns.
pub fn file_source(
    config: &FileConfig,
    data_dir: PathBuf,
    shutdown: ShutdownSignal,
    mut out: SourceSender,
    acknowledgements: bool,
    log_namespace: LogNamespace,
) -> super::Source {
    // the include option must be specified but also must contain at least one entry.
    if config.include.is_empty() {
        error!(message = "`include` configuration option must contain at least one file pattern.");
        return Box::pin(future::ready(Err(())));
    }

    // Rebuild each exclude path from its components before handing it to the glob matcher
    // (iterating a `Path` yields normalized components, e.g. without redundant separators).
    let exclude_patterns = config
        .exclude
        .iter()
        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
        .collect::<Vec<PathBuf>>();
    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
    let (ignore_checkpoints, read_from) = reconcile_position_options(
        config.start_at_beginning,
        config.ignore_checkpoints,
        Some(config.read_from),
    );

    let emitter = FileSourceInternalEventsEmitter {
        include_file_metric_tag: config.internal_metrics.include_file_tag,
    };

    // NOTE(review): patterns rejected by `Glob::new` come from user configuration, yet they
    // cause a panic here instead of a build error; consider validating them in `build`.
    let paths_provider = Glob::new(
        &config.include,
        &exclude_patterns,
        MatchOptions::default(),
        emitter.clone(),
    )
    .expect("invalid glob patterns");

    let encoding_charset = config.encoding.clone().map(|e| e.charset);

    // if file encoding is specified, need to convert the line delimiter (present as utf8)
    // to the specified encoding, so that delimiter-based line splitting can work properly
    let line_delimiter_as_bytes = match encoding_charset {
        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
        None => Bytes::from(config.line_delimiter.clone()),
    };

    let checkpointer = Checkpointer::new(&data_dir);
    let file_server = FileServer {
        paths_provider,
        max_read_bytes: config.max_read_bytes,
        ignore_checkpoints,
        read_from,
        ignore_before,
        max_line_bytes: config.max_line_bytes,
        line_delimiter: line_delimiter_as_bytes,
        data_dir,
        glob_minimum_cooldown,
        fingerprinter: Fingerprinter {
            strategy: config.fingerprint.clone().into(),
            max_line_length: config.max_line_bytes,
            ignore_not_found: config.ignore_not_found,
        },
        oldest_first: config.oldest_first,
        remove_after: config.remove_after_secs.map(Duration::from_secs),
        emitter,
        rotate_wait: config.rotate_wait,
    };

    // Keys and hostname attached to every event. The host key falls back to the global
    // `log_schema` host key when `host_key` is not configured.
    let event_metadata = EventMetadata {
        host_key: config
            .host_key
            .clone()
            .unwrap_or(log_schema().host_key().cloned().into())
            .path,
        hostname: crate::get_hostname().ok(),
        file_key: config.file_key.clone().path,
        offset_key: config.offset_key.clone().and_then(|k| k.path),
    };

    // Owned copies of the config values used inside the returned future, which does not
    // borrow `config`.
    let include = config.include.clone();
    let exclude = config.exclude.clone();
    let multiline_config = config.multiline.clone();
    let message_start_indicator = config.message_start_indicator.clone();
    let multi_line_timeout = config.multi_line_timeout;

    let (finalizer, shutdown_checkpointer) = if acknowledgements {
        // NOTE(review): `None` is passed here, so the finalizer is not tied to the global
        // shutdown handle. The ack task below therefore runs until `ack_stream` ends
        // (presumably once every `finalizer` handle is dropped). Confirm this is intended.
        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);

        // We set up a separate shutdown signal to tie together the
        // finalizer and the checkpoint writer task in the file
        // server, to make it continue to write out updated
        // checkpoints until all the acks have come in.
        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
        let checkpoints = checkpointer.view();
        tokio::spawn(async move {
            // Only delivered batches advance a file's checkpoint; other statuses leave it
            // untouched.
            while let Some((status, entry)) = ack_stream.next().await {
                if status == BatchStatus::Delivered {
                    checkpoints.update(entry.file_id, entry.offset);
                }
            }
            send_shutdown.send(())
        });
        (Some(finalizer), shutdown2.map(|_| ()).boxed())
    } else {
        // When not dealing with end-to-end acknowledgements, just
        // clone the global shutdown to stop the checkpoint writer.
        (None, shutdown.clone().map(|_| ()).boxed())
    };

    let checkpoints = checkpointer.view();
    let include_file_metric_tag = config.internal_metrics.include_file_tag;
    Box::pin(async move {
        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);

        let mut encoding_decoder = encoding_charset.map(Decoder::new);

        // sizing here is just a guess
        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
        // Flatten the batches sent by the file server into individual lines. Received bytes
        // are recorded per line, measured before transcoding.
        let rx = rx
            .map(futures::stream::iter)
            .flatten()
            .map(move |mut line| {
                emit!(FileBytesReceived {
                    byte_size: line.text.len(),
                    file: &line.filename,
                    include_file_metric_tag,
                });
                // transcode each line from the file's encoding charset to utf8
                line.text = match encoding_decoder.as_mut() {
                    Some(d) => d.decode_to_utf8(line.text),
                    None => line.text,
                };
                line
            });

        // `multiline` takes precedence over the deprecated `message_start_indicator`.
        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
            if let Some(ref multiline_config) = multiline_config {
                wrap_with_line_agg(
                    rx,
                    multiline_config.try_into().unwrap(), // validated in build
                )
            } else if let Some(msi) = message_start_indicator {
                wrap_with_line_agg(
                    rx,
                    line_agg::Config::for_legacy(
                        Regex::new(&msi).unwrap(), // validated in build
                        multi_line_timeout,
                    ),
                )
            } else {
                Box::new(rx)
            };

        // Once file server ends this will run until it has finished processing remaining
        // logs in the queue.
        let span = Span::current();
        let mut messages = messages.map(move |line| {
            let mut event = create_event(
                line.text,
                line.start_offset,
                &line.filename,
                &event_metadata,
                log_namespace,
                include_file_metric_tag,
            );

            // With acknowledgements, hand the line's end offset to the finalizer so it is
            // checkpointed after delivery; otherwise checkpoint it immediately.
            if let Some(finalizer) = &finalizer {
                let (batch, receiver) = BatchNotifier::new_with_receiver();
                event = event.with_batch_notifier(&batch);
                let entry = FinalizerEntry {
                    file_id: line.file_id,
                    offset: line.end_offset,
                };
                finalizer.add(entry, receiver);
            } else {
                checkpoints.update(line.file_id, line.end_offset);
            }
            event
        });
        tokio::spawn(async move {
            match out
                .send_event_stream(&mut messages)
                .instrument(span.or_current())
                .await
            {
                Ok(()) => {
                    debug!("Finished sending.");
                }
                Err(_) => {
                    // Report the lower bound of the remaining, unsent events.
                    let (count, _) = messages.size_hint();
                    emit!(StreamClosedError { count });
                }
            }
        });

        // The file server runs on a blocking-pool thread (presumably because it performs
        // synchronous file I/O). Its future is driven there with the current runtime handle.
        let span = info_span!("file_server");
        tokio::task::spawn_blocking(move || {
            let _enter = span.enter();
            let rt = tokio::runtime::Handle::current();
            let result =
                rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
            // Report zero open files once the server has stopped.
            emit!(FileOpen { count: 0 });
            // Panic if we encounter any error originating from the file server.
            // We're at the `spawn_blocking` call, the panic will be caught and
            // passed to the `JoinHandle` error, similar to the usual threads.
            result.expect("file server exited with an error");
        })
        .map_err(|error| error!(message="File server unexpectedly stopped.", %error))
        .await
    })
}
722
723/// Emit deprecation warning if the old option is used, and take it into account when determining
724/// defaults. Any of the newer options will override it when set directly.
725fn reconcile_position_options(
726    start_at_beginning: Option<bool>,
727    ignore_checkpoints: Option<bool>,
728    read_from: Option<ReadFromConfig>,
729) -> (bool, ReadFrom) {
730    if start_at_beginning.is_some() {
731        warn!(
732            message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
733        )
734    }
735
736    match start_at_beginning {
737        Some(true) => (
738            ignore_checkpoints.unwrap_or(true),
739            read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
740        ),
741        _ => (
742            ignore_checkpoints.unwrap_or(false),
743            read_from.map(Into::into).unwrap_or_default(),
744        ),
745    }
746}
747
748fn wrap_with_line_agg(
749    rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
750    config: line_agg::Config,
751) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
752    let logic = line_agg::Logic::new(config);
753    Box::new(
754        LineAgg::new(
755            rx.map(|line| {
756                (
757                    line.filename,
758                    line.text,
759                    (line.file_id, line.start_offset, line.end_offset),
760                )
761            }),
762            logic,
763        )
764        .map(
765            |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
766                text,
767                filename,
768                file_id,
769                start_offset,
770                end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
771                    lastline_end_offset
772                }),
773            },
774        ),
775    )
776}
777
/// Per-source values that `create_event` attaches to every event.
struct EventMetadata {
    /// Legacy field for the hostname. `file_source` already falls back to the global
    /// `log_schema` host key when `host_key` is not configured. `None` means no legacy key
    /// is passed to `insert_source_metadata`.
    host_key: Option<OwnedValuePath>,
    /// Result of `crate::get_hostname()`, or `None` if that lookup failed. With `None`, no
    /// host metadata is inserted.
    hostname: Option<String>,
    /// Legacy field for the file path; `None` means no legacy key is passed.
    file_key: Option<OwnedValuePath>,
    /// Legacy field for the line's byte offset; `None` means no legacy key is passed.
    offset_key: Option<OwnedValuePath>,
}
784
785fn create_event(
786    line: Bytes,
787    offset: u64,
788    file: &str,
789    meta: &EventMetadata,
790    log_namespace: LogNamespace,
791    include_file_metric_tag: bool,
792) -> LogEvent {
793    let deserializer = BytesDeserializer;
794    let mut event = deserializer.parse_single(line, log_namespace);
795
796    log_namespace.insert_vector_metadata(
797        &mut event,
798        log_schema().source_type_key(),
799        path!("source_type"),
800        Bytes::from_static(FileConfig::NAME.as_bytes()),
801    );
802    log_namespace.insert_vector_metadata(
803        &mut event,
804        log_schema().timestamp_key(),
805        path!("ingest_timestamp"),
806        Utc::now(),
807    );
808
809    let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
810    // `meta.host_key` is already `unwrap_or_else`ed so we can just pass it in.
811    if let Some(hostname) = &meta.hostname {
812        log_namespace.insert_source_metadata(
813            FileConfig::NAME,
814            &mut event,
815            legacy_host_key,
816            path!("host"),
817            hostname.clone(),
818        );
819    }
820
821    let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
822    log_namespace.insert_source_metadata(
823        FileConfig::NAME,
824        &mut event,
825        legacy_offset_key,
826        path!("offset"),
827        offset,
828    );
829
830    let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
831    log_namespace.insert_source_metadata(
832        FileConfig::NAME,
833        &mut event,
834        legacy_file_key,
835        path!("path"),
836        file,
837    );
838
839    emit!(FileEventsReceived {
840        count: 1,
841        file,
842        byte_size: event.estimated_json_encoded_size_of(),
843        include_file_metric_tag,
844    });
845
846    event
847}
848
849#[cfg(test)]
850mod tests {
851    use std::{
852        collections::HashSet,
853        fs::{self, File},
854        future::Future,
855        io::{Seek, Write},
856    };
857
858    use encoding_rs::UTF_16LE;
859    use similar_asserts::assert_eq;
860    use tempfile::tempdir;
861    use tokio::time::{Duration, sleep, timeout};
862    use vector_lib::schema::Definition;
863    use vrl::{value, value::kind::Collection};
864
865    use super::*;
866    use crate::{
867        config::Config,
868        event::{Event, EventStatus, Value},
869        shutdown::ShutdownSignal,
870        sources::file,
871        test_util::components::{FILE_SOURCE_TAGS, assert_source_compliance},
872    };
873
874    #[test]
875    fn generate_config() {
876        crate::test_util::test_generate_config::<FileConfig>();
877    }
878
879    fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
880        file::FileConfig {
881            fingerprint: FingerprintConfig::Checksum {
882                bytes: Some(8),
883                ignored_header_bytes: 0,
884                lines: 1,
885            },
886            data_dir: Some(dir.path().to_path_buf()),
887            glob_minimum_cooldown_ms: Duration::from_millis(100),
888            internal_metrics: FileInternalMetricsConfig {
889                include_file_tag: true,
890            },
891            ..Default::default()
892        }
893    }
894
895    async fn sleep_500_millis() {
896        sleep(Duration::from_millis(500)).await;
897    }
898
899    #[test]
900    fn parse_config() {
901        let config: FileConfig = toml::from_str(
902            r#"
903            include = [ "/var/log/**/*.log" ]
904            file_key = "file"
905            glob_minimum_cooldown_ms = 1000
906            multi_line_timeout = 1000
907            max_read_bytes = 2048
908            line_delimiter = "\n"
909        "#,
910        )
911        .unwrap();
912        assert_eq!(config, FileConfig::default());
913        assert_eq!(
914            config.fingerprint,
915            FingerprintConfig::Checksum {
916                bytes: None,
917                ignored_header_bytes: 0,
918                lines: 1
919            }
920        );
921
922        let config: FileConfig = toml::from_str(
923            r#"
924        include = [ "/var/log/**/*.log" ]
925        [fingerprint]
926        strategy = "device_and_inode"
927        "#,
928        )
929        .unwrap();
930        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
931
932        let config: FileConfig = toml::from_str(
933            r#"
934        include = [ "/var/log/**/*.log" ]
935        [fingerprint]
936        strategy = "checksum"
937        bytes = 128
938        ignored_header_bytes = 512
939        "#,
940        )
941        .unwrap();
942        assert_eq!(
943            config.fingerprint,
944            FingerprintConfig::Checksum {
945                bytes: Some(128),
946                ignored_header_bytes: 512,
947                lines: 1
948            }
949        );
950
951        let config: FileConfig = toml::from_str(
952            r#"
953        include = [ "/var/log/**/*.log" ]
954        [encoding]
955        charset = "utf-16le"
956        "#,
957        )
958        .unwrap();
959        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
960
961        let config: FileConfig = toml::from_str(
962            r#"
963        include = [ "/var/log/**/*.log" ]
964        read_from = "beginning"
965        "#,
966        )
967        .unwrap();
968        assert_eq!(config.read_from, ReadFromConfig::Beginning);
969
970        let config: FileConfig = toml::from_str(
971            r#"
972        include = [ "/var/log/**/*.log" ]
973        read_from = "end"
974        "#,
975        )
976        .unwrap();
977        assert_eq!(config.read_from, ReadFromConfig::End);
978    }
979
980    #[test]
981    fn resolve_data_dir() {
982        let global_dir = tempdir().unwrap();
983        let local_dir = tempdir().unwrap();
984
985        let mut config = Config::default();
986        config.global.data_dir = global_dir.keep().into();
987
988        // local path given -- local should win
989        let res = config
990            .global
991            .resolve_and_validate_data_dir(test_default_file_config(&local_dir).data_dir.as_ref())
992            .unwrap();
993        assert_eq!(res, local_dir.path());
994
995        // no local path given -- global fallback should be in effect
996        let res = config.global.resolve_and_validate_data_dir(None).unwrap();
997        assert_eq!(res, config.global.data_dir.unwrap());
998    }
999
1000    #[test]
1001    fn output_schema_definition_vector_namespace() {
1002        let definitions = FileConfig::default()
1003            .outputs(LogNamespace::Vector)
1004            .remove(0)
1005            .schema_definition(true);
1006
1007        assert_eq!(
1008            definitions,
1009            Some(
1010                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1011                    .with_meaning(OwnedTargetPath::event_root(), "message")
1012                    .with_metadata_field(
1013                        &owned_value_path!("vector", "source_type"),
1014                        Kind::bytes(),
1015                        None
1016                    )
1017                    .with_metadata_field(
1018                        &owned_value_path!("vector", "ingest_timestamp"),
1019                        Kind::timestamp(),
1020                        None
1021                    )
1022                    .with_metadata_field(
1023                        &owned_value_path!("file", "host"),
1024                        Kind::bytes().or_undefined(),
1025                        Some("host")
1026                    )
1027                    .with_metadata_field(
1028                        &owned_value_path!("file", "offset"),
1029                        Kind::integer(),
1030                        None
1031                    )
1032                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1033            )
1034        )
1035    }
1036
1037    #[test]
1038    fn output_schema_definition_legacy_namespace() {
1039        let definitions = FileConfig::default()
1040            .outputs(LogNamespace::Legacy)
1041            .remove(0)
1042            .schema_definition(true);
1043
1044        assert_eq!(
1045            definitions,
1046            Some(
1047                Definition::new_with_default_metadata(
1048                    Kind::object(Collection::empty()),
1049                    [LogNamespace::Legacy]
1050                )
1051                .with_event_field(
1052                    &owned_value_path!("message"),
1053                    Kind::bytes(),
1054                    Some("message")
1055                )
1056                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1057                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1058                .with_event_field(
1059                    &owned_value_path!("host"),
1060                    Kind::bytes().or_undefined(),
1061                    Some("host")
1062                )
1063                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1064                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1065            )
1066        )
1067    }
1068
1069    #[test]
1070    fn create_event_legacy_namespace() {
1071        let line = Bytes::from("hello world");
1072        let file = "some_file.rs";
1073        let offset: u64 = 0;
1074
1075        let meta = EventMetadata {
1076            host_key: Some(owned_value_path!("host")),
1077            hostname: Some("Some.Machine".to_string()),
1078            file_key: Some(owned_value_path!("file")),
1079            offset_key: Some(owned_value_path!("offset")),
1080        };
1081        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1082
1083        assert_eq!(log["file"], "some_file.rs".into());
1084        assert_eq!(log["host"], "Some.Machine".into());
1085        assert_eq!(log["offset"], 0.into());
1086        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1087        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1088        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1089    }
1090
1091    #[test]
1092    fn create_event_custom_fields_legacy_namespace() {
1093        let line = Bytes::from("hello world");
1094        let file = "some_file.rs";
1095        let offset: u64 = 0;
1096
1097        let meta = EventMetadata {
1098            host_key: Some(owned_value_path!("hostname")),
1099            hostname: Some("Some.Machine".to_string()),
1100            file_key: Some(owned_value_path!("file_path")),
1101            offset_key: Some(owned_value_path!("off")),
1102        };
1103        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1104
1105        assert_eq!(log["file_path"], "some_file.rs".into());
1106        assert_eq!(log["hostname"], "Some.Machine".into());
1107        assert_eq!(log["off"], 0.into());
1108        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1109        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1110        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1111    }
1112
1113    #[test]
1114    fn create_event_vector_namespace() {
1115        let line = Bytes::from("hello world");
1116        let file = "some_file.rs";
1117        let offset: u64 = 0;
1118
1119        let meta = EventMetadata {
1120            host_key: Some(owned_value_path!("ignored")),
1121            hostname: Some("Some.Machine".to_string()),
1122            file_key: Some(owned_value_path!("ignored")),
1123            offset_key: Some(owned_value_path!("ignored")),
1124        };
1125        let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1126
1127        assert_eq!(log.value(), &value!("hello world"));
1128
1129        assert_eq!(
1130            log.metadata()
1131                .value()
1132                .get(path!("vector", "source_type"))
1133                .unwrap(),
1134            &value!("file")
1135        );
1136        assert!(
1137            log.metadata()
1138                .value()
1139                .get(path!("vector", "ingest_timestamp"))
1140                .unwrap()
1141                .is_timestamp()
1142        );
1143
1144        assert_eq!(
1145            log.metadata()
1146                .value()
1147                .get(path!(FileConfig::NAME, "host"))
1148                .unwrap(),
1149            &value!("Some.Machine")
1150        );
1151        assert_eq!(
1152            log.metadata()
1153                .value()
1154                .get(path!(FileConfig::NAME, "offset"))
1155                .unwrap(),
1156            &value!(0)
1157        );
1158        assert_eq!(
1159            log.metadata()
1160                .value()
1161                .get(path!(FileConfig::NAME, "path"))
1162                .unwrap(),
1163            &value!("some_file.rs")
1164        );
1165    }
1166
    // Two files matched by the `*` glob are written side by side. Every line must be delivered,
    // in order within each file, and tagged (under `file`) with the path it was read from.
    #[tokio::test]
    async fn file_happy_path() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path1 = dir.path().join("file1");
        let path2 = dir.path().join("file2");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file1 = File::create(&path1).unwrap();
            let mut file2 = File::create(&path2).unwrap();

            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them

            for i in 0..n {
                writeln!(&mut file1, "hello {i}").unwrap();
                writeln!(&mut file2, "goodbye {i}").unwrap();
            }

            sleep_500_millis().await;
        })
        .await;

        // Lines from the two files may interleave, so keep a separate "next expected index" per
        // file. The message prefix tells which file a line came from.
        let mut hello_i = 0;
        let mut goodbye_i = 0;

        for event in received {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            if line.starts_with("hello") {
                assert_eq!(line, format!("hello {}", hello_i));
                assert_eq!(
                    event.as_log()["file"].to_string_lossy(),
                    path1.to_str().unwrap()
                );
                hello_i += 1;
            } else {
                assert_eq!(line, format!("goodbye {}", goodbye_i));
                assert_eq!(
                    event.as_log()["file"].to_string_lossy(),
                    path2.to_str().unwrap()
                );
                goodbye_i += 1;
            }
        }
        // Every line from both files must have arrived.
        assert_eq!(hello_i, n);
        assert_eq!(goodbye_i, n);
    }
1220
    // https://github.com/vectordotdev/vector/issues/8363
    // Regression test: each blank line must produce its own event rather than being dropped.
    #[tokio::test]
    async fn file_read_empty_lines() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");

        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file = File::create(&path).unwrap();

            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them

            // A non-empty first line, presumably so the checksum fingerprint (8 bytes / 1 line in
            // `test_default_file_config`) can be computed for this file -- TODO confirm.
            writeln!(&mut file, "line for checkpointing").unwrap();
            for _i in 0..n {
                writeln!(&mut file).unwrap();
            }

            sleep_500_millis().await;
        })
        .await;

        // One event for the text line plus one per blank line.
        assert_eq!(received.len(), n + 1);
    }
1250
    // The watched file is truncated in place (same path, same handle) and rewritten. Both the
    // pre- and post-truncation lines must be delivered in order, all tagged with the same path.
    #[tokio::test]
    async fn file_truncate() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };
        let path = dir.path().join("file");
        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file = File::create(&path).unwrap();

            sleep_500_millis().await; // The files must be observed at its original length before writing to it

            for i in 0..n {
                writeln!(&mut file, "pretrunc {i}").unwrap();
            }

            sleep_500_millis().await; // The writes must be observed before truncating

            // Truncate to zero and rewind so the next writes start at offset 0 again.
            file.set_len(0).unwrap();
            file.seek(std::io::SeekFrom::Start(0)).unwrap();

            sleep_500_millis().await; // The truncate must be observed before writing again

            for i in 0..n {
                writeln!(&mut file, "posttrunc {i}").unwrap();
            }

            sleep_500_millis().await;
        })
        .await;

        // Expect `pretrunc 0..n` followed by `posttrunc 0..n`. `i` is the index within the
        // current phase; it wraps to 0 when the phase flips after `n` lines.
        let mut i = 0;
        let mut pre_trunc = true;

        for event in received {
            assert_eq!(
                event.as_log()["file"].to_string_lossy(),
                path.to_str().unwrap()
            );

            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();

            if pre_trunc {
                assert_eq!(line, format!("pretrunc {}", i));
            } else {
                assert_eq!(line, format!("posttrunc {}", i));
            }

            i += 1;
            if i == n {
                i = 0;
                pre_trunc = false;
            }
        }
    }
1310
1311    #[tokio::test]
1312    async fn file_rotate() {
1313        let n = 5;
1314
1315        let dir = tempdir().unwrap();
1316        let config = file::FileConfig {
1317            include: vec![dir.path().join("*")],
1318            ..test_default_file_config(&dir)
1319        };
1320
1321        let path = dir.path().join("file");
1322        let archive_path = dir.path().join("file");
1323        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1324            let mut file = File::create(&path).unwrap();
1325
1326            sleep_500_millis().await; // The files must be observed at its original length before writing to it
1327
1328            for i in 0..n {
1329                writeln!(&mut file, "prerot {i}").unwrap();
1330            }
1331
1332            sleep_500_millis().await; // The writes must be observed before rotating
1333
1334            fs::rename(&path, archive_path).expect("could not rename");
1335            let mut file = File::create(&path).unwrap();
1336
1337            sleep_500_millis().await; // The rotation must be observed before writing again
1338
1339            for i in 0..n {
1340                writeln!(&mut file, "postrot {i}").unwrap();
1341            }
1342
1343            sleep_500_millis().await;
1344        })
1345        .await;
1346
1347        let mut i = 0;
1348        let mut pre_rot = true;
1349
1350        for event in received {
1351            assert_eq!(
1352                event.as_log()["file"].to_string_lossy(),
1353                path.to_str().unwrap()
1354            );
1355
1356            let line =
1357                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1358
1359            if pre_rot {
1360                assert_eq!(line, format!("prerot {}", i));
1361            } else {
1362                assert_eq!(line, format!("postrot {}", i));
1363            }
1364
1365            i += 1;
1366            if i == n {
1367                i = 0;
1368                pre_rot = false;
1369            }
1370        }
1371    }
1372
    // Multiple include globs combined with an exclude glob.
    // Included: `a.txt` (1), `b.txt` (2), `a.log` (3). Excluded: `a.ignore.txt` (4), which
    // matches `a.*.txt`. Each line is "<file number> <index>".
    #[tokio::test]
    async fn file_multiple_paths() {
        let n = 5;

        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
            exclude: vec![dir.path().join("a.*.txt")],
            ..test_default_file_config(&dir)
        };

        let path1 = dir.path().join("a.txt");
        let path2 = dir.path().join("b.txt");
        let path3 = dir.path().join("a.log");
        let path4 = dir.path().join("a.ignore.txt");
        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
            let mut file1 = File::create(&path1).unwrap();
            let mut file2 = File::create(&path2).unwrap();
            let mut file3 = File::create(&path3).unwrap();
            let mut file4 = File::create(&path4).unwrap();

            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them

            for i in 0..n {
                writeln!(&mut file1, "1 {i}").unwrap();
                writeln!(&mut file2, "2 {i}").unwrap();
                writeln!(&mut file3, "3 {i}").unwrap();
                writeln!(&mut file4, "4 {i}").unwrap();
            }

            sleep_500_millis().await;
        })
        .await;

        // Next expected index for files 1..=3, indexed by `file - 1`.
        let mut is = [0; 3];

        for event in received {
            let line =
                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
            let mut split = line.split(' ');
            let file = split.next().unwrap().parse::<usize>().unwrap();
            // The excluded file must never produce an event.
            assert_ne!(file, 4);
            let i = split.next().unwrap().parse::<usize>().unwrap();

            // Lines within one file arrive in the order written.
            assert_eq!(is[file - 1], i);
            is[file - 1] += 1;
        }

        // Every included file was read completely.
        assert_eq!(is, [n as usize; 3]);
    }
1423
1424    #[tokio::test]
1425    async fn file_exclude_paths() {
1426        let n = 5;
1427
1428        let dir = tempdir().unwrap();
1429        let config = file::FileConfig {
1430            include: vec![dir.path().join("a//b/*.log.*")],
1431            exclude: vec![dir.path().join("a//b/test.log.*")],
1432            ..test_default_file_config(&dir)
1433        };
1434
1435        let path1 = dir.path().join("a//b/a.log.1");
1436        let path2 = dir.path().join("a//b/test.log.1");
1437        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1438            std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1439            let mut file1 = File::create(&path1).unwrap();
1440            let mut file2 = File::create(&path2).unwrap();
1441
1442            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1443
1444            for i in 0..n {
1445                writeln!(&mut file1, "1 {i}").unwrap();
1446                writeln!(&mut file2, "2 {i}").unwrap();
1447            }
1448
1449            sleep_500_millis().await;
1450        })
1451        .await;
1452
1453        let mut is = [0; 1];
1454
1455        for event in received {
1456            let line =
1457                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1458            let mut split = line.split(' ');
1459            let file = split.next().unwrap().parse::<usize>().unwrap();
1460            assert_ne!(file, 4);
1461            let i = split.next().unwrap().parse::<usize>().unwrap();
1462
1463            assert_eq!(is[file - 1], i);
1464            is[file - 1] += 1;
1465        }
1466
1467        assert_eq!(is, [n as usize; 1]);
1468    }
1469
1470    #[tokio::test]
1471    async fn file_key_acknowledged() {
1472        file_key(Acks).await
1473    }
1474
1475    #[tokio::test]
1476    async fn file_key_no_acknowledge() {
1477        file_key(NoAcks).await
1478    }
1479
1480    async fn file_key(acks: AckingMode) {
1481        // Default
1482        {
1483            let dir = tempdir().unwrap();
1484            let config = file::FileConfig {
1485                include: vec![dir.path().join("*")],
1486                ..test_default_file_config(&dir)
1487            };
1488
1489            let path = dir.path().join("file");
1490            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1491                let mut file = File::create(&path).unwrap();
1492
1493                sleep_500_millis().await;
1494
1495                writeln!(&mut file, "hello there").unwrap();
1496
1497                sleep_500_millis().await;
1498            })
1499            .await;
1500
1501            assert_eq!(received.len(), 1);
1502            assert_eq!(
1503                received[0].as_log()["file"].to_string_lossy(),
1504                path.to_str().unwrap()
1505            );
1506        }
1507
1508        // Custom
1509        {
1510            let dir = tempdir().unwrap();
1511            let config = file::FileConfig {
1512                include: vec![dir.path().join("*")],
1513                file_key: OptionalValuePath::from(owned_value_path!("source")),
1514                ..test_default_file_config(&dir)
1515            };
1516
1517            let path = dir.path().join("file");
1518            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1519                let mut file = File::create(&path).unwrap();
1520
1521                sleep_500_millis().await;
1522
1523                writeln!(&mut file, "hello there").unwrap();
1524
1525                sleep_500_millis().await;
1526            })
1527            .await;
1528
1529            assert_eq!(received.len(), 1);
1530            assert_eq!(
1531                received[0].as_log()["source"].to_string_lossy(),
1532                path.to_str().unwrap()
1533            );
1534        }
1535
1536        // Hidden
1537        {
1538            let dir = tempdir().unwrap();
1539            let config = file::FileConfig {
1540                include: vec![dir.path().join("*")],
1541                ..test_default_file_config(&dir)
1542            };
1543
1544            let path = dir.path().join("file");
1545            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1546                let mut file = File::create(&path).unwrap();
1547
1548                sleep_500_millis().await;
1549
1550                writeln!(&mut file, "hello there").unwrap();
1551
1552                sleep_500_millis().await;
1553            })
1554            .await;
1555
1556            assert_eq!(received.len(), 1);
1557            assert_eq!(
1558                received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1559                vec![
1560                    default_file_key()
1561                        .path
1562                        .expect("file key to exist")
1563                        .to_string()
1564                        .into(),
1565                    log_schema().host_key().unwrap().to_string().into(),
1566                    log_schema().message_key().unwrap().to_string().into(),
1567                    log_schema().timestamp_key().unwrap().to_string().into(),
1568                    log_schema().source_type_key().unwrap().to_string().into()
1569                ]
1570                .into_iter()
1571                .collect::<HashSet<_>>()
1572            );
1573        }
1574    }
1575
1576    #[cfg(target_os = "linux")] // see #7988
1577    #[tokio::test]
1578    async fn file_start_position_server_restart_acknowledged() {
1579        file_start_position_server_restart(Acks).await
1580    }
1581
1582    #[cfg(target_os = "linux")] // see #7988
1583    #[tokio::test]
1584    async fn file_start_position_server_restart_no_acknowledge() {
1585        file_start_position_server_restart(NoAcks).await
1586    }
1587
    #[cfg(target_os = "linux")] // see #7988
    // Three consecutive source runs over one file, all sharing the same checkpoint directory:
    // 1. the first run reads everything present plus a newly appended line;
    // 2. a restart resumes from the checkpoint and sees only the new line;
    // 3. a restart with `ignore_checkpoints` and `read_from = beginning` rereads the whole file.
    async fn file_start_position_server_restart(acking: AckingMode) {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        let mut file = File::create(&path).unwrap();
        writeln!(&mut file, "zeroth line").unwrap();
        sleep_500_millis().await;

        // First time server runs it picks up existing lines.
        {
            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
                sleep_500_millis().await;
                writeln!(&mut file, "first line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let lines = extract_messages_string(received);
            assert_eq!(lines, vec!["zeroth line", "first line"]);
        }
        // Restart server, read file from checkpoint.
        {
            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
                sleep_500_millis().await;
                writeln!(&mut file, "second line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let lines = extract_messages_string(received);
            assert_eq!(lines, vec!["second line"]);
        }
        // Restart server, read files from beginning.
        {
            // Same data_dir as before, but checkpoints are ignored, so every line is read again.
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ignore_checkpoints: Some(true),
                read_from: ReadFromConfig::Beginning,
                ..test_default_file_config(&dir)
            };
            let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
                sleep_500_millis().await;
                writeln!(&mut file, "third line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let lines = extract_messages_string(received);
            assert_eq!(
                lines,
                vec!["zeroth line", "first line", "second line", "third line"]
            );
        }
    }
1647
    // With `Unfinalized`, events are never finalized before shutdown. The checkpoint therefore
    // must not advance, and a restarted source delivers the same line again.
    #[tokio::test]
    async fn file_start_position_server_restart_unfinalized() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        let mut file = File::create(&path).unwrap();
        writeln!(&mut file, "the line").unwrap();
        sleep_500_millis().await;

        // First time server runs it picks up existing lines.
        let received = run_file_source(
            &config,
            false,
            Unfinalized,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;
        let lines = extract_messages_string(received);
        assert_eq!(lines, vec!["the line"]);

        // Restart server, it re-reads file since the events were not acknowledged before shutdown
        let received = run_file_source(
            &config,
            false,
            Unfinalized,
            LogNamespace::Legacy,
            sleep_500_millis(),
        )
        .await;
        let lines = extract_messages_string(received);
        assert_eq!(lines, vec!["the line"]);
    }
1685
    // A shutdown that interrupts reading part-way through a large file must not cause lines to be
    // delivered twice after restart. Across both runs, exactly `line_count` lines are expected.
    #[tokio::test]
    async fn file_duplicate_processing_after_restart() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        let mut file = File::create(&path).unwrap();

        // Large enough that the first, short-lived run is not expected to finish the file.
        let line_count = 4000;
        for i in 0..line_count {
            writeln!(&mut file, "Here's a line for you: {i}").unwrap();
        }
        file.flush().unwrap();
        sleep_500_millis().await;

        // First time server runs it should pick up a bunch of lines
        let received = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            // shutdown signal is sent after this duration
            sleep_500_millis(),
        )
        .await;
        let lines = extract_messages_string(received);

        // ...but not all the lines; if the first run processed the entire file, we may not hit the
        // bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks
        assert!(lines.len() < line_count);

        // Restart the server, and it should read the rest without duplicating any
        let received = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            sleep(Duration::from_secs(5)),
        )
        .await;
        let lines2 = extract_messages_string(received);

        // Between both runs, we should have the expected number of lines
        assert_eq!(lines.len() + lines2.len(), line_count);
    }
1734
1735    #[tokio::test]
1736    async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1737        file_start_position_server_restart_with_file_rotation(Acks).await
1738    }
1739
1740    #[tokio::test]
1741    async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1742        file_start_position_server_restart_with_file_rotation(NoAcks).await
1743    }
1744
1745    async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1746        let dir = tempdir().unwrap();
1747        let config = file::FileConfig {
1748            include: vec![dir.path().join("*")],
1749            ..test_default_file_config(&dir)
1750        };
1751
1752        let path = dir.path().join("file");
1753        let path_for_old_file = dir.path().join("file.old");
1754        // Run server first time, collect some lines.
1755        {
1756            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1757                let mut file = File::create(&path).unwrap();
1758                sleep_500_millis().await;
1759                writeln!(&mut file, "first line").unwrap();
1760                sleep_500_millis().await;
1761            })
1762            .await;
1763
1764            let lines = extract_messages_string(received);
1765            assert_eq!(lines, vec!["first line"]);
1766        }
1767        // Perform 'file rotation' to archive old lines.
1768        fs::rename(&path, &path_for_old_file).expect("could not rename");
1769        // Restart the server and make sure it does not re-read the old file
1770        // even though it has a new name.
1771        {
1772            let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1773                let mut file = File::create(&path).unwrap();
1774                sleep_500_millis().await;
1775                writeln!(&mut file, "second line").unwrap();
1776                sleep_500_millis().await;
1777            })
1778            .await;
1779
1780            let lines = extract_messages_string(received);
1781            assert_eq!(lines, vec!["second line"]);
1782        }
1783    }
1784
1785    #[cfg(unix)] // this test uses unix-specific function `futimes` during test time
1786    #[tokio::test]
1787    async fn file_start_position_ignore_old_files() {
1788        use std::{
1789            os::unix::io::AsRawFd,
1790            time::{Duration, SystemTime},
1791        };
1792
1793        let dir = tempdir().unwrap();
1794        let config = file::FileConfig {
1795            include: vec![dir.path().join("*")],
1796            ignore_older_secs: Some(5),
1797            ..test_default_file_config(&dir)
1798        };
1799
1800        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1801            let before_path = dir.path().join("before");
1802            let mut before_file = File::create(&before_path).unwrap();
1803            let after_path = dir.path().join("after");
1804            let mut after_file = File::create(&after_path).unwrap();
1805
1806            writeln!(&mut before_file, "first line").unwrap(); // first few bytes make up unique file fingerprint
1807            writeln!(&mut after_file, "_first line").unwrap(); //   and therefore need to be non-identical
1808
1809            {
1810                // Set the modified times
1811                let before = SystemTime::now() - Duration::from_secs(8);
1812                let after = SystemTime::now() - Duration::from_secs(2);
1813
1814                let before_time = libc::timeval {
1815                    tv_sec: before
1816                        .duration_since(SystemTime::UNIX_EPOCH)
1817                        .unwrap()
1818                        .as_secs() as _,
1819                    tv_usec: 0,
1820                };
1821                let before_times = [before_time, before_time];
1822
1823                let after_time = libc::timeval {
1824                    tv_sec: after
1825                        .duration_since(SystemTime::UNIX_EPOCH)
1826                        .unwrap()
1827                        .as_secs() as _,
1828                    tv_usec: 0,
1829                };
1830                let after_times = [after_time, after_time];
1831
1832                unsafe {
1833                    libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1834                    libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1835                }
1836            }
1837
1838            sleep_500_millis().await;
1839            writeln!(&mut before_file, "second line").unwrap();
1840            writeln!(&mut after_file, "_second line").unwrap();
1841
1842            sleep_500_millis().await;
1843        })
1844        .await;
1845
1846        let before_lines = received
1847            .iter()
1848            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1849            .map(|event| {
1850                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1851            })
1852            .collect::<Vec<_>>();
1853        let after_lines = received
1854            .iter()
1855            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1856            .map(|event| {
1857                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1858            })
1859            .collect::<Vec<_>>();
1860        assert_eq!(before_lines, vec!["second line"]);
1861        assert_eq!(after_lines, vec!["_first line", "_second line"]);
1862    }
1863
1864    #[tokio::test]
1865    async fn file_max_line_bytes() {
1866        let dir = tempdir().unwrap();
1867        let config = file::FileConfig {
1868            include: vec![dir.path().join("*")],
1869            max_line_bytes: 10,
1870            ..test_default_file_config(&dir)
1871        };
1872
1873        let path = dir.path().join("file");
1874        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1875            let mut file = File::create(&path).unwrap();
1876
1877            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1878
1879            writeln!(&mut file, "short").unwrap();
1880            writeln!(&mut file, "this is too long").unwrap();
1881            writeln!(&mut file, "11 eleven11").unwrap();
1882            let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1883            writeln!(&mut file, "{super_long}").unwrap();
1884            writeln!(&mut file, "exactly 10").unwrap();
1885            writeln!(&mut file, "it can end on a line that's too long").unwrap();
1886
1887            sleep_500_millis().await;
1888            sleep_500_millis().await;
1889
1890            writeln!(&mut file, "and then continue").unwrap();
1891            writeln!(&mut file, "last short").unwrap();
1892
1893            sleep_500_millis().await;
1894            sleep_500_millis().await;
1895        }).await;
1896
1897        let received = extract_messages_value(received);
1898
1899        assert_eq!(
1900            received,
1901            vec!["short".into(), "exactly 10".into(), "last short".into()]
1902        );
1903    }
1904
1905    #[tokio::test]
1906    async fn test_multi_line_aggregation_legacy() {
1907        let dir = tempdir().unwrap();
1908        let config = file::FileConfig {
1909            include: vec![dir.path().join("*")],
1910            message_start_indicator: Some("INFO".into()),
1911            multi_line_timeout: 25, // less than 50 in sleep()
1912            ..test_default_file_config(&dir)
1913        };
1914
1915        let path = dir.path().join("file");
1916        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1917            let mut file = File::create(&path).unwrap();
1918
1919            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1920
1921            writeln!(&mut file, "leftover foo").unwrap();
1922            writeln!(&mut file, "INFO hello").unwrap();
1923            writeln!(&mut file, "INFO goodbye").unwrap();
1924            writeln!(&mut file, "part of goodbye").unwrap();
1925
1926            sleep_500_millis().await;
1927
1928            writeln!(&mut file, "INFO hi again").unwrap();
1929            writeln!(&mut file, "and some more").unwrap();
1930            writeln!(&mut file, "INFO hello").unwrap();
1931
1932            sleep_500_millis().await;
1933
1934            writeln!(&mut file, "too slow").unwrap();
1935            writeln!(&mut file, "INFO doesn't have").unwrap();
1936            writeln!(&mut file, "to be INFO in").unwrap();
1937            writeln!(&mut file, "the middle").unwrap();
1938
1939            sleep_500_millis().await;
1940        })
1941        .await;
1942
1943        let received = extract_messages_value(received);
1944
1945        assert_eq!(
1946            received,
1947            vec![
1948                "leftover foo".into(),
1949                "INFO hello".into(),
1950                "INFO goodbye\npart of goodbye".into(),
1951                "INFO hi again\nand some more".into(),
1952                "INFO hello".into(),
1953                "too slow".into(),
1954                "INFO doesn't have".into(),
1955                "to be INFO in\nthe middle".into(),
1956            ]
1957        );
1958    }
1959
1960    #[tokio::test]
1961    async fn test_multi_line_aggregation() {
1962        let dir = tempdir().unwrap();
1963        let config = file::FileConfig {
1964            include: vec![dir.path().join("*")],
1965            multiline: Some(MultilineConfig {
1966                start_pattern: "INFO".to_owned(),
1967                condition_pattern: "INFO".to_owned(),
1968                mode: line_agg::Mode::HaltBefore,
1969                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
1970            }),
1971            ..test_default_file_config(&dir)
1972        };
1973
1974        let path = dir.path().join("file");
1975        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1976            let mut file = File::create(&path).unwrap();
1977
1978            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1979
1980            writeln!(&mut file, "leftover foo").unwrap();
1981            writeln!(&mut file, "INFO hello").unwrap();
1982            writeln!(&mut file, "INFO goodbye").unwrap();
1983            writeln!(&mut file, "part of goodbye").unwrap();
1984
1985            sleep_500_millis().await;
1986
1987            writeln!(&mut file, "INFO hi again").unwrap();
1988            writeln!(&mut file, "and some more").unwrap();
1989            writeln!(&mut file, "INFO hello").unwrap();
1990
1991            sleep_500_millis().await;
1992
1993            writeln!(&mut file, "too slow").unwrap();
1994            writeln!(&mut file, "INFO doesn't have").unwrap();
1995            writeln!(&mut file, "to be INFO in").unwrap();
1996            writeln!(&mut file, "the middle").unwrap();
1997
1998            sleep_500_millis().await;
1999        })
2000        .await;
2001
2002        let received = extract_messages_value(received);
2003
2004        assert_eq!(
2005            received,
2006            vec![
2007                "leftover foo".into(),
2008                "INFO hello".into(),
2009                "INFO goodbye\npart of goodbye".into(),
2010                "INFO hi again\nand some more".into(),
2011                "INFO hello".into(),
2012                "too slow".into(),
2013                "INFO doesn't have".into(),
2014                "to be INFO in\nthe middle".into(),
2015            ]
2016        );
2017    }
2018
2019    #[tokio::test]
2020    async fn test_multi_line_checkpointing() {
2021        let dir = tempdir().unwrap();
2022        let config = file::FileConfig {
2023            include: vec![dir.path().join("*")],
2024            offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2025            multiline: Some(MultilineConfig {
2026                start_pattern: "INFO".to_owned(),
2027                condition_pattern: "INFO".to_owned(),
2028                mode: line_agg::Mode::HaltBefore,
2029                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
2030            }),
2031            ..test_default_file_config(&dir)
2032        };
2033
2034        let path = dir.path().join("file");
2035        let mut file = File::create(&path).unwrap();
2036
2037        writeln!(&mut file, "INFO hello").unwrap();
2038        writeln!(&mut file, "part of hello").unwrap();
2039
2040        // Read and aggregate existing lines
2041        let received = run_file_source(
2042            &config,
2043            false,
2044            Acks,
2045            LogNamespace::Legacy,
2046            sleep_500_millis(),
2047        )
2048        .await;
2049
2050        assert_eq!(received[0].as_log()["offset"], 0.into());
2051
2052        let lines = extract_messages_string(received);
2053        assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2054
2055        // After restart, we should not see any part of the previously aggregated lines
2056        let received_after_restart =
2057            run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2058                writeln!(&mut file, "INFO goodbye").unwrap();
2059            })
2060            .await;
2061        assert_eq!(
2062            received_after_restart[0].as_log()["offset"],
2063            (lines[0].len() + 1).into()
2064        );
2065        let lines = extract_messages_string(received_after_restart);
2066        assert_eq!(lines, vec!["INFO goodbye"]);
2067    }
2068
2069    #[tokio::test]
2070    async fn test_fair_reads() {
2071        let dir = tempdir().unwrap();
2072        let config = file::FileConfig {
2073            include: vec![dir.path().join("*")],
2074            max_read_bytes: 1,
2075            oldest_first: false,
2076            ..test_default_file_config(&dir)
2077        };
2078
2079        let older_path = dir.path().join("z_older_file");
2080        let mut older = File::create(&older_path).unwrap();
2081
2082        sleep_500_millis().await;
2083
2084        let newer_path = dir.path().join("a_newer_file");
2085        let mut newer = File::create(&newer_path).unwrap();
2086
2087        writeln!(&mut older, "hello i am the old file").unwrap();
2088        writeln!(&mut older, "i have been around a while").unwrap();
2089        writeln!(&mut older, "you can read newer files at the same time").unwrap();
2090
2091        writeln!(&mut newer, "and i am the new file").unwrap();
2092        writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2093        writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2094
2095        sleep_500_millis().await;
2096
2097        let received = run_file_source(
2098            &config,
2099            false,
2100            NoAcks,
2101            LogNamespace::Legacy,
2102            sleep_500_millis(),
2103        )
2104        .await;
2105
2106        let received = extract_messages_value(received);
2107
2108        assert_eq!(
2109            received,
2110            vec![
2111                "hello i am the old file".into(),
2112                "and i am the new file".into(),
2113                "i have been around a while".into(),
2114                "this should be interleaved with the old one".into(),
2115                "you can read newer files at the same time".into(),
2116                "which is fine because we want fairness".into(),
2117            ]
2118        );
2119    }
2120
2121    #[tokio::test]
2122    async fn test_oldest_first() {
2123        let dir = tempdir().unwrap();
2124        let config = file::FileConfig {
2125            include: vec![dir.path().join("*")],
2126            max_read_bytes: 1,
2127            oldest_first: true,
2128            ..test_default_file_config(&dir)
2129        };
2130
2131        let older_path = dir.path().join("z_older_file");
2132        let mut older = File::create(&older_path).unwrap();
2133
2134        sleep_500_millis().await;
2135
2136        let newer_path = dir.path().join("a_newer_file");
2137        let mut newer = File::create(&newer_path).unwrap();
2138
2139        writeln!(&mut older, "hello i am the old file").unwrap();
2140        writeln!(&mut older, "i have been around a while").unwrap();
2141        writeln!(&mut older, "you should definitely read all of me first").unwrap();
2142
2143        writeln!(&mut newer, "i'm new").unwrap();
2144        writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2145        writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2146
2147        sleep_500_millis().await;
2148
2149        let received = run_file_source(
2150            &config,
2151            false,
2152            NoAcks,
2153            LogNamespace::Legacy,
2154            sleep_500_millis(),
2155        )
2156        .await;
2157
2158        let received = extract_messages_value(received);
2159
2160        assert_eq!(
2161            received,
2162            vec![
2163                "hello i am the old file".into(),
2164                "i have been around a while".into(),
2165                "you should definitely read all of me first".into(),
2166                "i'm new".into(),
2167                "hopefully you read all the old stuff first".into(),
2168                "because otherwise i'm not going to make sense".into(),
2169            ]
2170        );
2171    }
2172
2173    // Ignoring on mac: https://github.com/vectordotdev/vector/issues/8373
2174    #[cfg(not(target_os = "macos"))]
2175    #[tokio::test]
2176    async fn test_split_reads() {
2177        let dir = tempdir().unwrap();
2178        let config = file::FileConfig {
2179            include: vec![dir.path().join("*")],
2180            max_read_bytes: 1,
2181            ..test_default_file_config(&dir)
2182        };
2183
2184        let path = dir.path().join("file");
2185        let mut file = File::create(&path).unwrap();
2186
2187        writeln!(&mut file, "hello i am a normal line").unwrap();
2188
2189        sleep_500_millis().await;
2190
2191        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2192            sleep_500_millis().await;
2193
2194            write!(&mut file, "i am not a full line").unwrap();
2195
2196            // Longer than the EOF timeout
2197            sleep_500_millis().await;
2198
2199            writeln!(&mut file, " until now").unwrap();
2200
2201            sleep_500_millis().await;
2202        })
2203        .await;
2204
2205        let received = extract_messages_value(received);
2206
2207        assert_eq!(
2208            received,
2209            vec![
2210                "hello i am a normal line".into(),
2211                "i am not a full line until now".into(),
2212            ]
2213        );
2214    }
2215
2216    #[tokio::test]
2217    async fn test_gzipped_file() {
2218        let dir = tempdir().unwrap();
2219        let config = file::FileConfig {
2220            include: vec![PathBuf::from("tests/data/gzipped.log")],
2221            // TODO: remove this once files are fingerprinted after decompression
2222            //
2223            // Currently, this needs to be smaller than the total size of the compressed file
2224            // because the fingerprinter tries to read until a newline, which it's not going to see
2225            // in the compressed data, or this number of bytes. If it hits EOF before that, it
2226            // can't return a fingerprint because the value would change once more data is written.
2227            max_line_bytes: 100,
2228            ..test_default_file_config(&dir)
2229        };
2230
2231        let received = run_file_source(
2232            &config,
2233            false,
2234            NoAcks,
2235            LogNamespace::Legacy,
2236            sleep_500_millis(),
2237        )
2238        .await;
2239
2240        let received = extract_messages_value(received);
2241
2242        assert_eq!(
2243            received,
2244            vec![
2245                "this is a simple file".into(),
2246                "i have been compressed".into(),
2247                "in order to make me smaller".into(),
2248                "but you can still read me".into(),
2249                "hooray".into(),
2250            ]
2251        );
2252    }
2253
2254    #[tokio::test]
2255    async fn test_non_utf8_encoded_file() {
2256        let dir = tempdir().unwrap();
2257        let config = file::FileConfig {
2258            include: vec![PathBuf::from("tests/data/utf-16le.log")],
2259            encoding: Some(EncodingConfig { charset: UTF_16LE }),
2260            ..test_default_file_config(&dir)
2261        };
2262
2263        let received = run_file_source(
2264            &config,
2265            false,
2266            NoAcks,
2267            LogNamespace::Legacy,
2268            sleep_500_millis(),
2269        )
2270        .await;
2271
2272        let received = extract_messages_value(received);
2273
2274        assert_eq!(
2275            received,
2276            vec![
2277                "hello i am a file".into(),
2278                "i can unicode".into(),
2279                "but i do so in 16 bits".into(),
2280                "and when i byte".into(),
2281                "i become little-endian".into(),
2282            ]
2283        );
2284    }
2285
2286    #[tokio::test]
2287    async fn test_non_default_line_delimiter() {
2288        let dir = tempdir().unwrap();
2289        let config = file::FileConfig {
2290            include: vec![dir.path().join("*")],
2291            line_delimiter: "\r\n".to_string(),
2292            ..test_default_file_config(&dir)
2293        };
2294
2295        let path = dir.path().join("file");
2296        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2297            let mut file = File::create(&path).unwrap();
2298
2299            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
2300
2301            write!(&mut file, "hello i am a line\r\n").unwrap();
2302            write!(&mut file, "and i am too\r\n").unwrap();
2303            write!(&mut file, "CRLF is how we end\r\n").unwrap();
2304            write!(&mut file, "please treat us well\r\n").unwrap();
2305
2306            sleep_500_millis().await;
2307        })
2308        .await;
2309
2310        let received = extract_messages_value(received);
2311
2312        assert_eq!(
2313            received,
2314            vec![
2315                "hello i am a line".into(),
2316                "and i am too".into(),
2317                "CRLF is how we end".into(),
2318                "please treat us well".into()
2319            ]
2320        );
2321    }
2322
2323    #[tokio::test]
2324    async fn remove_file() {
2325        let n = 5;
2326        let remove_after_secs = 1;
2327
2328        let dir = tempdir().unwrap();
2329        let config = file::FileConfig {
2330            include: vec![dir.path().join("*")],
2331            remove_after_secs: Some(remove_after_secs),
2332            ..test_default_file_config(&dir)
2333        };
2334
2335        let path = dir.path().join("file");
2336        let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2337            let mut file = File::create(&path).unwrap();
2338
2339            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
2340
2341            for i in 0..n {
2342                writeln!(&mut file, "{i}").unwrap();
2343            }
2344            drop(file);
2345
2346            for _ in 0..10 {
2347                // Wait for remove grace period to end.
2348                sleep(Duration::from_secs(remove_after_secs + 1)).await;
2349
2350                if File::open(&path).is_err() {
2351                    break;
2352                }
2353            }
2354        })
2355        .await;
2356
2357        assert_eq!(received.len(), n);
2358
2359        match File::open(&path) {
2360            Ok(_) => panic!("File wasn't removed"),
2361            Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2362        }
2363    }
2364
2365    #[derive(Clone, Copy, Eq, PartialEq)]
2366    enum AckingMode {
2367        NoAcks,      // No acknowledgement handling and no finalization
2368        Unfinalized, // Acknowledgement handling but no finalization
2369        Acks,        // Full acknowledgements and proper finalization
2370    }
2371    use AckingMode::*;
2372    use vector_lib::lookup::OwnedTargetPath;
2373
2374    async fn run_file_source(
2375        config: &FileConfig,
2376        wait_shutdown: bool,
2377        acking_mode: AckingMode,
2378        log_namespace: LogNamespace,
2379        inner: impl Future<Output = ()>,
2380    ) -> Vec<Event> {
2381        assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2382            let (tx, rx) = if acking_mode == Acks {
2383                let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2384                (tx, rx.boxed())
2385            } else {
2386                let (tx, rx) = SourceSender::new_test();
2387                (tx, rx.boxed())
2388            };
2389
2390            let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2391            let data_dir = config.data_dir.clone().unwrap();
2392            let acks = !matches!(acking_mode, NoAcks);
2393
2394            tokio::spawn(file::file_source(
2395                config,
2396                data_dir,
2397                shutdown,
2398                tx,
2399                acks,
2400                log_namespace,
2401            ));
2402
2403            inner.await;
2404
2405            drop(trigger_shutdown);
2406
2407            let result = if acking_mode == Unfinalized {
2408                rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2409                    .collect::<Vec<_>>()
2410                    .await
2411            } else {
2412                timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2413                    .await
2414                    .expect(
2415                        "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2416                    )
2417            };
2418            if wait_shutdown {
2419                shutdown_done.await;
2420            }
2421
2422            result
2423        })
2424        .await
2425    }
2426
2427    fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2428        received
2429            .into_iter()
2430            .map(Event::into_log)
2431            .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2432            .collect()
2433    }
2434
2435    fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2436        received
2437            .into_iter()
2438            .map(Event::into_log)
2439            .map(|log| log.get_message().unwrap().clone())
2440            .collect()
2441    }
2442}