1#![allow(missing_docs)]
2
3use std::collections::HashSet;
4
5use snafu::Snafu;
6use tokio::{runtime::Runtime, sync::broadcast};
7use tokio_stream::{Stream, StreamExt};
8
9use super::config::{ComponentKey, ConfigBuilder};
10
11pub type ShutdownTx = broadcast::Sender<()>;
12pub type SignalTx = broadcast::Sender<SignalTo>;
13pub type SignalRx = broadcast::Receiver<SignalTo>;
14
15#[derive(Debug, Clone)]
16#[allow(clippy::large_enum_variant)] pub enum SignalTo {
19 ReloadComponents(HashSet<ComponentKey>),
21 ReloadFromConfigBuilder(ConfigBuilder),
23 ReloadFromDisk,
25 ReloadEnrichmentTables,
27 Shutdown(Option<ShutdownError>),
29 Quit,
31}
32
33impl PartialEq for SignalTo {
34 fn eq(&self, other: &Self) -> bool {
35 use SignalTo::*;
36
37 match (self, other) {
38 (ReloadComponents(a), ReloadComponents(b)) => a == b,
39 (ReloadFromConfigBuilder(_), ReloadFromConfigBuilder(_)) => true,
41 (ReloadFromDisk, ReloadFromDisk) => true,
42 (ReloadEnrichmentTables, ReloadEnrichmentTables) => true,
43 (Shutdown(a), Shutdown(b)) => a == b,
44 (Quit, Quit) => true,
45 _ => false,
46 }
47 }
48}
49
50#[derive(Clone, Debug, Snafu, PartialEq, Eq)]
51pub enum ShutdownError {
52 #[snafu(display("The API failed to start: {error}"))]
55 ApiFailed { error: String },
56 #[snafu(display("Reload failed, and then failed to restore the previous config"))]
57 ReloadFailedToRestore,
58 #[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
59 SourceAborted { key: ComponentKey, error: String },
60 #[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
61 TransformAborted { key: ComponentKey, error: String },
62 #[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
63 SinkAborted { key: ComponentKey, error: String },
64}
65
66pub struct SignalPair {
68 pub handler: SignalHandler,
69 pub receiver: SignalRx,
70}
71
72impl SignalPair {
73 pub fn new(runtime: &Runtime) -> Self {
75 let (handler, receiver) = SignalHandler::new();
76
77 #[cfg(unix)]
78 let signals = os_signals(runtime);
79
80 #[cfg(windows)]
83 let signals = os_signals();
84
85 handler.forever(runtime, signals);
86 Self { handler, receiver }
87 }
88}
89
90pub struct SignalHandler {
93 tx: SignalTx,
94 shutdown_txs: Vec<ShutdownTx>,
95}
96
97impl SignalHandler {
98 pub fn new() -> (Self, SignalRx) {
101 let (tx, rx) = broadcast::channel(128);
102 let handler = Self {
103 tx,
104 shutdown_txs: vec![],
105 };
106
107 (handler, rx)
108 }
109
110 pub fn clone_tx(&self) -> SignalTx {
112 self.tx.clone()
113 }
114
115 pub fn subscribe(&self) -> SignalRx {
117 self.tx.subscribe()
118 }
119
120 fn forever<T, S>(&self, runtime: &Runtime, stream: S)
123 where
124 T: Into<SignalTo> + Send + Sync,
125 S: Stream<Item = T> + 'static + Send,
126 {
127 let tx = self.tx.clone();
128
129 runtime.spawn(async move {
130 tokio::pin!(stream);
131
132 while let Some(value) = stream.next().await {
133 if tx.send(value.into()).is_err() {
134 error!(
135 message = "Couldn't send signal.",
136 internal_log_rate_limit = false
137 );
138 break;
139 }
140 }
141 });
142 }
143
144 pub fn add<T, S>(&mut self, stream: S)
148 where
149 T: Into<SignalTo> + Send,
150 S: Stream<Item = T> + 'static + Send,
151 {
152 let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(2);
153 let tx = self.tx.clone();
154
155 self.shutdown_txs.push(shutdown_tx);
156
157 tokio::spawn(async move {
158 tokio::pin!(stream);
159
160 loop {
161 tokio::select! {
162 biased;
163
164 _ = shutdown_rx.recv() => break,
165 Some(value) = stream.next() => {
166 if tx.send(value.into()).is_err() {
167 error!(message = "Couldn't send signal.", internal_log_rate_limit = false);
168 break;
169 }
170 }
171 else => {
172 error!(message = "Underlying stream is closed.", internal_log_rate_limit = false);
173 break;
174 }
175 }
176 }
177 });
178 }
179
180 pub fn clear(&mut self) {
182 for shutdown_tx in self.shutdown_txs.drain(..) {
183 _ = shutdown_tx.send(());
185 }
186 }
187}
188
189#[cfg(unix)]
191fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> + use<> {
192 use tokio::signal::unix::{SignalKind, signal};
193
194 runtime.block_on(async {
196 let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set up SIGINT handler.");
197 let mut sigterm =
198 signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler.");
199 let mut sigquit = signal(SignalKind::quit()).expect("Failed to set up SIGQUIT handler.");
200 let mut sighup = signal(SignalKind::hangup()).expect("Failed to set up SIGHUP handler.");
201
202 async_stream::stream! {
203 loop {
204 let signal = tokio::select! {
205 _ = sigint.recv() => {
206 info!(message = "Signal received.", signal = "SIGINT");
207 SignalTo::Shutdown(None)
208 },
209 _ = sigterm.recv() => {
210 info!(message = "Signal received.", signal = "SIGTERM");
211 SignalTo::Shutdown(None)
212 } ,
213 _ = sigquit.recv() => {
214 info!(message = "Signal received.", signal = "SIGQUIT");
215 SignalTo::Quit
216 },
217 _ = sighup.recv() => {
218 info!(message = "Signal received.", signal = "SIGHUP");
219 SignalTo::ReloadFromDisk
220 },
221 };
222 yield signal;
223 }
224 }
225 })
226}
227
228#[cfg(windows)]
230fn os_signals() -> impl Stream<Item = SignalTo> {
231 use futures::future::FutureExt;
232
233 async_stream::stream! {
234 loop {
235 let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
236 yield signal;
237 }
238 }
239}