vector/sources/
file.rs

1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    codecs::{BytesDeserializer, BytesDeserializerConfig},
14    config::{LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    file_source::{
17        file_server::{FileServer, Line, calculate_ignore_before},
18        paths_provider::{Glob, MatchOptions},
19    },
20    file_source_common::{
21        Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22    },
23    finalizer::OrderedFinalizer,
24    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30    SourceSender,
31    config::{
32        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33        log_schema,
34    },
35    encoding_transcode::{Decoder, Encoder},
36    event::{BatchNotifier, BatchStatus, LogEvent},
37    internal_events::{
38        FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39        FileSourceInternalEventsEmitter, StreamClosedError,
40    },
41    line_agg::{self, LineAgg},
42    serde::bool_or_struct,
43    shutdown::ShutdownSignal,
44};
45
46#[derive(Debug, Snafu)]
47enum BuildError {
48    #[snafu(display(
49        "message_start_indicator {:?} is not a valid regex: {}",
50        indicator,
51        source
52    ))]
53    InvalidMessageStartIndicator {
54        indicator: String,
55        source: regex::Error,
56    },
57}
58
59/// Configuration for the `file` source.
60#[serde_as]
61#[configurable_component(source("file", "Collect logs from files."))]
62#[derive(Clone, Debug, PartialEq, Eq)]
63#[serde(deny_unknown_fields)]
64pub struct FileConfig {
65    /// Array of file patterns to include. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
66    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
67    pub include: Vec<PathBuf>,
68
69    /// Array of file patterns to exclude. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
70    ///
71    /// Takes precedence over the `include` option. Note: The `exclude` patterns are applied _after_ the attempt to glob everything
72    /// in `include`. This means that all files are first matched by `include` and then filtered by the `exclude`
73    /// patterns. This can be impactful if `include` contains directories with contents that are not accessible.
74    #[serde(default)]
75    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
76    pub exclude: Vec<PathBuf>,
77
78    /// Overrides the name of the log field used to add the file path to each event.
79    ///
80    /// The value is the full path to the file where the event was read message.
81    ///
82    /// Set to `""` to suppress this key.
83    #[serde(default = "default_file_key")]
84    #[configurable(metadata(docs::examples = "path"))]
85    pub file_key: OptionalValuePath,
86
87    /// Whether or not to start reading from the beginning of a new file.
88    #[configurable(
89        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
90    )]
91    #[configurable(metadata(docs::hidden))]
92    #[serde(default)]
93    pub start_at_beginning: Option<bool>,
94
95    /// Whether or not to ignore existing checkpoints when determining where to start reading a file.
96    ///
97    /// Checkpoints are still written normally.
98    #[serde(default)]
99    pub ignore_checkpoints: Option<bool>,
100
101    #[serde(default = "default_read_from")]
102    #[configurable(derived)]
103    pub read_from: ReadFromConfig,
104
105    /// Ignore files with a data modification date older than the specified number of seconds.
106    #[serde(alias = "ignore_older", default)]
107    #[configurable(metadata(docs::type_unit = "seconds"))]
108    #[configurable(metadata(docs::examples = 600))]
109    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
110    pub ignore_older_secs: Option<u64>,
111
112    /// The maximum size of a line before it is discarded.
113    ///
114    /// This protects against malformed lines or tailing incorrect files.
115    #[serde(default = "default_max_line_bytes")]
116    #[configurable(metadata(docs::type_unit = "bytes"))]
117    pub max_line_bytes: usize,
118
119    /// Overrides the name of the log field used to add the current hostname to each event.
120    ///
121    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
122    ///
123    /// Set to `""` to suppress this key.
124    ///
125    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
126    #[configurable(metadata(docs::examples = "hostname"))]
127    pub host_key: Option<OptionalValuePath>,
128
129    /// The directory used to persist file checkpoint positions.
130    ///
131    /// By default, the [global `data_dir` option][global_data_dir] is used.
132    /// Make sure the running user has write permissions to this directory.
133    ///
134    /// If this directory is specified, then Vector will attempt to create it.
135    ///
136    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
137    #[serde(default)]
138    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
139    #[configurable(metadata(docs::human_name = "Data Directory"))]
140    pub data_dir: Option<PathBuf>,
141
142    /// Enables adding the file offset to each event and sets the name of the log field used.
143    ///
144    /// The value is the byte offset of the start of the line within the file.
145    ///
146    /// Off by default, the offset is only added to the event if this is set.
147    #[serde(default)]
148    #[configurable(metadata(docs::examples = "offset"))]
149    pub offset_key: Option<OptionalValuePath>,
150
151    /// The delay between file discovery calls.
152    ///
153    /// This controls the interval at which files are searched. A higher value results in greater
154    /// chances of some short-lived files being missed between searches, but a lower value increases
155    /// the performance impact of file discovery.
156    #[serde(
157        alias = "glob_minimum_cooldown",
158        default = "default_glob_minimum_cooldown_ms"
159    )]
160    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
161    #[configurable(metadata(docs::type_unit = "milliseconds"))]
162    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
163    pub glob_minimum_cooldown_ms: Duration,
164
165    #[configurable(derived)]
166    #[serde(alias = "fingerprinting", default)]
167    fingerprint: FingerprintConfig,
168
169    /// Ignore missing files when fingerprinting.
170    ///
171    /// This may be useful when used with source directories containing dangling symlinks.
172    #[serde(default)]
173    pub ignore_not_found: bool,
174
175    /// String value used to identify the start of a multi-line message.
176    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
177    #[configurable(metadata(docs::hidden))]
178    #[serde(default)]
179    pub message_start_indicator: Option<String>,
180
181    /// How long to wait for more data when aggregating a multi-line message, in milliseconds.
182    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
183    #[configurable(metadata(docs::hidden))]
184    #[serde(default = "default_multi_line_timeout")]
185    pub multi_line_timeout: u64,
186
187    /// Multiline aggregation configuration.
188    ///
189    /// If not specified, multiline aggregation is disabled.
190    #[configurable(derived)]
191    #[serde(default)]
192    pub multiline: Option<MultilineConfig>,
193
194    /// Max amount of bytes to read from a single file before switching over to the next file.
195    /// **Note:** This does not apply when `oldest_first` is `true`.
196    ///
197    /// This allows distributing the reads more or less evenly across
198    /// the files.
199    #[serde(default = "default_max_read_bytes")]
200    #[configurable(metadata(docs::type_unit = "bytes"))]
201    pub max_read_bytes: usize,
202
203    /// Instead of balancing read capacity fairly across all watched files, prioritize draining the oldest files before moving on to read data from more recent files.
204    #[serde(default)]
205    pub oldest_first: bool,
206
207    /// After reaching EOF, the number of seconds to wait before removing the file, unless new data is written.
208    ///
209    /// If not specified, files are not removed.
210    #[serde(alias = "remove_after", default)]
211    #[configurable(metadata(docs::type_unit = "seconds"))]
212    #[configurable(metadata(docs::examples = 0))]
213    #[configurable(metadata(docs::examples = 5))]
214    #[configurable(metadata(docs::examples = 60))]
215    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
216    pub remove_after_secs: Option<u64>,
217
218    /// String sequence used to separate one file line from another.
219    #[serde(default = "default_line_delimiter")]
220    #[configurable(metadata(docs::examples = "\r\n"))]
221    pub line_delimiter: String,
222
223    #[configurable(derived)]
224    #[serde(default)]
225    pub encoding: Option<EncodingConfig>,
226
227    #[configurable(derived)]
228    #[serde(default, deserialize_with = "bool_or_struct")]
229    acknowledgements: SourceAcknowledgementsConfig,
230
231    /// The namespace to use for logs. This overrides the global setting.
232    #[configurable(metadata(docs::hidden))]
233    #[serde(default)]
234    log_namespace: Option<bool>,
235
236    #[configurable(derived)]
237    #[serde(default)]
238    internal_metrics: FileInternalMetricsConfig,
239
240    /// How long to keep an open handle to a rotated log file.
241    /// The default value represents "no limit"
242    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
243    #[configurable(metadata(docs::type_unit = "seconds"))]
244    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
245    pub rotate_wait: Duration,
246}
247
248fn default_max_line_bytes() -> usize {
249    bytesize::kib(100u64) as usize
250}
251
252fn default_file_key() -> OptionalValuePath {
253    OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257    ReadFromConfig::Beginning
258}
259
/// Serde default for `glob_minimum_cooldown_ms`: one second between discovery passes.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}

/// Serde default for the deprecated `multi_line_timeout` option, in milliseconds.
const fn default_multi_line_timeout() -> u64 {
    1_000
}

/// Serde default for `max_read_bytes`: 2 KiB read from one file before moving on.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}

/// Serde default for `line_delimiter`: a single line feed.
fn default_line_delimiter() -> String {
    String::from("\n")
}

