vector/top/
state.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    time::Duration,
4};
5
6use chrono::{DateTime, Local};
7use ratatui::{
8    style::{Color, Style},
9    text::Span,
10};
11use tokio::sync::mpsc;
12use vector_lib::internal_event::DEFAULT_OUTPUT;
13
14use crate::config::ComponentKey;
15
16type IdentifiedMetric = (ComponentKey, i64);
17
18#[derive(Debug)]
19pub struct SentEventsMetric {
20    pub key: ComponentKey,
21    pub total: i64,
22    pub outputs: HashMap<String, i64>,
23}
24
25#[derive(Debug)]
26pub enum EventType {
27    InitializeState(State),
28    UptimeChanged(f64),
29    ReceivedBytesTotals(Vec<IdentifiedMetric>),
30    /// Interval + identified metric
31    ReceivedBytesThroughputs(i64, Vec<IdentifiedMetric>),
32    ReceivedEventsTotals(Vec<IdentifiedMetric>),
33    /// Interval in ms + identified metric
34    ReceivedEventsThroughputs(i64, Vec<IdentifiedMetric>),
35    SentBytesTotals(Vec<IdentifiedMetric>),
36    /// Interval + identified metric
37    SentBytesThroughputs(i64, Vec<IdentifiedMetric>),
38    // Identified overall metric + output-specific metrics
39    SentEventsTotals(Vec<SentEventsMetric>),
40    /// Interval in ms + identified overall metric + output-specific metrics
41    SentEventsThroughputs(i64, Vec<SentEventsMetric>),
42    ErrorsTotals(Vec<IdentifiedMetric>),
43    #[cfg(feature = "allocation-tracing")]
44    AllocatedBytes(Vec<IdentifiedMetric>),
45    ComponentAdded(ComponentRow),
46    ComponentRemoved(ComponentKey),
47    ConnectionUpdated(ConnectionStatus),
48}
49
50#[derive(Debug, Copy, Clone)]
51pub enum ConnectionStatus {
52    // Initial state
53    Pending,
54    // Underlying web socket connection has dropped. Includes the delay between
55    // reconnect attempts
56    Disconnected(u64),
57    // Connection is working
58    Connected(DateTime<Local>),
59}
60
61impl ConnectionStatus {
62    pub fn as_ui_spans(&self) -> Vec<Span> {
63        match self {
64            Self::Pending => vec![Span::styled(
65                "Connecting...",
66                Style::default().fg(Color::Yellow),
67            )],
68            Self::Disconnected(delay) => vec![
69                Span::styled("Disconnected", Style::default().fg(Color::Red)),
70                Span::from(format!(" (reconnecting every {} seconds)", delay / 1000)),
71            ],
72            Self::Connected(since) => vec![
73                Span::styled("Connected", Style::default().fg(Color::Green)),
74                Span::from(format!(" (since {})", since.format("%F %r %Z"))),
75            ],
76        }
77    }
78}
79
80#[derive(Debug, Clone)]
81pub struct State {
82    pub connection_status: ConnectionStatus,
83    pub uptime: Duration,
84    pub components: BTreeMap<ComponentKey, ComponentRow>,
85}
86
87impl State {
88    pub const fn new(components: BTreeMap<ComponentKey, ComponentRow>) -> Self {
89        Self {
90            connection_status: ConnectionStatus::Pending,
91            uptime: Duration::from_secs(0),
92            components,
93        }
94    }
95}
96pub type EventTx = mpsc::Sender<EventType>;
97pub type EventRx = mpsc::Receiver<EventType>;
98pub type StateRx = mpsc::Receiver<State>;
99
100#[derive(Debug, Clone, Default)]
101pub struct OutputMetrics {
102    pub sent_events_total: i64,
103    pub sent_events_throughput_sec: i64,
104}
105
106impl From<i64> for OutputMetrics {
107    fn from(sent_events_total: i64) -> Self {
108        Self {
109            sent_events_total,
110            sent_events_throughput_sec: 0,
111        }
112    }
113}
114
115#[derive(Debug, Clone)]
116pub struct ComponentRow {
117    pub key: ComponentKey,
118    pub kind: String,
119    pub component_type: String,
120    pub outputs: HashMap<String, OutputMetrics>,
121    pub received_bytes_total: i64,
122    pub received_bytes_throughput_sec: i64,
123    pub received_events_total: i64,
124    pub received_events_throughput_sec: i64,
125    pub sent_bytes_total: i64,
126    pub sent_bytes_throughput_sec: i64,
127    pub sent_events_total: i64,
128    pub sent_events_throughput_sec: i64,
129    #[cfg(feature = "allocation-tracing")]
130    pub allocated_bytes: i64,
131    pub errors: i64,
132}
133
134impl ComponentRow {
135    /// Note, we ignore `outputs` if it only contains [`DEFAULT_OUTPUT`] to avoid
136    /// redundancy with information shown in the overall component row
137    pub fn has_displayable_outputs(&self) -> bool {
138        self.outputs.len() > 1
139            || (self.outputs.len() == 1 && !self.outputs.contains_key(DEFAULT_OUTPUT))
140    }
141}
142
143/// Takes the receiver `EventRx` channel, and returns a `StateRx` state receiver. This
144/// represents the single destination for handling subscriptions and returning 'immutable' state
145/// for re-rendering the dashboard. This approach uses channels vs. mutexes.
146pub async fn updater(mut event_rx: EventRx) -> StateRx {
147    let (tx, rx) = mpsc::channel(20);
148
149    let mut state = State::new(BTreeMap::new());
150    tokio::spawn(async move {
151        while let Some(event_type) = event_rx.recv().await {
152            match event_type {
153                EventType::InitializeState(new_state) => {
154                    state = new_state;
155                }
156                EventType::ReceivedBytesTotals(rows) => {
157                    for (key, v) in rows {
158                        if let Some(r) = state.components.get_mut(&key) {
159                            r.received_bytes_total = v;
160                        }
161                    }
162                }
163                EventType::ReceivedBytesThroughputs(interval, rows) => {
164                    for (key, v) in rows {
165                        if let Some(r) = state.components.get_mut(&key) {
166                            r.received_bytes_throughput_sec =
167                                (v as f64 * (1000.0 / interval as f64)) as i64;
168                        }
169                    }
170                }
171                EventType::ReceivedEventsTotals(rows) => {
172                    for (key, v) in rows {
173                        if let Some(r) = state.components.get_mut(&key) {
174                            r.received_events_total = v;
175                        }
176                    }
177                }
178                EventType::ReceivedEventsThroughputs(interval, rows) => {
179                    for (key, v) in rows {
180                        if let Some(r) = state.components.get_mut(&key) {
181                            r.received_events_throughput_sec =
182                                (v as f64 * (1000.0 / interval as f64)) as i64;
183                        }
184                    }
185                }
186                EventType::SentBytesTotals(rows) => {
187                    for (key, v) in rows {
188                        if let Some(r) = state.components.get_mut(&key) {
189                            r.sent_bytes_total = v;
190                        }
191                    }
192                }
193                EventType::SentBytesThroughputs(interval, rows) => {
194                    for (key, v) in rows {
195                        if let Some(r) = state.components.get_mut(&key) {
196                            r.sent_bytes_throughput_sec =
197                                (v as f64 * (1000.0 / interval as f64)) as i64;
198                        }
199                    }
200                }
201                EventType::SentEventsTotals(rows) => {
202                    for m in rows {
203                        if let Some(r) = state.components.get_mut(&m.key) {
204                            r.sent_events_total = m.total;
205                            for (id, v) in m.outputs {
206                                r.outputs
207                                    .entry(id)
208                                    .or_insert_with(OutputMetrics::default)
209                                    .sent_events_total = v;
210                            }
211                        }
212                    }
213                }
214                EventType::SentEventsThroughputs(interval, rows) => {
215                    for m in rows {
216                        if let Some(r) = state.components.get_mut(&m.key) {
217                            r.sent_events_throughput_sec =
218                                (m.total as f64 * (1000.0 / interval as f64)) as i64;
219                            for (id, v) in m.outputs {
220                                let throughput = (v as f64 * (1000.0 / interval as f64)) as i64;
221                                r.outputs
222                                    .entry(id)
223                                    .or_insert_with(OutputMetrics::default)
224                                    .sent_events_throughput_sec = throughput;
225                            }
226                        }
227                    }
228                }
229                EventType::ErrorsTotals(rows) => {
230                    for (key, v) in rows {
231                        if let Some(r) = state.components.get_mut(&key) {
232                            r.errors = v;
233                        }
234                    }
235                }
236                #[cfg(feature = "allocation-tracing")]
237                EventType::AllocatedBytes(rows) => {
238                    for (key, v) in rows {
239                        if let Some(r) = state.components.get_mut(&key) {
240                            r.allocated_bytes = v;
241                        }
242                    }
243                }
244                EventType::ComponentAdded(c) => {
245                    _ = state.components.insert(c.key.clone(), c);
246                }
247                EventType::ComponentRemoved(key) => {
248                    _ = state.components.remove(&key);
249                }
250                EventType::ConnectionUpdated(status) => {
251                    state.connection_status = status;
252                }
253                EventType::UptimeChanged(uptime) => {
254                    state.uptime = Duration::from_secs_f64(uptime);
255                }
256            }
257
258            // Send updated map to listeners
259            _ = tx.send(state.clone()).await;
260        }
261    });
262
263    rx
264}