1#![allow(missing_docs)]
2#[cfg(unix)]
3use std::os::unix::process::ExitStatusExt;
4#[cfg(windows)]
5use std::os::windows::process::ExitStatusExt;
6use std::{
7 num::{NonZeroU64, NonZeroUsize},
8 path::PathBuf,
9 process::ExitStatus,
10 sync::atomic::{AtomicUsize, Ordering},
11 time::Duration,
12};
13
14use exitcode::ExitCode;
15use futures::StreamExt;
16use tokio::{
17 runtime::{self, Handle, Runtime},
18 sync::{MutexGuard, broadcast::error::RecvError},
19};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22#[cfg(feature = "api")]
23use crate::{api, internal_events::ApiStarted};
24use crate::{
25 cli::{LogFormat, Opts, RootOpts, WatchConfigMethod, handle_config_errors},
26 config::{self, ComponentConfig, ComponentType, Config, ConfigPath},
27 extra_context::ExtraContext,
28 heartbeat,
29 internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
30 signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
31 topology::{
32 ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
33 TopologyController,
34 },
35 trace,
36};
37
/// Worker thread count recorded by `build_runtime`; zero means "not recorded yet".
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the number of runtime worker threads recorded by `build_runtime`.
///
/// Yields `None` while no count has been recorded (the stored value is still zero).
pub fn worker_threads() -> Option<NonZeroUsize> {
    let recorded = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(recorded)
}
43
/// Loaded configuration plus the already-started main topology, as produced by
/// `ApplicationConfig::from_opts` / `ApplicationConfig::from_config`.
pub struct ApplicationConfig {
    /// Config paths (with formats) that the configuration was loaded from.
    pub config_paths: Vec<config::ConfigPath>,
    /// The main running topology, started by `RunningTopology::start_init_validated`.
    pub topology: RunningTopology,
    /// Receiver returned alongside `topology` by `start_init_validated`; the main loop
    /// turns an item from it into a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Extra topologies registered via `add_internal_config`; stopped after the main
    /// topology during shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// API options copied from the loaded config before it is handed to the topology.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Extra context passed to topology start-up and later stored on the topology controller.
    pub extra_context: ExtraContext,
}
53
/// A prepared application: options parsed, config loaded and main topology started,
/// ready to be turned into a `StartedApplication` by `Application::start`.
pub struct Application {
    /// Root command-line options.
    pub root_opts: RootOpts,
    /// Loaded configuration and the started main topology.
    pub config: ApplicationConfig,
    /// Signal handler / receiver pair created for this application's runtime.
    pub signals: SignalPair,
}
59
60impl ApplicationConfig {
61 pub async fn from_opts(
62 opts: &RootOpts,
63 signal_handler: &mut SignalHandler,
64 extra_context: ExtraContext,
65 ) -> Result<Self, ExitCode> {
66 let config_paths = opts.config_paths_with_formats();
67
68 let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
69 .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
70
71 let watcher_conf = if opts.watch_config {
72 Some(watcher_config(
73 opts.watch_config_method,
74 opts.watch_config_poll_interval_seconds,
75 ))
76 } else {
77 None
78 };
79
80 let config = load_configs(
81 &config_paths,
82 watcher_conf,
83 opts.require_healthy,
84 opts.allow_empty_config,
85 graceful_shutdown_duration,
86 signal_handler,
87 )
88 .await?;
89
90 Self::from_config(config_paths, config, extra_context).await
91 }
92
93 pub async fn from_config(
94 config_paths: Vec<ConfigPath>,
95 config: Config,
96 extra_context: ExtraContext,
97 ) -> Result<Self, ExitCode> {
98 #[cfg(feature = "api")]
99 let api = config.api;
100
101 let (topology, graceful_crash_receiver) =
102 RunningTopology::start_init_validated(config, extra_context.clone())
103 .await
104 .ok_or(exitcode::CONFIG)?;
105
106 Ok(Self {
107 config_paths,
108 topology,
109 graceful_crash_receiver,
110 internal_topologies: Vec::new(),
111 #[cfg(feature = "api")]
112 api,
113 extra_context,
114 })
115 }
116
117 pub async fn add_internal_config(
118 &mut self,
119 config: Config,
120 extra_context: ExtraContext,
121 ) -> Result<(), ExitCode> {
122 let Some((topology, _)) =
123 RunningTopology::start_init_validated(config, extra_context).await
124 else {
125 return Err(exitcode::CONFIG);
126 };
127 self.internal_topologies.push(topology);
128 Ok(())
129 }
130
131 #[cfg(feature = "api")]
133 pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
134 if self.api.enabled {
135 match api::Server::start(
136 self.topology.config(),
137 self.topology.watch(),
138 std::sync::Arc::clone(&self.topology.running),
139 handle,
140 ) {
141 Ok(api_server) => {
142 emit!(ApiStarted {
143 addr: self.api.address.unwrap(),
144 playground: self.api.playground,
145 graphql: self.api.graphql
146 });
147
148 Some(api_server)
149 }
150 Err(error) => {
151 let error = error.to_string();
152 error!(message = "An error occurred that Vector couldn't handle.", %error, internal_log_rate_limit = false);
153 _ = self
154 .topology
155 .abort_tx
156 .send(crate::signal::ShutdownError::ApiFailed { error });
157 None
158 }
159 }
160 } else {
161 info!(
162 message = "API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`."
163 );
164 None
165 }
166 }
167}
168
169impl Application {
170 pub fn run(extra_context: ExtraContext) -> ExitStatus {
171 let (runtime, app) =
172 Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
173
174 runtime.block_on(app.run())
175 }
176
177 pub fn prepare_start(
178 extra_context: ExtraContext,
179 ) -> Result<(Runtime, StartedApplication), ExitCode> {
180 Self::prepare(extra_context)
181 .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
182 }
183
184 pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
185 let opts = Opts::get_matches().map_err(|error| {
186 _ = error.print();
188 exitcode::USAGE
189 })?;
190
191 Self::prepare_from_opts(opts, extra_context)
192 }
193
194 pub fn prepare_from_opts(
195 opts: Opts,
196 extra_context: ExtraContext,
197 ) -> Result<(Runtime, Self), ExitCode> {
198 opts.root.init_global();
199
200 let color = opts.root.color.use_color();
201
202 init_logging(
203 color,
204 opts.root.log_format,
205 opts.log_level(),
206 opts.root.internal_log_rate_limit,
207 );
208
209 crate::set_global_color(color);
211
212 if opts.root.openssl_no_probe {
214 debug!(
215 message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."
216 );
217 }
218
219 let runtime = build_runtime(opts.root.threads, "vector-worker")?;
220
221 let mut signals = SignalPair::new(&runtime);
223
224 if let Some(sub_command) = &opts.sub_command {
225 return Err(runtime.block_on(sub_command.execute(signals, color)));
226 }
227
228 let config = runtime.block_on(ApplicationConfig::from_opts(
229 &opts.root,
230 &mut signals.handler,
231 extra_context,
232 ))?;
233
234 Ok((
235 runtime,
236 Self {
237 root_opts: opts.root,
238 config,
239 signals,
240 },
241 ))
242 }
243
244 pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
245 crate::trace::stop_early_buffering();
248
249 emit!(VectorStarted);
250 handle.spawn(heartbeat::heartbeat());
251
252 let Self {
253 root_opts,
254 config,
255 signals,
256 } = self;
257
258 let topology_controller = SharedTopologyController::new(TopologyController {
259 #[cfg(feature = "api")]
260 api_server: config.setup_api(handle),
261 topology: config.topology,
262 config_paths: config.config_paths.clone(),
263 require_healthy: root_opts.require_healthy,
264 extra_context: config.extra_context,
265 });
266
267 Ok(StartedApplication {
268 config_paths: config.config_paths,
269 internal_topologies: config.internal_topologies,
270 graceful_crash_receiver: config.graceful_crash_receiver,
271 signals,
272 topology_controller,
273 allow_empty_config: root_opts.allow_empty_config,
274 })
275 }
276}
277
/// A running application, produced by `Application::start` and driven by
/// `StartedApplication::main` / `StartedApplication::run`.
pub struct StartedApplication {
    /// Config paths as originally given; re-processed on every reload from disk.
    pub config_paths: Vec<ConfigPath>,
    /// Extra topologies that are stopped after the main topology on shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receiver whose items (or closure) end the main loop with a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler / receiver pair used by the main loop.
    pub signals: SignalPair,
    /// Shared controller owning the main topology.
    pub topology_controller: SharedTopologyController,
    /// Passed through to config loading on reload.
    pub allow_empty_config: bool,
}
286
impl StartedApplication {
    /// Runs the main loop, then shuts down, returning the process exit status.
    pub async fn run(self) -> ExitStatus {
        self.main().await.shutdown().await
    }

