1use std::time::Duration;
2
3use chrono::Local;
4use futures_util::future::join_all;
5use tokio::sync::{mpsc, oneshot};
6use vector_lib::api_client::{Client, connect_subscription_client};
7
8use vector_lib::top::{
9 dashboard::{init_dashboard, is_tty},
10 metrics,
11 state::{self, ConnectionStatus, EventType},
12};
13
/// Delay, in milliseconds, to wait before retrying after a failed attempt to
/// reach the Vector API. Also reported to the dashboard as part of the
/// `Disconnected` connection status.
const RECONNECT_DELAY: u64 = 5_000;
16
17pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
20 if !is_tty() {
22 #[allow(clippy::print_stderr)]
23 {
24 eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
25 }
26 return exitcode::IOERR;
27 }
28
29 let url = opts.url();
30 let client = Client::new(url.clone());
32 #[allow(clippy::print_stderr)]
33 if client.healthcheck().await.is_err() {
34 eprintln!(
35 indoc::indoc! {"
36 Vector API server isn't reachable ({}).
37
38 Have you enabled the API?
39
40 To enable the API, add the following to your Vector config file:
41
42 [api]
43 enabled = true"},
44 url
45 );
46 return exitcode::UNAVAILABLE;
47 }
48
49 top(opts, client, "Vector").await
50}
51
52pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode {
54 let (tx, rx) = tokio::sync::mpsc::channel(20);
56 let state_rx = state::updater(rx).await;
57 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
59
60 let connection = tokio::spawn(subscription(opts.clone(), client, tx, shutdown_tx));
61
62 match init_dashboard(
64 dashboard_title,
65 opts.url().as_str(),
66 opts.interval,
67 opts.human_metrics,
68 state_rx,
69 shutdown_rx,
70 )
71 .await
72 {
73 Ok(_) => {
74 connection.abort();
75 exitcode::OK
76 }
77 Err(err) => {
78 #[allow(clippy::print_stderr)]
79 {
80 eprintln!("[top] Encountered shutdown error: {err}");
81 }
82 connection.abort();
83 exitcode::IOERR
84 }
85 }
86}
87
88async fn subscription(
91 opts: super::Opts,
92 client: Client,
93 tx: mpsc::Sender<EventType>,
94 shutdown_tx: oneshot::Sender<()>,
95) {
96 let ws_url = opts.web_socket_url();
97
98 loop {
99 let state = match metrics::init_components(&client, &opts.components).await {
103 Ok(state) => state,
104 Err(_) => {
105 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
106 continue;
107 }
108 };
109 _ = tx.send(EventType::InitializeState(state)).await;
110
111 let subscription_client = match connect_subscription_client(ws_url.clone()).await {
112 Ok(c) => c,
113 Err(_) => {
114 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
115 continue;
116 }
117 };
118
119 let finished = metrics::subscribe(
121 subscription_client,
122 tx.clone(),
123 opts.interval as i64,
124 opts.components.clone(),
125 );
126
127 _ = tx
128 .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
129 Local::now(),
130 )))
131 .await;
132 _ = join_all(finished).await;
137 _ = tx
138 .send(EventType::ConnectionUpdated(
139 ConnectionStatus::Disconnected(RECONNECT_DELAY),
140 ))
141 .await;
142 if opts.no_reconnect {
143 _ = shutdown_tx.send(());
144 break;
145 }
146 }
147}