vector/top/
cmd.rs

1use std::collections::BTreeMap;
2use std::time::Duration;
3
4use chrono::Local;
5use futures_util::future::join_all;
6use regex::Regex;
7use tokio::sync::{mpsc, oneshot};
8use vector_lib::api_client::{Client, connect_subscription_client};
9
10use vector_lib::top::{
11    dashboard::{init_dashboard, is_tty},
12    metrics,
13    state::{self, ConnectionStatus, EventType, State},
14};
15
/// How long, in milliseconds, to wait before retrying a failed connection to the
/// Vector API. The same value is reported to the dashboard on disconnect.
const RECONNECT_DELAY: u64 = 5_000;
18
19/// CLI command func for displaying Vector components, and communicating with a local/remote
20/// Vector API server via HTTP/WebSockets
21pub async fn cmd(opts: &super::Opts) -> exitcode::ExitCode {
22    // Exit early if the terminal is not a teletype
23    if !is_tty() {
24        #[allow(clippy::print_stderr)]
25        {
26            eprintln!("Terminal must be a teletype (TTY) to display a Vector dashboard.");
27        }
28        return exitcode::IOERR;
29    }
30
31    let url = opts.url();
32    // Create a new API client for connecting to the local/remote Vector instance.
33    let client = Client::new(url.clone());
34    #[allow(clippy::print_stderr)]
35    if client.healthcheck().await.is_err() {
36        eprintln!(
37            indoc::indoc! {"
38            Vector API server isn't reachable ({}).
39
40            Have you enabled the API?
41
42            To enable the API, add the following to your Vector config file:
43
44            [api]
45                enabled = true"},
46            url
47        );
48        return exitcode::UNAVAILABLE;
49    }
50
51    top(opts, client, "Vector").await
52}
53
54/// General monitoring
55pub async fn top(opts: &super::Opts, client: Client, dashboard_title: &str) -> exitcode::ExitCode {
56    // Channel for updating state via event messages
57    let (tx, rx) = tokio::sync::mpsc::channel(20);
58    let mut starting_state = State::new(BTreeMap::new());
59    starting_state.sort_state.column = opts.sort_field;
60    starting_state.sort_state.reverse = opts.sort_desc;
61    starting_state.filter_state.column = opts.filter_field;
62    starting_state.filter_state.pattern = opts
63        .filter_value
64        .as_deref()
65        .map(Regex::new)
66        .and_then(Result::ok);
67    let state_rx = state::updater(rx, starting_state).await;
68    // Channel for shutdown signal
69    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71    let connection = tokio::spawn(subscription(opts.clone(), client, tx.clone(), shutdown_tx));
72
73    // Initialize the dashboard
74    match init_dashboard(
75        dashboard_title,
76        opts.url().as_str(),
77        opts.interval,
78        opts.human_metrics,
79        tx,
80        state_rx,
81        shutdown_rx,
82    )
83    .await
84    {
85        Ok(_) => {
86            connection.abort();
87            exitcode::OK
88        }
89        Err(err) => {
90            #[allow(clippy::print_stderr)]
91            {
92                eprintln!("[top] Encountered shutdown error: {err}");
93            }
94            connection.abort();
95            exitcode::IOERR
96        }
97    }
98}
99
100// This task handles reconnecting the subscription client and all
101// subscriptions in the case of a web socket disconnect
102async fn subscription(
103    opts: super::Opts,
104    client: Client,
105    tx: mpsc::Sender<EventType>,
106    shutdown_tx: oneshot::Sender<()>,
107) {
108    let ws_url = opts.web_socket_url();
109
110    loop {
111        // Initialize state. On future reconnects, we re-initialize state in
112        // order to accurately capture added, removed, and edited
113        // components.
114        let state = match metrics::init_components(&client, &opts.components).await {
115            Ok(state) => state,
116            Err(_) => {
117                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
118                continue;
119            }
120        };
121        _ = tx.send(EventType::InitializeState(state)).await;
122
123        let subscription_client = match connect_subscription_client(ws_url.clone()).await {
124            Ok(c) => c,
125            Err(_) => {
126                tokio::time::sleep(Duration::from_millis(RECONNECT_DELAY)).await;
127                continue;
128            }
129        };
130
131        // Subscribe to updated metrics
132        let finished = metrics::subscribe(
133            subscription_client,
134            tx.clone(),
135            opts.interval as i64,
136            opts.components.clone(),
137        );
138
139        _ = tx
140            .send(EventType::ConnectionUpdated(ConnectionStatus::Connected(
141                Local::now(),
142            )))
143            .await;
144        // Tasks spawned in metrics::subscribe finish when the subscription
145        // streams have completed. Currently, subscription streams only
146        // complete when the underlying web socket connection to the GraphQL
147        // server drops.
148        _ = join_all(finished).await;
149        _ = tx
150            .send(EventType::ConnectionUpdated(
151                ConnectionStatus::Disconnected(RECONNECT_DELAY),
152            ))
153            .await;
154        if opts.no_reconnect {
155            _ = shutdown_tx.send(());
156            break;
157        }
158    }
159}