    /// Drives the application until a terminating signal is chosen.
    ///
    /// Each iteration waits for the first of:
    /// - a message on the signal channel, handed to `handle_signal`. Reloads are handled
    ///   there and the loop continues; a returned signal ends the loop.
    /// - an item, or end of stream, on the graceful-crash channel. This ends the loop with
    ///   `SignalTo::Shutdown` carrying that `Option`.
    /// - `TopologyController::sources_finished` completing. This ends the loop with
    ///   `SignalTo::Shutdown(None)` and is only enabled while the topology config is
    ///   non-empty.
    ///
    /// Returns a `FinishedApplication` holding the chosen signal and the state needed to
    /// shut down.
    pub async fn main(self) -> FinishedApplication {
        let Self {
            config_paths,
            graceful_crash_receiver,
            signals,
            topology_controller,
            internal_topologies,
            allow_empty_config,
        } = self;

        let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);

        let mut signal_handler = signals.handler;
        let mut signal_rx = signals.receiver;

        let signal = loop {
            // Re-checked every iteration, since reloads handled in `handle_signal` may change
            // the topology. NOTE(review): this tests the whole config for emptiness rather
            // than sources specifically; confirm `Config::is_empty` semantics.
            let has_sources = !topology_controller.lock().await.topology.config.is_empty();
            tokio::select! {
                signal = signal_rx.recv() => if let Some(signal) = handle_signal(
                    signal,
                    &topology_controller,
                    &config_paths,
                    &mut signal_handler,
                    allow_empty_config,
                ).await {
                    break signal;
                },
                // `next()` yields `None` once the stream ends, which becomes `Shutdown(None)`.
                error = graceful_crash.next() => break SignalTo::Shutdown(error),
                _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
                    info!("All sources have finished.");
                    break SignalTo::Shutdown(None)
                } ,
                // `select!` takes `else` only when every branch is disabled; the patterns
                // above are irrefutable and the signal branch is always enabled.
                else => unreachable!("Signal streams never end"),
            }
        };

        FinishedApplication {
            signal,
            signal_rx,
            topology_controller,
            internal_topologies,
        }
    }
}
337
338async fn handle_signal(
339 signal: Result<SignalTo, RecvError>,
340 topology_controller: &SharedTopologyController,
341 config_paths: &[ConfigPath],
342 signal_handler: &mut SignalHandler,
343 allow_empty_config: bool,
344) -> Option<SignalTo> {
345 match signal {
346 Ok(SignalTo::ReloadComponents(components_to_reload)) => {
347 let mut topology_controller = topology_controller.lock().await;
348 topology_controller
349 .topology
350 .extend_reload_set(components_to_reload);
351
352 if let Some(paths) = config::process_paths(config_paths) {
354 topology_controller.config_paths = paths;
355 }
356
357 let new_config = config::load_from_paths_with_provider_and_secrets(
359 &topology_controller.config_paths,
360 signal_handler,
361 allow_empty_config,
362 )
363 .await;
364
365 reload_config_from_result(topology_controller, new_config).await
366 }
367 Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
368 let topology_controller = topology_controller.lock().await;
369 reload_config_from_result(topology_controller, config_builder.build()).await
370 }
371 Ok(SignalTo::ReloadFromDisk) => {
372 let mut topology_controller = topology_controller.lock().await;
373
374 if let Some(paths) = config::process_paths(config_paths) {
376 topology_controller.config_paths = paths;
377 }
378
379 let new_config = config::load_from_paths_with_provider_and_secrets(
381 &topology_controller.config_paths,
382 signal_handler,
383 allow_empty_config,
384 )
385 .await;
386
387 if let Ok(ref config) = new_config {
388 let transform_keys_to_reload = config.transform_keys_with_external_files();
390
391 if !transform_keys_to_reload.is_empty() {
393 info!(
394 message = "Reloading transforms with external files.",
395 count = transform_keys_to_reload.len()
396 );
397 topology_controller
398 .topology
399 .extend_reload_set(transform_keys_to_reload);
400 }
401 }
402
403 reload_config_from_result(topology_controller, new_config).await
404 }
405 Ok(SignalTo::ReloadEnrichmentTables) => {
406 let topology_controller = topology_controller.lock().await;
407
408 topology_controller
409 .topology
410 .reload_enrichment_tables()
411 .await;
412 None
413 }
414 Err(RecvError::Lagged(amt)) => {
415 warn!("Overflow, dropped {} signals.", amt);
416 None
417 }
418 Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
419 Ok(signal) => Some(signal),
420 }
421}
422
423async fn reload_config_from_result(
424 mut topology_controller: MutexGuard<'_, TopologyController>,
425 config: Result<Config, Vec<String>>,
426) -> Option<SignalTo> {
427 match config {
428 Ok(new_config) => match topology_controller.reload(new_config).await {
429 ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
430 _ => None,
431 },
432 Err(errors) => {
433 handle_config_errors(errors);
434 emit!(VectorConfigLoadError);
435 None
436 }
437 }
438}
439
/// State left after the main loop ends, consumed by `FinishedApplication::shutdown`.
pub struct FinishedApplication {
    /// The signal that ended the main loop; `shutdown` expects `Shutdown` or `Quit`.
    pub signal: SignalTo,
    /// Kept so that a further signal can cut a graceful stop short.
    pub signal_rx: SignalRx,
    /// Shared controller owning the main topology.
    pub topology_controller: SharedTopologyController,
    /// Extra topologies stopped after the main topology.
    pub internal_topologies: Vec<RunningTopology>,
}
446
447impl FinishedApplication {
448 pub async fn shutdown(self) -> ExitStatus {
449 let FinishedApplication {
450 signal,
451 signal_rx,
452 topology_controller,
453 internal_topologies,
454 } = self;
455
456 let topology_controller = topology_controller
459 .try_into_inner()
460 .expect("fail to unwrap topology controller")
461 .into_inner();
462
463 let status = match signal {
464 SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
465 SignalTo::Quit => Self::quit(),
466 _ => unreachable!(),
467 };
468
469 for topology in internal_topologies {
470 topology.stop().await;
471 }
472
473 status
474 }
475
476 async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
477 emit!(VectorStopped);
478 tokio::select! {
479 _ = topology_controller.stop() => ExitStatus::from_raw({
480 #[cfg(windows)]
481 {
482 exitcode::OK as u32
483 }
484 #[cfg(unix)]
485 exitcode::OK
486 }), _ = signal_rx.recv() => Self::quit(),
488 }
489 }
490
491 fn quit() -> ExitStatus {
492 emit!(VectorQuit);
494 ExitStatus::from_raw({
495 #[cfg(windows)]
496 {
497 exitcode::UNAVAILABLE as u32
498 }
499 #[cfg(unix)]
500 exitcode::OK
501 })
502 }
503}
504
505fn get_log_levels(default: &str) -> String {
506 std::env::var("VECTOR_LOG")
507 .or_else(|_| {
508 std::env::var("LOG").inspect(|_log| {
509 warn!(
510 message =
511 "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
512 );
513 })
514 })
515 .unwrap_or_else(|_| default.into())
516}
517
518pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
519 let mut rt_builder = runtime::Builder::new_multi_thread();
520 rt_builder.max_blocking_threads(20_000);
521 rt_builder.enable_all().thread_name(thread_name);
522
523 let threads = threads.unwrap_or_else(crate::num_threads);
524 if threads == 0 {
525 error!("The `threads` argument must be greater or equal to 1.");
526 return Err(exitcode::CONFIG);
527 }
528 WORKER_THREADS
529 .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
530 .unwrap_or_else(|_| panic!("double thread initialization"));
531 rt_builder.worker_threads(threads);
532
533 debug!(message = "Building runtime.", worker_threads = threads);
534 Ok(rt_builder.build().expect("Unable to create async runtime"))
535}
536
537pub async fn load_configs(
538 config_paths: &[ConfigPath],
539 watcher_conf: Option<config::watcher::WatcherConfig>,
540 require_healthy: Option<bool>,
541 allow_empty_config: bool,
542 graceful_shutdown_duration: Option<Duration>,
543 signal_handler: &mut SignalHandler,
544) -> Result<Config, ExitCode> {
545 let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
546
547 let watched_paths = config_paths
548 .iter()
549 .map(<&PathBuf>::from)
550 .collect::<Vec<_>>();
551
552 info!(
553 message = "Loading configs.",
554 paths = ?watched_paths
555 );
556
557 let mut config = config::load_from_paths_with_provider_and_secrets(
558 &config_paths,
559 signal_handler,
560 allow_empty_config,
561 )
562 .await
563 .map_err(handle_config_errors)?;
564
565 let mut watched_component_paths = Vec::new();
566
567 if let Some(watcher_conf) = watcher_conf {
568 for (name, transform) in config.transforms() {
569 let files = transform.inner.files_to_watch();
570 let component_config = ComponentConfig::new(
571 files.into_iter().cloned().collect(),
572 name.clone(),
573 ComponentType::Transform,
574 );
575 watched_component_paths.push(component_config);
576 }
577
578 for (name, sink) in config.sinks() {
579 let files = sink.inner.files_to_watch();
580 let component_config = ComponentConfig::new(
581 files.into_iter().cloned().collect(),
582 name.clone(),
583 ComponentType::Sink,
584 );
585 watched_component_paths.push(component_config);
586 }
587
588 for (name, table) in config.enrichment_tables() {
589 let files = table.inner.files_to_watch();
590 let component_config = ComponentConfig::new(
591 files.into_iter().cloned().collect(),
592 name.clone(),
593 ComponentType::EnrichmentTable,
594 );
595 watched_component_paths.push(component_config);
596 }
597
598 info!(
599 message = "Starting watcher.",
600 paths = ?watched_paths
601 );
602 info!(
603 message = "Components to watch.",
604 paths = ?watched_component_paths
605 );
606
607 config::watcher::spawn_thread(
609 watcher_conf,
610 signal_handler.clone_tx(),
611 watched_paths,
612 watched_component_paths,
613 None,
614 )
615 .map_err(|error| {
616 error!(message = "Unable to start config watcher.", %error);
617 exitcode::CONFIG
618 })?;
619 }
620
621 config::init_log_schema(config.global.log_schema.clone(), true);
622 config::init_telemetry(config.global.telemetry.clone(), true);
623
624 if !config.healthchecks.enabled {
625 info!("Health checks are disabled.");
626 }
627 config.healthchecks.set_require_healthy(require_healthy);
628 config.graceful_shutdown_duration = graceful_shutdown_duration;
629
630 Ok(config)
631}
632
633pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
634 let level = get_log_levels(log_level);
635 let json = match format {
636 LogFormat::Text => false,
637 LogFormat::Json => true,
638 };
639
640 trace::init(color, json, &level, rate);
641 debug!(
642 message = "Internal log rate limit configured.",
643 internal_log_rate_secs = rate,
644 );
645 info!(message = "Log level is enabled.", level = ?level);
646}
647
648pub fn watcher_config(
649 method: WatchConfigMethod,
650 interval: NonZeroU64,
651) -> config::watcher::WatcherConfig {
652 match method {
653 WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
654 WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
655 }
656}