vector/sinks/file/
mod.rs

1use std::{
2    convert::TryFrom,
3    num::NonZeroU64,
4    time::{Duration, Instant},
5};
6
7use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
8use async_trait::async_trait;
9use bytes::{Bytes, BytesMut};
10use futures::{
11    FutureExt, future,
12    stream::{BoxStream, StreamExt},
13};
14use serde_with::serde_as;
15use tokio::{
16    fs::{self, File},
17    io::AsyncWriteExt,
18};
19use tokio_util::{codec::Encoder as _, time::delay_queue::Expired};
20use vector_lib::{
21    EstimatedJsonEncodedSizeOf, TimeZone,
22    codecs::{
23        TextSerializerConfig,
24        encoding::{Framer, FramingConfig},
25    },
26    configurable::configurable_component,
27    internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
28};
29
30use crate::{
31    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
32    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
33    event::{Event, EventStatus, Finalizable},
34    expiring_hash_map::ExpiringHashMap,
35    internal_events::{
36        FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
37    },
38    sinks::util::{StreamSink, timezone_to_offset},
39    template::Template,
40};
41
42mod bytes_path;
43
44use bytes_path::BytesPath;
45
/// Configuration for the `file` sink.
#[serde_as]
#[configurable_component(sink("file", "Output observability events into files."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileSinkConfig {
    /// File path to write events to.
    ///
    /// Compression format extension must be explicit.
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log"))]
    #[configurable(metadata(
        docs::examples = "/tmp/application-{{ application_id }}-%Y-%m-%d.log"
    ))]
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log.zst"))]
    pub path: Template,

    /// The amount of time that a file can be idle and stay open.
    ///
    /// After not receiving any events in this amount of time, the file is flushed and closed.
    #[serde(default = "default_idle_timeout")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "idle_timeout_secs")]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Idle Timeout"))]
    pub idle_timeout: Duration,

    // Framing + serializer settings, flattened so their fields appear at the
    // top level of the sink configuration.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    // Compression applied to each output file; documentation is derived from
    // the `Compression` type below.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Timezone used when rendering strftime-style specifiers in `path`;
    // falls back to the global timezone when unset (see `FileSink::new`).
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: FileInternalMetricsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub truncate: FileTruncateConfig,
}
99
/// Configuration for truncating files.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct FileTruncateConfig {
    // The three conditions are independent: any one of them being satisfied
    // causes the file to be closed and reopened with truncation
    // (see `FileSink::should_truncate`).
    /// If this is set, files will be truncated after being closed for a set amount of seconds.
    #[serde(default)]
    pub after_close_time_secs: Option<NonZeroU64>,
    /// If this is set, files will be truncated after set amount of seconds of no modifications.
    #[serde(default)]
    pub after_modified_time_secs: Option<NonZeroU64>,
    /// If this is set, files will be truncated after set amount of seconds regardless of the state.
    #[serde(default)]
    pub after_secs: Option<NonZeroU64>,
}
115
116impl GenerateConfig for FileSinkConfig {
117    fn generate_config() -> toml::Value {
118        toml::Value::try_from(Self {
119            path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
120            idle_timeout: default_idle_timeout(),
121            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
122            compression: Default::default(),
123            acknowledgements: Default::default(),
124            timezone: Default::default(),
125            internal_metrics: Default::default(),
126            truncate: Default::default(),
127        })
128        .unwrap()
129    }
130}
131
// Default for `idle_timeout_secs`: flush and close a file after 30 seconds
// without any events arriving for it.
const fn default_idle_timeout() -> Duration {
    Duration::from_secs(30)
}
135
/// Compression configuration.
// TODO: Why doesn't this already use `crate::sinks::util::Compression`
// `crate::sinks::util::Compression` doesn't support zstd yet
// Selects which `OutFileInner` variant wraps a newly opened file.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Compression {
    /// [Gzip][gzip] compression.
    ///
    /// [gzip]: https://www.gzip.org/
    Gzip,

    /// [Zstandard][zstd] compression.
    ///
    /// [zstd]: https://facebook.github.io/zstd/
    Zstd,

    /// No compression.
    #[default]
    None,
}
157
/// An open output file together with the moment it was opened.
///
/// `created_at` is consulted by `FileSink::should_truncate` to enforce the
/// `after_secs` truncation policy.
struct OutFile {
    created_at: Instant,
    inner: OutFileInner,
}
162
/// The actual writer: either the raw file, or a compressing encoder
/// wrapping it.
enum OutFileInner {
    Regular(File),
    Gzip(GzipEncoder<File>),
    Zstd(ZstdEncoder<File>),
}
168
169impl OutFile {
170    fn new(file: File, compression: Compression) -> Self {
171        Self {
172            created_at: Instant::now(),
173            inner: match compression {
174                Compression::None => OutFileInner::Regular(file),
175                Compression::Gzip => OutFileInner::Gzip(GzipEncoder::new(file)),
176                Compression::Zstd => OutFileInner::Zstd(ZstdEncoder::new(file)),
177            },
178        }
179    }
180
181    async fn sync_all(&mut self) -> Result<(), std::io::Error> {
182        match &mut self.inner {
183            OutFileInner::Regular(file) => file.sync_all().await,
184            OutFileInner::Gzip(gzip) => gzip.get_mut().sync_all().await,
185            OutFileInner::Zstd(zstd) => zstd.get_mut().sync_all().await,
186        }
187    }
188
189    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
190        match &mut self.inner {
191            OutFileInner::Regular(file) => file.shutdown().await,
192            OutFileInner::Gzip(gzip) => gzip.shutdown().await,
193            OutFileInner::Zstd(zstd) => zstd.shutdown().await,
194        }
195    }
196
197    async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
198        match &mut self.inner {
199            OutFileInner::Regular(file) => file.write_all(src).await,
200            OutFileInner::Gzip(gzip) => gzip.write_all(src).await,
201            OutFileInner::Zstd(zstd) => zstd.write_all(src).await,
202        }
203    }
204
205    const fn created_at(&self) -> Instant {
206        self.created_at
207    }
208
209    /// Shutdowns by flushing data, writing headers, and syncing all of that
210    /// data and metadata to the filesystem.
211    async fn close(&mut self) -> Result<(), std::io::Error> {
212        self.shutdown().await?;
213        self.sync_all().await
214    }
215}
216
217#[async_trait::async_trait]
218#[typetag::serde(name = "file")]
219impl SinkConfig for FileSinkConfig {
220    async fn build(
221        &self,
222        cx: SinkContext,
223    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
224        let sink = FileSink::new(self, cx)?;
225        Ok((
226            super::VectorSink::from_event_streamsink(sink),
227            future::ok(()).boxed(),
228        ))
229    }
230
231    fn input(&self) -> Input {
232        Input::new(self.encoding.config().1.input_type())
233    }
234
235    fn acknowledgements(&self) -> &AcknowledgementsConfig {
236        &self.acknowledgements
237    }
238}
239
/// Streaming sink that writes each event to a file whose path is rendered
/// from a template, keeping recently used files open until they go idle.
pub struct FileSink {
    // Path template; may reference event fields and strftime specifiers.
    path: Template,
    transformer: Transformer,
    encoder: Encoder<Framer>,
    // How long a file may stay open without receiving new events.
    idle_timeout: Duration,
    // Open files keyed by rendered path; entries expire after `idle_timeout`.
    files: ExpiringHashMap<Bytes, OutFile>,
    compression: Compression,
    events_sent: Registered<EventsSent>,
    // Whether `FileBytesSent` events carry a per-file tag.
    include_file_metric_tag: bool,
    truncation_config: FileTruncateConfig,
}
251
impl FileSink {
    /// Builds a `FileSink` from configuration, resolving the timezone offset
    /// that is applied when rendering strftime specifiers in `path`.
    pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
        let transformer = config.encoding.transformer();
        let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
        let encoder = Encoder::<Framer>::new(framer, serializer);

