vector/topology/controller.rs

use std::sync::Arc;

use futures_util::FutureExt as _;
use tokio::sync::{Mutex, MutexGuard};

#[cfg(feature = "api")]
use crate::api;
use crate::{
    config,
    extra_context::ExtraContext,
    internal_events::{VectorRecoveryError, VectorReloadError, VectorReloaded},
    signal::ShutdownError,
    topology::{ReloadError, RunningTopology},
};

#[derive(Clone, Debug)]
pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);

impl SharedTopologyController {
    pub fn new(inner: TopologyController) -> Self {
        Self(Arc::new(Mutex::new(inner)))
    }

    pub fn blocking_lock(&self) -> MutexGuard<'_, TopologyController> {
        self.0.blocking_lock()
    }

    pub async fn lock(&self) -> MutexGuard<'_, TopologyController> {
        self.0.lock().await
    }

    pub fn try_into_inner(self) -> Result<Mutex<TopologyController>, Self> {
        Arc::try_unwrap(self.0).map_err(Self)
    }
}

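// A minimal usage sketch (hypothetical caller, not part of this module).
// Clones share one underlying controller, so multiple tasks can coordinate
// access to the same `TopologyController` through the async mutex:
//
//     let shared = SharedTopologyController::new(controller);
//     let handle = shared.clone();
//     tokio::spawn(async move {
//         let tc = handle.lock().await;
//         // inspect or drive `tc.topology` here
//     });
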
pub struct TopologyController {
    pub topology: RunningTopology,
    pub config_paths: Vec<config::ConfigPath>,
    pub require_healthy: Option<bool>,
    #[cfg(feature = "api")]
    pub api_server: Option<api::Server>,
    pub extra_context: ExtraContext,
}

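// Manual `Debug` impl: only the config-related fields are surfaced; the
// running topology, API server handle, and extra context are omitted.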
impl std::fmt::Debug for TopologyController {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TopologyController")
            .field("config_paths", &self.config_paths)
            .field("require_healthy", &self.require_healthy)
            .finish()
    }
}

#[derive(Clone, Debug)]
pub enum ReloadOutcome {
    MissingApiKey,
    Success,
    RolledBack,
    FatalError(ShutdownError),
}

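// How the variants read in `reload` below: `Success` means the new config is
// live; `RolledBack` means the reload was rejected and the previous topology
// kept running; `FatalError` asks the caller to shut Vector down.
// `MissingApiKey` is not constructed in this file; judging by the name, it
// signals a reload skipped for lack of a required API key (an assumption,
// not confirmed by this excerpt).
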
impl TopologyController {
    pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
        new_config
            .healthchecks
            .set_require_healthy(self.require_healthy);

        // Start the API server or disable it, if necessary.
        #[cfg(feature = "api")]
        if !new_config.api.enabled {
            if let Some(server) = self.api_server.take() {
                debug!("Dropping api server.");
                drop(server);
            }
        } else if self.api_server.is_none() {
            use std::sync::atomic::AtomicBool;

            use tokio::runtime::Handle;

            use crate::internal_events::ApiStarted;

            debug!("Starting api server.");

            self.api_server = match api::Server::start(
                self.topology.config(),
                self.topology.watch(),
                Arc::<AtomicBool>::clone(&self.topology.running),
                &Handle::current(),
            ) {
                Ok(api_server) => {
                    emit!(ApiStarted {
                        addr: new_config.api.address.unwrap(),
                        playground: new_config.api.playground,
                        graphql: new_config.api.graphql,
                    });

                    Some(api_server)
                }
                Err(error) => {
                    let error = error.to_string();
                    error!("An error occurred that Vector couldn't handle: {}.", error);
                    return ReloadOutcome::FatalError(ShutdownError::ApiFailed { error });
                }
            }
        }

        match self
            .topology
            .reload_config_and_respawn(new_config, self.extra_context.clone())
            .await
        {
            Ok(()) => {
                #[cfg(feature = "api")]
                // Pass the new config to the API server.
                if let Some(ref api_server) = self.api_server {
                    api_server.update_config(self.topology.config());
                }

                emit!(VectorReloaded {
                    config_paths: &self.config_paths
                });
                ReloadOutcome::Success
            }
            Err(ReloadError::GlobalOptionsChanged { changed_fields }) => {
                error!(
                    message = "Config reload rejected due to non-reloadable global options.",
                    changed_fields = %changed_fields.join(", "),
                    internal_log_rate_limit = false,
                );
                emit!(VectorReloadError {
                    reason: "global_options_changed",
                });
                ReloadOutcome::RolledBack
            }
            Err(ReloadError::GlobalDiffFailed { source }) => {
                error!(
                    message = "Config reload rejected because computing the global diff failed.",
                    error = %source,
                    internal_log_rate_limit = false,
                );
                emit!(VectorReloadError {
                    reason: "global_diff_failed",
                });
                ReloadOutcome::RolledBack
            }
            Err(ReloadError::TopologyBuildFailed) => {
                emit!(VectorReloadError {
                    reason: "topology_build_failed",
                });
                ReloadOutcome::RolledBack
            }
            Err(ReloadError::FailedToRestore) => {
                emit!(VectorReloadError {
                    reason: "restore_failed",
                });
                emit!(VectorRecoveryError);
                ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
            }
        }
    }

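    // A minimal sketch of driving a reload (hypothetical caller;
    // `load_new_config` and `initiate_shutdown` are assumed helpers, not
    // part of this module):
    //
    //     let new_config = load_new_config(&controller.config_paths)?;
    //     match controller.reload(new_config).await {
    //         ReloadOutcome::FatalError(error) => initiate_shutdown(error),
    //         // Success, RolledBack, and MissingApiKey all leave a topology running.
    //         _ => {}
    //     }
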
    pub async fn stop(self) {
        self.topology.stop().await;
    }

    // The `sources_finished` method on `RunningTopology` only considers sources that are running
    // at the time the method is called. This presents a problem when the set of running sources
    // can change while we are waiting on the resulting future to resolve.
    //
    // This function resolves that issue by waiting in two stages. The first is the usual
    // asynchronous wait for the future to complete. When it does, we know that all of the sources
    // that existed when the future was built have finished, but we don't know whether that's
    // because they were replaced as part of a reload (in which case we don't want to return yet).
    // To differentiate, we acquire the lock on the topology, create a new future, and check
    // whether it resolves immediately. If it does, we know all sources are truly finished,
    // because we held the lock during the check and no one else could have added new sources.
    // If it does not resolve, new sources have been added since our original call, and we start
    // the process over to continue waiting.
    pub async fn sources_finished(mutex: SharedTopologyController) {
        loop {
            // Do an initial async wait while the topology is running, making sure not to hold
            // the mutex lock while we wait for sources to finish.
            let initial = {
                let tc = mutex.lock().await;
                tc.topology.sources_finished()
            };
            initial.await;

            // Once the initial future resolves, hold the lock on the topology while checking
            // again. This ensures that no other task is adding new sources.
            let top = mutex.lock().await;
            if top.topology.sources_finished().now_or_never().is_some() {
                return;
            }
        }
    }
}
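
// A minimal sketch of using `sources_finished` to gate shutdown (hypothetical
// caller; `signal_rx` is an assumed shutdown-signal receiver):
//
//     let shared = SharedTopologyController::new(controller);
//     tokio::select! {
//         _ = TopologyController::sources_finished(shared.clone()) => {
//             debug!("All sources finished.");
//         }
//         _ = signal_rx.recv() => {
//             debug!("Shutdown signal received.");
//         }
//     }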