vector/sources/
file.rs

1use std::{convert::TryInto, future, path::PathBuf, time::Duration};
2
3use bytes::Bytes;
4use chrono::Utc;
5use futures::{FutureExt, Stream, StreamExt, TryFutureExt};
6use regex::bytes::Regex;
7use serde_with::serde_as;
8use snafu::{ResultExt, Snafu};
9use tokio::{sync::oneshot, task::spawn_blocking};
10use tracing::{Instrument, Span};
11use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
12use vector_lib::configurable::configurable_component;
13use vector_lib::file_source::{
14    calculate_ignore_before,
15    paths_provider::glob::{Glob, MatchOptions},
16    Checkpointer, FileFingerprint, FileServer, FingerprintStrategy, Fingerprinter, Line, ReadFrom,
17    ReadFromConfig,
18};
19use vector_lib::finalizer::OrderedFinalizer;
20use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
21use vector_lib::{
22    config::{LegacyKey, LogNamespace},
23    EstimatedJsonEncodedSizeOf,
24};
25use vrl::value::Kind;
26
27use super::util::{EncodingConfig, MultilineConfig};
28use crate::{
29    config::{
30        log_schema, DataType, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
31        SourceOutput,
32    },
33    encoding_transcode::{Decoder, Encoder},
34    event::{BatchNotifier, BatchStatus, LogEvent},
35    internal_events::{
36        FileBytesReceived, FileEventsReceived, FileInternalMetricsConfig, FileOpen,
37        FileSourceInternalEventsEmitter, StreamClosedError,
38    },
39    line_agg::{self, LineAgg},
40    serde::bool_or_struct,
41    shutdown::ShutdownSignal,
42    SourceSender,
43};
44
/// Errors raised while validating the `file` source configuration in `build`.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The deprecated `message_start_indicator` option could not be compiled as a regex.
    #[snafu(display(
        "message_start_indicator {:?} is not a valid regex: {}",
        indicator,
        source
    ))]
    InvalidMessageStartIndicator {
        /// The pattern that failed to compile.
        indicator: String,
        /// The underlying regex compilation error.
        source: regex::Error,
    },
}
57
/// Configuration for the `file` source.
#[serde_as]
#[configurable_component(source("file", "Collect logs from files."))]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileConfig {
    /// Array of file patterns to include. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
    #[configurable(metadata(docs::examples = "/var/log/**/*.log"))]
    pub include: Vec<PathBuf>,

    /// Array of file patterns to exclude. [Globbing](https://vector.dev/docs/reference/configuration/sources/file/#globbing) is supported.
    ///
    /// Takes precedence over the `include` option. Note: The `exclude` patterns are applied _after_ the attempt to glob everything
    /// in `include`. This means that all files are first matched by `include` and then filtered by the `exclude`
    /// patterns. This can be impactful if `include` contains directories with contents that are not accessible.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/log/binary-file.log"))]
    pub exclude: Vec<PathBuf>,

    /// Overrides the name of the log field used to add the file path to each event.
    ///
    /// The value is the full path to the file where the event was read from.
    ///
    /// Set to `""` to suppress this key.
    #[serde(default = "default_file_key")]
    #[configurable(metadata(docs::examples = "path"))]
    pub file_key: OptionalValuePath,

    /// Whether or not to start reading from the beginning of a new file.
    #[configurable(
        deprecated = "This option has been deprecated, use `ignore_checkpoints`/`read_from` instead."
    )]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub start_at_beginning: Option<bool>,

    /// Whether or not to ignore existing checkpoints when determining where to start reading a file.
    ///
    /// Checkpoints are still written normally.
    #[serde(default)]
    pub ignore_checkpoints: Option<bool>,

    #[serde(default = "default_read_from")]
    #[configurable(derived)]
    pub read_from: ReadFromConfig,

    /// Ignore files with a data modification date older than the specified number of seconds.
    #[serde(alias = "ignore_older", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Older Files"))]
    pub ignore_older_secs: Option<u64>,

    /// The maximum size of a line before it is discarded.
    ///
    /// This protects against malformed lines or tailing incorrect files.
    #[serde(default = "default_max_line_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_line_bytes: usize,

    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// Set to `""` to suppress this key.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// The directory used to persist file checkpoint positions.
    ///
    /// By default, the [global `data_dir` option][global_data_dir] is used.
    /// Make sure the running user has write permissions to this directory.
    ///
    /// If this directory is specified, then Vector will attempt to create it.
    ///
    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
    #[serde(default)]
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    pub data_dir: Option<PathBuf>,

    /// Enables adding the file offset to each event and sets the name of the log field used.
    ///
    /// The value is the byte offset of the start of the line within the file.
    ///
    /// Off by default, the offset is only added to the event if this is set.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "offset"))]
    pub offset_key: Option<OptionalValuePath>,

    /// The delay between file discovery calls.
    ///
    /// This controls the interval at which files are searched. A higher value results in greater
    /// chances of some short-lived files being missed between searches, but a lower value increases
    /// the performance impact of file discovery.
    #[serde(
        alias = "glob_minimum_cooldown",
        default = "default_glob_minimum_cooldown_ms"
    )]
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "milliseconds"))]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    pub glob_minimum_cooldown_ms: Duration,

    #[configurable(derived)]
    #[serde(alias = "fingerprinting", default)]
    fingerprint: FingerprintConfig,

    /// Ignore missing files when fingerprinting.
    ///
    /// This may be useful when used with source directories containing dangling symlinks.
    #[serde(default)]
    pub ignore_not_found: bool,

    /// String value used to identify the start of a multi-line message.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub message_start_indicator: Option<String>,

    /// How long to wait for more data when aggregating a multi-line message, in milliseconds.
    #[configurable(deprecated = "This option has been deprecated, use `multiline` instead.")]
    #[configurable(metadata(docs::hidden))]
    #[serde(default = "default_multi_line_timeout")]
    pub multi_line_timeout: u64,

    /// Multiline aggregation configuration.
    ///
    /// If not specified, multiline aggregation is disabled.
    #[configurable(derived)]
    #[serde(default)]
    pub multiline: Option<MultilineConfig>,

    /// Max amount of bytes to read from a single file before switching over to the next file.
    /// **Note:** This does not apply when `oldest_first` is `true`.
    ///
    /// This allows distributing the reads more or less evenly across
    /// the files.
    #[serde(default = "default_max_read_bytes")]
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub max_read_bytes: usize,

    /// Instead of balancing read capacity fairly across all watched files, prioritize draining the oldest files before moving on to read data from more recent files.
    #[serde(default)]
    pub oldest_first: bool,

    /// After reaching EOF, the number of seconds to wait before removing the file, unless new data is written.
    ///
    /// If not specified, files are not removed.
    #[serde(alias = "remove_after", default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 0))]
    #[configurable(metadata(docs::examples = 5))]
    #[configurable(metadata(docs::examples = 60))]
    #[configurable(metadata(docs::human_name = "Wait Time Before Removing File"))]
    pub remove_after_secs: Option<u64>,

    /// String sequence used to separate one file line from another.
    #[serde(default = "default_line_delimiter")]
    #[configurable(metadata(docs::examples = "\r\n"))]
    pub line_delimiter: String,

    #[configurable(derived)]
    #[serde(default)]
    pub encoding: Option<EncodingConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// How long to keep an open handle to a rotated log file.
    /// The default value represents "no limit".
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    pub rotate_wait: Duration,
}
246
247fn default_max_line_bytes() -> usize {
248    bytesize::kib(100u64) as usize
249}
250
251fn default_file_key() -> OptionalValuePath {
252    OptionalValuePath::from(owned_value_path!("file"))
253}
254
255const fn default_read_from() -> ReadFromConfig {
256    ReadFromConfig::Beginning
257}
258
/// Serde default for `glob_minimum_cooldown_ms`: one second between file discovery runs.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(1)
}
262
/// Serde default, in milliseconds, for the deprecated `multi_line_timeout` option.
const fn default_multi_line_timeout() -> u64 {
    const TIMEOUT_MS: u64 = 1_000;
    TIMEOUT_MS
}
266
/// Serde default for `max_read_bytes`: 2 KiB per file before moving on to the next one.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
270
/// Serde default for `line_delimiter`: a single line feed.
fn default_line_delimiter() -> String {
    String::from("\n")
}
274
/// Serde default for `rotate_wait`: a very large duration, documented on the option as
/// representing "no limit".
const fn default_rotate_wait() -> Duration {
    const NO_LIMIT_SECS: u64 = u64::MAX / 2;
    Duration::from_secs(NO_LIMIT_SECS)
}
278
/// Configuration for how files should be identified.
///
/// This is important for `checkpointing` when file rotation is used.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(tag = "strategy", rename_all = "snake_case")]
#[configurable(metadata(
    docs::enum_tag_description = "The strategy used to uniquely identify files.\n\nThis is important for checkpointing when file rotation is used."
))]
pub enum FingerprintConfig {
    /// Read lines from the beginning of the file and compute a checksum over them.
    Checksum {
        /// Maximum number of bytes to use, from the lines that are read, for generating the checksum.
        ///
        // TODO: Should we properly expose this in the documentation? There could definitely be value in allowing more
        // bytes to be used for the checksum generation, but we should commit to exposing it rather than hiding it.
        #[serde(alias = "fingerprint_bytes")]
        #[configurable(metadata(docs::hidden))]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        bytes: Option<usize>,

        /// The number of bytes to skip ahead (or ignore) when reading the data used for generating the checksum.
        /// If the file is compressed, the number of bytes refers to the header in the uncompressed content. Only
        /// gzip is supported at this time.
        ///
        /// This can be helpful if all files share a common header that should be skipped.
        #[serde(default = "default_ignored_header_bytes")]
        #[configurable(metadata(docs::type_unit = "bytes"))]
        ignored_header_bytes: usize,

        /// The number of lines to read for generating the checksum.
        ///
        /// The number of lines is determined from the uncompressed content if the file is compressed. Only
        /// gzip is supported at this time.
        ///
        /// If the file has fewer than this number of lines, it won’t be read at all.
        #[serde(default = "default_lines")]
        #[configurable(metadata(docs::type_unit = "lines"))]
        lines: usize,
    },