/// Serde default for `rotate_wait_secs`, documented as "no limit".
///
/// Half of `u64::MAX` seconds rather than the maximum itself. NOTE(review): presumably chosen
/// so that adding it to an `Instant` cannot overflow; confirm against the file server.
const fn default_rotate_wait() -> Duration {
    Duration::from_secs(u64::MAX >> 1)
}
279
280/// Configuration for how files should be identified.
281///
282/// This is important for `checkpointing` when file rotation is used.
283#[configurable_component]
284#[derive(Clone, Debug, PartialEq, Eq)]
285#[serde(tag = "strategy", rename_all = "snake_case")]
286#[configurable(metadata(
287    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
288))]
289pub enum FingerprintConfig {
290    /// Read lines from the beginning of the file and compute a checksum over them.
291    Checksum {
292        /// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
293        /// If the file is compressed, the number of bytes refer to the header in the uncompressed content. Only
294        /// gzip is supported at this time.
295        ///
296        /// This can be helpful if all files share a common header that should be skipped.
297        #[serde(default = "default_ignored_header_bytes")]
298        #[configurable(metadata(docs::type_unit = "bytes"))]
299        ignored_header_bytes: usize,
300
301        /// The number of lines to read for generating the checksum.
302        ///
303        /// The number of lines are determined from the uncompressed content if the file is compressed. Only
304        /// gzip is supported at this time.
305        ///
306        /// If the file has less than this amount of lines, it won’t be read at all.
307        #[serde(default = "default_lines")]
308        #[configurable(metadata(docs::type_unit = "lines"))]
309        lines: usize,
310    },
311
312    /// Use the [device and inode][inode] as the identifier.
313    ///
314    /// [inode]: https://en.wikipedia.org/wiki/Inode
315    #[serde(rename = "device_and_inode")]
316    DevInode,
317}
318
319impl Default for FingerprintConfig {
320    fn default() -> Self {
321        Self::Checksum {
322            ignored_header_bytes: 0,
323            lines: default_lines(),
324        }
325    }
326}
327
/// Serde default for `ignored_header_bytes`: no header is skipped.
const fn default_ignored_header_bytes() -> usize {
    0_usize
}

