vector/top/
cmd.rs

1use std::time::Duration;
2
3use chrono::Local;
4use futures_util::future::join_all;
5use tokio::sync::{mpsc, oneshot};
6use vector_lib::api_client::{Client, connect_subscription_client};
7
8use vector_lib::top::{
9    dashboard::{init_dashboard, is_tty},
10    metrics,
11    state::{self, ConnectionStatus, EventType},
12};
13
14/// Delay (in milliseconds) before attempting to reconnect to the Vector API
15const RECONNECT_DELAY: u64 = 5000;
16
17/// CLI command func for displaying Vector components, and communicating with a local/remote
18/// Vector API server via HTTP/WebSockets
19pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
20    // Exit early if the terminal is not a teletype
21    if !is_tty() {
22        #[allow(clippy::print_stderr)]
23        {
24            eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
25        }
26        return exitcode::IOERR;
27    }
28
29    let url = opts.url();
30    // Create a new API client for connecting to the local/remote Vector instance.
31    let client = Client::new(url.clone());
32    #[allow(clippy::print_stderr)]
33    if client.healthcheck().await.is_err() {
34        eprintln!(
35            indoc::indoc! {"
36            Vector API server isn't reachable ({}).
37
38            Have you enabled the API?
39
40            To enable the API, add the following to your Vector config file:
41
42            [api]
43                enabled = true"},
44            url
45        );
46        return exitcode::UNAVAILABLE;
47    }
48
49    top(opts, client, "Vector").await
50}
51
52/// General monitoring
53pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode {
54    // Channel for updating state via event messages
55    let (tx, rx) = tokio::sync::mpsc::channel(20);
56    let state_rx = state::updater(rx).await;
57    // Channel for shutdown signal
58    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
59
60    let connection = tokio::spawn(subscription(opts.clone(), client, tx, shutdown_tx));
61
62    // Initialize the dashboard
63    match init_dashboard(
64        dashboard_title,
65        opts.url().as_str(),
66        opts.interval,
67        opts.human_metrics,
68        state_rx,
69        shutdown_rx,
70    )
71    .await
72    {
73        Ok(_) => {
74            connection.abort();
75            exitcode::OK
76        }
77        Err(err) => {
78            #[allow(clippy::print_stderr)]
79            {
80                eprintln!("[top] Encountered shutdown error: {err}");
81            }
82            connection.abort();
83            exitcode::IOERR
84        }
85    }
86}
87
88// This task handles reconnecting the subscription client and all
89// subscriptions in the case of a web socket disconnect
90async fn subscription(
91    opts: super::Opts,
92    client: Client,
93    tx: mpsc::Sender<EventType>,
94    shutdown_tx: oneshot::Sender<()>,
95) {
96    let ws_url = opts.web_socket_url();
97
98    loop {
99        // Initialize state. On future reconnects, we re-initialize state in
100        // order to accurately capture added, removed, and edited
101        // components.
102        let state = match metrics::init_components(&client, &opts.components).await {
103            Ok(state) => state,
104            Err(_) => {
105                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
106                continue;
107            }
108        };
109        _ = tx.send(EventType::InitializeState(state)).await;
110
111        let subscription_client = match connect_subscription_client(ws_url.clone()).await {
112            Ok(c) => c,
113            Err(_) => {
114                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
115                continue;
116            }
117        };
118
119        // Subscribe to updated metrics
120        let finished = metrics::subscribe(
121            subscription_client,
122            tx.clone(),
123            opts.interval as i64,
124            opts.components.clone(),
125        );
126
127        _ = tx
128            .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
129                Local::now(),
130            )))
131            .await;
132        // Tasks spawned in metrics::subscribe finish when the subscription
133        // streams have completed. Currently, subscription streams only
134        // complete when the underlying web socket connection to the GraphQL
135        // server drops.
136        _ = join_all(finished).await;
137        _ = tx
138            .send(EventType::ConnectionUpdated(
139                ConnectionStatus::Disconnected(RECONNECT_DELAY),
140            ))
141            .await;
142        if opts.no_reconnect {
143            _ = shutdown_tx.send(());
144            break;
145        }
146    }
147}