vector/topology/
controller.rs1use std::sync::Arc;
2
3use futures_util::FutureExt as _;
4use tokio::sync::{Mutex, MutexGuard};
5
6#[cfg(feature = "api")]
7use crate::api;
8use crate::{
9 config,
10 extra_context::ExtraContext,
11 internal_events::{VectorRecoveryError, VectorReloadError, VectorReloaded},
12 signal::ShutdownError,
13 topology::RunningTopology,
14};
15
16#[derive(Clone, Debug)]
17pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);
18
19impl SharedTopologyController {
20 pub fn new(inner: TopologyController) -> Self {
21 Self(Arc::new(Mutex::new(inner)))
22 }
23
24 pub fn blocking_lock(&self) -> MutexGuard<'_, TopologyController> {
25 self.0.blocking_lock()
26 }
27
28 pub async fn lock(&self) -> MutexGuard<'_, TopologyController> {
29 self.0.lock().await
30 }
31
32 pub fn try_into_inner(self) -> Result<Mutex<TopologyController>, Self> {
33 Arc::try_unwrap(self.0).map_err(Self)
34 }
35}
36
37pub struct TopologyController {
38 pub topology: RunningTopology,
39 pub config_paths: Vec<config::ConfigPath>,
40 pub require_healthy: Option<bool>,
41 #[cfg(feature = "api")]
42 pub api_server: Option<api::Server>,
43 pub extra_context: ExtraContext,
44}
45
46impl std::fmt::Debug for TopologyController {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("TopologyController")
49 .field("config_paths", &self.config_paths)
50 .field("require_healthy", &self.require_healthy)
51 .finish()
52 }
53}
54
55#[derive(Clone, Debug)]
56pub enum ReloadOutcome {
57 MissingApiKey,
58 Success,
59 RolledBack,
60 FatalError(ShutdownError),
61}
62
63impl TopologyController {
64 pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
65 new_config
66 .healthchecks
67 .set_require_healthy(self.require_healthy);
68
69 #[cfg(feature = "api")]
71 if !new_config.api.enabled {
72 if let Some(server) = self.api_server.take() {
73 debug!("Dropping api server.");
74 drop(server)
75 }
76 } else if self.api_server.is_none() {
77 use std::sync::atomic::AtomicBool;
78
79 use tokio::runtime::Handle;
80
81 use crate::internal_events::ApiStarted;
82
83 debug!("Starting api server.");
84
85 self.api_server = match api::Server::start(
86 self.topology.config(),
87 self.topology.watch(),
88 Arc::<AtomicBool>::clone(&self.topology.running),
89 &Handle::current(),
90 ) {
91 Ok(api_server) => {
92 emit!(ApiStarted {
93 addr: new_config.api.address.unwrap(),
94 playground: new_config.api.playground,
95 graphql: new_config.api.graphql,
96 });
97
98 Some(api_server)
99 }
100 Err(error) => {
101 let error = error.to_string();
102 error!("An error occurred that Vector couldn't handle: {}.", error);
103 return ReloadOutcome::FatalError(ShutdownError::ApiFailed { error });
104 }
105 }
106 }
107
108 match self
109 .topology
110 .reload_config_and_respawn(new_config, self.extra_context.clone())
111 .await
112 {
113 Ok(true) => {
114 #[cfg(feature = "api")]
115 if let Some(ref api_server) = self.api_server {
117 api_server.update_config(self.topology.config());
118 }
119
120 emit!(VectorReloaded {
121 config_paths: &self.config_paths
122 });
123 ReloadOutcome::Success
124 }
125 Ok(false) => {
126 emit!(VectorReloadError);
127 ReloadOutcome::RolledBack
128 }
129 Err(()) => {
131 emit!(VectorReloadError);
132 emit!(VectorRecoveryError);
133 ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
134 }
135 }
136 }
137
138 pub async fn stop(self) {
139 self.topology.stop().await;
140 }
141
142 pub async fn sources_finished(mutex: SharedTopologyController) {
156 loop {
157 let initial = {
160 let tc = mutex.lock().await;
161 tc.topology.sources_finished()
162 };
163 initial.await;
164
165 let top = mutex.lock().await;
168 if top.topology.sources_finished().now_or_never().is_some() {
169 return;
170 } else {
171 continue;
172 }
173 }
174 }
175}