/// Serde default for `lines`: only the first line is checksummed.
const fn default_lines() -> usize {
    1_usize
}
335
336impl From<FingerprintConfig> for FingerprintStrategy {
337    fn from(config: FingerprintConfig) -> FingerprintStrategy {
338        match config {
339            FingerprintConfig::Checksum {
340                ignored_header_bytes,
341                lines,
342            } => FingerprintStrategy::FirstLinesChecksum {
343                ignored_header_bytes,
344                lines,
345            },
346            FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
347        }
348    }
349}
350
/// Checkpoint bookkeeping attached to each event when end-to-end acknowledgements are enabled.
///
/// When the event's batch is reported as `BatchStatus::Delivered`, the acknowledgement task in
/// `file_source` updates the checkpoint for `file_id` to `offset`.
#[derive(Debug)]
pub(crate) struct FinalizerEntry {
    /// Fingerprint of the file the event's line came from.
    pub(crate) file_id: FileFingerprint,
    /// The line's `end_offset`; the checkpoint is moved here once delivery is confirmed.
    pub(crate) offset: u64,
}
356
357impl Default for FileConfig {
358    fn default() -> Self {
359        Self {
360            include: vec![PathBuf::from("/var/log/**/*.log")],
361            exclude: vec![],
362            file_key: default_file_key(),
363            start_at_beginning: None,
364            ignore_checkpoints: None,
365            read_from: default_read_from(),
366            ignore_older_secs: None,
367            max_line_bytes: default_max_line_bytes(),
368            fingerprint: FingerprintConfig::default(),
369            ignore_not_found: false,
370            host_key: None,
371            offset_key: None,
372            data_dir: None,
373            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
374            message_start_indicator: None,
375            multi_line_timeout: default_multi_line_timeout(), // millis
376            multiline: None,
377            max_read_bytes: default_max_read_bytes(),
378            oldest_first: false,
379            remove_after_secs: None,
380            line_delimiter: default_line_delimiter(),
381            encoding: None,
382            acknowledgements: Default::default(),
383            log_namespace: None,
384            internal_metrics: Default::default(),
385            rotate_wait: default_rotate_wait(),
386        }
387    }
388}
389
390impl_generate_config_from_default!(FileConfig);
391
392#[async_trait::async_trait]
393#[typetag::serde(name = "file")]
394impl SourceConfig for FileConfig {
395    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
396        // add the source name as a subdir, so that multiple sources can
397        // operate within the same given data_dir (e.g. the global one)
398        // without the file servers' checkpointers interfering with each
399        // other
400        let data_dir = cx
401            .globals
402            // source are only global, name can be used for subdir
403            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
404
405        // Clippy rule, because async_trait?
406        #[allow(clippy::suspicious_else_formatting)]
407        {
408            if let Some(ref config) = self.multiline {
409                let _: line_agg::Config = config.try_into()?;
410            }
411
412            if let Some(ref indicator) = self.message_start_indicator {
413                Regex::new(indicator)
414                    .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
415            }
416        }
417
418        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
419
420        let log_namespace = cx.log_namespace(self.log_namespace);
421
422        Ok(file_source(
423            self,
424            data_dir,
425            cx.shutdown,
426            cx.out,
427            acknowledgements,
428            log_namespace,
429        ))
430    }
431
432    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
433        let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
434        let host_key = self
435            .host_key
436            .clone()
437            .unwrap_or(log_schema().host_key().cloned().into())
438            .path
439            .map(LegacyKey::Overwrite);
440
441        let offset_key = self
442            .offset_key
443            .clone()
444            .and_then(|k| k.path)
445            .map(LegacyKey::Overwrite);
446
447        let schema_definition = BytesDeserializerConfig
448            .schema_definition(global_log_namespace.merge(self.log_namespace))
449            .with_standard_vector_source_metadata()
450            .with_source_metadata(
451                Self::NAME,
452                host_key,
453                &owned_value_path!("host"),
454                Kind::bytes().or_undefined(),
455                Some("host"),
456            )
457            .with_source_metadata(
458                Self::NAME,
459                offset_key,
460                &owned_value_path!("offset"),
461                Kind::integer(),
462                None,
463            )
464            .with_source_metadata(
465                Self::NAME,
466                file_key,
467                &owned_value_path!("path"),
468                Kind::bytes(),
469                None,
470            );
471
472        vec![SourceOutput::new_maybe_logs(
473            DataType::Log,
474            schema_definition,
475        )]
476    }
477
478    fn can_acknowledge(&self) -> bool {
479        true
480    }
481}
482
483pub fn file_source(
484    config: &FileConfig,
485    data_dir: PathBuf,
486    shutdown: ShutdownSignal,
487    mut out: SourceSender,
488    acknowledgements: bool,
489    log_namespace: LogNamespace,
490) -> super::Source {
491    // the include option must be specified but also must contain at least one entry.
492    if config.include.is_empty() {
493        error!(
494            message = "`include` configuration option must contain at least one file pattern.",
495            internal_log_rate_limit = false
496        );
497        return Box::pin(future::ready(Err(())));
498    }
499
500    let exclude_patterns = config
501        .exclude
502        .iter()
503        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
504        .collect::<Vec<PathBuf>>();
505    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
506    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
507    let (ignore_checkpoints, read_from) = reconcile_position_options(
508        config.start_at_beginning,
509        config.ignore_checkpoints,
510        Some(config.read_from),
511    );
512
513    let emitter = FileSourceInternalEventsEmitter {
514        include_file_metric_tag: config.internal_metrics.include_file_tag,
515    };
516
517    let paths_provider = Glob::new(
518        &config.include,
519        &exclude_patterns,
520        MatchOptions::default(),
521        emitter.clone(),
522    )
523    .expect("invalid glob patterns");
524
525    let encoding_charset = config.encoding.clone().map(|e| e.charset);
526
527    // if file encoding is specified, need to convert the line delimiter (present as utf8)
528    // to the specified encoding, so that delimiter-based line splitting can work properly
529    let line_delimiter_as_bytes = match encoding_charset {
530        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
531        None => Bytes::from(config.line_delimiter.clone()),
532    };
533
534    let checkpointer = Checkpointer::new(&data_dir);
535    let strategy = config.fingerprint.clone().into();
536
537    let file_server = FileServer {
538        paths_provider,
539        max_read_bytes: config.max_read_bytes,
540        ignore_checkpoints,
541        read_from,
542        ignore_before,
543        max_line_bytes: config.max_line_bytes,
544        line_delimiter: line_delimiter_as_bytes,
545        data_dir,
546        glob_minimum_cooldown,
547        fingerprinter: Fingerprinter::new(strategy, config.max_line_bytes, config.ignore_not_found),
548        oldest_first: config.oldest_first,
549        remove_after: config.remove_after_secs.map(Duration::from_secs),
550        emitter,
551        rotate_wait: config.rotate_wait,
552    };
553
554    let event_metadata = EventMetadata {
555        host_key: config
556            .host_key
557            .clone()
558            .unwrap_or(log_schema().host_key().cloned().into())
559            .path,
560        hostname: crate::get_hostname().ok(),
561        file_key: config.file_key.clone().path,
562        offset_key: config.offset_key.clone().and_then(|k| k.path),
563    };
564
565    let include = config.include.clone();
566    let exclude = config.exclude.clone();
567    let multiline_config = config.multiline.clone();
568    let message_start_indicator = config.message_start_indicator.clone();
569    let multi_line_timeout = config.multi_line_timeout;
570
571    let (finalizer, shutdown_checkpointer) = if acknowledgements {
572        // The shutdown sent in to the finalizer is the global
573        // shutdown handle used to tell it to stop accepting new batch
574        // statuses and just wait for the remaining acks to come in.
575        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
576
577        // We set up a separate shutdown signal to tie together the
578        // finalizer and the checkpoint writer task in the file
579        // server, to make it continue to write out updated
580        // checkpoints until all the acks have come in.
581        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
582        let checkpoints = checkpointer.view();
583        tokio::spawn(async move {
584            while let Some((status, entry)) = ack_stream.next().await {
585                if status == BatchStatus::Delivered {
586                    checkpoints.update(entry.file_id, entry.offset);
587                }
588            }
589            send_shutdown.send(())
590        });
591        (Some(finalizer), shutdown2.map(|_| ()).boxed())
592    } else {
593        // When not dealing with end-to-end acknowledgements, just
594        // clone the global shutdown to stop the checkpoint writer.
595        (None, shutdown.clone().map(|_| ()).boxed())
596    };
597
598    let checkpoints = checkpointer.view();
599    let include_file_metric_tag = config.internal_metrics.include_file_tag;
600    Box::pin(async move {
601        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
602
603        let mut encoding_decoder = encoding_charset.map(Decoder::new);
604
605        // sizing here is just a guess
606        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
607        let rx = rx
608            .map(futures::stream::iter)
609            .flatten()
610            .map(move |mut line| {
611                emit!(FileBytesReceived {
612                    byte_size: line.text.len(),
613                    file: &line.filename,
614                    include_file_metric_tag,
615                });
616                // transcode each line from the file's encoding charset to utf8
617                line.text = match encoding_decoder.as_mut() {
618                    Some(d) => d.decode_to_utf8(line.text),
619                    None => line.text,
620                };
621                line
622            });
623
624        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
625            if let Some(ref multiline_config) = multiline_config {
626                wrap_with_line_agg(
627                    rx,
628                    multiline_config.try_into().unwrap(), // validated in build
629                )
630            } else if let Some(msi) = message_start_indicator {
631                wrap_with_line_agg(
632                    rx,
633                    line_agg::Config::for_legacy(
634                        Regex::new(&msi).unwrap(), // validated in build
635                        multi_line_timeout,
636                    ),
637                )
638            } else {
639                Box::new(rx)
640            };
641
642        // Once file server ends this will run until it has finished processing remaining
643        // logs in the queue.
644        let span = Span::current();
645        let mut messages = messages.map(move |line| {
646            let mut event = create_event(
647                line.text,
648                line.start_offset,
649                &line.filename,
650                &event_metadata,
651                log_namespace,
652                include_file_metric_tag,
653            );
654
655            if let Some(finalizer) = &finalizer {
656                let (batch, receiver) = BatchNotifier::new_with_receiver();
657                event = event.with_batch_notifier(&batch);
658                let entry = FinalizerEntry {
659                    file_id: line.file_id,
660                    offset: line.end_offset,
661                };
662                // checkpoints.update will be called from ack_stream's thread
663                finalizer.add(entry, receiver);
664            } else {
665                checkpoints.update(line.file_id, line.end_offset);
666            }
667            event
668        });
669        tokio::spawn(async move {
670            match out
671                .send_event_stream(&mut messages)
672                .instrument(span.or_current())
673                .await
674            {
675                Ok(()) => {
676                    debug!("Finished sending.");
677                }
678                Err(_) => {
679                    let (count, _) = messages.size_hint();
680                    emit!(StreamClosedError { count });
681                }
682            }
683        });
684
685        let span = info_span!("file_server");
686        tokio::task::spawn_blocking(move || {
687            let _enter = span.enter();
688            let rt = tokio::runtime::Handle::current();
689            let result =
690                rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
691            emit!(FileOpen { count: 0 });
692            // Panic if we encounter any error originating from the file server.
693            // We're at the `spawn_blocking` call, the panic will be caught and
694            // passed to the `JoinHandle` error, similar to the usual threads.
695            result.expect("file server exited with an error");
696        })
697        .map_err(|error| error!(message="File server unexpectedly stopped.", %error, internal_log_rate_limit = false))
698        .await
699    })
700}
701
702/// Emit deprecation warning if the old option is used, and take it into account when determining
703/// defaults. Any of the newer options will override it when set directly.
704fn reconcile_position_options(
705    start_at_beginning: Option<bool>,
706    ignore_checkpoints: Option<bool>,
707    read_from: Option<ReadFromConfig>,
708) -> (bool, ReadFrom) {
709    if start_at_beginning.is_some() {
710        warn!(
711            message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
712        )
713    }
714
715    match start_at_beginning {
716        Some(true) => (
717            ignore_checkpoints.unwrap_or(true),
718            read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
719        ),
720        _ => (
721            ignore_checkpoints.unwrap_or(false),
722            read_from.map(Into::into).unwrap_or_default(),
723        ),
724    }
725}
726
727fn wrap_with_line_agg(
728    rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
729    config: line_agg::Config,
730) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
731    let logic = line_agg::Logic::new(config);
732    Box::new(
733        LineAgg::new(
734            rx.map(|line| {
735                (
736                    line.filename,
737                    line.text,
738                    (line.file_id, line.start_offset, line.end_offset),
739                )
740            }),
741            logic,
742        )
743        .map(
744            |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
745                text,
746                filename,
747                file_id,
748                start_offset,
749                end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
750                    lastline_end_offset
751                }),
752            },
753        ),
754    )
755}
756
/// Per-source values that `create_event` attaches to every event; `file_source` resolves them once.
struct EventMetadata {
    /// Field used as the legacy key (`LegacyKey::Overwrite`) for the hostname; `None` passes no legacy key.
    host_key: Option<OwnedValuePath>,
    /// This machine's hostname, or `None` if `crate::get_hostname()` failed (no host is added then).
    hostname: Option<String>,
    /// Field used as the legacy key for the source file path; `None` passes no legacy key.
    file_key: Option<OwnedValuePath>,
    /// Field used as the legacy key for the line's byte offset; `None` passes no legacy key.
    offset_key: Option<OwnedValuePath>,
}
763
764fn create_event(
765    line: Bytes,
766    offset: u64,
767    file: &str,
768    meta: &EventMetadata,
769    log_namespace: LogNamespace,
770    include_file_metric_tag: bool,
771) -> LogEvent {
772    let deserializer = BytesDeserializer;
773    let mut event = deserializer.parse_single(line, log_namespace);
774
775    log_namespace.insert_vector_metadata(
776        &mut event,
777        log_schema().source_type_key(),
778        path!("source_type"),
779        Bytes::from_static(FileConfig::NAME.as_bytes()),
780    );
781    log_namespace.insert_vector_metadata(
782        &mut event,
783        log_schema().timestamp_key(),
784        path!("ingest_timestamp"),
785        Utc::now(),
786    );
787
788    let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
789    // `meta.host_key` is already `unwrap_or_else`ed so we can just pass it in.
790    if let Some(hostname) = &meta.hostname {
791        log_namespace.insert_source_metadata(
792            FileConfig::NAME,
793            &mut event,
794            legacy_host_key,
795            path!("host"),
796            hostname.clone(),
797        );
798    }
799
800    let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
801    log_namespace.insert_source_metadata(
802        FileConfig::NAME,
803        &mut event,
804        legacy_offset_key,
805        path!("offset"),
806        offset,
807    );
808
809    let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
810    log_namespace.insert_source_metadata(
811        FileConfig::NAME,
812        &mut event,
813        legacy_file_key,
814        path!("path"),
815        file,
816    );
817
818    emit!(FileEventsReceived {
819        count: 1,
820        file,
821        byte_size: event.estimated_json_encoded_size_of(),
822        include_file_metric_tag,
823    });
824
825    event
826}
827
828#[cfg(test)]
829mod tests {
830    use std::{
831        collections::HashSet,
832        fs::{self, File},
833        future::Future,
834        io::{Seek, Write},
835    };
836
837    use encoding_rs::UTF_16LE;
838    use similar_asserts::assert_eq;
839    use tempfile::tempdir;
840    use tokio::time::{Duration, sleep, timeout};
841    use vector_lib::schema::Definition;
842    use vrl::{value, value::kind::Collection};
843
844    use super::*;
845    use crate::{
846        config::Config,
847        event::{Event, EventStatus, Value},
848        shutdown::ShutdownSignal,
849        sources::file,
850        test_util::components::{FILE_SOURCE_TAGS, assert_source_compliance},
851    };
852
853    #[test]
854    fn generate_config() {
855        crate::test_util::test_generate_config::<FileConfig>();
856    }
857
858    fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
859        file::FileConfig {
860            fingerprint: FingerprintConfig::Checksum {
861                ignored_header_bytes: 0,
862                lines: 1,
863            },
864            data_dir: Some(dir.path().to_path_buf()),
865            glob_minimum_cooldown_ms: Duration::from_millis(100),
866            internal_metrics: FileInternalMetricsConfig {
867                include_file_tag: true,
868            },
869            ..Default::default()
870        }
871    }
872
873    async fn sleep_500_millis() {
874        sleep(Duration::from_millis(500)).await;
875    }
876
877    #[test]
878    fn parse_config() {
879        let config: FileConfig = toml::from_str(
880            r#"
881            include = [ "/var/log/**/*.log" ]
882            file_key = "file"
883            glob_minimum_cooldown_ms = 1000
884            multi_line_timeout = 1000
885            max_read_bytes = 2048
886            line_delimiter = "\n"
887        "#,
888        )
889        .unwrap();
890        assert_eq!(config, FileConfig::default());
891        assert_eq!(
892            config.fingerprint,
893            FingerprintConfig::Checksum {
894                ignored_header_bytes: 0,
895                lines: 1
896            }
897        );
898
899        let config: FileConfig = toml::from_str(
900            r#"
901        include = [ "/var/log/**/*.log" ]
902        [fingerprint]
903        strategy = "device_and_inode"
904        "#,
905        )
906        .unwrap();
907        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
908
909        let config: FileConfig = toml::from_str(
910            r#"
911        include = [ "/var/log/**/*.log" ]
912        [fingerprint]
913        strategy = "checksum"
914        bytes = 128
915        ignored_header_bytes = 512
916        "#,
917        )
918        .unwrap();
919        assert_eq!(
920            config.fingerprint,
921            FingerprintConfig::Checksum {
922                ignored_header_bytes: 512,
923                lines: 1
924            }
925        );
926
927        let config: FileConfig = toml::from_str(
928            r#"
929        include = [ "/var/log/**/*.log" ]
930        [encoding]
931        charset = "utf-16le"
932        "#,
933        )
934        .unwrap();
935        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
936
937        let config: FileConfig = toml::from_str(
938            r#"
939        include = [ "/var/log/**/*.log" ]
940        read_from = "beginning"
941        "#,
942        )
943        .unwrap();
944        assert_eq!(config.read_from, ReadFromConfig::Beginning);
945
946        let config: FileConfig = toml::from_str(
947            r#"
948        include = [ "/var/log/**/*.log" ]
949        read_from = "end"
950        "#,
951        )
952        .unwrap();
953        assert_eq!(config.read_from, ReadFromConfig::End);
954    }
955
956    #[test]
957    fn resolve_data_dir() {
958        let global_dir = tempdir().unwrap();
959        let local_dir = tempdir().unwrap();
960
961        let mut config = Config::default();
962        config.global.data_dir = global_dir.keep().into();
963
964        // local path given -- local should win
965        let res = config
966            .global
967            .resolve_and_validate_data_dir(test_default_file_config(&local_dir).data_dir.as_ref())
968            .unwrap();
969        assert_eq!(res, local_dir.path());
970
971        // no local path given -- global fallback should be in effect
972        let res = config.global.resolve_and_validate_data_dir(None).unwrap();
973        assert_eq!(res, config.global.data_dir.unwrap());
974    }
975
976    #[test]
977    fn output_schema_definition_vector_namespace() {
978        let definitions = FileConfig::default()
979            .outputs(LogNamespace::Vector)
980            .remove(0)
981            .schema_definition(true);
982
983        assert_eq!(
984            definitions,
985            Some(
986                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
987                    .with_meaning(OwnedTargetPath::event_root(), "message")
988                    .with_metadata_field(
989                        &owned_value_path!("vector", "source_type"),
990                        Kind::bytes(),
991                        None
992                    )
993                    .with_metadata_field(
994                        &owned_value_path!("vector", "ingest_timestamp"),
995                        Kind::timestamp(),
996                        None
997                    )
998                    .with_metadata_field(
999                        &owned_value_path!("file", "host"),
1000                        Kind::bytes().or_undefined(),
1001                        Some("host")
1002                    )
1003                    .with_metadata_field(
1004                        &owned_value_path!("file", "offset"),
1005                        Kind::integer(),
1006                        None
1007                    )
1008                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1009            )
1010        )
1011    }
1012
1013    #[test]
1014    fn output_schema_definition_legacy_namespace() {
1015        let definitions = FileConfig::default()
1016            .outputs(LogNamespace::Legacy)
1017            .remove(0)
1018            .schema_definition(true);
1019
1020        assert_eq!(
1021            definitions,
1022            Some(
1023                Definition::new_with_default_metadata(
1024                    Kind::object(Collection::empty()),
1025                    [LogNamespace::Legacy]
1026                )
1027                .with_event_field(
1028                    &owned_value_path!("message"),
1029                    Kind::bytes(),
1030                    Some("message")
1031                )
1032                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1033                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1034                .with_event_field(
1035                    &owned_value_path!("host"),
1036                    Kind::bytes().or_undefined(),
1037                    Some("host")
1038                )
1039                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1040                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1041            )
1042        )
1043    }
1044
1045    #[test]
1046    fn create_event_legacy_namespace() {
1047        let line = Bytes::from("hello world");
1048        let file = "some_file.rs";
1049        let offset: u64 = 0;
1050
1051        let meta = EventMetadata {
1052            host_key: Some(owned_value_path!("host")),
1053            hostname: Some("Some.Machine".to_string()),
1054            file_key: Some(owned_value_path!("file")),
1055            offset_key: Some(owned_value_path!("offset")),
1056        };
1057        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1058
1059        assert_eq!(log["file"], "some_file.rs".into());
1060        assert_eq!(log["host"], "Some.Machine".into());
1061        assert_eq!(log["offset"], 0.into());
1062        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1063        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1064        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1065    }
1066
1067    #[test]
1068    fn create_event_custom_fields_legacy_namespace() {
1069        let line = Bytes::from("hello world");
1070        let file = "some_file.rs";
1071        let offset: u64 = 0;
1072
1073        let meta = EventMetadata {
1074            host_key: Some(owned_value_path!("hostname")),
1075            hostname: Some("Some.Machine".to_string()),
1076            file_key: Some(owned_value_path!("file_path")),
1077            offset_key: Some(owned_value_path!("off")),
1078        };
1079        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1080
1081        assert_eq!(log["file_path"], "some_file.rs".into());
1082        assert_eq!(log["hostname"], "Some.Machine".into());
1083        assert_eq!(log["off"], 0.into());
1084        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1085        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1086        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1087    }
1088
1089    #[test]
1090    fn create_event_vector_namespace() {
1091        let line = Bytes::from("hello world");
1092        let file = "some_file.rs";
1093        let offset: u64 = 0;
1094
1095        let meta = EventMetadata {
1096            host_key: Some(owned_value_path!("ignored")),
1097            hostname: Some("Some.Machine".to_string()),
1098            file_key: Some(owned_value_path!("ignored")),
1099            offset_key: Some(owned_value_path!("ignored")),
1100        };
1101        let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1102
1103        assert_eq!(log.value(), &value!("hello world"));
1104
1105        assert_eq!(
1106            log.metadata()
1107                .value()
1108                .get(path!("vector", "source_type"))
1109                .unwrap(),
1110            &value!("file")
1111        );
1112        assert!(
1113            log.metadata()
1114                .value()
1115                .get(path!("vector", "ingest_timestamp"))
1116                .unwrap()
1117                .is_timestamp()
1118        );
1119
1120        assert_eq!(
1121            log.metadata()
1122                .value()
1123                .get(path!(FileConfig::NAME, "host"))
1124                .unwrap(),
1125            &value!("Some.Machine")
1126        );
1127        assert_eq!(
1128            log.metadata()
1129                .value()
1130                .get(path!(FileConfig::NAME, "offset"))
1131                .unwrap(),
1132            &value!(0)
1133        );
1134        assert_eq!(
1135            log.metadata()
1136                .value()
1137                .get(path!(FileConfig::NAME, "path"))
1138                .unwrap(),
1139            &value!("some_file.rs")
1140        );
1141    }
1142
1143    #[tokio::test]
1144    async fn file_happy_path() {
1145        let n = 5;
1146
1147        let dir = tempdir().unwrap();
1148        let config = file::FileConfig {
1149            include: vec![dir.path().join("*")],
1150            ..test_default_file_config(&dir)
1151        };
1152
1153        let path1 = dir.path().join("file1");
1154        let path2 = dir.path().join("file2");
1155
1156        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1157            let mut file1 = File::create(&path1).unwrap();
1158            let mut file2 = File::create(&path2).unwrap();
1159
1160            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1161
1162            for i in 0..n {
1163                writeln!(&mut file1, "hello {i}").unwrap();
1164                writeln!(&mut file2, "goodbye {i}").unwrap();
1165            }
1166
1167            sleep_500_millis().await;
1168        })
1169        .await;
1170
1171        let mut hello_i = 0;
1172        let mut goodbye_i = 0;
1173
1174        for event in received {
1175            let line =
1176                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1177            if line.starts_with("hello") {
1178                assert_eq!(line, format!("hello {}", hello_i));
1179                assert_eq!(
1180                    event.as_log()["file"].to_string_lossy(),
1181                    path1.to_str().unwrap()
1182                );
1183                hello_i += 1;
1184            } else {
1185                assert_eq!(line, format!("goodbye {}", goodbye_i));
1186                assert_eq!(
1187                    event.as_log()["file"].to_string_lossy(),
1188                    path2.to_str().unwrap()
1189                );
1190                goodbye_i += 1;
1191            }
1192        }
1193        assert_eq!(hello_i, n);
1194        assert_eq!(goodbye_i, n);
1195    }
1196
1197    // https://github.com/vectordotdev/vector/issues/8363
1198    #[tokio::test]
1199    async fn file_read_empty_lines() {
1200        let n = 5;
1201
1202        let dir = tempdir().unwrap();
1203        let config = file::FileConfig {
1204            include: vec![dir.path().join("*")],
1205            ..test_default_file_config(&dir)
1206        };
1207
1208        let path = dir.path().join("file");
1209
1210        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1211            let mut file = File::create(&path).unwrap();
1212
1213            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1214
1215            writeln!(&mut file, "line for checkpointing").unwrap();
1216            for _i in 0..n {
1217                writeln!(&mut file).unwrap();
1218            }
1219
1220            sleep_500_millis().await;
1221        })
1222        .await;
1223
1224        assert_eq!(received.len(), n + 1);
1225    }
1226
1227    #[tokio::test]
1228    async fn file_truncate() {
1229        let n = 5;
1230
1231        let dir = tempdir().unwrap();
1232        let config = file::FileConfig {
1233            include: vec![dir.path().join("*")],
1234            ..test_default_file_config(&dir)
1235        };
1236        let path = dir.path().join("file");
1237        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1238            let mut file = File::create(&path).unwrap();
1239
1240            sleep_500_millis().await; // The files must be observed at its original length before writing to it
1241
1242            for i in 0..n {
1243                writeln!(&mut file, "pretrunc {i}").unwrap();
1244            }
1245
1246            sleep_500_millis().await; // The writes must be observed before truncating
1247
1248            file.set_len(0).unwrap();
1249            file.seek(std::io::SeekFrom::Start(0)).unwrap();
1250
1251            sleep_500_millis().await; // The truncate must be observed before writing again
1252
1253            for i in 0..n {
1254                writeln!(&mut file, "posttrunc {i}").unwrap();
1255            }
1256
1257            sleep_500_millis().await;
1258        })
1259        .await;
1260
1261        let mut i = 0;
1262        let mut pre_trunc = true;
1263
1264        for event in received {
1265            assert_eq!(
1266                event.as_log()["file"].to_string_lossy(),
1267                path.to_str().unwrap()
1268            );
1269
1270            let line =
1271                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1272
1273            if pre_trunc {
1274                assert_eq!(line, format!("pretrunc {}", i));
1275            } else {
1276                assert_eq!(line, format!("posttrunc {}", i));
1277            }
1278
1279            i += 1;
1280            if i == n {
1281                i = 0;
1282                pre_trunc = false;
1283            }
1284        }
1285    }
1286
1287    #[tokio::test]
1288    async fn file_rotate() {
1289        let n = 5;
1290
1291        let dir = tempdir().unwrap();
1292        let config = file::FileConfig {
1293            include: vec![dir.path().join("*")],
1294            ..test_default_file_config(&dir)
1295        };
1296
1297        let path = dir.path().join("file");
1298        let archive_path = dir.path().join("file");
1299        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1300            let mut file = File::create(&path).unwrap();
1301
1302            sleep_500_millis().await; // The files must be observed at its original length before writing to it
1303
1304            for i in 0..n {
1305                writeln!(&mut file, "prerot {i}").unwrap();
1306            }
1307
1308            sleep_500_millis().await; // The writes must be observed before rotating
1309
1310            fs::rename(&path, archive_path).expect("could not rename");
1311            let mut file = File::create(&path).unwrap();
1312
1313            sleep_500_millis().await; // The rotation must be observed before writing again
1314
1315            for i in 0..n {
1316                writeln!(&mut file, "postrot {i}").unwrap();
1317            }
1318
1319            sleep_500_millis().await;
1320        })
1321        .await;
1322
1323        let mut i = 0;
1324        let mut pre_rot = true;
1325
1326        for event in received {
1327            assert_eq!(
1328                event.as_log()["file"].to_string_lossy(),
1329                path.to_str().unwrap()
1330            );
1331
1332            let line =
1333                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1334
1335            if pre_rot {
1336                assert_eq!(line, format!("prerot {}", i));
1337            } else {
1338                assert_eq!(line, format!("postrot {}", i));
1339            }
1340
1341            i += 1;
1342            if i == n {
1343                i = 0;
1344                pre_rot = false;
1345            }
1346        }
1347    }
1348
1349    #[tokio::test]
1350    async fn file_multiple_paths() {
1351        let n = 5;
1352
1353        let dir = tempdir().unwrap();
1354        let config = file::FileConfig {
1355            include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1356            exclude: vec![dir.path().join("a.*.txt")],
1357            ..test_default_file_config(&dir)
1358        };
1359
1360        let path1 = dir.path().join("a.txt");
1361        let path2 = dir.path().join("b.txt");
1362        let path3 = dir.path().join("a.log");
1363        let path4 = dir.path().join("a.ignore.txt");
1364        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1365            let mut file1 = File::create(&path1).unwrap();
1366            let mut file2 = File::create(&path2).unwrap();
1367            let mut file3 = File::create(&path3).unwrap();
1368            let mut file4 = File::create(&path4).unwrap();
1369
1370            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1371
1372            for i in 0..n {
1373                writeln!(&mut file1, "1 {i}").unwrap();
1374                writeln!(&mut file2, "2 {i}").unwrap();
1375                writeln!(&mut file3, "3 {i}").unwrap();
1376                writeln!(&mut file4, "4 {i}").unwrap();
1377            }
1378
1379            sleep_500_millis().await;
1380        })
1381        .await;
1382
1383        let mut is = [0; 3];
1384
1385        for event in received {
1386            let line =
1387                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1388            let mut split = line.split(' ');
1389            let file = split.next().unwrap().parse::<usize>().unwrap();
1390            assert_ne!(file, 4);
1391            let i = split.next().unwrap().parse::<usize>().unwrap();
1392
1393            assert_eq!(is[file - 1], i);
1394            is[file - 1] += 1;
1395        }
1396
1397        assert_eq!(is, [n as usize; 3]);
1398    }
1399
1400    #[tokio::test]
1401    async fn file_exclude_paths() {
1402        let n = 5;
1403
1404        let dir = tempdir().unwrap();
1405        let config = file::FileConfig {
1406            include: vec![dir.path().join("a//b/*.log.*")],
1407            exclude: vec![dir.path().join("a//b/test.log.*")],
1408            ..test_default_file_config(&dir)
1409        };
1410
1411        let path1 = dir.path().join("a//b/a.log.1");
1412        let path2 = dir.path().join("a//b/test.log.1");
1413        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1414            std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1415            let mut file1 = File::create(&path1).unwrap();
1416            let mut file2 = File::create(&path2).unwrap();
1417
1418            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1419
1420            for i in 0..n {
1421                writeln!(&mut file1, "1 {i}").unwrap();
1422                writeln!(&mut file2, "2 {i}").unwrap();
1423            }
1424
1425            sleep_500_millis().await;
1426        })
1427        .await;
1428
1429        let mut is = [0; 1];
1430
1431        for event in received {
1432            let line =
1433                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1434            let mut split = line.split(' ');
1435            let file = split.next().unwrap().parse::<usize>().unwrap();
1436            assert_ne!(file, 4);
1437            let i = split.next().unwrap().parse::<usize>().unwrap();
1438
1439            assert_eq!(is[file - 1], i);
1440            is[file - 1] += 1;
1441        }
1442
1443        assert_eq!(is, [n as usize; 1]);
1444    }
1445
1446    #[tokio::test]
1447    async fn file_key_acknowledged() {
1448        file_key(Acks).await
1449    }
1450
1451    #[tokio::test]
1452    async fn file_key_no_acknowledge() {
1453        file_key(NoAcks).await
1454    }
1455
    /// Shared body for the `file_key_*` tests, run under the given acknowledgement mode.
    ///
    /// Each section uses its own temp directory and writes the single line "hello there" to
    /// `file`:
    /// - Default: the file path is expected under the `"file"` key.
    /// - Custom: `file_key` is set to `source`, so the path is expected under `"source"`.
    /// - Hidden: the event's top-level keys must be exactly the default file key, host,
    ///   message, timestamp and source type.
    async fn file_key(acks: AckingMode) {
        // Default: no `file_key` override, the path lands under "file".
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
                let mut file = File::create(&path).unwrap();

                sleep_500_millis().await;

                writeln!(&mut file, "hello there").unwrap();

                sleep_500_millis().await;
            })
            .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log()["file"].to_string_lossy(),
                path.to_str().unwrap()
            );
        }

        // Custom: `file_key = "source"` moves the path to the "source" key.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                file_key: OptionalValuePath::from(owned_value_path!("source")),
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
                let mut file = File::create(&path).unwrap();

                sleep_500_millis().await;

                writeln!(&mut file, "hello there").unwrap();

                sleep_500_millis().await;
            })
            .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log()["source"].to_string_lossy(),
                path.to_str().unwrap()
            );
        }

        // Hidden: checks the complete set of top-level keys on the event.
        // NOTE(review): this config is identical to the "Default" section; only the assertion
        // differs.
        {
            let dir = tempdir().unwrap();
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ..test_default_file_config(&dir)
            };

            let path = dir.path().join("file");
            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
                let mut file = File::create(&path).unwrap();

                sleep_500_millis().await;

                writeln!(&mut file, "hello there").unwrap();

                sleep_500_millis().await;
            })
            .await;

            assert_eq!(received.len(), 1);
            assert_eq!(
                received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
                vec![
                    default_file_key()
                        .path
                        .expect("file key to exist")
                        .to_string()
                        .into(),
                    log_schema().host_key().unwrap().to_string().into(),
                    log_schema().message_key().unwrap().to_string().into(),
                    log_schema().timestamp_key().unwrap().to_string().into(),
                    log_schema().source_type_key().unwrap().to_string().into()
                ]
                .into_iter()
                .collect::<HashSet<_>>()
            );
        }
    }
