1use std::time::Duration;
2
3use chrono::Local;
4use futures_util::future::join_all;
5use tokio::sync::{mpsc, oneshot};
6use vector_lib::api_client::{connect_subscription_client, Client};
7
8use super::{
9 dashboard::{init_dashboard, is_tty},
10 metrics,
11 state::{self, ConnectionStatus, EventType},
12};
13
14const RECONNECT_DELAY: u64 = 5000;
16
17pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
20 if !is_tty() {
22 #[allow(clippy::print_stderr)]
23 {
24 eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
25 }
26 return exitcode::IOERR;
27 }
28
29 let url = opts.url();
30 let client = Client::new(url.clone());
32 #[allow(clippy::print_stderr)]
33 if client.healthcheck().await.is_err() {
34 eprintln!(
35 indoc::indoc! {"
36 Vector API server isn't reachable ({}).
37
38 Have you enabled the API?
39
40 To enable the API, add the following to your Vector config file:
41
42 [api]
43 enabled = true"},
44 url
45 );
46 return exitcode::UNAVAILABLE;
47 }
48
49 top(opts, client, "Vector").await
50}
51
52pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode {
54 let (tx, rx) = tokio::sync::mpsc::channel(20);
56 let state_rx = state::updater(rx).await;
57 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
59
60 let connection = tokio::spawn(subscription(opts.clone(), client, tx, shutdown_tx));
61
62 match init_dashboard(
64 dashboard_title,
65 opts.url().as_str(),
66 opts,
67 state_rx,
68 shutdown_rx,
69 )
70 .await
71 {
72 Ok(_) => {
73 connection.abort();
74 exitcode::OK
75 }
76 Err(err) => {
77 #[allow(clippy::print_stderr)]
78 {
79 eprintln!("[top] Encountered shutdown error: {err}");
80 }
81 connection.abort();
82 exitcode::IOERR
83 }
84 }
85}
86
87async fn subscription(
90 opts: super::Opts,
91 client: Client,
92 tx: mpsc::Sender<EventType>,
93 shutdown_tx: oneshot::Sender<()>,
94) {
95 let ws_url = opts.web_socket_url();
96
97 loop {
98 let state = match metrics::init_components(&client, &opts.components).await {
102 Ok(state) => state,
103 Err(_) => {
104 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
105 continue;
106 }
107 };
108 _ = tx.send(EventType::InitializeState(state)).await;
109
110 let subscription_client = match connect_subscription_client(ws_url.clone()).await {
111 Ok(c) => c,
112 Err(_) => {
113 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
114 continue;
115 }
116 };
117
118 let finished = metrics::subscribe(
120 subscription_client,
121 tx.clone(),
122 opts.interval as i64,
123 opts.components.clone(),
124 );
125
126 _ = tx
127 .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
128 Local::now(),
129 )))
130 .await;
131 _ = join_all(finished).await;
136 _ = tx
137 .send(EventType::ConnectionUpdated(
138 ConnectionStatus::Disconnected(RECONNECT_DELAY),
139 ))
140 .await;
141 if opts.no_reconnect {
142 _ = shutdown_tx.send(());
143 break;
144 }
145 }
146}