1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use chrono::Local;
5use futures_util::future::join_all;
6use regex::Regex;
7use tokio::sync::{mpsc, oneshot};
8use vector_lib::api_client::{Client, connect_subscription_client};
9
10use vector_lib::top::{
11 dashboard::{init_dashboard, is_tty},
12 metrics,
13 state::{self, ConnectionStatus, EventType, State},
14};
15
/// Delay, in milliseconds, slept before retrying after a failed connection attempt.
/// The same value is reported alongside the `Disconnected` connection status.
const RECONNECT_DELAY: u64 = 5_000;
18
19pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
22 if !is_tty() {
24 #[allow(clippy::print_stderr)]
25 {
26 eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
27 }
28 return exitcode::IOERR;
29 }
30
31 let url = opts.url();
32 let client = Client::new(url.clone());
34 #[allow(clippy::print_stderr)]
35 if client.healthcheck().await.is_err() {
36 eprintln!(
37 indoc::indoc! {"
38 Vector API server isn't reachable ({}).
39
40 Have you enabled the API?
41
42 To enable the API, add the following to your Vector config file:
43
44 [api]
45 enabled = true"},
46 url
47 );
48 return exitcode::UNAVAILABLE;
49 }
50
51 top(opts, client, "Vector").await
52}
53
54pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode {
56 let (tx, rx) = tokio::sync::mpsc::channel(20);
58 let mut starting_state = State::new(BTreeMap::new());
59 starting_state.sort_state.column = opts.sort_field;
60 starting_state.sort_state.reverse = opts.sort_desc;
61 starting_state.filter_state.column = opts.filter_field;
62 starting_state.filter_state.pattern = opts
63 .filter_value
64 .as_deref()
65 .map(Regex::new)
66 .and_then(Result::ok);
67 let state_rx = state::updater(rx, starting_state).await;
68 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71 let connection = tokio::spawn(subscription(opts.clone(), client, tx.clone(), shutdown_tx));
72
73 match init_dashboard(
75 dashboard_title,
76 opts.url().as_str(),
77 opts.interval,
78 opts.human_metrics,
79 tx,
80 state_rx,
81 shutdown_rx,
82 )
83 .await
84 {
85 Ok(_) => {
86 connection.abort();
87 exitcode::OK
88 }
89 Err(err) => {
90 #[allow(clippy::print_stderr)]
91 {
92 eprintln!("[top] Encountered shutdown error: {err}");
93 }
94 connection.abort();
95 exitcode::IOERR
96 }
97 }
98}
99
100async fn subscription(
103 opts: super::Opts,
104 client: Client,
105 tx: mpsc::Sender<EventType>,
106 shutdown_tx: oneshot::Sender<()>,
107) {
108 let ws_url = opts.web_socket_url();
109
110 loop {
111 let state = match metrics::init_components(&client, &opts.components).await {
115 Ok(state) => state,
116 Err(_) => {
117 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
118 continue;
119 }
120 };
121 _ = tx.send(EventType::InitializeState(state)).await;
122
123 let subscription_client = match connect_subscription_client(ws_url.clone()).await {
124 Ok(c) => c,
125 Err(_) => {
126 tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
127 continue;
128 }
129 };
130
131 let finished = metrics::subscribe(
133 subscription_client,
134 tx.clone(),
135 opts.interval as i64,
136 opts.components.clone(),
137 );
138
139 _ = tx
140 .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
141 Local::now(),
142 )))
143 .await;
144 _ = join_all(finished).await;
149 _ = tx
150 .send(EventType::ConnectionUpdated(
151 ConnectionStatus::Disconnected(RECONNECT_DELAY),
152 ))
153 .await;
154 if opts.no_reconnect {
155 _ = shutdown_tx.send(());
156 break;
157 }
158 }
159}