vector/tap/
cmd.rs

1use std::time::Duration;
2
3use vector_lib::{
4    api_client::Client,
5    tap::{EventFormatter, OutputChannel, TapRunner},
6};
7
8use crate::signal::{SignalRx, SignalTo};
9
/// How long to wait, in milliseconds, before another connection attempt is made
/// against the Vector API after a failed tap run.
const RECONNECT_DELAY: u64 = 5_000;
12
13/// CLI command func for issuing 'tap' queries, and communicating with a local/remote
14/// Vector API server via HTTP/WebSockets.
15pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
16    let url = opts.url();
17    // Return early with instructions for enabling the API if the endpoint isn't reachable
18    // via a healthcheck.
19    let client = Client::new(url.clone());
20    #[allow(clippy::print_stderr)]
21    if client.healthcheck().await.is_err() {
22        eprintln!(
23            indoc::indoc! {"
24            Vector API server isn't reachable ({}).
25
26            Have you enabled the API?
27
28            To enable the API, add the following to your Vector config file:
29
30            [api]
31                enabled = true"},
32            url
33        );
34        return exitcode::UNAVAILABLE;
35    }
36
37    tap(opts, signal_rx).await
38}
39
40/// Observe event flow from specified components
41pub async fn tap(opts: &super::Opts, mut signal_rx: SignalRx) -> exitcode::ExitCode {
42    let subscription_url = opts.web_socket_url();
43    let output_channel = OutputChannel::Stdout(EventFormatter::new(opts.meta, opts.format));
44    let tap_runner = TapRunner::new(
45        &subscription_url,
46        opts.inputs_of.clone(),
47        opts.outputs_patterns().clone(),
48        &output_channel,
49        opts.format,
50    );
51
52    loop {
53        tokio::select! {
54            biased;
55            Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
56            exec_result = tap_runner.run_tap(
57                opts.interval as i64,
58                opts.limit as i64,
59                opts.duration_ms,
60                opts.quiet,
61            ) => {
62                match exec_result {
63                    Ok(_) => {
64                        break;
65                    }
66                    Err(tap_executor_error) => {
67                        if !opts.no_reconnect {
68                            #[allow(clippy::print_stderr)]
69                            {
70                                eprintln!(
71                                    "[tap] Connection failed with error {:?}. Reconnecting in {:?} seconds.",
72                                    tap_executor_error,
73                                    RECONNECT_DELAY / 1000);
74                            }
75                            tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
76                        }
77                        else {
78                            break;
79                        }
80                    }
81                }
82            }
83        }
84    }
85
86    exitcode::OK
87}