vector/
signal.rs

1#![allow(missing_docs)]
2
3use std::collections::HashSet;
4
5use snafu::Snafu;
6use tokio::{runtime::Runtime, sync::broadcast};
7use tokio_stream::{Stream, StreamExt};
8
9use super::config::{ComponentKey, ConfigBuilder};
10
11pub type ShutdownTx = broadcast::Sender<()>;
12pub type SignalTx = broadcast::Sender<SignalTo>;
13pub type SignalRx = broadcast::Receiver<SignalTo>;
14
15#[derive(Debug, Clone)]
16/// Control messages used by Vector to drive topology and shutdown events.
17#[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously
18pub enum SignalTo {
19    /// Signal to reload given components.
20    ReloadComponents(HashSet<ComponentKey>),
21    /// Signal to reload config from a string.
22    ReloadFromConfigBuilder(ConfigBuilder),
23    /// Signal to reload config from the filesystem.
24    ReloadFromDisk,
25    /// Signal to reload all enrichment tables.
26    ReloadEnrichmentTables,
27    /// Signal to shutdown process.
28    Shutdown(Option<ShutdownError>),
29    /// Shutdown process immediately.
30    Quit,
31}
32
33impl PartialEq for SignalTo {
34    fn eq(&self, other: &Self) -> bool {
35        use SignalTo::*;
36
37        match (self, other) {
38            (ReloadComponents(a), ReloadComponents(b)) => a == b,
39            // TODO: This will require a lot of plumbing but ultimately we can derive equality for config builders.
40            (ReloadFromConfigBuilder(_), ReloadFromConfigBuilder(_)) => true,
41            (ReloadFromDisk, ReloadFromDisk) => true,
42            (ReloadEnrichmentTables, ReloadEnrichmentTables) => true,
43            (Shutdown(a), Shutdown(b)) => a == b,
44            (Quit, Quit) => true,
45            _ => false,
46        }
47    }
48}
49
50#[derive(Clone, Debug, Snafu, PartialEq, Eq)]
51pub enum ShutdownError {
52    // For future work: It would be nice if we could keep the actual errors in here, but
53    // `crate::Error` doesn't implement `Clone`, and adding `DynClone` for errors is tricky.
54    #[snafu(display("The API failed to start: {error}"))]
55    ApiFailed { error: String },
56    #[snafu(display("Reload failed, and then failed to restore the previous config"))]
57    ReloadFailedToRestore,
58    #[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
59    SourceAborted { key: ComponentKey, error: String },
60    #[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
61    TransformAborted { key: ComponentKey, error: String },
62    #[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
63    SinkAborted { key: ComponentKey, error: String },
64}
65
66/// Convenience struct for app setup handling.
67pub struct SignalPair {
68    pub handler: SignalHandler,
69    pub receiver: SignalRx,
70}
71
72impl SignalPair {
73    /// Create a new signal handler pair, and set them up to receive OS signals.
74    pub fn new(runtime: &Runtime) -> Self {
75        let (handler, receiver) = SignalHandler::new();
76
77        #[cfg(unix)]
78        let signals = os_signals(runtime);
79
80        // If we passed `runtime` here, we would get the following:
81        // error[E0521]: borrowed data escapes outside of associated function
82        #[cfg(windows)]
83        let signals = os_signals();
84
85        handler.forever(runtime, signals);
86        Self { handler, receiver }
87    }
88}
89
90/// SignalHandler is a general `ControlTo` message receiver and transmitter. It's used by
91/// OS signals and providers to surface control events to the root of the application.
92pub struct SignalHandler {
93    tx: SignalTx,
94    shutdown_txs: Vec<ShutdownTx>,
95}
96
97impl SignalHandler {
98    /// Create a new signal handler with space for 128 control messages at a time, to
99    /// ensure the channel doesn't overflow and drop signals.
100    fn new() -> (Self, SignalRx) {
101        let (tx, rx) = broadcast::channel(128);
102        let handler = Self {
103            tx,
104            shutdown_txs: vec![],
105        };
106
107        (handler, rx)
108    }
109
110    /// Clones the transmitter.
111    pub fn clone_tx(&self) -> SignalTx {
112        self.tx.clone()
113    }
114
115    /// Subscribe to the stream, and return a new receiver.
116    pub fn subscribe(&self) -> SignalRx {
117        self.tx.subscribe()
118    }
119
120    /// Takes a stream who's elements are convertible to `SignalTo`, and spawns a permanent
121    /// task for transmitting to the receiver.
122    fn forever<T, S>(&self, runtime: &Runtime, stream: S)
123    where
124        T: Into<SignalTo> + Send + Sync,
125        S: Stream<Item = T> + 'static + Send,
126    {
127        let tx = self.tx.clone();
128
129        runtime.spawn(async move {
130            tokio::pin!(stream);
131
132            while let Some(value) = stream.next().await {
133                if tx.send(value.into()).is_err() {
134                    error!(message = "Couldn't send signal.");
135                    break;
136                }
137            }
138        });
139    }
140
141    /// Takes a stream, sending to the underlying signal receiver. Returns a broadcast tx
142    /// channel which can be used by the caller to either subscribe to cancellation, or trigger
143    /// it. Useful for providers that may need to do both.
144    pub fn add<T, S>(&mut self, stream: S)
145    where
146        T: Into<SignalTo> + Send,
147        S: Stream<Item = T> + 'static + Send,
148    {
149        let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(2);
150        let tx = self.tx.clone();
151
152        self.shutdown_txs.push(shutdown_tx);
153
154        tokio::spawn(async move {
155            tokio::pin!(stream);
156
157            loop {
158                tokio::select! {
159                    biased;
160
161                    _ = shutdown_rx.recv() => break,
162                    Some(value) = stream.next() => {
163                        if tx.send(value.into()).is_err() {
164                            error!(message = "Couldn't send signal.");
165                            break;
166                        }
167                    }
168                    else => {
169                        error!(message = "Underlying stream is closed.");
170                        break;
171                    }
172                }
173            }
174        });
175    }
176
177    /// Shutdown active signal handlers.
178    pub fn clear(&mut self) {
179        for shutdown_tx in self.shutdown_txs.drain(..) {
180            // An error just means the channel was already shut down; safe to ignore.
181            _ = shutdown_tx.send(());
182        }
183    }
184}
185
186/// Signals from OS/user.
187#[cfg(unix)]
188fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> + use<> {
189    use tokio::signal::unix::{SignalKind, signal};
190
191    // The `signal` function must be run within the context of a Tokio runtime.
192    runtime.block_on(async {
193        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set up SIGINT handler.");
194        let mut sigterm =
195            signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler.");
196        let mut sigquit = signal(SignalKind::quit()).expect("Failed to set up SIGQUIT handler.");
197        let mut sighup = signal(SignalKind::hangup()).expect("Failed to set up SIGHUP handler.");
198
199        async_stream::stream! {
200            loop {
201                let signal = tokio::select! {
202                    _ = sigint.recv() => {
203                        info!(message = "Signal received.", signal = "SIGINT");
204                        SignalTo::Shutdown(None)
205                    },
206                    _ = sigterm.recv() => {
207                        info!(message = "Signal received.", signal = "SIGTERM");
208                        SignalTo::Shutdown(None)
209                    } ,
210                    _ = sigquit.recv() => {
211                        info!(message = "Signal received.", signal = "SIGQUIT");
212                        SignalTo::Quit
213                    },
214                    _ = sighup.recv() => {
215                        info!(message = "Signal received.", signal = "SIGHUP");
216                        SignalTo::ReloadFromDisk
217                    },
218                };
219                yield signal;
220            }
221        }
222    })
223}
224
/// Signals from OS/user.
///
/// Produces an endless stream that yields `SignalTo::Shutdown(None)` every time a wait on
/// `tokio::signal::ctrl_c` completes.
#[cfg(windows)]
fn os_signals() -> impl Stream<Item = SignalTo> {
    async_stream::stream! {
        loop {
            // The outcome of the wait is deliberately discarded: a shutdown is requested
            // whether `ctrl_c` resolved successfully or with an error.
            let _ = tokio::signal::ctrl_c().await;
            yield SignalTo::Shutdown(None);
        }
    }
}
236}