1use std::{
2 collections::{BTreeMap, HashMap},
3 time::Duration,
4};
5
6use chrono::{DateTime, Local};
7use ratatui::{
8 style::{Color, Style},
9 text::Span,
10};
11use tokio::sync::mpsc;
12use vector_lib::internal_event::DEFAULT_OUTPUT;
13
14use crate::config::ComponentKey;
15
16type IdentifiedMetric = (ComponentKey, i64);
17
18#[derive(Debug)]
19pub struct SentEventsMetric {
20 pub key: ComponentKey,
21 pub total: i64,
22 pub outputs: HashMap<String, i64>,
23}
24
25#[derive(Debug)]
26pub enum EventType {
27 InitializeState(State),
28 UptimeChanged(f64),
29 ReceivedBytesTotals(Vec<IdentifiedMetric>),
30 ReceivedBytesThroughputs(i64, Vec<IdentifiedMetric>),
32 ReceivedEventsTotals(Vec<IdentifiedMetric>),
33 ReceivedEventsThroughputs(i64, Vec<IdentifiedMetric>),
35 SentBytesTotals(Vec<IdentifiedMetric>),
36 SentBytesThroughputs(i64, Vec<IdentifiedMetric>),
38 SentEventsTotals(Vec<SentEventsMetric>),
40 SentEventsThroughputs(i64, Vec<SentEventsMetric>),
42 ErrorsTotals(Vec<IdentifiedMetric>),
43 #[cfg(feature = "allocation-tracing")]
44 AllocatedBytes(Vec<IdentifiedMetric>),
45 ComponentAdded(ComponentRow),
46 ComponentRemoved(ComponentKey),
47 ConnectionUpdated(ConnectionStatus),
48}
49
50#[derive(Debug, Copy, Clone)]
51pub enum ConnectionStatus {
52 Pending,
54 Disconnected(u64),
57 Connected(DateTime<Local>),
59}
60
61impl ConnectionStatus {
62 pub fn as_ui_spans(&self) -> Vec<Span> {
63 match self {
64 Self::Pending => vec![Span::styled(
65 "Connecting...",
66 Style::default().fg(Color::Yellow),
67 )],
68 Self::Disconnected(delay) => vec![
69 Span::styled("Disconnected", Style::default().fg(Color::Red)),
70 Span::from(format!(" (reconnecting every {} seconds)", delay / 1000)),
71 ],
72 Self::Connected(since) => vec![
73 Span::styled("Connected", Style::default().fg(Color::Green)),
74 Span::from(format!(" (since {})", since.format("%F %r %Z"))),
75 ],
76 }
77 }
78}
79
80#[derive(Debug, Clone)]
81pub struct State {
82 pub connection_status: ConnectionStatus,
83 pub uptime: Duration,
84 pub components: BTreeMap<ComponentKey, ComponentRow>,
85}
86
87impl State {
88 pub const fn new(components: BTreeMap<ComponentKey, ComponentRow>) -> Self {
89 Self {
90 connection_status: ConnectionStatus::Pending,
91 uptime: Duration::from_secs(0),
92 components,
93 }
94 }
95}
96pub type EventTx = mpsc::Sender<EventType>;
97pub type EventRx = mpsc::Receiver<EventType>;
98pub type StateRx = mpsc::Receiver<State>;
99
/// Sent-events figures for a single output of a component.
#[derive(Debug, Clone, Default)]
pub struct OutputMetrics {
    /// Sent-events total for this output.
    pub sent_events_total: i64,
    /// Sent-events throughput for this output, per second.
    pub sent_events_throughput_sec: i64,
}

