1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
use std::collections::{BTreeMap, HashMap};

use chrono::{DateTime, Local};
use ratatui::{
    style::{Color, Style},
    text::Span,
};
use tokio::sync::mpsc;
use vector_lib::internal_event::DEFAULT_OUTPUT;

use crate::config::ComponentKey;

/// A single metric value paired with the key of the component it belongs to.
type IdentifiedMetric = (ComponentKey, i64);

/// Sent-events figures for one component: an overall value plus a per-output
/// breakdown. Carried by [`EventType::SentEventsTotals`] and
/// [`EventType::SentEventsThroughputs`].
#[derive(Debug)]
pub struct SentEventsMetric {
    /// Key of the component these figures belong to.
    pub key: ComponentKey,
    /// Overall value for the component as a whole.
    pub total: i64,
    /// Per-output values, keyed by output identifier.
    pub outputs: HashMap<String, i64>,
}

/// Events consumed by [`updater`] to mutate the dashboard [`State`].
#[derive(Debug)]
pub enum EventType {
    /// Replaces the entire state with the supplied one.
    InitializeState(State),
    /// Identified received-bytes totals
    ReceivedBytesTotals(Vec<IdentifiedMetric>),
    /// Interval in ms + identified metric
    ReceivedBytesThroughputs(i64, Vec<IdentifiedMetric>),
    /// Identified received-events totals
    ReceivedEventsTotals(Vec<IdentifiedMetric>),
    /// Interval in ms + identified metric
    ReceivedEventsThroughputs(i64, Vec<IdentifiedMetric>),
    /// Identified sent-bytes totals
    SentBytesTotals(Vec<IdentifiedMetric>),
    /// Interval in ms + identified metric
    SentBytesThroughputs(i64, Vec<IdentifiedMetric>),
    /// Identified overall metric + output-specific metrics
    SentEventsTotals(Vec<SentEventsMetric>),
    /// Interval in ms + identified overall metric + output-specific metrics
    SentEventsThroughputs(i64, Vec<SentEventsMetric>),
    /// Identified error totals
    ErrorsTotals(Vec<IdentifiedMetric>),
    /// Identified allocated-bytes values (only with the `allocation-tracing` feature)
    #[cfg(feature = "allocation-tracing")]
    AllocatedBytes(Vec<IdentifiedMetric>),
    /// A component row to insert (or overwrite) under its own key
    ComponentAdded(ComponentRow),
    /// Key of a component row to remove
    ComponentRemoved(ComponentKey),
    /// New connection status to record in the state
    ConnectionUpdated(ConnectionStatus),
}

/// Status of the connection feeding the dashboard, as shown in the UI.
#[derive(Debug, Copy, Clone)]
pub enum ConnectionStatus {
    /// Initial state
    Pending,
    /// Underlying web socket connection has dropped. Includes the delay between
    /// reconnect attempts (in milliseconds; divided by 1000 for display)
    Disconnected(u64),
    /// Connection is working; holds the local time shown as "since" in the UI
    Connected(DateTime<Local>),
}

impl ConnectionStatus {
    pub fn as_ui_spans(&self) -> Vec<Span> {
        match self {
            Self::Pending => vec![Span::styled(
                "Connecting...",
                Style::default().fg(Color::Yellow),
            )],
            Self::Disconnected(delay) => vec![
                Span::styled("Disconnected", Style::default().fg(Color::Red)),
                Span::from(format!(" (reconnecting every {} seconds)", delay / 1000)),
            ],
            Self::Connected(since) => vec![
                Span::styled("Connected", Style::default().fg(Color::Green)),
                Span::from(format!(" (since {})", since.format("%F %r %Z"))),
            ],
        }
    }
}

/// Snapshot of everything the dashboard renders: the connection status and
/// one row per known component.
#[derive(Debug, Clone)]
pub struct State {
    /// Current status of the connection.
    pub connection_status: ConnectionStatus,
    /// Component rows, ordered by component key.
    pub components: BTreeMap<ComponentKey, ComponentRow>,
}

