1#![allow(missing_docs)]
2
3use std::collections::HashSet;
4
5use snafu::Snafu;
6use tokio::{runtime::Runtime, sync::broadcast};
7use tokio_stream::{Stream, StreamExt};
8
9use super::config::{ComponentKey, ConfigBuilder};
10
11pub type ShutdownTx = broadcast::Sender<()>;
12pub type SignalTx = broadcast::Sender<SignalTo>;
13pub type SignalRx = broadcast::Receiver<SignalTo>;
14
15#[derive(Debug, Clone)]
16#[allow(clippy::large_enum_variant)] pub enum SignalTo {
19 ReloadComponents(HashSet<ComponentKey>),
21 ReloadFromConfigBuilder(ConfigBuilder),
23 ReloadFromDisk,
25 ReloadEnrichmentTables,
27 Shutdown(Option<ShutdownError>),
29 Quit,
31}
32
33impl PartialEq for SignalTo {
34 fn eq(&self, other: &Self) -> bool {
35 use SignalTo::*;
36
37 match (self, other) {
38 (ReloadComponents(a), ReloadComponents(b)) => a == b,
39 (ReloadFromConfigBuilder(_), ReloadFromConfigBuilder(_)) => true,
41 (ReloadFromDisk, ReloadFromDisk) => true,
42 (ReloadEnrichmentTables, ReloadEnrichmentTables) => true,
43 (Shutdown(a), Shutdown(b)) => a == b,
44 (Quit, Quit) => true,
45 _ => false,
46 }
47 }
48}
49
50#[derive(Clone, Debug, Snafu, PartialEq, Eq)]
51pub enum ShutdownError {
52 #[snafu(display("The API failed to start: {error}"))]
55 ApiFailed { error: String },
56 #[snafu(display("Reload failed, and then failed to restore the previous config"))]
57 ReloadFailedToRestore,
58 #[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
59 SourceAborted { key: ComponentKey, error: String },
60 #[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
61 TransformAborted { key: ComponentKey, error: String },
62 #[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
63 SinkAborted { key: ComponentKey, error: String },
64}
65
66pub struct SignalPair {
68 pub handler: SignalHandler,
69 pub receiver: SignalRx,
70}
71
72impl SignalPair {
73 pub fn new(runtime: &Runtime) -> Self {
75 let (handler, receiver) = SignalHandler::new();
76
77 #[cfg(unix)]
78 let signals = os_signals(runtime);
79
80 #[cfg(windows)]
83 let signals = os_signals();
84
85 handler.forever(runtime, signals);
86 Self { handler, receiver }
87 }
88}
89
90pub struct SignalHandler {
93 tx: SignalTx,
94 shutdown_txs: Vec<ShutdownTx>,
95}
96
97impl SignalHandler {
98 fn new() -> (Self, SignalRx) {
101 let (tx, rx) = broadcast::channel(128);
102 let handler = Self {
103 tx,
104 shutdown_txs: vec![],
105 };
106
107 (handler, rx)
108 }
109
110 pub fn clone_tx(&self) -> SignalTx {
112 self.tx.clone()
113 }
114
115 pub fn subscribe(&self) -> SignalRx {
117 self.tx.subscribe()
118 }
119
120 fn forever<T, S>(&self, runtime: &Runtime, stream: S)
123 where
124 T: Into<SignalTo> + Send + Sync,
125 S: Stream<Item = T> + 'static + Send,
126 {
127 let tx = self.tx.clone();
128
129 runtime.spawn(async move {
130 tokio::pin!(stream);
131
132 while let Some(value) = stream.next().await {
133 if tx.send(value.into()).is_err() {
134 error!(message = "Couldn't send signal.");
135 break;
136 }
137 }
138 });
139 }
140
141 pub fn add<T, S>(&mut self, stream: S)
145 where
146 T: Into<SignalTo> + Send,
147 S: Stream<Item = T> + 'static + Send,
148 {
149 let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(2);
150 let tx = self.tx.clone();
151
152 self.shutdown_txs.push(shutdown_tx);
153
154 tokio::spawn(async move {
155 tokio::pin!(stream);
156
157 loop {
158 tokio::select! {
159 biased;
160
161 _ = shutdown_rx.recv() => break,
162 Some(value) = stream.next() => {
163 if tx.send(value.into()).is_err() {
164 error!(message = "Couldn't send signal.");
165 break;
166 }
167 }
168 else => {
169 error!(message = "Underlying stream is closed.");
170 break;
171 }
172 }
173 }
174 });
175 }
176
177 pub fn clear(&mut self) {
179 for shutdown_tx in self.shutdown_txs.drain(..) {
180 _ = shutdown_tx.send(());
182 }
183 }
184}
185
186#[cfg(unix)]
188fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> + use<> {
189 use tokio::signal::unix::{SignalKind, signal};
190
191 runtime.block_on(async {
193 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set up SIGINT handler.");
194 let mut sigterm =
195 signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler.");
196 let mut sigquit = signal(SignalKind::quit()).expect("Failed to set up SIGQUIT handler.");
197 let mut sighup = signal(SignalKind::hangup()).expect("Failed to set up SIGHUP handler.");
198
199 async_stream::stream! {
200 loop {
201 let signal = tokio::select! {
202 _ = sigint.recv() => {
203 info!(message = "Signal received.", signal = "SIGINT");
204 SignalTo::Shutdown(None)
205 },
206 _ = sigterm.recv() => {
207 info!(message = "Signal received.", signal = "SIGTERM");
208 SignalTo::Shutdown(None)
209 } ,
210 _ = sigquit.recv() => {
211 info!(message = "Signal received.", signal = "SIGQUIT");
212 SignalTo::Quit
213 },
214 _ = sighup.recv() => {
215 info!(message = "Signal received.", signal = "SIGHUP");
216 SignalTo::ReloadFromDisk
217 },
218 };
219 yield signal;
220 }
221 }
222 })
223}
224
225#[cfg(windows)]
227fn os_signals() -> impl Stream<Item = SignalTo> {
228 use futures::future::FutureExt;
229
230 async_stream::stream! {
231 loop {
232 let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
233 yield signal;
234 }
235 }
236}