impl From<i64> for OutputMetrics {
    /// Builds metrics from a sent-events total, with throughput set to zero.
    fn from(total: i64) -> Self {
        Self {
            sent_events_total: total,
            // Every remaining field takes its derived default (zero).
            ..Self::default()
        }
    }
}
114
115#[derive(Debug, Clone)]
116pub struct ComponentRow {
117 pub key: ComponentKey,
118 pub kind: String,
119 pub component_type: String,
120 pub outputs: HashMap<String, OutputMetrics>,
121 pub received_bytes_total: i64,
122 pub received_bytes_throughput_sec: i64,
123 pub received_events_total: i64,
124 pub received_events_throughput_sec: i64,
125 pub sent_bytes_total: i64,
126 pub sent_bytes_throughput_sec: i64,
127 pub sent_events_total: i64,
128 pub sent_events_throughput_sec: i64,
129 #[cfg(feature = "allocation-tracing")]
130 pub allocated_bytes: i64,
131 pub errors: i64,
132}
133
134impl ComponentRow {
135 pub fn has_displayable_outputs(&self) -> bool {
138 self.outputs.len() > 1
139 || (self.outputs.len() == 1 && !self.outputs.contains_key(DEFAULT_OUTPUT))
140 }
141}
142
143pub async fn updater(mut event_rx: EventRx) -> StateRx {
147 let (tx, rx) = mpsc::channel(20);
148
149 let mut state = State::new(BTreeMap::new());
150 tokio::spawn(async move {
151 while let Some(event_type) = event_rx.recv().await {
152 match event_type {
153 EventType::InitializeState(new_state) => {
154 state = new_state;
155 }
156 EventType::ReceivedBytesTotals(rows) => {
157 for (key, v) in rows {
158 if let Some(r) = state.components.get_mut(&key) {
159 r.received_bytes_total = v;
160 }
161 }
162 }
163 EventType::ReceivedBytesThroughputs(interval, rows) => {
164 for (key, v) in rows {
165 if let Some(r) = state.components.get_mut(&key) {
166 r.received_bytes_throughput_sec =
167 (v as f64 * (1000.0 / interval as f64)) as i64;
168 }
169 }
170 }
171 EventType::ReceivedEventsTotals(rows) => {
172 for (key, v) in rows {
173 if let Some(r) = state.components.get_mut(&key) {
174 r.received_events_total = v;
175 }
176 }
177 }
178 EventType::ReceivedEventsThroughputs(interval, rows) => {
179 for (key, v) in rows {
180 if let Some(r) = state.components.get_mut(&key) {
181 r.received_events_throughput_sec =
182 (v as f64 * (1000.0 / interval as f64)) as i64;
183 }
184 }
185 }
186 EventType::SentBytesTotals(rows) => {
187 for (key, v) in rows {
188 if let Some(r) = state.components.get_mut(&key) {
189 r.sent_bytes_total = v;
190 }
191 }
192 }
193 EventType::SentBytesThroughputs(interval, rows) => {
194 for (key, v) in rows {
195 if let Some(r) = state.components.get_mut(&key) {
196 r.sent_bytes_throughput_sec =
197 (v as f64 * (1000.0 / interval as f64)) as i64;
198 }
199 }
200 }
201 EventType::SentEventsTotals(rows) => {
202 for m in rows {
203 if let Some(r) = state.components.get_mut(&m.key) {
204 r.sent_events_total = m.total;
205 for (id, v) in m.outputs {
206 r.outputs
207 .entry(id)
208 .or_insert_with(OutputMetrics::default)
209 .sent_events_total = v;
210 }
211 }
212 }
213 }
214 EventType::SentEventsThroughputs(interval, rows) => {
215 for m in rows {
216 if let Some(r) = state.components.get_mut(&m.key) {
217 r.sent_events_throughput_sec =
218 (m.total as f64 * (1000.0 / interval as f64)) as i64;
219 for (id, v) in m.outputs {
220 let throughput = (v as f64 * (1000.0 / interval as f64)) as i64;
221 r.outputs
222 .entry(id)
223 .or_insert_with(OutputMetrics::default)
224 .sent_events_throughput_sec = throughput;
225 }
226 }
227 }
228 }
229 EventType::ErrorsTotals(rows) => {
230 for (key, v) in rows {
231 if let Some(r) = state.components.get_mut(&key) {
232 r.errors = v;
233 }
234 }
235 }
236 #[cfg(feature = "allocation-tracing")]
237 EventType::AllocatedBytes(rows) => {
238 for (key, v) in rows {
239 if let Some(r) = state.components.get_mut(&key) {
240 r.allocated_bytes = v;
241 }
242 }
243 }
244 EventType::ComponentAdded(c) => {
245 _ = state.components.insert(c.key.clone(), c);
246 }
247 EventType::ComponentRemoved(key) => {
248 _ = state.components.remove(&key);
249 }
250 EventType::ConnectionUpdated(status) => {
251 state.connection_status = status;
252 }
253 EventType::UptimeChanged(uptime) => {
254 state.uptime = Duration::from_secs_f64(uptime);
255 }
256 }
257
258 _ = tx.send(state.clone()).await;
260 }
261 });
262
263 rx
264}