impl State {
    /// Creates a state holding `components`, with the connection status set
    /// to [`ConnectionStatus::Pending`].
    pub const fn new(components: BTreeMap<ComponentKey, ComponentRow>) -> Self {
        let connection_status = ConnectionStatus::Pending;
        Self {
            components,
            connection_status,
        }
    }
}
/// Sending half of the channel carrying [`EventType`] values.
pub type EventTx = mpsc::Sender<EventType>;
/// Receiving half of the channel carrying [`EventType`] values.
pub type EventRx = mpsc::Receiver<EventType>;
/// Receiving half of the channel carrying [`State`] snapshots.
pub type StateRx = mpsc::Receiver<State>;

/// Sent-events figures tracked for a single output of a component.
#[derive(Debug, Clone, Default)]
pub struct OutputMetrics {
    /// Total number of events sent through this output.
    pub sent_events_total: i64,
    /// Events sent per second through this output.
    pub sent_events_throughput_sec: i64,
}

/// Builds metrics from a sent-events total; the throughput starts at zero.
impl From<i64> for OutputMetrics {
    fn from(sent_events_total: i64) -> Self {
        Self {
            sent_events_total,
            ..Self::default()
        }
    }
}

/// One dashboard row: the identity of a component plus its latest metrics.
#[derive(Debug, Clone)]
pub struct ComponentRow {
    /// Key identifying the component; also its key in [`State::components`].
    pub key: ComponentKey,
    /// Component kind (presumably source/transform/sink — confirm against the producer).
    pub kind: String,
    /// Component type name.
    pub component_type: String,
    /// Per-output sent-events metrics, keyed by output identifier.
    pub outputs: HashMap<String, OutputMetrics>,
    /// Total bytes received.
    pub received_bytes_total: i64,
    /// Bytes received per second.
    pub received_bytes_throughput_sec: i64,
    /// Total events received.
    pub received_events_total: i64,
    /// Events received per second.
    pub received_events_throughput_sec: i64,
    /// Total bytes sent.
    pub sent_bytes_total: i64,
    /// Bytes sent per second.
    pub sent_bytes_throughput_sec: i64,
    /// Total events sent, across all outputs.
    pub sent_events_total: i64,
    /// Events sent per second, across all outputs.
    pub sent_events_throughput_sec: i64,
    /// Allocated bytes (only with the `allocation-tracing` feature).
    #[cfg(feature = "allocation-tracing")]
    pub allocated_bytes: i64,
    /// Total number of errors.
    pub errors: i64,
}

impl ComponentRow {
    /// Reports whether the per-output breakdown is worth showing.
    ///
    /// Note, `outputs` is ignored if it only contains [`DEFAULT_OUTPUT`], to
    /// avoid redundancy with information shown in the overall component row.
    pub fn has_displayable_outputs(&self) -> bool {
        match self.outputs.len() {
            0 => false,
            // A lone output is only interesting when it is a named one.
            1 => !self.outputs.contains_key(DEFAULT_OUTPUT),
            _ => true,
        }
    }
}

/// Converts a count observed over `interval` milliseconds into a per-second rate.
///
/// A non-positive `interval` yields 0: dividing by zero would otherwise
/// saturate the float-to-integer cast to `i64::MAX`, and a negative interval
/// would produce a meaningless negative rate.
fn throughput_per_sec(value: i64, interval: i64) -> i64 {
    if interval <= 0 {
        return 0;
    }
    (value as f64 * (1000.0 / interval as f64)) as i64
}

