vector/topology/controller.rs

use std::sync::Arc;

use futures_util::FutureExt as _;
use tokio::sync::{Mutex, MutexGuard};

#[cfg(feature = "api")]
use crate::api;
use crate::{
    config,
    extra_context::ExtraContext,
    internal_events::{VectorRecoveryError, VectorReloadError, VectorReloaded},
    signal::ShutdownError,
    topology::RunningTopology,
};

#[derive(Clone, Debug)]
pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);

impl SharedTopologyController {
    pub fn new(inner: TopologyController) -> Self {
        Self(Arc::new(Mutex::new(inner)))
    }

    pub fn blocking_lock(&self) -> MutexGuard<'_, TopologyController> {
        self.0.blocking_lock()
    }

    pub async fn lock(&self) -> MutexGuard<'_, TopologyController> {
        self.0.lock().await
    }

    pub fn try_into_inner(self) -> Result<Mutex<TopologyController>, Self> {
        Arc::try_unwrap(self.0).map_err(Self)
    }
}
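
// Usage sketch (illustrative only; `controller` and the spawned task below are hypothetical and
// not part of this module). A `SharedTopologyController` is a cheaply clonable handle, and every
// clone locks the same underlying `TopologyController`:
//
//     let shared = SharedTopologyController::new(controller);
//     let for_task = shared.clone();
//     tokio::spawn(async move {
//         let tc = for_task.lock().await;
//         debug!("{:?}", *tc);
//     });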

pub struct TopologyController {
    pub topology: RunningTopology,
    pub config_paths: Vec<config::ConfigPath>,
    pub require_healthy: Option<bool>,
    #[cfg(feature = "api")]
    pub api_server: Option<api::Server>,
    pub extra_context: ExtraContext,
}

impl std::fmt::Debug for TopologyController {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TopologyController")
            .field("config_paths", &self.config_paths)
            .field("require_healthy", &self.require_healthy)
            .finish()
    }
}

#[derive(Clone, Debug)]
pub enum ReloadOutcome {
    MissingApiKey,
    Success,
    RolledBack,
    FatalError(ShutdownError),
}
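
// Sketch of how a caller might drive a reload and react to each outcome. This is hypothetical
// supervisor code, not part of this module; `shared` is assumed to be a `SharedTopologyController`
// and `new_config` a freshly loaded `config::Config`:
//
//     let mut controller = shared.lock().await;
//     match controller.reload(new_config).await {
//         ReloadOutcome::Success => info!("Config reloaded."),
//         ReloadOutcome::RolledBack => warn!("Reload failed; previous config kept running."),
//         ReloadOutcome::MissingApiKey => warn!("Reload skipped due to a missing API key."),
//         ReloadOutcome::FatalError(error) => return Err(error),
//     }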

impl TopologyController {
    pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
        new_config
            .healthchecks
            .set_require_healthy(self.require_healthy);

        // Start the api server or disable it, if necessary
        #[cfg(feature = "api")]
        if !new_config.api.enabled {
            if let Some(server) = self.api_server.take() {
                debug!("Dropping api server.");
                drop(server)
            }
        } else if self.api_server.is_none() {
            use std::sync::atomic::AtomicBool;

            use tokio::runtime::Handle;

            use crate::internal_events::ApiStarted;

            debug!("Starting api server.");

            self.api_server = match api::Server::start(
                self.topology.config(),
                self.topology.watch(),
                Arc::<AtomicBool>::clone(&self.topology.running),
                &Handle::current(),
            ) {
                Ok(api_server) => {
                    emit!(ApiStarted {
                        addr: new_config.api.address.unwrap(),
                        playground: new_config.api.playground,
                        graphql: new_config.api.graphql,
                    });

                    Some(api_server)
                }
                Err(error) => {
                    let error = error.to_string();
                    error!("An error occurred that Vector couldn't handle: {}.", error);
                    return ReloadOutcome::FatalError(ShutdownError::ApiFailed { error });
                }
            }
        }

        match self
            .topology
            .reload_config_and_respawn(new_config, self.extra_context.clone())
            .await
        {
            Ok(true) => {
                #[cfg(feature = "api")]
                // Pass the new config to the API server.
                if let Some(ref api_server) = self.api_server {
                    api_server.update_config(self.topology.config());
                }

                emit!(VectorReloaded {
                    config_paths: &self.config_paths
                });
                ReloadOutcome::Success
            }
            Ok(false) => {
                emit!(VectorReloadError);
                ReloadOutcome::RolledBack
            }
            // Trigger graceful shutdown for what remains of the topology
            Err(()) => {
                emit!(VectorReloadError);
                emit!(VectorRecoveryError);
                ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
            }
        }
    }

    pub async fn stop(self) {
        self.topology.stop().await;
    }

    // The `sources_finished` method on `RunningTopology` only considers sources that are currently
    // running at the time the method is called. This presents a problem when the set of running
    // sources can change while we are waiting on the resulting future to resolve.
    //
    // This function resolves that issue by waiting in two stages. The first is the usual asynchronous
    // wait for the future to complete. When it does, we know that all of the sources that existed when
    // the future was built have finished, but we don't know if that's because they were replaced as
    // part of a reload (in which case we don't want to return yet). To differentiate, we acquire the
    // lock on the topology, create a new future, and check whether it resolves immediately or not. If
    // it does resolve, we know all sources are truly finished because we held the lock during the
    // check, preventing anyone else from adding new sources. If it does not resolve, that indicates
    // that new sources have been added since our original call and we should start the process over to
    // continue waiting.
    pub async fn sources_finished(mutex: SharedTopologyController) {
        loop {
            // Do an initial async wait while the topology is running, making sure not to hold the
            // mutex lock while we wait on sources to finish.
            let initial = {
                let tc = mutex.lock().await;
                tc.topology.sources_finished()
            };
            initial.await;

            // Once the initial signal is tripped, hold the lock on the topology while checking
            // again. This ensures that no other task is adding new sources.
            let top = mutex.lock().await;
            if top.topology.sources_finished().now_or_never().is_some() {
                return;
            } else {
                continue;
            }
        }
    }
}
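
// Usage sketch for `sources_finished` (illustrative; the shutdown task and `controller` are
// hypothetical). Because it takes the shared handle rather than `&self`, the potentially long
// wait happens without keeping the controller locked:
//
//     let shared = SharedTopologyController::new(controller);
//     let watcher = shared.clone();
//     tokio::spawn(async move {
//         TopologyController::sources_finished(watcher).await;
//         info!("All sources have finished.");
//     });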