1use std::{
2 convert::TryFrom,
3 num::NonZeroU64,
4 time::{Duration, Instant},
5};
6
7use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
8use async_trait::async_trait;
9use bytes::{Bytes, BytesMut};
10use futures::{
11 FutureExt, future,
12 stream::{BoxStream, StreamExt},
13};
14use serde_with::serde_as;
15use tokio::{
16 fs::{self, File},
17 io::AsyncWriteExt,
18};
19use tokio_util::{codec::Encoder as _, time::delay_queue::Expired};
20use vector_lib::{
21 EstimatedJsonEncodedSizeOf, TimeZone,
22 codecs::{
23 TextSerializerConfig,
24 encoding::{Framer, FramingConfig},
25 },
26 configurable::configurable_component,
27 internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
28};
29
30use crate::{
31 codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
32 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
33 event::{Event, EventStatus, Finalizable},
34 expiring_hash_map::ExpiringHashMap,
35 internal_events::{
36 FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
37 },
38 sinks::util::{StreamSink, timezone_to_offset},
39 template::Template,
40};
41
42mod bytes_path;
43
44use bytes_path::BytesPath;
45
/// Configuration for the `file` sink.
#[serde_as]
#[configurable_component(sink("file", "Output observability events into files."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileSinkConfig {
    /// File path to write events to.
    ///
    /// This is a template: it is re-rendered for each event, so events can be
    /// partitioned across multiple files based on their fields.
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log"))]
    #[configurable(metadata(
        docs::examples = "/tmp/application-{{ application_id }}-%Y-%m-%d.log"
    ))]
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log.zst"))]
    #[configurable(metadata(
        docs::warnings = "The rendered path can resolve to any location on the filesystem. Vector will write to it if the process has permission."
    ))]
    pub path: Template,

    /// The amount of time that a file can be idle and stay open.
    ///
    /// Once no events have been written to a file for this duration, the file
    /// is flushed and closed.
    #[serde(default = "default_idle_timeout")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "idle_timeout_secs")]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Idle Timeout"))]
    pub idle_timeout: Duration,

    /// Framing and encoding used to serialize each event before it is written.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Compression applied to the output file(s).
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// Timezone used when expanding time specifiers in `path`; falls back to
    /// the globally configured timezone when unset.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: FileInternalMetricsConfig,

    /// Conditions under which an existing file is truncated instead of
    /// appended to.
    #[configurable(derived)]
    #[serde(default)]
    pub truncate: FileTruncateConfig,
}
102
/// Configuration of the conditions under which an existing output file is
/// truncated (instead of appended to). See `FileSink::should_truncate`.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct FileTruncateConfig {
    /// Truncate a file that is not currently open if, at reopen time, its
    /// on-disk modification time is more than this many seconds in the past.
    #[serde(default)]
    pub after_close_time_secs: Option<NonZeroU64>,
    /// Truncate an open file once more than this many seconds have elapsed
    /// since the last event was written to it.
    #[serde(default)]
    pub after_modified_time_secs: Option<NonZeroU64>,
    /// Truncate an open file once it has been open for more than this many
    /// seconds.
    #[serde(default)]
    pub after_secs: Option<NonZeroU64>,
}
118
119impl GenerateConfig for FileSinkConfig {
120 fn generate_config() -> toml::Value {
121 toml::Value::try_from(Self {
122 path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
123 idle_timeout: default_idle_timeout(),
124 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
125 compression: Default::default(),
126 acknowledgements: Default::default(),
127 timezone: Default::default(),
128 internal_metrics: Default::default(),
129 truncate: Default::default(),
130 })
131 .unwrap()
132 }
133}
134
/// Default for `idle_timeout_secs`: files idle for 30 seconds are closed.
const fn default_idle_timeout() -> Duration {
    const DEFAULT_IDLE_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_IDLE_SECS)
}
138
/// File compression options.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Compression {
    /// Gzip compression.
    Gzip,

    /// Zstandard compression.
    Zstd,

    /// No compression.
    #[default]
    None,
}
160
/// An open output file together with the moment it was opened.
struct OutFile {
    // When this file was opened; consulted by the `truncate.after_secs`
    // check in `FileSink::should_truncate`.
    created_at: Instant,
    // The (possibly compressing) writer around the file handle.
    inner: OutFileInner,
}
165
/// The underlying writer of an `OutFile`, one variant per `Compression` mode.
enum OutFileInner {
    /// Uncompressed writes go straight to the file.
    Regular(File),
    /// Writes pass through a streaming gzip encoder.
    Gzip(GzipEncoder<File>),
    /// Writes pass through a streaming zstd encoder.
    Zstd(ZstdEncoder<File>),
}
171
172impl OutFile {
173 fn new(file: File, compression: Compression) -> Self {
174 Self {
175 created_at: Instant::now(),
176 inner: match compression {
177 Compression::None => OutFileInner::Regular(file),
178 Compression::Gzip => OutFileInner::Gzip(GzipEncoder::new(file)),
179 Compression::Zstd => OutFileInner::Zstd(ZstdEncoder::new(file)),
180 },
181 }
182 }
183
184 async fn sync_all(&mut self) -> Result<(), std::io::Error> {
185 match &mut self.inner {
186 OutFileInner::Regular(file) => file.sync_all().await,
187 OutFileInner::Gzip(gzip) => gzip.get_mut().sync_all().await,
188 OutFileInner::Zstd(zstd) => zstd.get_mut().sync_all().await,
189 }
190 }
191
192 async fn shutdown(&mut self) -> Result<(), std::io::Error> {
193 match &mut self.inner {
194 OutFileInner::Regular(file) => file.shutdown().await,
195 OutFileInner::Gzip(gzip) => gzip.shutdown().await,
196 OutFileInner::Zstd(zstd) => zstd.shutdown().await,
197 }
198 }
199
200 async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
201 match &mut self.inner {
202 OutFileInner::Regular(file) => file.write_all(src).await,
203 OutFileInner::Gzip(gzip) => gzip.write_all(src).await,
204 OutFileInner::Zstd(zstd) => zstd.write_all(src).await,
205 }
206 }
207
208 const fn created_at(&self) -> Instant {
209 self.created_at
210 }
211
212 async fn close(&mut self) -> Result<(), std::io::Error> {
215 self.shutdown().await?;
216 self.sync_all().await
217 }
218}
219
220#[async_trait::async_trait]
221#[typetag::serde(name = "file")]
222impl SinkConfig for FileSinkConfig {
223 async fn build(
224 &self,
225 cx: SinkContext,
226 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
227 let sink = FileSink::new(self, cx)?;
228 Ok((
229 super::VectorSink::from_event_streamsink(sink),
230 future::ok(()).boxed(),
231 ))
232 }
233
234 fn input(&self) -> Input {
235 Input::new(self.encoding.config().1.input_type())
236 }
237
238 fn acknowledgements(&self) -> &AcknowledgementsConfig {
239 &self.acknowledgements
240 }
241}
242
/// The running `file` sink: routes events to per-path files, keeping idle
/// files open until `idle_timeout` elapses.
pub struct FileSink {
    // Path template rendered per event to pick the output file.
    path: Template,
    // Transformations applied to each event before encoding.
    transformer: Transformer,
    // Framing + serialization of events into bytes.
    encoder: Encoder<Framer>,
    // How long a file may sit idle before it is closed.
    idle_timeout: Duration,
    // Open files keyed by rendered path; entries expire after `idle_timeout`.
    files: ExpiringHashMap<Bytes, OutFile>,
    compression: Compression,
    events_sent: Registered<EventsSent>,
    // Whether to tag the bytes-sent metric with the file path.
    include_file_metric_tag: bool,
    truncation_config: FileTruncateConfig,
}
254
impl FileSink {
    /// Builds a `FileSink` from its configuration, constructing the encoder
    /// and resolving the timezone offset applied to the `path` template.
    pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
        let transformer = config.encoding.transformer();
        let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
        let encoder = Encoder::<Framer>::new(framer, serializer);

