use std::convert::TryFrom;
use std::time::{Duration, Instant};

use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
use async_trait::async_trait;
use bytes::{Bytes, BytesMut};
use futures::{
    future,
    stream::{BoxStream, StreamExt},
    FutureExt,
};
use serde_with::serde_as;
use tokio::{
    fs::{self, File},
    io::AsyncWriteExt,
};
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::{
    encoding::{Framer, FramingConfig},
    TextSerializerConfig,
};
use vector_lib::configurable::configurable_component;
use vector_lib::{
    internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
    EstimatedJsonEncodedSizeOf, TimeZone,
};

use crate::{
    codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
    event::{Event, EventStatus, Finalizable},
    expiring_hash_map::ExpiringHashMap,
    internal_events::{
        FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
    },
    sinks::util::{timezone_to_offset, StreamSink},
    template::Template,
};

mod bytes_path;

use bytes_path::BytesPath;

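/// Configuration for the `file` sink.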
#[serde_as]
#[configurable_component(sink("file", "Output observability events into files."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileSinkConfig {
    /// File path to write events to. Rendered as a template per event, and
    /// `strftime` date/time specifiers are also supported.
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log"))]
    #[configurable(metadata(
        docs::examples = "/tmp/application-{{ application_id }}-%Y-%m-%d.log"
    ))]
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log.zst"))]
    pub path: Template,

    /// The amount of time that a file can be idle and stay open.
    ///
    /// The file is flushed and closed once this window elapses without new writes.
    #[serde(default = "default_idle_timeout")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "idle_timeout_secs")]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Idle Timeout"))]
    pub idle_timeout: Duration,

    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: FileInternalMetricsConfig,
}

impl GenerateConfig for FileSinkConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Default::default(),
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: Default::default(),
        })
        .unwrap()
    }
}

const fn default_idle_timeout() -> Duration {
    Duration::from_secs(30)
}

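/// Compression algorithm to apply to the output file.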
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Compression {
    /// Gzip compression.
    Gzip,

    /// Zstandard compression.
    Zstd,

    /// No compression.
    #[default]
    None,
}

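/// An open output file handle, optionally wrapped in a streaming compression encoder.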
enum OutFile {
    Regular(File),
    Gzip(GzipEncoder<File>),
    Zstd(ZstdEncoder<File>),
}

impl OutFile {
    fn new(file: File, compression: Compression) -> Self {
        match compression {
            Compression::None => OutFile::Regular(file),
            Compression::Gzip => OutFile::Gzip(GzipEncoder::new(file)),
            Compression::Zstd => OutFile::Zstd(ZstdEncoder::new(file)),
        }
    }

    async fn sync_all(&mut self) -> Result<(), std::io::Error> {
        match self {
            OutFile::Regular(file) => file.sync_all().await,
            OutFile::Gzip(gzip) => gzip.get_mut().sync_all().await,
            OutFile::Zstd(zstd) => zstd.get_mut().sync_all().await,
        }
    }

    async fn shutdown(&mut self) -> Result<(), std::io::Error> {
        match self {
            OutFile::Regular(file) => file.shutdown().await,
            OutFile::Gzip(gzip) => gzip.shutdown().await,
            OutFile::Zstd(zstd) => zstd.shutdown().await,
        }
    }

    async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
        match self {
            OutFile::Regular(file) => file.write_all(src).await,
            OutFile::Gzip(gzip) => gzip.write_all(src).await,
            OutFile::Zstd(zstd) => zstd.write_all(src).await,
        }
    }

    /// Shuts down the writer (finishing any compressed stream) and then syncs
    /// the underlying file to disk.
    async fn close(&mut self) -> Result<(), std::io::Error> {
        self.shutdown().await?;
        self.sync_all().await
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "file")]
impl SinkConfig for FileSinkConfig {
    async fn build(
        &self,
        cx: SinkContext,
    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
        let sink = FileSink::new(self, cx)?;
        Ok((
            super::VectorSink::from_event_streamsink(sink),
            future::ok(()).boxed(),
        ))
    }

    fn input(&self) -> Input {
        Input::new(self.encoding.config().1.input_type())
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

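/// Stream sink that partitions incoming events by their rendered `path`
/// template and keeps one open file per partition until it goes idle.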
pub struct FileSink {
    path: Template,
    transformer: Transformer,
    encoder: Encoder<Framer>,
    idle_timeout: Duration,
    files: ExpiringHashMap<Bytes, OutFile>,
    compression: Compression,
    events_sent: Registered<EventsSent>,
    include_file_metric_tag: bool,
}

impl FileSink {
    pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
        let transformer = config.encoding.transformer();
        let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
        let encoder = Encoder::<Framer>::new(framer, serializer);

        let offset = config
            .timezone
            .or(cx.globals.timezone)
            .and_then(timezone_to_offset);

        Ok(Self {
            path: config.path.clone().with_tz_offset(offset),
            transformer,
            encoder,
            idle_timeout: config.idle_timeout,
            files: ExpiringHashMap::default(),
            compression: config.compression,
            events_sent: register!(EventsSent::from(Output(None))),
            include_file_metric_tag: config.internal_metrics.include_file_tag,
        })
    }

    /// Renders the `path` template for the event to determine which file the
    /// event belongs to. Returns `None` (after emitting an error) if rendering fails.
    fn partition_event(&mut self, event: &Event) -> Option<bytes::Bytes> {
        let bytes = match self.path.render(event) {
            Ok(b) => b,
            Err(error) => {
                emit!(TemplateRenderingError {
                    error,
                    field: Some("path"),
                    drop_event: true,
                });
                return None;
            }
        };

        Some(bytes)
    }

    fn deadline_at(&self) -> Instant {
        Instant::now()
            .checked_add(self.idle_timeout)
            .expect("unable to compute next deadline")
    }

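    /// Drives the sink: reads events from the input stream, writes them to the
    /// appropriate files, and closes files that have been idle for longer than
    /// `idle_timeout` (or when the input stream ends).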
    async fn run(&mut self, mut input: BoxStream<'_, Event>) -> crate::Result<()> {
        loop {
            tokio::select! {
                event = input.next() => {
                    match event {
                        Some(event) => self.process_event(event).await,
                        None => {
                            debug!(message = "Receiver exhausted, terminating the processing loop.");

                            debug!(message = "Closing all the open files.");
                            for (path, file) in self.files.iter_mut() {
                                if let Err(error) = file.close().await {
                                    emit!(FileIoError {
                                        error,
                                        code: "failed_closing_file",
                                        message: "Failed to close file.",
                                        path,
                                        dropped_events: 0,
                                    });
                                } else {
                                    trace!(message = "Successfully closed file.", path = ?path);
                                }
                            }

                            emit!(FileOpen {
                                count: 0
                            });

                            break;
                        }
                    }
                }
                result = self.files.next_expired(), if !self.files.is_empty() => {
                    match result {
                        // We only poll for expired files when the map is non-empty,
                        // so `next_expired` always yields an entry here.
                        None => unreachable!(),
                        Some((mut expired_file, path)) => {
                            // The file has been idle for longer than `idle_timeout`;
                            // close it and drop the handle.
                            if let Err(error) = expired_file.close().await {
                                emit!(FileIoError {
                                    error,
                                    code: "failed_closing_file",
                                    message: "Failed to close file.",
                                    path: &path,
                                    dropped_events: 0,
                                });
                            }
                            drop(expired_file);
                            emit!(FileOpen {
                                count: self.files.len()
                            });
                        }
                    }
                }
            }
        }

        Ok(())
    }

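    /// Writes a single event to its partition file, opening (and registering)
    /// the file first if it is not already open.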
    async fn process_event(&mut self, mut event: Event) {
        let path = match self.partition_event(&event) {
            Some(path) => path,
            None => {
                // Path rendering failed; the error was already emitted by
                // `partition_event`, so just mark the event as errored.
                event.metadata().update_status(EventStatus::Errored);
                return;
            }
        };

        let next_deadline = self.deadline_at();
        trace!(message = "Computed next deadline.", next_deadline = ?next_deadline, path = ?path);

        let file = if let Some(file) = self.files.reset_at(&path, next_deadline) {
            trace!(message = "Working with an already opened file.", path = ?path);
            file
        } else {
            trace!(message = "Opening new file.", ?path);
            let file = match open_file(BytesPath::new(path.clone())).await {
                Ok(file) => file,
                Err(error) => {
                    emit!(FileIoError {
                        code: "failed_opening_file",
                        message: "Unable to open the file.",
                        error,
                        path: &path,
                        dropped_events: 1,
                    });
                    event.metadata().update_status(EventStatus::Errored);
                    return;
                }
            };

            let outfile = OutFile::new(file, self.compression);

            self.files.insert_at(path.clone(), outfile, next_deadline);
            emit!(FileOpen {
                count: self.files.len()
            });
            self.files.get_mut(&path).unwrap()
        };

        trace!(message = "Writing an event to file.", path = ?path);
        let event_size = event.estimated_json_encoded_size_of();
        let finalizers = event.take_finalizers();
        match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
            Ok(byte_size) => {
                finalizers.update_status(EventStatus::Delivered);
                self.events_sent.emit(CountByteSize(1, event_size));
                emit!(FileBytesSent {
                    byte_size,
                    file: String::from_utf8_lossy(&path),
                    include_file_metric_tag: self.include_file_metric_tag,
                });
            }
            Err(error) => {
                finalizers.update_status(EventStatus::Errored);
                emit!(FileIoError {
                    code: "failed_writing_file",
                    message: "Failed to write the file.",
                    error,
                    path: &path,
                    dropped_events: 1,
                });
            }
        }
    }
}

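/// Opens the file at `path` in append mode, creating any missing parent
/// directories first.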
async fn open_file(path: impl AsRef<std::path::Path>) -> std::io::Result<File> {
    let parent = path.as_ref().parent();

    if let Some(parent) = parent {
        fs::create_dir_all(parent).await?;
    }

    fs::OpenOptions::new()
        .read(false)
        .write(true)
        .create(true)
        .append(true)
        .open(path)
        .await
}

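/// Transforms and encodes a single event into a buffer and writes that buffer
/// to the file, returning the number of bytes written.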
async fn write_event_to_file(
    file: &mut OutFile,
    mut event: Event,
    transformer: &Transformer,
    encoder: &mut Encoder<Framer>,
) -> Result<usize, std::io::Error> {
    transformer.transform(&mut event);
    let mut buffer = BytesMut::new();
    encoder
        .encode(event, &mut buffer)
        .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
    file.write_all(&buffer).await.map(|()| buffer.len())
}

#[async_trait]
impl StreamSink<Event> for FileSink {
    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        FileSink::run(&mut self, input)
            .await
            .expect("file sink error");
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use chrono::{SubsecRound, Utc};
    use futures::{stream, SinkExt};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::JsonSerializerConfig,
        event::{LogEvent, TraceEvent},
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            components::{assert_sink_compliance, FILE_SINK_TAGS},
            lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
            random_lines_with_stream, random_metrics_with_stream,
            random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileSinkConfig>();
    }

    #[tokio::test]
    async fn log_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    #[tokio::test]
    async fn log_single_partition_gzip() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Gzip,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_gzip_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    #[tokio::test]
    async fn log_single_partition_zstd() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Zstd,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_zstd_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    #[tokio::test]
    async fn log_many_partitions() {
        let directory = temp_dir();

        let mut template = directory.to_string_lossy().to_string();
        template.push_str("/{{level}}s-{{date}}.log");

        trace!(message = "Template.", %template);

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (mut input, _events) = random_events_with_stream(32, 8, None);
        input[0].as_mut_log().insert("date", "2019-26-07");
        input[0].as_mut_log().insert("level", "warning");
        input[1].as_mut_log().insert("date", "2019-26-07");
        input[1].as_mut_log().insert("level", "error");
        input[2].as_mut_log().insert("date", "2019-26-07");
        input[2].as_mut_log().insert("level", "warning");
        input[3].as_mut_log().insert("date", "2019-27-07");
        input[3].as_mut_log().insert("level", "error");
        input[4].as_mut_log().insert("date", "2019-27-07");
        input[4].as_mut_log().insert("level", "warning");
        input[5].as_mut_log().insert("date", "2019-27-07");
        input[5].as_mut_log().insert("level", "warning");
        input[6].as_mut_log().insert("date", "2019-28-07");
        input[6].as_mut_log().insert("level", "warning");
        input[7].as_mut_log().insert("date", "2019-29-07");
        input[7].as_mut_log().insert("level", "error");

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = [
            lines_from_file(directory.join("warnings-2019-26-07.log")),
            lines_from_file(directory.join("errors-2019-26-07.log")),
            lines_from_file(directory.join("warnings-2019-27-07.log")),
            lines_from_file(directory.join("errors-2019-27-07.log")),
            lines_from_file(directory.join("warnings-2019-28-07.log")),
            lines_from_file(directory.join("errors-2019-29-07.log")),
        ];

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(
            input[0].as_log()[&message_key],
            From::<&str>::from(&output[0][0])
        );
        assert_eq!(
            input[1].as_log()[&message_key],
            From::<&str>::from(&output[1][0])
        );
        assert_eq!(
            input[2].as_log()[&message_key],
            From::<&str>::from(&output[0][1])
        );
        assert_eq!(
            input[3].as_log()[&message_key],
            From::<&str>::from(&output[3][0])
        );
        assert_eq!(
            input[4].as_log()[&message_key],
            From::<&str>::from(&output[2][0])
        );
        assert_eq!(
            input[5].as_log()[&message_key],
            From::<&str>::from(&output[2][1])
        );
        assert_eq!(
            input[6].as_log()[&message_key],
            From::<&str>::from(&output[4][0])
        );
        assert_eq!(
            input[7].as_log()[&message_key],
            From::<&str>::from(&output[5][0])
        );
    }

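    // Exercises the idle-timeout behavior: after the file is closed for being
    // idle, a later event must reopen it in append mode.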
    #[tokio::test]
    async fn log_reopening() {
        trace_init();

        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: Duration::from_secs(1),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (mut input, _events) = random_lines_with_stream(10, 64, None);

        let (mut tx, rx) = futures::channel::mpsc::channel(0);

        let sink_handle = tokio::spawn(async move {
            assert_sink_compliance(&FILE_SINK_TAGS, async move {
                let sink = FileSink::new(&config, SinkContext::default()).unwrap();
                VectorSink::from_event_streamsink(sink)
                    .run(Box::pin(rx.map(Into::into)))
                    .await
                    .expect("Running sink failed");
            })
            .await
        });

        // Send the initial batch of events.
        for line in input.clone() {
            tx.send(Event::Log(LogEvent::from(line))).await.unwrap();
        }

        // Wait past the idle timeout so the file is closed.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Send another event; this should reopen the file and append to it.
        let last_line = "i should go at the end";
        tx.send(LogEvent::from(last_line).into()).await.unwrap();
        input.push(String::from(last_line));

        // Give the sink time to flush the write.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // All lines, including the one written after reopening, should be present.
        let output = lines_from_file(template);
        assert_eq!(input, output);

        // Close the input channel so the sink shuts down cleanly.
        drop(tx);
        sink_handle.await.unwrap();
    }

    #[tokio::test]
    async fn metric_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _events) = random_metrics_with_stream(100, None, None);

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    #[tokio::test]
    async fn metric_many_partitions() {
        let directory = temp_dir();

        let format = "%Y-%m-%d-%H-%M-%S";
        let mut template = directory.to_string_lossy().to_string();
        template.push_str(&format!("/{format}.log"));

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let metric_count = 3;
        let timestamp = Utc::now().trunc_subsecs(3);
        let timestamp_offset = Duration::from_secs(1);

        let (input, _events) = random_metrics_with_stream_timestamp(
            metric_count,
            None,
            None,
            timestamp,
            timestamp_offset,
        );

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = (0..metric_count).map(|index| {
            let expected_timestamp = timestamp + (timestamp_offset * index as u32);
            let expected_filename =
                directory.join(format!("{}.log", expected_timestamp.format(format)));

            lines_from_file(expected_filename)
        });
        for (input, output) in input.iter().zip(output) {
            assert_eq!(
                output.len(),
                1,
                "Expected the output file to contain one metric"
            );
            let output = &output[0];

            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    #[tokio::test]
    async fn trace_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_trace_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.iter().zip(output) {
            assert!(output.contains(input));
        }
    }

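    // Helpers that wrap `assert_sink_compliance` for the event types used by
    // the tests above.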
    async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events.into_iter().map(LogEvent::from).map(Event::Log),
        )
        .await;
    }

    async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events
                .into_iter()
                .map(LogEvent::from)
                .map(TraceEvent::from)
                .map(Event::Trace),
        )
        .await;
    }

    async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
        assert_sink_compliance(&FILE_SINK_TAGS, async move {
            let sink = FileSink::new(config, SinkContext::default()).unwrap();
            VectorSink::from_event_streamsink(sink)
                .run(Box::pin(stream::iter(events.map(Into::into))))
                .await
                .expect("Running sink failed")
        })
        .await;
    }
}