1use std::{
2 convert::TryFrom,
3 time::{Duration, Instant},
4};
5
6use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
7use async_trait::async_trait;
8use bytes::{Bytes, BytesMut};
9use futures::{
10 FutureExt, future,
11 stream::{BoxStream, StreamExt},
12};
13use serde_with::serde_as;
14use tokio::{
15 fs::{self, File},
16 io::AsyncWriteExt,
17};
18use tokio_util::codec::Encoder as _;
19use vector_lib::{
20 EstimatedJsonEncodedSizeOf, TimeZone,
21 codecs::{
22 TextSerializerConfig,
23 encoding::{Framer, FramingConfig},
24 },
25 configurable::configurable_component,
26 internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
27};
28
29use crate::{
30 codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
31 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
32 event::{Event, EventStatus, Finalizable},
33 expiring_hash_map::ExpiringHashMap,
34 internal_events::{
35 FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
36 },
37 sinks::util::{StreamSink, timezone_to_offset},
38 template::Template,
39};
40
41mod bytes_path;
42
43use bytes_path::BytesPath;
44
45#[serde_as]
47#[configurable_component(sink("file", "Output observability events into files."))]
48#[derive(Clone, Debug)]
49#[serde(deny_unknown_fields)]
50pub struct FileSinkConfig {
51 #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log"))]
55 #[configurable(metadata(
56 docs::examples = "/tmp/application-{{ application_id }}-%Y-%m-%d.log"
57 ))]
58 #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log.zst"))]
59 pub path: Template,
60
61 #[serde(default = "default_idle_timeout")]
65 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
66 #[serde(rename = "idle_timeout_secs")]
67 #[configurable(metadata(docs::examples = 600))]
68 #[configurable(metadata(docs::human_name = "Idle Timeout"))]
69 pub idle_timeout: Duration,
70
71 #[serde(flatten)]
72 pub encoding: EncodingConfigWithFraming,
73
74 #[configurable(derived)]
75 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
76 pub compression: Compression,
77
78 #[configurable(derived)]
79 #[serde(
80 default,
81 deserialize_with = "crate::serde::bool_or_struct",
82 skip_serializing_if = "crate::serde::is_default"
83 )]
84 pub acknowledgements: AcknowledgementsConfig,
85
86 #[configurable(derived)]
87 #[serde(default)]
88 pub timezone: Option<TimeZone>,
89
90 #[configurable(derived)]
91 #[serde(default)]
92 pub internal_metrics: FileInternalMetricsConfig,
93}
94
95impl GenerateConfig for FileSinkConfig {
96 fn generate_config() -> toml::Value {
97 toml::Value::try_from(Self {
98 path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
99 idle_timeout: default_idle_timeout(),
100 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
101 compression: Default::default(),
102 acknowledgements: Default::default(),
103 timezone: Default::default(),
104 internal_metrics: Default::default(),
105 })
106 .unwrap()
107 }
108}
109
/// Returns the idle timeout used when `idle_timeout_secs` is not configured: 30 seconds.
const fn default_idle_timeout() -> Duration {
    const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 30;

    Duration::from_secs(DEFAULT_IDLE_TIMEOUT_SECS)
}
113
114#[configurable_component]
118#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
119#[serde(rename_all = "snake_case")]
120pub enum Compression {
121 Gzip,
125
126 Zstd,
130
131 #[default]
133 None,
134}
135
136enum OutFile {
137 Regular(File),
138 Gzip(GzipEncoder<File>),
139 Zstd(ZstdEncoder<File>),
140}
141
142impl OutFile {
143 fn new(file: File, compression: Compression) -> Self {
144 match compression {
145 Compression::None => OutFile::Regular(file),
146 Compression::Gzip => OutFile::Gzip(GzipEncoder::new(file)),
147 Compression::Zstd => OutFile::Zstd(ZstdEncoder::new(file)),
148 }
149 }
150
151 async fn sync_all(&mut self) -> Result<(), std::io::Error> {
152 match self {
153 OutFile::Regular(file) => file.sync_all().await,
154 OutFile::Gzip(gzip) => gzip.get_mut().sync_all().await,
155 OutFile::Zstd(zstd) => zstd.get_mut().sync_all().await,
156 }
157 }
158
159 async fn shutdown(&mut self) -> Result<(), std::io::Error> {
160 match self {
161 OutFile::Regular(file) => file.shutdown().await,
162 OutFile::Gzip(gzip) => gzip.shutdown().await,
163 OutFile::Zstd(zstd) => zstd.shutdown().await,
164 }
165 }
166
167 async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
168 match self {
169 OutFile::Regular(file) => file.write_all(src).await,
170 OutFile::Gzip(gzip) => gzip.write_all(src).await,
171 OutFile::Zstd(zstd) => zstd.write_all(src).await,
172 }
173 }
174
175 async fn close(&mut self) -> Result<(), std::io::Error> {
178 self.shutdown().await?;
179 self.sync_all().await
180 }
181}
182
183#[async_trait::async_trait]
184#[typetag::serde(name = "file")]
185impl SinkConfig for FileSinkConfig {
186 async fn build(
187 &self,
188 cx: SinkContext,
189 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
190 let sink = FileSink::new(self, cx)?;
191 Ok((
192 super::VectorSink::from_event_streamsink(sink),
193 future::ok(()).boxed(),
194 ))
195 }
196
197 fn input(&self) -> Input {
198 Input::new(self.encoding.config().1.input_type())
199 }
200
201 fn acknowledgements(&self) -> &AcknowledgementsConfig {
202 &self.acknowledgements
203 }
204}
205
206pub struct FileSink {
207 path: Template,
208 transformer: Transformer,
209 encoder: Encoder<Framer>,
210 idle_timeout: Duration,
211 files: ExpiringHashMap<Bytes, OutFile>,
212 compression: Compression,
213 events_sent: Registered<EventsSent>,
214 include_file_metric_tag: bool,
215}
216
217impl FileSink {
218 pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
219 let transformer = config.encoding.transformer();
220 let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
221 let encoder = Encoder::<Framer>::new(framer, serializer);
222
223 let offset = config
224 .timezone
225 .or(cx.globals.timezone)
226 .and_then(timezone_to_offset);
227
228 Ok(Self {
229 path: config.path.clone().with_tz_offset(offset),
230 transformer,
231 encoder,
232 idle_timeout: config.idle_timeout,
233 files: ExpiringHashMap::default(),
234 compression: config.compression,
235 events_sent: register!(EventsSent::from(Output(None))),
236 include_file_metric_tag: config.internal_metrics.include_file_tag,
237 })
238 }
239
240 fn partition_event(&mut self, event: &Event) -> Option<bytes::Bytes> {
243 let bytes = match self.path.render(event) {
244 Ok(b) => b,
245 Err(error) => {
246 emit!(TemplateRenderingError {
247 error,
248 field: Some("path"),
249 drop_event: true,
250 });
251 return None;
252 }
253 };
254
255 Some(bytes)
256 }
257
258 fn deadline_at(&self) -> Instant {
259 Instant::now()
260 .checked_add(self.idle_timeout)
261 .expect("unable to compute next deadline")
262 }
263
264 async fn run(&mut self, mut input: BoxStream<'_, Event>) -> crate::Result<()> {
265 loop {
266 tokio::select! {
267 event = input.next() => {
268 match event {
269 Some(event) => self.process_event(event).await,
270 None => {
271 debug!(message = "Receiver exhausted, terminating the processing loop.");
273
274 debug!(message = "Closing all the open files.");
276 for (path, file) in self.files.iter_mut() {
277 if let Err(error) = file.close().await {
278 emit!(FileIoError {
279 error,
280 code: "failed_closing_file",
281 message: "Failed to close file.",
282 path,
283 dropped_events: 0,
284 });
285 } else{
286 trace!(message = "Successfully closed file.", path = ?path);
287 }
288 }
289
290 emit!(FileOpen {
291 count: 0
292 });
293
294 break;
295 }
296 }
297 }
298 result = self.files.next_expired(), if !self.files.is_empty() => {
299 match result {
300 None => unreachable!(),
303 Some((mut expired_file, path)) => {
304 if let Err(error) = expired_file.close().await {
307 emit!(FileIoError {
308 error,
309 code: "failed_closing_file",
310 message: "Failed to close file.",
311 path: &path,
312 dropped_events: 0,
313 });
314 }
315 drop(expired_file); emit!(FileOpen {
317 count: self.files.len()
318 });
319 }
320 }
321 }
322 }
323 }
324
325 Ok(())
326 }
327
328 async fn process_event(&mut self, mut event: Event) {
329 let path = match self.partition_event(&event) {
330 Some(path) => path,
331 None => {
332 event.metadata().update_status(EventStatus::Errored);
337 return;
338 }
339 };
340
341 let next_deadline = self.deadline_at();
342 trace!(message = "Computed next deadline.", next_deadline = ?next_deadline, path = ?path);
343
344 let file = if let Some(file) = self.files.reset_at(&path, next_deadline) {
345 trace!(message = "Working with an already opened file.", path = ?path);
346 file
347 } else {
348 trace!(message = "Opening new file.", ?path);
349 let file = match open_file(BytesPath::new(path.clone())).await {
350 Ok(file) => file,
351 Err(error) => {
352 emit!(FileIoError {
356 code: "failed_opening_file",
357 message: "Unable to open the file.",
358 error,
359 path: &path,
360 dropped_events: 1,
361 });
362 event.metadata().update_status(EventStatus::Errored);
363 return;
364 }
365 };
366
367 let outfile = OutFile::new(file, self.compression);
368
369 self.files.insert_at(path.clone(), outfile, next_deadline);
370 emit!(FileOpen {
371 count: self.files.len()
372 });
373 self.files.get_mut(&path).unwrap()
374 };
375
376 trace!(message = "Writing an event to file.", path = ?path);
377 let event_size = event.estimated_json_encoded_size_of();
378 let finalizers = event.take_finalizers();
379 match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
380 Ok(byte_size) => {
381 finalizers.update_status(EventStatus::Delivered);
382 self.events_sent.emit(CountByteSize(1, event_size));
383 emit!(FileBytesSent {
384 byte_size,
385 file: String::from_utf8_lossy(&path),
386 include_file_metric_tag: self.include_file_metric_tag,
387 });
388 }
389 Err(error) => {
390 finalizers.update_status(EventStatus::Errored);
391 emit!(FileIoError {
392 code: "failed_writing_file",
393 message: "Failed to write the file.",
394 error,
395 path: &path,
396 dropped_events: 1,
397 });
398 }
399 }
400 }
401}
402
403async fn open_file(path: impl AsRef<std::path::Path>) -> std::io::Result<File> {
404 let parent = path.as_ref().parent();
405
406 if let Some(parent) = parent {
407 fs::create_dir_all(parent).await?;
408 }
409
410 fs::OpenOptions::new()
411 .read(false)
412 .write(true)
413 .create(true)
414 .append(true)
415 .open(path)
416 .await
417}
418
419async fn write_event_to_file(
420 file: &mut OutFile,
421 mut event: Event,
422 transformer: &Transformer,
423 encoder: &mut Encoder<Framer>,
424) -> Result<usize, std::io::Error> {
425 transformer.transform(&mut event);
426 let mut buffer = BytesMut::new();
427 encoder
428 .encode(event, &mut buffer)
429 .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
430 file.write_all(&buffer).await.map(|()| buffer.len())
431}
432
433#[async_trait]
434impl StreamSink<Event> for FileSink {
435 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
436 FileSink::run(&mut self, input)
437 .await
438 .expect("file sink error");
439 Ok(())
440 }
441}
442
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use chrono::{SubsecRound, Utc};
    use futures::{SinkExt, stream};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::JsonSerializerConfig,
        event::{LogEvent, TraceEvent},
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            components::{FILE_SINK_TAGS, assert_sink_compliance},
            lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
            random_lines_with_stream, random_metrics_with_stream,
            random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
        },
    };

    /// Configuration shared by the tests: text encoding without explicit framing, the default
    /// idle timeout, and the `file` metric tag enabled.
    fn text_config(path: Template, compression: Compression) -> FileSinkConfig {
        FileSinkConfig {
            path,
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileSinkConfig>();
    }

    #[tokio::test]
    async fn log_single_partition() {
        let template = temp_file();
        let config = text_config(template.clone().try_into().unwrap(), Compression::None);

        let (lines, _events) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, lines.clone()).await;

        let written = lines_from_file(template);
        for (expected, actual) in lines.into_iter().zip(written) {
            assert_eq!(expected, actual);
        }
    }

    #[tokio::test]
    async fn log_single_partition_gzip() {
        let template = temp_file();
        let config = text_config(template.clone().try_into().unwrap(), Compression::Gzip);

        let (lines, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, lines.clone()).await;

        let written = lines_from_gzip_file(template);
        for (expected, actual) in lines.into_iter().zip(written) {
            assert_eq!(expected, actual);
        }
    }

    #[tokio::test]
    async fn log_single_partition_zstd() {
        let template = temp_file();
        let config = text_config(template.clone().try_into().unwrap(), Compression::Zstd);

        let (lines, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, lines.clone()).await;

        let written = lines_from_zstd_file(template);
        for (expected, actual) in lines.into_iter().zip(written) {
            assert_eq!(expected, actual);
        }
    }

    #[tokio::test]
    async fn log_many_partitions() {
        let directory = temp_dir();

        let mut template = directory.to_string_lossy().to_string();
        template.push_str("/{{level}}s-{{date}}.log");

        trace!(message = "Template.", %template);

        let config = text_config(template.try_into().unwrap(), Compression::None);

        let (mut input, _events) = random_events_with_stream(32, 8, None);

        // (date, level) assigned to each event, in input order.
        let partitions = [
            ("2019-26-07", "warning"),
            ("2019-26-07", "error"),
            ("2019-26-07", "warning"),
            ("2019-27-07", "error"),
            ("2019-27-07", "warning"),
            ("2019-27-07", "warning"),
            ("2019-28-07", "warning"),
            ("2019-29-07", "error"),
        ];
        for (event, (date, level)) in input.iter_mut().zip(partitions) {
            let log = event.as_mut_log();
            log.insert("date", date);
            log.insert("level", level);
        }

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = [
            lines_from_file(directory.join("warnings-2019-26-07.log")),
            lines_from_file(directory.join("errors-2019-26-07.log")),
            lines_from_file(directory.join("warnings-2019-27-07.log")),
            lines_from_file(directory.join("errors-2019-27-07.log")),
            lines_from_file(directory.join("warnings-2019-28-07.log")),
            lines_from_file(directory.join("errors-2019-29-07.log")),
        ];

        let message_key = log_schema().message_key().unwrap().to_string();

        // (input index, output file index, line within that file)
        let expectations = [
            (0, 0, 0),
            (1, 1, 0),
            (2, 0, 1),
            (3, 3, 0),
            (4, 2, 0),
            (5, 2, 1),
            (6, 4, 0),
            (7, 5, 0),
        ];
        for (event_index, file_index, line_index) in expectations {
            assert_eq!(
                input[event_index].as_log()[&message_key],
                From::<&str>::from(&output[file_index][line_index])
            );
        }
    }

    #[tokio::test]
    async fn log_reopening() {
        trace_init();

        let template = temp_file();

        let config = FileSinkConfig {
            idle_timeout: Duration::from_secs(1),
            ..text_config(template.clone().try_into().unwrap(), Compression::None)
        };

        let (mut input, _events) = random_lines_with_stream(10, 64, None);

        let (mut tx, rx) = futures::channel::mpsc::channel(0);

        let sink_handle = tokio::spawn(async move {
            assert_sink_compliance(&FILE_SINK_TAGS, async move {
                let sink = FileSink::new(&config, SinkContext::default()).unwrap();
                VectorSink::from_event_streamsink(sink)
                    .run(Box::pin(rx.map(Into::into)))
                    .await
                    .expect("Running sink failed");
            })
            .await
        });

        // Send the initial payload.
        for line in input.clone() {
            tx.send(Event::Log(LogEvent::from(line))).await.unwrap();
        }

        // Sleep for longer than the one-second idle timeout.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // Send one more line after the idle period.
        let last_line = "i should go at the end";
        tx.send(LogEvent::from(last_line).into()).await.unwrap();
        input.push(String::from(last_line));

        // Give the sink time to write the line.
        tokio::time::sleep(Duration::from_secs(1)).await;

        // The file must hold the initial lines followed by the late one.
        let output = lines_from_file(template);
        assert_eq!(input, output);

        // Dropping the sender ends the stream, which lets the sink task finish.
        drop(tx);
        sink_handle.await.unwrap();
    }

    #[tokio::test]
    async fn metric_single_partition() {
        let template = temp_file();
        let config = text_config(template.clone().try_into().unwrap(), Compression::None);

        let (input, _events) = random_metrics_with_stream(100, None, None);

        run_assert_sink(&config, input.clone().into_iter()).await;

        let written = lines_from_file(template);
        for (event, line) in input.into_iter().zip(written) {
            let metric_name = event.as_metric().name();
            assert!(line.contains(metric_name));
        }
    }

    #[tokio::test]
    async fn metric_many_partitions() {
        let directory = temp_dir();

        let format = "%Y-%m-%d-%H-%M-%S";
        let template = format!("{}/{format}.log", directory.to_string_lossy());

        let config = text_config(template.try_into().unwrap(), Compression::None);

        let metric_count = 3;
        let timestamp = Utc::now().trunc_subsecs(3);
        let timestamp_offset = Duration::from_secs(1);

        let (input, _events) = random_metrics_with_stream_timestamp(
            metric_count,
            None,
            None,
            timestamp,
            timestamp_offset,
        );

        run_assert_sink(&config, input.clone().into_iter()).await;

        // One file is expected per metric, named after that metric's timestamp.
        let output = (0..metric_count).map(|index| {
            let expected_timestamp = timestamp + (timestamp_offset * index as u32);
            let expected_filename =
                directory.join(format!("{}.log", expected_timestamp.format(format)));

            lines_from_file(expected_filename)
        });
        for (event, lines) in input.iter().zip(output) {
            assert_eq!(
                lines.len(),
                1,
                "Expected the output file to contain one metric"
            );
            let line = &lines[0];

            let metric_name = event.as_metric().name();
            assert!(line.contains(metric_name));
        }
    }

    #[tokio::test]
    async fn trace_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
            ..text_config(template.clone().try_into().unwrap(), Compression::None)
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_trace_sink(&config, input.clone()).await;

        let written = lines_from_file(template);
        for (expected, line) in input.iter().zip(written) {
            assert!(line.contains(expected));
        }
    }

    /// Runs the sink over `events`, each wrapped in a log event.
    async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
        let logs = events
            .into_iter()
            .map(|line| Event::Log(LogEvent::from(line)));

        run_assert_sink(config, logs).await;
    }

    /// Runs the sink over `events`, each wrapped in a trace event.
    async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
        let traces = events
            .into_iter()
            .map(|line| Event::Trace(TraceEvent::from(LogEvent::from(line))));

        run_assert_sink(config, traces).await;
    }

    /// Runs a sink built from `config` over `events` inside the sink compliance assertion.
    async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
        assert_sink_compliance(&FILE_SINK_TAGS, async move {
            let sink = FileSink::new(config, SinkContext::default()).unwrap();
            VectorSink::from_event_streamsink(sink)
                .run(Box::pin(stream::iter(events.map(Into::into))))
                .await
                .expect("Running sink failed")
        })
        .await;
    }
}