        // Sink-level timezone takes precedence over the global one.
        let offset = config
            .timezone
            .or(cx.globals.timezone)
            .and_then(timezone_to_offset);

        Ok(Self {
            path: config.path.clone().with_tz_offset(offset),
            transformer,
            encoder,
            idle_timeout: config.idle_timeout,
            files: ExpiringHashMap::default(),
            compression: config.compression,
            events_sent: register!(EventsSent::from(Output(None))),
            include_file_metric_tag: config.internal_metrics.include_file_tag,
            truncation_config: config.truncate.clone(),
        })
    }

    /// Renders the `path` template against `event`, producing the partition
    /// key (the output file path). Returns `None` — after emitting a
    /// rendering error that drops the event — when rendering fails.
    fn partition_event(&mut self, event: &Event) -> Option<bytes::Bytes> {
        let bytes = match self.path.render(event) {
            Ok(b) => b,
            Err(error) => {
                emit!(TemplateRenderingError {
                    error,
                    field: Some("path"),
                    drop_event: true,
                });
                return None;
            }
        };

        Some(bytes)
    }

    /// The next idle-expiry deadline: now plus `idle_timeout`.
    fn deadline_at(&self) -> Instant {
        Instant::now()
            .checked_add(self.idle_timeout)
            .expect("unable to compute next deadline")
    }

    /// Main processing loop: writes each incoming event to its partitioned
    /// file and closes files whose idle deadline has expired.
    async fn run(&mut self, mut input: BoxStream<'_, Event>) -> crate::Result<()> {
        loop {
            tokio::select! {
                event = input.next() => {
                    match event {
                        Some(event) => self.process_event(event).await,
                        None => {
                            debug!(message = "Receiver exhausted, terminating the processing loop.");

                            // Upstream finished: flush and close every file
                            // that is still open before exiting.
                            debug!(message = "Closing all the open files.");
                            for (path, file) in self.files.iter_mut() {
                                if let Err(error) = file.close().await {
                                    emit!(FileIoError {
                                        error,
                                        code: "failed_closing_file",
                                        message: "Failed to close file.",
                                        path,
                                        dropped_events: 0,
                                    });
                                } else{
                                    trace!(message = "Successfully closed file.", path = ?path);
                                }
                            }

                            // All files are closed now; reset the gauge.
                            emit!(FileOpen {
                                count: 0
                            });

                            break;
                        }
                    }
                }
                // Close files whose idle deadline has passed. The guard keeps
                // `next_expired` from being polled while the map is empty.
                result = self.files.next_expired(), if !self.files.is_empty() => {
                    match result {
                        // Guarded by `!is_empty()`, so expiry must yield an
                        // entry here.
                        None => unreachable!(),
                        Some((expired_file, path)) => {
                            self.close_file(expired_file, path).await;
                        }
                    }
                }
            }
        }

        Ok(())
    }

    /// Routes one event to its file — opening (and possibly truncating) it as
    /// needed — then encodes and writes it, updating finalizer status and
    /// telemetry on the way.
    async fn process_event(&mut self, mut event: Event) {
        let path = match self.partition_event(&event) {
            Some(path) => path,
            None => {
                // The event was dropped by `partition_event`; mark it errored
                // so acknowledgements reflect the failure.
                event.metadata().update_status(EventStatus::Errored);
                return;
            }
        };

        let next_deadline = self.deadline_at();
        trace!(message = "Computed next deadline.", next_deadline = ?next_deadline, path = ?path);

        let bytes_path = BytesPath::new(path.clone());
        let truncate = self.should_truncate(&bytes_path, &path).await;
        // Reuse the already-open file (refreshing its idle deadline) unless a
        // truncation was requested, in which case the file must be reopened.
        let file = if !truncate && let Some(file) = self.files.reset_at(&path, next_deadline) {
            trace!(message = "Working with an already opened file.", path = ?path);
            file
        } else {
            trace!(message = "Opening new file.", ?path);
            let file = match open_file(bytes_path, truncate).await {
                Ok(file) => file,
                Err(error) => {
                    emit!(FileIoError {
                        code: "failed_opening_file",
                        message: "Unable to open the file.",
                        error,
                        path: &path,
                        dropped_events: 1,
                    });
                    event.metadata().update_status(EventStatus::Errored);
                    return;
                }
            };

            let outfile = OutFile::new(file, self.compression);

            self.files.insert_at(path.clone(), outfile, next_deadline);
            emit!(FileOpen {
                count: self.files.len()
            });
            // The entry was just inserted, so the lookup cannot fail.
            self.files.get_mut(&path).unwrap()
        };

        trace!(message = "Writing an event to file.", path = ?path);
        // Capture size and finalizers before the event is consumed by the
        // encoder below.
        let event_size = event.estimated_json_encoded_size_of();
        let finalizers = event.take_finalizers();
        match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
            Ok(byte_size) => {
                finalizers.update_status(EventStatus::Delivered);
                self.events_sent.emit(CountByteSize(1, event_size));
                emit!(FileBytesSent {
                    byte_size,
                    file: String::from_utf8_lossy(&path),
                    include_file_metric_tag: self.include_file_metric_tag,
                });
            }
            Err(error) => {
                finalizers.update_status(EventStatus::Errored);
                emit!(FileIoError {
                    code: "failed_writing_file",
                    message: "Failed to write the file.",
                    error,
                    path: &path,
                    dropped_events: 1,
                });
            }
        }
    }

    /// Evaluates the `truncate` configuration for `path`. If any rule fires,
    /// any currently open handle is closed and removed, and `true` is
    /// returned so the caller reopens the file with truncation.
    async fn should_truncate(&mut self, bytes_path: &BytesPath, path: &bytes::Bytes) -> bool {
        let mut truncate = false;

        // Rule 1: the file is not currently open and its on-disk mtime is
        // older than `after_close_time_secs`.
        if let Some(after_close_time_secs) = self.truncation_config.after_close_time_secs
            && self.files.get(path).is_none()
            && let Ok(metadata) = fs::metadata(bytes_path).await
            && let Ok(time) = metadata
                .modified()
                .map_err(|_| ())
                .and_then(|t| t.elapsed().map_err(|_| ()))
            && time.as_secs() > after_close_time_secs.into()
        {
            truncate = true;
        }

        // Rule 2: the file has been open for longer than `after_secs`.
        if let Some(after_secs) = self.truncation_config.after_secs
            && let Some(file) = self.files.get(path)
            && (file.created_at().elapsed().as_secs() > after_secs.into())
        {
            truncate = true;
        }

        // Rule 3: no event has been written for `after_modified_time_secs`.
        // The last-write time is reconstructed as the entry's expiry deadline
        // minus the idle timeout.
        if let Some(after_modified_time_secs) = self.truncation_config.after_modified_time_secs
            && let Some(previous_modification) = self
                .files
                .get_with_deadline(path)
                .and_then(|(_, deadline)| deadline.checked_sub(self.idle_timeout))
            && previous_modification.elapsed().as_secs() > after_modified_time_secs.into()
        {
            truncate = true;
        }

        if truncate && let Some((file, path)) = self.files.remove(path) {
            self.close_file(file, path).await;
        }

        truncate
    }

    /// Closes an expired or removed file, emitting an error event on failure,
    /// then refreshes the open-files gauge.
    async fn close_file(&self, mut file: OutFile, path: Expired<Bytes>) {
        if let Err(error) = file.close().await {
            emit!(FileIoError {
                error,
                code: "failed_closing_file",
                message: "Failed to close file.",
                path: &path,
                dropped_events: 0,
            });
        }
        drop(file); emit!(FileOpen {
            count: self.files.len()
        });
    }
}
486
487async fn open_file(path: impl AsRef<std::path::Path>, truncate: bool) -> std::io::Result<File> {
488 let parent = path.as_ref().parent();
489
490 if let Some(parent) = parent {
491 fs::create_dir_all(parent).await?;
492 }
493
494 fs::OpenOptions::new()
495 .read(false)
496 .write(true)
497 .create(true)
498 .append(!truncate)
499 .truncate(truncate)
500 .open(path)
501 .await
502}
503
504async fn write_event_to_file(
505 file: &mut OutFile,
506 mut event: Event,
507 transformer: &Transformer,
508 encoder: &mut Encoder<Framer>,
509) -> Result<usize, std::io::Error> {
510 transformer.transform(&mut event);
511 let mut buffer = BytesMut::new();
512 encoder
513 .encode(event, &mut buffer)
514 .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
515 file.write_all(&buffer).await.map(|()| buffer.len())
516}
517
518#[async_trait]
519impl StreamSink<Event> for FileSink {
520 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
521 FileSink::run(&mut self, input)
522 .await
523 .expect("file sink error");
524 Ok(())
525 }
526}
527
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use chrono::{SubsecRound, Utc};
    use futures::{SinkExt, stream};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::JsonSerializerConfig,
        event::{LogEvent, TraceEvent},
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            components::{FILE_SINK_TAGS, assert_sink_compliance},
            lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
            random_lines_with_stream, random_metrics_with_stream,
            random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileSinkConfig>();
    }

    // Log events with a static path template land in a single file, in order.
    #[tokio::test]
    async fn log_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Same as above, but the output is gzip-compressed and must decompress
    // back to the original lines.
    #[tokio::test]
    async fn log_single_partition_gzip() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Gzip,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_gzip_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // Same as above, with zstd compression.
    #[tokio::test]
    async fn log_single_partition_zstd() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Zstd,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_zstd_file(template);
        for (input, output) in input.into_iter().zip(output) {
            assert_eq!(input, output);
        }
    }

    // A templated path partitions events into separate files by field values,
    // preserving per-file ordering.
    #[tokio::test]
    async fn log_many_partitions() {
        let directory = temp_dir();

        let mut template = directory.to_string_lossy().to_string();
        template.push_str("/{{level}}s-{{date}}.log");

        trace!(message = "Template.", %template);

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (mut input, _events) = random_events_with_stream(32, 8, None);
        input[0].as_mut_log().insert("date", "2019-26-07");
        input[0].as_mut_log().insert("level", "warning");
        input[1].as_mut_log().insert("date", "2019-26-07");
        input[1].as_mut_log().insert("level", "error");
        input[2].as_mut_log().insert("date", "2019-26-07");
        input[2].as_mut_log().insert("level", "warning");
        input[3].as_mut_log().insert("date", "2019-27-07");
        input[3].as_mut_log().insert("level", "error");
        input[4].as_mut_log().insert("date", "2019-27-07");
        input[4].as_mut_log().insert("level", "warning");
        input[5].as_mut_log().insert("date", "2019-27-07");
        input[5].as_mut_log().insert("level", "warning");
        input[6].as_mut_log().insert("date", "2019-28-07");
        input[6].as_mut_log().insert("level", "warning");
        input[7].as_mut_log().insert("date", "2019-29-07");
        input[7].as_mut_log().insert("level", "error");

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = [
            lines_from_file(directory.join("warnings-2019-26-07.log")),
            lines_from_file(directory.join("errors-2019-26-07.log")),
            lines_from_file(directory.join("warnings-2019-27-07.log")),
            lines_from_file(directory.join("errors-2019-27-07.log")),
            lines_from_file(directory.join("warnings-2019-28-07.log")),
            lines_from_file(directory.join("errors-2019-29-07.log")),
        ];

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(
            input[0].as_log()[&message_key],
            From::<&str>::from(&output[0][0])
        );
        assert_eq!(
            input[1].as_log()[&message_key],
            From::<&str>::from(&output[1][0])
        );
        assert_eq!(
            input[2].as_log()[&message_key],
            From::<&str>::from(&output[0][1])
        );
        assert_eq!(
            input[3].as_log()[&message_key],
            From::<&str>::from(&output[3][0])
        );
        assert_eq!(
            input[4].as_log()[&message_key],
            From::<&str>::from(&output[2][0])
        );
        assert_eq!(
            input[5].as_log()[&message_key],
            From::<&str>::from(&output[2][1])
        );
        assert_eq!(
            input[6].as_log()[&message_key],
            From::<&str>::from(&output[4][0])
        );
        assert_eq!(
            input[7].as_log()[message_key],
            From::<&str>::from(&output[5][0])
        );
    }

    // After the idle timeout closes a file, a later event must reopen it and
    // append (not clobber) the earlier content.
    #[tokio::test]
    async fn log_reopening() {
        trace_init();

        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: Duration::from_secs(1),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (mut input, _events) = random_lines_with_stream(10, 64, None);

        let (mut tx, rx) = futures::channel::mpsc::channel(0);

        let sink_handle = tokio::spawn(async move {
            assert_sink_compliance(&FILE_SINK_TAGS, async move {
                let sink = FileSink::new(&config, SinkContext::default()).unwrap();
                VectorSink::from_event_streamsink(sink)
                    .run(Box::pin(rx.map(Into::into)))
                    .await
                    .expect("Running sink failed");
            })
            .await
        });

        // Send initial lines while the file is open.
        for line in input.clone() {
            tx.send(Event::Log(LogEvent::from(line))).await.unwrap();
        }

        // Wait past the 1s idle timeout so the file is closed.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // This event must reopen the file in append mode.
        let last_line = "i should go at the end";
        tx.send(LogEvent::from(last_line).into()).await.unwrap();
        input.push(String::from(last_line));

        // Give the sink time to flush the reopened file.
        tokio::time::sleep(Duration::from_secs(1)).await;

        let output = lines_from_file(template);
        assert_eq!(input, output);

        // Closing the channel terminates the sink's run loop.
        drop(tx);
        sink_handle.await.unwrap();
    }

    // Metrics are accepted as well as logs and serialize into the file.
    #[tokio::test]
    async fn metric_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_metrics_with_stream(100, None, None);

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = lines_from_file(template);
        for (input, output) in input.into_iter().zip(output) {
            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    // Timestamp specifiers in the path partition metrics into one file per
    // (second-granularity) timestamp.
    #[tokio::test]
    async fn metric_many_partitions() {
        let directory = temp_dir();

        let format = "%Y-%m-%d-%H-%M-%S";
        let mut template = directory.to_string_lossy().to_string();
        template.push_str(&format!("/{format}.log"));

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let metric_count = 3;
        let timestamp = Utc::now().trunc_subsecs(3);
        let timestamp_offset = Duration::from_secs(1);

        let (input, _events) = random_metrics_with_stream_timestamp(
            metric_count,
            None,
            None,
            timestamp,
            timestamp_offset,
        );

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = (0..metric_count).map(|index| {
            let expected_timestamp = timestamp + (timestamp_offset * index as u32);
            let expected_filename =
                directory.join(format!("{}.log", expected_timestamp.format(format)));

            lines_from_file(expected_filename)
        });
        for (input, output) in input.iter().zip(output) {
            assert_eq!(
                output.len(),
                1,
                "Expected the output file to contain one metric"
            );
            let output = &output[0];

            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    // Trace events serialize through the JSON encoder into the file.
    #[tokio::test]
    async fn trace_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_trace_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        for (input, output) in input.iter().zip(output) {
            assert!(output.contains(input));
        }
    }

    // Helper: runs the sink over the given lines as log events.
    async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events.into_iter().map(LogEvent::from).map(Event::Log),
        )
        .await;
    }

    // Helper: runs the sink over the given lines as trace events.
    async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events
                .into_iter()
                .map(LogEvent::from)
                .map(TraceEvent::from)
                .map(Event::Trace),
        )
        .await;
    }

    // Helper: builds the sink from `config`, feeds it `events`, and asserts
    // component-spec compliance while it runs.
    async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
        assert_sink_compliance(&FILE_SINK_TAGS, async move {
            let sink = FileSink::new(config, SinkContext::default()).unwrap();
            VectorSink::from_event_streamsink(sink)
                .run(Box::pin(stream::iter(events.map(Into::into))))
                .await
                .expect("Running sink failed")
        })
        .await;
    }
}