vector/
signal.rs

1#![allow(missing_docs)]
2
3use std::collections::HashSet;
4
5use snafu::Snafu;
6use tokio::{runtime::Runtime, sync::broadcast};
7use tokio_stream::{Stream, StreamExt};
8
9use super::config::{ComponentKey, ConfigBuilder};
10
11pub type ShutdownTx = broadcast::Sender<()>;
12pub type SignalTx = broadcast::Sender<SignalTo>;
13pub type SignalRx = broadcast::Receiver<SignalTo>;
14
15#[derive(Debug, Clone)]
16/// Control messages used by Vector to drive topology and shutdown events.
17#[allow(clippy::large_enum_variant)] // discovered during Rust upgrade to 1.57; just allowing for now since we did previously
18pub enum SignalTo {
19    /// Signal to reload given components.
20    ReloadComponents(HashSet<ComponentKey>),
21    /// Signal to reload config from a string.
22    ReloadFromConfigBuilder(ConfigBuilder),
23    /// Signal to reload config from the filesystem and reload components with external files.
24    ReloadFromDisk,
25    /// Signal to reload all enrichment tables.
26    ReloadEnrichmentTables,
27    /// Signal to shutdown process.
28    Shutdown(Option<ShutdownError>),
29    /// Shutdown process immediately.
30    Quit,
31}
32
33impl PartialEq for SignalTo {
34    fn eq(&self, other: &Self) -> bool {
35        use SignalTo::*;
36
37        match (self, other) {
38            (ReloadComponents(a), ReloadComponents(b)) => a == b,
39            // TODO: This will require a lot of plumbing but ultimately we can derive equality for config builders.
40            (ReloadFromConfigBuilder(_), ReloadFromConfigBuilder(_)) => true,
41            (ReloadFromDisk, ReloadFromDisk) => true,
42            (ReloadEnrichmentTables, ReloadEnrichmentTables) => true,
43            (Shutdown(a), Shutdown(b)) => a == b,
44            (Quit, Quit) => true,
45            _ => false,
46        }
47    }
48}
49
50#[derive(Clone, Debug, Snafu, PartialEq, Eq)]
51pub enum ShutdownError {
52    // For future work: It would be nice if we could keep the actual errors in here, but
53    // `crate::Error` doesn't implement `Clone`, and adding `DynClone` for errors is tricky.
54    #[snafu(display("The API failed to start: {error}"))]
55    ApiFailed { error: String },
56    #[snafu(display("Reload failed, and then failed to restore the previous config"))]
57    ReloadFailedToRestore,
58    #[snafu(display(r#"The task for source "{key}" died during execution: {error}"#))]
59    SourceAborted { key: ComponentKey, error: String },
60    #[snafu(display(r#"The task for transform "{key}" died during execution: {error}"#))]
61    TransformAborted { key: ComponentKey, error: String },
62    #[snafu(display(r#"The task for sink "{key}" died during execution: {error}"#))]
63    SinkAborted { key: ComponentKey, error: String },
64}
65
66/// Convenience struct for app setup handling.
67pub struct SignalPair {
68    pub handler: SignalHandler,
69    pub receiver: SignalRx,
70}
71
72impl SignalPair {
73    /// Create a new signal handler pair, and set them up to receive OS signals.
74    pub fn new(runtime: &Runtime) -> Self {
75        let (handler, receiver) = SignalHandler::new();
76
77        #[cfg(unix)]
78        let signals = os_signals(runtime);
79
80        // If we passed `runtime` here, we would get the following:
81        // error[E0521]: borrowed data escapes outside of associated function
82        #[cfg(windows)]
83        let signals = os_signals();
84
85        handler.forever(runtime, signals);
86        Self { handler, receiver }
87    }
88}
89
90/// SignalHandler is a general `ControlTo` message receiver and transmitter. It's used by
91/// OS signals and providers to surface control events to the root of the application.
92pub struct SignalHandler {
93    tx: SignalTx,
94    shutdown_txs: Vec<ShutdownTx>,
95}
96
97impl SignalHandler {
98    /// Create a new signal handler with space for 128 control messages at a time, to
99    /// ensure the channel doesn't overflow and drop signals.
100    pub fn new() -> (Self, SignalRx) {
101        let (tx, rx) = broadcast::channel(128);
102        let handler = Self {
103            tx,
104            shutdown_txs: vec![],
105        };
106
107        (handler, rx)
108    }
109
110    /// Clones the transmitter.
111    pub fn clone_tx(&self) -> SignalTx {
112        self.tx.clone()
113    }
114
115    /// Subscribe to the stream, and return a new receiver.
116    pub fn subscribe(&self) -> SignalRx {
117        self.tx.subscribe()
118    }
119
120    /// Takes a stream who's elements are convertible to `SignalTo`, and spawns a permanent
121    /// task for transmitting to the receiver.
122    fn forever<T, S>(&self, runtime: &Runtime, stream: S)
123    where
124        T: Into<SignalTo> + Send + Sync,
125        S: Stream<Item = T> + 'static + Send,
126    {
127        let tx = self.tx.clone();
128
129        runtime.spawn(async move {
130            tokio::pin!(stream);
131
132            while let Some(value) = stream.next().await {
133                if tx.send(value.into()).is_err() {
134                    error!(
135                        message = "Couldn't send signal.",
136                        internal_log_rate_limit = false
137                    );
138                    break;
139                }
140            }
141        });
142    }
143
144    /// Takes a stream, sending to the underlying signal receiver. Returns a broadcast tx
145    /// channel which can be used by the caller to either subscribe to cancellation, or trigger
146    /// it. Useful for providers that may need to do both.
147    pub fn add<T, S>(&mut self, stream: S)
148    where
149        T: Into<SignalTo> + Send,
150        S: Stream<Item = T> + 'static + Send,
151    {
152        let (shutdown_tx, mut shutdown_rx) = broadcast::channel::<()>(2);
153        let tx = self.tx.clone();
154
155        self.shutdown_txs.push(shutdown_tx);
156
157        tokio::spawn(async move {
158            tokio::pin!(stream);
159
160            loop {
161                tokio::select! {
162                    biased;
163
164                    _ = shutdown_rx.recv() => break,
165                    Some(value) = stream.next() => {
166                        if tx.send(value.into()).is_err() {
167                            error!(message = "Couldn't send signal.", internal_log_rate_limit = false);
168                            break;
169                        }
170                    }
171                    else => {
172                        error!(message = "Underlying stream is closed.", internal_log_rate_limit = false);
173                        break;
174                    }
175                }
176            }
177        });
178    }
179
180    /// Shutdown active signal handlers.
181    pub fn clear(&mut self) {
182        for shutdown_tx in self.shutdown_txs.drain(..) {
183            // An error just means the channel was already shut down; safe to ignore.
184            _ = shutdown_tx.send(());
185        }
186    }
187}
188
189/// Signals from OS/user.
190#[cfg(unix)]
191fn os_signals(runtime: &Runtime) -> impl Stream<Item = SignalTo> + use<> {
192    use tokio::signal::unix::{SignalKind, signal};
193
194    // The `signal` function must be run within the context of a Tokio runtime.
195    runtime.block_on(async {
196        let mut sigint = signal(SignalKind::interrupt()).expect("Failed to set up SIGINT handler.");
197        let mut sigterm =
198            signal(SignalKind::terminate()).expect("Failed to set up SIGTERM handler.");
199        let mut sigquit = signal(SignalKind::quit()).expect("Failed to set up SIGQUIT handler.");
200        let mut sighup = signal(SignalKind::hangup()).expect("Failed to set up SIGHUP handler.");
201
202        async_stream::stream! {
203            loop {
204                let signal = tokio::select! {
205                    _ = sigint.recv() => {
206                        info!(message = "Signal received.", signal = "SIGINT");
207                        SignalTo::Shutdown(None)
208                    },
209                    _ = sigterm.recv() => {
210                        info!(message = "Signal received.", signal = "SIGTERM");
211                        SignalTo::Shutdown(None)
212                    } ,
213                    _ = sigquit.recv() => {
214                        info!(message = "Signal received.", signal = "SIGQUIT");
215                        SignalTo::Quit
216                    },
217                    _ = sighup.recv() => {
218                        info!(message = "Signal received.", signal = "SIGHUP");
219                        SignalTo::ReloadFromDisk
220                    },
221                };
222                yield signal;
223            }
224        }
225    })
226}
227
228/// Signals from OS/user.
229#[cfg(windows)]
230fn os_signals() -> impl Stream<Item = SignalTo> {
231    use futures::future::FutureExt;
232
233    async_stream::stream! {
234        loop {
235            let signal = tokio::signal::ctrl_c().map(|_| SignalTo::Shutdown(None)).await;
236            yield signal;
237        }
238    }
239}