1use std::time::Duration;
2
3use vector_lib::{
4 api_client::{Client, RECONNECT_DELAY_MS},
5 tap::{EventFormatter, OutputChannel, TapRunner},
6};
7
8use crate::signal::{SignalRx, SignalTo};
9
10#[allow(clippy::print_stderr)]
13pub(crate) async fn cmd(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
14 let url = opts.url();
15 let Ok(uri) = url.as_str().parse() else {
16 eprintln!("Invalid API URL: {url}");
17 return exitcode::USAGE;
18 };
19 let mut client = Client::new(uri);
20
21 if client.connect().await.is_err() || client.health().await.is_err() {
22 eprintln!(
23 indoc::indoc! {"
24 Vector API server isn't reachable ({}).
25
26 Have you enabled the API?
27
28 To enable the API, add the following to your Vector config file:
29
30 [api]
31 enabled = true"},
32 url
33 );
34 return exitcode::UNAVAILABLE;
35 }
36
37 tap_internal(opts, signal_rx, Some(client)).await
38}
39
40pub async fn tap(opts: &super::Opts, signal_rx: SignalRx) -> exitcode::ExitCode {
42 tap_internal(opts, signal_rx, None).await
43}
44
45async fn tap_internal(
46 opts: &super::Opts,
47 mut signal_rx: SignalRx,
48 mut client_opt: Option<Client>,
49) -> exitcode::ExitCode {
50 let url = opts.url();
51 let output_channel = OutputChannel::Stdout(EventFormatter::new(opts.meta, opts.format));
52 let tap_runner = TapRunner::new(
53 &url,
54 opts.inputs_of.clone(),
55 opts.outputs_patterns().clone(),
56 &output_channel,
57 );
58
59 loop {
60 tokio::select! {
61 biased;
62 Ok(SignalTo::Shutdown(_) | SignalTo::Quit) = signal_rx.recv() => break,
63 exec_result = async {
64 if let Some(client) = client_opt.take() {
65 tap_runner.run_tap_with_client(
66 client,
67 opts.interval as i64,
68 opts.limit as i64,
69 opts.duration_ms,
70 opts.quiet,
71 ).await
72 } else {
73 tap_runner.run_tap(
74 opts.interval as i64,
75 opts.limit as i64,
76 opts.duration_ms,
77 opts.quiet,
78 ).await
79 }
80 } => {
81 match exec_result {
82 Ok(_) => {
83 break;
84 }
85 Err(tap_executor_error) => {
86 #[allow(clippy::print_stderr)]
87 if tap_executor_error.is_fatal() {
88 eprintln!("[tap] Error: {tap_executor_error:?}");
89 break;
90 } else if !opts.no_reconnect {
91 eprintln!(
92 "[tap] Connection failed with error {:?}. Reconnecting in {:?} seconds.",
93 tap_executor_error,
94 RECONNECT_DELAY_MS / 1000);
95 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY_MS)).await;
96 } else {
97 break;
98 }
99 }
100 }
101 }
102 }
103 }
104
105 exitcode::OK
106}