vector/sinks/file/
mod.rs

1use std::{
2    convert::TryFrom,
3    time::{Duration, Instant},
4};
5
6use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
7use async_trait::async_trait;
8use bytes::{Bytes, BytesMut};
9use futures::{
10    FutureExt, future,
11    stream::{BoxStream, StreamExt},
12};
13use serde_with::serde_as;
14use tokio::{
15    fs::{self, File},
16    io::AsyncWriteExt,
17};
18use tokio_util::codec::Encoder as _;
19use vector_lib::{
20    EstimatedJsonEncodedSizeOf, TimeZone,
21    codecs::{
22        TextSerializerConfig,
23        encoding::{Framer, FramingConfig},
24    },
25    configurable::configurable_component,
26    internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
27};
28
29use crate::{
30    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
31    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
32    event::{Event, EventStatus, Finalizable},
33    expiring_hash_map::ExpiringHashMap,
34    internal_events::{
35        FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
36    },
37    sinks::util::{StreamSink, timezone_to_offset},
38    template::Template,
39};
40
41mod bytes_path;
42
43use bytes_path::BytesPath;
44
45/// Configuration for the `file` sink.
46#[serde_as]
47#[configurable_component(sink("file", "Output observability events into files."))]
48#[derive(Clone, Debug)]
49#[serde(deny_unknown_fields)]
50pub struct FileSinkConfig {
51    /// File path to write events to.
52    ///
53    /// Compression format extension must be explicit.
54    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log"))]
55    #[configurable(metadata(
56        docs::examples = "/tmp/application-{{ application_id }}-%Y-%m-%d.log"
57    ))]
58    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log.zst"))]
59    pub path: Template,
60
61    /// The amount of time that a file can be idle and stay open.
62    ///
63    /// After not receiving any events in this amount of time, the file is flushed and closed.
64    #[serde(default = "default_idle_timeout")]
65    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
66    #[serde(rename = "idle_timeout_secs")]
67    #[configurable(metadata(docs::examples = 600))]
68    #[configurable(metadata(docs::human_name = "Idle Timeout"))]
69    pub idle_timeout: Duration,
70
71    #[serde(flatten)]
72    pub encoding: EncodingConfigWithFraming,
73
74    #[configurable(derived)]
75    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
76    pub compression: Compression,
77
78    #[configurable(derived)]
79    #[serde(
80        default,
81        deserialize_with = "crate::serde::bool_or_struct",
82        skip_serializing_if = "crate::serde::is_default"
83    )]
84    pub acknowledgements: AcknowledgementsConfig,
85
86    #[configurable(derived)]
87    #[serde(default)]
88    pub timezone: Option<TimeZone>,
89
90    #[configurable(derived)]
91    #[serde(default)]
92    pub internal_metrics: FileInternalMetricsConfig,
93}
94
95impl GenerateConfig for FileSinkConfig {
96    fn generate_config() -> toml::Value {
97        toml::Value::try_from(Self {
98            path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
99            idle_timeout: default_idle_timeout(),
100            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
101            compression: Default::default(),
102            acknowledgements: Default::default(),
103            timezone: Default::default(),
104            internal_metrics: Default::default(),
105        })
106        .unwrap()
107    }
108}
109
110const fn default_idle_timeout() -> Duration {
111    Duration::from_secs(30)
112}
113
114/// Compression configuration.
115// TODO: Why doesn't this already use `crate::sinks::util::Compression`
116// `crate::sinks::util::Compression` doesn't support zstd yet
117#[configurable_component]
118#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
119#[serde(rename_all = "snake_case")]
120pub enum Compression {
121    /// [Gzip][gzip] compression.
122    ///
123    /// [gzip]: https://www.gzip.org/
124    Gzip,
125
126    /// [Zstandard][zstd] compression.
127    ///
128    /// [zstd]: https://facebook.github.io/zstd/
129    Zstd,
130
131    /// No compression.
132    #[default]
133    None,
134}
135
136enum OutFile {
137    Regular(File),
138    Gzip(GzipEncoder<File>),
139    Zstd(ZstdEncoder<File>),
140}
141
142impl OutFile {
143    fn new(file: File, compression: Compression) -> Self {
144        match compression {
145            Compression::None => OutFile::Regular(file),
146            Compression::Gzip => OutFile::Gzip(GzipEncoder::new(file)),
147            Compression::Zstd => OutFile::Zstd(ZstdEncoder::new(file)),
148        }
149    }
150
151    async fn sync_all(&mut self) -> Result<(), std::io::Error> {
152        match self {
153            OutFile::Regular(file) => file.sync_all().await,
154            OutFile::Gzip(gzip) => gzip.get_mut().sync_all().await,
155            OutFile::Zstd(zstd) => zstd.get_mut().sync_all().await,
156        }
157    }
158
159    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
160        match self {
161            OutFile::Regular(file) => file.shutdown().await,
162            OutFile::Gzip(gzip) => gzip.shutdown().await,
163            OutFile::Zstd(zstd) => zstd.shutdown().await,
164        }
165    }
166
167    async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
168        match self {
169            OutFile::Regular(file) => file.write_all(src).await,
170            OutFile::Gzip(gzip) => gzip.write_all(src).await,
171            OutFile::Zstd(zstd) => zstd.write_all(src).await,
172        }
173    }
174
175    /// Shutdowns by flushing data, writing headers, and syncing all of that
176    /// data and metadata to the filesystem.
177    async fn close(&mut self) -> Result<(), std::io::Error> {
178        self.shutdown().await?;
179        self.sync_all().await
180    }
181}
182
183#[async_trait::async_trait]
184#[typetag::serde(name = "file")]
185impl SinkConfig for FileSinkConfig {
186    async fn build(
187        &self,
188        cx: SinkContext,
189    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
190        let sink = FileSink::new(self, cx)?;
191        Ok((
192            super::VectorSink::from_event_streamsink(sink),
193            future::ok(()).boxed(),
194        ))
195    }
196
197    fn input(&self) -> Input {
198        Input::new(self.encoding.config().1.input_type())
199    }
200
201    fn acknowledgements(&self) -> &AcknowledgementsConfig {
202        &self.acknowledgements
203    }
204}
205
206pub struct FileSink {
207    path: Template,
208    transformer: Transformer,
209    encoder: Encoder<Framer>,
210    idle_timeout: Duration,
211    files: ExpiringHashMap<Bytes, OutFile>,
212    compression: Compression,
213    events_sent: Registered<EventsSent>,
214    include_file_metric_tag: bool,
215}
216
217impl FileSink {
218    pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
219        let transformer = config.encoding.transformer();
220        let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
221        let encoder = Encoder::<Framer>::new(framer, serializer);
222
223        let offset = config
224            .timezone
225            .or(cx.globals.timezone)
226            .and_then(timezone_to_offset);
227
228        Ok(Self {
229            path: config.path.clone().with_tz_offset(offset),
230            transformer,
231            encoder,
232            idle_timeout: config.idle_timeout,
233            files: ExpiringHashMap::default(),
234            compression: config.compression,
235            events_sent: register!(EventsSent::from(Output(None))),
236            include_file_metric_tag: config.internal_metrics.include_file_tag,
237        })
238    }
239
240    /// Uses pass the `event` to `self.path` template to obtain the file path
241    /// to store the event as.
242    fn partition_event(&mut self, event: &Event) -> Option<bytes::Bytes> {
243        let bytes = match self.path.render(event) {
244            Ok(b) => b,
245            Err(error) => {
246                emit!(TemplateRenderingError {
247                    error,
248                    field: Some("path"),
249                    drop_event: true,
250                });
251                return None;
252            }
253        };
254
255        Some(bytes)
256    }
257
258    fn deadline_at(&self) -> Instant {
259        Instant::now()
260            .checked_add(self.idle_timeout)
261            .expect("unable to compute next deadline")
262    }
263
264    async fn run(&mut self, mut input: BoxStream<'_, Event>) -> crate::Result<()> {
265        loop {
266            tokio::select! {
267                event = input.next() => {
268                    match event {
269                        Some(event) => self.process_event(event).await,
270                        None => {
271                            // If we got `None` - terminate the processing.
272                            debug!(message = "Receiver exhausted, terminating the processing loop.");
273
274                            // Close all the open files.
275                            debug!(message = "Closing all the open files.");
276                            for (path, file) in self.files.iter_mut() {
277                                if let Err(error) = file.close().await {
278                                    emit!(FileIoError {
279                                        error,
280                                        code: "failed_closing_file",
281                                        message: "Failed to close file.",
282                                        path,
283                                        dropped_events: 0,
284                                    });
285                                } else{
286                                    trace!(message = "Successfully closed file.", path = ?path);
287                                }
288                            }
289
290                            emit!(FileOpen {
291                                count: 0
292                            });
293
294                            break;
295                        }
296                    }
297                }
298                result = self.files.next_expired(), if !self.files.is_empty() => {
299                    match result {
300                        // We do not poll map when it's empty, so we should
301                        // never reach this branch.
302                        None => unreachable!(),
303                        Some((mut expired_file, path)) => {
304                            // We got an expired file. All we really want is to
305                            // flush and close it.
306                            if let Err(error) = expired_file.close().await {
307                                emit!(FileIoError {
308                                    error,
309                                    code: "failed_closing_file",
310                                    message: "Failed to close file.",
311                                    path: &path,
312                                    dropped_events: 0,
313                                });
314                            }
315                            drop(expired_file); // ignore close error
316                            emit!(FileOpen {
317                                count: self.files.len()
318                            });
319                        }
320                    }
321                }
322            }
323        }
324
325        Ok(())
326    }
327
328    async fn process_event(&mut self, mut event: Event) {
329        let path = match self.partition_event(&event) {
330            Some(path) => path,
331            None => {
332                // We weren't able to find the path to use for the
333                // file.
334                // The error is already handled at `partition_event`, so
335                // here we just skip the event.
336                event.metadata().update_status(EventStatus::Errored);
337                return;
338            }
339        };
340
341        let next_deadline = self.deadline_at();
342        trace!(message = "Computed next deadline.", next_deadline = ?next_deadline, path = ?path);
343
344        let file = if let Some(file) = self.files.reset_at(&path, next_deadline) {
345            trace!(message = "Working with an already opened file.", path = ?path);
346            file
347        } else {
348            trace!(message = "Opening new file.", ?path);
349            let file = match open_file(BytesPath::new(path.clone())).await {
350                Ok(file) => file,
351                Err(error) => {
352                    // We couldn't open the file for this event.
353                    // Maybe other events will work though! Just log
354                    // the error and skip this event.
355                    emit!(FileIoError {
356                        code: "failed_opening_file",
357                        message: "Unable to open the file.",
358                        error,
359                        path: &path,
360                        dropped_events: 1,
361                    });
362                    event.metadata().update_status(EventStatus::Errored);
363                    return;
364                }
365            };
366
367            let outfile = OutFile::new(file, self.compression);
368
369            self.files.insert_at(path.clone(), outfile, next_deadline);
370            emit!(FileOpen {
371                count: self.files.len()
372            });
373            self.files.get_mut(&path).unwrap()
374        };
375
376        trace!(message = "Writing an event to file.", path = ?path);
377        let event_size = event.estimated_json_encoded_size_of();
378        let finalizers = event.take_finalizers();
379        match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
380            Ok(byte_size) => {
381                finalizers.update_status(EventStatus::Delivered);
382                self.events_sent.emit(CountByteSize(1, event_size));
383                emit!(FileBytesSent {
384                    byte_size,
385                    file: String::from_utf8_lossy(&path),
386                    include_file_metric_tag: self.include_file_metric_tag,
387                });
388            }
389            Err(error) => {
390                finalizers.update_status(EventStatus::Errored);
391                emit!(FileIoError {
392                    code: "failed_writing_file",
393                    message: "Failed to write the file.",
394                    error,
395                    path: &path,
396                    dropped_events: 1,
397                });
398            }
399        }
400    }
401}
402
403async fn open_file(path: impl AsRef<std::path::Path>) -> std::io::Result<File> {
404    let parent = path.as_ref().parent();
405
406    if let Some(parent) = parent {
407        fs::create_dir_all(parent).await?;
408    }
409
410    fs::OpenOptions::new()
411        .read(false)
412        .write(true)
413        .create(true)
414        .append(true)
415        .open(path)
416        .await
417}
418
419async fn write_event_to_file(
420    file: &mut OutFile,
421    mut event: Event,
422    transformer: &Transformer,
423    encoder: &mut Encoder<Framer>,
424) -> Result<usize, std::io::Error> {
425    transformer.transform(&mut event);
426    let mut buffer = BytesMut::new();
427    encoder
428        .encode(event, &mut buffer)
429        .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
430    file.write_all(&buffer).await.map(|()| buffer.len())
431}
432
433#[async_trait]
434impl StreamSink<Event> for FileSink {
435    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
436        FileSink::run(&mut self, input)
437            .await
438            .expect("file sink error");
439        Ok(())
440    }
441}
442
443#[cfg(test)]
444mod tests {
445    use std::convert::TryInto;
446
447    use chrono::{SubsecRound, Utc};
448    use futures::{SinkExt, stream};
449    use similar_asserts::assert_eq;
450    use vector_lib::{
451        codecs::JsonSerializerConfig,
452        event::{LogEvent, TraceEvent},
453        sink::VectorSink,
454    };
455
456    use super::*;
457    use crate::{
458        config::log_schema,
459        test_util::{
460            components::{FILE_SINK_TAGS, assert_sink_compliance},
461            lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
462            random_lines_with_stream, random_metrics_with_stream,
463            random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
464        },
465    };
466
467    #[test]
468    fn generate_config() {
469        crate::test_util::test_generate_config::<FileSinkConfig>();
470    }
471
472    #[tokio::test]
473    async fn log_single_partition() {
474        let template = temp_file();
475
476        let config = FileSinkConfig {
477            path: template.clone().try_into().unwrap(),
478            idle_timeout: default_idle_timeout(),
479            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
480            compression: Compression::None,
481            acknowledgements: Default::default(),
482            timezone: Default::default(),
483            internal_metrics: FileInternalMetricsConfig {
484                include_file_tag: true,
485            },
486        };
487
488        let (input, _events) = random_lines_with_stream(100, 64, None);
489
490        run_assert_log_sink(&config, input.clone()).await;
491
492        let output = lines_from_file(template);
493        for (input, output) in input.into_iter().zip(output) {
494            assert_eq!(input, output);
495        }
496    }
497
498    #[tokio::test]
499    async fn log_single_partition_gzip() {
500        let template = temp_file();
501
502        let config = FileSinkConfig {
503            path: template.clone().try_into().unwrap(),
504            idle_timeout: default_idle_timeout(),
505            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
506            compression: Compression::Gzip,
507            acknowledgements: Default::default(),
508            timezone: Default::default(),
509            internal_metrics: FileInternalMetricsConfig {
510                include_file_tag: true,
511            },
512        };
513
514        let (input, _) = random_lines_with_stream(100, 64, None);
515
516        run_assert_log_sink(&config, input.clone()).await;
517
518        let output = lines_from_gzip_file(template);
519        for (input, output) in input.into_iter().zip(output) {
520            assert_eq!(input, output);
521        }
522    }
523
524    #[tokio::test]
525    async fn log_single_partition_zstd() {
526        let template = temp_file();
527
528        let config = FileSinkConfig {
529            path: template.clone().try_into().unwrap(),
530            idle_timeout: default_idle_timeout(),
531            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
532            compression: Compression::Zstd,
533            acknowledgements: Default::default(),
534            timezone: Default::default(),
535            internal_metrics: FileInternalMetricsConfig {
536                include_file_tag: true,
537            },
538        };
539
540        let (input, _) = random_lines_with_stream(100, 64, None);
541
542        run_assert_log_sink(&config, input.clone()).await;
543
544        let output = lines_from_zstd_file(template);
545        for (input, output) in input.into_iter().zip(output) {
546            assert_eq!(input, output);
547        }
548    }
549
550    #[tokio::test]
551    async fn log_many_partitions() {
552        let directory = temp_dir();
553
554        let mut template = directory.to_string_lossy().to_string();
555        template.push_str("/{{level}}s-{{date}}.log");
556
557        trace!(message = "Template.", %template);
558
559        let config = FileSinkConfig {
560            path: template.try_into().unwrap(),
561            idle_timeout: default_idle_timeout(),
562            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
563            compression: Compression::None,
564            acknowledgements: Default::default(),
565            timezone: Default::default(),
566            internal_metrics: FileInternalMetricsConfig {
567                include_file_tag: true,
568            },
569        };
570
571        let (mut input, _events) = random_events_with_stream(32, 8, None);
572        input[0].as_mut_log().insert("date", "2019-26-07");
573        input[0].as_mut_log().insert("level", "warning");
574        input[1].as_mut_log().insert("date", "2019-26-07");
575        input[1].as_mut_log().insert("level", "error");
576        input[2].as_mut_log().insert("date", "2019-26-07");
577        input[2].as_mut_log().insert("level", "warning");
578        input[3].as_mut_log().insert("date", "2019-27-07");
579        input[3].as_mut_log().insert("level", "error");
580        input[4].as_mut_log().insert("date", "2019-27-07");
581        input[4].as_mut_log().insert("level", "warning");
582        input[5].as_mut_log().insert("date", "2019-27-07");
583        input[5].as_mut_log().insert("level", "warning");
584        input[6].as_mut_log().insert("date", "2019-28-07");
585        input[6].as_mut_log().insert("level", "warning");
586        input[7].as_mut_log().insert("date", "2019-29-07");
587        input[7].as_mut_log().insert("level", "error");
588
589        run_assert_sink(&config, input.clone().into_iter()).await;
590
591        let output = [
592            lines_from_file(directory.join("warnings-2019-26-07.log")),
593            lines_from_file(directory.join("errors-2019-26-07.log")),
594            lines_from_file(directory.join("warnings-2019-27-07.log")),
595            lines_from_file(directory.join("errors-2019-27-07.log")),
596            lines_from_file(directory.join("warnings-2019-28-07.log")),
597            lines_from_file(directory.join("errors-2019-29-07.log")),
598        ];
599
600        let message_key = log_schema().message_key().unwrap().to_string();
601        assert_eq!(
602            input[0].as_log()[&message_key],
603            From::<&str>::from(&output[0][0])
604        );
605        assert_eq!(
606            input[1].as_log()[&message_key],
607            From::<&str>::from(&output[1][0])
608        );
609        assert_eq!(
610            input[2].as_log()[&message_key],
611            From::<&str>::from(&output[0][1])
612        );
613        assert_eq!(
614            input[3].as_log()[&message_key],
615            From::<&str>::from(&output[3][0])
616        );
617        assert_eq!(
618            input[4].as_log()[&message_key],
619            From::<&str>::from(&output[2][0])
620        );
621        assert_eq!(
622            input[5].as_log()[&message_key],
623            From::<&str>::from(&output[2][1])
624        );
625        assert_eq!(
626            input[6].as_log()[&message_key],
627            From::<&str>::from(&output[4][0])
628        );
629        assert_eq!(
630            input[7].as_log()[message_key],
631            From::<&str>::from(&output[5][0])
632        );
633    }
634
635    #[tokio::test]
636    async fn log_reopening() {
637        trace_init();
638
639        let template = temp_file();
640
641        let config = FileSinkConfig {
642            path: template.clone().try_into().unwrap(),
643            idle_timeout: Duration::from_secs(1),
644            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
645            compression: Compression::None,
646            acknowledgements: Default::default(),
647            timezone: Default::default(),
648            internal_metrics: FileInternalMetricsConfig {
649                include_file_tag: true,
650            },
651        };
652
653        let (mut input, _events) = random_lines_with_stream(10, 64, None);
654
655        let (mut tx, rx) = futures::channel::mpsc::channel(0);
656
657        let sink_handle = tokio::spawn(async move {
658            assert_sink_compliance(&FILE_SINK_TAGS, async move {
659                let sink = FileSink::new(&config, SinkContext::default()).unwrap();
660                VectorSink::from_event_streamsink(sink)
661                    .run(Box::pin(rx.map(Into::into)))
662                    .await
663                    .expect("Running sink failed");
664            })
665            .await
666        });
667
668        // send initial payload
669        for line in input.clone() {
670            tx.send(Event::Log(LogEvent::from(line))).await.unwrap();
671        }
672
673        // wait for file to go idle and be closed
674        tokio::time::sleep(Duration::from_secs(2)).await;
675
676        // trigger another write
677        let last_line = "i should go at the end";
678        tx.send(LogEvent::from(last_line).into()).await.unwrap();
679        input.push(String::from(last_line));
680
681        // wait for another flush
682        tokio::time::sleep(Duration::from_secs(1)).await;
683
684        // make sure we appended instead of overwriting
685        let output = lines_from_file(template);
686        assert_eq!(input, output);
687
688        // make sure sink stops and that it did not panic
689        drop(tx);
690        sink_handle.await.unwrap();
691    }
692
693    #[tokio::test]
694    async fn metric_single_partition() {
695        let template = temp_file();
696
697        let config = FileSinkConfig {
698            path: template.clone().try_into().unwrap(),
699            idle_timeout: default_idle_timeout(),
700            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
701            compression: Compression::None,
702            acknowledgements: Default::default(),
703            timezone: Default::default(),
704            internal_metrics: FileInternalMetricsConfig {
705                include_file_tag: true,
706            },
707        };
708
709        let (input, _events) = random_metrics_with_stream(100, None, None);
710
711        run_assert_sink(&config, input.clone().into_iter()).await;
712
713        let output = lines_from_file(template);
714        for (input, output) in input.into_iter().zip(output) {
715            let metric_name = input.as_metric().name();
716            assert!(output.contains(metric_name));
717        }
718    }
719
720    #[tokio::test]
721    async fn metric_many_partitions() {
722        let directory = temp_dir();
723
724        let format = "%Y-%m-%d-%H-%M-%S";
725        let mut template = directory.to_string_lossy().to_string();
726        template.push_str(&format!("/{format}.log"));
727
728        let config = FileSinkConfig {
729            path: template.try_into().unwrap(),
730            idle_timeout: default_idle_timeout(),
731            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
732            compression: Compression::None,
733            acknowledgements: Default::default(),
734            timezone: Default::default(),
735            internal_metrics: FileInternalMetricsConfig {
736                include_file_tag: true,
737            },
738        };
739
740        let metric_count = 3;
741        let timestamp = Utc::now().trunc_subsecs(3);
742        let timestamp_offset = Duration::from_secs(1);
743
744        let (input, _events) = random_metrics_with_stream_timestamp(
745            metric_count,
746            None,
747            None,
748            timestamp,
749            timestamp_offset,
750        );
751
752        run_assert_sink(&config, input.clone().into_iter()).await;
753
754        let output = (0..metric_count).map(|index| {
755            let expected_timestamp = timestamp + (timestamp_offset * index as u32);
756            let expected_filename =
757                directory.join(format!("{}.log", expected_timestamp.format(format)));
758
759            lines_from_file(expected_filename)
760        });
761        for (input, output) in input.iter().zip(output) {
762            // The format will partition by second and metrics are a second apart.
763            assert_eq!(
764                output.len(),
765                1,
766                "Expected the output file to contain one metric"
767            );
768            let output = &output[0];
769
770            let metric_name = input.as_metric().name();
771            assert!(output.contains(metric_name));
772        }
773    }
774
775    #[tokio::test]
776    async fn trace_single_partition() {
777        let template = temp_file();
778
779        let config = FileSinkConfig {
780            path: template.clone().try_into().unwrap(),
781            idle_timeout: default_idle_timeout(),
782            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
783            compression: Compression::None,
784            acknowledgements: Default::default(),
785            timezone: Default::default(),
786            internal_metrics: FileInternalMetricsConfig {
787                include_file_tag: true,
788            },
789        };
790
791        let (input, _events) = random_lines_with_stream(100, 64, None);
792
793        run_assert_trace_sink(&config, input.clone()).await;
794
795        let output = lines_from_file(template);
796        for (input, output) in input.iter().zip(output) {
797            assert!(output.contains(input));
798        }
799    }
800
801    async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
802        run_assert_sink(
803            config,
804            events.into_iter().map(LogEvent::from).map(Event::Log),
805        )
806        .await;
807    }
808
809    async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
810        run_assert_sink(
811            config,
812            events
813                .into_iter()
814                .map(LogEvent::from)
815                .map(TraceEvent::from)
816                .map(Event::Trace),
817        )
818        .await;
819    }
820
821    async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
822        assert_sink_compliance(&FILE_SINK_TAGS, async move {
823            let sink = FileSink::new(config, SinkContext::default()).unwrap();
824            VectorSink::from_event_streamsink(sink)
825                .run(Box::pin(stream::iter(events.map(Into::into))))
826                .await
827                .expect("Running sink failed")
828        })
829        .await;
830    }
831}