vector/topology/
controller.rs1use std::sync::Arc;
2
3use futures_util::FutureExt as _;
4use tokio::sync::{Mutex, MutexGuard};
5
6#[cfg(feature = "api")]
7use crate::api;
8use crate::{
9 config,
10 extra_context::ExtraContext,
11 internal_events::{VectorRecoveryError, VectorReloadError, VectorReloaded},
12 signal::ShutdownError,
13 topology::{ReloadError, RunningTopology},
14};
15
16#[derive(Clone, Debug)]
17pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);
18
19impl SharedTopologyController {
20 pub fn new(inner: TopologyController) -> Self {
21 Self(Arc::new(Mutex::new(inner)))
22 }
23
24 pub fn blocking_lock(&self) -> MutexGuard<'_, TopologyController> {
25 self.0.blocking_lock()
26 }
27
28 pub async fn lock(&self) -> MutexGuard<'_, TopologyController> {
29 self.0.lock().await
30 }
31
32 pub fn try_into_inner(self) -> Result<Mutex<TopologyController>, Self> {
33 Arc::try_unwrap(self.0).map_err(Self)
34 }
35}
36
37pub struct TopologyController {
38 pub topology: RunningTopology,
39 pub config_paths: Vec<config::ConfigPath>,
40 pub require_healthy: Option<bool>,
41 #[cfg(feature = "api")]
42 pub api_server: Option<api::Server>,
43 pub extra_context: ExtraContext,
44}
45
46impl std::fmt::Debug for TopologyController {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("TopologyController")
49 .field("config_paths", &self.config_paths)
50 .field("require_healthy", &self.require_healthy)
51 .finish()
52 }
53}
54
55#[derive(Clone, Debug)]
56pub enum ReloadOutcome {
57 MissingApiKey,
58 Success,
59 RolledBack,
60 FatalError(ShutdownError),
61}
62
63impl TopologyController {
64 pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
65 new_config
66 .healthchecks
67 .set_require_healthy(self.require_healthy);
68
69 #[cfg(feature = "api")]
71 if !new_config.api.enabled {
72 if let Some(server) = self.api_server.take() {
73 debug!("Dropping api server.");
74 drop(server)
75 }
76 } else if self.api_server.is_none() {
77 use std::sync::atomic::AtomicBool;
78
79 use tokio::runtime::Handle;
80
81 use crate::internal_events::ApiStarted;
82
83 debug!("Starting api server.");
84
85 self.api_server = match api::Server::start(
86 self.topology.config(),
87 self.topology.watch(),
88 Arc::<AtomicBool>::clone(&self.topology.running),
89 &Handle::current(),
90 ) {
91 Ok(api_server) => {
92 emit!(ApiStarted {
93 addr: new_config.api.address.unwrap(),
94 playground: new_config.api.playground,
95 graphql: new_config.api.graphql,
96 });
97
98 Some(api_server)
99 }
100 Err(error) => {
101 let error = error.to_string();
102 error!("An error occurred that Vector couldn't handle: {}.", error);
103 return ReloadOutcome::FatalError(ShutdownError::ApiFailed { error });
104 }
105 }
106 }
107
108 match self
109 .topology
110 .reload_config_and_respawn(new_config, self.extra_context.clone())
111 .await
112 {
113 Ok(()) => {
114 #[cfg(feature = "api")]
115 if let Some(ref api_server) = self.api_server {
117 api_server.update_config(self.topology.config());
118 }
119
120 emit!(VectorReloaded {
121 config_paths: &self.config_paths
122 });
123 ReloadOutcome::Success
124 }
125 Err(ReloadError::GlobalOptionsChanged { changed_fields }) => {
126 error!(
127 message = "Config reload rejected due to non-reloadable global options.",
128 changed_fields = %changed_fields.join(", "),
129 internal_log_rate_limit = false,
130 );
131 emit!(VectorReloadError {
132 reason: "global_options_changed",
133 });
134 ReloadOutcome::RolledBack
135 }
136 Err(ReloadError::GlobalDiffFailed { source }) => {
137 error!(
138 message = "Config reload rejected because computing global diff failed.",
139 error = %source,
140 internal_log_rate_limit = false,
141 );
142 emit!(VectorReloadError {
143 reason: "global_diff_failed",
144 });
145 ReloadOutcome::RolledBack
146 }
147 Err(ReloadError::TopologyBuildFailed) => {
148 emit!(VectorReloadError {
149 reason: "topology_build_failed",
150 });
151 ReloadOutcome::RolledBack
152 }
153 Err(ReloadError::FailedToRestore) => {
154 emit!(VectorReloadError {
155 reason: "restore_failed",
156 });
157 emit!(VectorRecoveryError);
158 ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
159 }
160 }
161 }
162
163 pub async fn stop(self) {
164 self.topology.stop().await;
165 }
166
167 pub async fn sources_finished(mutex: SharedTopologyController) {
181 loop {
182 let initial = {
185 let tc = mutex.lock().await;
186 tc.topology.sources_finished()
187 };
188 initial.await;
189
190 let top = mutex.lock().await;
193 if top.topology.sources_finished().now_or_never().is_some() {
194 return;
195 } else {
196 continue;
197 }
198 }
199 }
200}