1551
1552    #[cfg(target_os = "linux")] // see #7988
1553    #[tokio::test]
1554    async fn file_start_position_server_restart_acknowledged() {
1555        file_start_position_server_restart(Acks).await
1556    }
1557
1558    #[cfg(target_os = "linux")] // see #7988
1559    #[tokio::test]
1560    async fn file_start_position_server_restart_no_acknowledge() {
1561        file_start_position_server_restart(NoAcks).await
1562    }
1563
    /// Shared body for the `file_start_position_server_restart_*` tests.
    ///
    /// Runs the source three times against the same file, appending one line during each run:
    /// 1. The first run reads the pre-existing line plus the appended one.
    /// 2. A restart resumes from the stored checkpoint and only sees the newly appended line.
    /// 3. A run with `ignore_checkpoints` and `read_from = Beginning` re-reads the whole file.
    #[cfg(target_os = "linux")] // see #7988
    async fn file_start_position_server_restart(acking: AckingMode) {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        // The same handle is reused to append a line during every run below.
        let mut file = File::create(&path).unwrap();
        // Written before the source is started for the first time.
        writeln!(&mut file, "zeroth line").unwrap();
        sleep_500_millis().await;

        // First run: picks up the pre-existing line and the one appended while running.
        {
            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
                sleep_500_millis().await;
                writeln!(&mut file, "first line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let lines = extract_messages_string(received);
            assert_eq!(lines, vec!["zeroth line", "first line"]);
        }
        // Restart: resumes from the checkpoint, so only the newly appended line is read.
        {
            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
                sleep_500_millis().await;
                writeln!(&mut file, "second line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let lines = extract_messages_string(received);
            assert_eq!(lines, vec!["second line"]);
        }
        // Restart with checkpoints ignored and reading from the beginning: every line written
        // so far (plus the new one) is read again.
        {
            let config = file::FileConfig {
                include: vec![dir.path().join("*")],
                ignore_checkpoints: Some(true),
                read_from: ReadFromConfig::Beginning,
                ..test_default_file_config(&dir)
            };
            let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
                sleep_500_millis().await;
                writeln!(&mut file, "third line").unwrap();
                sleep_500_millis().await;
            })
            .await;

            let lines = extract_messages_string(received);
            assert_eq!(
                lines,
                vec!["zeroth line", "first line", "second line", "third line"]
            );
        }
    }
