vector/top/
cmd.rs

1use std::time::Duration;
2
3use chrono::Local;
4use futures_util::future::join_all;
5use tokio::sync::{mpsc, oneshot};
6use vector_lib::api_client::{connect_subscription_client, Client};
7
8use super::{
9    dashboard::{init_dashboard, is_tty},
10    metrics,
11    state::{self, ConnectionStatus, EventType},
12};
13
/// How long to wait, in milliseconds, before retrying a connection to the Vector API.
const RECONNECT_DELAY: u64 = 5_000;
16
17/// CLI command func for displaying Vector components, and communicating with a local/remote
18/// Vector API server via HTTP/WebSockets
19pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
20    // Exit early if the terminal is not a teletype
21    if !is_tty() {
22        #[allow(clippy::print_stderr)]
23        {
24            eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
25        }
26        return exitcode::IOERR;
27    }
28
29    let url = opts.url();
30    // Create a new API client for connecting to the local/remote Vector instance.
31    let client = Client::new(url.clone());
32    #[allow(clippy::print_stderr)]
33    if client.healthcheck().await.is_err() {
34        eprintln!(
35            indoc::indoc! {"
36            Vector API server isn't reachable ({}).
37
38            Have you enabled the API?
39
40            To enable the API, add the following to your Vector config file:
41
42            [api]
43                enabled = true"},
44            url
45        );
46        return exitcode::UNAVAILABLE;
47    }
48
49    top(opts, client, "Vector").await
50}
51
52/// General monitoring
53pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode {
54    // Channel for updating state via event messages
55    let (tx, rx) = tokio::sync::mpsc::channel(20);
56    let state_rx = state::updater(rx).await;
57    // Channel for shutdown signal
58    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
59
60    let connection = tokio::spawn(subscription(opts.clone(), client, tx, shutdown_tx));
61
62    // Initialize the dashboard
63    match init_dashboard(
64        dashboard_title,
65        opts.url().as_str(),
66        opts,
67        state_rx,
68        shutdown_rx,
69    )
70    .await
71    {
72        Ok(_) => {
73            connection.abort();
74            exitcode::OK
75        }
76        Err(err) => {
77            #[allow(clippy::print_stderr)]
78            {
79                eprintln!("[top] Encountered shutdown error: {err}");
80            }
81            connection.abort();
82            exitcode::IOERR
83        }
84    }
85}
86
87// This task handles reconnecting the subscription client and all
88// subscriptions in the case of a web socket disconnect
89async fn subscription(
90    opts: super::Opts,
91    client: Client,
92    tx: mpsc::Sender<EventType>,
93    shutdown_tx: oneshot::Sender<()>,
94) {
95    let ws_url = opts.web_socket_url();
96
97    loop {
98        // Initialize state. On future reconnects, we re-initialize state in
99        // order to accurately capture added, removed, and edited
100        // components.
101        let state = match metrics::init_components(&client, &opts.components).await {
102            Ok(state) => state,
103            Err(_) => {
104                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
105                continue;
106            }
107        };
108        _ = tx.send(EventType::InitializeState(state)).await;
109
110        let subscription_client = match connect_subscription_client(ws_url.clone()).await {
111            Ok(c) => c,
112            Err(_) => {
113                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
114                continue;
115            }
116        };
117
118        // Subscribe to updated metrics
119        let finished = metrics::subscribe(
120            subscription_client,
121            tx.clone(),
122            opts.interval as i64,
123            opts.components.clone(),
124        );
125
126        _ = tx
127            .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
128                Local::now(),
129            )))
130            .await;
131        // Tasks spawned in metrics::subscribe finish when the subscription
132        // streams have completed. Currently, subscription streams only
133        // complete when the underlying web socket connection to the GraphQL
134        // server drops.
135        _ = join_all(finished).await;
136        _ = tx
137            .send(EventType::ConnectionUpdated(
138                ConnectionStatus::Disconnected(RECONNECT_DELAY),
139            ))
140            .await;
141        if opts.no_reconnect {
142            _ = shutdown_tx.send(());
143            break;
144        }
145    }
146}