vector/sinks/file/
mod.rs

1use std::{
2    convert::TryFrom,
3    num::NonZeroU64,
4    time::{Duration, Instant},
5};
6
7use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
8use async_trait::async_trait;
9use bytes::{Bytes, BytesMut};
10use futures::{
11    FutureExt, future,
12    stream::{BoxStream, StreamExt},
13};
14use serde_with::serde_as;
15use tokio::{
16    fs::{self, File},
17    io::AsyncWriteExt,
18};
19use tokio_util::{codec::Encoder as _, time::delay_queue::Expired};
20use vector_lib::{
21    EstimatedJsonEncodedSizeOf, TimeZone,
22    codecs::{
23        TextSerializerConfig,
24        encoding::{Framer, FramingConfig},
25    },
26    configurable::configurable_component,
27    internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
28};
29
30use crate::{
31    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
32    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
33    event::{Event, EventStatus, Finalizable},
34    expiring_hash_map::ExpiringHashMap,
35    internal_events::{
36        FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
37    },
38    sinks::util::{StreamSink, timezone_to_offset},
39    template::Template,
40};
41
42mod bytes_path;
43
44use bytes_path::BytesPath;
45
/// Configuration for the `file` sink.
#[serde_as]
#[configurable_component(sink("file", "Output observability events into files."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileSinkConfig {
    /// File path to write events to.
    ///
    /// Compression format extension must be explicit.
    // Rendered per event (see `FileSink::partition_event`), so it may mix
    // strftime specifiers with `{{ field }}` event templates.
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log"))]
    #[configurable(metadata(
        docs::examples = "/tmp/application-{{ application_id }}-%Y-%m-%d.log"
    ))]
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log.zst"))]
    #[configurable(metadata(
        docs::warnings = "The rendered path can resolve to any location on the filesystem. Vector will write to it if the process has permission."
    ))]
    pub path: Template,

    /// The amount of time that a file can be idle and stay open.
    ///
    /// After not receiving any events in this amount of time, the file is flushed and closed.
    #[serde(default = "default_idle_timeout")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "idle_timeout_secs")]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Idle Timeout"))]
    pub idle_timeout: Duration,

    // Framing and serializer options, flattened into this struct's keys.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Sink-level timezone; falls back to the global timezone when `None`
    // (see `FileSink::new`).
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: FileInternalMetricsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub truncate: FileTruncateConfig,
}
102
/// Configuration for truncating files.
// The three rules are independent; any one of them matching causes the file
// to be truncated on the next write (see `FileSink::should_truncate`).
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct FileTruncateConfig {
    /// If this is set, files will be truncated after being closed for a set amount of seconds.
    #[serde(default)]
    pub after_close_time_secs: Option<NonZeroU64>,
    /// If this is set, files will be truncated after set amount of seconds of no modifications.
    #[serde(default)]
    pub after_modified_time_secs: Option<NonZeroU64>,
    /// If this is set, files will be truncated after set amount of seconds regardless of the state.
    #[serde(default)]
    pub after_secs: Option<NonZeroU64>,
}
118
119impl GenerateConfig for FileSinkConfig {
120    fn generate_config() -> toml::Value {
121        toml::Value::try_from(Self {
122            path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
123            idle_timeout: default_idle_timeout(),
124            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
125            compression: Default::default(),
126            acknowledgements: Default::default(),
127            timezone: Default::default(),
128            internal_metrics: Default::default(),
129            truncate: Default::default(),
130        })
131        .unwrap()
132    }
133}
134
/// Default `idle_timeout`: files idle for 30 seconds are flushed and closed.
const fn default_idle_timeout() -> Duration {
    Duration::new(30, 0)
}
138
/// Compression configuration.
// TODO: Why doesn't this already use `crate::sinks::util::Compression`
// `crate::sinks::util::Compression` doesn't support zstd yet
//
// Each variant maps to one `OutFileInner` writer variant below.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Compression {
    /// [Gzip][gzip] compression.
    ///
    /// [gzip]: https://www.gzip.org/
    Gzip,

    /// [Zstandard][zstd] compression.
    ///
    /// [zstd]: https://facebook.github.io/zstd/
    Zstd,

    /// No compression.
    #[default]
    None,
}
160
/// An open output file together with the bookkeeping needed for the
/// age-based truncation check.
struct OutFile {
    /// When this file was opened; compared against
    /// `FileTruncateConfig::after_secs` in `FileSink::should_truncate`.
    created_at: Instant,
    inner: OutFileInner,
}
165
/// The concrete writer: either the raw file or a compressing encoder
/// wrapping it.
enum OutFileInner {
    Regular(File),
    Gzip(GzipEncoder<File>),
    Zstd(ZstdEncoder<File>),
}
171
impl OutFile {
    /// Wraps `file` in the writer matching the configured `compression`.
    fn new(file: File, compression: Compression) -> Self {
        Self {
            created_at: Instant::now(),
            inner: match compression {
                Compression::None => OutFileInner::Regular(file),
                Compression::Gzip => OutFileInner::Gzip(GzipEncoder::new(file)),
                Compression::Zstd => OutFileInner::Zstd(ZstdEncoder::new(file)),
            },
        }
    }