1623
1624    #[tokio::test]
1625    async fn file_start_position_server_restart_unfinalized() {
1626        let dir = tempdir().unwrap();
1627        let config = file::FileConfig {
1628            include: vec![dir.path().join("*")],
1629            ..test_default_file_config(&dir)
1630        };
1631
1632        let path = dir.path().join("file");
1633        let mut file = File::create(&path).unwrap();
1634        writeln!(&mut file, "the line").unwrap();
1635        file.flush().unwrap();
1636
1637        // First time server runs it picks up existing lines.
1638        let received = run_file_source(
1639            &config,
1640            false,
1641            Unfinalized,
1642            LogNamespace::Legacy,
1643            sleep(Duration::from_secs(5)),
1644        )
1645        .await;
1646        let lines = extract_messages_string(received);
1647        assert_eq!(lines, vec!["the line"]);
1648
1649        // Restart server, it re-reads file since the events were not acknowledged before shutdown
1650        let received = run_file_source(
1651            &config,
1652            false,
1653            Unfinalized,
1654            LogNamespace::Legacy,
1655            sleep(Duration::from_secs(5)),
1656        )
1657        .await;
1658        let lines = extract_messages_string(received);
1659        assert_eq!(lines, vec!["the line"]);
1660    }
1661
    /// Regression test for the finalizer stream exiting on shutdown while acknowledgements are
    /// still pending.
    ///
    /// A deliberately short first run must read only part of a large file. A restart must
    /// then read exactly the remaining lines, so the two runs together see every line once.
    #[tokio::test]
    async fn file_duplicate_processing_after_restart() {
        let dir = tempdir().unwrap();
        let config = file::FileConfig {
            include: vec![dir.path().join("*")],
            ..test_default_file_config(&dir)
        };

        let path = dir.path().join("file");
        let mut file = File::create(&path).unwrap();

        // Large enough that the first, short run cannot finish the whole file.
        let line_count = 4000;
        for i in 0..line_count {
            writeln!(&mut file, "Here's a line for you: {i}").unwrap();
        }
        file.flush().unwrap();
        sleep_500_millis().await;

        // First time server runs it should pick up a bunch of lines
        let received = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            // shutdown signal is sent after this duration
            sleep_500_millis(),
        )
        .await;
        let lines = extract_messages_string(received);

        // ...but not all the lines; if the first run processed the entire file, we may not hit the
        // bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks
        assert!(lines.len() < line_count);

        // Restart the server, and it should read the rest without duplicating any
        let received = run_file_source(
            &config,
            true,
            Acks,
            LogNamespace::Legacy,
            sleep(Duration::from_secs(5)),
        )
        .await;
        let lines2 = extract_messages_string(received);

        // Between both runs, we should have the expected number of lines
        assert_eq!(lines.len() + lines2.len(), line_count);
    }
