vector/tap/
cmd.rs

1use std::time::Duration;
2
3use vector_lib::{
4    api_client::{Client, RECONNECT_DELAY_MS},
5    tap::{EventFormatter, OutputChannel, TapRunner},
6};
7
8use crate::signal::{SignalRx, SignalTo};
9
10/// CLI command func for issuing 'tap' queries, and communicating with a local/remote
11/// Vector API server via HTTP/WebSockets.
12#[allow(clippy::print_stderr)]
13pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
14    let url = opts.url();
15    let Ok(uri) = url.as_str().parse() else {
16        eprintln!("Invalid API URL: {url}");
17        return exitcode::USAGE;
18    };
19    let mut client = Client::new(uri);
20
21    if client.connect().await.is_err() || client.health().await.is_err() {
22        eprintln!(
23            indoc::indoc! {"
24            Vector API server isn't reachable ({}).
25
26            Have you enabled the API?
27
28            To enable the API, add the following to your Vector config file:
29
30            [api]
31                enabled = true"},
32            url
33        );
34        return exitcode::UNAVAILABLE;
35    }
36
37    tap_internal(opts, signal_rx, Some(client)).await
38}
39
40/// Observe event flow from specified components
41pub async fn tap(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
42    tap_internal(opts, signal_rx, None).await
43}
44
45async fn tap_internal(
46    opts: &super::Opts,
47    mut signal_rx: SignalRx,
48    mut client_opt: Option<Client>,
49) -> exitcode::ExitCode {
50    let url = opts.url();
51    let output_channel = OutputChannel::Stdout(EventFormatter::new(opts.meta, opts.format));
52    let tap_runner = TapRunner::new(
53        &url,
54        opts.inputs_of.clone(),
55        opts.outputs_patterns().clone(),
56        &output_channel,
57    );
58
59    loop {
60        tokio::select! {
61            biased;
62            Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
63            exec_result = async {
64                if let Some(client) = client_opt.take() {
65                    tap_runner.run_tap_with_client(
66                        client,
67                        opts.interval as i64,
68                        opts.limit as i64,
69                        opts.duration_ms,
70                        opts.quiet,
71                    ).await
72                } else {
73                    tap_runner.run_tap(
74                        opts.interval as i64,
75                        opts.limit as i64,
76                        opts.duration_ms,
77                        opts.quiet,
78                    ).await
79                }
80            } => {
81                match exec_result {
82                    Ok(_) => {
83                        break;
84                    }
85                    Err(tap_executor_error) => {
86                        #[allow(clippy::print_stderr)]
87                        if tap_executor_error.is_fatal() {
88                            eprintln!("[tap] Error: {tap_executor_error:?}");
89                            break;
90                        } else if !opts.no_reconnect {
91                            eprintln!(
92                                "[tap] Connection failed with error {:?}. Reconnecting in {:?} seconds.",
93                                tap_executor_error,
94                                RECONNECT_DELAY_MS / 1000);
95                            tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
96                        } else {
97                            break;
98                        }
99                    }
100                }
101            }
102        }
103    }
104
105    exitcode::OK
106}