    /// Syncs file data and metadata to disk; for compressed variants this
    /// syncs the underlying `File` rather than the encoder.
    async fn sync_all(&mut self) -> Result<(), std::io::Error> {
        match &mut self.inner {
            OutFileInner::Regular(file) => file.sync_all().await,
            OutFileInner::Gzip(gzip) => gzip.get_mut().sync_all().await,
            OutFileInner::Zstd(zstd) => zstd.get_mut().sync_all().await,
        }
    }

    /// Shuts the writer down; for compressed variants this finishes the
    /// compressed stream before the file is shut down.
    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
        match &mut self.inner {
            OutFileInner::Regular(file) => file.shutdown().await,
            OutFileInner::Gzip(gzip) => gzip.shutdown().await,
            OutFileInner::Zstd(zstd) => zstd.shutdown().await,
        }
    }

    /// Writes the entire buffer through the (possibly compressing) writer.
    async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
        match &mut self.inner {
            OutFileInner::Regular(file) => file.write_all(src).await,
            OutFileInner::Gzip(gzip) => gzip.write_all(src).await,
            OutFileInner::Zstd(zstd) => zstd.write_all(src).await,
        }
    }

    /// The instant this file was opened.
    const fn created_at(&self) -> Instant {
        self.created_at
    }

    /// Shutdowns by flushing data, writing headers, and syncing all of that
    /// data and metadata to the filesystem.
    async fn close(&mut self) -> Result<(), std::io::Error> {
        self.shutdown().await?;
        self.sync_all().await
    }
}
219
#[async_trait::async_trait]
#[typetag::serde(name = "file")]
impl SinkConfig for FileSinkConfig {
    /// Builds the sink. The healthcheck is a no-op that always succeeds —
    /// there is nothing to probe until a path template is rendered.
    async fn build(
        &self,
        cx: SinkContext,
    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
        let sink = FileSink::new(self, cx)?;
        Ok((
            super::VectorSink::from_event_streamsink(sink),
            future::ok(()).boxed(),
        ))
    }

