1#![allow(missing_docs)]
2#[cfg(unix)]
3use std::os::unix::process::ExitStatusExt;
4#[cfg(windows)]
5use std::os::windows::process::ExitStatusExt;
6use std::{
7 num::{NonZeroU64, NonZeroUsize},
8 path::PathBuf,
9 process::ExitStatus,
10 sync::atomic::{AtomicUsize, Ordering},
11 time::Duration,
12};
13
14use exitcode::ExitCode;
15use futures::StreamExt;
16use tokio::{
17 runtime::{self, Handle, Runtime},
18 sync::{MutexGuard, broadcast::error::RecvError},
19};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22#[cfg(feature = "api")]
23use crate::api;
24#[cfg(feature = "api")]
25use crate::internal_events::ApiStarted;
26use crate::{
27 cli::{LogFormat, Opts, RootOpts, WatchConfigMethod, handle_config_errors},
28 config::{self, ComponentConfig, ComponentType, Config, ConfigPath},
29 extra_context::ExtraContext,
30 heartbeat,
31 internal_events::{
32 VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped, VectorStopping,
33 },
34 signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
35 topology::{
36 ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
37 TopologyController,
38 },
39 trace,
40};
41
/// Worker thread count recorded by [`build_runtime`]; stays `0` until a runtime has been built.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the worker thread count stored in `WORKER_THREADS`.
///
/// Yields `None` while the stored value is still zero.
pub fn worker_threads() -> Option<NonZeroUsize> {
    let recorded = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(recorded)
}
47
/// The loaded configuration together with the topology that was started from it.
///
/// Built by [`ApplicationConfig::from_opts`] or [`ApplicationConfig::from_config`] and
/// consumed by [`Application::start`].
pub struct ApplicationConfig {
    /// Configuration paths this application was built from.
    pub config_paths: Vec<config::ConfigPath>,
    /// The topology started from the loaded configuration.
    pub topology: RunningTopology,
    /// Receiver returned alongside the topology by `RunningTopology::start_init_validated`.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Additional topologies registered through `add_internal_config`.
    pub internal_topologies: Vec<RunningTopology>,
    /// API options copied out of the configuration before the topology was started.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Extra context that was also handed to the topology at start-up.
    pub extra_context: ExtraContext,
}
57
/// A prepared application that has not been started yet.
pub struct Application {
    /// Root command-line options the application was prepared with.
    pub root_opts: RootOpts,
    /// Loaded configuration and its running topology.
    pub config: ApplicationConfig,
    /// Signal handler/receiver pair created for the runtime.
    pub signals: SignalPair,
}
63
64impl ApplicationConfig {
65 pub async fn from_opts(
66 opts: &RootOpts,
67 signal_handler: &mut SignalHandler,
68 extra_context: ExtraContext,
69 ) -> Result<Self, ExitCode> {
70 let config_paths = opts.config_paths_with_formats();
71
72 let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
73 .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
74
75 let watcher_conf = if opts.watch_config {
76 Some(watcher_config(
77 opts.watch_config_method,
78 opts.watch_config_poll_interval_seconds,
79 ))
80 } else {
81 None
82 };
83
84 let config = load_configs(
85 &config_paths,
86 watcher_conf,
87 opts.require_healthy,
88 opts.allow_empty_config,
89 !opts.disable_env_var_interpolation,
90 graceful_shutdown_duration,
91 signal_handler,
92 )
93 .await?;
94
95 Self::from_config(config_paths, config, extra_context).await
96 }
97
98 pub async fn from_config(
99 config_paths: Vec<ConfigPath>,
100 config: Config,
101 extra_context: ExtraContext,
102 ) -> Result<Self, ExitCode> {
103 #[cfg(feature = "api")]
104 let api = config.api;
105
106 let (topology, graceful_crash_receiver) =
107 RunningTopology::start_init_validated(config, extra_context.clone())
108 .await
109 .ok_or(exitcode::CONFIG)?;
110
111 Ok(Self {
112 config_paths,
113 topology,
114 graceful_crash_receiver,
115 internal_topologies: Vec::new(),
116 #[cfg(feature = "api")]
117 api,
118 extra_context,
119 })
120 }
121
122 pub async fn add_internal_config(
123 &mut self,
124 config: Config,
125 extra_context: ExtraContext,
126 ) -> Result<(), ExitCode> {
127 let Some((topology, _)) =
128 RunningTopology::start_init_validated(config, extra_context).await
129 else {
130 return Err(exitcode::CONFIG);
131 };
132 self.internal_topologies.push(topology);
133 Ok(())
134 }
135
136 #[cfg(feature = "api")]
138 pub fn setup_api(&self, handle: &Handle) -> Option<api::GrpcServer> {
139 if self.api.enabled {
140 let api_server = handle.block_on(api::GrpcServer::start(
142 self.topology.config(),
143 self.topology.watch(),
144 ));
145 match api_server {
146 Ok(server) => {
147 emit!(ApiStarted {
148 addr: server.addr()
149 });
150 Some(server)
151 }
152 Err(error) => {
153 let error = error.to_string();
154 error!(
155 message = "An error occurred that Vector couldn't handle.",
156 %error,
157 internal_log_rate_limit = false
158 );
159 _ = self
162 .topology
163 .abort_tx
164 .send(crate::signal::ShutdownError::ApiFailed { error });
165 None
166 }
167 }
168 } else {
169 info!(
170 message = "API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`."
171 );
172 None
173 }
174 }
175}
176
177impl Application {
178 pub fn run(extra_context: ExtraContext) -> ExitStatus {
179 let (runtime, app) =
180 Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
181
182 runtime.block_on(app.run())
183 }
184
185 pub fn prepare_start(
186 extra_context: ExtraContext,
187 ) -> Result<(Runtime, StartedApplication), ExitCode> {
188 Self::prepare(extra_context)
189 .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
190 }
191
192 pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
193 let opts = Opts::get_matches().map_err(|error| {
194 _ = error.print();
196 exitcode::USAGE
197 })?;
198
199 Self::prepare_from_opts(opts, extra_context)
200 }
201
202 pub fn prepare_from_opts(
203 opts: Opts,
204 extra_context: ExtraContext,
205 ) -> Result<(Runtime, Self), ExitCode> {
206 opts.root.init_global();
207
208 let color = opts.root.color.use_color();
209
210 init_logging(
211 color,
212 opts.root.log_format,
213 opts.log_level(),
214 opts.root.internal_log_rate_limit,
215 );
216
217 crate::set_global_color(color);
219
220 if opts.root.openssl_no_probe {
222 debug!(
223 message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."
224 );
225 }
226
227 let runtime = build_runtime(opts.root.threads, "vector-worker")?;
228
229 let mut signals = SignalPair::new(&runtime);
231
232 if let Some(sub_command) = &opts.sub_command {
233 return Err(runtime.block_on(sub_command.execute(signals, color)));
234 }
235
236 let config = runtime.block_on(ApplicationConfig::from_opts(
237 &opts.root,
238 &mut signals.handler,
239 extra_context,
240 ))?;
241
242 Ok((
243 runtime,
244 Self {
245 root_opts: opts.root,
246 config,
247 signals,
248 },
249 ))
250 }
251
252 pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
253 crate::trace::stop_early_buffering();
256
257 emit!(VectorStarted);
258 handle.spawn(heartbeat::heartbeat());
259
260 let Self {
261 root_opts,
262 config,
263 signals,
264 } = self;
265
266 #[cfg(feature = "api")]
267 let api_server = config.setup_api(handle);
268
269 let topology_controller = SharedTopologyController::new(TopologyController {
270 #[cfg(feature = "api")]
271 api_server,
272 topology: config.topology,
273 config_paths: config.config_paths.clone(),
274 require_healthy: root_opts.require_healthy,
275 extra_context: config.extra_context,
276 });
277
278 Ok(StartedApplication {
279 config_paths: config.config_paths,
280 internal_topologies: config.internal_topologies,
281 graceful_crash_receiver: config.graceful_crash_receiver,
282 signals,
283 topology_controller,
284 allow_empty_config: root_opts.allow_empty_config,
285 interpolate_env: !root_opts.disable_env_var_interpolation,
286 })
287 }
288}
289
/// An application whose topology is running and that is ready to enter its main loop.
pub struct StartedApplication {
    /// Configuration paths, re-processed whenever a reload from disk is handled.
    pub config_paths: Vec<ConfigPath>,
    /// Additional topologies carried through to shutdown, where they are stopped.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receiver whose items end the main loop with a shutdown signal.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler/receiver pair driving the main loop.
    pub signals: SignalPair,
    /// Shared, lockable controller for the main topology.
    pub topology_controller: SharedTopologyController,
    /// Passed through to the config loader on reload.
    pub allow_empty_config: bool,
    /// Passed through to the config loader on reload.
    pub interpolate_env: bool,
}
299
300impl StartedApplication {
301 pub async fn run(self) -> ExitStatus {
302 self.main().await.shutdown().await
303 }
304
305 pub async fn main(self) -> FinishedApplication {
306 let Self {
307 config_paths,
308 graceful_crash_receiver,
309 signals,
310 topology_controller,
311 internal_topologies,
312 allow_empty_config,
313 interpolate_env,
314 } = self;
315
316 let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
317
318 let mut signal_handler = signals.handler;
319 let mut signal_rx = signals.receiver;
320
321 let signal = loop {
322 let has_sources = !topology_controller.lock().await.topology.config.is_empty();
323 tokio::select! {
324 signal = signal_rx.recv() => if let Some(signal) = handle_signal(
325 signal,
326 &topology_controller,
327 &config_paths,
328 &mut signal_handler,
329 allow_empty_config,
330 interpolate_env,
331 ).await {
332 break signal;
333 },
334 error = graceful_crash.next() => break SignalTo::Shutdown(error),
336 _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
337 info!("All sources have finished.");
338 break SignalTo::Shutdown(None)
339 } ,
340 else => unreachable!("Signal streams never end"),
341 }
342 };
343
344 FinishedApplication {
345 signal,
346 signal_rx,
347 topology_controller,
348 internal_topologies,
349 }
350 }
351}
352
353async fn handle_signal(
354 signal: Result<SignalTo, RecvError>,
355 topology_controller: &SharedTopologyController,
356 config_paths: &[ConfigPath],
357 signal_handler: &mut SignalHandler,
358 allow_empty_config: bool,
359 interpolate_env: bool,
360) -> Option<SignalTo> {
361 match signal {
362 Ok(SignalTo::ReloadComponents(components_to_reload)) => {
363 let mut topology_controller = topology_controller.lock().await;
364 topology_controller
365 .topology
366 .extend_reload_set(components_to_reload);
367
368 if let Some(paths) = config::process_paths(config_paths) {
370 topology_controller.config_paths = paths;
371 }
372
373 let new_config = config::load_from_paths_with_provider_and_secrets(
375 &topology_controller.config_paths,
376 signal_handler,
377 allow_empty_config,
378 interpolate_env,
379 )
380 .await;
381
382 reload_config_from_result(topology_controller, new_config).await
383 }
384 Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
385 let topology_controller = topology_controller.lock().await;
386 reload_config_from_result(topology_controller, config_builder.build()).await
387 }
388 Ok(SignalTo::ReloadFromDisk) => {
389 let mut topology_controller = topology_controller.lock().await;
390
391 if let Some(paths) = config::process_paths(config_paths) {
393 topology_controller.config_paths = paths;
394 }
395
396 let new_config = config::load_from_paths_with_provider_and_secrets(
398 &topology_controller.config_paths,
399 signal_handler,
400 allow_empty_config,
401 interpolate_env,
402 )
403 .await;
404
405 if let Ok(ref config) = new_config {
406 let transform_keys_to_reload = config.transform_keys_with_external_files();
408
409 if !transform_keys_to_reload.is_empty() {
411 info!(
412 message = "Reloading transforms with external files.",
413 count = transform_keys_to_reload.len()
414 );
415 topology_controller
416 .topology
417 .extend_reload_set(transform_keys_to_reload);
418 }
419 }
420
421 reload_config_from_result(topology_controller, new_config).await
422 }
423 Ok(SignalTo::ReloadEnrichmentTables) => {
424 let topology_controller = topology_controller.lock().await;
425
426 topology_controller
427 .topology
428 .reload_enrichment_tables()
429 .await;
430 None
431 }
432 Err(RecvError::Lagged(amt)) => {
433 warn!("Overflow, dropped {} signals.", amt);
434 None
435 }
436 Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
437 Ok(signal) => Some(signal),
438 }
439}
440
441async fn reload_config_from_result(
442 mut topology_controller: MutexGuard<'_, TopologyController>,
443 config: Result<Config, Vec<String>>,
444) -> Option<SignalTo> {
445 match config {
446 Ok(new_config) => match topology_controller.reload(new_config).await {
447 ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
448 _ => None,
449 },
450 Err(errors) => {
451 handle_config_errors(errors);
452 emit!(VectorConfigLoadError);
453 None
454 }
455 }
456}
457
/// The state left over once the main loop has ended, ready to be shut down.
pub struct FinishedApplication {
    /// The signal that ended the main loop.
    pub signal: SignalTo,
    /// Signal receiver; a further receive during a graceful stop switches to quitting.
    pub signal_rx: SignalRx,
    /// Shared controller of the main topology, unwrapped during shutdown.
    pub topology_controller: SharedTopologyController,
    /// Additional topologies that are stopped at the end of shutdown.
    pub internal_topologies: Vec<RunningTopology>,
}
464
465impl FinishedApplication {
466 pub async fn shutdown(self) -> ExitStatus {
467 let FinishedApplication {
468 signal,
469 signal_rx,
470 topology_controller,
471 internal_topologies,
472 } = self;
473
474 let topology_controller = topology_controller
477 .try_into_inner()
478 .expect("fail to unwrap topology controller")
479 .into_inner();
480
481 let status = match signal {
482 SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
483 SignalTo::Quit => Self::quit(),
484 _ => unreachable!(),
485 };
486
487 for topology in internal_topologies {
488 topology.stop().await;
489 }
490
491 status
492 }
493
494 async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
495 emit!(VectorStopping);
496 tokio::select! {
497 _ = topology_controller.stop() => {
498 emit!(VectorStopped);
499 ExitStatus::from_raw({
500 #[cfg(windows)]
501 {
502 exitcode::OK as u32
503 }
504 #[cfg(unix)]
505 exitcode::OK
506 })
507 }, _ = signal_rx.recv() => Self::quit(),
509 }
510 }
511
512 fn quit() -> ExitStatus {
513 emit!(VectorQuit);
515 ExitStatus::from_raw({
516 #[cfg(windows)]
517 {
518 exitcode::UNAVAILABLE as u32
519 }
520 #[cfg(unix)]
521 exitcode::OK
522 })
523 }
524}
525
526fn get_log_levels(default: &str) -> String {
527 std::env::var("VECTOR_LOG")
528 .or_else(|_| {
529 std::env::var("LOG").inspect(|_log| {
530 warn!(
531 message =
532 "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
533 );
534 })
535 })
536 .unwrap_or_else(|_| default.into())
537}
538
539pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
540 let mut rt_builder = runtime::Builder::new_multi_thread();
541 rt_builder.max_blocking_threads(20_000);
542 rt_builder.enable_all().thread_name(thread_name);
543
544 let threads = threads.unwrap_or_else(crate::num_threads);
545 if threads == 0 {
546 error!("The `threads` argument must be greater or equal to 1.");
547 return Err(exitcode::CONFIG);
548 }
549 WORKER_THREADS
550 .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
551 .unwrap_or_else(|_| panic!("double thread initialization"));
552 rt_builder.worker_threads(threads);
553
554 debug!(message = "Building runtime.", worker_threads = threads);
555 Ok(rt_builder.build().expect("Unable to create async runtime"))
556}
557
558pub async fn load_configs(
559 config_paths: &[ConfigPath],
560 watcher_conf: Option<config::watcher::WatcherConfig>,
561 require_healthy: Option<bool>,
562 allow_empty_config: bool,
563 interpolate_env: bool,
564 graceful_shutdown_duration: Option<Duration>,
565 signal_handler: &mut SignalHandler,
566) -> Result<Config, ExitCode> {
567 let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
568
569 let watched_paths = config_paths
570 .iter()
571 .map(<&PathBuf>::from)
572 .collect::<Vec<_>>();
573
574 info!(
575 message = "Loading configs.",
576 paths = ?watched_paths
577 );
578
579 let mut config = config::load_from_paths_with_provider_and_secrets(
580 &config_paths,
581 signal_handler,
582 allow_empty_config,
583 interpolate_env,
584 )
585 .await
586 .map_err(handle_config_errors)?;
587
588 let mut watched_component_paths = Vec::new();
589
590 if let Some(watcher_conf) = watcher_conf {
591 for (name, transform) in config.transforms() {
592 let files = transform.inner.files_to_watch();
593 let component_config = ComponentConfig::new(
594 files.into_iter().cloned().collect(),
595 name.clone(),
596 ComponentType::Transform,
597 );
598 watched_component_paths.push(component_config);
599 }
600
601 for (name, sink) in config.sinks() {
602 let files = sink.inner.files_to_watch();
603 let component_config = ComponentConfig::new(
604 files.into_iter().cloned().collect(),
605 name.clone(),
606 ComponentType::Sink,
607 );
608 watched_component_paths.push(component_config);
609 }
610
611 for (name, table) in config.enrichment_tables() {
612 let files = table.inner.files_to_watch();
613 let component_config = ComponentConfig::new(
614 files.into_iter().cloned().collect(),
615 name.clone(),
616 ComponentType::EnrichmentTable,
617 );
618 watched_component_paths.push(component_config);
619 }
620
621 info!(
622 message = "Starting watcher.",
623 paths = ?watched_paths
624 );
625 info!(
626 message = "Components to watch.",
627 paths = ?watched_component_paths
628 );
629
630 config::watcher::spawn_thread(
632 watcher_conf,
633 signal_handler.clone_tx(),
634 watched_paths,
635 watched_component_paths,
636 None,
637 )
638 .map_err(|error| {
639 error!(message = "Unable to start config watcher.", %error);
640 exitcode::CONFIG
641 })?;
642 }
643
644 config::init_log_schema(config.global.log_schema.clone(), true);
645 config::init_telemetry(config.global.telemetry.clone(), true);
646
647 if !config.healthchecks.enabled {
648 info!("Health checks are disabled.");
649 }
650 config.healthchecks.set_require_healthy(require_healthy);
651 config.graceful_shutdown_duration = graceful_shutdown_duration;
652
653 Ok(config)
654}
655
656pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
657 let level = get_log_levels(log_level);
658 let json = match format {
659 LogFormat::Text => false,
660 LogFormat::Json => true,
661 };
662
663 trace::init(color, json, &level, rate);
664 debug!(
665 message = "Internal log rate limit configured.",
666 internal_log_rate_secs = rate,
667 );
668 info!(message = "Log level is enabled.", level = ?level);
669}
670
671pub fn watcher_config(
672 method: WatchConfigMethod,
673 interval: NonZeroU64,
674) -> config::watcher::WatcherConfig {
675 match method {
676 WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
677 WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
678 }
679}