1#![allow(missing_docs)]
2
3use snafu::Snafu;
4use std::collections::HashSet;
5use tokio::{runtime::Runtime, sync::broadcast};
6use tokio_stream::{Stream, StreamExt};
7
8use super::config::{ComponentKey, ConfigBuilder};
9
/// Sending half of the broadcast channel that carries [`SignalTo`] messages.
pub type SignalTx = broadcast::Sender<SignalTo>;
/// Receiving half of the broadcast channel that carries [`SignalTo`] messages.
pub type SignalRx = broadcast::Receiver<SignalTo>;
/// Sending half of a unit-valued broadcast channel; [`SignalHandler`] keeps one
/// per stream registered through `add` and fires it from `clear`.
pub type ShutdownTx = broadcast::Sender<()>;
13
/// Control messages distributed over the signal broadcast channel.
///
/// OS signals are mapped onto these variants by `os_signals`; other producers
/// can feed them in through [`SignalHandler`].
#[derive(Debug, Clone)]
// The variants differ a lot in size; the lint is silenced rather than boxing
// the payloads.
#[allow(clippy::large_enum_variant)]
pub enum SignalTo {
    /// Reload request carrying the set of component keys it concerns.
    ReloadComponents(HashSet<ComponentKey>),
    /// Reload request carrying the [`ConfigBuilder`] to load from.
    ReloadFromConfigBuilder(ConfigBuilder),
    /// Reload request without a payload (emitted on `SIGHUP` on Unix).
    ReloadFromDisk,
    /// Request to reload enrichment tables.
    ReloadEnrichmentTables,
    /// Shutdown request, optionally carrying the error that triggered it
    /// (`None` when it comes from `SIGINT`/`SIGTERM`/Ctrl-C).
    Shutdown(Option<ShutdownError>),
    /// Quit request (emitted on `SIGQUIT` on Unix).
    Quit,
}
31
32impl PartialEq for SignalTo {
33 fn eq(&self, other: &Self) -> bool {
34 use SignalTo::*;
35
36 match (self, other) {
37 (ReloadComponents(a), ReloadComponents(b)) => a == b,
38 (ReloadFromConfigBuilder(_), ReloadFromConfigBuilder(_)) => true,
40 (ReloadFromDisk, ReloadFromDisk) => true,
41 (ReloadEnrichmentTables, ReloadEnrichmentTables) => true,
42 (Shutdown(a), Shutdown(b)) => a == b,
43 (Quit, Quit) => true,
44 _ => false,
45 }
46 }
47}
48
49#[derive(Clone, Debug, Snafu, PartialEq, Eq)]
50pub enum ShutdownError {
51 #[snafu(display("The API failed to start: {error}"))]
54 ApiFailed { error: String },
55 #[snafu(display("Reload failed, and then failed to restore the previous config"))]
56 ReloadFailedToRestore,
57 #[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
58 SourceAborted { key: ComponentKey, error: String },
59 #[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
60 TransformAborted { key: ComponentKey, error: String },
61 #[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
62 SinkAborted { key: ComponentKey, error: String },
63}
64
/// A [`SignalHandler`] together with the receiver created alongside it.
pub struct SignalPair {
    /// Handler through which further signal streams can be registered.
    pub handler: SignalHandler,
    /// Receiver obtained when the handler's broadcast channel was created.
    pub receiver: SignalRx,
}

impl SignalPair {
    /// Creates the handler/receiver pair and wires the OS signal stream into it.
    ///
    /// The OS signal stream is forwarded by a task spawned on `runtime` via
    /// `SignalHandler::forever`, so it is not affected by
    /// [`SignalHandler::clear`].
    pub fn new(runtime: &Runtime) -> Self {
        let (handler, receiver) = SignalHandler::new();
        handler.forever(runtime, os_signals(runtime));

        Self { handler, receiver }
    }
}
80
/// Fans signal streams into a single broadcast channel of [`SignalTo`] values.
pub struct SignalHandler {
    /// Sender side of the signal broadcast channel.
    tx: SignalTx,
    /// One shutdown sender per stream registered through [`SignalHandler::add`];
    /// drained and fired by [`SignalHandler::clear`].
    shutdown_txs: Vec<ShutdownTx>,
}

impl SignalHandler {
    /// Creates a handler backed by a broadcast channel with capacity 128 and
    /// returns it together with the channel's initial receiver.
    fn new() -> (Self, SignalRx) {
        let (tx, rx) = broadcast::channel(128);
        let handler = Self {
            tx,
            shutdown_txs: vec![],
        };

        (handler, rx)
    }

    /// Returns a clone of the signal sender.
    pub fn clone_tx(&self) -> SignalTx {
        self.tx.clone()
    }

    /// Returns a new receiver subscribed to the signal channel.
    pub fn subscribe(&self) -> SignalRx {
        self.tx.subscribe()
    }