    /// The accepted input type is dictated by the configured serializer.
    fn input(&self) -> Input {
        Input::new(self.encoding.config().1.input_type())
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
242
/// Runtime state of the `file` sink.
pub struct FileSink {
    /// Path template, pre-adjusted with the resolved timezone offset.
    path: Template,
    transformer: Transformer,
    encoder: Encoder<Framer>,
    /// How long a file may sit without writes before being closed.
    idle_timeout: Duration,
    /// Currently open files, keyed by rendered path; entries expire after
    /// `idle_timeout` without a write.
    files: ExpiringHashMap<Bytes, OutFile>,
    compression: Compression,
    events_sent: Registered<EventsSent>,
    /// Whether to tag `FileBytesSent` metrics with the file path.
    include_file_metric_tag: bool,
    truncation_config: FileTruncateConfig,
}
254
255impl FileSink {
256    pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
257        let transformer = config.encoding.transformer();
258        let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
259        let encoder = Encoder::<Framer>::new(framer, serializer);
260
261        let offset = config
262            .timezone
263            .or(cx.globals.timezone)
264            .and_then(timezone_to_offset);
265
266        Ok(Self {
267            path: config.path.clone().with_tz_offset(offset),
268            transformer,
269            encoder,
270            idle_timeout: config.idle_timeout,
271            files: ExpiringHashMap::default(),
272            compression: config.compression,
273            events_sent: register!(EventsSent::from(Output(None))),
274            include_file_metric_tag: config.internal_metrics.include_file_tag,
275            truncation_config: config.truncate.clone(),
276        })
277    }
278
279    /// Uses pass the `event` to `self.path` template to obtain the file path
280    /// to store the event as.
281    fn partition_event(&mut self, event: &Event) -> Option<bytes::Bytes> {
282        let bytes = match self.path.render(event) {
283            Ok(b) => b,
284            Err(error) => {
285                emit!(TemplateRenderingError {
286                    error,
287                    field: Some("path"),
288                    drop_event: true,
289                });
290                return None;
291            }
292        };
293
294        Some(bytes)
295    }
296
297    fn deadline_at(&self) -> Instant {
298        Instant::now()
299            .checked_add(self.idle_timeout)
300            .expect("unable to compute next deadline")
301    }
302
303    async fn run(&mut self, mut input: BoxStream<'_, Event>) -> crate::Result<()> {
304        loop {
305            tokio::select! {
306                event = input.next() => {
307                    match event {
308                        Some(event) => self.process_event(event).await,
309                        None => {
310                            // If we got `None` - terminate the processing.
311                            debug!(message = "Receiver exhausted, terminating the processing loop.");
312
313                            // Close all the open files.
314                            debug!(message = "Closing all the open files.");
315                            for (path, file) in self.files.iter_mut() {
316                                if let Err(error) = file.close().await {
317                                    emit!(FileIoError {
318                                        error,
319                                        code: "failed_closing_file",
320                                        message: "Failed to close file.",
321                                        path,
322                                        dropped_events: 0,
323                                    });
324                                } else{
325                                    trace!(message = "Successfully closed file.", path = ?path);
326                                }
327                            }
328
329                            emit!(FileOpen {
330                                count: 0
331                            });
332
333                            break;
334                        }
335                    }
336                }
337                result = self.files.next_expired(), if !self.files.is_empty() => {
338                    match result {
339                        // We do not poll map when it's empty, so we should
340                        // never reach this branch.
341                        None => unreachable!(),
342                        Some((expired_file, path)) => {
343                            // We got an expired file. All we really want is to
344                            // flush and close it.
345                            self.close_file(expired_file, path).await;
346                        }
347                    }
348                }
349            }
350        }
351
352        Ok(())
353    }
354
355    async fn process_event(&mut self, mut event: Event) {
356        let path = match self.partition_event(&event) {
357            Some(path) => path,
358            None => {
359                // We weren't able to find the path to use for the
360                // file.
361                // The error is already handled at `partition_event`, so
362                // here we just skip the event.
363                event.metadata().update_status(EventStatus::Errored);
364                return;
365            }
366        };
367
368        let next_deadline = self.deadline_at();
369        trace!(message = "Computed next deadline.", next_deadline = ?next_deadline, path = ?path);
370
371        let bytes_path = BytesPath::new(path.clone());
372        let truncate = self.should_truncate(&bytes_path, &path).await;
373        let file = if !truncate && let Some(file) = self.files.reset_at(&path, next_deadline) {
374            trace!(message = "Working with an already opened file.", path = ?path);
375            file
376        } else {
377            trace!(message = "Opening new file.", ?path);
378            let file = match open_file(bytes_path, truncate).await {
379                Ok(file) => file,
380                Err(error) => {
381                    // We couldn't open the file for this event.
382                    // Maybe other events will work though! Just log
383                    // the error and skip this event.
384                    emit!(FileIoError {
385                        code: "failed_opening_file",
386                        message: "Unable to open the file.",
387                        error,
388                        path: &path,
389                        dropped_events: 1,
390                    });
391                    event.metadata().update_status(EventStatus::Errored);
392                    return;
393                }
394            };
395
396            let outfile = OutFile::new(file, self.compression);
397
398            self.files.insert_at(path.clone(), outfile, next_deadline);
399            emit!(FileOpen {
400                count: self.files.len()
401            });
402            self.files.get_mut(&path).unwrap()
403        };
404
405        trace!(message = "Writing an event to file.", path = ?path);
406        let event_size = event.estimated_json_encoded_size_of();
407        let finalizers = event.take_finalizers();
408        match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
409            Ok(byte_size) => {
410                finalizers.update_status(EventStatus::Delivered);
411                self.events_sent.emit(CountByteSize(1, event_size));
412                emit!(FileBytesSent {
413                    byte_size,
414                    file: String::from_utf8_lossy(&path),
415                    include_file_metric_tag: self.include_file_metric_tag,
416                });
417            }
418            Err(error) => {
419                finalizers.update_status(EventStatus::Errored);
420                emit!(FileIoError {
421                    code: "failed_writing_file",
422                    message: "Failed to write the file.",
423                    error,
424                    path: &path,
425                    dropped_events: 1,
426                });
427            }
428        }
429    }
430
431    async fn should_truncate(&mut self, bytes_path: &BytesPath, path: &bytes::Bytes) -> bool {
432        let mut truncate = false;
433
434        if let Some(after_close_time_secs) = self.truncation_config.after_close_time_secs
435            && self.files.get(path).is_none()
436            && let Ok(metadata) = fs::metadata(bytes_path).await
437            && let Ok(time) = metadata
438                .modified()
439                .map_err(|_| ())
440                .and_then(|t| t.elapsed().map_err(|_| ()))
441            && time.as_secs() > after_close_time_secs.into()
442        {
443            truncate = true;
444        }
445
446        if let Some(after_secs) = self.truncation_config.after_secs
447            && let Some(file) = self.files.get(path)
448            && (file.created_at().elapsed().as_secs() > after_secs.into())
449        {
450            truncate = true;
451        }
452
453        if let Some(after_modified_time_secs) = self.truncation_config.after_modified_time_secs
454            && let Some(previous_modification) = self
455                .files
456                .get_with_deadline(path)
457                .and_then(|(_, deadline)| deadline.checked_sub(self.idle_timeout))
458            && previous_modification.elapsed().as_secs() > after_modified_time_secs.into()
459        {
460            truncate = true;
461        }
462
463        if truncate && let Some((file, path)) = self.files.remove(path) {
464            self.close_file(file, path).await;
465        }
466
467        truncate
468    }
469
470    async fn close_file(&self, mut file: OutFile, path: Expired<Bytes>) {
471        if let Err(error) = file.close().await {
472            emit!(FileIoError {
473                error,
474                code: "failed_closing_file",
475                message: "Failed to close file.",
476                path: &path,
477                dropped_events: 0,
478            });
479        }
480        drop(file); // ignore close error
481        emit!(FileOpen {
482            count: self.files.len()
483        });
484    }
485}
486
487async fn open_file(path: impl AsRef<std::path::Path>, truncate: bool) -> std::io::Result<File> {
488    let parent = path.as_ref().parent();
489
490    if let Some(parent) = parent {
491        fs::create_dir_all(parent).await?;
492    }
493
494    fs::OpenOptions::new()
495        .read(false)
496        .write(true)
497        .create(true)
498        .append(!truncate)
499        .truncate(truncate)
500        .open(path)
501        .await
502}
503
504async fn write_event_to_file(
505    file: &mut OutFile,
506    mut event: Event,
507    transformer: &Transformer,
508    encoder: &mut Encoder<Framer>,
509) -> Result<usize, std::io::Error> {
510    transformer.transform(&mut event);
511    let mut buffer = BytesMut::new();
512    encoder
513        .encode(event, &mut buffer)
514        .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
515    file.write_all(&buffer).await.map(|()| buffer.len())
516}
517
518#[async_trait]
519impl StreamSink<Event> for FileSink {
520    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
521        FileSink::run(&mut self, input)
522            .await
523            .expect("file sink error");
524        Ok(())
525    }
526}
527
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use chrono::{SubsecRound, Utc};
    use futures::{SinkExt, stream};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::JsonSerializerConfig,
        event::{LogEvent, TraceEvent},
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            components::{FILE_SINK_TAGS, assert_sink_compliance},
            lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
            random_lines_with_stream, random_metrics_with_stream,
            random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileSinkConfig>();
    }

    // Writes log lines to a single uncompressed file and checks they round-trip.
    #[tokio::test]
    async fn log_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Same as above, but verifies the gzip-compressed output decompresses back
    // to the input lines.
    #[tokio::test]
    async fn log_single_partition_gzip() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Gzip,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_gzip_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Same round-trip check for zstd compression.
    #[tokio::test]
    async fn log_single_partition_zstd() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Zstd,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_zstd_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Exercises template partitioning: events with different `level`/`date`
    // fields must land in different files, in arrival order within each file.
    #[tokio::test]
    async fn log_many_partitions() {
        let directory = temp_dir();

        let mut template = directory.to_string_lossy().to_string();
        template.push_str("/{{level}}s-{{date}}.log");

        trace!(message = "Template.", %template);

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (mut input, _events) = random_events_with_stream(32, 8, None);
        input[0].as_mut_log().insert("date", "2019-26-07");
        input[0].as_mut_log().insert("level", "warning");
        input[1].as_mut_log().insert("date", "2019-26-07");
        input[1].as_mut_log().insert("level", "error");
        input[2].as_mut_log().insert("date", "2019-26-07");
        input[2].as_mut_log().insert("level", "warning");
        input[3].as_mut_log().insert("date", "2019-27-07");
        input[3].as_mut_log().insert("level", "error");
        input[4].as_mut_log().insert("date", "2019-27-07");
        input[4].as_mut_log().insert("level", "warning");
        input[5].as_mut_log().insert("date", "2019-27-07");
        input[5].as_mut_log().insert("level", "warning");
        input[6].as_mut_log().insert("date", "2019-28-07");
        input[6].as_mut_log().insert("level", "warning");
        input[7].as_mut_log().insert("date", "2019-29-07");
        input[7].as_mut_log().insert("level", "error");

        run_assert_sink(&config, input.clone().into_iter()).await;

        // One entry per distinct (level, date) partition, in the order the
        // assertions below index into.
        let output = [
            lines_from_file(directory.join("warnings-2019-26-07.log")),
            lines_from_file(directory.join("errors-2019-26-07.log")),
            lines_from_file(directory.join("warnings-2019-27-07.log")),
            lines_from_file(directory.join("errors-2019-27-07.log")),
            lines_from_file(directory.join("warnings-2019-28-07.log")),
            lines_from_file(directory.join("errors-2019-29-07.log")),
        ];

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(
            input[0].as_log()[&message_key],
            From::<&str>::from(&output[0][0])
        );
        assert_eq!(
            input[1].as_log()[&message_key],
            From::<&str>::from(&output[1][0])
        );
        assert_eq!(
            input[2].as_log()[&message_key],
            From::<&str>::from(&output[0][1])
        );
        assert_eq!(
            input[3].as_log()[&message_key],
            From::<&str>::from(&output[3][0])
        );
        assert_eq!(
            input[4].as_log()[&message_key],
            From::<&str>::from(&output[2][0])
        );
        assert_eq!(
            input[5].as_log()[&message_key],
            From::<&str>::from(&output[2][1])
        );
        assert_eq!(
            input[6].as_log()[&message_key],
            From::<&str>::from(&output[4][0])
        );
        assert_eq!(
            input[7].as_log()[message_key],
            From::<&str>::from(&output[5][0])
        );
    }

    // Verifies that after the idle timeout closes a file, a later event
    // reopens it in append mode rather than overwriting previous contents.
    #[tokio::test]
    async fn log_reopening() {
        trace_init();

        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: Duration::from_secs(1),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (mut input, _events) = random_lines_with_stream(10, 64, None);

        // Zero-capacity channel so sends rendezvous with the running sink.
        let (mut tx, rx) = futures::channel::mpsc::channel(0);

        let sink_handle = tokio::spawn(async move {
            assert_sink_compliance(&FILE_SINK_TAGS, async move {
                let sink = FileSink::new(&config, SinkContext::default()).unwrap();
                VectorSink::from_event_streamsink(sink)
                    .run(Box::pin(rx.map(Into::into)))
                    .await
                    .expect("Running sink failed");
            })
            .await
        });

        // send initial payload
        for line in input.clone() {
            tx.send(Event::Log(LogEvent::from(line))).await.unwrap();
        }

        // wait for file to go idle and be closed
        tokio::time::sleep(Duration::from_secs(2)).await;

        // trigger another write
        let last_line = "i should go at the end";
        tx.send(LogEvent::from(last_line).into()).await.unwrap();
        input.push(String::from(last_line));

        // wait for another flush
        tokio::time::sleep(Duration::from_secs(1)).await;

        // make sure we appended instead of overwriting
        let output = lines_from_file(template);
        assert_eq!(input, output);

        // make sure sink stops and that it did not panic
        drop(tx);
        sink_handle.await.unwrap();
    }

    // Metrics are valid input too (text serializer); check names appear in
    // the output lines.
    #[tokio::test]
    async fn metric_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_metrics_with_stream(100, None, None);

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    // Partitions metrics by timestamp down to the second; each metric (spaced
    // one second apart) must land in its own file.
    #[tokio::test]
    async fn metric_many_partitions() {
        let directory = temp_dir();

        let format = "%Y-%m-%d-%H-%M-%S";
        let mut template = directory.to_string_lossy().to_string();
        template.push_str(&format!("/{format}.log"));

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let metric_count = 3;
        let timestamp = Utc::now().trunc_subsecs(3);
        let timestamp_offset = Duration::from_secs(1);

        let (input, _events) = random_metrics_with_stream_timestamp(
            metric_count,
            None,
            None,
            timestamp,
            timestamp_offset,
        );

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = (0..metric_count).map(|index| {
            let expected_timestamp = timestamp + (timestamp_offset * index as u32);
            let expected_filename =
                directory.join(format!("{}.log", expected_timestamp.format(format)));

            lines_from_file(expected_filename)
        });
        for (input, output) in input.iter().zip(output) {
            // The format will partition by second and metrics are a second apart.
            assert_eq!(
                output.len(),
                1,
                "Expected the output file to contain one metric"
            );
            let output = &output[0];

            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    // Trace events with the JSON serializer: input lines should appear inside
    // the serialized output.
    #[tokio::test]
    async fn trace_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_trace_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.iter().zip(output) {
            assert!(output.contains(input));
        }
    }

    // Helper: runs the sink over the given lines wrapped as log events.
    async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events.into_iter().map(LogEvent::from).map(Event::Log),
        )
        .await;
    }

    // Helper: runs the sink over the given lines wrapped as trace events.
    async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events
                .into_iter()
                .map(LogEvent::from)
                .map(TraceEvent::from)
                .map(Event::Trace),
        )
        .await;
    }

    // Helper: builds the sink from `config`, feeds it `events`, and asserts
    // component-spec compliance.
    async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
        assert_sink_compliance(&FILE_SINK_TAGS, async move {
            let sink = FileSink::new(config, SinkContext::default()).unwrap();
            VectorSink::from_event_streamsink(sink)
                .run(Box::pin(stream::iter(events.map(Into::into))))
                .await
                .expect("Running sink failed")
        })
        .await;
    }
}