vector/sinks/file/
mod.rs

1use std::convert::TryFrom;
2use std::time::{Duration, Instant};
3
4use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
5use async_trait::async_trait;
6use bytes::{Bytes, BytesMut};
7use futures::{
8    future,
9    stream::{BoxStream, StreamExt},
10    FutureExt,
11};
12use serde_with::serde_as;
13use tokio::{
14    fs::{self, File},
15    io::AsyncWriteExt,
16};
17use tokio_util::codec::Encoder as _;
18use vector_lib::codecs::{
19    encoding::{Framer, FramingConfig},
20    TextSerializerConfig,
21};
22use vector_lib::configurable::configurable_component;
23use vector_lib::{
24    internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
25    EstimatedJsonEncodedSizeOf, TimeZone,
26};
27
28use crate::{
29    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
30    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
31    event::{Event, EventStatus, Finalizable},
32    expiring_hash_map::ExpiringHashMap,
33    internal_events::{
34        FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
35    },
36    sinks::util::{timezone_to_offset, StreamSink},
37    template::Template,
38};
39
40mod bytes_path;
41
42use bytes_path::BytesPath;
43
/// Configuration for the `file` sink.
#[serde_as]
#[configurable_component(sink("file", "Output observability events into files."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileSinkConfig {
    /// File path to write events to.
    ///
    /// Compression format extension must be explicit.
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log"))]
    #[configurable(metadata(
        docs::examples = "/tmp/application-{{ application_id }}-%Y-%m-%d.log"
    ))]
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log.zst"))]
    pub path: Template,

    /// The amount of time that a file can be idle and stay open.
    ///
    /// After not receiving any events in this amount of time, the file is flushed and closed.
    // Serialized as whole seconds under the `idle_timeout_secs` key.
    #[serde(default = "default_idle_timeout")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "idle_timeout_secs")]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Idle Timeout"))]
    pub idle_timeout: Duration,

    // Framing/serializer options are flattened into the top level of this
    // config; their documentation comes from `EncodingConfigWithFraming`.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    // Compression applied to each output file (see the `Compression` enum
    // below); omitted from serialized output when left at the default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub compression: Compression,

    // Accepts either a bare boolean or the full struct form in config files.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    // Timezone used when rendering strftime specifiers in `path`; when unset
    // the global timezone is used as a fallback (see `FileSink::new`).
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    // Controls internal-metric tagging; `include_file_tag` decides whether
    // per-file metrics carry the `file` tag.
    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: FileInternalMetricsConfig,
}
93
94impl GenerateConfig for FileSinkConfig {
95    fn generate_config() -> toml::Value {
96        toml::Value::try_from(Self {
97            path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
98            idle_timeout: default_idle_timeout(),
99            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
100            compression: Default::default(),
101            acknowledgements: Default::default(),
102            timezone: Default::default(),
103            internal_metrics: Default::default(),
104        })
105        .unwrap()
106    }
107}
108
/// Default `idle_timeout`: files idle for 30 seconds are flushed and closed.
const fn default_idle_timeout() -> Duration {
    const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_IDLE_TIMEOUT_SECS)
}
112
/// Compression configuration.
// TODO: Why doesn't this already use `crate::sinks::util::Compression`
// `crate::sinks::util::Compression` doesn't support zstd yet
// Each variant maps to an `OutFile` wrapper below; `None` is the default
// used when the config omits `compression`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Compression {
    /// [Gzip][gzip] compression.
    ///
    /// [gzip]: https://www.gzip.org/
    Gzip,

    /// [Zstandard][zstd] compression.
    ///
    /// [zstd]: https://facebook.github.io/zstd/
    Zstd,

    /// No compression.
    #[default]
    None,
}
134
/// An open output file, optionally wrapped in a compression encoder.
/// Writes go through the encoder for the compressed variants; `Regular`
/// writes raw bytes straight to the file.
enum OutFile {
    /// Uncompressed output.
    Regular(File),
    /// Gzip-compressed output.
    Gzip(GzipEncoder<File>),
    /// Zstandard-compressed output.
    Zstd(ZstdEncoder<File>),
}
140
141impl OutFile {
142    fn new(file: File, compression: Compression) -> Self {
143        match compression {
144            Compression::None => OutFile::Regular(file),
145            Compression::Gzip => OutFile::Gzip(GzipEncoder::new(file)),
146            Compression::Zstd => OutFile::Zstd(ZstdEncoder::new(file)),
147        }
148    }
149
150    async fn sync_all(&mut self) -> Result<(), std::io::Error> {
151        match self {
152            OutFile::Regular(file) => file.sync_all().await,
153            OutFile::Gzip(gzip) => gzip.get_mut().sync_all().await,
154            OutFile::Zstd(zstd) => zstd.get_mut().sync_all().await,
155        }
156    }
157
158    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
159        match self {
160            OutFile::Regular(file) => file.shutdown().await,
161            OutFile::Gzip(gzip) => gzip.shutdown().await,
162            OutFile::Zstd(zstd) => zstd.shutdown().await,
163        }
164    }
165
166    async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
167        match self {
168            OutFile::Regular(file) => file.write_all(src).await,
169            OutFile::Gzip(gzip) => gzip.write_all(src).await,
170            OutFile::Zstd(zstd) => zstd.write_all(src).await,
171        }
172    }
173
174    /// Shutdowns by flushing data, writing headers, and syncing all of that
175    /// data and metadata to the filesystem.
176    async fn close(&mut self) -> Result<(), std::io::Error> {
177        self.shutdown().await?;
178        self.sync_all().await
179    }
180}
181
182#[async_trait::async_trait]
183#[typetag::serde(name = "file")]
184impl SinkConfig for FileSinkConfig {
185    async fn build(
186        &self,
187        cx: SinkContext,
188    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
189        let sink = FileSink::new(self, cx)?;
190        Ok((
191            super::VectorSink::from_event_streamsink(sink),
192            future::ok(()).boxed(),
193        ))
194    }
195
196    fn input(&self) -> Input {
197        Input::new(self.encoding.config().1.input_type())
198    }
199
200    fn acknowledgements(&self) -> &AcknowledgementsConfig {
201        &self.acknowledgements
202    }
203}
204
/// Streaming sink that writes each event to a file chosen by rendering the
/// `path` template, keeping open files in an expiring map so idle ones are
/// flushed and closed.
pub struct FileSink {
    /// Path template with the configured timezone offset applied.
    path: Template,
    /// Transformer applied to each event before encoding.
    transformer: Transformer,
    /// Framing + serialization used to encode each event.
    encoder: Encoder<Framer>,
    /// How long a file may sit idle before it is closed.
    idle_timeout: Duration,
    /// Open output files keyed by rendered path; entries expire when not
    /// touched within `idle_timeout`.
    files: ExpiringHashMap<Bytes, OutFile>,
    /// Compression applied to newly opened files.
    compression: Compression,
    /// Pre-registered handle for emitting `EventsSent` metrics.
    events_sent: Registered<EventsSent>,
    /// Whether `FileBytesSent` metrics should carry the `file` tag.
    include_file_metric_tag: bool,
}
215
impl FileSink {
    /// Builds a `FileSink` from `config`, constructing the encoder and
    /// resolving the timezone offset applied to the path template.
    pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
        let transformer = config.encoding.transformer();
        let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
        let encoder = Encoder::<Framer>::new(framer, serializer);