/// Takes the receiver `EventRx` channel, and returns a `StateRx` state receiver. This
/// represents the single destination for handling subscriptions and returning 'immutable' state
/// for re-rendering the dashboard. This approach uses channels vs. mutexes.
///
/// A background task is spawned that owns the [`State`]. It applies each incoming
/// [`EventType`] and then sends a full clone of the updated state on the returned
/// channel. Metric events that refer to a component key not present in the state are
/// ignored. The task ends once `event_rx` yields `None`.
pub async fn updater(mut event_rx: EventRx) -> StateRx {
    let (tx, rx) = mpsc::channel(20);

    let mut state = State::new(BTreeMap::new());
    tokio::spawn(async move {
        while let Some(event_type) = event_rx.recv().await {
            match event_type {
                EventType::InitializeState(new_state) => {
                    state = new_state;
                }
                EventType::ReceivedBytesTotals(rows) => {
                    for (key, v) in rows {
                        if let Some(r) = state.components.get_mut(&key) {
                            r.received_bytes_total = v;
                        }
                    }
                }
                EventType::ReceivedBytesThroughputs(interval, rows) => {
                    for (key, v) in rows {
                        if let Some(r) = state.components.get_mut(&key) {
                            r.received_bytes_throughput_sec = throughput_per_sec(v, interval);
                        }
                    }
                }
                EventType::ReceivedEventsTotals(rows) => {
                    for (key, v) in rows {
                        if let Some(r) = state.components.get_mut(&key) {
                            r.received_events_total = v;
                        }
                    }
                }
                EventType::ReceivedEventsThroughputs(interval, rows) => {
                    for (key, v) in rows {
                        if let Some(r) = state.components.get_mut(&key) {
                            r.received_events_throughput_sec = throughput_per_sec(v, interval);
                        }
                    }
                }
                EventType::SentBytesTotals(rows) => {
                    for (key, v) in rows {
                        if let Some(r) = state.components.get_mut(&key) {
                            r.sent_bytes_total = v;
                        }
                    }
                }
                EventType::SentBytesThroughputs(interval, rows) => {
                    for (key, v) in rows {
                        if let Some(r) = state.components.get_mut(&key) {
                            r.sent_bytes_throughput_sec = throughput_per_sec(v, interval);
                        }
                    }
                }
                EventType::SentEventsTotals(rows) => {
                    for m in rows {
                        if let Some(r) = state.components.get_mut(&m.key) {
                            r.sent_events_total = m.total;
                            // Outputs not seen before get a zeroed entry first.
                            for (id, v) in m.outputs {
                                r.outputs.entry(id).or_default().sent_events_total = v;
                            }
                        }
                    }
                }
                EventType::SentEventsThroughputs(interval, rows) => {
                    for m in rows {
                        if let Some(r) = state.components.get_mut(&m.key) {
                            r.sent_events_throughput_sec = throughput_per_sec(m.total, interval);
                            // Outputs not seen before get a zeroed entry first.
                            for (id, v) in m.outputs {
                                r.outputs.entry(id).or_default().sent_events_throughput_sec =
                                    throughput_per_sec(v, interval);
                            }
                        }
                    }
                }
                EventType::ErrorsTotals(rows) => {
                    for (key, v) in rows {
                        if let Some(r) = state.components.get_mut(&key) {
                            r.errors = v;
                        }
                    }
                }
                #[cfg(feature = "allocation-tracing")]
                EventType::AllocatedBytes(rows) => {
                    for (key, v) in rows {
                        if let Some(r) = state.components.get_mut(&key) {
                            r.allocated_bytes = v;
                        }
                    }
                }
                EventType::ComponentAdded(c) => {
                    _ = state.components.insert(c.key.clone(), c);
                }
                EventType::ComponentRemoved(key) => {
                    _ = state.components.remove(&key);
                }
                EventType::ConnectionUpdated(status) => {
                    state.connection_status = status;
                }
            }

            // Send updated map to listeners. The send result is ignored on
            // purpose: a failed send does not stop event processing.
            _ = tx.send(state.clone()).await;
        }
    });

    rx
}