1use std::time::Duration;
2
3use vector_lib::api_client::Client;
4use vector_lib::tap::{EventFormatter, OutputChannel, TapRunner};
5
6use crate::signal::{SignalRx, SignalTo};
7
/// Pause between a failed tap connection and the next reconnect attempt, in milliseconds.
const RECONNECT_DELAY: u64 = 5_000;
10
11pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
14 let url = opts.url();
15 let client = Client::new(url.clone());
18 #[allow(clippy::print_stderr)]
19 if client.healthcheck().await.is_err() {
20 eprintln!(
21 indoc::indoc! {"
22 Vector API server isn't reachable ({}).
23
24 Have you enabled the API?
25
26 To enable the API, add the following to your Vector config file:
27
28 [api]
29 enabled = true"},
30 url
31 );
32 return exitcode::UNAVAILABLE;
33 }
34
35 tap(opts, signal_rx).await
36}
37
38pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
40 let subscription_url = opts.web_socket_url();
41 let output_channel = OutputChannel::Stdout(EventFormatter::new(opts.meta, opts.format));
42 let tap_runner = TapRunner::new(
43 &subscription_url,
44 opts.inputs_of.clone(),
45 opts.outputs_patterns().clone(),
46 &output_channel,
47 opts.format,
48 );
49
50 loop {
51 tokio::select! {
52 biased;
53 Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
54 exec_result = tap_runner.run_tap(
55 opts.interval as i64,
56 opts.limit as i64,
57 opts.duration_ms,
58 opts.quiet,
59 ) => {
60 match exec_result {
61 Ok(_) => {
62 break;
63 }
64 Err(tap_executor_error) => {
65 if !opts.no_reconnect {
66 #[allow(clippy::print_stderr)]
67 {
68 eprintln!(
69 "[tap] Connection failed with error {:?}. Reconnecting in {:?} seconds.",
70 tap_executor_error,
71 RECONNECT_DELAY / 1000);
72 }
73 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
74 }
75 else {
76 break;
77 }
78 }
79 }
80 }
81 }
82 }
83
84 exitcode::OK
85}