        // Sink-level timezone takes precedence over the global one; either
        // (or both) may be absent.
        let offset = config
            .timezone
            .or(cx.globals.timezone)
            .and_then(timezone_to_offset);

        Ok(Self {
            path: config.path.clone().with_tz_offset(offset),
            transformer,
            encoder,
            idle_timeout: config.idle_timeout,
            files: ExpiringHashMap::default(),
            compression: config.compression,
            events_sent: register!(EventsSent::from(Output(None))),
            include_file_metric_tag: config.internal_metrics.include_file_tag,
        })
    }

    /// Renders the `self.path` template against `event` to obtain the file
    /// path to store the event under. Returns `None` (after emitting a
    /// `TemplateRenderingError`) when rendering fails for this event.
    fn partition_event(&mut self, event: &Event) -> Option<bytes::Bytes> {
        let bytes = match self.path.render(event) {
            Ok(b) => b,
            Err(error) => {
                emit!(TemplateRenderingError {
                    error,
                    field: Some("path"),
                    drop_event: true,
                });
                return None;
            }
        };

        Some(bytes)
    }

    /// Computes the next expiration deadline: now + `idle_timeout`.
    ///
    /// Panics only if the addition overflows `Instant`.
    fn deadline_at(&self) -> Instant {
        Instant::now()
            .checked_add(self.idle_timeout)
            .expect("unable to compute next deadline")
    }

    /// Main processing loop: consumes events from `input`, writes them to
    /// their partitioned files, and closes files that sit idle past their
    /// deadline. Returns once the input stream is exhausted.
    async fn run(&mut self, mut input: BoxStream<'_, Event>) -> crate::Result<()> {
        loop {
            tokio::select! {
                event = input.next() => {
                    match event {
                        Some(event) => self.process_event(event).await,
                        None => {
                            // If we got `None` - terminate the processing.
                            debug!(message = "Receiver exhausted, terminating the processing loop.");

                            // Close all the open files.
                            debug!(message = "Closing all the open files.");
                            for (path, file) in self.files.iter_mut() {
                                if let Err(error) = file.close().await {
                                    emit!(FileIoError {
                                        error,
                                        code: "failed_closing_file",
                                        message: "Failed to close file.",
                                        path,
                                        dropped_events: 0,
                                    });
                                } else{
                                    trace!(message = "Successfully closed file.", path = ?path);
                                }
                            }

                            // All files are now closed; report zero open files.
                            emit!(FileOpen {
                                count: 0
                            });

                            break;
                        }
                    }
                }
                // Only poll for expirations while the map is non-empty; see
                // the `unreachable!` below.
                result = self.files.next_expired(), if !self.files.is_empty() => {
                    match result {
                        // We do not poll map when it's empty, so we should
                        // never reach this branch.
                        None => unreachable!(),
                        Some((mut expired_file, path)) => {
                            // We got an expired file. All we really want is to
                            // flush and close it.
                            if let Err(error) = expired_file.close().await {
                                emit!(FileIoError {
                                    error,
                                    code: "failed_closing_file",
                                    message: "Failed to close file.",
                                    path: &path,
                                    dropped_events: 0,
                                });
                            }
                            // Close errors were already reported above; the
                            // handle is released either way.
                            drop(expired_file);
                            emit!(FileOpen {
                                count: self.files.len()
                            });
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Writes a single event: partitions it to a path, opens (or reuses) the
    /// target file, encodes and appends the event, and updates finalizer
    /// status plus internal metrics accordingly.
    async fn process_event(&mut self, mut event: Event) {
        let path = match self.partition_event(&event) {
            Some(path) => path,
            None => {
                // We weren't able to find the path to use for the
                // file.
                // The error is already handled at `partition_event`, so
                // here we just skip the event.
                event.metadata().update_status(EventStatus::Errored);
                return;
            }
        };

        // Touching a file (existing or new) pushes its expiration out to
        // now + idle_timeout.
        let next_deadline = self.deadline_at();
        trace!(message = "Computed next deadline.", next_deadline = ?next_deadline, path = ?path);

        let file = if let Some(file) = self.files.reset_at(&path, next_deadline) {
            trace!(message = "Working with an already opened file.", path = ?path);
            file
        } else {
            trace!(message = "Opening new file.", ?path);
            let file = match open_file(BytesPath::new(path.clone())).await {
                Ok(file) => file,
                Err(error) => {
                    // We couldn't open the file for this event.
                    // Maybe other events will work though! Just log
                    // the error and skip this event.
                    emit!(FileIoError {
                        code: "failed_opening_file",
                        message: "Unable to open the file.",
                        error,
                        path: &path,
                        dropped_events: 1,
                    });
                    event.metadata().update_status(EventStatus::Errored);
                    return;
                }
            };

            let outfile = OutFile::new(file, self.compression);

            self.files.insert_at(path.clone(), outfile, next_deadline);
            emit!(FileOpen {
                count: self.files.len()
            });
            self.files.get_mut(&path).unwrap()
        };

        trace!(message = "Writing an event to file.", path = ?path);
        // Size is measured before encoding so the metric reflects the event,
        // not the encoded representation.
        let event_size = event.estimated_json_encoded_size_of();
        let finalizers = event.take_finalizers();
        match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
            Ok(byte_size) => {
                finalizers.update_status(EventStatus::Delivered);
                self.events_sent.emit(CountByteSize(1, event_size));
                emit!(FileBytesSent {
                    byte_size,
                    file: String::from_utf8_lossy(&path),
                    include_file_metric_tag: self.include_file_metric_tag,
                });
            }
            Err(error) => {
                finalizers.update_status(EventStatus::Errored);
                emit!(FileIoError {
                    code: "failed_writing_file",
                    message: "Failed to write the file.",
                    error,
                    path: &path,
                    dropped_events: 1,
                });
            }
        }
    }
}
401
402async fn open_file(path: impl AsRef<std::path::Path>) -> std::io::Result<File> {
403    let parent = path.as_ref().parent();
404
405    if let Some(parent) = parent {
406        fs::create_dir_all(parent).await?;
407    }
408
409    fs::OpenOptions::new()
410        .read(false)
411        .write(true)
412        .create(true)
413        .append(true)
414        .open(path)
415        .await
416}
417
418async fn write_event_to_file(
419    file: &mut OutFile,
420    mut event: Event,
421    transformer: &Transformer,
422    encoder: &mut Encoder<Framer>,
423) -> Result<usize, std::io::Error> {
424    transformer.transform(&mut event);
425    let mut buffer = BytesMut::new();
426    encoder
427        .encode(event, &mut buffer)
428        .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
429    file.write_all(&buffer).await.map(|()| buffer.len())
430}
431
#[async_trait]
impl StreamSink<Event> for FileSink {
    /// Drives the sink over `input` until the stream ends.
    ///
    /// NOTE(review): `FileSink::run` is called with explicit syntax to avoid
    /// recursing into this trait method. Any error it returns panics the task
    /// via `expect` rather than being mapped to `Err(())` — confirm this is
    /// the intended failure mode.
    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        FileSink::run(&mut self, input)
            .await
            .expect("file sink error");
        Ok(())
    }
}
441
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use chrono::{SubsecRound, Utc};
    use futures::{stream, SinkExt};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::JsonSerializerConfig,
        event::{LogEvent, TraceEvent},
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            components::{assert_sink_compliance, FILE_SINK_TAGS},
            lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
            random_lines_with_stream, random_metrics_with_stream,
            random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileSinkConfig>();
    }

    // All events render to the same path: they must land in one file, in order.
    #[tokio::test]
    async fn log_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Same as above, but the output file must be valid gzip.
    #[tokio::test]
    async fn log_single_partition_gzip() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Gzip,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_gzip_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Same as above, but the output file must be valid zstd.
    #[tokio::test]
    async fn log_single_partition_zstd() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Zstd,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_zstd_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Events with different `level`/`date` fields must be partitioned into
    // distinct files, preserving per-file ordering.
    #[tokio::test]
    async fn log_many_partitions() {
        let directory = temp_dir();

        let mut template = directory.to_string_lossy().to_string();
        template.push_str("/{{level}}s-{{date}}.log");

        trace!(message = "Template.", %template);

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (mut input, _events) = random_events_with_stream(32, 8, None);
        input[0].as_mut_log().insert("date", "2019-26-07");
        input[0].as_mut_log().insert("level", "warning");
        input[1].as_mut_log().insert("date", "2019-26-07");
        input[1].as_mut_log().insert("level", "error");
        input[2].as_mut_log().insert("date", "2019-26-07");
        input[2].as_mut_log().insert("level", "warning");
        input[3].as_mut_log().insert("date", "2019-27-07");
        input[3].as_mut_log().insert("level", "error");
        input[4].as_mut_log().insert("date", "2019-27-07");
        input[4].as_mut_log().insert("level", "warning");
        input[5].as_mut_log().insert("date", "2019-27-07");
        input[5].as_mut_log().insert("level", "warning");
        input[6].as_mut_log().insert("date", "2019-28-07");
        input[6].as_mut_log().insert("level", "warning");
        input[7].as_mut_log().insert("date", "2019-29-07");
        input[7].as_mut_log().insert("level", "error");

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = [
            lines_from_file(directory.join("warnings-2019-26-07.log")),
            lines_from_file(directory.join("errors-2019-26-07.log")),
            lines_from_file(directory.join("warnings-2019-27-07.log")),
            lines_from_file(directory.join("errors-2019-27-07.log")),
            lines_from_file(directory.join("warnings-2019-28-07.log")),
            lines_from_file(directory.join("errors-2019-29-07.log")),
        ];

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(
            input[0].as_log()[&message_key],
            From::<&str>::from(&output[0][0])
        );
        assert_eq!(
            input[1].as_log()[&message_key],
            From::<&str>::from(&output[1][0])
        );
        assert_eq!(
            input[2].as_log()[&message_key],
            From::<&str>::from(&output[0][1])
        );
        assert_eq!(
            input[3].as_log()[&message_key],
            From::<&str>::from(&output[3][0])
        );
        assert_eq!(
            input[4].as_log()[&message_key],
            From::<&str>::from(&output[2][0])
        );
        assert_eq!(
            input[5].as_log()[&message_key],
            From::<&str>::from(&output[2][1])
        );
        assert_eq!(
            input[6].as_log()[&message_key],
            From::<&str>::from(&output[4][0])
        );
        // Consistency: index by reference like every assertion above.
        assert_eq!(
            input[7].as_log()[&message_key],
            From::<&str>::from(&output[5][0])
        );
    }

    // A file that goes idle is closed; a later write to the same path must
    // reopen in append mode rather than truncating.
    #[tokio::test]
    async fn log_reopening() {
        trace_init();

        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: Duration::from_secs(1),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (mut input, _events) = random_lines_with_stream(10, 64, None);

        let (mut tx, rx) = futures::channel::mpsc::channel(0);

        let sink_handle = tokio::spawn(async move {
            assert_sink_compliance(&FILE_SINK_TAGS, async move {
                let sink = FileSink::new(&config, SinkContext::default()).unwrap();
                VectorSink::from_event_streamsink(sink)
                    .run(Box::pin(rx.map(Into::into)))
                    .await
                    .expect("Running sink failed");
            })
            .await
        });

        // send initial payload
        for line in input.clone() {
            tx.send(Event::Log(LogEvent::from(line))).await.unwrap();
        }

        // wait for file to go idle and be closed
        tokio::time::sleep(Duration::from_secs(2)).await;

        // trigger another write
        let last_line = "i should go at the end";
        tx.send(LogEvent::from(last_line).into()).await.unwrap();
        input.push(String::from(last_line));

        // wait for another flush
        tokio::time::sleep(Duration::from_secs(1)).await;

        // make sure we appended instead of overwriting
        let output = lines_from_file(template);
        assert_eq!(input, output);

        // make sure sink stops and that it did not panic
        drop(tx);
        sink_handle.await.unwrap();
    }

    // Metrics are also accepted input; all land in a single file.
    #[tokio::test]
    async fn metric_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _events) = random_metrics_with_stream(100, None, None);

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    // A second-resolution strftime template partitions metrics (spaced one
    // second apart) into one file each.
    #[tokio::test]
    async fn metric_many_partitions() {
        let directory = temp_dir();

        let format = "%Y-%m-%d-%H-%M-%S";
        let mut template = directory.to_string_lossy().to_string();
        template.push_str(&format!("/{format}.log"));

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let metric_count = 3;
        let timestamp = Utc::now().trunc_subsecs(3);
        let timestamp_offset = Duration::from_secs(1);

        let (input, _events) = random_metrics_with_stream_timestamp(
            metric_count,
            None,
            None,
            timestamp,
            timestamp_offset,
        );

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = (0..metric_count).map(|index| {
            let expected_timestamp = timestamp + (timestamp_offset * index as u32);
            let expected_filename =
                directory.join(format!("{}.log", expected_timestamp.format(format)));

            lines_from_file(expected_filename)
        });
        for (input, output) in input.iter().zip(output) {
            // The format will partition by second and metrics are a second apart.
            assert_eq!(
                output.len(),
                1,
                "Expected the output file to contain one metric"
            );
            let output = &output[0];

            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    // Trace events, JSON-encoded, all land in a single file.
    #[tokio::test]
    async fn trace_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_trace_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.iter().zip(output) {
            assert!(output.contains(input));
        }
    }

    // Helper: run the sink over plain strings wrapped as log events.
    async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events.into_iter().map(LogEvent::from).map(Event::Log),
        )
        .await;
    }

    // Helper: run the sink over plain strings wrapped as trace events.
    async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events
                .into_iter()
                .map(LogEvent::from)
                .map(TraceEvent::from)
                .map(Event::Trace),
        )
        .await;
    }

    // Helper: build the sink from `config`, feed it `events`, and check
    // component-spec compliance while doing so.
    async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
        assert_sink_compliance(&FILE_SINK_TAGS, async move {
            let sink = FileSink::new(config, SinkContext::default()).unwrap();
            VectorSink::from_event_streamsink(sink)
                .run(Box::pin(stream::iter(events.map(Into::into))))
                .await
                .expect("Running sink failed")
        })
        .await;
    }
}