1use std::time::Duration;
2
3use vector_lib::{
4 api_client::Client,
5 tap::{EventFormatter, OutputChannel, TapRunner},
6};
7
8use crate::signal::{SignalRx, SignalTo};
9
/// Delay, in milliseconds, to wait after a failed tap connection before
/// attempting to reconnect.
const RECONNECT_DELAY: u64 = 5_000;
12
13pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
16 let url = opts.url();
17 let client = Client::new(url.clone());
20 #[allow(clippy::print_stderr)]
21 if client.healthcheck().await.is_err() {
22 eprintln!(
23 indoc::indoc! {"
24 Vector API server isn't reachable ({}).
25
26 Have you enabled the API?
27
28 To enable the API, add the following to your Vector config file:
29
30 [api]
31 enabled = true"},
32 url
33 );
34 return exitcode::UNAVAILABLE;
35 }
36
37 tap(opts, signal_rx).await
38}
39
40pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
42 let subscription_url = opts.web_socket_url();
43 let output_channel = OutputChannel::Stdout(EventFormatter::new(opts.meta, opts.format));
44 let tap_runner = TapRunner::new(
45 &subscription_url,
46 opts.inputs_of.clone(),
47 opts.outputs_patterns().clone(),
48 &output_channel,
49 opts.format,
50 );
51
52 loop {
53 tokio::select! {
54 biased;
55 Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
56 exec_result = tap_runner.run_tap(
57 opts.interval as i64,
58 opts.limit as i64,
59 opts.duration_ms,
60 opts.quiet,
61 ) => {
62 match exec_result {
63 Ok(_) => {
64 break;
65 }
66 Err(tap_executor_error) => {
67 if !opts.no_reconnect {
68 #[allow(clippy::print_stderr)]
69 {
70 eprintln!(
71 "[tap] Connection failed with error {:?}. Reconnecting in {:?} seconds.",
72 tap_executor_error,
73 RECONNECT_DELAY / 1000);
74 }
75 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
76 }
77 else {
78 break;
79 }
80 }
81 }
82 }
83 }
84 }
85
86 exitcode::OK
87}