    /// Use the [device and inode][inode] as the identifier.
    ///
    /// [inode]: https://en.wikipedia.org/wiki/Inode
    #[serde(rename = "device_and_inode")]
    DevInode,
}
326
327impl Default for FingerprintConfig {
328    fn default() -> Self {
329        Self::Checksum {
330            bytes: None,
331            ignored_header_bytes: 0,
332            lines: default_lines(),
333        }
334    }
335}
336
/// Serde default for `fingerprint.ignored_header_bytes`: skip nothing.
const fn default_ignored_header_bytes() -> usize {
    const NO_HEADER: usize = 0;
    NO_HEADER
}
340
/// Serde default for `fingerprint.lines`: checksum a single line.
const fn default_lines() -> usize {
    const SINGLE_LINE: usize = 1;
    SINGLE_LINE
}
344
345impl From<FingerprintConfig> for FingerprintStrategy {
346    fn from(config: FingerprintConfig) -> FingerprintStrategy {
347        match config {
348            FingerprintConfig::Checksum {
349                bytes,
350                ignored_header_bytes,
351                lines,
352            } => {
353                let bytes = match bytes {
354                    Some(bytes) => {
355                        warn!(message = "The `fingerprint.bytes` option will be used to convert old file fingerprints created by vector < v0.11.0, but are not supported for new file fingerprints. The first line will be used instead.");
356                        bytes
357                    }
358                    None => 256,
359                };
360                FingerprintStrategy::Checksum {
361                    bytes,
362                    ignored_header_bytes,
363                    lines,
364                }
365            }
366            FingerprintConfig::DevInode => FingerprintStrategy::DevInode,
367        }
368    }
369}
370
/// Bookkeeping handed to the acknowledgement finalizer for each emitted event.
///
/// When a batch is reported as delivered, the checkpoint for `file_id` is updated to `offset`.
#[derive(Debug)]
pub(crate) struct FinalizerEntry {
    /// Fingerprint of the file the event was read from.
    pub(crate) file_id: FileFingerprint,
    /// Offset to checkpoint once the event is acknowledged (the line's end offset).
    pub(crate) offset: u64,
}
376
377impl Default for FileConfig {
378    fn default() -> Self {
379        Self {
380            include: vec![PathBuf::from("/var/log/**/*.log")],
381            exclude: vec![],
382            file_key: default_file_key(),
383            start_at_beginning: None,
384            ignore_checkpoints: None,
385            read_from: default_read_from(),
386            ignore_older_secs: None,
387            max_line_bytes: default_max_line_bytes(),
388            fingerprint: FingerprintConfig::default(),
389            ignore_not_found: false,
390            host_key: None,
391            offset_key: None,
392            data_dir: None,
393            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
394            message_start_indicator: None,
395            multi_line_timeout: default_multi_line_timeout(), // millis
396            multiline: None,
397            max_read_bytes: default_max_read_bytes(),
398            oldest_first: false,
399            remove_after_secs: None,
400            line_delimiter: default_line_delimiter(),
401            encoding: None,
402            acknowledgements: Default::default(),
403            log_namespace: None,
404            internal_metrics: Default::default(),
405            rotate_wait: default_rotate_wait(),
406        }
407    }
408}
409
410impl_generate_config_from_default!(FileConfig);
411
412#[async_trait::async_trait]
413#[typetag::serde(name = "file")]
414impl SourceConfig for FileConfig {
415    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
416        // add the source name as a subdir, so that multiple sources can
417        // operate within the same given data_dir (e.g. the global one)
418        // without the file servers' checkpointers interfering with each
419        // other
420        let data_dir = cx
421            .globals
422            // source are only global, name can be used for subdir
423            .resolve_and_make_data_subdir(self.data_dir.as_ref(), cx.key.id())?;
424
425        // Clippy rule, because async_trait?
426        #[allow(clippy::suspicious_else_formatting)]
427        {
428            if let Some(ref config) = self.multiline {
429                let _: line_agg::Config = config.try_into()?;
430            }
431
432            if let Some(ref indicator) = self.message_start_indicator {
433                Regex::new(indicator)
434                    .with_context(|_| InvalidMessageStartIndicatorSnafu { indicator })?;
435            }
436        }
437
438        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
439
440        let log_namespace = cx.log_namespace(self.log_namespace);
441
442        Ok(file_source(
443            self,
444            data_dir,
445            cx.shutdown,
446            cx.out,
447            acknowledgements,
448            log_namespace,
449        ))
450    }
451
452    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
453        let file_key = self.file_key.clone().path.map(LegacyKey::Overwrite);
454        let host_key = self
455            .host_key
456            .clone()
457            .unwrap_or(log_schema().host_key().cloned().into())
458            .path
459            .map(LegacyKey::Overwrite);
460
461        let offset_key = self
462            .offset_key
463            .clone()
464            .and_then(|k| k.path)
465            .map(LegacyKey::Overwrite);
466
467        let schema_definition = BytesDeserializerConfig
468            .schema_definition(global_log_namespace.merge(self.log_namespace))
469            .with_standard_vector_source_metadata()
470            .with_source_metadata(
471                Self::NAME,
472                host_key,
473                &owned_value_path!("host"),
474                Kind::bytes().or_undefined(),
475                Some("host"),
476            )
477            .with_source_metadata(
478                Self::NAME,
479                offset_key,
480                &owned_value_path!("offset"),
481                Kind::integer(),
482                None,
483            )
484            .with_source_metadata(
485                Self::NAME,
486                file_key,
487                &owned_value_path!("path"),
488                Kind::bytes(),
489                None,
490            );
491
492        vec![SourceOutput::new_maybe_logs(
493            DataType::Log,
494            schema_definition,
495        )]
496    }
497
498    fn can_acknowledge(&self) -> bool {
499        true
500    }
501}
502
503pub fn file_source(
504    config: &FileConfig,
505    data_dir: PathBuf,
506    shutdown: ShutdownSignal,
507    mut out: SourceSender,
508    acknowledgements: bool,
509    log_namespace: LogNamespace,
510) -> super::Source {
511    // the include option must be specified but also must contain at least one entry.
512    if config.include.is_empty() {
513        error!(message = "`include` configuration option must contain at least one file pattern.");
514        return Box::pin(future::ready(Err(())));
515    }
516
517    let exclude_patterns = config
518        .exclude
519        .iter()
520        .map(|path_buf| path_buf.iter().collect::<std::path::PathBuf>())
521        .collect::<Vec<PathBuf>>();
522    let ignore_before = calculate_ignore_before(config.ignore_older_secs);
523    let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
524    let (ignore_checkpoints, read_from) = reconcile_position_options(
525        config.start_at_beginning,
526        config.ignore_checkpoints,
527        Some(config.read_from),
528    );
529
530    let emitter = FileSourceInternalEventsEmitter {
531        include_file_metric_tag: config.internal_metrics.include_file_tag,
532    };
533
534    let paths_provider = Glob::new(
535        &config.include,
536        &exclude_patterns,
537        MatchOptions::default(),
538        emitter.clone(),
539    )
540    .expect("invalid glob patterns");
541
542    let encoding_charset = config.encoding.clone().map(|e| e.charset);
543
544    // if file encoding is specified, need to convert the line delimiter (present as utf8)
545    // to the specified encoding, so that delimiter-based line splitting can work properly
546    let line_delimiter_as_bytes = match encoding_charset {
547        Some(e) => Encoder::new(e).encode_from_utf8(&config.line_delimiter),
548        None => Bytes::from(config.line_delimiter.clone()),
549    };
550
551    let checkpointer = Checkpointer::new(&data_dir);
552    let file_server = FileServer {
553        paths_provider,
554        max_read_bytes: config.max_read_bytes,
555        ignore_checkpoints,
556        read_from,
557        ignore_before,
558        max_line_bytes: config.max_line_bytes,
559        line_delimiter: line_delimiter_as_bytes,
560        data_dir,
561        glob_minimum_cooldown,
562        fingerprinter: Fingerprinter {
563            strategy: config.fingerprint.clone().into(),
564            max_line_length: config.max_line_bytes,
565            ignore_not_found: config.ignore_not_found,
566        },
567        oldest_first: config.oldest_first,
568        remove_after: config.remove_after_secs.map(Duration::from_secs),
569        emitter,
570        handle: tokio::runtime::Handle::current(),
571        rotate_wait: config.rotate_wait,
572    };
573
574    let event_metadata = EventMetadata {
575        host_key: config
576            .host_key
577            .clone()
578            .unwrap_or(log_schema().host_key().cloned().into())
579            .path,
580        hostname: crate::get_hostname().ok(),
581        file_key: config.file_key.clone().path,
582        offset_key: config.offset_key.clone().and_then(|k| k.path),
583    };
584
585    let include = config.include.clone();
586    let exclude = config.exclude.clone();
587    let multiline_config = config.multiline.clone();
588    let message_start_indicator = config.message_start_indicator.clone();
589    let multi_line_timeout = config.multi_line_timeout;
590
591    let (finalizer, shutdown_checkpointer) = if acknowledgements {
592        // The shutdown sent in to the finalizer is the global
593        // shutdown handle used to tell it to stop accepting new batch
594        // statuses and just wait for the remaining acks to come in.
595        let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
596
597        // We set up a separate shutdown signal to tie together the
598        // finalizer and the checkpoint writer task in the file
599        // server, to make it continue to write out updated
600        // checkpoints until all the acks have come in.
601        let (send_shutdown, shutdown2) = oneshot::channel::<()>();
602        let checkpoints = checkpointer.view();
603        tokio::spawn(async move {
604            while let Some((status, entry)) = ack_stream.next().await {
605                if status == BatchStatus::Delivered {
606                    checkpoints.update(entry.file_id, entry.offset);
607                }
608            }
609            send_shutdown.send(())
610        });
611        (Some(finalizer), shutdown2.map(|_| ()).boxed())
612    } else {
613        // When not dealing with end-to-end acknowledgements, just
614        // clone the global shutdown to stop the checkpoint writer.
615        (None, shutdown.clone().map(|_| ()).boxed())
616    };
617
618    let checkpoints = checkpointer.view();
619    let include_file_metric_tag = config.internal_metrics.include_file_tag;
620    Box::pin(async move {
621        info!(message = "Starting file server.", include = ?include, exclude = ?exclude);
622
623        let mut encoding_decoder = encoding_charset.map(Decoder::new);
624
625        // sizing here is just a guess
626        let (tx, rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
627        let rx = rx
628            .map(futures::stream::iter)
629            .flatten()
630            .map(move |mut line| {
631                emit!(FileBytesReceived {
632                    byte_size: line.text.len(),
633                    file: &line.filename,
634                    include_file_metric_tag,
635                });
636                // transcode each line from the file's encoding charset to utf8
637                line.text = match encoding_decoder.as_mut() {
638                    Some(d) => d.decode_to_utf8(line.text),
639                    None => line.text,
640                };
641                line
642            });
643
644        let messages: Box<dyn Stream<Item = Line> + Send + std::marker::Unpin> =
645            if let Some(ref multiline_config) = multiline_config {
646                wrap_with_line_agg(
647                    rx,
648                    multiline_config.try_into().unwrap(), // validated in build
649                )
650            } else if let Some(msi) = message_start_indicator {
651                wrap_with_line_agg(
652                    rx,
653                    line_agg::Config::for_legacy(
654                        Regex::new(&msi).unwrap(), // validated in build
655                        multi_line_timeout,
656                    ),
657                )
658            } else {
659                Box::new(rx)
660            };
661
662        // Once file server ends this will run until it has finished processing remaining
663        // logs in the queue.
664        let span = Span::current();
665        let mut messages = messages.map(move |line| {
666            let mut event = create_event(
667                line.text,
668                line.start_offset,
669                &line.filename,
670                &event_metadata,
671                log_namespace,
672                include_file_metric_tag,
673            );
674
675            if let Some(finalizer) = &finalizer {
676                let (batch, receiver) = BatchNotifier::new_with_receiver();
677                event = event.with_batch_notifier(&batch);
678                let entry = FinalizerEntry {
679                    file_id: line.file_id,
680                    offset: line.end_offset,
681                };
682                finalizer.add(entry, receiver);
683            } else {
684                checkpoints.update(line.file_id, line.end_offset);
685            }
686            event
687        });
688        tokio::spawn(async move {
689            match out
690                .send_event_stream(&mut messages)
691                .instrument(span.or_current())
692                .await
693            {
694                Ok(()) => {
695                    debug!("Finished sending.");
696                }
697                Err(_) => {
698                    let (count, _) = messages.size_hint();
699                    emit!(StreamClosedError { count });
700                }
701            }
702        });
703
704        let span = info_span!("file_server");
705        spawn_blocking(move || {
706            let _enter = span.enter();
707            let result = file_server.run(tx, shutdown, shutdown_checkpointer, checkpointer);
708            emit!(FileOpen { count: 0 });
709            // Panic if we encounter any error originating from the file server.
710            // We're at the `spawn_blocking` call, the panic will be caught and
711            // passed to the `JoinHandle` error, similar to the usual threads.
712            result.unwrap();
713        })
714        .map_err(|error| error!(message="File server unexpectedly stopped.", %error))
715        .await
716    })
717}
718
719/// Emit deprecation warning if the old option is used, and take it into account when determining
720/// defaults. Any of the newer options will override it when set directly.
721fn reconcile_position_options(
722    start_at_beginning: Option<bool>,
723    ignore_checkpoints: Option<bool>,
724    read_from: Option<ReadFromConfig>,
725) -> (bool, ReadFrom) {
726    if start_at_beginning.is_some() {
727        warn!(message = "Use of deprecated option `start_at_beginning`. Please use `ignore_checkpoints` and `read_from` options instead.")
728    }
729
730    match start_at_beginning {
731        Some(true) => (
732            ignore_checkpoints.unwrap_or(true),
733            read_from.map(Into::into).unwrap_or(ReadFrom::Beginning),
734        ),
735        _ => (
736            ignore_checkpoints.unwrap_or(false),
737            read_from.map(Into::into).unwrap_or_default(),
738        ),
739    }
740}
741
742fn wrap_with_line_agg(
743    rx: impl Stream<Item = Line> + Send + std::marker::Unpin + 'static,
744    config: line_agg::Config,
745) -> Box<dyn Stream<Item = Line> + Send + std::marker::Unpin + 'static> {
746    let logic = line_agg::Logic::new(config);
747    Box::new(
748        LineAgg::new(
749            rx.map(|line| {
750                (
751                    line.filename,
752                    line.text,
753                    (line.file_id, line.start_offset, line.end_offset),
754                )
755            }),
756            logic,
757        )
758        .map(
759            |(filename, text, (file_id, start_offset, initial_end), lastline_context)| Line {
760                text,
761                filename,
762                file_id,
763                start_offset,
764                end_offset: lastline_context.map_or(initial_end, |(_, _, lastline_end_offset)| {
765                    lastline_end_offset
766                }),
767            },
768        ),
769    )
770}
771
/// Per-source values used by `create_event` to annotate every event.
///
/// Built once in `file_source` from the source configuration.
struct EventMetadata {
    /// Legacy field path for the hostname; `None` means no legacy key is passed.
    host_key: Option<OwnedValuePath>,
    /// Hostname of this machine, or `None` if it could not be determined.
    hostname: Option<String>,
    /// Legacy field path for the file path; `None` means no legacy key is passed.
    file_key: Option<OwnedValuePath>,
    /// Legacy field path for the line offset; `None` means no legacy key is passed.
    offset_key: Option<OwnedValuePath>,
}
778
779fn create_event(
780    line: Bytes,
781    offset: u64,
782    file: &str,
783    meta: &EventMetadata,
784    log_namespace: LogNamespace,
785    include_file_metric_tag: bool,
786) -> LogEvent {
787    let deserializer = BytesDeserializer;
788    let mut event = deserializer.parse_single(line, log_namespace);
789
790    log_namespace.insert_vector_metadata(
791        &mut event,
792        log_schema().source_type_key(),
793        path!("source_type"),
794        Bytes::from_static(FileConfig::NAME.as_bytes()),
795    );
796    log_namespace.insert_vector_metadata(
797        &mut event,
798        log_schema().timestamp_key(),
799        path!("ingest_timestamp"),
800        Utc::now(),
801    );
802
803    let legacy_host_key = meta.host_key.as_ref().map(LegacyKey::Overwrite);
804    // `meta.host_key` is already `unwrap_or_else`ed so we can just pass it in.
805    if let Some(hostname) = &meta.hostname {
806        log_namespace.insert_source_metadata(
807            FileConfig::NAME,
808            &mut event,
809            legacy_host_key,
810            path!("host"),
811            hostname.clone(),
812        );
813    }
814
815    let legacy_offset_key = meta.offset_key.as_ref().map(LegacyKey::Overwrite);
816    log_namespace.insert_source_metadata(
817        FileConfig::NAME,
818        &mut event,
819        legacy_offset_key,
820        path!("offset"),
821        offset,
822    );
823
824    let legacy_file_key = meta.file_key.as_ref().map(LegacyKey::Overwrite);
825    log_namespace.insert_source_metadata(
826        FileConfig::NAME,
827        &mut event,
828        legacy_file_key,
829        path!("path"),
830        file,
831    );
832
833    emit!(FileEventsReceived {
834        count: 1,
835        file,
836        byte_size: event.estimated_json_encoded_size_of(),
837        include_file_metric_tag,
838    });
839
840    event
841}
842
843#[cfg(test)]
844mod tests {
845    use std::{
846        collections::HashSet,
847        fs::{self, File},
848        future::Future,
849        io::{Seek, Write},
850    };
851
852    use encoding_rs::UTF_16LE;
853    use similar_asserts::assert_eq;
854    use tempfile::tempdir;
855    use tokio::time::{sleep, timeout, Duration};
856    use vector_lib::schema::Definition;
857    use vrl::value::kind::Collection;
858
859    use super::*;
860    use crate::{
861        config::Config,
862        event::{Event, EventStatus, Value},
863        shutdown::ShutdownSignal,
864        sources::file,
865        test_util::components::{assert_source_compliance, FILE_SOURCE_TAGS},
866    };
867    use vrl::value;
868
869    #[test]
870    fn generate_config() {
871        crate::test_util::test_generate_config::<FileConfig>();
872    }
873
874    fn test_default_file_config(dir: &tempfile::TempDir) -> file::FileConfig {
875        file::FileConfig {
876            fingerprint: FingerprintConfig::Checksum {
877                bytes: Some(8),
878                ignored_header_bytes: 0,
879                lines: 1,
880            },
881            data_dir: Some(dir.path().to_path_buf()),
882            glob_minimum_cooldown_ms: Duration::from_millis(100),
883            internal_metrics: FileInternalMetricsConfig {
884                include_file_tag: true,
885            },
886            ..Default::default()
887        }
888    }
889
890    async fn sleep_500_millis() {
891        sleep(Duration::from_millis(500)).await;
892    }
893
894    #[test]
895    fn parse_config() {
896        let config: FileConfig = toml::from_str(
897            r#"
898            include = [ "/var/log/**/*.log" ]
899            file_key = "file"
900            glob_minimum_cooldown_ms = 1000
901            multi_line_timeout = 1000
902            max_read_bytes = 2048
903            line_delimiter = "\n"
904        "#,
905        )
906        .unwrap();
907        assert_eq!(config, FileConfig::default());
908        assert_eq!(
909            config.fingerprint,
910            FingerprintConfig::Checksum {
911                bytes: None,
912                ignored_header_bytes: 0,
913                lines: 1
914            }
915        );
916
917        let config: FileConfig = toml::from_str(
918            r#"
919        include = [ "/var/log/**/*.log" ]
920        [fingerprint]
921        strategy = "device_and_inode"
922        "#,
923        )
924        .unwrap();
925        assert_eq!(config.fingerprint, FingerprintConfig::DevInode);
926
927        let config: FileConfig = toml::from_str(
928            r#"
929        include = [ "/var/log/**/*.log" ]
930        [fingerprint]
931        strategy = "checksum"
932        bytes = 128
933        ignored_header_bytes = 512
934        "#,
935        )
936        .unwrap();
937        assert_eq!(
938            config.fingerprint,
939            FingerprintConfig::Checksum {
940                bytes: Some(128),
941                ignored_header_bytes: 512,
942                lines: 1
943            }
944        );
945
946        let config: FileConfig = toml::from_str(
947            r#"
948        include = [ "/var/log/**/*.log" ]
949        [encoding]
950        charset = "utf-16le"
951        "#,
952        )
953        .unwrap();
954        assert_eq!(config.encoding, Some(EncodingConfig { charset: UTF_16LE }));
955
956        let config: FileConfig = toml::from_str(
957            r#"
958        include = [ "/var/log/**/*.log" ]
959        read_from = "beginning"
960        "#,
961        )
962        .unwrap();
963        assert_eq!(config.read_from, ReadFromConfig::Beginning);
964
965        let config: FileConfig = toml::from_str(
966            r#"
967        include = [ "/var/log/**/*.log" ]
968        read_from = "end"
969        "#,
970        )
971        .unwrap();
972        assert_eq!(config.read_from, ReadFromConfig::End);
973    }
974
975    #[test]
976    fn resolve_data_dir() {
977        let global_dir = tempdir().unwrap();
978        let local_dir = tempdir().unwrap();
979
980        let mut config = Config::default();
981        config.global.data_dir = global_dir.keep().into();
982
983        // local path given -- local should win
984        let res = config
985            .global
986            .resolve_and_validate_data_dir(test_default_file_config(&local_dir).data_dir.as_ref())
987            .unwrap();
988        assert_eq!(res, local_dir.path());
989
990        // no local path given -- global fallback should be in effect
991        let res = config.global.resolve_and_validate_data_dir(None).unwrap();
992        assert_eq!(res, config.global.data_dir.unwrap());
993    }
994
995    #[test]
996    fn output_schema_definition_vector_namespace() {
997        let definitions = FileConfig::default()
998            .outputs(LogNamespace::Vector)
999            .remove(0)
1000            .schema_definition(true);
1001
1002        assert_eq!(
1003            definitions,
1004            Some(
1005                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1006                    .with_meaning(OwnedTargetPath::event_root(), "message")
1007                    .with_metadata_field(
1008                        &owned_value_path!("vector", "source_type"),
1009                        Kind::bytes(),
1010                        None
1011                    )
1012                    .with_metadata_field(
1013                        &owned_value_path!("vector", "ingest_timestamp"),
1014                        Kind::timestamp(),
1015                        None
1016                    )
1017                    .with_metadata_field(
1018                        &owned_value_path!("file", "host"),
1019                        Kind::bytes().or_undefined(),
1020                        Some("host")
1021                    )
1022                    .with_metadata_field(
1023                        &owned_value_path!("file", "offset"),
1024                        Kind::integer(),
1025                        None
1026                    )
1027                    .with_metadata_field(&owned_value_path!("file", "path"), Kind::bytes(), None)
1028            )
1029        )
1030    }
1031
1032    #[test]
1033    fn output_schema_definition_legacy_namespace() {
1034        let definitions = FileConfig::default()
1035            .outputs(LogNamespace::Legacy)
1036            .remove(0)
1037            .schema_definition(true);
1038
1039        assert_eq!(
1040            definitions,
1041            Some(
1042                Definition::new_with_default_metadata(
1043                    Kind::object(Collection::empty()),
1044                    [LogNamespace::Legacy]
1045                )
1046                .with_event_field(
1047                    &owned_value_path!("message"),
1048                    Kind::bytes(),
1049                    Some("message")
1050                )
1051                .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1052                .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1053                .with_event_field(
1054                    &owned_value_path!("host"),
1055                    Kind::bytes().or_undefined(),
1056                    Some("host")
1057                )
1058                .with_event_field(&owned_value_path!("offset"), Kind::undefined(), None)
1059                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1060            )
1061        )
1062    }
1063
1064    #[test]
1065    fn create_event_legacy_namespace() {
1066        let line = Bytes::from("hello world");
1067        let file = "some_file.rs";
1068        let offset: u64 = 0;
1069
1070        let meta = EventMetadata {
1071            host_key: Some(owned_value_path!("host")),
1072            hostname: Some("Some.Machine".to_string()),
1073            file_key: Some(owned_value_path!("file")),
1074            offset_key: Some(owned_value_path!("offset")),
1075        };
1076        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1077
1078        assert_eq!(log["file"], "some_file.rs".into());
1079        assert_eq!(log["host"], "Some.Machine".into());
1080        assert_eq!(log["offset"], 0.into());
1081        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1082        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1083        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1084    }
1085
1086    #[test]
1087    fn create_event_custom_fields_legacy_namespace() {
1088        let line = Bytes::from("hello world");
1089        let file = "some_file.rs";
1090        let offset: u64 = 0;
1091
1092        let meta = EventMetadata {
1093            host_key: Some(owned_value_path!("hostname")),
1094            hostname: Some("Some.Machine".to_string()),
1095            file_key: Some(owned_value_path!("file_path")),
1096            offset_key: Some(owned_value_path!("off")),
1097        };
1098        let log = create_event(line, offset, file, &meta, LogNamespace::Legacy, false);
1099
1100        assert_eq!(log["file_path"], "some_file.rs".into());
1101        assert_eq!(log["hostname"], "Some.Machine".into());
1102        assert_eq!(log["off"], 0.into());
1103        assert_eq!(*log.get_message().unwrap(), "hello world".into());
1104        assert_eq!(*log.get_source_type().unwrap(), "file".into());
1105        assert!(log[log_schema().timestamp_key().unwrap().to_string()].is_timestamp());
1106    }
1107
1108    #[test]
1109    fn create_event_vector_namespace() {
1110        let line = Bytes::from("hello world");
1111        let file = "some_file.rs";
1112        let offset: u64 = 0;
1113
1114        let meta = EventMetadata {
1115            host_key: Some(owned_value_path!("ignored")),
1116            hostname: Some("Some.Machine".to_string()),
1117            file_key: Some(owned_value_path!("ignored")),
1118            offset_key: Some(owned_value_path!("ignored")),
1119        };
1120        let log = create_event(line, offset, file, &meta, LogNamespace::Vector, false);
1121
1122        assert_eq!(log.value(), &value!("hello world"));
1123
1124        assert_eq!(
1125            log.metadata()
1126                .value()
1127                .get(path!("vector", "source_type"))
1128                .unwrap(),
1129            &value!("file")
1130        );
1131        assert!(log
1132            .metadata()
1133            .value()
1134            .get(path!("vector", "ingest_timestamp"))
1135            .unwrap()
1136            .is_timestamp());
1137
1138        assert_eq!(
1139            log.metadata()
1140                .value()
1141                .get(path!(FileConfig::NAME, "host"))
1142                .unwrap(),
1143            &value!("Some.Machine")
1144        );
1145        assert_eq!(
1146            log.metadata()
1147                .value()
1148                .get(path!(FileConfig::NAME, "offset"))
1149                .unwrap(),
1150            &value!(0)
1151        );
1152        assert_eq!(
1153            log.metadata()
1154                .value()
1155                .get(path!(FileConfig::NAME, "path"))
1156                .unwrap(),
1157            &value!("some_file.rs")
1158        );
1159    }
1160
1161    #[tokio::test]
1162    async fn file_happy_path() {
1163        let n = 5;
1164
1165        let dir = tempdir().unwrap();
1166        let config = file::FileConfig {
1167            include: vec![dir.path().join("*")],
1168            ..test_default_file_config(&dir)
1169        };
1170
1171        let path1 = dir.path().join("file1");
1172        let path2 = dir.path().join("file2");
1173
1174        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1175            let mut file1 = File::create(&path1).unwrap();
1176            let mut file2 = File::create(&path2).unwrap();
1177
1178            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1179
1180            for i in 0..n {
1181                writeln!(&mut file1, "hello {i}").unwrap();
1182                writeln!(&mut file2, "goodbye {i}").unwrap();
1183            }
1184
1185            sleep_500_millis().await;
1186        })
1187        .await;
1188
1189        let mut hello_i = 0;
1190        let mut goodbye_i = 0;
1191
1192        for event in received {
1193            let line =
1194                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1195            if line.starts_with("hello") {
1196                assert_eq!(line, format!("hello {}", hello_i));
1197                assert_eq!(
1198                    event.as_log()["file"].to_string_lossy(),
1199                    path1.to_str().unwrap()
1200                );
1201                hello_i += 1;
1202            } else {
1203                assert_eq!(line, format!("goodbye {}", goodbye_i));
1204                assert_eq!(
1205                    event.as_log()["file"].to_string_lossy(),
1206                    path2.to_str().unwrap()
1207                );
1208                goodbye_i += 1;
1209            }
1210        }
1211        assert_eq!(hello_i, n);
1212        assert_eq!(goodbye_i, n);
1213    }
1214
1215    // https://github.com/vectordotdev/vector/issues/8363
1216    #[tokio::test]
1217    async fn file_read_empty_lines() {
1218        let n = 5;
1219
1220        let dir = tempdir().unwrap();
1221        let config = file::FileConfig {
1222            include: vec![dir.path().join("*")],
1223            ..test_default_file_config(&dir)
1224        };
1225
1226        let path = dir.path().join("file");
1227
1228        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1229            let mut file = File::create(&path).unwrap();
1230
1231            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1232
1233            writeln!(&mut file, "line for checkpointing").unwrap();
1234            for _i in 0..n {
1235                writeln!(&mut file).unwrap();
1236            }
1237
1238            sleep_500_millis().await;
1239        })
1240        .await;
1241
1242        assert_eq!(received.len(), n + 1);
1243    }
1244
1245    #[tokio::test]
1246    async fn file_truncate() {
1247        let n = 5;
1248
1249        let dir = tempdir().unwrap();
1250        let config = file::FileConfig {
1251            include: vec![dir.path().join("*")],
1252            ..test_default_file_config(&dir)
1253        };
1254        let path = dir.path().join("file");
1255        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1256            let mut file = File::create(&path).unwrap();
1257
1258            sleep_500_millis().await; // The files must be observed at its original length before writing to it
1259
1260            for i in 0..n {
1261                writeln!(&mut file, "pretrunc {i}").unwrap();
1262            }
1263
1264            sleep_500_millis().await; // The writes must be observed before truncating
1265
1266            file.set_len(0).unwrap();
1267            file.seek(std::io::SeekFrom::Start(0)).unwrap();
1268
1269            sleep_500_millis().await; // The truncate must be observed before writing again
1270
1271            for i in 0..n {
1272                writeln!(&mut file, "posttrunc {i}").unwrap();
1273            }
1274
1275            sleep_500_millis().await;
1276        })
1277        .await;
1278
1279        let mut i = 0;
1280        let mut pre_trunc = true;
1281
1282        for event in received {
1283            assert_eq!(
1284                event.as_log()["file"].to_string_lossy(),
1285                path.to_str().unwrap()
1286            );
1287
1288            let line =
1289                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1290
1291            if pre_trunc {
1292                assert_eq!(line, format!("pretrunc {}", i));
1293            } else {
1294                assert_eq!(line, format!("posttrunc {}", i));
1295            }
1296
1297            i += 1;
1298            if i == n {
1299                i = 0;
1300                pre_trunc = false;
1301            }
1302        }
1303    }
1304
1305    #[tokio::test]
1306    async fn file_rotate() {
1307        let n = 5;
1308
1309        let dir = tempdir().unwrap();
1310        let config = file::FileConfig {
1311            include: vec![dir.path().join("*")],
1312            ..test_default_file_config(&dir)
1313        };
1314
1315        let path = dir.path().join("file");
1316        let archive_path = dir.path().join("file");
1317        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1318            let mut file = File::create(&path).unwrap();
1319
1320            sleep_500_millis().await; // The files must be observed at its original length before writing to it
1321
1322            for i in 0..n {
1323                writeln!(&mut file, "prerot {i}").unwrap();
1324            }
1325
1326            sleep_500_millis().await; // The writes must be observed before rotating
1327
1328            fs::rename(&path, archive_path).expect("could not rename");
1329            let mut file = File::create(&path).unwrap();
1330
1331            sleep_500_millis().await; // The rotation must be observed before writing again
1332
1333            for i in 0..n {
1334                writeln!(&mut file, "postrot {i}").unwrap();
1335            }
1336
1337            sleep_500_millis().await;
1338        })
1339        .await;
1340
1341        let mut i = 0;
1342        let mut pre_rot = true;
1343
1344        for event in received {
1345            assert_eq!(
1346                event.as_log()["file"].to_string_lossy(),
1347                path.to_str().unwrap()
1348            );
1349
1350            let line =
1351                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1352
1353            if pre_rot {
1354                assert_eq!(line, format!("prerot {}", i));
1355            } else {
1356                assert_eq!(line, format!("postrot {}", i));
1357            }
1358
1359            i += 1;
1360            if i == n {
1361                i = 0;
1362                pre_rot = false;
1363            }
1364        }
1365    }
1366
1367    #[tokio::test]
1368    async fn file_multiple_paths() {
1369        let n = 5;
1370
1371        let dir = tempdir().unwrap();
1372        let config = file::FileConfig {
1373            include: vec![dir.path().join("*.txt"), dir.path().join("a.*")],
1374            exclude: vec![dir.path().join("a.*.txt")],
1375            ..test_default_file_config(&dir)
1376        };
1377
1378        let path1 = dir.path().join("a.txt");
1379        let path2 = dir.path().join("b.txt");
1380        let path3 = dir.path().join("a.log");
1381        let path4 = dir.path().join("a.ignore.txt");
1382        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1383            let mut file1 = File::create(&path1).unwrap();
1384            let mut file2 = File::create(&path2).unwrap();
1385            let mut file3 = File::create(&path3).unwrap();
1386            let mut file4 = File::create(&path4).unwrap();
1387
1388            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1389
1390            for i in 0..n {
1391                writeln!(&mut file1, "1 {i}").unwrap();
1392                writeln!(&mut file2, "2 {i}").unwrap();
1393                writeln!(&mut file3, "3 {i}").unwrap();
1394                writeln!(&mut file4, "4 {i}").unwrap();
1395            }
1396
1397            sleep_500_millis().await;
1398        })
1399        .await;
1400
1401        let mut is = [0; 3];
1402
1403        for event in received {
1404            let line =
1405                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1406            let mut split = line.split(' ');
1407            let file = split.next().unwrap().parse::<usize>().unwrap();
1408            assert_ne!(file, 4);
1409            let i = split.next().unwrap().parse::<usize>().unwrap();
1410
1411            assert_eq!(is[file - 1], i);
1412            is[file - 1] += 1;
1413        }
1414
1415        assert_eq!(is, [n as usize; 3]);
1416    }
1417
1418    #[tokio::test]
1419    async fn file_exclude_paths() {
1420        let n = 5;
1421
1422        let dir = tempdir().unwrap();
1423        let config = file::FileConfig {
1424            include: vec![dir.path().join("a//b/*.log.*")],
1425            exclude: vec![dir.path().join("a//b/test.log.*")],
1426            ..test_default_file_config(&dir)
1427        };
1428
1429        let path1 = dir.path().join("a//b/a.log.1");
1430        let path2 = dir.path().join("a//b/test.log.1");
1431        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1432            std::fs::create_dir_all(dir.path().join("a/b")).unwrap();
1433            let mut file1 = File::create(&path1).unwrap();
1434            let mut file2 = File::create(&path2).unwrap();
1435
1436            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1437
1438            for i in 0..n {
1439                writeln!(&mut file1, "1 {i}").unwrap();
1440                writeln!(&mut file2, "2 {i}").unwrap();
1441            }
1442
1443            sleep_500_millis().await;
1444        })
1445        .await;
1446
1447        let mut is = [0; 1];
1448
1449        for event in received {
1450            let line =
1451                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy();
1452            let mut split = line.split(' ');
1453            let file = split.next().unwrap().parse::<usize>().unwrap();
1454            assert_ne!(file, 4);
1455            let i = split.next().unwrap().parse::<usize>().unwrap();
1456
1457            assert_eq!(is[file - 1], i);
1458            is[file - 1] += 1;
1459        }
1460
1461        assert_eq!(is, [n as usize; 1]);
1462    }
1463
1464    #[tokio::test]
1465    async fn file_key_acknowledged() {
1466        file_key(Acks).await
1467    }
1468
1469    #[tokio::test]
1470    async fn file_key_no_acknowledge() {
1471        file_key(NoAcks).await
1472    }
1473
1474    async fn file_key(acks: AckingMode) {
1475        // Default
1476        {
1477            let dir = tempdir().unwrap();
1478            let config = file::FileConfig {
1479                include: vec![dir.path().join("*")],
1480                ..test_default_file_config(&dir)
1481            };
1482
1483            let path = dir.path().join("file");
1484            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1485                let mut file = File::create(&path).unwrap();
1486
1487                sleep_500_millis().await;
1488
1489                writeln!(&mut file, "hello there").unwrap();
1490
1491                sleep_500_millis().await;
1492            })
1493            .await;
1494
1495            assert_eq!(received.len(), 1);
1496            assert_eq!(
1497                received[0].as_log()["file"].to_string_lossy(),
1498                path.to_str().unwrap()
1499            );
1500        }
1501
1502        // Custom
1503        {
1504            let dir = tempdir().unwrap();
1505            let config = file::FileConfig {
1506                include: vec![dir.path().join("*")],
1507                file_key: OptionalValuePath::from(owned_value_path!("source")),
1508                ..test_default_file_config(&dir)
1509            };
1510
1511            let path = dir.path().join("file");
1512            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1513                let mut file = File::create(&path).unwrap();
1514
1515                sleep_500_millis().await;
1516
1517                writeln!(&mut file, "hello there").unwrap();
1518
1519                sleep_500_millis().await;
1520            })
1521            .await;
1522
1523            assert_eq!(received.len(), 1);
1524            assert_eq!(
1525                received[0].as_log()["source"].to_string_lossy(),
1526                path.to_str().unwrap()
1527            );
1528        }
1529
1530        // Hidden
1531        {
1532            let dir = tempdir().unwrap();
1533            let config = file::FileConfig {
1534                include: vec![dir.path().join("*")],
1535                ..test_default_file_config(&dir)
1536            };
1537
1538            let path = dir.path().join("file");
1539            let received = run_file_source(&config, true, acks, LogNamespace::Legacy, async {
1540                let mut file = File::create(&path).unwrap();
1541
1542                sleep_500_millis().await;
1543
1544                writeln!(&mut file, "hello there").unwrap();
1545
1546                sleep_500_millis().await;
1547            })
1548            .await;
1549
1550            assert_eq!(received.len(), 1);
1551            assert_eq!(
1552                received[0].as_log().keys().unwrap().collect::<HashSet<_>>(),
1553                vec![
1554                    default_file_key()
1555                        .path
1556                        .expect("file key to exist")
1557                        .to_string()
1558                        .into(),
1559                    log_schema().host_key().unwrap().to_string().into(),
1560                    log_schema().message_key().unwrap().to_string().into(),
1561                    log_schema().timestamp_key().unwrap().to_string().into(),
1562                    log_schema().source_type_key().unwrap().to_string().into()
1563                ]
1564                .into_iter()
1565                .collect::<HashSet<_>>()
1566            );
1567        }
1568    }
1569
1570    #[cfg(target_os = "linux")] // see #7988
1571    #[tokio::test]
1572    async fn file_start_position_server_restart_acknowledged() {
1573        file_start_position_server_restart(Acks).await
1574    }
1575
1576    #[cfg(target_os = "linux")] // see #7988
1577    #[tokio::test]
1578    async fn file_start_position_server_restart_no_acknowledge() {
1579        file_start_position_server_restart(NoAcks).await
1580    }
1581
1582    #[cfg(target_os = "linux")] // see #7988
1583    async fn file_start_position_server_restart(acking: AckingMode) {
1584        let dir = tempdir().unwrap();
1585        let config = file::FileConfig {
1586            include: vec![dir.path().join("*")],
1587            ..test_default_file_config(&dir)
1588        };
1589
1590        let path = dir.path().join("file");
1591        let mut file = File::create(&path).unwrap();
1592        writeln!(&mut file, "zeroth line").unwrap();
1593        sleep_500_millis().await;
1594
1595        // First time server runs it picks up existing lines.
1596        {
1597            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1598                sleep_500_millis().await;
1599                writeln!(&mut file, "first line").unwrap();
1600                sleep_500_millis().await;
1601            })
1602            .await;
1603
1604            let lines = extract_messages_string(received);
1605            assert_eq!(lines, vec!["zeroth line", "first line"]);
1606        }
1607        // Restart server, read file from checkpoint.
1608        {
1609            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1610                sleep_500_millis().await;
1611                writeln!(&mut file, "second line").unwrap();
1612                sleep_500_millis().await;
1613            })
1614            .await;
1615
1616            let lines = extract_messages_string(received);
1617            assert_eq!(lines, vec!["second line"]);
1618        }
1619        // Restart server, read files from beginning.
1620        {
1621            let config = file::FileConfig {
1622                include: vec![dir.path().join("*")],
1623                ignore_checkpoints: Some(true),
1624                read_from: ReadFromConfig::Beginning,
1625                ..test_default_file_config(&dir)
1626            };
1627            let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1628                sleep_500_millis().await;
1629                writeln!(&mut file, "third line").unwrap();
1630                sleep_500_millis().await;
1631            })
1632            .await;
1633
1634            let lines = extract_messages_string(received);
1635            assert_eq!(
1636                lines,
1637                vec!["zeroth line", "first line", "second line", "third line"]
1638            );
1639        }
1640    }
1641
1642    #[tokio::test]
1643    async fn file_start_position_server_restart_unfinalized() {
1644        let dir = tempdir().unwrap();
1645        let config = file::FileConfig {
1646            include: vec![dir.path().join("*")],
1647            ..test_default_file_config(&dir)
1648        };
1649
1650        let path = dir.path().join("file");
1651        let mut file = File::create(&path).unwrap();
1652        writeln!(&mut file, "the line").unwrap();
1653        sleep_500_millis().await;
1654
1655        // First time server runs it picks up existing lines.
1656        let received = run_file_source(
1657            &config,
1658            false,
1659            Unfinalized,
1660            LogNamespace::Legacy,
1661            sleep_500_millis(),
1662        )
1663        .await;
1664        let lines = extract_messages_string(received);
1665        assert_eq!(lines, vec!["the line"]);
1666
1667        // Restart server, it re-reads file since the events were not acknowledged before shutdown
1668        let received = run_file_source(
1669            &config,
1670            false,
1671            Unfinalized,
1672            LogNamespace::Legacy,
1673            sleep_500_millis(),
1674        )
1675        .await;
1676        let lines = extract_messages_string(received);
1677        assert_eq!(lines, vec!["the line"]);
1678    }
1679
1680    #[tokio::test]
1681    async fn file_duplicate_processing_after_restart() {
1682        let dir = tempdir().unwrap();
1683        let config = file::FileConfig {
1684            include: vec![dir.path().join("*")],
1685            ..test_default_file_config(&dir)
1686        };
1687
1688        let path = dir.path().join("file");
1689        let mut file = File::create(&path).unwrap();
1690
1691        let line_count = 4000;
1692        for i in 0..line_count {
1693            writeln!(&mut file, "Here's a line for you: {i}").unwrap();
1694        }
1695        sleep_500_millis().await;
1696
1697        // First time server runs it should pick up a bunch of lines
1698        let received = run_file_source(
1699            &config,
1700            true,
1701            Acks,
1702            LogNamespace::Legacy,
1703            // shutdown signal is sent after this duration
1704            sleep_500_millis(),
1705        )
1706        .await;
1707        let lines = extract_messages_string(received);
1708
1709        // ...but not all the lines; if the first run processed the entire file, we may not hit the
1710        // bug we're testing for, which happens if the finalizer stream exits on shutdown with pending acks
1711        assert!(lines.len() < line_count);
1712
1713        // Restart the server, and it should read the rest without duplicating any
1714        let received = run_file_source(
1715            &config,
1716            true,
1717            Acks,
1718            LogNamespace::Legacy,
1719            sleep(Duration::from_secs(5)),
1720        )
1721        .await;
1722        let lines2 = extract_messages_string(received);
1723
1724        // Between both runs, we should have the expected number of lines
1725        assert_eq!(lines.len() + lines2.len(), line_count);
1726    }
1727
1728    #[tokio::test]
1729    async fn file_start_position_server_restart_with_file_rotation_acknowledged() {
1730        file_start_position_server_restart_with_file_rotation(Acks).await
1731    }
1732
1733    #[tokio::test]
1734    async fn file_start_position_server_restart_with_file_rotation_no_acknowledge() {
1735        file_start_position_server_restart_with_file_rotation(NoAcks).await
1736    }
1737
1738    async fn file_start_position_server_restart_with_file_rotation(acking: AckingMode) {
1739        let dir = tempdir().unwrap();
1740        let config = file::FileConfig {
1741            include: vec![dir.path().join("*")],
1742            ..test_default_file_config(&dir)
1743        };
1744
1745        let path = dir.path().join("file");
1746        let path_for_old_file = dir.path().join("file.old");
1747        // Run server first time, collect some lines.
1748        {
1749            let received = run_file_source(&config, true, acking, LogNamespace::Legacy, async {
1750                let mut file = File::create(&path).unwrap();
1751                sleep_500_millis().await;
1752                writeln!(&mut file, "first line").unwrap();
1753                sleep_500_millis().await;
1754            })
1755            .await;
1756
1757            let lines = extract_messages_string(received);
1758            assert_eq!(lines, vec!["first line"]);
1759        }
1760        // Perform 'file rotation' to archive old lines.
1761        fs::rename(&path, &path_for_old_file).expect("could not rename");
1762        // Restart the server and make sure it does not re-read the old file
1763        // even though it has a new name.
1764        {
1765            let received = run_file_source(&config, false, acking, LogNamespace::Legacy, async {
1766                let mut file = File::create(&path).unwrap();
1767                sleep_500_millis().await;
1768                writeln!(&mut file, "second line").unwrap();
1769                sleep_500_millis().await;
1770            })
1771            .await;
1772
1773            let lines = extract_messages_string(received);
1774            assert_eq!(lines, vec!["second line"]);
1775        }
1776    }
1777
1778    #[cfg(unix)] // this test uses unix-specific function `futimes` during test time
1779    #[tokio::test]
1780    async fn file_start_position_ignore_old_files() {
1781        use std::{
1782            os::unix::io::AsRawFd,
1783            time::{Duration, SystemTime},
1784        };
1785
1786        let dir = tempdir().unwrap();
1787        let config = file::FileConfig {
1788            include: vec![dir.path().join("*")],
1789            ignore_older_secs: Some(5),
1790            ..test_default_file_config(&dir)
1791        };
1792
1793        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1794            let before_path = dir.path().join("before");
1795            let mut before_file = File::create(&before_path).unwrap();
1796            let after_path = dir.path().join("after");
1797            let mut after_file = File::create(&after_path).unwrap();
1798
1799            writeln!(&mut before_file, "first line").unwrap(); // first few bytes make up unique file fingerprint
1800            writeln!(&mut after_file, "_first line").unwrap(); //   and therefore need to be non-identical
1801
1802            {
1803                // Set the modified times
1804                let before = SystemTime::now() - Duration::from_secs(8);
1805                let after = SystemTime::now() - Duration::from_secs(2);
1806
1807                let before_time = libc::timeval {
1808                    tv_sec: before
1809                        .duration_since(SystemTime::UNIX_EPOCH)
1810                        .unwrap()
1811                        .as_secs() as _,
1812                    tv_usec: 0,
1813                };
1814                let before_times = [before_time, before_time];
1815
1816                let after_time = libc::timeval {
1817                    tv_sec: after
1818                        .duration_since(SystemTime::UNIX_EPOCH)
1819                        .unwrap()
1820                        .as_secs() as _,
1821                    tv_usec: 0,
1822                };
1823                let after_times = [after_time, after_time];
1824
1825                unsafe {
1826                    libc::futimes(before_file.as_raw_fd(), before_times.as_ptr());
1827                    libc::futimes(after_file.as_raw_fd(), after_times.as_ptr());
1828                }
1829            }
1830
1831            sleep_500_millis().await;
1832            writeln!(&mut before_file, "second line").unwrap();
1833            writeln!(&mut after_file, "_second line").unwrap();
1834
1835            sleep_500_millis().await;
1836        })
1837        .await;
1838
1839        let before_lines = received
1840            .iter()
1841            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("before"))
1842            .map(|event| {
1843                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1844            })
1845            .collect::<Vec<_>>();
1846        let after_lines = received
1847            .iter()
1848            .filter(|event| event.as_log()["file"].to_string_lossy().ends_with("after"))
1849            .map(|event| {
1850                event.as_log()[log_schema().message_key().unwrap().to_string()].to_string_lossy()
1851            })
1852            .collect::<Vec<_>>();
1853        assert_eq!(before_lines, vec!["second line"]);
1854        assert_eq!(after_lines, vec!["_first line", "_second line"]);
1855    }
1856
1857    #[tokio::test]
1858    async fn file_max_line_bytes() {
1859        let dir = tempdir().unwrap();
1860        let config = file::FileConfig {
1861            include: vec![dir.path().join("*")],
1862            max_line_bytes: 10,
1863            ..test_default_file_config(&dir)
1864        };
1865
1866        let path = dir.path().join("file");
1867        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1868            let mut file = File::create(&path).unwrap();
1869
1870            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1871
1872            writeln!(&mut file, "short").unwrap();
1873            writeln!(&mut file, "this is too long").unwrap();
1874            writeln!(&mut file, "11 eleven11").unwrap();
1875            let super_long = "This line is super long and will take up more space than BufReader's internal buffer, just to make sure that everything works properly when multiple read calls are involved".repeat(10000);
1876            writeln!(&mut file, "{super_long}").unwrap();
1877            writeln!(&mut file, "exactly 10").unwrap();
1878            writeln!(&mut file, "it can end on a line that's too long").unwrap();
1879
1880            sleep_500_millis().await;
1881            sleep_500_millis().await;
1882
1883            writeln!(&mut file, "and then continue").unwrap();
1884            writeln!(&mut file, "last short").unwrap();
1885
1886            sleep_500_millis().await;
1887            sleep_500_millis().await;
1888        }).await;
1889
1890        let received = extract_messages_value(received);
1891
1892        assert_eq!(
1893            received,
1894            vec!["short".into(), "exactly 10".into(), "last short".into()]
1895        );
1896    }
1897
1898    #[tokio::test]
1899    async fn test_multi_line_aggregation_legacy() {
1900        let dir = tempdir().unwrap();
1901        let config = file::FileConfig {
1902            include: vec![dir.path().join("*")],
1903            message_start_indicator: Some("INFO".into()),
1904            multi_line_timeout: 25, // less than 50 in sleep()
1905            ..test_default_file_config(&dir)
1906        };
1907
1908        let path = dir.path().join("file");
1909        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1910            let mut file = File::create(&path).unwrap();
1911
1912            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1913
1914            writeln!(&mut file, "leftover foo").unwrap();
1915            writeln!(&mut file, "INFO hello").unwrap();
1916            writeln!(&mut file, "INFO goodbye").unwrap();
1917            writeln!(&mut file, "part of goodbye").unwrap();
1918
1919            sleep_500_millis().await;
1920
1921            writeln!(&mut file, "INFO hi again").unwrap();
1922            writeln!(&mut file, "and some more").unwrap();
1923            writeln!(&mut file, "INFO hello").unwrap();
1924
1925            sleep_500_millis().await;
1926
1927            writeln!(&mut file, "too slow").unwrap();
1928            writeln!(&mut file, "INFO doesn't have").unwrap();
1929            writeln!(&mut file, "to be INFO in").unwrap();
1930            writeln!(&mut file, "the middle").unwrap();
1931
1932            sleep_500_millis().await;
1933        })
1934        .await;
1935
1936        let received = extract_messages_value(received);
1937
1938        assert_eq!(
1939            received,
1940            vec![
1941                "leftover foo".into(),
1942                "INFO hello".into(),
1943                "INFO goodbye\npart of goodbye".into(),
1944                "INFO hi again\nand some more".into(),
1945                "INFO hello".into(),
1946                "too slow".into(),
1947                "INFO doesn't have".into(),
1948                "to be INFO in\nthe middle".into(),
1949            ]
1950        );
1951    }
1952
1953    #[tokio::test]
1954    async fn test_multi_line_aggregation() {
1955        let dir = tempdir().unwrap();
1956        let config = file::FileConfig {
1957            include: vec![dir.path().join("*")],
1958            multiline: Some(MultilineConfig {
1959                start_pattern: "INFO".to_owned(),
1960                condition_pattern: "INFO".to_owned(),
1961                mode: line_agg::Mode::HaltBefore,
1962                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
1963            }),
1964            ..test_default_file_config(&dir)
1965        };
1966
1967        let path = dir.path().join("file");
1968        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
1969            let mut file = File::create(&path).unwrap();
1970
1971            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
1972
1973            writeln!(&mut file, "leftover foo").unwrap();
1974            writeln!(&mut file, "INFO hello").unwrap();
1975            writeln!(&mut file, "INFO goodbye").unwrap();
1976            writeln!(&mut file, "part of goodbye").unwrap();
1977
1978            sleep_500_millis().await;
1979
1980            writeln!(&mut file, "INFO hi again").unwrap();
1981            writeln!(&mut file, "and some more").unwrap();
1982            writeln!(&mut file, "INFO hello").unwrap();
1983
1984            sleep_500_millis().await;
1985
1986            writeln!(&mut file, "too slow").unwrap();
1987            writeln!(&mut file, "INFO doesn't have").unwrap();
1988            writeln!(&mut file, "to be INFO in").unwrap();
1989            writeln!(&mut file, "the middle").unwrap();
1990
1991            sleep_500_millis().await;
1992        })
1993        .await;
1994
1995        let received = extract_messages_value(received);
1996
1997        assert_eq!(
1998            received,
1999            vec![
2000                "leftover foo".into(),
2001                "INFO hello".into(),
2002                "INFO goodbye\npart of goodbye".into(),
2003                "INFO hi again\nand some more".into(),
2004                "INFO hello".into(),
2005                "too slow".into(),
2006                "INFO doesn't have".into(),
2007                "to be INFO in\nthe middle".into(),
2008            ]
2009        );
2010    }
2011
2012    #[tokio::test]
2013    async fn test_multi_line_checkpointing() {
2014        let dir = tempdir().unwrap();
2015        let config = file::FileConfig {
2016            include: vec![dir.path().join("*")],
2017            offset_key: Some(OptionalValuePath::from(owned_value_path!("offset"))),
2018            multiline: Some(MultilineConfig {
2019                start_pattern: "INFO".to_owned(),
2020                condition_pattern: "INFO".to_owned(),
2021                mode: line_agg::Mode::HaltBefore,
2022                timeout_ms: Duration::from_millis(25), // less than 50 in sleep()
2023            }),
2024            ..test_default_file_config(&dir)
2025        };
2026
2027        let path = dir.path().join("file");
2028        let mut file = File::create(&path).unwrap();
2029
2030        writeln!(&mut file, "INFO hello").unwrap();
2031        writeln!(&mut file, "part of hello").unwrap();
2032
2033        // Read and aggregate existing lines
2034        let received = run_file_source(
2035            &config,
2036            false,
2037            Acks,
2038            LogNamespace::Legacy,
2039            sleep_500_millis(),
2040        )
2041        .await;
2042
2043        assert_eq!(received[0].as_log()["offset"], 0.into());
2044
2045        let lines = extract_messages_string(received);
2046        assert_eq!(lines, vec!["INFO hello\npart of hello"]);
2047
2048        // After restart, we should not see any part of the previously aggregated lines
2049        let received_after_restart =
2050            run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2051                writeln!(&mut file, "INFO goodbye").unwrap();
2052            })
2053            .await;
2054        assert_eq!(
2055            received_after_restart[0].as_log()["offset"],
2056            (lines[0].len() + 1).into()
2057        );
2058        let lines = extract_messages_string(received_after_restart);
2059        assert_eq!(lines, vec!["INFO goodbye"]);
2060    }
2061
2062    #[tokio::test]
2063    async fn test_fair_reads() {
2064        let dir = tempdir().unwrap();
2065        let config = file::FileConfig {
2066            include: vec![dir.path().join("*")],
2067            max_read_bytes: 1,
2068            oldest_first: false,
2069            ..test_default_file_config(&dir)
2070        };
2071
2072        let older_path = dir.path().join("z_older_file");
2073        let mut older = File::create(&older_path).unwrap();
2074
2075        sleep_500_millis().await;
2076
2077        let newer_path = dir.path().join("a_newer_file");
2078        let mut newer = File::create(&newer_path).unwrap();
2079
2080        writeln!(&mut older, "hello i am the old file").unwrap();
2081        writeln!(&mut older, "i have been around a while").unwrap();
2082        writeln!(&mut older, "you can read newer files at the same time").unwrap();
2083
2084        writeln!(&mut newer, "and i am the new file").unwrap();
2085        writeln!(&mut newer, "this should be interleaved with the old one").unwrap();
2086        writeln!(&mut newer, "which is fine because we want fairness").unwrap();
2087
2088        sleep_500_millis().await;
2089
2090        let received = run_file_source(
2091            &config,
2092            false,
2093            NoAcks,
2094            LogNamespace::Legacy,
2095            sleep_500_millis(),
2096        )
2097        .await;
2098
2099        let received = extract_messages_value(received);
2100
2101        assert_eq!(
2102            received,
2103            vec![
2104                "hello i am the old file".into(),
2105                "and i am the new file".into(),
2106                "i have been around a while".into(),
2107                "this should be interleaved with the old one".into(),
2108                "you can read newer files at the same time".into(),
2109                "which is fine because we want fairness".into(),
2110            ]
2111        );
2112    }
2113
2114    #[tokio::test]
2115    async fn test_oldest_first() {
2116        let dir = tempdir().unwrap();
2117        let config = file::FileConfig {
2118            include: vec![dir.path().join("*")],
2119            max_read_bytes: 1,
2120            oldest_first: true,
2121            ..test_default_file_config(&dir)
2122        };
2123
2124        let older_path = dir.path().join("z_older_file");
2125        let mut older = File::create(&older_path).unwrap();
2126
2127        sleep_500_millis().await;
2128
2129        let newer_path = dir.path().join("a_newer_file");
2130        let mut newer = File::create(&newer_path).unwrap();
2131
2132        writeln!(&mut older, "hello i am the old file").unwrap();
2133        writeln!(&mut older, "i have been around a while").unwrap();
2134        writeln!(&mut older, "you should definitely read all of me first").unwrap();
2135
2136        writeln!(&mut newer, "i'm new").unwrap();
2137        writeln!(&mut newer, "hopefully you read all the old stuff first").unwrap();
2138        writeln!(&mut newer, "because otherwise i'm not going to make sense").unwrap();
2139
2140        sleep_500_millis().await;
2141
2142        let received = run_file_source(
2143            &config,
2144            false,
2145            NoAcks,
2146            LogNamespace::Legacy,
2147            sleep_500_millis(),
2148        )
2149        .await;
2150
2151        let received = extract_messages_value(received);
2152
2153        assert_eq!(
2154            received,
2155            vec![
2156                "hello i am the old file".into(),
2157                "i have been around a while".into(),
2158                "you should definitely read all of me first".into(),
2159                "i'm new".into(),
2160                "hopefully you read all the old stuff first".into(),
2161                "because otherwise i'm not going to make sense".into(),
2162            ]
2163        );
2164    }
2165
2166    // Ignoring on mac: https://github.com/vectordotdev/vector/issues/8373
2167    #[cfg(not(target_os = "macos"))]
2168    #[tokio::test]
2169    async fn test_split_reads() {
2170        let dir = tempdir().unwrap();
2171        let config = file::FileConfig {
2172            include: vec![dir.path().join("*")],
2173            max_read_bytes: 1,
2174            ..test_default_file_config(&dir)
2175        };
2176
2177        let path = dir.path().join("file");
2178        let mut file = File::create(&path).unwrap();
2179
2180        writeln!(&mut file, "hello i am a normal line").unwrap();
2181
2182        sleep_500_millis().await;
2183
2184        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2185            sleep_500_millis().await;
2186
2187            write!(&mut file, "i am not a full line").unwrap();
2188
2189            // Longer than the EOF timeout
2190            sleep_500_millis().await;
2191
2192            writeln!(&mut file, " until now").unwrap();
2193
2194            sleep_500_millis().await;
2195        })
2196        .await;
2197
2198        let received = extract_messages_value(received);
2199
2200        assert_eq!(
2201            received,
2202            vec![
2203                "hello i am a normal line".into(),
2204                "i am not a full line until now".into(),
2205            ]
2206        );
2207    }
2208
2209    #[tokio::test]
2210    async fn test_gzipped_file() {
2211        let dir = tempdir().unwrap();
2212        let config = file::FileConfig {
2213            include: vec![PathBuf::from("tests/data/gzipped.log")],
2214            // TODO: remove this once files are fingerprinted after decompression
2215            //
2216            // Currently, this needs to be smaller than the total size of the compressed file
2217            // because the fingerprinter tries to read until a newline, which it's not going to see
2218            // in the compressed data, or this number of bytes. If it hits EOF before that, it
2219            // can't return a fingerprint because the value would change once more data is written.
2220            max_line_bytes: 100,
2221            ..test_default_file_config(&dir)
2222        };
2223
2224        let received = run_file_source(
2225            &config,
2226            false,
2227            NoAcks,
2228            LogNamespace::Legacy,
2229            sleep_500_millis(),
2230        )
2231        .await;
2232
2233        let received = extract_messages_value(received);
2234
2235        assert_eq!(
2236            received,
2237            vec![
2238                "this is a simple file".into(),
2239                "i have been compressed".into(),
2240                "in order to make me smaller".into(),
2241                "but you can still read me".into(),
2242                "hooray".into(),
2243            ]
2244        );
2245    }
2246
2247    #[tokio::test]
2248    async fn test_non_utf8_encoded_file() {
2249        let dir = tempdir().unwrap();
2250        let config = file::FileConfig {
2251            include: vec![PathBuf::from("tests/data/utf-16le.log")],
2252            encoding: Some(EncodingConfig { charset: UTF_16LE }),
2253            ..test_default_file_config(&dir)
2254        };
2255
2256        let received = run_file_source(
2257            &config,
2258            false,
2259            NoAcks,
2260            LogNamespace::Legacy,
2261            sleep_500_millis(),
2262        )
2263        .await;
2264
2265        let received = extract_messages_value(received);
2266
2267        assert_eq!(
2268            received,
2269            vec![
2270                "hello i am a file".into(),
2271                "i can unicode".into(),
2272                "but i do so in 16 bits".into(),
2273                "and when i byte".into(),
2274                "i become little-endian".into(),
2275            ]
2276        );
2277    }
2278
2279    #[tokio::test]
2280    async fn test_non_default_line_delimiter() {
2281        let dir = tempdir().unwrap();
2282        let config = file::FileConfig {
2283            include: vec![dir.path().join("*")],
2284            line_delimiter: "\r\n".to_string(),
2285            ..test_default_file_config(&dir)
2286        };
2287
2288        let path = dir.path().join("file");
2289        let received = run_file_source(&config, false, NoAcks, LogNamespace::Legacy, async {
2290            let mut file = File::create(&path).unwrap();
2291
2292            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
2293
2294            write!(&mut file, "hello i am a line\r\n").unwrap();
2295            write!(&mut file, "and i am too\r\n").unwrap();
2296            write!(&mut file, "CRLF is how we end\r\n").unwrap();
2297            write!(&mut file, "please treat us well\r\n").unwrap();
2298
2299            sleep_500_millis().await;
2300        })
2301        .await;
2302
2303        let received = extract_messages_value(received);
2304
2305        assert_eq!(
2306            received,
2307            vec![
2308                "hello i am a line".into(),
2309                "and i am too".into(),
2310                "CRLF is how we end".into(),
2311                "please treat us well".into()
2312            ]
2313        );
2314    }
2315
2316    #[tokio::test]
2317    async fn remove_file() {
2318        let n = 5;
2319        let remove_after_secs = 1;
2320
2321        let dir = tempdir().unwrap();
2322        let config = file::FileConfig {
2323            include: vec![dir.path().join("*")],
2324            remove_after_secs: Some(remove_after_secs),
2325            ..test_default_file_config(&dir)
2326        };
2327
2328        let path = dir.path().join("file");
2329        let received = run_file_source(&config, false, Acks, LogNamespace::Legacy, async {
2330            let mut file = File::create(&path).unwrap();
2331
2332            sleep_500_millis().await; // The files must be observed at their original lengths before writing to them
2333
2334            for i in 0..n {
2335                writeln!(&mut file, "{i}").unwrap();
2336            }
2337            drop(file);
2338
2339            for _ in 0..10 {
2340                // Wait for remove grace period to end.
2341                sleep(Duration::from_secs(remove_after_secs + 1)).await;
2342
2343                if File::open(&path).is_err() {
2344                    break;
2345                }
2346            }
2347        })
2348        .await;
2349
2350        assert_eq!(received.len(), n);
2351
2352        match File::open(&path) {
2353            Ok(_) => panic!("File wasn't removed"),
2354            Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound),
2355        }
2356    }
2357
2358    #[derive(Clone, Copy, Eq, PartialEq)]
2359    enum AckingMode {
2360        NoAcks,      // No acknowledgement handling and no finalization
2361        Unfinalized, // Acknowledgement handling but no finalization
2362        Acks,        // Full acknowledgements and proper finalization
2363    }
2364    use vector_lib::lookup::OwnedTargetPath;
2365    use AckingMode::*;
2366
2367    async fn run_file_source(
2368        config: &FileConfig,
2369        wait_shutdown: bool,
2370        acking_mode: AckingMode,
2371        log_namespace: LogNamespace,
2372        inner: impl Future<Output = ()>,
2373    ) -> Vec<Event> {
2374        assert_source_compliance(&FILE_SOURCE_TAGS, async move {
2375            let (tx, rx) = if acking_mode == Acks {
2376                let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
2377                (tx, rx.boxed())
2378            } else {
2379                let (tx, rx) = SourceSender::new_test();
2380                (tx, rx.boxed())
2381            };
2382
2383            let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
2384            let data_dir = config.data_dir.clone().unwrap();
2385            let acks = !matches!(acking_mode, NoAcks);
2386
2387            tokio::spawn(file::file_source(
2388                config,
2389                data_dir,
2390                shutdown,
2391                tx,
2392                acks,
2393                log_namespace,
2394            ));
2395
2396            inner.await;
2397
2398            drop(trigger_shutdown);
2399
2400            let result = if acking_mode == Unfinalized {
2401                rx.take_until(tokio::time::sleep(Duration::from_secs(5)))
2402                    .collect::<Vec<_>>()
2403                    .await
2404            } else {
2405                timeout(Duration::from_secs(5), rx.collect::<Vec<_>>())
2406                    .await
2407                    .expect(
2408                        "Unclosed channel: may indicate file-server could not shutdown gracefully.",
2409                    )
2410            };
2411            if wait_shutdown {
2412                shutdown_done.await;
2413            }
2414
2415            result
2416        })
2417        .await
2418    }
2419
2420    fn extract_messages_string(received: Vec<Event>) -> Vec<String> {
2421        received
2422            .into_iter()
2423            .map(Event::into_log)
2424            .map(|log| log.get_message().unwrap().to_string_lossy().into_owned())
2425            .collect()
2426    }
2427
2428    fn extract_messages_value(received: Vec<Event>) -> Vec<Value> {
2429        received
2430            .into_iter()
2431            .map(Event::into_log)
2432            .map(|log| log.get_message().unwrap().clone())
2433            .collect()
2434    }
2435}