1710
1711    #[tokio::test]
1712    async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1713        file_start_position_server_restart_with_file_rotation(Acks).await
1714    }
1715
1716    #[tokio::test]
1717    async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1718        file_start_position_server_restart_with_file_rotation(NoAcks).await
1719    }
1720
1721    async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1722        let dir = tempdir().unwrap();
1723        let config = file::FileConfig {
1724            include: vec![dir.path().join("*")],
1725            ..test_default_file_config(&dir)
1726        };
1727
1728        let path = dir.path().join("file");
1729        let path_for_old_file = dir.path().join("file.old");
1730        // Run server first time, collect some lines.
1731        {
1732            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1733                let mut file = File::create(&path).unwrap();
1734                sleep_500_millis().await;
1735                writeln!(&mut file, "first line").unwrap();
1736                sleep_500_millis().await;
1737            })
1738            .await;
1739
1740            let lines = extract_messages_string(received);
1741            assert_eq!(lines, vec!["first line"]);
1742        }
1743        // Perform 'file rotation' to archive old lines.
1744        fs::rename(&path, &path_for_old_file).expect("could not rename");
1745        // Restart the server and make sure it does not re-read the old file
1746        // even though it has a new name.
1747        {
1748            let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1749                let mut file = File::create(&path).unwrap();
1750                sleep_500_millis().await;
1751                writeln!(&mut file, "second line").unwrap();
1752                sleep_500_millis().await;
1753            })
1754            .await;
1755
1756            let lines = extract_messages_string(received);
1757            assert_eq!(lines, vec!["second line"]);
1758        }
1759    }
1760
1761    #[cfg(unix)] // this test uses unix-specific function `futimes` during test time
1762    #[tokio::test]
1763    async fn file_start_position_ignore_old_files() {
1764        use std::{
1765            os::unix::io::AsRawFd,
1766            time::{Duration, SystemTime},
1767        };
1768
1769        let dir = tempdir().unwrap();
1770        let config = file::FileConfig {
1771            include: vec![dir.path().join("*")],
1772            ignore_older_secs: Some(5),
1773            ..test_default_file_config(&dir)
1774        };
1775
1776        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1777            let before_path = dir.path().join("before");
1778            let mut before_file = File::create(&before_path).unwrap();
1779            let after_path = dir.path().join("after");
1780            let mut after_file = File::create(&after_path).unwrap();
1781
1782            writeln!(&mut before_file, "first line").unwrap(); // first few bytes make up unique file fingerprint
1783            writeln!(&mut after_file, "_first line").unwrap(); //   and therefore need to be non-identical
1784
1785            {
1786                // Set the modified times
1787                let before = SystemTime::now() - Duration::from_secs(8);
1788                let after = SystemTime::now() - Duration::from_secs(2);
1789
1790                let before_time = libc::timeval {
1791                    tv_sec: before
1792                        .duration_since(SystemTime::UNIX_EPOCH)
1793                        .unwrap()
1794                        .as_secs() as _,
1795                    tv_usec: 0,
1796                };
1797                let before_times = [before_time, before_time];
1798
1799                let after_time = libc::timeval {
1800                    tv_sec: after
1801                        .duration_since(SystemTime::UNIX_EPOCH)
1802                        .unwrap()
1803                        .as_secs() as _,
1804                    tv_usec: 0,
1805                };
1806                let after_times = [after_time, after_time];
1807
1808                unsafe {
1809                    libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1810                    libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1811                }
1812            }
1813
1814            sleep_500_millis().await;
1815            writeln!(&mut before_file, "second line").unwrap();
1816            writeln!(&mut after_file, "_second line").unwrap();
1817
1818            sleep_500_millis().await;
1819        })
1820        .await;
1821
1822        let before_lines = received
1823            .iter()
1824            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1825            .map(|event| {
1826                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1827            })
1828            .collect::<Vec<_>>();
1829        let after_lines = received
1830            .iter()
1831            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1832            .map(|event| {
1833                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1834            })
1835            .collect::<Vec<_>>();
1836        assert_eq!(before_lines, vec!["second line"]);
1837        assert_eq!(after_lines, vec!["_first line", "_second line"]);
1838    }
1839
1840    #[tokio::test]
1841    async fn file_max_line_bytes() {
1842        let dir = tempdir().unwrap();
1843        let config = file::FileConfig {
1844            include: vec![dir.path().join("*")],
1845            max_line_bytes: 10,
1846            ..test_default_file_config(&dir)
1847        };
1848
1849        let path = dir.path().join("file");
1850        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1851            let mut file = File::create(&path).unwrap();
1852
1853            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1854
1855            writeln!(&mut file, "short").unwrap();
1856            writeln!(&mut file, "this is too long").unwrap();
1857            writeln!(&mut file, "11 eleven11").unwrap();
1858            let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1859            writeln!(&mut file, "{super_long}").unwrap();
1860            writeln!(&mut file, "exactly 10").unwrap();
1861            writeln!(&mut file, "it can end on a line that's too long").unwrap();
1862
1863            sleep_500_millis().await;
1864            sleep_500_millis().await;
1865
1866            writeln!(&mut file, "and then continue").unwrap();
1867            writeln!(&mut file, "last short").unwrap();
1868
1869            sleep_500_millis().await;
1870            sleep_500_millis().await;
1871        }).await;
1872
1873        let received = extract_messages_value(received);
1874
1875        assert_eq!(
1876            received,
1877            vec!["short".into(), "exactly 10".into(), "last short".into()]
1878        );
1879    }
1880
1881    #[tokio::test]
1882    async fn test_multi_line_aggregation_legacy() {
1883        let dir = tempdir().unwrap();
1884        let config = file::FileConfig {
1885            include: vec![dir.path().join("*")],
1886            message_start_indicator: Some("INFO".into()),
1887            multi_line_timeout: 25, // less than 50 in sleep()
1888            ..test_default_file_config(&dir)
1889        };
1890
1891        let path = dir.path().join("file");
1892        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1893            let mut file = File::create(&path).unwrap();
1894
1895            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1896
1897            writeln!(&mut file, "leftover foo").unwrap();
1898            writeln!(&mut file, "INFO hello").unwrap();
1899            writeln!(&mut file, "INFO goodbye").unwrap();
1900            writeln!(&mut file, "part of goodbye").unwrap();
1901
1902            sleep_500_millis().await;
1903
1904            writeln!(&mut file, "INFO hi again").unwrap();
1905            writeln!(&mut file, "and some more").unwrap();
1906            writeln!(&mut file, "INFO hello").unwrap();
1907
1908            sleep_500_millis().await;
1909
1910            writeln!(&mut file, "too slow").unwrap();
1911            writeln!(&mut file, "INFO doesn't have").unwrap();
1912            writeln!(&mut file, "to be INFO in").unwrap();
1913            writeln!(&mut file, "the middle").unwrap();
1914
1915            sleep_500_millis().await;
1916        })
1917        .await;
1918
1919        let received = extract_messages_value(received);
1920
1921        assert_eq!(
1922            received,
1923            vec![
1924                "leftover foo".into(),
1925                "INFO hello".into(),
1926                "INFO goodbye\npart of goodbye".into(),
1927                "INFO hi again\nand some more".into(),
1928                "INFO hello".into(),
1929                "too slow".into(),
1930                "INFO doesn't have".into(),
1931                "to be INFO in\nthe middle".into(),
1932            ]
1933        );
1934    }
1935
1936    #[tokio::test]
1937    async fn test_multi_line_aggregation() {
1938        let dir = tempdir().unwrap();
1939        let config = file::FileConfig {
1940            include: vec![dir.path().join("*")],
1941            multiline: Some(MultilineConfig {
1942                start_pattern: "INFO".to_owned(),
1943                condition_pattern: "INFO".to_owned(),
1944                mode: line_agg::Mode::HaltBefore,
1945                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
1946            }),
1947            ..test_default_file_config(&dir)
1948        };
1949
1950        let path = dir.path().join("file");
1951        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1952            let mut file = File::create(&path).unwrap();
1953
1954            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1955
1956            writeln!(&mut file, "leftover foo").unwrap();
1957            writeln!(&mut file, "INFO hello").unwrap();
1958            writeln!(&mut file, "INFO goodbye").unwrap();
1959            writeln!(&mut file, "part of goodbye").unwrap();
1960
1961            sleep_500_millis().await;
1962
1963            writeln!(&mut file, "INFO hi again").unwrap();
1964            writeln!(&mut file, "and some more").unwrap();
1965            writeln!(&mut file, "INFO hello").unwrap();
1966
1967            sleep_500_millis().await;
1968
1969            writeln!(&mut file, "too slow").unwrap();
1970            writeln!(&mut file, "INFO doesn't have").unwrap();
1971            writeln!(&mut file, "to be INFO in").unwrap();
1972            writeln!(&mut file, "the middle").unwrap();
1973
1974            sleep_500_millis().await;
1975        })
1976        .await;
1977
1978        let received = extract_messages_value(received);
1979
1980        assert_eq!(
1981            received,
1982            vec![
1983                "leftover foo".into(),
1984                "INFO hello".into(),
1985                "INFO goodbye\npart of goodbye".into(),
1986                "INFO hi again\nand some more".into(),
1987                "INFO hello".into(),
1988                "too slow".into(),
1989                "INFO doesn't have".into(),
1990                "to be INFO in\nthe middle".into(),
1991            ]
1992        );
1993    }
1994
1995    #[tokio::test]
1996    async fn test_multi_line_checkpointing() {
1997        let dir = tempdir().unwrap();
1998        let config = file::FileConfig {
1999            include: vec![dir.path().join("*")],
2000            offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2001            multiline: Some(MultilineConfig {
2002                start_pattern: "INFO".to_owned(),
2003                condition_pattern: "INFO".to_owned(),
2004                mode: line_agg::Mode::HaltBefore,
2005                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
2006            }),
2007            ..test_default_file_config(&dir)
2008        };
2009
2010        let path = dir.path().join("file");
2011        let mut file = File::create(&path).unwrap();
2012
2013        writeln!(&mut file, "INFO hello").unwrap();
2014        writeln!(&mut file, "part of hello").unwrap();
2015
2016        // Read and aggregate existing lines
2017        let received = run_file_source(
2018            &config,
2019            false,
2020            Acks,
2021            LogNamespace::Legacy,
2022            sleep_500_millis(),
2023        )
2024        .await;
2025
2026        assert_eq!(received[0].as_log()["offset"], 0.into());
2027
2028        let lines = extract_messages_string(received);
2029        assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2030
2031        // After restart, we should not see any part of the previously aggregated lines
2032        let received_after_restart =
2033            run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2034                writeln!(&mut file, "INFO goodbye").unwrap();
2035            })
2036            .await;
2037        assert_eq!(
2038            received_after_restart[0].as_log()["offset"],
2039            (lines[0].len() + 1).into()
2040        );
2041        let lines = extract_messages_string(received_after_restart);
2042        assert_eq!(lines, vec!["INFO goodbye"]);
2043    }
2044
2045    #[tokio::test]
2046    async fn test_fair_reads() {
2047        let dir = tempdir().unwrap();
2048        let config = file::FileConfig {
2049            include: vec![dir.path().join("*")],
2050            max_read_bytes: 1,
2051            oldest_first: false,
2052            ..test_default_file_config(&dir)
2053        };
2054
2055        let older_path = dir.path().join("z_older_file");
2056        let mut older = File::create(&older_path).unwrap();
2057
2058        sleep_500_millis().await;
2059
2060        let newer_path = dir.path().join("a_newer_file");
2061        let mut newer = File::create(&newer_path).unwrap();
2062
2063        writeln!(&mut older, "hello i am the old file").unwrap();
2064        writeln!(&mut older, "i have been around a while").unwrap();
2065        writeln!(&mut older, "you can read newer files at the same time").unwrap();
2066
2067        writeln!(&mut newer, "and i am the new file").unwrap();
2068        writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2069        writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2070
2071        sleep_500_millis().await;
2072
2073        let received = run_file_source(
2074            &config,
2075            false,
2076            NoAcks,
2077            LogNamespace::Legacy,
2078            sleep_500_millis(),
2079        )
2080        .await;
2081
2082        let received = extract_messages_value(received);
2083
2084        assert_eq!(
2085            received,
2086            vec![
2087                "hello i am the old file".into(),
2088                "and i am the new file".into(),
2089                "i have been around a while".into(),
2090                "this should be interleaved with the old one".into(),
2091                "you can read newer files at the same time".into(),
2092                "which is fine because we want fairness".into(),
2093            ]
2094        );
2095    }
2096
2097    #[tokio::test]
2098    async fn test_oldest_first() {
2099        let dir = tempdir().unwrap();
2100        let config = file::FileConfig {
2101            include: vec![dir.path().join("*")],
2102            max_read_bytes: 1,
2103            oldest_first: true,
2104            ..test_default_file_config(&dir)
2105        };
2106
2107        let older_path = dir.path().join("z_older_file");
2108        let mut older = File::create(&older_path).unwrap();
2109
2110        sleep_500_millis().await;
2111
2112        let newer_path = dir.path().join("a_newer_file");
2113        let mut newer = File::create(&newer_path).unwrap();
2114
2115        writeln!(&mut older, "hello i am the old file").unwrap();
2116        writeln!(&mut older, "i have been around a while").unwrap();
2117        writeln!(&mut older, "you should definitely read all of me first").unwrap();
2118
2119        writeln!(&mut newer, "i'm new").unwrap();
2120        writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2121        writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2122
2123        sleep_500_millis().await;
2124
2125        let received = run_file_source(
2126            &config,
2127            false,
2128            NoAcks,
2129            LogNamespace::Legacy,
2130            sleep_500_millis(),
2131        )
2132        .await;
2133
2134        let received = extract_messages_value(received);
2135
2136        assert_eq!(
2137            received,
2138            vec![
2139                "hello i am the old file".into(),
2140                "i have been around a while".into(),
2141                "you should definitely read all of me first".into(),
2142                "i'm new".into(),
2143                "hopefully you read all the old stuff first".into(),
2144                "because otherwise i'm not going to make sense".into(),
2145            ]
2146        );
2147    }
2148
2149    // Ignoring on mac: https://github.com/vectordotdev/vector/issues/8373
2150    #[cfg(not(target_os = "macos"))]
2151    #[tokio::test]
2152    async fn test_split_reads() {
2153        let dir = tempdir().unwrap();
2154        let config = file::FileConfig {
2155            include: vec![dir.path().join("*")],
2156            max_read_bytes: 1,
2157            ..test_default_file_config(&dir)
2158        };
2159
2160        let path = dir.path().join("file");
2161        let mut file = File::create(&path).unwrap();
2162
2163        writeln!(&mut file, "hello i am a normal line").unwrap();
2164
2165        sleep_500_millis().await;
2166
2167        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2168            sleep_500_millis().await;
2169
2170            write!(&mut file, "i am not a full line").unwrap();
2171
2172            // Longer than the EOF timeout
2173            sleep_500_millis().await;
2174
2175            writeln!(&mut file, " until now").unwrap();
2176
2177            sleep_500_millis().await;
2178        })
2179        .await;
2180
2181        let received = extract_messages_value(received);
2182
2183        assert_eq!(
2184            received,
2185            vec![
2186                "hello i am a normal line".into(),
2187                "i am not a full line until now".into(),
2188            ]
2189        );
2190    }
2191
2192    #[tokio::test]
2193    async fn test_gzipped_file() {
2194        let dir = tempdir().unwrap();
2195        let config = file::FileConfig {
2196            include: vec![PathBuf::from("tests/data/gzipped.log")],
2197            // TODO: remove this once files are fingerprinted after decompression
2198            //
2199            // Currently, this needs to be smaller than the total size of the compressed file
2200            // because the fingerprinter tries to read until a newline, which it's not going to see
2201            // in the compressed data, or this number of bytes. If it hits EOF before that, it
2202            // can't return a fingerprint because the value would change once more data is written.
2203            max_line_bytes: 100,
2204            ..test_default_file_config(&dir)
2205        };
2206
2207        let received = run_file_source(
2208            &config,
2209            false,
2210            NoAcks,
2211            LogNamespace::Legacy,
2212            sleep_500_millis(),
2213        )
2214        .await;
2215
2216        let received = extract_messages_value(received);
2217
2218        assert_eq!(
2219            received,
2220            vec![
2221                "this is a simple file".into(),
2222                "i have been compressed".into(),
2223                "in order to make me smaller".into(),
2224                "but you can still read me".into(),
2225                "hooray".into(),
2226            ]
2227        );
2228    }
2229
2230    #[tokio::test]
2231    async fn test_non_utf8_encoded_file() {
2232        let dir = tempdir().unwrap();
2233        let config = file::FileConfig {
2234            include: vec![PathBuf::from("tests/data/utf-16le.log")],
2235            encoding: Some(EncodingConfig { charset: UTF_16LE }),
2236            ..test_default_file_config(&dir)
2237        };
2238
2239        let received = run_file_source(
2240            &config,
2241            false,
2242            NoAcks,
2243            LogNamespace::Legacy,
2244            sleep_500_millis(),
2245        )
2246        .await;
2247
2248        let received = extract_messages_value(received);
2249
2250        assert_eq!(
2251            received,
2252            vec![
2253                "hello i am a file".into(),
2254                "i can unicode".into(),
2255                "but i do so in 16 bits".into(),
2256                "and when i byte".into(),
2257                "i become little-endian".into(),
2258            ]
2259        );
2260    }
2261
2262    #[tokio::test]
2263    async fn test_non_default_line_delimiter() {
2264        let dir = tempdir().unwrap();
2265        let config = file::FileConfig {
2266            include: vec![dir.path().join("*")],
2267            line_delimiter: "\r\n".to_string(),
2268            ..test_default_file_config(&dir)
2269        };
2270
2271        let path = dir.path().join("file");
2272        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2273            let mut file = File::create(&path).unwrap();
2274
2275            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
2276
2277            write!(&mut file, "hello i am a line\r\n").unwrap();
2278            write!(&mut file, "and i am too\r\n").unwrap();
2279            write!(&mut file, "CRLF is how we end\r\n").unwrap();
2280            write!(&mut file, "please treat us well\r\n").unwrap();
2281
2282            sleep_500_millis().await;
2283        })
2284        .await;
2285
2286        let received = extract_messages_value(received);
2287
2288        assert_eq!(
2289            received,
2290            vec![
2291                "hello i am a line".into(),
2292                "and i am too".into(),
2293                "CRLF is how we end".into(),
2294                "please treat us well".into()
2295            ]
2296        );
2297    }
2298
2299    #[tokio::test]
2300    async fn remove_file() {
2301        let n = 5;
2302        let remove_after_secs = 1;
2303
2304        let dir = tempdir().unwrap();
2305        let config = file::FileConfig {
2306            include: vec![dir.path().join("*")],
2307            remove_after_secs: Some(remove_after_secs),
2308            ..test_default_file_config(&dir)
2309        };
2310
2311        let path = dir.path().join("file");
2312        let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2313            let mut file = File::create(&path).unwrap();
2314
2315            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
2316
2317            for i in 0..n {
2318                writeln!(&mut file, "{i}").unwrap();
2319            }
2320            drop(file);
2321
2322            for _ in 0..10 {
2323                // Wait for remove grace period to end.
2324                sleep(Duration::from_secs(remove_after_secs + 1)).await;
2325
2326                if File::open(&path).is_err() {
2327                    break;
2328                }
2329            }
2330        })
2331        .await;
2332
2333        assert_eq!(received.len(), n);
2334
2335        match File::open(&path) {
2336            Ok(_) => panic!("File wasn't removed"),
2337            Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2338        }
2339    }
2340
2341    #[derive(Clone, Copy, Eq, PartialEq)]
2342    enum AckingMode {
2343        NoAcks,      // No acknowledgement handling and no finalization
2344        Unfinalized, // Acknowledgement handling but no finalization
2345        Acks,        // Full acknowledgements and proper finalization
2346    }
2347    use AckingMode::*;
2348    use vector_lib::lookup::OwnedTargetPath;
2349
2350    async fn run_file_source(
2351        config: &FileConfig,
2352        wait_shutdown: bool,
2353        acking_mode: AckingMode,
2354        log_namespace: LogNamespace,
2355        inner: impl Future<Output = ()>,
2356    ) -> Vec<Event> {
2357        assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2358            let (tx, rx) = if acking_mode == Acks {
2359                let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2360                (tx, rx.boxed())
2361            } else {
2362                let (tx, rx) = SourceSender::new_test();
2363                (tx, rx.boxed())
2364            };
2365
2366            let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2367            let data_dir = config.data_dir.clone().unwrap();
2368            let acks = !matches!(acking_mode, NoAcks);
2369
2370            tokio::spawn(file::file_source(
2371                config,
2372                data_dir,
2373                shutdown,
2374                tx,
2375                acks,
2376                log_namespace,
2377            ));
2378
2379            inner.await;
2380
2381            drop(trigger_shutdown);
2382
2383            let result = if acking_mode == Unfinalized {
2384                rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2385                    .collect::<Vec<_>>()
2386                    .await
2387            } else {
2388                timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2389                    .await
2390                    .expect(
2391                        "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2392                    )
2393            };
2394            if wait_shutdown {
2395                shutdown_done.await;
2396            }
2397
2398            result
2399        })
2400        .await
2401    }
2402
2403    fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2404        received
2405            .into_iter()
2406            .map(Event::into_log)
2407            .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2408            .collect()
2409    }
2410
2411    fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2412        received
2413            .into_iter()
2414            .map(Event::into_log)
2415            .map(|log| log.get_message().unwrap().clone())
2416            .collect()
2417    }
2418}