vector/topology/controller.rs

use std::sync::Arc;

#[cfg(feature = "api")]
use crate::api;
use crate::extra_context::ExtraContext;
use crate::internal_events::{VectorRecoveryError, VectorReloadError, VectorReloaded};
use futures_util::FutureExt as _;
use tokio::sync::{Mutex, MutexGuard};

use crate::{config, signal::ShutdownError, topology::RunningTopology};

#[derive(Clone, Debug)]
pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);

impl SharedTopologyController {
    pub fn new(inner: TopologyController) -> Self {
        Self(Arc::new(Mutex::new(inner)))
    }

    pub fn blocking_lock(&self) -> MutexGuard<TopologyController> {
        self.0.blocking_lock()
    }

    pub async fn lock(&self) -> MutexGuard<TopologyController> {
        self.0.lock().await
    }

    pub fn try_into_inner(self) -> Result<Mutex<TopologyController>, Self> {
        Arc::try_unwrap(self.0).map_err(Self)
    }
}
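
// A minimal usage sketch, assuming a fully constructed `TopologyController`
// value named `controller` and a freshly loaded `new_config`; the task and
// variable names here are illustrative:
//
//     let shared = SharedTopologyController::new(controller);
//     let for_reloads = shared.clone();
//     tokio::spawn(async move {
//         // e.g. a SIGHUP handler applying `new_config`
//         let mut tc = for_reloads.lock().await;
//         let _outcome = tc.reload(new_config).await;
//     });
//     // Synchronous code (e.g. on a dedicated thread) can use `shared.blocking_lock()`.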

pub struct TopologyController {
    pub topology: RunningTopology,
    pub config_paths: Vec<config::ConfigPath>,
    pub require_healthy: Option<bool>,
    #[cfg(feature = "api")]
    pub api_server: Option<api::Server>,
    pub extra_context: ExtraContext,
}

impl std::fmt::Debug for TopologyController {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TopologyController")
            .field("config_paths", &self.config_paths)
            .field("require_healthy", &self.require_healthy)
            .finish()
    }
}

#[derive(Clone, Debug)]
pub enum ReloadOutcome {
    MissingApiKey,
    Success,
    RolledBack,
    FatalError(ShutdownError),
}
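
// A sketch of caller-side handling of `ReloadOutcome`, assuming a mutable
// `controller: TopologyController`, a freshly loaded `new_config`, and a
// surrounding function that returns `Result<(), ShutdownError>`; the log
// messages are illustrative:
//
//     match controller.reload(new_config).await {
//         ReloadOutcome::Success => info!("Configuration reloaded."),
//         ReloadOutcome::RolledBack => warn!("Reload failed; keeping the previous configuration."),
//         ReloadOutcome::MissingApiKey => warn!("Reload rejected: required API key is missing."),
//         ReloadOutcome::FatalError(error) => return Err(error),
//     }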

impl TopologyController {
    pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
        new_config
            .healthchecks
            .set_require_healthy(self.require_healthy);

        // Start the api server or disable it, if necessary
        #[cfg(feature = "api")]
        if !new_config.api.enabled {
            if let Some(server) = self.api_server.take() {
                debug!("Dropping api server.");
                drop(server)
            }
        } else if self.api_server.is_none() {
            use crate::internal_events::ApiStarted;
            use std::sync::atomic::AtomicBool;
            use tokio::runtime::Handle;

            debug!("Starting api server.");

            self.api_server = match api::Server::start(
                self.topology.config(),
                self.topology.watch(),
                Arc::<AtomicBool>::clone(&self.topology.running),
                &Handle::current(),
            ) {
                Ok(api_server) => {
                    emit!(ApiStarted {
                        addr: new_config.api.address.unwrap(),
                        playground: new_config.api.playground,
                        graphql: new_config.api.graphql,
                    });

                    Some(api_server)
                }
                Err(error) => {
                    let error = error.to_string();
                    error!("An error occurred that Vector couldn't handle: {}.", error);
                    return ReloadOutcome::FatalError(ShutdownError::ApiFailed { error });
                }
            }
        }

        match self
            .topology
            .reload_config_and_respawn(new_config, self.extra_context.clone())
            .await
        {
            Ok(true) => {
                #[cfg(feature = "api")]
                // Pass the new config to the API server.
                if let Some(ref api_server) = self.api_server {
                    api_server.update_config(self.topology.config());
                }

                emit!(VectorReloaded {
                    config_paths: &self.config_paths
                });
                ReloadOutcome::Success
            }
            Ok(false) => {
                emit!(VectorReloadError);
                ReloadOutcome::RolledBack
            }
            // Trigger graceful shutdown for what remains of the topology
            Err(()) => {
                emit!(VectorReloadError);
                emit!(VectorRecoveryError);
                ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
            }
        }
    }

    pub async fn stop(self) {
        self.topology.stop().await;
    }

    // The `sources_finished` method on `RunningTopology` only considers sources that are currently
    // running at the time the method is called. This presents a problem when the set of running
    // sources can change while we are waiting on the resulting future to resolve.
    //
    // This function resolves that issue by waiting in two stages. The first is the usual asynchronous
    // wait for the future to complete. When it does, we know that all of the sources that existed when
    // the future was built have finished, but we don't know if that's because they were replaced as
    // part of a reload (in which case we don't want to return yet). To differentiate, we acquire the
    // lock on the topology, create a new future, and check whether it resolves immediately or not. If
    // it does resolve, we know all sources are truly finished because we held the lock during the
    // check, preventing anyone else from adding new sources. If it does not resolve, that indicates
    // that new sources have been added since our original call and we should start the process over to
    // continue waiting.
    pub async fn sources_finished(mutex: SharedTopologyController) {
        loop {
            // Do an initial async wait while the topology is running, making sure not to hold the
            // mutex lock while we wait on sources to finish.
            let initial = {
                let tc = mutex.lock().await;
                tc.topology.sources_finished()
            };
            initial.await;

            // Once the initial signal is tripped, hold the lock on the topology while checking again.
            // This ensures that no other task is adding new sources.
            let top = mutex.lock().await;
            if top.topology.sources_finished().now_or_never().is_some() {
                return;
            } else {
                continue;
            }
        }
    }
}
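
// A sketch of how a supervising task might wait for all sources to finish while
// also listening for shutdown, assuming a `SharedTopologyController` named
// `shared` and an illustrative `shutdown_rx` future (e.g. a oneshot receiver):
//
//     tokio::select! {
//         _ = TopologyController::sources_finished(shared.clone()) => {
//             debug!("All sources have finished.");
//         }
//         _ = &mut shutdown_rx => {}
//     }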