1#![allow(missing_docs)]
2
3use std::{
4 collections::HashMap,
5 convert::Infallible,
6 fs::File,
7 future::{Future, ready},
8 io::Read,
9 iter,
10 net::SocketAddr,
11 path::{Path, PathBuf},
12 pin::Pin,
13 sync::{
14 Arc,
15 atomic::{AtomicUsize, Ordering},
16 },
17 task::{Context, Poll, ready},
18};
19
20use chrono::{DateTime, SubsecRound, Utc};
21use flate2::read::MultiGzDecoder;
22use futures::{FutureExt, SinkExt, Stream, StreamExt, TryStreamExt, stream, task::noop_waker_ref};
23use openssl::ssl::{SslConnector, SslFiletype, SslMethod, SslVerifyMode};
24use rand::{Rng, rng};
25use rand_distr::Alphanumeric;
26use tokio::{
27 io::{AsyncRead, AsyncWrite, AsyncWriteExt, Result as IoResult},
28 net::{TcpListener, TcpStream, ToSocketAddrs},
29 runtime,
30 sync::oneshot,
31 task::JoinHandle,
32 time::{Duration, Instant, sleep},
33};
34use tokio_stream::wrappers::TcpListenerStream;
35#[cfg(unix)]
36use tokio_stream::wrappers::UnixListenerStream;
37use tokio_util::codec::{Encoder, FramedRead, FramedWrite, LinesCodec};
38use vector_lib::{
39 buffers::topology::channel::LimitedReceiver,
40 event::{
41 BatchNotifier, BatchStatusReceiver, Event, EventArray, LogEvent, Metric, MetricKind,
42 MetricTags, MetricValue,
43 },
44};
45#[cfg(test)]
46use zstd::Decoder as ZstdDecoder;
47
48use crate::{
49 config::{Config, GenerateConfig},
50 topology::{RunningTopology, ShutdownErrorReceiver},
51 trace,
52};
53
54const WAIT_FOR_SECS: u64 = 5; const WAIT_FOR_MIN_MILLIS: u64 = 5; const WAIT_FOR_MAX_MILLIS: u64 = 500; pub mod addr;
59pub mod compression;
60pub mod stats;
61
62#[cfg(any(test, feature = "test-utils"))]
63pub mod components;
64#[cfg(test)]
65pub mod http;
66#[cfg(test)]
67pub mod integration;
68#[cfg(test)]
69pub mod metrics;
70#[cfg(test)]
71pub mod mock;
72
73#[macro_export]
74macro_rules! assert_downcast_matches {
75 ($e:expr_2021, $t:ty, $v:pat) => {{
76 match $e.downcast_ref::<$t>() {
77 Some($v) => (),
78 got => panic!("Assertion failed: got wrong error variant {:?}", got),
79 }
80 }};
81}
82
83#[macro_export]
84macro_rules! log_event {
85 ($($key:expr_2021 => $value:expr_2021),* $(,)?) => {
86 #[allow(unused_variables)]
87 {
88 let mut event = $crate::event::Event::Log($crate::event::LogEvent::default());
89 let log = event.as_mut_log();
90 $(
91 log.insert($key, $value);
92 )*
93 event
94 }
95 };
96}
97
98pub fn test_generate_config<T>()
99where
100 for<'de> T: GenerateConfig + serde::Deserialize<'de>,
101{
102 let cfg = toml::to_string(&T::generate_config()).unwrap();
103
104 toml::from_str::<T>(&cfg)
105 .unwrap_or_else(|e| panic!("Invalid config generated from string:\n\n{e}\n'{cfg}'"));
106}
107
108pub fn open_fixture(path: impl AsRef<Path>) -> crate::Result<serde_json::Value> {
109 let test_file = File::open(path)?;
110 let value: serde_json::Value = serde_json::from_reader(test_file)?;
111 Ok(value)
112}
113
114pub fn trace_init() {
115 #[cfg(unix)]
116 let color = {
117 use std::io::IsTerminal;
118 std::io::stdout().is_terminal()
119 || std::env::var("NEXTEST")
120 .ok()
121 .and(Some(true))
122 .unwrap_or(false)
123 };
124 #[cfg(not(unix))]
127 let color = false;
128
129 let levels = std::env::var("VECTOR_LOG").unwrap_or_else(|_| "error".to_string());
130
131 trace::init(color, false, &levels, 10);
132
133 vector_lib::metrics::init_test();
135}
136
137pub async fn send_lines(
138 addr: SocketAddr,
139 lines: impl IntoIterator<Item = String>,
140) -> Result<SocketAddr, Infallible> {
141 send_encodable(addr, LinesCodec::new(), lines).await
142}
143
144pub async fn send_encodable<I, E: From<std::io::Error> + std::fmt::Debug>(
145 addr: SocketAddr,
146 encoder: impl Encoder<I, Error = E>,
147 lines: impl IntoIterator<Item = I>,
148) -> Result<SocketAddr, Infallible> {
149 let stream = TcpStream::connect(&addr).await.unwrap();
150
151 let local_addr = stream.local_addr().unwrap();
152
153 let mut sink = FramedWrite::new(stream, encoder);
154
155 let mut lines = stream::iter(lines.into_iter()).map(Ok);
156 sink.send_all(&mut lines).await.unwrap();
157
158 let stream = sink.get_mut();
159 stream.shutdown().await.unwrap();
160
161 Ok(local_addr)
162}
163
164pub async fn send_lines_tls(
165 addr: SocketAddr,
166 host: String,
167 lines: impl Iterator<Item = String>,
168 ca: impl Into<Option<&Path>>,
169 client_cert: impl Into<Option<&Path>>,
170 client_key: impl Into<Option<&Path>>,
171) -> Result<SocketAddr, Infallible> {
172 let stream = TcpStream::connect(&addr).await.unwrap();
173
174 let local_addr = stream.local_addr().unwrap();
175
176 let mut connector = SslConnector::builder(SslMethod::tls()).unwrap();
177 if let Some(ca) = ca.into() {
178 connector.set_ca_file(ca).unwrap();
179 } else {
180 connector.set_verify(SslVerifyMode::NONE);
181 }
182
183 if let Some(cert_file) = client_cert.into() {
184 connector.set_certificate_chain_file(cert_file).unwrap();
185 }
186
187 if let Some(key_file) = client_key.into() {
188 connector
189 .set_private_key_file(key_file, SslFiletype::PEM)
190 .unwrap();
191 }
192
193 let ssl = connector
194 .build()
195 .configure()
196 .unwrap()
197 .into_ssl(&host)
198 .unwrap();
199
200 let mut stream = tokio_openssl::SslStream::new(ssl, stream).unwrap();
201 Pin::new(&mut stream).connect().await.unwrap();
202 let mut sink = FramedWrite::new(stream, LinesCodec::new());
203
204 let mut lines = stream::iter(lines).map(Ok);
205 sink.send_all(&mut lines).await.unwrap();
206
207 let stream = sink.get_mut().get_mut();
208 stream.shutdown().await.unwrap();
209
210 Ok(local_addr)
211}
212
213pub fn temp_file() -> PathBuf {
214 let path = std::env::temp_dir();
215 let file_name = random_string(16);
216 path.join(file_name + ".log")
217}
218
219pub fn temp_dir() -> PathBuf {
220 let path = std::env::temp_dir();
221 let dir_name = random_string(16);
222 path.join(dir_name)
223}
224
225pub fn random_table_name() -> String {
226 format!("test_{}", random_string(10).to_lowercase())
227}
228
229pub fn map_event_batch_stream(
230 stream: impl Stream<Item = Event>,
231 batch: Option<BatchNotifier>,
232) -> impl Stream<Item = EventArray> {
233 stream.map(move |event| event.with_batch_notifier_option(&batch).into())
234}
235
236fn map_batch_stream(
238 stream: impl Stream<Item = LogEvent>,
239 batch: Option<BatchNotifier>,
240) -> impl Stream<Item = EventArray> {
241 stream.map(move |log| vec![log.with_batch_notifier_option(&batch)].into())
242}
243
244pub fn generate_lines_with_stream<Gen: FnMut(usize) -> String>(
245 generator: Gen,
246 count: usize,
247 batch: Option<BatchNotifier>,
248) -> (Vec<String>, impl Stream<Item = EventArray>) {
249 let lines = (0..count).map(generator).collect::<Vec<_>>();
250 let stream = map_batch_stream(
251 stream::iter(lines.clone()).map(LogEvent::from_str_legacy),
252 batch,
253 );
254 (lines, stream)
255}
256
257pub fn random_lines_with_stream(
258 len: usize,
259 count: usize,
260 batch: Option<BatchNotifier>,
261) -> (Vec<String>, impl Stream<Item = EventArray>) {
262 let generator = move |_| random_string(len);
263 generate_lines_with_stream(generator, count, batch)
264}
265
266pub fn generate_events_with_stream<Gen: FnMut(usize) -> Event>(
267 generator: Gen,
268 count: usize,
269 batch: Option<BatchNotifier>,
270) -> (Vec<Event>, impl Stream<Item = EventArray>) {
271 let events = (0..count).map(generator).collect::<Vec<_>>();
272 let stream = map_batch_stream(
273 stream::iter(events.clone()).map(|event| event.into_log()),
274 batch,
275 );
276 (events, stream)
277}
278
279pub fn random_metrics_with_stream(
280 count: usize,
281 batch: Option<BatchNotifier>,
282 tags: Option<MetricTags>,
283) -> (Vec<Event>, impl Stream<Item = EventArray>) {
284 random_metrics_with_stream_timestamp(
285 count,
286 batch,
287 tags,
288 Utc::now().trunc_subsecs(3),
289 std::time::Duration::from_secs(2),
290 )
291}
292
293pub fn random_metrics_with_stream_timestamp(
305 count: usize,
306 batch: Option<BatchNotifier>,
307 tags: Option<MetricTags>,
308 timestamp: DateTime<Utc>,
309 timestamp_offset: std::time::Duration,
310) -> (Vec<Event>, impl Stream<Item = EventArray>) {
311 let events: Vec<_> = (0..count)
312 .map(|index| {
313 let ts = timestamp + (timestamp_offset * index as u32);
314 Event::Metric(
315 Metric::new(
316 format!("counter_{}", rng().random::<u32>()),
317 MetricKind::Incremental,
318 MetricValue::Counter {
319 value: index as f64,
320 },
321 )
322 .with_timestamp(Some(ts))
323 .with_tags(tags.clone()),
324 )
325 .with_source_type("a_source_like_none_other")
327 })
328 .collect();
329
330 let stream = map_event_batch_stream(stream::iter(events.clone()), batch);
331 (events, stream)
332}
333
334pub fn random_events_with_stream(
335 len: usize,
336 count: usize,
337 batch: Option<BatchNotifier>,
338) -> (Vec<Event>, impl Stream<Item = EventArray>) {
339 let events = (0..count)
340 .map(|_| Event::from(LogEvent::from_str_legacy(random_string(len))))
341 .collect::<Vec<_>>();
342 let stream = map_batch_stream(
343 stream::iter(events.clone()).map(|event| event.into_log()),
344 batch,
345 );
346 (events, stream)
347}
348
349pub fn random_updated_events_with_stream<F>(
350 len: usize,
351 count: usize,
352 batch: Option<BatchNotifier>,
353 update_fn: F,
354) -> (Vec<Event>, impl Stream<Item = EventArray>)
355where
356 F: Fn((usize, LogEvent)) -> LogEvent,
357{
358 let events = (0..count)
359 .map(|_| LogEvent::from_str_legacy(random_string(len)))
360 .enumerate()
361 .map(update_fn)
362 .map(Event::Log)
363 .collect::<Vec<_>>();
364 let stream = map_batch_stream(
365 stream::iter(events.clone()).map(|event| event.into_log()),
366 batch,
367 );
368 (events, stream)
369}
370
371pub fn create_events_batch_with_fn<F: Fn() -> Event>(
372 create_event_fn: F,
373 num_events: usize,
374) -> (Vec<Event>, BatchStatusReceiver) {
375 let mut events = (0..num_events)
376 .map(|_| create_event_fn())
377 .collect::<Vec<_>>();
378 let receiver = BatchNotifier::apply_to(&mut events);
379 (events, receiver)
380}
381
382pub fn random_string(len: usize) -> String {
383 rng()
384 .sample_iter(&Alphanumeric)
385 .take(len)
386 .map(char::from)
387 .collect::<String>()
388}
389
390pub fn random_lines(len: usize) -> impl Iterator<Item = String> {
391 iter::repeat_with(move || random_string(len))
392}
393
394pub fn random_map(max_size: usize, field_len: usize) -> HashMap<String, String> {
395 let size = rng().random_range(0..max_size);
396
397 (0..size)
398 .map(move |_| (random_string(field_len), random_string(field_len)))
399 .collect()
400}
401
402pub fn random_maps(
403 max_size: usize,
404 field_len: usize,
405) -> impl Iterator<Item = HashMap<String, String>> {
406 iter::repeat_with(move || random_map(max_size, field_len))
407}
408
409pub async fn collect_n<S>(rx: S, n: usize) -> Vec<S::Item>
410where
411 S: Stream,
412{
413 rx.take(n).collect().await
414}
415
416pub async fn collect_n_stream<T, S: Stream<Item = T> + Unpin>(stream: &mut S, n: usize) -> Vec<T> {
417 let mut events = Vec::with_capacity(n);
418
419 while events.len() < n {
420 let e = stream.next().await.unwrap();
421 events.push(e);
422 }
423 events
424}
425
426pub async fn collect_ready<S>(mut rx: S) -> Vec<S::Item>
427where
428 S: Stream + Unpin,
429{
430 let waker = noop_waker_ref();
431 let mut cx = Context::from_waker(waker);
432
433 let mut vec = Vec::new();
434 loop {
435 match rx.poll_next_unpin(&mut cx) {
436 Poll::Ready(Some(item)) => vec.push(item),
437 Poll::Ready(None) | Poll::Pending => return vec,
438 }
439 }
440}
441
442pub async fn collect_limited<T: Send + 'static>(mut rx: LimitedReceiver<T>) -> Vec<T> {
443 let mut items = Vec::new();
444 while let Some(item) = rx.next().await {
445 items.push(item);
446 }
447 items
448}
449
450pub async fn collect_n_limited<T: Send + 'static>(mut rx: LimitedReceiver<T>, n: usize) -> Vec<T> {
451 let mut items = Vec::new();
452 while items.len() < n {
453 match rx.next().await {
454 Some(item) => items.push(item),
455 None => break,
456 }
457 }
458 items
459}
460
461pub fn lines_from_file<P: AsRef<Path>>(path: P) -> Vec<String> {
462 trace!(message = "Reading file.", path = %path.as_ref().display());
463 let mut file = File::open(path).unwrap();
464 let mut output = String::new();
465 file.read_to_string(&mut output).unwrap();
466 output.lines().map(|s| s.to_owned()).collect()
467}
468
469pub fn lines_from_gzip_file<P: AsRef<Path>>(path: P) -> Vec<String> {
470 trace!(message = "Reading gzip file.", path = %path.as_ref().display());
471 let mut file = File::open(path).unwrap();
472 let mut gzip_bytes = Vec::new();
473 file.read_to_end(&mut gzip_bytes).unwrap();
474 let mut output = String::new();
475 MultiGzDecoder::new(&gzip_bytes[..])
476 .read_to_string(&mut output)
477 .unwrap();
478 output.lines().map(|s| s.to_owned()).collect()
479}
480
481#[cfg(test)]
482pub fn lines_from_zstd_file<P: AsRef<Path>>(path: P) -> Vec<String> {
483 trace!(message = "Reading zstd file.", path = %path.as_ref().display());
484 let file = File::open(path).unwrap();
485 let mut output = String::new();
486 ZstdDecoder::new(file)
487 .unwrap()
488 .read_to_string(&mut output)
489 .unwrap();
490 output.lines().map(|s| s.to_owned()).collect()
491}
492
493pub fn runtime() -> runtime::Runtime {
494 runtime::Builder::new_multi_thread()
495 .enable_all()
496 .build()
497 .unwrap()
498}
499
500pub async fn wait_for_duration<F, Fut>(mut f: F, duration: Duration)
502where
503 F: FnMut() -> Fut,
504 Fut: Future<Output = bool> + Send + 'static,
505{
506 let started = Instant::now();
507 let mut delay = WAIT_FOR_MIN_MILLIS;
508 while !f().await {
509 sleep(Duration::from_millis(delay)).await;
510 if started.elapsed() > duration {
511 panic!("Timed out while waiting");
512 }
513 delay = (delay * 2).min(WAIT_FOR_MAX_MILLIS);
515 }
516}
517
518pub async fn wait_for<F, Fut>(f: F)
520where
521 F: FnMut() -> Fut,
522 Fut: Future<Output = bool> + Send + 'static,
523{
524 wait_for_duration(f, Duration::from_secs(WAIT_FOR_SECS)).await
525}
526
527pub async fn wait_for_tcp<A>(addr: A)
529where
530 A: ToSocketAddrs + Clone + Send + 'static,
531{
532 wait_for(move || {
533 let addr = addr.clone();
534 async move { TcpStream::connect(addr).await.is_ok() }
535 })
536 .await
537}
538
539pub async fn wait_for_tcp_duration(addr: SocketAddr, duration: Duration) {
541 wait_for_duration(
542 || async move { TcpStream::connect(addr).await.is_ok() },
543 duration,
544 )
545 .await
546}
547
548pub async fn wait_for_atomic_usize<T, F>(value: T, unblock: F)
549where
550 T: AsRef<AtomicUsize>,
551 F: Fn(usize) -> bool,
552{
553 let value = value.as_ref();
554 wait_for(|| ready(unblock(value.load(Ordering::SeqCst)))).await
555}
556
557pub async fn retry_until<'a, F, Fut, T, E>(mut f: F, retry: Duration, until: Duration) -> T
559where
560 F: FnMut() -> Fut,
561 Fut: Future<Output = Result<T, E>> + Send + 'a,
562{
563 let started = Instant::now();
564 while started.elapsed() < until {
565 match f().await {
566 Ok(res) => return res,
567 Err(_) => tokio::time::sleep(retry).await,
568 }
569 }
570 panic!("Timeout")
571}
572
573pub struct CountReceiver<T> {
574 count: Arc<AtomicUsize>,
575 trigger: Option<oneshot::Sender<()>>,
576 connected: Option<oneshot::Receiver<()>>,
577 handle: JoinHandle<Vec<T>>,
578}
579
580impl<T: Send + 'static> CountReceiver<T> {
581 pub fn count(&self) -> usize {
582 self.count.load(Ordering::Relaxed)
583 }
584
585 pub async fn connected(&mut self) {
587 if let Some(tripwire) = self.connected.take() {
588 tripwire.await.unwrap();
589 }
590 }
591
592 fn new<F, Fut>(make_fut: F) -> CountReceiver<T>
593 where
594 F: FnOnce(Arc<AtomicUsize>, oneshot::Receiver<()>, oneshot::Sender<()>) -> Fut,
595 Fut: Future<Output = Vec<T>> + Send + 'static,
596 {
597 let count = Arc::new(AtomicUsize::new(0));
598 let (trigger, tripwire) = oneshot::channel();
599 let (trigger_connected, connected) = oneshot::channel();
600
601 CountReceiver {
602 count: Arc::clone(&count),
603 trigger: Some(trigger),
604 connected: Some(connected),
605 handle: tokio::spawn(make_fut(count, tripwire, trigger_connected)),
606 }
607 }
608
609 pub fn receive_items_stream<S, F, Fut>(make_stream: F) -> CountReceiver<T>
610 where
611 S: Stream<Item = T> + Send + 'static,
612 F: FnOnce(oneshot::Receiver<()>, oneshot::Sender<()>) -> Fut + Send + 'static,
613 Fut: Future<Output = S> + Send + 'static,
614 {
615 CountReceiver::new(|count, tripwire, connected| async move {
616 let stream = make_stream(tripwire, connected).await;
617 stream
618 .inspect(move |_| {
619 count.fetch_add(1, Ordering::Relaxed);
620 })
621 .collect::<Vec<T>>()
622 .await
623 })
624 }
625}
626
627impl<T> Future for CountReceiver<T> {
628 type Output = Vec<T>;
629
630 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
631 let this = self.get_mut();
632 if let Some(trigger) = this.trigger.take() {
633 _ = trigger.send(());
634 }
635
636 let result = ready!(this.handle.poll_unpin(cx));
637 Poll::Ready(result.unwrap())
638 }
639}
640
641impl CountReceiver<String> {
642 pub fn receive_lines(addr: SocketAddr) -> CountReceiver<String> {
643 CountReceiver::new(|count, tripwire, connected| async move {
644 let listener = TcpListener::bind(addr).await.unwrap();
645 CountReceiver::receive_lines_stream(
646 TcpListenerStream::new(listener),
647 count,
648 tripwire,
649 Some(connected),
650 )
651 .await
652 })
653 }
654
655 #[cfg(unix)]
656 pub fn receive_lines_unix<P>(path: P) -> CountReceiver<String>
657 where
658 P: AsRef<Path> + Send + 'static,
659 {
660 CountReceiver::new(|count, tripwire, connected| async move {
661 let listener = tokio::net::UnixListener::bind(path).unwrap();
662 CountReceiver::receive_lines_stream(
663 UnixListenerStream::new(listener),
664 count,
665 tripwire,
666 Some(connected),
667 )
668 .await
669 })
670 }
671
672 async fn receive_lines_stream<S, T>(
673 stream: S,
674 count: Arc<AtomicUsize>,
675 tripwire: oneshot::Receiver<()>,
676 mut connected: Option<oneshot::Sender<()>>,
677 ) -> Vec<String>
678 where
679 S: Stream<Item = IoResult<T>>,
680 T: AsyncWrite + AsyncRead,
681 {
682 stream
683 .take_until(tripwire)
684 .map_ok(|socket| FramedRead::new(socket, LinesCodec::new()))
685 .map(|x| {
686 connected.take().map(|trigger| trigger.send(()));
687 x.unwrap()
688 })
689 .flatten()
690 .map(|x| x.unwrap())
691 .inspect(move |_| {
692 count.fetch_add(1, Ordering::Relaxed);
693 })
694 .collect::<Vec<String>>()
695 .await
696 }
697}
698
699impl CountReceiver<Event> {
700 pub fn receive_events<S>(stream: S) -> CountReceiver<Event>
701 where
702 S: Stream<Item = Event> + Send + 'static,
703 {
704 CountReceiver::new(|count, tripwire, connected| async move {
705 connected.send(()).unwrap();
706 stream
707 .take_until(tripwire)
708 .inspect(move |_| {
709 count.fetch_add(1, Ordering::Relaxed);
710 })
711 .collect::<Vec<Event>>()
712 .await
713 })
714 }
715}
716
717pub async fn start_topology(
718 mut config: Config,
719 require_healthy: impl Into<Option<bool>>,
720) -> (RunningTopology, ShutdownErrorReceiver) {
721 config.healthchecks.set_require_healthy(require_healthy);
722 RunningTopology::start_init_validated(config, Default::default())
723 .await
724 .unwrap()
725}
726
727pub async fn spawn_collect_n<F, S>(future: F, stream: S, n: usize) -> Vec<Event>
733where
734 F: Future<Output = ()> + Send + 'static,
735 S: Stream<Item = Event>,
736{
737 let sender = tokio::spawn(future);
742 let events = collect_n(stream, n).await;
743 sender.await.expect("Failed to send data");
744 events
745}
746
747pub async fn spawn_collect_ready<F, S>(future: F, stream: S, sleep: u64) -> Vec<Event>
753where
754 F: Future<Output = ()> + Send + 'static,
755 S: Stream<Item = Event> + Unpin,
756{
757 let sender = tokio::spawn(future);
758 tokio::time::sleep(Duration::from_secs(sleep)).await;
759 let events = collect_ready(stream).await;
760 sender.await.expect("Failed to send data");
761 events
762}
763
764#[cfg(test)]
765mod tests {
766 use std::{
767 sync::{Arc, RwLock},
768 time::Duration,
769 };
770
771 use super::retry_until;
772
773 async fn retry_until_helper(count: Arc<RwLock<i32>>) -> Result<(), ()> {
775 if *count.read().unwrap() < 3 {
776 let mut c = count.write().unwrap();
777 *c += 1;
778 return Err(());
779 }
780 Ok(())
781 }
782
783 #[tokio::test]
784 async fn retry_until_before_timeout() {
785 let count = Arc::new(RwLock::new(0));
786 let func = || {
787 let count = Arc::clone(&count);
788 retry_until_helper(count)
789 };
790
791 retry_until(func, Duration::from_millis(10), Duration::from_secs(1)).await;
792 }
793}