vector/sources/
file.rs

1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::sync::oneshot;
10use tracing::{Instrument, Span};
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    codecs::{BytesDeserializer, BytesDeserializerConfig},
14    config::{LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    file_source::{
17        file_server::{FileServer, Line, calculate_ignore_before},
18        paths_provider::{Glob, MatchOptions},
19    },
20    file_source_common::{
21        Checkpointer, FileFingerprint, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
22    },
23    finalizer::OrderedFinalizer,
24    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
25};
26use vrl::value::Kind;
27
28use super::util::{EncodingConfig, MultilineConfig};
29use crate::{
30    SourceSender,
31    config::{
32        DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
33        log_schema,
34    },
35    encoding_transcode::{Decoder, Encoder},
36    event::{BatchNotifier, BatchStatus, LogEvent},
37    internal_events::{
38        FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
39        FileSourceInternalEventsEmitter, StreamClosedError,
40    },
41    line_agg::{self, LineAgg},
42    serde::bool_or_struct,
43    shutdown::ShutdownSignal,
44};
45
46#[derive(Debug, Snafu)]
47enum BuildError {
48    #[snafu(display(
49        "message_start_indicator {:?} is not a valid regex: {}",
50        indicator,
51        source
52    ))]
53    InvalidMessageStartIndicator {
54        indicator: String,
55        source: regex::Error,
56    },
57}
58
59/// Configuration for the `file` source.
60#[serde_as]
61#[configurable_component(source("file", "Collect logs from files."))]
62#[derive(Clone, Debug, PartialEq, Eq)]
63#[serde(deny_unknown_fields)]
64pub struct FileConfig {
65    /// Array of file patterns to include. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
66    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
67    pub include: Vec<PathBuf>,
68
69    /// Array of file patterns to exclude. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
70    ///
71    /// Takes precedence over the `include` option. Note: The `exclude` patterns are applied _after_ the attempt to glob everything
72    /// in `include`. This means that all files are first matched by `include` and then filtered by the `exclude`
73    /// patterns. This can be impactful if `include` contains directories with contents that are not accessible.
74    #[serde(default)]
75    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
76    pub exclude: Vec<PathBuf>,
77
78    /// Overrides the name of the log field used to add the file path to each event.
79    ///
80    /// The value is the full path to the file where the event was read message.
81    ///
82    /// Set to `""` to suppress this key.
83    #[serde(default = "default_file_key")]
84    #[configurable(metadata(docs::examples = "path"))]
85    pub file_key: OptionalValuePath,
86
87    /// Whether or not to start reading from the beginning of a new file.
88    #[configurable(
89        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
90    )]
91    #[configurable(metadata(docs::hidden))]
92    #[serde(default)]
93    pub start_at_beginning: Option<bool>,
94
95    /// Whether or not to ignore existing checkpoints when determining where to start reading a file.
96    ///
97    /// Checkpoints are still written normally.
98    #[serde(default)]
99    pub ignore_checkpoints: Option<bool>,
100
101    #[serde(default = "default_read_from")]
102    #[configurable(derived)]
103    pub read_from: ReadFromConfig,
104
105    /// Ignore files with a data modification date older than the specified number of seconds.
106    #[serde(alias = "ignore_older", default)]
107    #[configurable(metadata(docs::type_unit = "seconds"))]
108    #[configurable(metadata(docs::examples = 600))]
109    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
110    pub ignore_older_secs: Option<u64>,
111
112    /// The maximum size of a line before it is discarded.
113    ///
114    /// This protects against malformed lines or tailing incorrect files.
115    #[serde(default = "default_max_line_bytes")]
116    #[configurable(metadata(docs::type_unit = "bytes"))]
117    pub max_line_bytes: usize,
118
119    /// Overrides the name of the log field used to add the current hostname to each event.
120    ///
121    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
122    ///
123    /// Set to `""` to suppress this key.
124    ///
125    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
126    #[configurable(metadata(docs::examples = "hostname"))]
127    pub host_key: Option<OptionalValuePath>,
128
129    /// The directory used to persist file checkpoint positions.
130    ///
131    /// By default, the [global `data_dir` option][global_data_dir] is used.
132    /// Make sure the running user has write permissions to this directory.
133    ///
134    /// If this directory is specified, then Vector will attempt to create it.
135    ///
136    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
137    #[serde(default)]
138    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
139    #[configurable(metadata(docs::human_name = "Data Directory"))]
140    pub data_dir: Option<PathBuf>,
141
142    /// Enables adding the file offset to each event and sets the name of the log field used.
143    ///
144    /// The value is the byte offset of the start of the line within the file.
145    ///
146    /// Off by default, the offset is only added to the event if this is set.
147    #[serde(default)]
148    #[configurable(metadata(docs::examples = "offset"))]
149    pub offset_key: Option<OptionalValuePath>,
150
151    /// The delay between file discovery calls.
152    ///
153    /// This controls the interval at which files are searched. A higher value results in greater
154    /// chances of some short-lived files being missed between searches, but a lower value increases
155    /// the performance impact of file discovery.
156    #[serde(
157        alias = "glob_minimum_cooldown",
158        default = "default_glob_minimum_cooldown_ms"
159    )]
160    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
161    #[configurable(metadata(docs::type_unit = "milliseconds"))]
162    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
163    pub glob_minimum_cooldown_ms: Duration,
164
165    #[configurable(derived)]
166    #[serde(alias = "fingerprinting", default)]
167    fingerprint: FingerprintConfig,
168
169    /// Ignore missing files when fingerprinting.
170    ///
171    /// This may be useful when used with source directories containing dangling symlinks.
172    #[serde(default)]
173    pub ignore_not_found: bool,
174
175    /// String value used to identify the start of a multi-line message.
176    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
177    #[configurable(metadata(docs::hidden))]
178    #[serde(default)]
179    pub message_start_indicator: Option<String>,
180
181    /// How long to wait for more data when aggregating a multi-line message, in milliseconds.
182    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
183    #[configurable(metadata(docs::hidden))]
184    #[serde(default = "default_multi_line_timeout")]
185    pub multi_line_timeout: u64,
186
187    /// Multiline aggregation configuration.
188    ///
189    /// If not specified, multiline aggregation is disabled.
190    #[configurable(derived)]
191    #[serde(default)]
192    pub multiline: Option<MultilineConfig>,
193
194    /// Max amount of bytes to read from a single file before switching over to the next file.
195    /// **Note:** This does not apply when `oldest_first` is `true`.
196    ///
197    /// This allows distributing the reads more or less evenly across
198    /// the files.
199    #[serde(default = "default_max_read_bytes")]
200    #[configurable(metadata(docs::type_unit = "bytes"))]
201    pub max_read_bytes: usize,
202
203    /// Instead of balancing read capacity fairly across all watched files, prioritize draining the oldest files before moving on to read data from more recent files.
204    #[serde(default)]
205    pub oldest_first: bool,
206
207    /// After reaching EOF, the number of seconds to wait before removing the file, unless new data is written.
208    ///
209    /// If not specified, files are not removed.
210    #[serde(alias = "remove_after", default)]
211    #[configurable(metadata(docs::type_unit = "seconds"))]
212    #[configurable(metadata(docs::examples = 0))]
213    #[configurable(metadata(docs::examples = 5))]
214    #[configurable(metadata(docs::examples = 60))]
215    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
216    pub remove_after_secs: Option<u64>,
217
218    /// String sequence used to separate one file line from another.
219    #[serde(default = "default_line_delimiter")]
220    #[configurable(metadata(docs::examples = "\r\n"))]
221    pub line_delimiter: String,
222
223    #[configurable(derived)]
224    #[serde(default)]
225    pub encoding: Option<EncodingConfig>,
226
227    #[configurable(derived)]
228    #[serde(default, deserialize_with = "bool_or_struct")]
229    acknowledgements: SourceAcknowledgementsConfig,
230
231    /// The namespace to use for logs. This overrides the global setting.
232    #[configurable(metadata(docs::hidden))]
233    #[serde(default)]
234    log_namespace: Option<bool>,
235
236    #[configurable(derived)]
237    #[serde(default)]
238    internal_metrics: FileInternalMetricsConfig,
239
240    /// How long to keep an open handle to a rotated log file.
241    /// The default value represents "no limit"
242    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
243    #[configurable(metadata(docs::type_unit = "seconds"))]
244    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
245    pub rotate_wait: Duration,
246}
247
248fn default_max_line_bytes() -> usize {
249    bytesize::kib(100u64) as usize
250}
251
252fn default_file_key() -> OptionalValuePath {
253    OptionalValuePath::from(owned_value_path!("file"))
254}
255
256const fn default_read_from() -> ReadFromConfig {
257    ReadFromConfig::Beginning
258}
259
260const fn default_glob_minimum_cooldown_ms() -> Duration {
261    Duration::from_millis(1000)
262}
263
264const fn default_multi_line_timeout() -> u64 {
265    1000
266} // deprecated
267
268const fn default_max_read_bytes() -> usize {
269    2048
270}
271
272fn default_line_delimiter() -> String {
273    "\n".to_string()
274}
275
276const fn default_rotate_wait() -> Duration {
277    Duration::from_secs(u64::MAX / 2)
278}
279
280/// Configuration for how files should be identified.
281///
282/// This is important for `checkpointing` when file rotation is used.
283#[configurable_component]
284#[derive(Clone, Debug, PartialEq, Eq)]
285#[serde(tag = "strategy", rename_all = "snake_case")]
286#[configurable(metadata(
287    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
288))]
289pub enum FingerprintConfig {
290    /// Read lines from the beginning of the file and compute a checksum over them.
291    Checksum {
292        /// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
293        /// If the file is compressed, the number of bytes refer to the header in the uncompressed content. Only
294        /// gzip is supported at this time.
295        ///
296        /// This can be helpful if all files share a common header that should be skipped.
297        #[serde(default = "default_ignored_header_bytes")]
298        #[configurable(metadata(docs::type_unit = "bytes"))]
299        ignored_header_bytes: usize,
300
301        /// The number of lines to read for generating the checksum.
302        ///
303        /// The number of lines are determined from the uncompressed content if the file is compressed. Only
304        /// gzip is supported at this time.
305        ///
306        /// If the file has less than this amount of lines, it won’t be read at all.
307        #[serde(default = "default_lines")]
308        #[configurable(metadata(docs::type_unit = "lines"))]
309        lines: usize,
310    },
311
312    /// Use the [device and inode][inode] as the identifier.
313    ///
314    /// [inode]: https://en.wikipedia.org/wiki/Inode
315    #[serde(rename = "device_and_inode")]
316    DevInode,
317}
318
319impl Default for FingerprintConfig {
320    fn default() -> Self {
321        Self::Checksum {
322            ignored_header_bytes: 0,
323            lines: default_lines(),
324        }
325    }
326}
327
328const fn default_ignored_header_bytes() -> usize {
329    0
330}
331
332const fn default_lines() -> usize {
333    1
334}
335
336impl From<FingerprintConfig> for FingerprintStrategy {
337    fn from(config: FingerprintConfig) -> FingerprintStrategy {
338        match config {
339            FingerprintConfig::Checksum {
340                ignored_header_bytes,
341                lines,
342            } => FingerprintStrategy::FirstLinesChecksum {
343                ignored_header_bytes,
344                lines,
345            },
346            FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
347        }
348    }
349}
350
351#[derive(Debug)]
352pub(crate) struct FinalizerEntry {
353    pub(crate) file_id: FileFingerprint,
354    pub(crate) offset: u64,
355}
356
357impl Default for FileConfig {
358    fn default() -> Self {
359        Self {
360            include: vec![PathBuf::from("/var/log/**/*.log")],
361            exclude: vec![],
362            file_key: default_file_key(),
363            start_at_beginning: None,
364            ignore_checkpoints: None,
365            read_from: default_read_from(),
366            ignore_older_secs: None,
367            max_line_bytes: default_max_line_bytes(),
368            fingerprint: FingerprintConfig::default(),
369            ignore_not_found: false,
370            host_key: None,
371            offset_key: None,
372            data_dir: None,
373            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
374            message_start_indicator: None,
375            multi_line_timeout: default_multi_line_timeout(), // millis
376            multiline: None,
377            max_read_bytes: default_max_read_bytes(),
378            oldest_first: false,
379            remove_after_secs: None,
380            line_delimiter: default_line_delimiter(),
381            encoding: None,
382            acknowledgements: Default::default(),
383            log_namespace: None,
384            internal_metrics: Default::default(),
385            rotate_wait: default_rotate_wait(),
386        }
387    }
388}
389
390impl_generate_config_from_default!(FileConfig);
391
392#[async_trait::async_trait]
393#[typetag::serde(name = "file")]
394impl SourceConfig for FileConfig {
395    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
396        // add the source name as a subdir, so that multiple sources can
397        // operate within the same given data_dir (e.g. the global one)
398        // without the file servers' checkpointers interfering with each
399        // other
400        let data_dir = cx
401            .globals
402            // source are only global, name can be used for subdir
403            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
404
405        // Clippy rule, because async_trait?
406        #[allow(clippy::suspicious_else_formatting)]
407        {
408            if let Some(ref config) = self.multiline {
409                let _: line_agg::Config = config.try_into()?;
410            }
411
412            if let Some(ref indicator) = self.message_start_indicator {
413                Regex::new(indicator)
414                    .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
415            }
416        }
417
418        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
419
420        let log_namespace = cx.log_namespace(self.log_namespace);
421
422        Ok(file_source(
423            self,
424            data_dir,
425            cx.shutdown,
426            cx.out,
427            acknowledgements,
428            log_namespace,
429        ))
430    }
431
432    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
433        let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
434        let host_key = self
435            .host_key
436            .clone()
437            .unwrap_or(log_schema().host_key().cloned().into())
438            .path
439            .map(LegacyKey::Overwrite);
440
441        let offset_key = self
442            .offset_key
443            .clone()
444            .and_then(|k| k.path)
445            .map(LegacyKey::Overwrite);
446
447        let schema_definition = BytesDeserializerConfig
448            .schema_definition(global_log_namespace.merge(self.log_namespace))
449            .with_standard_vector_source_metadata()
450            .with_source_metadata(
451                Self::NAME,
452                host_key,
453                &owned_value_path!("host"),
454                Kind::bytes().or_undefined(),
455                Some("host"),
456            )
457            .with_source_metadata(
458                Self::NAME,
459                offset_key,
460                &owned_value_path!("offset"),
461                Kind::integer(),
462                None,
463            )
464            .with_source_metadata(
465                Self::NAME,
466                file_key,
467                &owned_value_path!("path"),
468                Kind::bytes(),
469                None,
470            );
471
472        vec![SourceOutput::new_maybe_logs(
473            DataType::Log,
474            schema_definition,
475        )]
476    }
477
478    fn can_acknowledge(&self) -> bool {
479        true
480    }
481}
482
483pub fn file_source(
484    config: &FileConfig,
485    data_dir: PathBuf,
486    shutdown: ShutdownSignal,
487    mut out: SourceSender,
488    acknowledgements: bool,
489    log_namespace: LogNamespace,
490) -> super::Source {
491    // the include option must be specified but also must contain at least one entry.
492    if config.include.is_empty() {
493        error!(
494            message = "`include` configuration option must contain at least one file pattern.",
495            internal_log_rate_limit = false
496        );
497        return Box::pin(future::ready(Err(())));
498    }
499
500    let exclude_patterns = config
501        .exclude
502        .iter()
503        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
504        .collect::<Vec<PathBuf>>();
505    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
506    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
507    let (ignore_checkpoints, read_from) = reconcile_position_options(
508        config.start_at_beginning,
509        config.ignore_checkpoints,
510        Some(config.read_from),
511    );
512
513    let emitter = FileSourceInternalEventsEmitter {
514        include_file_metric_tag: config.internal_metrics.include_file_tag,
515    };
516
517    let paths_provider = Glob::new(
518        &config.include,
519        &exclude_patterns,
520        MatchOptions::default(),
521        emitter.clone(),
522    )
523    .expect("invalid glob patterns");
524
525    let encoding_charset = config.encoding.clone().map(|e| e.charset);
526
527    // if file encoding is specified, need to convert the line delimiter (present as utf8)
528    // to the specified encoding, so that delimiter-based line splitting can work properly
529    let line_delimiter_as_bytes = match encoding_charset {
530        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
531        None => Bytes::from(config.line_delimiter.clone()),
532    };
533
534    let checkpointer = Checkpointer::new(&data_dir);
535    let strategy = config.fingerprint.clone().into();
536
537    let file_server = FileServer {
538        paths_provider,
539        max_read_bytes: config.max_read_bytes,
540        ignore_checkpoints,
541        read_from,
542        ignore_before,
543        max_line_bytes: config.max_line_bytes,
544        line_delimiter: line_delimiter_as_bytes,
545        data_dir,
546        glob_minimum_cooldown,
547        fingerprinter: Fingerprinter::new(strategy, config.max_line_bytes, config.ignore_not_found),
548        oldest_first: config.oldest_first,
549        remove_after: config.remove_after_secs.map(Duration::from_secs),
550        emitter,
551        rotate_wait: config.rotate_wait,
552    };
553
554    let event_metadata = EventMetadata {
555        host_key: config
556            .host_key
557            .clone()
558            .unwrap_or(log_schema().host_key().cloned().into())
559            .path,
560        hostname: crate::get_hostname().ok(),
561        file_key: config.file_key.clone().path,
562        offset_key: config.offset_key.clone().and_then(|k| k.path),
563    };
564
565    let include = config.include.clone();
566    let exclude = config.exclude.clone();
567    let multiline_config = config.multiline.clone();
568    let message_start_indicator = config.message_start_indicator.clone();
569    let multi_line_timeout = config.multi_line_timeout;
570
571    let (finalizer, shutdown_checkpointer) = if acknowledgements {
572        // The shutdown sent in to the finalizer is the global
573        // shutdown handle used to tell it to stop accepting new batch
574        // statuses and just wait for the remaining acks to come in.
575        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
576
577        // We set up a separate shutdown signal to tie together the
578        // finalizer and the checkpoint writer task in the file
579        // server, to make it continue to write out updated
580        // checkpoints until all the acks have come in.
581        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
582        let checkpoints = checkpointer.view();
583        tokio::spawn(async move {
584            while let Some((status, entry)) = ack_stream.next().await {
585                if status == BatchStatus::Delivered {
586                    checkpoints.update(entry.file_id, entry.offset);
587                }
588            }
589            send_shutdown.send(())
590        });
591        (Some(finalizer), shutdown2.map(|_| ()).boxed())
592    } else {
593        // When not dealing with end-to-end acknowledgements, just
594        // clone the global shutdown to stop the checkpoint writer.
595        (None, shutdown.clone().map(|_| ()).boxed())
596    };
597
598    let checkpoints = checkpointer.view();
599    let include_file_metric_tag = config.internal_metrics.include_file_tag;
600    Box::pin(async move {
601        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
602
603        let mut encoding_decoder = encoding_charset.map(Decoder::new);
604
605        // sizing here is just a guess
606        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
607        let rx = rx
608            .map(futures::stream::iter)
609            .flatten()
610            .map(move |mut line| {
611                emit!(FileBytesReceived {
612                    byte_size: line.text.len(),
613                    file: &line.filename,
614                    include_file_metric_tag,
615                });
616                // transcode each line from the file's encoding charset to utf8
617                line.text = match encoding_decoder.as_mut() {
618                    Some(d) => d.decode_to_utf8(line.text),
619                    None => line.text,
620                };
621                line
622            });
623
624        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
625            if let Some(ref multiline_config) = multiline_config {
626                wrap_with_line_agg(
627                    rx,
628                    multiline_config.try_into().unwrap(), // validated in build
629                )
630            } else if let Some(msi) = message_start_indicator {
631                wrap_with_line_agg(
632                    rx,
633                    line_agg::Config::for_legacy(
634                        Regex::new(&msi).unwrap(), // validated in build
635                        multi_line_timeout,
636                    ),
637                )
638            } else {
639                Box::new(rx)
640            };
641
642        // Once file server ends this will run until it has finished processing remaining
643        // logs in the queue.
644        let span = Span::current();
645        let mut messages = messages.map(move |line| {
646            let mut event = create_event(
647                line.text,
648                line.start_offset,
649                &line.filename,
650                &event_metadata,
651                log_namespace,
652                include_file_metric_tag,
653            );
654
655            if let Some(finalizer) = &finalizer {
656                let (batch, receiver) = BatchNotifier::new_with_receiver();
657                event = event.with_batch_notifier(&batch);
658                let entry = FinalizerEntry {
659                    file_id: line.file_id,
660                    offset: line.end_offset,
661                };
662                // checkpoints.update will be called from ack_stream's thread
663                finalizer.add(entry, receiver);
664            } else {
665                checkpoints.update(line.file_id, line.end_offset);
666            }
667            event
668        });
669        tokio::spawn(async move {
670            match out
671                .send_event_stream(&mut messages)
672                .instrument(span.or_current())
673                .await
674            {
675                Ok(()) => {
676                    debug!("Finished sending.");
677                }
678                Err(_) => {
679                    let (count, _) = messages.size_hint();
680                    emit!(StreamClosedError { count });
681                }
682            }
683        });
684
685        let span = info_span!("file_server");
686        tokio::task::spawn_blocking(move || {
687            let _enter = span.enter();
688            let rt = tokio::runtime::Handle::current();
689            let result =
690                rt.block_on(file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer));
691            emit!(FileOpen { count: 0 });
692            // Panic if we encounter any error originating from the file server.
693            // We're at the `spawn_blocking` call, the panic will be caught and
694            // passed to the `JoinHandle` error, similar to the usual threads.
695            result.expect("file server exited with an error");
696        })
697        .map_err(|error| error!(message="File server unexpectedly stopped.", %error, internal_log_rate_limit = false))
698        .await
699    })
700}
701
702/// Emit deprecation warning if the old option is used, and take it into account when determining
703/// defaults. Any of the newer options will override it when set directly.
704fn reconcile_position_options(
705    start_at_beginning: Option<bool>,
706    ignore_checkpoints: Option<bool>,
707    read_from: Option<ReadFromConfig>,
708) -> (bool, ReadFrom) {
709    if start_at_beginning.is_some() {
710        warn!(
711            message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead."
712        )
713    }
714
715    match start_at_beginning {
716        Some(true) => (
717            ignore_checkpoints.unwrap_or(true),
718            read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
719        ),
720        _ => (
721            ignore_checkpoints.unwrap_or(false),
722            read_from.map(Into::into).unwrap_or_default(),
723        ),
724    }
725}
726
727fn wrap_with_line_agg(
728    rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
729    config: line_agg::Config,
730) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
731    let logic = line_agg::Logic::new(config);
732    Box::new(
733        LineAgg::new(
734            rx.map(|line| {
735                (
736                    line.filename,
737                    line.text,
738                    (line.file_id, line.start_offset, line.end_offset),
739                )
740            }),
741            logic,
742        )
743        .map(
744            |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
745                text,
746                filename,
747                file_id,
748                start_offset,
749                end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
750                    lastline_end_offset
751                }),
752            },
753        ),
754    )
755}
756
757struct EventMetadata {
758    host_key: Option<OwnedValuePath>,
759    hostname: Option<String>,
760    file_key: Option<OwnedValuePath>,
761    offset_key: Option<OwnedValuePath>,
762}
763
764fn create_event(
765    line: Bytes,
766    offset: u64,
767    file: &str,
768    meta: &EventMetadata,
769    log_namespace: LogNamespace,
770    include_file_metric_tag: bool,
771) -> LogEvent {
772    let deserializer = BytesDeserializer;
773    let mut event = deserializer.parse_single(line, log_namespace);
774
775    log_namespace.insert_vector_metadata(
776        &mut event,
777        log_schema().source_type_key(),
778        path!("source_type"),
779        Bytes::from_static(FileConfig::NAME.as_bytes()),
780    );
781    log_namespace.insert_vector_metadata(
782        &mut event,
783        log_schema().timestamp_key(),
784        path!("ingest_timestamp"),
785        Utc::now(),
786    );
787
788    let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
789    // `meta.host_key` is already `unwrap_or_else`ed so we can just pass it in.
790    if let Some(hostname) = &meta.hostname {
791        log_namespace.insert_source_metadata(
792            FileConfig::NAME,
793            &mut event,
794            legacy_host_key,
795            path!("host"),
796            hostname.clone(),
797        );
798    }
799
800    let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
801    log_namespace.insert_source_metadata(
802        FileConfig::NAME,
803        &mut event,
804        legacy_offset_key,
805        path!("offset"),
806        offset,
807    );
808
809    let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
810    log_namespace.insert_source_metadata(
811        FileConfig::NAME,
812        &mut event,
813        legacy_file_key,
814        path!("path"),
815        file,
816    );
817
818    emit!(FileEventsReceived {
819        count: 1,
820        file,
821        byte_size: event.estimated_json_encoded_size_of(),
822        include_file_metric_tag,
823    });
824
825    event
826}
827
828#[cfg(test)]
829mod tests {
830    use std::{
831        collections::HashSet,
832        fs::{self, File},
833        future::Future,
834        io::{Seek, Write},
835    };
836
837    use encoding_rs::UTF_16LE;
838    use similar_asserts::assert_eq;
839    use tempfile::tempdir;
840    use tokio::time::{Duration, sleep, timeout};
841    use vector_lib::schema::Definition;
842    use vrl::{value, value::kind::Collection};
843
844    use super::*;
845    use crate::{
846        config::Config,
847        event::{Event, EventStatus, Value},
848        shutdown::ShutdownSignal,
849        sources::file,
850        test_util::components::{FILE_SOURCE_TAGS, assert_source_compliance},
851    };
852
853    #[test]
854    fn generate_config() {
855        crate::test_util::test_generate_config::<FileConfig>();
856    }
857
858    fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
859        file::FileConfig {
860            fingerprint: FingerprintConfig::Checksum {
861                ignored_header_bytes: 0,
862                lines: 1,
863            },
864            data_dir: Some(dir.path().to_path_buf()),
865            glob_minimum_cooldown_ms: Duration::from_millis(100),
866            internal_metrics: FileInternalMetricsConfig {
867                include_file_tag: true,
868            },
869            ..Default::default()
870        }
871    }
872
873    async fn sleep_500_millis() {
874        sleep(Duration::from_millis(500)).await;
875    }
876
877    #[test]
878    fn parse_config() {
879        let config: FileConfig = toml::from_str(
880            r#"
881            include = [ "/var/log/**/*.log" ]
882            file_key = "file"
883            glob_minimum_cooldown_ms = 1000
884            multi_line_timeout = 1000
885            max_read_bytes = 2048
886            line_delimiter = "\n"
887        "#,
888        )
889        .unwrap();
890        assert_eq!(config, FileConfig::default());
891        assert_eq!(
892            config.fingerprint,
893            FingerprintConfig::Checksum {
894                ignored_header_bytes: 0,
895                lines: 1
896            }
897        );
898
899        let config: FileConfig = toml::from_str(
900            r#"
901        include = [ "/var/log/**/*.log" ]
902        [fingerprint]
903        strategy = "device_and_inode"
904        "#,
905        )
906        .unwrap();
907        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
908
909        let config: FileConfig = toml::from_str(
910            r#"
911        include = [ "/var/log/**/*.log" ]
912        [fingerprint]
913        strategy = "checksum"
914        bytes = 128
915        ignored_header_bytes = 512
916        "#,
917        )
918        .unwrap();
919        assert_eq!(
920            config.fingerprint,
921            FingerprintConfig::Checksum {
922                ignored_header_bytes: 512,
923                lines: 1
924            }
925        );
926
927        let config: FileConfig = toml::from_str(
928            r#"
929        include = [ "/var/log/**/*.log" ]
930        [encoding]
931        charset = "utf-16le"
932        "#,
933        )
934        .unwrap();
935        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
936
937        let config: FileConfig = toml::from_str(
938            r#"
939        include = [ "/var/log/**/*.log" ]
940        read_from = "beginning"
941        "#,
942        )
943        .unwrap();
944        assert_eq!(config.read_from, ReadFromConfig::Beginning);
945
946        let config: FileConfig = toml::from_str(
947            r#"
948        include = [ "/var/log/**/*.log" ]
949        read_from = "end"
950        "#,
951        )
952        .unwrap();
953        assert_eq!(config.read_from, ReadFromConfig::End);
954    }
955
956    #[test]
957    fn resolve_data_dir() {
958        let global_dir = tempdir().unwrap();
959        let local_dir = tempdir().unwrap();
960
961        let mut config = Config::default();
962        config.global.data_dir = global_dir.keep().into();
963
964        // local path given -- local should win
965        let res = config
966            .global
967            .resolve_and_validate_data_dir(test_default_file_config(&local_dir).data_dir.as_ref())
968            .unwrap();
969        assert_eq!(res, local_dir.path());
970
971        // no local path given -- global fallback should be in effect
972        let res = config.global.resolve_and_validate_data_dir(None).unwrap();
973        assert_eq!(res, config.global.data_dir.unwrap());
974    }
975
976    #[test]
977    fn output_schema_definition_vector_namespace() {
978        let definitions = FileConfig::default()
979            .outputs(LogNamespace::Vector)
980            .remove(0)
981            .schema_definition(true);
982
983        assert_eq!(
984            definitions,
985            Some(
986                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
987                    .with_meaning(OwnedTargetPath::event_root(), "message")
988                    .with_metadata_field(
989                        &owned_value_path!("vector", "source_type"),
990                        Kind::bytes(),
991                        None
992                    )
993                    .with_metadata_field(
994                        &owned_value_path!("vector", "ingest_timestamp"),
995                        Kind::timestamp(),
996                        None
997                    )
998                    .with_metadata_field(
999                        &owned_value_path!("file", "host"),
1000                        Kind::bytes().or_undefined(),
1001                        Some("host")
1002                    )
1003                    .with_metadata_field(
1004                        &owned_value_path!("file", "offset"),
1005                        Kind::integer(),
1006                        None
1007                    )
1008                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1009            )
1010        )
1011    }
1012
1013    #[test]
1014    fn output_schema_definition_legacy_namespace() {
1015        let definitions = FileConfig::default()
1016            .outputs(LogNamespace::Legacy)
1017            .remove(0)
1018            .schema_definition(true);
1019
1020        assert_eq!(
1021            definitions,
1022            Some(
1023                Definition::new_with_default_metadata(
1024                    Kind::object(Collection::empty()),
1025                    [LogNamespace::Legacy]
1026                )
1027                .with_event_field(
1028                    &owned_value_path!("message"),
1029                    Kind::bytes(),
1030                    Some("message")
1031                )
1032                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1033                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1034                .with_event_field(
1035                    &owned_value_path!("host"),
1036                    Kind::bytes().or_undefined(),
1037                    Some("host")
1038                )
1039                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1040                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1041            )
1042        )
1043    }
1044
1045    #[test]
1046    fn create_event_legacy_namespace() {
1047        let line = Bytes::from("hello world");
1048        let file = "some_file.rs";
1049        let offset: u64 = 0;
1050
1051        let meta = EventMetadata {
1052            host_key: Some(owned_value_path!("host")),
1053            hostname: Some("Some.Machine".to_string()),
1054            file_key: Some(owned_value_path!("file")),
1055            offset_key: Some(owned_value_path!("offset")),
1056        };
1057        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1058
1059        assert_eq!(log["file"], "some_file.rs".into());
1060        assert_eq!(log["host"], "Some.Machine".into());
1061        assert_eq!(log["offset"], 0.into());
1062        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1063        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1064        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1065    }
1066
1067    #[test]
1068    fn create_event_custom_fields_legacy_namespace() {
1069        let line = Bytes::from("hello world");
1070        let file = "some_file.rs";
1071        let offset: u64 = 0;
1072
1073        let meta = EventMetadata {
1074            host_key: Some(owned_value_path!("hostname")),
1075            hostname: Some("Some.Machine".to_string()),
1076            file_key: Some(owned_value_path!("file_path")),
1077            offset_key: Some(owned_value_path!("off")),
1078        };
1079        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1080
1081        assert_eq!(log["file_path"], "some_file.rs".into());
1082        assert_eq!(log["hostname"], "Some.Machine".into());
1083        assert_eq!(log["off"], 0.into());
1084        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1085        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1086        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1087    }
1088
1089    #[test]
1090    fn create_event_vector_namespace() {
1091        let line = Bytes::from("hello world");
1092        let file = "some_file.rs";
1093        let offset: u64 = 0;
1094
1095        let meta = EventMetadata {
1096            host_key: Some(owned_value_path!("ignored")),
1097            hostname: Some("Some.Machine".to_string()),
1098            file_key: Some(owned_value_path!("ignored")),
1099            offset_key: Some(owned_value_path!("ignored")),
1100        };
1101        let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1102
1103        assert_eq!(log.value(), &value!("hello world"));
1104
1105        assert_eq!(
1106            log.metadata()
1107                .value()
1108                .get(path!("vector", "source_type"))
1109                .unwrap(),
1110            &value!("file")
1111        );
1112        assert!(
1113            log.metadata()
1114                .value()
1115                .get(path!("vector", "ingest_timestamp"))
1116                .unwrap()
1117                .is_timestamp()
1118        );
1119
1120        assert_eq!(
1121            log.metadata()
1122                .value()
1123                .get(path!(FileConfig::NAME, "host"))
1124                .unwrap(),
1125            &value!("Some.Machine")
1126        );
1127        assert_eq!(
1128            log.metadata()
1129                .value()
1130                .get(path!(FileConfig::NAME, "offset"))
1131                .unwrap(),
1132            &value!(0)
1133        );
1134        assert_eq!(
1135            log.metadata()
1136                .value()
1137                .get(path!(FileConfig::NAME, "path"))
1138                .unwrap(),
1139            &value!("some_file.rs")
1140        );
1141    }
1142
1143    #[tokio::test]
1144    async fn file_happy_path() {
1145        let n = 5;
1146
1147        let dir = tempdir().unwrap();
1148        let config = file::FileConfig {
1149            include: vec![dir.path().join("*")],
1150            ..test_default_file_config(&dir)
1151        };
1152
1153        let path1 = dir.path().join("file1");
1154        let path2 = dir.path().join("file2");
1155
1156        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1157            let mut file1 = File::create(&path1).unwrap();
1158            let mut file2 = File::create(&path2).unwrap();
1159
1160            for i in 0..n {
1161                writeln!(&mut file1, "hello {i}").unwrap();
1162                writeln!(&mut file2, "goodbye {i}").unwrap();
1163            }
1164
1165            file1.flush().unwrap();
1166            file2.flush().unwrap();
1167
1168            sleep_500_millis().await;
1169        })
1170        .await;
1171
1172        let mut hello_i = 0;
1173        let mut goodbye_i = 0;
1174
1175        for event in received {
1176            let line =
1177                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1178            if line.starts_with("hello") {
1179                assert_eq!(line, format!("hello {}", hello_i));
1180                assert_eq!(
1181                    event.as_log()["file"].to_string_lossy(),
1182                    path1.to_str().unwrap()
1183                );
1184                hello_i += 1;
1185            } else {
1186                assert_eq!(line, format!("goodbye {}", goodbye_i));
1187                assert_eq!(
1188                    event.as_log()["file"].to_string_lossy(),
1189                    path2.to_str().unwrap()
1190                );
1191                goodbye_i += 1;
1192            }
1193        }
1194        assert_eq!(hello_i, n);
1195        assert_eq!(goodbye_i, n);
1196    }
1197
1198    // https://github.com/vectordotdev/vector/issues/8363
1199    #[tokio::test]
1200    async fn file_read_empty_lines() {
1201        let n = 5;
1202
1203        let dir = tempdir().unwrap();
1204        let config = file::FileConfig {
1205            include: vec![dir.path().join("*")],
1206            ..test_default_file_config(&dir)
1207        };
1208
1209        let path = dir.path().join("file");
1210
1211        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1212            let mut file = File::create(&path).unwrap();
1213
1214            writeln!(&mut file, "line for checkpointing").unwrap();
1215            for _i in 0..n {
1216                writeln!(&mut file).unwrap();
1217            }
1218            file.flush().unwrap();
1219
1220            sleep_500_millis().await;
1221        })
1222        .await;
1223
1224        assert_eq!(received.len(), n + 1);
1225    }
1226
1227    #[tokio::test]
1228    async fn file_truncate() {
1229        let n = 5;
1230
1231        let dir = tempdir().unwrap();
1232        let config = file::FileConfig {
1233            include: vec![dir.path().join("*")],
1234            ..test_default_file_config(&dir)
1235        };
1236        let path = dir.path().join("file");
1237        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1238            let mut file = File::create(&path).unwrap();
1239
1240            for i in 0..n {
1241                writeln!(&mut file, "pretrunc {i}").unwrap();
1242            }
1243
1244            file.flush().unwrap();
1245            sleep_500_millis().await; // The writes must be observed before truncating
1246
1247            file.set_len(0).unwrap();
1248            file.seek(std::io::SeekFrom::Start(0)).unwrap();
1249
1250            file.sync_all().unwrap();
1251            sleep_500_millis().await; // The truncate must be observed before writing again
1252
1253            for i in 0..n {
1254                writeln!(&mut file, "posttrunc {i}").unwrap();
1255            }
1256
1257            file.flush().unwrap();
1258            sleep_500_millis().await;
1259        })
1260        .await;
1261
1262        let mut i = 0;
1263        let mut pre_trunc = true;
1264
1265        for event in received {
1266            assert_eq!(
1267                event.as_log()["file"].to_string_lossy(),
1268                path.to_str().unwrap()
1269            );
1270
1271            let line =
1272                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1273
1274            if pre_trunc {
1275                assert_eq!(line, format!("pretrunc {}", i));
1276            } else {
1277                assert_eq!(line, format!("posttrunc {}", i));
1278            }
1279
1280            i += 1;
1281            if i == n {
1282                i = 0;
1283                pre_trunc = false;
1284            }
1285        }
1286    }
1287
1288    #[tokio::test]
1289    async fn file_rotate() {
1290        let n = 5;
1291
1292        let dir = tempdir().unwrap();
1293        let config = file::FileConfig {
1294            include: vec![dir.path().join("*")],
1295            ..test_default_file_config(&dir)
1296        };
1297
1298        let path = dir.path().join("file");
1299        let archive_path = dir.path().join("file");
1300        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1301            let mut file = File::create(&path).unwrap();
1302
1303            for i in 0..n {
1304                writeln!(&mut file, "prerot {i}").unwrap();
1305            }
1306
1307            file.flush().unwrap();
1308            sleep_500_millis().await; // The writes must be observed before rotating
1309
1310            fs::rename(&path, archive_path).expect("could not rename");
1311            file.sync_all().unwrap();
1312
1313            let mut file = File::create(&path).unwrap();
1314
1315            file.sync_all().unwrap();
1316            sleep_500_millis().await; // The rotation must be observed before writing again
1317
1318            for i in 0..n {
1319                writeln!(&mut file, "postrot {i}").unwrap();
1320            }
1321
1322            file.flush().unwrap();
1323            sleep_500_millis().await;
1324        })
1325        .await;
1326
1327        let mut i = 0;
1328        let mut pre_rot = true;
1329
1330        for event in received {
1331            assert_eq!(
1332                event.as_log()["file"].to_string_lossy(),
1333                path.to_str().unwrap()
1334            );
1335
1336            let line =
1337                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1338
1339            if pre_rot {
1340                assert_eq!(line, format!("prerot {}", i));
1341            } else {
1342                assert_eq!(line, format!("postrot {}", i));
1343            }
1344
1345            i += 1;
1346            if i == n {
1347                i = 0;
1348                pre_rot = false;
1349            }
1350        }
1351    }
1352
1353    #[tokio::test]
1354    async fn file_multiple_paths() {
1355        let n = 5;
1356
1357        let dir = tempdir().unwrap();
1358        let config = file::FileConfig {
1359            include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1360            exclude: vec![dir.path().join("a.*.txt")],
1361            ..test_default_file_config(&dir)
1362        };
1363
1364        let path1 = dir.path().join("a.txt");
1365        let path2 = dir.path().join("b.txt");
1366        let path3 = dir.path().join("a.log");
1367        let path4 = dir.path().join("a.ignore.txt");
1368        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1369            let mut file1 = File::create(&path1).unwrap();
1370            let mut file2 = File::create(&path2).unwrap();
1371            let mut file3 = File::create(&path3).unwrap();
1372            let mut file4 = File::create(&path4).unwrap();
1373
1374            for i in 0..n {
1375                writeln!(&mut file1, "1 {i}").unwrap();
1376                writeln!(&mut file2, "2 {i}").unwrap();
1377                writeln!(&mut file3, "3 {i}").unwrap();
1378                writeln!(&mut file4, "4 {i}").unwrap();
1379            }
1380            file1.flush().unwrap();
1381            file2.flush().unwrap();
1382            file3.flush().unwrap();
1383            file4.flush().unwrap();
1384
1385            sleep_500_millis().await;
1386        })
1387        .await;
1388
1389        let mut is = [0; 3];
1390
1391        for event in received {
1392            let line =
1393                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1394            let mut split = line.split(' ');
1395            let file = split.next().unwrap().parse::<usize>().unwrap();
1396            assert_ne!(file, 4);
1397            let i = split.next().unwrap().parse::<usize>().unwrap();
1398
1399            assert_eq!(is[file - 1], i);
1400            is[file - 1] += 1;
1401        }
1402
1403        assert_eq!(is, [n as usize; 3]);
1404    }
1405
1406    #[tokio::test]
1407    async fn file_exclude_paths() {
1408        let n = 5;
1409
1410        let dir = tempdir().unwrap();
1411        let config = file::FileConfig {
1412            include: vec![dir.path().join("a//b/*.log.*")],
1413            exclude: vec![dir.path().join("a//b/test.log.*")],
1414            ..test_default_file_config(&dir)
1415        };
1416
1417        let path1 = dir.path().join("a//b/a.log.1");
1418        let path2 = dir.path().join("a//b/test.log.1");
1419        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1420            std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1421            let mut file1 = File::create(&path1).unwrap();
1422            let mut file2 = File::create(&path2).unwrap();
1423
1424            for i in 0..n {
1425                writeln!(&mut file1, "1 {i}").unwrap();
1426                writeln!(&mut file2, "2 {i}").unwrap();
1427            }
1428
1429            file1.flush().unwrap();
1430            file2.flush().unwrap();
1431            sleep_500_millis().await;
1432        })
1433        .await;
1434
1435        let mut is = [0; 1];
1436
1437        for event in received {
1438            let line =
1439                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1440            let mut split = line.split(' ');
1441            let file = split.next().unwrap().parse::<usize>().unwrap();
1442            assert_ne!(file, 4);
1443            let i = split.next().unwrap().parse::<usize>().unwrap();
1444
1445            assert_eq!(is[file - 1], i);
1446            is[file - 1] += 1;
1447        }
1448
1449        assert_eq!(is, [n as usize; 1]);
1450    }
1451
1452    #[tokio::test]
1453    async fn file_key_acknowledged() {
1454        file_key(Acks).await
1455    }
1456
1457    #[tokio::test]
1458    async fn file_key_no_acknowledge() {
1459        file_key(NoAcks).await
1460    }
1461
1462    async fn file_key(acks: AckingMode) {
1463        // Default
1464        {
1465            let dir = tempdir().unwrap();
1466            let config = file::FileConfig {
1467                include: vec![dir.path().join("*")],
1468                ..test_default_file_config(&dir)
1469            };
1470
1471            let path = dir.path().join("file");
1472            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1473                let mut file = File::create(&path).unwrap();
1474
1475                writeln!(&mut file, "hello there").unwrap();
1476                file.flush().unwrap();
1477
1478                sleep_500_millis().await;
1479            })
1480            .await;
1481
1482            assert_eq!(received.len(), 1);
1483            assert_eq!(
1484                received[0].as_log()["file"].to_string_lossy(),
1485                path.to_str().unwrap()
1486            );
1487        }
1488
1489        // Custom
1490        {
1491            let dir = tempdir().unwrap();
1492            let config = file::FileConfig {
1493                include: vec![dir.path().join("*")],
1494                file_key: OptionalValuePath::from(owned_value_path!("source")),
1495                ..test_default_file_config(&dir)
1496            };
1497
1498            let path = dir.path().join("file");
1499            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1500                let mut file = File::create(&path).unwrap();
1501
1502                writeln!(&mut file, "hello there").unwrap();
1503                file.flush().unwrap();
1504
1505                sleep_500_millis().await;
1506            })
1507            .await;
1508
1509            assert_eq!(received.len(), 1);
1510            assert_eq!(
1511                received[0].as_log()["source"].to_string_lossy(),
1512                path.to_str().unwrap()
1513            );
1514        }
1515
1516        // Hidden
1517        {
1518            let dir = tempdir().unwrap();
1519            let config = file::FileConfig {
1520                include: vec![dir.path().join("*")],
1521                ..test_default_file_config(&dir)
1522            };
1523
1524            let path = dir.path().join("file");
1525            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1526                let mut file = File::create(&path).unwrap();
1527
1528                writeln!(&mut file, "hello there").unwrap();
1529
1530                file.flush().unwrap();
1531                sleep_500_millis().await;
1532            })
1533            .await;
1534
1535            assert_eq!(received.len(), 1);
1536            assert_eq!(
1537                received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1538                vec![
1539                    default_file_key()
1540                        .path
1541                        .expect("file key to exist")
1542                        .to_string()
1543                        .into(),
1544                    log_schema().host_key().unwrap().to_string().into(),
1545                    log_schema().message_key().unwrap().to_string().into(),
1546                    log_schema().timestamp_key().unwrap().to_string().into(),
1547                    log_schema().source_type_key().unwrap().to_string().into()
1548                ]
1549                .into_iter()
1550                .collect::<HashSet<_>>()
1551            );
1552        }
1553    }
1554
1555    #[tokio::test]
1556    async fn file_start_position_server_restart_acknowledged() {
1557        file_start_position_server_restart(Acks).await
1558    }
1559
1560    #[tokio::test]
1561    async fn file_start_position_server_restart_no_acknowledge() {
1562        file_start_position_server_restart(NoAcks).await
1563    }
1564
1565    async fn file_start_position_server_restart(acking: AckingMode) {
1566        let dir = tempdir().unwrap();
1567        let config = file::FileConfig {
1568            include: vec![dir.path().join("*")],
1569            ..test_default_file_config(&dir)
1570        };
1571
1572        let path = dir.path().join("file");
1573        let mut file = File::create(&path).unwrap();
1574        writeln!(&mut file, "zeroth line").unwrap();
1575        file.flush().unwrap();
1576
1577        // First time server runs it picks up existing lines.
1578        {
1579            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1580                sleep_500_millis().await;
1581                writeln!(&mut file, "first line").unwrap();
1582                file.flush().unwrap();
1583                sleep_500_millis().await;
1584            })
1585            .await;
1586
1587            let lines = extract_messages_string(received);
1588            assert_eq!(lines, vec!["zeroth line", "first line"]);
1589        }
1590        // Restart server, read file from checkpoint.
1591        {
1592            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1593                sleep_500_millis().await;
1594                writeln!(&mut file, "second line").unwrap();
1595                file.flush().unwrap();
1596                sleep_500_millis().await;
1597            })
1598            .await;
1599
1600            let lines = extract_messages_string(received);
1601            assert_eq!(lines, vec!["second line"]);
1602        }
1603        // Restart server, read files from beginning.
1604        {
1605            let config = file::FileConfig {
1606                include: vec![dir.path().join("*")],
1607                ignore_checkpoints: Some(true),
1608                read_from: ReadFromConfig::Beginning,
1609                ..test_default_file_config(&dir)
1610            };
1611            let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1612                sleep_500_millis().await;
1613                writeln!(&mut file, "third line").unwrap();
1614                file.flush().unwrap();
1615                sleep_500_millis().await;
1616            })
1617            .await;
1618
1619            let lines = extract_messages_string(received);
1620            assert_eq!(
1621                lines,
1622                vec!["zeroth line", "first line", "second line", "third line"]
1623            );
1624        }
1625    }
1626
1627    #[tokio::test]
1628    async fn file_start_position_server_restart_unfinalized() {
1629        let dir = tempdir().unwrap();
1630        let config = file::FileConfig {
1631            include: vec![dir.path().join("*")],
1632            ..test_default_file_config(&dir)
1633        };
1634
1635        let path = dir.path().join("file");
1636        let mut file = File::create(&path).unwrap();
1637        writeln!(&mut file, "the line").unwrap();
1638        file.flush().unwrap();
1639
1640        // First time server runs it picks up existing lines.
1641        let received = run_file_source(
1642            &config,
1643            false,
1644            Unfinalized,
1645            LogNamespace::Legacy,
1646            sleep(Duration::from_secs(5)),
1647        )
1648        .await;
1649        let lines = extract_messages_string(received);
1650        assert_eq!(lines, vec!["the line"]);
1651
1652        // Restart server, it re-reads file since the events were not acknowledged before shutdown
1653        let received = run_file_source(
1654            &config,
1655            false,
1656            Unfinalized,
1657            LogNamespace::Legacy,
1658            sleep(Duration::from_secs(5)),
1659        )
1660        .await;
1661        let lines = extract_messages_string(received);
1662        assert_eq!(lines, vec!["the line"]);
1663    }
1664
1665    #[tokio::test]
1666    async fn file_duplicate_processing_after_restart() {
1667        let dir = tempdir().unwrap();
1668        let config = file::FileConfig {
1669            include: vec![dir.path().join("*")],
1670            ..test_default_file_config(&dir)
1671        };
1672
1673        let path = dir.path().join("file");
1674        let mut file = File::create(&path).unwrap();
1675
1676        let line_count = 4000;
1677        for i in 0..line_count {
1678            writeln!(&mut file, "Here's a line for you: {i}").unwrap();
1679        }
1680        file.flush().unwrap();
1681
1682        // First time server runs it should pick up a bunch of lines
1683        let received = run_file_source(
1684            &config,
1685            true,
1686            Acks,
1687            LogNamespace::Legacy,
1688            // shutdown signal is sent after this duration
1689            sleep_500_millis(),
1690        )
1691        .await;
1692        let lines = extract_messages_string(received);
1693
1694        // ...but not all the lines; if the first run processed the entire file, we may not hit the
1695        // bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks
1696        assert!(lines.len() < line_count);
1697
1698        // Restart the server, and it should read the rest without duplicating any
1699        let received = run_file_source(
1700            &config,
1701            true,
1702            Acks,
1703            LogNamespace::Legacy,
1704            sleep(Duration::from_secs(5)),
1705        )
1706        .await;
1707        let lines2 = extract_messages_string(received);
1708
1709        // Between both runs, we should have the expected number of lines
1710        assert_eq!(lines.len() + lines2.len(), line_count);
1711    }
1712
1713    #[tokio::test]
1714    async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1715        file_start_position_server_restart_with_file_rotation(Acks).await
1716    }
1717
1718    #[tokio::test]
1719    async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1720        file_start_position_server_restart_with_file_rotation(NoAcks).await
1721    }
1722
1723    async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1724        let dir = tempdir().unwrap();
1725        let config = file::FileConfig {
1726            include: vec![dir.path().join("*")],
1727            ..test_default_file_config(&dir)
1728        };
1729
1730        let path = dir.path().join("file");
1731        let path_for_old_file = dir.path().join("file.old");
1732        // Run server first time, collect some lines.
1733        {
1734            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1735                let mut file = File::create(&path).unwrap();
1736                writeln!(&mut file, "first line").unwrap();
1737                file.flush().unwrap();
1738                sleep_500_millis().await;
1739            })
1740            .await;
1741
1742            let lines = extract_messages_string(received);
1743            assert_eq!(lines, vec!["first line"]);
1744        }
1745        // Perform 'file rotation' to archive old lines.
1746        fs::rename(&path, &path_for_old_file).expect("could not rename");
1747        // Restart the server and make sure it does not re-read the old file
1748        // even though it has a new name.
1749        {
1750            let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1751                let mut file = File::create(&path).unwrap();
1752                writeln!(&mut file, "second line").unwrap();
1753                file.flush().unwrap();
1754                sleep_500_millis().await;
1755            })
1756            .await;
1757
1758            let lines = extract_messages_string(received);
1759            assert_eq!(lines, vec!["second line"]);
1760        }
1761    }
1762
1763    #[cfg(unix)] // this test uses unix-specific function `futimes` during test time
1764    #[tokio::test]
1765    async fn file_start_position_ignore_old_files() {
1766        use std::{
1767            os::unix::io::AsRawFd,
1768            time::{Duration, SystemTime},
1769        };
1770
1771        let dir = tempdir().unwrap();
1772        let config = file::FileConfig {
1773            include: vec![dir.path().join("*")],
1774            ignore_older_secs: Some(5),
1775            ..test_default_file_config(&dir)
1776        };
1777
1778        let before_path = dir.path().join("before");
1779        let mut before_file = File::create(&before_path).unwrap();
1780        let after_path = dir.path().join("after");
1781        let mut after_file = File::create(&after_path).unwrap();
1782
1783        writeln!(&mut before_file, "first line").unwrap(); // first few bytes make up unique file fingerprint
1784        writeln!(&mut after_file, "_first line").unwrap(); //   and therefore need to be non-identical
1785
1786        {
1787            // Set the modified times
1788            let before = SystemTime::now() - Duration::from_secs(8);
1789            let after = SystemTime::now() - Duration::from_secs(2);
1790
1791            let before_time = libc::timeval {
1792                tv_sec: before
1793                    .duration_since(SystemTime::UNIX_EPOCH)
1794                    .unwrap()
1795                    .as_secs() as _,
1796                tv_usec: 0,
1797            };
1798            let before_times = [before_time, before_time];
1799
1800            let after_time = libc::timeval {
1801                tv_sec: after
1802                    .duration_since(SystemTime::UNIX_EPOCH)
1803                    .unwrap()
1804                    .as_secs() as _,
1805                tv_usec: 0,
1806            };
1807            let after_times = [after_time, after_time];
1808
1809            unsafe {
1810                libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1811                libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1812            }
1813        }
1814
1815        before_file.sync_all().unwrap();
1816        after_file.sync_all().unwrap();
1817
1818        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1819            sleep_500_millis().await;
1820            writeln!(&mut before_file, "second line").unwrap();
1821            writeln!(&mut after_file, "_second line").unwrap();
1822
1823            before_file.flush().unwrap();
1824            after_file.flush().unwrap();
1825            sleep_500_millis().await;
1826        })
1827        .await;
1828
1829        let before_lines = received
1830            .iter()
1831            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1832            .map(|event| {
1833                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1834            })
1835            .collect::<Vec<_>>();
1836        let after_lines = received
1837            .iter()
1838            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1839            .map(|event| {
1840                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1841            })
1842            .collect::<Vec<_>>();
1843        assert_eq!(before_lines, vec!["second line"]);
1844        assert_eq!(after_lines, vec!["_first line", "_second line"]);
1845    }
1846
1847    #[tokio::test]
1848    async fn file_max_line_bytes() {
1849        let dir = tempdir().unwrap();
1850        let config = file::FileConfig {
1851            include: vec![dir.path().join("*")],
1852            max_line_bytes: 10,
1853            ..test_default_file_config(&dir)
1854        };
1855
1856        let path = dir.path().join("file");
1857        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1858            let mut file = File::create(&path).unwrap();
1859
1860            writeln!(&mut file, "short").unwrap();
1861            writeln!(&mut file, "this is too long").unwrap();
1862            writeln!(&mut file, "11 eleven11").unwrap();
1863            let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1864            writeln!(&mut file, "{super_long}").unwrap();
1865            writeln!(&mut file, "exactly 10").unwrap();
1866            writeln!(&mut file, "it can end on a line that's too long").unwrap();
1867
1868            file.flush().unwrap();
1869            sleep_500_millis().await;
1870            sleep_500_millis().await;
1871
1872            writeln!(&mut file, "and then continue").unwrap();
1873            writeln!(&mut file, "last short").unwrap();
1874            file.flush().unwrap();
1875
1876            sleep_500_millis().await;
1877            sleep_500_millis().await;
1878        }).await;
1879
1880        let received = extract_messages_value(received);
1881
1882        assert_eq!(
1883            received,
1884            vec!["short".into(), "exactly 10".into(), "last short".into()]
1885        );
1886    }
1887
1888    #[tokio::test]
1889    async fn test_multi_line_aggregation_legacy() {
1890        let dir = tempdir().unwrap();
1891        let config = file::FileConfig {
1892            include: vec![dir.path().join("*")],
1893            message_start_indicator: Some("INFO".into()),
1894            multi_line_timeout: 25, // less than 50 in sleep()
1895            ..test_default_file_config(&dir)
1896        };
1897
1898        let path = dir.path().join("file");
1899        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1900            let mut file = File::create(&path).unwrap();
1901
1902            writeln!(&mut file, "leftover foo").unwrap();
1903            writeln!(&mut file, "INFO hello").unwrap();
1904            writeln!(&mut file, "INFO goodbye").unwrap();
1905            writeln!(&mut file, "part of goodbye").unwrap();
1906
1907            file.flush().unwrap();
1908            sleep_500_millis().await;
1909
1910            writeln!(&mut file, "INFO hi again").unwrap();
1911            writeln!(&mut file, "and some more").unwrap();
1912            writeln!(&mut file, "INFO hello").unwrap();
1913
1914            file.flush().unwrap();
1915            sleep_500_millis().await;
1916
1917            writeln!(&mut file, "too slow").unwrap();
1918            writeln!(&mut file, "INFO doesn't have").unwrap();
1919            writeln!(&mut file, "to be INFO in").unwrap();
1920            writeln!(&mut file, "the middle").unwrap();
1921
1922            file.flush().unwrap();
1923            sleep_500_millis().await;
1924        })
1925        .await;
1926
1927        let received = extract_messages_value(received);
1928
1929        assert_eq!(
1930            received,
1931            vec![
1932                "leftover foo".into(),
1933                "INFO hello".into(),
1934                "INFO goodbye\npart of goodbye".into(),
1935                "INFO hi again\nand some more".into(),
1936                "INFO hello".into(),
1937                "too slow".into(),
1938                "INFO doesn't have".into(),
1939                "to be INFO in\nthe middle".into(),
1940            ]
1941        );
1942    }
1943
1944    #[tokio::test]
1945    async fn test_multi_line_aggregation() {
1946        let dir = tempdir().unwrap();
1947        let config = file::FileConfig {
1948            include: vec![dir.path().join("*")],
1949            multiline: Some(MultilineConfig {
1950                start_pattern: "INFO".to_owned(),
1951                condition_pattern: "INFO".to_owned(),
1952                mode: line_agg::Mode::HaltBefore,
1953                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
1954            }),
1955            ..test_default_file_config(&dir)
1956        };
1957
1958        let path = dir.path().join("file");
1959        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1960            let mut file = File::create(&path).unwrap();
1961
1962            writeln!(&mut file, "leftover foo").unwrap();
1963            writeln!(&mut file, "INFO hello").unwrap();
1964            writeln!(&mut file, "INFO goodbye").unwrap();
1965            writeln!(&mut file, "part of goodbye").unwrap();
1966
1967            file.flush().unwrap();
1968            sleep_500_millis().await;
1969
1970            writeln!(&mut file, "INFO hi again").unwrap();
1971            writeln!(&mut file, "and some more").unwrap();
1972            writeln!(&mut file, "INFO hello").unwrap();
1973
1974            file.flush().unwrap();
1975            sleep_500_millis().await;
1976
1977            writeln!(&mut file, "too slow").unwrap();
1978            writeln!(&mut file, "INFO doesn't have").unwrap();
1979            writeln!(&mut file, "to be INFO in").unwrap();
1980            writeln!(&mut file, "the middle").unwrap();
1981
1982            file.flush().unwrap();
1983            sleep_500_millis().await;
1984        })
1985        .await;
1986
1987        let received = extract_messages_value(received);
1988
1989        assert_eq!(
1990            received,
1991            vec![
1992                "leftover foo".into(),
1993                "INFO hello".into(),
1994                "INFO goodbye\npart of goodbye".into(),
1995                "INFO hi again\nand some more".into(),
1996                "INFO hello".into(),
1997                "too slow".into(),
1998                "INFO doesn't have".into(),
1999                "to be INFO in\nthe middle".into(),
2000            ]
2001        );
2002    }
2003
2004    #[tokio::test]
2005    async fn test_multi_line_checkpointing() {
2006        let dir = tempdir().unwrap();
2007        let config = file::FileConfig {
2008            include: vec![dir.path().join("*")],
2009            offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2010            multiline: Some(MultilineConfig {
2011                start_pattern: "INFO".to_owned(),
2012                condition_pattern: "INFO".to_owned(),
2013                mode: line_agg::Mode::HaltBefore,
2014                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
2015            }),
2016            ..test_default_file_config(&dir)
2017        };
2018
2019        let path = dir.path().join("file");
2020        let mut file = File::create(&path).unwrap();
2021
2022        writeln!(&mut file, "INFO hello").unwrap();
2023        writeln!(&mut file, "part of hello").unwrap();
2024
2025        file.sync_all().unwrap();
2026
2027        // Read and aggregate existing lines
2028        let received = run_file_source(
2029            &config,
2030            false,
2031            Acks,
2032            LogNamespace::Legacy,
2033            sleep_500_millis(),
2034        )
2035        .await;
2036
2037        assert_eq!(received[0].as_log()["offset"], 0.into());
2038
2039        let lines = extract_messages_string(received);
2040        assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2041
2042        // After restart, we should not see any part of the previously aggregated lines
2043        let received_after_restart =
2044            run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2045                writeln!(&mut file, "INFO goodbye").unwrap();
2046                file.flush().unwrap();
2047                sleep_500_millis().await;
2048            })
2049            .await;
2050        assert_eq!(
2051            received_after_restart[0].as_log()["offset"],
2052            (lines[0].len() + 1).into()
2053        );
2054        let lines = extract_messages_string(received_after_restart);
2055        assert_eq!(lines, vec!["INFO goodbye"]);
2056    }
2057
2058    #[tokio::test]
2059    async fn test_fair_reads() {
2060        let dir = tempdir().unwrap();
2061        let config = file::FileConfig {
2062            include: vec![dir.path().join("*")],
2063            max_read_bytes: 1,
2064            oldest_first: false,
2065            ..test_default_file_config(&dir)
2066        };
2067
2068        let older_path = dir.path().join("z_older_file");
2069        let mut older = File::create(&older_path).unwrap();
2070
2071        writeln!(&mut older, "hello i am the old file").unwrap();
2072        writeln!(&mut older, "i have been around a while").unwrap();
2073        writeln!(&mut older, "you can read newer files at the same time").unwrap();
2074        older.sync_all().unwrap();
2075
2076        let newer_path = dir.path().join("a_newer_file");
2077        let mut newer = File::create(&newer_path).unwrap();
2078
2079        writeln!(&mut newer, "and i am the new file").unwrap();
2080        writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2081        writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2082        newer.sync_all().unwrap();
2083
2084        let received = run_file_source(
2085            &config,
2086            false,
2087            NoAcks,
2088            LogNamespace::Legacy,
2089            sleep_500_millis(),
2090        )
2091        .await;
2092
2093        let received = extract_messages_value(received);
2094
2095        let old_first = vec![
2096            "hello i am the old file".into(),
2097            "and i am the new file".into(),
2098            "i have been around a while".into(),
2099            "this should be interleaved with the old one".into(),
2100            "you can read newer files at the same time".into(),
2101            "which is fine because we want fairness".into(),
2102        ];
2103        let new_first: Vec<_> = old_first
2104            .chunks(2)
2105            .flat_map(|chunk| chunk.iter().rev().cloned().collect::<Vec<_>>())
2106            .collect();
2107
2108        if received[0] == old_first[0] {
2109            assert_eq!(received, old_first);
2110        } else {
2111            assert_eq!(received, new_first);
2112        }
2113    }
2114
2115    #[tokio::test]
2116    async fn test_oldest_first() {
2117        let dir = tempdir().unwrap();
2118        let config = file::FileConfig {
2119            include: vec![dir.path().join("*")],
2120            max_read_bytes: 1,
2121            oldest_first: true,
2122            ..test_default_file_config(&dir)
2123        };
2124
2125        let older_path = dir.path().join("z_older_file");
2126        let mut older = File::create(&older_path).unwrap();
2127        older.sync_all().unwrap();
2128
2129        // Sleep to ensure the creation timestamps are different
2130        sleep_500_millis().await;
2131
2132        let newer_path = dir.path().join("a_newer_file");
2133        let mut newer = File::create(&newer_path).unwrap();
2134        newer.sync_all().unwrap();
2135
2136        writeln!(&mut older, "hello i am the old file").unwrap();
2137        writeln!(&mut older, "i have been around a while").unwrap();
2138        writeln!(&mut older, "you should definitely read all of me first").unwrap();
2139        older.flush().unwrap();
2140
2141        writeln!(&mut newer, "i'm new").unwrap();
2142        writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2143        writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2144        newer.flush().unwrap();
2145
2146        let received = run_file_source(
2147            &config,
2148            false,
2149            NoAcks,
2150            LogNamespace::Legacy,
2151            sleep_500_millis(),
2152        )
2153        .await;
2154
2155        let received = extract_messages_value(received);
2156
2157        assert_eq!(
2158            received,
2159            vec![
2160                "hello i am the old file".into(),
2161                "i have been around a while".into(),
2162                "you should definitely read all of me first".into(),
2163                "i'm new".into(),
2164                "hopefully you read all the old stuff first".into(),
2165                "because otherwise i'm not going to make sense".into(),
2166            ]
2167        );
2168    }
2169
2170    #[tokio::test]
2171    async fn test_split_reads() {
2172        let dir = tempdir().unwrap();
2173        let config = file::FileConfig {
2174            include: vec![dir.path().join("*")],
2175            max_read_bytes: 1,
2176            ..test_default_file_config(&dir)
2177        };
2178
2179        let path = dir.path().join("file");
2180        let mut file = File::create(&path).unwrap();
2181
2182        writeln!(&mut file, "hello i am a normal line").unwrap();
2183        file.sync_all().unwrap();
2184
2185        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2186            sleep_500_millis().await;
2187
2188            write!(&mut file, "i am not a full line").unwrap();
2189
2190            file.flush().unwrap();
2191            // Longer than the EOF timeout
2192            sleep_500_millis().await;
2193
2194            writeln!(&mut file, " until now").unwrap();
2195
2196            file.flush().unwrap();
2197            sleep_500_millis().await;
2198        })
2199        .await;
2200
2201        let received = extract_messages_value(received);
2202
2203        assert_eq!(
2204            received,
2205            vec![
2206                "hello i am a normal line".into(),
2207                "i am not a full line until now".into(),
2208            ]
2209        );
2210    }
2211
2212    #[tokio::test]
2213    async fn test_gzipped_file() {
2214        let dir = tempdir().unwrap();
2215        let config = file::FileConfig {
2216            include: vec![PathBuf::from("tests/data/gzipped.log")],
2217            // TODO: remove this once files are fingerprinted after decompression
2218            //
2219            // Currently, this needs to be smaller than the total size of the compressed file
2220            // because the fingerprinter tries to read until a newline, which it's not going to see
2221            // in the compressed data, or this number of bytes. If it hits EOF before that, it
2222            // can't return a fingerprint because the value would change once more data is written.
2223            max_line_bytes: 100,
2224            ..test_default_file_config(&dir)
2225        };
2226
2227        let received = run_file_source(
2228            &config,
2229            false,
2230            NoAcks,
2231            LogNamespace::Legacy,
2232            sleep_500_millis(),
2233        )
2234        .await;
2235
2236        let received = extract_messages_value(received);
2237
2238        assert_eq!(
2239            received,
2240            vec![
2241                "this is a simple file".into(),
2242                "i have been compressed".into(),
2243                "in order to make me smaller".into(),
2244                "but you can still read me".into(),
2245                "hooray".into(),
2246            ]
2247        );
2248    }
2249
2250    #[tokio::test]
2251    async fn test_non_utf8_encoded_file() {
2252        let dir = tempdir().unwrap();
2253        let config = file::FileConfig {
2254            include: vec![PathBuf::from("tests/data/utf-16le.log")],
2255            encoding: Some(EncodingConfig { charset: UTF_16LE }),
2256            ..test_default_file_config(&dir)
2257        };
2258
2259        let received = run_file_source(
2260            &config,
2261            false,
2262            NoAcks,
2263            LogNamespace::Legacy,
2264            sleep_500_millis(),
2265        )
2266        .await;
2267
2268        let received = extract_messages_value(received);
2269
2270        assert_eq!(
2271            received,
2272            vec![
2273                "hello i am a file".into(),
2274                "i can unicode".into(),
2275                "but i do so in 16 bits".into(),
2276                "and when i byte".into(),
2277                "i become little-endian".into(),
2278            ]
2279        );
2280    }
2281
2282    #[tokio::test]
2283    async fn test_non_default_line_delimiter() {
2284        let dir = tempdir().unwrap();
2285        let config = file::FileConfig {
2286            include: vec![dir.path().join("*")],
2287            line_delimiter: "\r\n".to_string(),
2288            ..test_default_file_config(&dir)
2289        };
2290
2291        let path = dir.path().join("file");
2292        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2293            let mut file = File::create(&path).unwrap();
2294
2295            write!(&mut file, "hello i am a line\r\n").unwrap();
2296            write!(&mut file, "and i am too\r\n").unwrap();
2297            write!(&mut file, "CRLF is how we end\r\n").unwrap();
2298            write!(&mut file, "please treat us well\r\n").unwrap();
2299
2300            file.flush().unwrap();
2301            sleep_500_millis().await;
2302        })
2303        .await;
2304
2305        let received = extract_messages_value(received);
2306
2307        assert_eq!(
2308            received,
2309            vec![
2310                "hello i am a line".into(),
2311                "and i am too".into(),
2312                "CRLF is how we end".into(),
2313                "please treat us well".into()
2314            ]
2315        );
2316    }
2317
2318    // Regression test for https://github.com/vectordotdev/vector/issues/24027
2319    // Tests that multi-character delimiters (like \r\n) are correctly handled when
2320    // split across buffer boundaries. Without the fix, events would be merged together.
2321    #[tokio::test]
2322    async fn test_multi_char_delimiter_split_across_buffer_boundary() {
2323        let dir = tempdir().unwrap();
2324        let config = file::FileConfig {
2325            include: vec![dir.path().join("*")],
2326            line_delimiter: "\r\n".to_string(),
2327            ..test_default_file_config(&dir)
2328        };
2329
2330        let path = dir.path().join("file");
2331        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2332            let mut file = File::create(&path).unwrap();
2333
2334            sleep_500_millis().await;
2335
2336            // Create data where \r\n is split at 8KB buffer boundary
2337            // This reproduces the exact scenario that caused data corruption:
2338            // - Event 1 ends with \r at byte 8191
2339            // - The \n appears at byte 8192 (right at the buffer boundary)
2340            // - Without the fix, Event 1 and Event 2 would be merged
2341
2342            let buffer_size = 8192;
2343
2344            // Event 1: Position \r\n to split at first boundary
2345            let event1_prefix = "Event 1: ";
2346            let padding1_len = buffer_size - event1_prefix.len() - 1; // -1 for the \r
2347            write!(&mut file, "{}", event1_prefix).unwrap();
2348            file.write_all(&vec![b'X'; padding1_len]).unwrap();
2349            write!(&mut file, "\r\n").unwrap(); // \r at byte 8191, \n at byte 8192
2350
2351            // Event 2: Position \r\n to split at second boundary
2352            let event2_prefix = "Event 2: ";
2353            let padding2_len = buffer_size - event2_prefix.len() - 1;
2354            write!(&mut file, "{}", event2_prefix).unwrap();
2355            file.write_all(&vec![b'Y'; padding2_len]).unwrap();
2356            write!(&mut file, "\r\n").unwrap(); // \r at byte 16383, \n at byte 16384
2357
2358            // Event 3: Normal line without boundary split
2359            write!(&mut file, "Event 3: Final\r\n").unwrap();
2360
2361            sleep_500_millis().await;
2362        })
2363        .await;
2364
2365        let messages = extract_messages_value(received);
2366
2367        // The bug would cause Events 1 and 2 to be merged into a single message
2368        assert_eq!(
2369            messages.len(),
2370            3,
2371            "Should receive exactly 3 separate events (bug would merge them)"
2372        );
2373
2374        // Verify each event is correctly separated and starts with expected prefix
2375        let msg0 = messages[0].to_string_lossy();
2376        let msg1 = messages[1].to_string_lossy();
2377        let msg2 = messages[2].to_string_lossy();
2378
2379        assert!(
2380            msg0.starts_with("Event 1: "),
2381            "First event should start with 'Event 1: ', got: {}",
2382            msg0
2383        );
2384        assert!(
2385            msg1.starts_with("Event 2: "),
2386            "Second event should start with 'Event 2: ', got: {}",
2387            msg1
2388        );
2389        assert_eq!(msg2, "Event 3: Final");
2390
2391        // Ensure no event contains embedded CR/LF (sign of incorrect merging)
2392        for (i, msg) in messages.iter().enumerate() {
2393            let msg_str = msg.to_string_lossy();
2394            assert!(
2395                !msg_str.contains('\r'),
2396                "Event {} should not contain embedded \\r",
2397                i
2398            );
2399            assert!(
2400                !msg_str.contains('\n'),
2401                "Event {} should not contain embedded \\n",
2402                i
2403            );
2404        }
2405    }
2406
2407    #[tokio::test]
2408    async fn remove_file() {
2409        let n = 5;
2410        let remove_after_secs = 1;
2411
2412        let dir = tempdir().unwrap();
2413        let config = file::FileConfig {
2414            include: vec![dir.path().join("*")],
2415            remove_after_secs: Some(remove_after_secs),
2416            ..test_default_file_config(&dir)
2417        };
2418
2419        let path = dir.path().join("file");
2420        let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2421            let mut file = File::create(&path).unwrap();
2422
2423            for i in 0..n {
2424                writeln!(&mut file, "{i}").unwrap();
2425            }
2426            file.flush().unwrap();
2427            drop(file);
2428
2429            for _ in 0..10 {
2430                // Wait for remove grace period to end.
2431                sleep(Duration::from_secs(remove_after_secs + 1)).await;
2432
2433                if File::open(&path).is_err() {
2434                    break;
2435                }
2436            }
2437        })
2438        .await;
2439
2440        assert_eq!(received.len(), n);
2441
2442        match File::open(&path) {
2443            Ok(_) => panic!("File wasn't removed"),
2444            Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2445        }
2446    }
2447
2448    #[derive(Clone, Copy, Eq, PartialEq)]
2449    enum AckingMode {
2450        NoAcks,      // No acknowledgement handling and no finalization
2451        Unfinalized, // Acknowledgement handling but no finalization
2452        Acks,        // Full acknowledgements and proper finalization
2453    }
2454    use AckingMode::*;
2455    use vector_lib::lookup::OwnedTargetPath;
2456
2457    async fn run_file_source(
2458        config: &FileConfig,
2459        wait_shutdown: bool,
2460        acking_mode: AckingMode,
2461        log_namespace: LogNamespace,
2462        inner: impl Future<Output = ()>,
2463    ) -> Vec<Event> {
2464        assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2465            let (tx, rx) = if acking_mode == Acks {
2466                let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2467                (tx, rx.boxed())
2468            } else {
2469                let (tx, rx) = SourceSender::new_test();
2470                (tx, rx.boxed())
2471            };
2472
2473            let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2474            let data_dir = config.data_dir.clone().unwrap();
2475            let acks = !matches!(acking_mode, NoAcks);
2476
2477            tokio::spawn(file::file_source(
2478                config,
2479                data_dir,
2480                shutdown,
2481                tx,
2482                acks,
2483                log_namespace,
2484            ));
2485
2486            inner.await;
2487
2488            drop(trigger_shutdown);
2489
2490            let result = if acking_mode == Unfinalized {
2491                rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2492                    .collect::<Vec<_>>()
2493                    .await
2494            } else {
2495                timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2496                    .await
2497                    .expect(
2498                        "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2499                    )
2500            };
2501            if wait_shutdown {
2502                shutdown_done.await;
2503            }
2504
2505            result
2506        })
2507        .await
2508    }
2509
2510    fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2511        received
2512            .into_iter()
2513            .map(Event::into_log)
2514            .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2515            .collect()
2516    }
2517
2518    fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2519        received
2520            .into_iter()
2521            .map(Event::into_log)
2522            .map(|log| log.get_message().unwrap().clone())
2523            .collect()
2524    }
2525}