vector/
signal.rs

1#![allow(missing_docs)]
2
3use snafu::Snafu;
4use std::collections::HashSet;
5use tokio::{runtime::Runtime, sync::broadcast};
6use tokio_stream::{Stream, StreamExt};
7
8use super::config::{ComponentKey, ConfigBuilder};
9
10pub type ShutdownTx = broadcast::Sender<()>;
11pub type SignalTx = broadcast::Sender<SignalTo>;
12pub type SignalRx = broadcast::Receiver<SignalTo>;
13
14#[derive(Debug, Clone)]
15/// Control messages used by Vector to drive topology and shutdown events.
16#[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously
17pub enum SignalTo {
18    /// Signal to reload given components.
19    ReloadComponents(HashSet<ComponentKey>),
20    /// Signal to reload config from a string.
21    ReloadFromConfigBuilder(ConfigBuilder),
22    /// Signal to reload config from the filesystem.
23    ReloadFromDisk,
24    /// Signal to reload all enrichment tables.
25    ReloadEnrichmentTables,
26    /// Signal to shutdown process.
27    Shutdown(Option<ShutdownError>),
28    /// Shutdown process immediately.
29    Quit,
30}
31
32impl PartialEq for SignalTo {
33    fn eq(&self, other: &Self) -> bool {
34        use SignalTo::*;
35
36        match (self, other) {
37            (ReloadComponents(a), ReloadComponents(b)) => a == b,
38            // TODO: This will require a lot of plumbing but ultimately we can derive equality for config builders.
39            (ReloadFromConfigBuilder(_), ReloadFromConfigBuilder(_)) => true,
40            (ReloadFromDisk, ReloadFromDisk) => true,
41            (ReloadEnrichmentTables, ReloadEnrichmentTables) => true,
42            (Shutdown(a), Shutdown(b)) => a == b,
43            (Quit, Quit) => true,
44            _ => false,
45        }
46    }
47}
48
49#[derive(Clone, Debug, Snafu, PartialEq, Eq)]
50pub enum ShutdownError {
51    // For future work: It would be nice if we could keep the actual errors in here, but
52    // `crate::Error` doesn't implement `Clone`, and adding `DynClone` for errors is tricky.
53    #[snafu(display("The API failed to start: {error}"))]
54    ApiFailed { error: String },
55    #[snafu(display("Reload failed, and then failed to restore the previous config"))]
56    ReloadFailedToRestore,
57    #[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
58    SourceAborted { key: ComponentKey, error: String },
59    #[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
60    TransformAborted { key: ComponentKey, error: String },
61    #[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
62    SinkAborted { key: ComponentKey, error: String },
63}
64
65/// Convenience struct for app setup handling.
66pub struct SignalPair {
67    pub handler: SignalHandler,
68    pub receiver: SignalRx,
69}
70
71impl SignalPair {
72    /// Create a new signal handler pair, and set them up to receive OS signals.
73    pub fn new(runtime: &Runtime) -> Self {
74        let (handler, receiver) = SignalHandler::new();
75        let signals = os_signals(runtime);
76        handler.forever(runtime, signals);
77        Self { handler, receiver }
78    }
79}
80
81/// SignalHandler is a general `ControlTo` message receiver and transmitter. It's used by
82/// OS signals and providers to surface control events to the root of the application.
83pub struct SignalHandler {
84    tx: SignalTx,
85    shutdown_txs: Vec<ShutdownTx>,
86}
87
88impl SignalHandler {
89    /// Create a new signal handler with space for 128 control messages at a time, to
90    /// ensure the channel doesn't overflow and drop signals.
91    fn new() -> (Self, SignalRx) {
92        let (tx, rx) = broadcast::channel(128);
93        let handler = Self {
94            tx,
95            shutdown_txs: vec![],
96        };
97
98        (handler, rx)
99    }
100
101    /// Clones the transmitter.
102    pub fn clone_tx(&self) -> SignalTx {
103        self.tx.clone()
104    }
105
106    /// Subscribe to the stream, and return a new receiver.
107    pub fn subscribe(&self) -> SignalRx {
108        self.tx.subscribe()
109    }
110
111    /// Takes a stream who's elements are convertible to `SignalTo`, and spawns a permanent
112    /// task for transmitting to the receiver.
113    fn forever<T, S>(&self, runtime: &Runtime, stream: S)
114    where
115        T: Into<SignalTo> + Send + Sync,
116        S: Stream<Item = T> + 'static + Send,
117    {
118        let tx = self.tx.clone();
119
120        runtime.spawn(async move {
121            tokio::pin!(stream);
122
123            while let Some(value) = stream.next().await {
124                if tx.send(value.into()).is_err() {
125                    error!(message = "Couldn't send signal.");
126                    break;
127                }
128            }
129        });
130    }
131
132    /// Takes a stream, sending to the underlying signal receiver. Returns a broadcast tx
133    /// channel which can be used by the caller to either subscribe to cancellation, or trigger
134    /// it. Useful for providers that may need to do both.
135    pub fn add<T, S>(&mut self, stream: S)
136    where
137        T: Into<SignalTo> + Send,
138        S: Stream<Item = T> + 'static + Send,
139    {
140        let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(2);
141        let tx = self.tx.clone();
142
143        self.shutdown_txs.push(shutdown_tx);
144
145        tokio::spawn(async move {
146            tokio::pin!(stream);
147
148            loop {
149                tokio::select! {
150                    biased;
151
152                    _ = shutdown_rx.recv() => break,
153                    Some(value) = stream.next() => {
154                        if tx.send(value.into()).is_err() {
155                            error!(message = "Couldn't send signal.");
156                            break;
157                        }
158                    }
159                    else => {
160                        error!(message = "Underlying stream is closed.");
161                        break;
162                    }
163                }
164            }
165        });
166    }
167
168    /// Shutdown active signal handlers.
169    pub fn clear(&mut self) {
170        for shutdown_tx in self.shutdown_txs.drain(..) {
171            // An error just means the channel was already shut down; safe to ignore.
172            _ = shutdown_tx.send(());
173        }
174    }
175}
176
177/// Signals from OS/user.
178#[cfg(unix)]
179fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> + use<> {
180    use tokio::signal::unix::{signal, SignalKind};
181
182    // The `signal` function must be run within the context of a Tokio runtime.
183    runtime.block_on(async {
184        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set up SIGINT handler.");
185        let mut sigterm =
186            signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler.");
187        let mut sigquit = signal(SignalKind::quit()).expect("Failed to set up SIGQUIT handler.");
188        let mut sighup = signal(SignalKind::hangup()).expect("Failed to set up SIGHUP handler.");
189
190        async_stream::stream! {
191            loop {
192                let signal = tokio::select! {
193                    _ = sigint.recv() => {
194                        info!(message = "Signal received.", signal = "SIGINT");
195                        SignalTo::Shutdown(None)
196                    },
197                    _ = sigterm.recv() => {
198                        info!(message = "Signal received.", signal = "SIGTERM");
199                        SignalTo::Shutdown(None)
200                    } ,
201                    _ = sigquit.recv() => {
202                        info!(message = "Signal received.", signal = "SIGQUIT");
203                        SignalTo::Quit
204                    },
205                    _ = sighup.recv() => {
206                        info!(message = "Signal received.", signal = "SIGHUP");
207                        SignalTo::ReloadFromDisk
208                    },
209                };
210                yield signal;
211            }
212        }
213    })
214}
215
216/// Signals from OS/user.
217#[cfg(windows)]
218fn os_signals(_: &Runtime) -> impl Stream<Item = SignalTo> {
219    use futures::future::FutureExt;
220
221    async_stream::stream! {
222        loop {
223            let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
224            yield signal;
225        }
226    }
227}