1#![allow(missing_docs)]
2use std::{
3 num::{NonZeroU64, NonZeroUsize},
4 path::PathBuf,
5 process::ExitStatus,
6 sync::atomic::{AtomicUsize, Ordering},
7 time::Duration,
8};
9
10use exitcode::ExitCode;
11use futures::StreamExt;
12use tokio::runtime::{self, Runtime};
13use tokio::sync::{broadcast::error::RecvError, MutexGuard};
14use tokio_stream::wrappers::UnboundedReceiverStream;
15
16#[cfg(feature = "api")]
17use crate::{api, internal_events::ApiStarted};
18use crate::{
19 cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
20 config::{self, ComponentConfig, Config, ConfigPath},
21 heartbeat,
22 internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
23 signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
24 topology::{
25 ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
26 TopologyController,
27 },
28 trace,
29};
30use crate::{config::ComponentType, extra_context::ExtraContext};
31
32#[cfg(unix)]
33use std::os::unix::process::ExitStatusExt;
34#[cfg(windows)]
35use std::os::windows::process::ExitStatusExt;
36use tokio::runtime::Handle;
37
/// Worker-thread count recorded by `build_runtime`; stays `0` until a runtime has been built.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the number of Tokio worker threads recorded for this process.
///
/// Yields `None` while the recorded count is still `0`, i.e. before a runtime has been built.
pub fn worker_threads() -> Option<NonZeroUsize> {
    let recorded = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(recorded)
}
43
/// Loaded configuration plus the already-started main topology, produced by
/// `ApplicationConfig::from_opts` / `ApplicationConfig::from_config`.
pub struct ApplicationConfig {
    /// Config paths Vector was started with; later handed to the signal loop, which
    /// re-processes them when reloading from disk.
    pub config_paths: Vec<config::ConfigPath>,
    /// The main running topology.
    pub topology: RunningTopology,
    /// Receives errors from the main topology; the main loop turns any item received here
    /// into `SignalTo::Shutdown`.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Extra topologies started via `add_internal_config`; their crash receivers are not kept.
    pub internal_topologies: Vec<RunningTopology>,
    /// API options taken from the loaded config before it was handed to the topology.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Extra context passed through to topology construction and the topology controller.
    pub extra_context: ExtraContext,
}
53
/// A fully prepared (configured but not yet started) Vector application.
pub struct Application {
    /// Global command-line options the application was prepared with.
    pub root_opts: RootOpts,
    /// Loaded configuration and the started main topology.
    pub config: ApplicationConfig,
    /// OS/provider signal handler and receiver pair created on the runtime.
    pub signals: SignalPair,
}
59
60impl ApplicationConfig {
61 pub async fn from_opts(
62 opts: &RootOpts,
63 signal_handler: &mut SignalHandler,
64 extra_context: ExtraContext,
65 ) -> Result<Self, ExitCode> {
66 let config_paths = opts.config_paths_with_formats();
67
68 let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
69 .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
70
71 let watcher_conf = if opts.watch_config {
72 Some(watcher_config(
73 opts.watch_config_method,
74 opts.watch_config_poll_interval_seconds,
75 ))
76 } else {
77 None
78 };
79
80 let config = load_configs(
81 &config_paths,
82 watcher_conf,
83 opts.require_healthy,
84 opts.allow_empty_config,
85 graceful_shutdown_duration,
86 signal_handler,
87 )
88 .await?;
89
90 Self::from_config(config_paths, config, extra_context).await
91 }
92
93 pub async fn from_config(
94 config_paths: Vec<ConfigPath>,
95 config: Config,
96 extra_context: ExtraContext,
97 ) -> Result<Self, ExitCode> {
98 #[cfg(feature = "api")]
99 let api = config.api;
100
101 let (topology, graceful_crash_receiver) =
102 RunningTopology::start_init_validated(config, extra_context.clone())
103 .await
104 .ok_or(exitcode::CONFIG)?;
105
106 Ok(Self {
107 config_paths,
108 topology,
109 graceful_crash_receiver,
110 internal_topologies: Vec::new(),
111 #[cfg(feature = "api")]
112 api,
113 extra_context,
114 })
115 }
116
117 pub async fn add_internal_config(
118 &mut self,
119 config: Config,
120 extra_context: ExtraContext,
121 ) -> Result<(), ExitCode> {
122 let Some((topology, _)) =
123 RunningTopology::start_init_validated(config, extra_context).await
124 else {
125 return Err(exitcode::CONFIG);
126 };
127 self.internal_topologies.push(topology);
128 Ok(())
129 }
130
131 #[cfg(feature = "api")]
133 pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
134 if self.api.enabled {
135 match api::Server::start(
136 self.topology.config(),
137 self.topology.watch(),
138 std::sync::Arc::clone(&self.topology.running),
139 handle,
140 ) {
141 Ok(api_server) => {
142 emit!(ApiStarted {
143 addr: self.api.address.unwrap(),
144 playground: self.api.playground,
145 graphql: self.api.graphql
146 });
147
148 Some(api_server)
149 }
150 Err(error) => {
151 let error = error.to_string();
152 error!("An error occurred that Vector couldn't handle: {}.", error);
153 _ = self
154 .topology
155 .abort_tx
156 .send(crate::signal::ShutdownError::ApiFailed { error });
157 None
158 }
159 }
160 } else {
161 info!(message="API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.");
162 None
163 }
164 }
165}
166
167impl Application {
168 pub fn run(extra_context: ExtraContext) -> ExitStatus {
169 let (runtime, app) =
170 Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
171
172 runtime.block_on(app.run())
173 }
174
175 pub fn prepare_start(
176 extra_context: ExtraContext,
177 ) -> Result<(Runtime, StartedApplication), ExitCode> {
178 Self::prepare(extra_context)
179 .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
180 }
181
182 pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
183 let opts = Opts::get_matches().map_err(|error| {
184 _ = error.print();
186 exitcode::USAGE
187 })?;
188
189 Self::prepare_from_opts(opts, extra_context)
190 }
191
192 pub fn prepare_from_opts(
193 opts: Opts,
194 extra_context: ExtraContext,
195 ) -> Result<(Runtime, Self), ExitCode> {
196 opts.root.init_global();
197
198 let color = opts.root.color.use_color();
199
200 init_logging(
201 color,
202 opts.root.log_format,
203 opts.log_level(),
204 opts.root.internal_log_rate_limit,
205 );
206
207 if opts.root.openssl_no_probe {
209 debug!(message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL.");
210 }
211
212 let runtime = build_runtime(opts.root.threads, "vector-worker")?;
213
214 let mut signals = SignalPair::new(&runtime);
216
217 if let Some(sub_command) = &opts.sub_command {
218 return Err(runtime.block_on(sub_command.execute(signals, color)));
219 }
220
221 let config = runtime.block_on(ApplicationConfig::from_opts(
222 &opts.root,
223 &mut signals.handler,
224 extra_context,
225 ))?;
226
227 Ok((
228 runtime,
229 Self {
230 root_opts: opts.root,
231 config,
232 signals,
233 },
234 ))
235 }
236
237 pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
238 crate::trace::stop_early_buffering();
241
242 emit!(VectorStarted);
243 handle.spawn(heartbeat::heartbeat());
244
245 let Self {
246 root_opts,
247 config,
248 signals,
249 } = self;
250
251 let topology_controller = SharedTopologyController::new(TopologyController {
252 #[cfg(feature = "api")]
253 api_server: config.setup_api(handle),
254 topology: config.topology,
255 config_paths: config.config_paths.clone(),
256 require_healthy: root_opts.require_healthy,
257 extra_context: config.extra_context,
258 });
259
260 Ok(StartedApplication {
261 config_paths: config.config_paths,
262 internal_topologies: config.internal_topologies,
263 graceful_crash_receiver: config.graceful_crash_receiver,
264 signals,
265 topology_controller,
266 allow_empty_config: root_opts.allow_empty_config,
267 })
268 }
269}
270
/// A running Vector instance whose main loop has not yet been entered.
pub struct StartedApplication {
    /// Config paths used when reloading configuration from disk.
    pub config_paths: Vec<ConfigPath>,
    /// Internal topologies, stopped after the main topology during shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receives main-topology errors; each one ends the main loop with `SignalTo::Shutdown`.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler and receiver driving reloads and shutdown.
    pub signals: SignalPair,
    /// Shared controller owning the main running topology.
    pub topology_controller: SharedTopologyController,
    /// Whether reloads may accept an empty configuration.
    pub allow_empty_config: bool,
}
279
280impl StartedApplication {
281 pub async fn run(self) -> ExitStatus {
282 self.main().await.shutdown().await
283 }
284
285 pub async fn main(self) -> FinishedApplication {
286 let Self {
287 config_paths,
288 graceful_crash_receiver,
289 signals,
290 topology_controller,
291 internal_topologies,
292 allow_empty_config,
293 } = self;
294
295 let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
296
297 let mut signal_handler = signals.handler;
298 let mut signal_rx = signals.receiver;
299
300 let signal = loop {
301 let has_sources = !topology_controller.lock().await.topology.config.is_empty();
302 tokio::select! {
303 signal = signal_rx.recv() => if let Some(signal) = handle_signal(
304 signal,
305 &topology_controller,
306 &config_paths,
307 &mut signal_handler,
308 allow_empty_config,
309 ).await {
310 break signal;
311 },
312 error = graceful_crash.next() => break SignalTo::Shutdown(error),
314 _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
315 info!("All sources have finished.");
316 break SignalTo::Shutdown(None)
317 } ,
318 else => unreachable!("Signal streams never end"),
319 }
320 };
321
322 FinishedApplication {
323 signal,
324 signal_rx,
325 topology_controller,
326 internal_topologies,
327 }
328 }
329}
330
331async fn handle_signal(
332 signal: Result<SignalTo, RecvError>,
333 topology_controller: &SharedTopologyController,
334 config_paths: &[ConfigPath],
335 signal_handler: &mut SignalHandler,
336 allow_empty_config: bool,
337) -> Option<SignalTo> {
338 match signal {
339 Ok(SignalTo::ReloadComponents(components_to_reload)) => {
340 let mut topology_controller = topology_controller.lock().await;
341 topology_controller
342 .topology
343 .extend_reload_set(components_to_reload);
344
345 if let Some(paths) = config::process_paths(config_paths) {
347 topology_controller.config_paths = paths;
348 }
349
350 let new_config = config::load_from_paths_with_provider_and_secrets(
352 &topology_controller.config_paths,
353 signal_handler,
354 allow_empty_config,
355 )
356 .await;
357
358 reload_config_from_result(topology_controller, new_config).await
359 }
360 Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
361 let topology_controller = topology_controller.lock().await;
362 reload_config_from_result(topology_controller, config_builder.build()).await
363 }
364 Ok(SignalTo::ReloadFromDisk) => {
365 let mut topology_controller = topology_controller.lock().await;
366
367 if let Some(paths) = config::process_paths(config_paths) {
369 topology_controller.config_paths = paths;
370 }
371
372 let new_config = config::load_from_paths_with_provider_and_secrets(
374 &topology_controller.config_paths,
375 signal_handler,
376 allow_empty_config,
377 )
378 .await;
379
380 reload_config_from_result(topology_controller, new_config).await
381 }
382 Ok(SignalTo::ReloadEnrichmentTables) => {
383 let topology_controller = topology_controller.lock().await;
384
385 topology_controller
386 .topology
387 .reload_enrichment_tables()
388 .await;
389 None
390 }
391 Err(RecvError::Lagged(amt)) => {
392 warn!("Overflow, dropped {} signals.", amt);
393 None
394 }
395 Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
396 Ok(signal) => Some(signal),
397 }
398}
399
400async fn reload_config_from_result(
401 mut topology_controller: MutexGuard<'_, TopologyController>,
402 config: Result<Config, Vec<String>>,
403) -> Option<SignalTo> {
404 match config {
405 Ok(new_config) => match topology_controller.reload(new_config).await {
406 ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
407 _ => None,
408 },
409 Err(errors) => {
410 handle_config_errors(errors);
411 emit!(VectorConfigLoadError);
412 None
413 }
414 }
415}
416
/// State left over once the main loop has exited, consumed by `shutdown`.
pub struct FinishedApplication {
    /// The signal that ended the main loop; `shutdown` handles `Shutdown` and `Quit`.
    pub signal: SignalTo,
    /// Still-open signal receiver; a signal arriving during a graceful stop forces a quit.
    pub signal_rx: SignalRx,
    /// Shared controller for the main topology; `shutdown` expects to hold the only reference.
    pub topology_controller: SharedTopologyController,
    /// Internal topologies, stopped after the main topology.
    pub internal_topologies: Vec<RunningTopology>,
}
423
424impl FinishedApplication {
425 pub async fn shutdown(self) -> ExitStatus {
426 let FinishedApplication {
427 signal,
428 signal_rx,
429 topology_controller,
430 internal_topologies,
431 } = self;
432
433 let topology_controller = topology_controller
436 .try_into_inner()
437 .expect("fail to unwrap topology controller")
438 .into_inner();
439
440 let status = match signal {
441 SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
442 SignalTo::Quit => Self::quit(),
443 _ => unreachable!(),
444 };
445
446 for topology in internal_topologies {
447 topology.stop().await;
448 }
449
450 status
451 }
452
453 async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
454 emit!(VectorStopped);
455 tokio::select! {
456 _ = topology_controller.stop() => ExitStatus::from_raw({
457 #[cfg(windows)]
458 {
459 exitcode::OK as u32
460 }
461 #[cfg(unix)]
462 exitcode::OK
463 }), _ = signal_rx.recv() => Self::quit(),
465 }
466 }
467
468 fn quit() -> ExitStatus {
469 emit!(VectorQuit);
471 ExitStatus::from_raw({
472 #[cfg(windows)]
473 {
474 exitcode::UNAVAILABLE as u32
475 }
476 #[cfg(unix)]
477 exitcode::OK
478 })
479 }
480}
481
482fn get_log_levels(default: &str) -> String {
483 std::env::var("VECTOR_LOG")
484 .or_else(|_| {
485 std::env::var("LOG").inspect(|_log| {
486 warn!(
487 message =
488 "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
489 );
490 })
491 })
492 .unwrap_or_else(|_| default.into())
493}
494
495pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
496 let mut rt_builder = runtime::Builder::new_multi_thread();
497 rt_builder.max_blocking_threads(20_000);
498 rt_builder.enable_all().thread_name(thread_name);
499
500 let threads = threads.unwrap_or_else(crate::num_threads);
501 if threads == 0 {
502 error!("The `threads` argument must be greater or equal to 1.");
503 return Err(exitcode::CONFIG);
504 }
505 WORKER_THREADS
506 .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
507 .unwrap_or_else(|_| panic!("double thread initialization"));
508 rt_builder.worker_threads(threads);
509
510 debug!(messaged = "Building runtime.", worker_threads = threads);
511 Ok(rt_builder.build().expect("Unable to create async runtime"))
512}
513
514pub async fn load_configs(
515 config_paths: &[ConfigPath],
516 watcher_conf: Option<config::watcher::WatcherConfig>,
517 require_healthy: Option<bool>,
518 allow_empty_config: bool,
519 graceful_shutdown_duration: Option<Duration>,
520 signal_handler: &mut SignalHandler,
521) -> Result<Config, ExitCode> {
522 let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
523
524 let watched_paths = config_paths
525 .iter()
526 .map(<&PathBuf>::from)
527 .collect::<Vec<_>>();
528
529 info!(
530 message = "Loading configs.",
531 paths = ?watched_paths
532 );
533
534 let mut config = config::load_from_paths_with_provider_and_secrets(
535 &config_paths,
536 signal_handler,
537 allow_empty_config,
538 )
539 .await
540 .map_err(handle_config_errors)?;
541
542 let mut watched_component_paths = Vec::new();
543
544 if let Some(watcher_conf) = watcher_conf {
545 for (name, transform) in config.transforms() {
546 let files = transform.inner.files_to_watch();
547 let component_config = ComponentConfig::new(
548 files.into_iter().cloned().collect(),
549 name.clone(),
550 ComponentType::Transform,
551 );
552 watched_component_paths.push(component_config);
553 }
554
555 for (name, sink) in config.sinks() {
556 let files = sink.inner.files_to_watch();
557 let component_config = ComponentConfig::new(
558 files.into_iter().cloned().collect(),
559 name.clone(),
560 ComponentType::Sink,
561 );
562 watched_component_paths.push(component_config);
563 }
564
565 for (name, table) in config.enrichment_tables() {
566 let files = table.inner.files_to_watch();
567 let component_config = ComponentConfig::new(
568 files.into_iter().cloned().collect(),
569 name.clone(),
570 ComponentType::EnrichmentTable,
571 );
572 watched_component_paths.push(component_config);
573 }
574
575 info!(
576 message = "Starting watcher.",
577 paths = ?watched_paths
578 );
579 info!(
580 message = "Components to watch.",
581 paths = ?watched_component_paths
582 );
583
584 config::watcher::spawn_thread(
586 watcher_conf,
587 signal_handler.clone_tx(),
588 watched_paths,
589 watched_component_paths,
590 None,
591 )
592 .map_err(|error| {
593 error!(message = "Unable to start config watcher.", %error);
594 exitcode::CONFIG
595 })?;
596 }
597
598 config::init_log_schema(config.global.log_schema.clone(), true);
599 config::init_telemetry(config.global.telemetry.clone(), true);
600
601 if !config.healthchecks.enabled {
602 info!("Health checks are disabled.");
603 }
604 config.healthchecks.set_require_healthy(require_healthy);
605 config.graceful_shutdown_duration = graceful_shutdown_duration;
606
607 Ok(config)
608}
609
610pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
611 let level = get_log_levels(log_level);
612 let json = match format {
613 LogFormat::Text => false,
614 LogFormat::Json => true,
615 };
616
617 trace::init(color, json, &level, rate);
618 debug!(
619 message = "Internal log rate limit configured.",
620 internal_log_rate_secs = rate,
621 );
622 info!(message = "Log level is enabled.", level = ?level);
623}
624
625pub fn watcher_config(
626 method: WatchConfigMethod,
627 interval: NonZeroU64,
628) -> config::watcher::WatcherConfig {
629 match method {
630 WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
631 WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
632 }
633}