        // The sink-level timezone wins; otherwise fall back to the global one.
        let offset = config
            .timezone
            .or(cx.globals.timezone)
            .and_then(timezone_to_offset);

        Ok(Self {
            path: config.path.clone().with_tz_offset(offset),
            transformer,
            encoder,
            idle_timeout: config.idle_timeout,
            files: ExpiringHashMap::default(),
            compression: config.compression,
            events_sent: register!(EventsSent::from(Output(None))),
            include_file_metric_tag: config.internal_metrics.include_file_tag,
            truncation_config: config.truncate.clone(),
        })
    }

    /// Renders the `self.path` template against `event` to obtain the file
    /// path to store the event at.
    ///
    /// Returns `None` (after emitting a `TemplateRenderingError`) when the
    /// template references fields the event does not have.
    fn partition_event(&mut self, event: &Event) -> Option<bytes::Bytes> {
        let bytes = match self.path.render(event) {
            Ok(b) => b,
            Err(error) => {
                emit!(TemplateRenderingError {
                    error,
                    field: Some("path"),
                    drop_event: true,
                });
                return None;
            }
        };

        Some(bytes)
    }

    /// Computes the instant at which a file touched now should be considered
    /// idle and be closed.
    fn deadline_at(&self) -> Instant {
        Instant::now()
            .checked_add(self.idle_timeout)
            .expect("unable to compute next deadline")
    }

    /// Main processing loop: consumes events from `input`, writing each to
    /// its partitioned file, while concurrently closing files that have been
    /// idle past their deadline.
    async fn run(&mut self, mut input: BoxStream<'_, Event>) -> crate::Result<()> {
        loop {
            tokio::select! {
                event = input.next() => {
                    match event {
                        Some(event) => self.process_event(event).await,
                        None => {
                            // If we got `None` - terminate the processing.
                            debug!(message = "Receiver exhausted, terminating the processing loop.");

                            // Close all the open files.
                            debug!(message = "Closing all the open files.");
                            for (path, file) in self.files.iter_mut() {
                                if let Err(error) = file.close().await {
                                    emit!(FileIoError {
                                        error,
                                        code: "failed_closing_file",
                                        message: "Failed to close file.",
                                        path,
                                        dropped_events: 0,
                                    });
                                } else{
                                    trace!(message = "Successfully closed file.", path = ?path);
                                }
                            }

                            // Report that no files remain open.
                            emit!(FileOpen {
                                count: 0
                            });

                            break;
                        }
                    }
                }
                // Only poll for expirations while the map is non-empty.
                result = self.files.next_expired(), if !self.files.is_empty() => {
                    match result {
                        // We do not poll map when it's empty, so we should
                        // never reach this branch.
                        None => unreachable!(),
                        Some((expired_file, path)) => {
                            // We got an expired file. All we really want is to
                            // flush and close it.
                            self.close_file(expired_file, path).await;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Writes a single event to its partitioned file, opening (and possibly
    /// truncating) the file first when it is not already open.
    async fn process_event(&mut self, mut event: Event) {
        let path = match self.partition_event(&event) {
            Some(path) => path,
            None => {
                // We weren't able to find the path to use for the
                // file.
                // The error is already handled at `partition_event`, so
                // here we just skip the event.
                event.metadata().update_status(EventStatus::Errored);
                return;
            }
        };

        // Every write pushes the file's idle deadline forward.
        let next_deadline = self.deadline_at();
        trace!(message = "Computed next deadline.", next_deadline = ?next_deadline, path = ?path);

        let bytes_path = BytesPath::new(path.clone());
        // `should_truncate` closes and removes the tracked file when a
        // truncation policy matched, forcing the re-open path below.
        let truncate = self.should_truncate(&bytes_path, &path).await;
        let file = if !truncate && let Some(file) = self.files.reset_at(&path, next_deadline) {
            trace!(message = "Working with an already opened file.", path = ?path);
            file
        } else {
            trace!(message = "Opening new file.", ?path);
            let file = match open_file(bytes_path, truncate).await {
                Ok(file) => file,
                Err(error) => {
                    // We couldn't open the file for this event.
                    // Maybe other events will work though! Just log
                    // the error and skip this event.
                    emit!(FileIoError {
                        code: "failed_opening_file",
                        message: "Unable to open the file.",
                        error,
                        path: &path,
                        dropped_events: 1,
                    });
                    event.metadata().update_status(EventStatus::Errored);
                    return;
                }
            };

            let outfile = OutFile::new(file, self.compression);

            self.files.insert_at(path.clone(), outfile, next_deadline);
            emit!(FileOpen {
                count: self.files.len()
            });
            self.files.get_mut(&path).unwrap()
        };

        trace!(message = "Writing an event to file.", path = ?path);
        // Capture size and finalizers before the event is consumed by the
        // encoder so delivery status can still be reported afterwards.
        let event_size = event.estimated_json_encoded_size_of();
        let finalizers = event.take_finalizers();
        match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
            Ok(byte_size) => {
                finalizers.update_status(EventStatus::Delivered);
                self.events_sent.emit(CountByteSize(1, event_size));
                emit!(FileBytesSent {
                    byte_size,
                    file: String::from_utf8_lossy(&path),
                    include_file_metric_tag: self.include_file_metric_tag,
                });
            }
            Err(error) => {
                finalizers.update_status(EventStatus::Errored);
                emit!(FileIoError {
                    code: "failed_writing_file",
                    message: "Failed to write the file.",
                    error,
                    path: &path,
                    dropped_events: 1,
                });
            }
        }
    }

    /// Evaluates the configured truncation policies for `path`.
    ///
    /// Returns `true` when any policy matched; in that case the open file (if
    /// any) is closed and removed from the map so the caller reopens it with
    /// truncation.
    async fn should_truncate(&mut self, bytes_path: &BytesPath, path: &bytes::Bytes) -> bool {
        let mut truncate = false;

        // Policy 1: the file on disk has not been modified for longer than
        // `after_close_time_secs` and is not currently open.
        if let Some(after_close_time_secs) = self.truncation_config.after_close_time_secs
            && self.files.get(path).is_none()
            && let Ok(metadata) = fs::metadata(bytes_path).await
            && let Ok(time) = metadata
                .modified()
                .map_err(|_| ())
                .and_then(|t| t.elapsed().map_err(|_| ()))
            && time.as_secs() > after_close_time_secs.into()
        {
            truncate = true;
        }

        // Policy 2: the open file has existed for longer than `after_secs`,
        // regardless of activity.
        if let Some(after_secs) = self.truncation_config.after_secs
            && let Some(file) = self.files.get(path)
            && (file.created_at().elapsed().as_secs() > after_secs.into())
        {
            truncate = true;
        }

        // Policy 3: no write has landed in the open file for longer than
        // `after_modified_time_secs`. The last write time is reconstructed by
        // subtracting `idle_timeout` from the entry's expiry deadline.
        if let Some(after_modified_time_secs) = self.truncation_config.after_modified_time_secs
            && let Some(previous_modification) = self
                .files
                .get_with_deadline(path)
                .and_then(|(_, deadline)| deadline.checked_sub(self.idle_timeout))
            && previous_modification.elapsed().as_secs() > after_modified_time_secs.into()
        {
            truncate = true;
        }

        if truncate && let Some((file, path)) = self.files.remove(path) {
            self.close_file(file, path).await;
        }

        truncate
    }

    /// Flushes and closes `file`, reporting (but not propagating) close
    /// errors, and re-emits the current open-file count.
    async fn close_file(&self, mut file: OutFile, path: Expired<Bytes>) {
        if let Err(error) = file.close().await {
            emit!(FileIoError {
                error,
                code: "failed_closing_file",
                message: "Failed to close file.",
                path: &path,
                dropped_events: 0,
            });
        }
        drop(file); // close errors were already reported above
        emit!(FileOpen {
            count: self.files.len()
        });
    }
}
483
484async fn open_file(path: impl AsRef<std::path::Path>, truncate: bool) -> std::io::Result<File> {
485    let parent = path.as_ref().parent();
486
487    if let Some(parent) = parent {
488        fs::create_dir_all(parent).await?;
489    }
490
491    fs::OpenOptions::new()
492        .read(false)
493        .write(true)
494        .create(true)
495        .append(!truncate)
496        .truncate(truncate)
497        .open(path)
498        .await
499}
500
501async fn write_event_to_file(
502    file: &mut OutFile,
503    mut event: Event,
504    transformer: &Transformer,
505    encoder: &mut Encoder<Framer>,
506) -> Result<usize, std::io::Error> {
507    transformer.transform(&mut event);
508    let mut buffer = BytesMut::new();
509    encoder
510        .encode(event, &mut buffer)
511        .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
512    file.write_all(&buffer).await.map(|()| buffer.len())
513}
514
515#[async_trait]
516impl StreamSink<Event> for FileSink {
517    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
518        FileSink::run(&mut self, input)
519            .await
520            .expect("file sink error");
521        Ok(())
522    }
523}
524
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use chrono::{SubsecRound, Utc};
    use futures::{SinkExt, stream};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::JsonSerializerConfig,
        event::{LogEvent, TraceEvent},
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            components::{FILE_SINK_TAGS, assert_sink_compliance},
            lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
            random_lines_with_stream, random_metrics_with_stream,
            random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileSinkConfig>();
    }

    // A static (non-templated) path: all events land in one file, in order.
    #[tokio::test]
    async fn log_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Same as above but verifies the gzip-compressed output round-trips.
    #[tokio::test]
    async fn log_single_partition_gzip() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Gzip,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_gzip_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Same as above but verifies the zstd-compressed output round-trips.
    #[tokio::test]
    async fn log_single_partition_zstd() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Zstd,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_zstd_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // A templated path partitions events into multiple files based on the
    // `level` and `date` fields of each event.
    #[tokio::test]
    async fn log_many_partitions() {
        let directory = temp_dir();

        let mut template = directory.to_string_lossy().to_string();
        template.push_str("/{{level}}s-{{date}}.log");

        trace!(message = "Template.", %template);

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        // Eight events spread across four dates and two levels, yielding six
        // distinct output files.
        let (mut input, _events) = random_events_with_stream(32, 8, None);
        input[0].as_mut_log().insert("date", "2019-26-07");
        input[0].as_mut_log().insert("level", "warning");
        input[1].as_mut_log().insert("date", "2019-26-07");
        input[1].as_mut_log().insert("level", "error");
        input[2].as_mut_log().insert("date", "2019-26-07");
        input[2].as_mut_log().insert("level", "warning");
        input[3].as_mut_log().insert("date", "2019-27-07");
        input[3].as_mut_log().insert("level", "error");
        input[4].as_mut_log().insert("date", "2019-27-07");
        input[4].as_mut_log().insert("level", "warning");
        input[5].as_mut_log().insert("date", "2019-27-07");
        input[5].as_mut_log().insert("level", "warning");
        input[6].as_mut_log().insert("date", "2019-28-07");
        input[6].as_mut_log().insert("level", "warning");
        input[7].as_mut_log().insert("date", "2019-29-07");
        input[7].as_mut_log().insert("level", "error");

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = [
            lines_from_file(directory.join("warnings-2019-26-07.log")),
            lines_from_file(directory.join("errors-2019-26-07.log")),
            lines_from_file(directory.join("warnings-2019-27-07.log")),
            lines_from_file(directory.join("errors-2019-27-07.log")),
            lines_from_file(directory.join("warnings-2019-28-07.log")),
            lines_from_file(directory.join("errors-2019-29-07.log")),
        ];

        // Each event must appear in the file matching its level/date, in the
        // order events were sent.
        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(
            input[0].as_log()[&message_key],
            From::<&str>::from(&output[0][0])
        );
        assert_eq!(
            input[1].as_log()[&message_key],
            From::<&str>::from(&output[1][0])
        );
        assert_eq!(
            input[2].as_log()[&message_key],
            From::<&str>::from(&output[0][1])
        );
        assert_eq!(
            input[3].as_log()[&message_key],
            From::<&str>::from(&output[3][0])
        );
        assert_eq!(
            input[4].as_log()[&message_key],
            From::<&str>::from(&output[2][0])
        );
        assert_eq!(
            input[5].as_log()[&message_key],
            From::<&str>::from(&output[2][1])
        );
        assert_eq!(
            input[6].as_log()[&message_key],
            From::<&str>::from(&output[4][0])
        );
        assert_eq!(
            input[7].as_log()[message_key],
            From::<&str>::from(&output[5][0])
        );
    }

    // Verifies that a file closed due to idle timeout is reopened in append
    // mode, so a later write does not overwrite earlier contents.
    #[tokio::test]
    async fn log_reopening() {
        trace_init();

        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: Duration::from_secs(1),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (mut input, _events) = random_lines_with_stream(10, 64, None);

        let (mut tx, rx) = futures::channel::mpsc::channel(0);

        let sink_handle = tokio::spawn(async move {
            assert_sink_compliance(&FILE_SINK_TAGS, async move {
                let sink = FileSink::new(&config, SinkContext::default()).unwrap();
                VectorSink::from_event_streamsink(sink)
                    .run(Box::pin(rx.map(Into::into)))
                    .await
                    .expect("Running sink failed");
            })
            .await
        });

        // send initial payload
        for line in input.clone() {
            tx.send(Event::Log(LogEvent::from(line))).await.unwrap();
        }

        // wait for file to go idle and be closed
        tokio::time::sleep(Duration::from_secs(2)).await;

        // trigger another write
        let last_line = "i should go at the end";
        tx.send(LogEvent::from(last_line).into()).await.unwrap();
        input.push(String::from(last_line));

        // wait for another flush
        tokio::time::sleep(Duration::from_secs(1)).await;

        // make sure we appended instead of overwriting
        let output = lines_from_file(template);
        assert_eq!(input, output);

        // make sure sink stops and that it did not panic
        drop(tx);
        sink_handle.await.unwrap();
    }

    // The sink accepts metric events too; verify each metric name appears in
    // the output.
    #[tokio::test]
    async fn metric_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_metrics_with_stream(100, None, None);

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    // Metrics spaced one second apart with a second-granularity strftime path
    // should each land in their own file.
    #[tokio::test]
    async fn metric_many_partitions() {
        let directory = temp_dir();

        let format = "%Y-%m-%d-%H-%M-%S";
        let mut template = directory.to_string_lossy().to_string();
        template.push_str(&format!("/{format}.log"));

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let metric_count = 3;
        let timestamp = Utc::now().trunc_subsecs(3);
        let timestamp_offset = Duration::from_secs(1);

        let (input, _events) = random_metrics_with_stream_timestamp(
            metric_count,
            None,
            None,
            timestamp,
            timestamp_offset,
        );

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = (0..metric_count).map(|index| {
            let expected_timestamp = timestamp + (timestamp_offset * index as u32);
            let expected_filename =
                directory.join(format!("{}.log", expected_timestamp.format(format)));

            lines_from_file(expected_filename)
        });
        for (input, output) in input.iter().zip(output) {
            // The format will partition by second and metrics are a second apart.
            assert_eq!(
                output.len(),
                1,
                "Expected the output file to contain one metric"
            );
            let output = &output[0];

            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    // Trace events are also accepted; JSON-encode them and verify the
    // message text shows up in the output.
    #[tokio::test]
    async fn trace_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_trace_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.iter().zip(output) {
            assert!(output.contains(input));
        }
    }

    // Helper: run the sink over log events built from plain strings.
    async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events.into_iter().map(LogEvent::from).map(Event::Log),
        )
        .await;
    }

    // Helper: run the sink over trace events built from plain strings.
    async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events
                .into_iter()
                .map(LogEvent::from)
                .map(TraceEvent::from)
                .map(Event::Trace),
        )
        .await;
    }

    // Helper: build the sink from `config` and drive it over `events`,
    // asserting component-compliance instrumentation along the way.
    async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
        assert_sink_compliance(&FILE_SINK_TAGS, async move {
            let sink = FileSink::new(config, SinkContext::default()).unwrap();
            VectorSink::from_event_streamsink(sink)
                .run(Box::pin(stream::iter(events.map(Into::into))))
                .await
                .expect("Running sink failed")
        })
        .await;
    }
}