vector/tap/
cmd.rs

1use std::time::Duration;
2
3use vector_lib::api_client::Client;
4use vector_lib::tap::{EventFormatter, OutputChannel, TapRunner};
5
6use crate::signal::{SignalRx, SignalTo};
7
8/// Delay (in milliseconds) before attempting to reconnect to the Vector API
9const RECONNECT_DELAY: u64 = 5000;
10
11/// CLI command func for issuing 'tap' queries, and communicating with a local/remote
12/// Vector API server via HTTP/WebSockets.
13pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
14    let url = opts.url();
15    // Return early with instructions for enabling the API if the endpoint isn't reachable
16    // via a healthcheck.
17    let client = Client::new(url.clone());
18    #[allow(clippy::print_stderr)]
19    if client.healthcheck().await.is_err() {
20        eprintln!(
21            indoc::indoc! {"
22            Vector API server isn't reachable ({}).
23
24            Have you enabled the API?
25
26            To enable the API, add the following to your Vector config file:
27
28            [api]
29                enabled = true"},
30            url
31        );
32        return exitcode::UNAVAILABLE;
33    }
34
35    tap(opts, signal_rx).await
36}
37
38/// Observe event flow from specified components
39pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
40    let subscription_url = opts.web_socket_url();
41    let output_channel = OutputChannel::Stdout(EventFormatter::new(opts.meta, opts.format));
42    let tap_runner = TapRunner::new(
43        &subscription_url,
44        opts.inputs_of.clone(),
45        opts.outputs_patterns().clone(),
46        &output_channel,
47        opts.format,
48    );
49
50    loop {
51        tokio::select! {
52            biased;
53            Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
54            exec_result = tap_runner.run_tap(
55                opts.interval as i64,
56                opts.limit as i64,
57                opts.duration_ms,
58                opts.quiet,
59            ) => {
60                match exec_result {
61                    Ok(_) => {
62                        break;
63                    }
64                    Err(tap_executor_error) => {
65                        if !opts.no_reconnect {
66                            #[allow(clippy::print_stderr)]
67                            {
68                                eprintln!(
69                                    "[tap] Connection failed with error {:?}. Reconnecting in {:?} seconds.",
70                                    tap_executor_error,
71                                    RECONNECT_DELAY / 1000);
72                            }
73                            tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
74                        }
75                        else {
76                            break;
77                        }
78                    }
79                }
80            }
81        }
82    }
83
84    exitcode::OK
85}