    /// Spawns a task on `runtime` that forwards every item of `stream`,
    /// converted into a [`SignalTo`], to the signal channel.
    ///
    /// The task ends when the stream is exhausted or when sending fails. No
    /// shutdown channel is registered for it, so [`SignalHandler::clear`] does
    /// not stop it.
    fn forever<T, S>(&self, runtime: &Runtime, stream: S)
    where
        T: Into<SignalTo> + Send + Sync,
        S: Stream<Item = T> + 'static + Send,
    {
        let tx = self.tx.clone();

        runtime.spawn(async move {
            tokio::pin!(stream);

            while let Some(value) = stream.next().await {
                if tx.send(value.into()).is_err() {
                    error!(message = "Couldn't send signal.");
                    break;
                }
            }
        });
    }

    /// Spawns a task (via `tokio::spawn`) that forwards every item of
    /// `stream`, converted into a [`SignalTo`], to the signal channel.
    ///
    /// A dedicated shutdown channel is registered for the task so that
    /// [`SignalHandler::clear`] can stop it. The task also ends when sending
    /// fails or when the stream is exhausted; the latter is logged as an error.
    pub fn add<T, S>(&mut self, stream: S)
    where
        T: Into<SignalTo> + Send,
        S: Stream<Item = T> + 'static + Send,
    {
        let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(2);
        let tx = self.tx.clone();

        self.shutdown_txs.push(shutdown_tx);

        tokio::spawn(async move {
            tokio::pin!(stream);

            loop {
                tokio::select! {
                    // Poll in declaration order so a pending shutdown always
                    // wins over a ready stream item.
                    biased;

                    // Matches both a delivered shutdown message and a closed
                    // shutdown channel.
                    _ = shutdown_rx.recv() => break,
                    // Bind the `Option` instead of pattern-matching `Some(..)`
                    // in the branch: a refutable pattern would merely disable
                    // this branch on `None`, and because the shutdown branch is
                    // never disabled an `else` arm could not run, leaving the
                    // task parked without reporting the closed stream.
                    next = stream.next() => match next {
                        Some(value) => {
                            if tx.send(value.into()).is_err() {
                                error!(message = "Couldn't send signal.");
                                break;
                            }
                        }
                        None => {
                            error!(message = "Underlying stream is closed.");
                            break;
                        }
                    },
                }
            }
        });
    }

    /// Signals every task registered through [`SignalHandler::add`] to stop
    /// and forgets their shutdown senders.
    pub fn clear(&mut self) {
        for shutdown_tx in self.shutdown_txs.drain(..) {
            // Best effort: a send error is ignored on purpose.
            _ = shutdown_tx.send(());
        }
    }
}
176
/// Builds the stream of [`SignalTo`] values derived from Unix process signals.
///
/// Mapping: `SIGINT` and `SIGTERM` yield `Shutdown(None)`, `SIGQUIT` yields
/// `Quit`, and `SIGHUP` yields `ReloadFromDisk`. Each received signal is also
/// logged at info level.
///
/// # Panics
///
/// Panics if any of the four signal handlers cannot be set up.
#[cfg(unix)]
fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> + use<> {
    use tokio::signal::unix::{SignalKind, signal};

    // The listeners are created inside `block_on` so that registration runs
    // on the supplied runtime.
    runtime.block_on(async {
        let mut interrupt =
            signal(SignalKind::interrupt()).expect("Failed to set up SIGINT handler.");
        let mut terminate =
            signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler.");
        let mut quit = signal(SignalKind::quit()).expect("Failed to set up SIGQUIT handler.");
        let mut hangup = signal(SignalKind::hangup()).expect("Failed to set up SIGHUP handler.");

        async_stream::stream! {
            loop {
                // Wait for whichever signal arrives first and translate it.
                let translated = tokio::select! {
                    _ = interrupt.recv() => {
                        info!(message = "Signal received.", signal = "SIGINT");
                        SignalTo::Shutdown(None)
                    },
                    _ = terminate.recv() => {
                        info!(message = "Signal received.", signal = "SIGTERM");
                        SignalTo::Shutdown(None)
                    },
                    _ = quit.recv() => {
                        info!(message = "Signal received.", signal = "SIGQUIT");
                        SignalTo::Quit
                    },
                    _ = hangup.recv() => {
                        info!(message = "Signal received.", signal = "SIGHUP");
                        SignalTo::ReloadFromDisk
                    },
                };
                yield translated;
            }
        }
    })
}
215
/// Builds the stream of [`SignalTo`] values derived from Ctrl-C on Windows.
///
/// Every completion of `tokio::signal::ctrl_c()` yields `Shutdown(None)`; the
/// result of the wait is discarded, so an error is treated like a received
/// Ctrl-C.
///
/// The `use<>` bound mirrors the Unix variant: the runtime argument is unused,
/// so the returned stream must not capture its lifetime. Without it, Rust 2024
/// capture rules would tie the stream to the `&Runtime` borrow and it could
/// not satisfy the `'static` bound required by `SignalHandler::forever`.
#[cfg(windows)]
fn os_signals(_: &Runtime) -> impl Stream<Item = SignalTo> + use<> {
    use futures::future::FutureExt;

    async_stream::stream! {
        loop {
            let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
            yield signal;
        }
    }
}
227}