1use std::{
2 convert::TryFrom,
3 num::NonZeroU64,
4 time::{Duration, Instant},
5};
6
7use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
8use async_trait::async_trait;
9use bytes::{Bytes, BytesMut};
10use futures::{
11 FutureExt, future,
12 stream::{BoxStream, StreamExt},
13};
14use serde_with::serde_as;
15use tokio::{
16 fs::{self, File},
17 io::AsyncWriteExt,
18};
19use tokio_util::{codec::Encoder as _, time::delay_queue::Expired};
20use vector_lib::{
21 EstimatedJsonEncodedSizeOf, TimeZone,
22 codecs::{
23 TextSerializerConfig,
24 encoding::{Framer, FramingConfig},
25 },
26 configurable::configurable_component,
27 internal_event::{CountByteSize, EventsSent, InternalEventHandle as _, Output, Registered},
28};
29
30use crate::{
31 codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
32 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
33 event::{Event, EventStatus, Finalizable},
34 expiring_hash_map::ExpiringHashMap,
35 internal_events::{
36 FileBytesSent, FileInternalMetricsConfig, FileIoError, FileOpen, TemplateRenderingError,
37 },
38 sinks::util::{StreamSink, timezone_to_offset},
39 template::Template,
40};
41
42mod bytes_path;
43
44use bytes_path::BytesPath;
45
#[serde_as]
/// Configuration for the `file` sink.
#[configurable_component(sink("file", "Output observability events into files."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FileSinkConfig {
    /// File path to write events to.
    ///
    /// This is a template rendered for every event, so events can be partitioned into several
    /// files, for example by date (`%Y-%m-%d`) or by event fields (`{{ application_id }}`).
    ///
    /// Compression is selected with the `compression` option only; the file extension in the
    /// path is not inspected.
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log"))]
    #[configurable(metadata(
        docs::examples = "/tmp/application-{{ application_id }}-%Y-%m-%d.log"
    ))]
    #[configurable(metadata(docs::examples = "/tmp/vector-%Y-%m-%d.log.zst"))]
    pub path: Template,

    /// The amount of time that a file can be idle and stay open.
    ///
    /// After not receiving any events in this amount of time, the file is flushed and closed.
    /// Configured in whole seconds via `idle_timeout_secs`; defaults to 30 seconds.
    #[serde(default = "default_idle_timeout")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "idle_timeout_secs")]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Idle Timeout"))]
    pub idle_timeout: Duration,

    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,

    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: FileInternalMetricsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub truncate: FileTruncateConfig,
}
99
#[configurable_component]
/// Options for truncating output files instead of appending to them.
///
/// Every option is disabled by default. When a truncation condition is met for an open file,
/// the file is closed and then reopened truncated for the incoming event.
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct FileTruncateConfig {
    /// Truncate a file that is not currently open, when it is next opened, if it was last
    /// modified more than this many seconds ago.
    ///
    /// The file's modification time is used as an approximation of when it was closed.
    #[serde(default)]
    pub after_close_time_secs: Option<NonZeroU64>,
    /// Truncate an open file when an event arrives more than this many seconds after the
    /// previous event routed to it.
    ///
    /// Idle files are closed after `idle_timeout_secs`, so this only has an effect when it is
    /// shorter than the idle timeout.
    #[serde(default)]
    pub after_modified_time_secs: Option<NonZeroU64>,
    /// Truncate an open file once it has been open for more than this many seconds.
    ///
    /// The age is counted from when the sink opened the file, so it restarts whenever the file
    /// is reopened (for example after an idle timeout).
    #[serde(default)]
    pub after_secs: Option<NonZeroU64>,
}
115
116impl GenerateConfig for FileSinkConfig {
117 fn generate_config() -> toml::Value {
118 toml::Value::try_from(Self {
119 path: Template::try_from("/tmp/vector-%Y-%m-%d.log").unwrap(),
120 idle_timeout: default_idle_timeout(),
121 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
122 compression: Default::default(),
123 acknowledgements: Default::default(),
124 timezone: Default::default(),
125 internal_metrics: Default::default(),
126 truncate: Default::default(),
127 })
128 .unwrap()
129 }
130}
131
/// Default value of `idle_timeout_secs`: files idle for thirty seconds are closed.
const fn default_idle_timeout() -> Duration {
    const DEFAULT_IDLE_TIMEOUT_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_IDLE_TIMEOUT_SECS)
}
135
#[configurable_component]
/// Compression algorithm applied to the output files.
///
/// The algorithm is chosen by this option alone, independently of the file's extension.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Compression {
    /// Gzip compression.
    Gzip,

    /// Zstandard compression.
    Zstd,

    /// No compression: encoded events are written to the file as-is.
    #[default]
    None,
}
157
/// An output file opened by the sink, together with the instant the sink opened it.
struct OutFile {
    // Set in `OutFile::new`; used by the `after_secs` truncation check.
    created_at: Instant,
    // The actual writer: the plain file or a compressing encoder wrapping it.
    inner: OutFileInner,
}
162
/// The writer behind an [`OutFile`], selected from the configured [`Compression`].
enum OutFileInner {
    /// Uncompressed output written straight to the file.
    Regular(File),
    /// Output gzip-compressed into the wrapped file.
    Gzip(GzipEncoder<File>),
    /// Output zstd-compressed into the wrapped file.
    Zstd(ZstdEncoder<File>),
}
168
169impl OutFile {
170 fn new(file: File, compression: Compression) -> Self {
171 Self {
172 created_at: Instant::now(),
173 inner: match compression {
174 Compression::None => OutFileInner::Regular(file),
175 Compression::Gzip => OutFileInner::Gzip(GzipEncoder::new(file)),
176 Compression::Zstd => OutFileInner::Zstd(ZstdEncoder::new(file)),
177 },
178 }
179 }
180
181 async fn sync_all(&mut self) -> Result<(), std::io::Error> {
182 match &mut self.inner {
183 OutFileInner::Regular(file) => file.sync_all().await,
184 OutFileInner::Gzip(gzip) => gzip.get_mut().sync_all().await,
185 OutFileInner::Zstd(zstd) => zstd.get_mut().sync_all().await,
186 }
187 }
188
189 async fn shutdown(&mut self) -> Result<(), std::io::Error> {
190 match &mut self.inner {
191 OutFileInner::Regular(file) => file.shutdown().await,
192 OutFileInner::Gzip(gzip) => gzip.shutdown().await,
193 OutFileInner::Zstd(zstd) => zstd.shutdown().await,
194 }
195 }
196
197 async fn write_all(&mut self, src: &[u8]) -> Result<(), std::io::Error> {
198 match &mut self.inner {
199 OutFileInner::Regular(file) => file.write_all(src).await,
200 OutFileInner::Gzip(gzip) => gzip.write_all(src).await,
201 OutFileInner::Zstd(zstd) => zstd.write_all(src).await,
202 }
203 }
204
205 const fn created_at(&self) -> Instant {
206 self.created_at
207 }
208
209 async fn close(&mut self) -> Result<(), std::io::Error> {
212 self.shutdown().await?;
213 self.sync_all().await
214 }
215}
216
217#[async_trait::async_trait]
218#[typetag::serde(name = "file")]
219impl SinkConfig for FileSinkConfig {
220 async fn build(
221 &self,
222 cx: SinkContext,
223 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
224 let sink = FileSink::new(self, cx)?;
225 Ok((
226 super::VectorSink::from_event_streamsink(sink),
227 future::ok(()).boxed(),
228 ))
229 }
230
231 fn input(&self) -> Input {
232 Input::new(self.encoding.config().1.input_type())
233 }
234
235 fn acknowledgements(&self) -> &AcknowledgementsConfig {
236 &self.acknowledgements
237 }
238}
239
/// The `file` sink: renders a path for each event and writes the encoded event to that file,
/// keeping recently used files open until they go idle.
pub struct FileSink {
    /// Path template, already adjusted with the configured timezone offset.
    path: Template,
    /// Applied to each event before it is encoded.
    transformer: Transformer,
    /// Serializer plus framer used to turn events into bytes.
    encoder: Encoder<Framer>,
    /// How long a file may go without events before it is closed.
    idle_timeout: Duration,
    /// Open files keyed by rendered path; each entry's deadline is reset on every event.
    files: ExpiringHashMap<Bytes, OutFile>,
    /// Compression applied to newly opened files.
    compression: Compression,
    /// Handle used to report delivered events.
    events_sent: Registered<EventsSent>,
    /// Whether `FileBytesSent` metrics carry the file path as a tag.
    include_file_metric_tag: bool,
    /// Conditions under which files are truncated instead of appended to.
    truncation_config: FileTruncateConfig,
}
251
252impl FileSink {
253 pub fn new(config: &FileSinkConfig, cx: SinkContext) -> crate::Result<Self> {
254 let transformer = config.encoding.transformer();
255 let (framer, serializer) = config.encoding.build(SinkType::StreamBased)?;
256 let encoder = Encoder::<Framer>::new(framer, serializer);
257
258 let offset = config
259 .timezone
260 .or(cx.globals.timezone)
261 .and_then(timezone_to_offset);
262
263 Ok(Self {
264 path: config.path.clone().with_tz_offset(offset),
265 transformer,
266 encoder,
267 idle_timeout: config.idle_timeout,
268 files: ExpiringHashMap::default(),
269 compression: config.compression,
270 events_sent: register!(EventsSent::from(Output(None))),
271 include_file_metric_tag: config.internal_metrics.include_file_tag,
272 truncation_config: config.truncate.clone(),
273 })
274 }
275
276 fn partition_event(&mut self, event: &Event) -> Option<bytes::Bytes> {
279 let bytes = match self.path.render(event) {
280 Ok(b) => b,
281 Err(error) => {
282 emit!(TemplateRenderingError {
283 error,
284 field: Some("path"),
285 drop_event: true,
286 });
287 return None;
288 }
289 };
290
291 Some(bytes)
292 }
293
294 fn deadline_at(&self) -> Instant {
295 Instant::now()
296 .checked_add(self.idle_timeout)
297 .expect("unable to compute next deadline")
298 }
299
300 async fn run(&mut self, mut input: BoxStream<'_, Event>) -> crate::Result<()> {
301 loop {
302 tokio::select! {
303 event = input.next() => {
304 match event {
305 Some(event) => self.process_event(event).await,
306 None => {
307 debug!(message = "Receiver exhausted, terminating the processing loop.");
309
310 debug!(message = "Closing all the open files.");
312 for (path, file) in self.files.iter_mut() {
313 if let Err(error) = file.close().await {
314 emit!(FileIoError {
315 error,
316 code: "failed_closing_file",
317 message: "Failed to close file.",
318 path,
319 dropped_events: 0,
320 });
321 } else{
322 trace!(message = "Successfully closed file.", path = ?path);
323 }
324 }
325
326 emit!(FileOpen {
327 count: 0
328 });
329
330 break;
331 }
332 }
333 }
334 result = self.files.next_expired(), if !self.files.is_empty() => {
335 match result {
336 None => unreachable!(),
339 Some((expired_file, path)) => {
340 self.close_file(expired_file, path).await;
343 }
344 }
345 }
346 }
347 }
348
349 Ok(())
350 }
351
352 async fn process_event(&mut self, mut event: Event) {
353 let path = match self.partition_event(&event) {
354 Some(path) => path,
355 None => {
356 event.metadata().update_status(EventStatus::Errored);
361 return;
362 }
363 };
364
365 let next_deadline = self.deadline_at();
366 trace!(message = "Computed next deadline.", next_deadline = ?next_deadline, path = ?path);
367
368 let bytes_path = BytesPath::new(path.clone());
369 let truncate = self.should_truncate(&bytes_path, &path).await;
370 let file = if !truncate && let Some(file) = self.files.reset_at(&path, next_deadline) {
371 trace!(message = "Working with an already opened file.", path = ?path);
372 file
373 } else {
374 trace!(message = "Opening new file.", ?path);
375 let file = match open_file(bytes_path, truncate).await {
376 Ok(file) => file,
377 Err(error) => {
378 emit!(FileIoError {
382 code: "failed_opening_file",
383 message: "Unable to open the file.",
384 error,
385 path: &path,
386 dropped_events: 1,
387 });
388 event.metadata().update_status(EventStatus::Errored);
389 return;
390 }
391 };
392
393 let outfile = OutFile::new(file, self.compression);
394
395 self.files.insert_at(path.clone(), outfile, next_deadline);
396 emit!(FileOpen {
397 count: self.files.len()
398 });
399 self.files.get_mut(&path).unwrap()
400 };
401
402 trace!(message = "Writing an event to file.", path = ?path);
403 let event_size = event.estimated_json_encoded_size_of();
404 let finalizers = event.take_finalizers();
405 match write_event_to_file(file, event, &self.transformer, &mut self.encoder).await {
406 Ok(byte_size) => {
407 finalizers.update_status(EventStatus::Delivered);
408 self.events_sent.emit(CountByteSize(1, event_size));
409 emit!(FileBytesSent {
410 byte_size,
411 file: String::from_utf8_lossy(&path),
412 include_file_metric_tag: self.include_file_metric_tag,
413 });
414 }
415 Err(error) => {
416 finalizers.update_status(EventStatus::Errored);
417 emit!(FileIoError {
418 code: "failed_writing_file",
419 message: "Failed to write the file.",
420 error,
421 path: &path,
422 dropped_events: 1,
423 });
424 }
425 }
426 }
427
428 async fn should_truncate(&mut self, bytes_path: &BytesPath, path: &bytes::Bytes) -> bool {
429 let mut truncate = false;
430
431 if let Some(after_close_time_secs) = self.truncation_config.after_close_time_secs
432 && self.files.get(path).is_none()
433 && let Ok(metadata) = fs::metadata(bytes_path).await
434 && let Ok(time) = metadata
435 .modified()
436 .map_err(|_| ())
437 .and_then(|t| t.elapsed().map_err(|_| ()))
438 && time.as_secs() > after_close_time_secs.into()
439 {
440 truncate = true;
441 }
442
443 if let Some(after_secs) = self.truncation_config.after_secs
444 && let Some(file) = self.files.get(path)
445 && (file.created_at().elapsed().as_secs() > after_secs.into())
446 {
447 truncate = true;
448 }
449
450 if let Some(after_modified_time_secs) = self.truncation_config.after_modified_time_secs
451 && let Some(previous_modification) = self
452 .files
453 .get_with_deadline(path)
454 .and_then(|(_, deadline)| deadline.checked_sub(self.idle_timeout))
455 && previous_modification.elapsed().as_secs() > after_modified_time_secs.into()
456 {
457 truncate = true;
458 }
459
460 if truncate && let Some((file, path)) = self.files.remove(path) {
461 self.close_file(file, path).await;
462 }
463
464 truncate
465 }
466
467 async fn close_file(&self, mut file: OutFile, path: Expired<Bytes>) {
468 if let Err(error) = file.close().await {
469 emit!(FileIoError {
470 error,
471 code: "failed_closing_file",
472 message: "Failed to close file.",
473 path: &path,
474 dropped_events: 0,
475 });
476 }
477 drop(file); emit!(FileOpen {
479 count: self.files.len()
480 });
481 }
482}
483
484async fn open_file(path: impl AsRef<std::path::Path>, truncate: bool) -> std::io::Result<File> {
485 let parent = path.as_ref().parent();
486
487 if let Some(parent) = parent {
488 fs::create_dir_all(parent).await?;
489 }
490
491 fs::OpenOptions::new()
492 .read(false)
493 .write(true)
494 .create(true)
495 .append(!truncate)
496 .truncate(truncate)
497 .open(path)
498 .await
499}
500
501async fn write_event_to_file(
502 file: &mut OutFile,
503 mut event: Event,
504 transformer: &Transformer,
505 encoder: &mut Encoder<Framer>,
506) -> Result<usize, std::io::Error> {
507 transformer.transform(&mut event);
508 let mut buffer = BytesMut::new();
509 encoder
510 .encode(event, &mut buffer)
511 .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidData, error))?;
512 file.write_all(&buffer).await.map(|()| buffer.len())
513}
514
515#[async_trait]
516impl StreamSink<Event> for FileSink {
517 async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
518 FileSink::run(&mut self, input)
519 .await
520 .expect("file sink error");
521 Ok(())
522 }
523}
524
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use chrono::{SubsecRound, Utc};
    use futures::{SinkExt, stream};
    use similar_asserts::assert_eq;
    use vector_lib::{
        codecs::JsonSerializerConfig,
        event::{LogEvent, TraceEvent},
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        config::log_schema,
        test_util::{
            components::{FILE_SINK_TAGS, assert_sink_compliance},
            lines_from_file, lines_from_gzip_file, lines_from_zstd_file, random_events_with_stream,
            random_lines_with_stream, random_metrics_with_stream,
            random_metrics_with_stream_timestamp, temp_dir, temp_file, trace_init,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FileSinkConfig>();
    }

    /// Uncompressed text logs written to one file come back line for line.
    #[tokio::test]
    async fn log_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        // Compare whole vectors: zipping would silently accept missing or extra lines.
        let output = lines_from_file(template);
        assert_eq!(input, output);
    }

    /// Gzip-compressed text logs decompress back to the original lines.
    #[tokio::test]
    async fn log_single_partition_gzip() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Gzip,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_gzip_file(template);
        assert_eq!(input, output);
    }

    /// Zstd-compressed text logs decompress back to the original lines.
    #[tokio::test]
    async fn log_single_partition_zstd() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::Zstd,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _) = random_lines_with_stream(100, 64, None);

        run_assert_log_sink(&config, input.clone()).await;

        let output = lines_from_zstd_file(template);
        assert_eq!(input, output);
    }

    /// Events are routed to files according to the fields referenced by the path template.
    #[tokio::test]
    async fn log_many_partitions() {
        let directory = temp_dir();

        let mut template = directory.to_string_lossy().to_string();
        template.push_str("/{{level}}s-{{date}}.log");

        trace!(message = "Template.", %template);

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (mut input, _events) = random_events_with_stream(32, 8, None);
        input[0].as_mut_log().insert("date", "2019-26-07");
        input[0].as_mut_log().insert("level", "warning");
        input[1].as_mut_log().insert("date", "2019-26-07");
        input[1].as_mut_log().insert("level", "error");
        input[2].as_mut_log().insert("date", "2019-26-07");
        input[2].as_mut_log().insert("level", "warning");
        input[3].as_mut_log().insert("date", "2019-27-07");
        input[3].as_mut_log().insert("level", "error");
        input[4].as_mut_log().insert("date", "2019-27-07");
        input[4].as_mut_log().insert("level", "warning");
        input[5].as_mut_log().insert("date", "2019-27-07");
        input[5].as_mut_log().insert("level", "warning");
        input[6].as_mut_log().insert("date", "2019-28-07");
        input[6].as_mut_log().insert("level", "warning");
        input[7].as_mut_log().insert("date", "2019-29-07");
        input[7].as_mut_log().insert("level", "error");

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = [
            lines_from_file(directory.join("warnings-2019-26-07.log")),
            lines_from_file(directory.join("errors-2019-26-07.log")),
            lines_from_file(directory.join("warnings-2019-27-07.log")),
            lines_from_file(directory.join("errors-2019-27-07.log")),
            lines_from_file(directory.join("warnings-2019-28-07.log")),
            lines_from_file(directory.join("errors-2019-29-07.log")),
        ];

        // Each file must hold exactly the events routed to it, no more and no fewer.
        let line_counts: Vec<usize> = output.iter().map(Vec::len).collect();
        assert_eq!(line_counts, vec![2, 1, 2, 1, 1, 1]);

        let message_key = log_schema().message_key().unwrap().to_string();
        assert_eq!(
            input[0].as_log()[&message_key],
            From::<&str>::from(&output[0][0])
        );
        assert_eq!(
            input[1].as_log()[&message_key],
            From::<&str>::from(&output[1][0])
        );
        assert_eq!(
            input[2].as_log()[&message_key],
            From::<&str>::from(&output[0][1])
        );
        assert_eq!(
            input[3].as_log()[&message_key],
            From::<&str>::from(&output[3][0])
        );
        assert_eq!(
            input[4].as_log()[&message_key],
            From::<&str>::from(&output[2][0])
        );
        assert_eq!(
            input[5].as_log()[&message_key],
            From::<&str>::from(&output[2][1])
        );
        assert_eq!(
            input[6].as_log()[&message_key],
            From::<&str>::from(&output[4][0])
        );
        assert_eq!(
            input[7].as_log()[&message_key],
            From::<&str>::from(&output[5][0])
        );
    }

    /// A file closed by the idle timeout is reopened in append mode when a new event arrives.
    #[tokio::test]
    async fn log_reopening() {
        trace_init();

        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: Duration::from_secs(1),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (mut input, _events) = random_lines_with_stream(10, 64, None);

        let (mut tx, rx) = futures::channel::mpsc::channel(0);

        let sink_handle = tokio::spawn(async move {
            assert_sink_compliance(&FILE_SINK_TAGS, async move {
                let sink = FileSink::new(&config, SinkContext::default()).unwrap();
                VectorSink::from_event_streamsink(sink)
                    .run(Box::pin(rx.map(Into::into)))
                    .await
                    .expect("Running sink failed");
            })
            .await
        });

        // Write the initial batch of lines.
        for line in input.clone() {
            tx.send(Event::Log(LogEvent::from(line))).await.unwrap();
        }

        // Wait longer than the 1s idle timeout so the sink closes the file.
        tokio::time::sleep(Duration::from_secs(2)).await;

        // This event forces the file to be reopened; it must be appended, not overwrite.
        let last_line = "i should go at the end";
        tx.send(LogEvent::from(last_line).into()).await.unwrap();
        input.push(String::from(last_line));

        // Give the sink time to write (and close) the reopened file.
        tokio::time::sleep(Duration::from_secs(1)).await;

        let output = lines_from_file(template);
        assert_eq!(input, output);

        // Closing the channel ends the sink's input stream.
        drop(tx);
        sink_handle.await.unwrap();
    }

    /// Metrics written to a single file each produce one line containing the metric name.
    #[tokio::test]
    async fn metric_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_metrics_with_stream(100, None, None);

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = lines_from_file(template);
        // Without this, zipping would pass even if lines were lost.
        assert_eq!(
            output.len(),
            input.len(),
            "Expected one output line per metric"
        );
        for (input, output) in input.into_iter().zip(output) {
            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    /// Metrics with distinct timestamps land in distinct time-formatted files.
    #[tokio::test]
    async fn metric_many_partitions() {
        let directory = temp_dir();

        let format = "%Y-%m-%d-%H-%M-%S";
        let mut template = directory.to_string_lossy().to_string();
        template.push_str(&format!("/{format}.log"));

        let config = FileSinkConfig {
            path: template.try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let metric_count = 3;
        let timestamp = Utc::now().trunc_subsecs(3);
        let timestamp_offset = Duration::from_secs(1);

        let (input, _events) = random_metrics_with_stream_timestamp(
            metric_count,
            None,
            None,
            timestamp,
            timestamp_offset,
        );

        run_assert_sink(&config, input.clone().into_iter()).await;

        let output = (0..metric_count).map(|index| {
            let expected_timestamp = timestamp + (timestamp_offset * index as u32);
            let expected_filename =
                directory.join(format!("{}.log", expected_timestamp.format(format)));

            lines_from_file(expected_filename)
        });
        for (input, output) in input.iter().zip(output) {
            // Each timestamp is one second apart, so each metric gets its own file.
            assert_eq!(
                output.len(),
                1,
                "Expected the output file to contain one metric"
            );
            let output = &output[0];

            let metric_name = input.as_metric().name();
            assert!(output.contains(metric_name));
        }
    }

    /// JSON-encoded traces each produce one line containing the original message.
    #[tokio::test]
    async fn trace_single_partition() {
        let template = temp_file();

        let config = FileSinkConfig {
            path: template.clone().try_into().unwrap(),
            idle_timeout: default_idle_timeout(),
            encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
            compression: Compression::None,
            acknowledgements: Default::default(),
            timezone: Default::default(),
            internal_metrics: FileInternalMetricsConfig {
                include_file_tag: true,
            },
            truncate: Default::default(),
        };

        let (input, _events) = random_lines_with_stream(100, 64, None);

        run_assert_trace_sink(&config, input.clone()).await;

        let output = lines_from_file(template);
        assert_eq!(
            output.len(),
            input.len(),
            "Expected one output line per trace event"
        );
        for (input, output) in input.iter().zip(output) {
            assert!(output.contains(input));
        }
    }

    /// Runs the sink over `events`, each wrapped as a log event.
    async fn run_assert_log_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events.into_iter().map(LogEvent::from).map(Event::Log),
        )
        .await;
    }

    /// Runs the sink over `events`, each wrapped as a trace event.
    async fn run_assert_trace_sink(config: &FileSinkConfig, events: Vec<String>) {
        run_assert_sink(
            config,
            events
                .into_iter()
                .map(LogEvent::from)
                .map(TraceEvent::from)
                .map(Event::Trace),
        )
        .await;
    }

    /// Builds the sink from `config`, feeds it `events`, and checks component compliance.
    async fn run_assert_sink(config: &FileSinkConfig, events: impl Iterator<Item = Event> + Send) {
        assert_sink_compliance(&FILE_SINK_TAGS, async move {
            let sink = FileSink::new(config, SinkContext::default()).unwrap();
            VectorSink::from_event_streamsink(sink)
                .run(Box::pin(stream::iter(events.map(Into::into))))
                .await
                .expect("Running sink failed")
        })
        .await;
    }
}