vector/topology/
controller.rs1use std::sync::Arc;
2
3#[cfg(feature = "api")]
4use crate::api;
5use crate::extra_context::ExtraContext;
6use crate::internal_events::{VectorRecoveryError, VectorReloadError, VectorReloaded};
7use futures_util::FutureExt as _;
8use tokio::sync::{Mutex, MutexGuard};
9
10use crate::{config, signal::ShutdownError, topology::RunningTopology};
11
12#[derive(Clone, Debug)]
13pub struct SharedTopologyController(Arc<Mutex<TopologyController>>);
14
15impl SharedTopologyController {
16 pub fn new(inner: TopologyController) -> Self {
17 Self(Arc::new(Mutex::new(inner)))
18 }
19
20 pub fn blocking_lock(&self) -> MutexGuard<TopologyController> {
21 self.0.blocking_lock()
22 }
23
24 pub async fn lock(&self) -> MutexGuard<TopologyController> {
25 self.0.lock().await
26 }
27
28 pub fn try_into_inner(self) -> Result<Mutex<TopologyController>, Self> {
29 Arc::try_unwrap(self.0).map_err(Self)
30 }
31}
32
33pub struct TopologyController {
34 pub topology: RunningTopology,
35 pub config_paths: Vec<config::ConfigPath>,
36 pub require_healthy: Option<bool>,
37 #[cfg(feature = "api")]
38 pub api_server: Option<api::Server>,
39 pub extra_context: ExtraContext,
40}
41
42impl std::fmt::Debug for TopologyController {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("TopologyController")
45 .field("config_paths", &self.config_paths)
46 .field("require_healthy", &self.require_healthy)
47 .finish()
48 }
49}
50
51#[derive(Clone, Debug)]
52pub enum ReloadOutcome {
53 MissingApiKey,
54 Success,
55 RolledBack,
56 FatalError(ShutdownError),
57}
58
59impl TopologyController {
60 pub async fn reload(&mut self, mut new_config: config::Config) -> ReloadOutcome {
61 new_config
62 .healthchecks
63 .set_require_healthy(self.require_healthy);
64
65 #[cfg(feature = "api")]
67 if !new_config.api.enabled {
68 if let Some(server) = self.api_server.take() {
69 debug!("Dropping api server.");
70 drop(server)
71 }
72 } else if self.api_server.is_none() {
73 use crate::internal_events::ApiStarted;
74 use std::sync::atomic::AtomicBool;
75 use tokio::runtime::Handle;
76
77 debug!("Starting api server.");
78
79 self.api_server = match api::Server::start(
80 self.topology.config(),
81 self.topology.watch(),
82 Arc::<AtomicBool>::clone(&self.topology.running),
83 &Handle::current(),
84 ) {
85 Ok(api_server) => {
86 emit!(ApiStarted {
87 addr: new_config.api.address.unwrap(),
88 playground: new_config.api.playground,
89 graphql: new_config.api.graphql,
90 });
91
92 Some(api_server)
93 }
94 Err(error) => {
95 let error = error.to_string();
96 error!("An error occurred that Vector couldn't handle: {}.", error);
97 return ReloadOutcome::FatalError(ShutdownError::ApiFailed { error });
98 }
99 }
100 }
101
102 match self
103 .topology
104 .reload_config_and_respawn(new_config, self.extra_context.clone())
105 .await
106 {
107 Ok(true) => {
108 #[cfg(feature = "api")]
109 if let Some(ref api_server) = self.api_server {
111 api_server.update_config(self.topology.config());
112 }
113
114 emit!(VectorReloaded {
115 config_paths: &self.config_paths
116 });
117 ReloadOutcome::Success
118 }
119 Ok(false) => {
120 emit!(VectorReloadError);
121 ReloadOutcome::RolledBack
122 }
123 Err(()) => {
125 emit!(VectorReloadError);
126 emit!(VectorRecoveryError);
127 ReloadOutcome::FatalError(ShutdownError::ReloadFailedToRestore)
128 }
129 }
130 }
131
132 pub async fn stop(self) {
133 self.topology.stop().await;
134 }
135
136 pub async fn sources_finished(mutex: SharedTopologyController) {
150 loop {
151 let initial = {
154 let tc = mutex.lock().await;
155 tc.topology.sources_finished()
156 };
157 initial.await;
158
159 let top = mutex.lock().await;
162 if top.topology.sources_finished().now_or_never().is_some() {
163 return;
164 } else {
165 continue;
166 }
167 }
168 }
169}