1#![allow(missing_docs)]
2#[cfg(unix)]
3use std::os::unix::process::ExitStatusExt;
4#[cfg(windows)]
5use std::os::windows::process::ExitStatusExt;
6use std::{
7 num::{NonZeroU64, NonZeroUsize},
8 path::PathBuf,
9 process::ExitStatus,
10 sync::atomic::{AtomicUsize, Ordering},
11 time::Duration,
12};
13
14use exitcode::ExitCode;
15use futures::StreamExt;
16use tokio::{
17 runtime::{self, Handle, Runtime},
18 sync::{MutexGuard, broadcast::error::RecvError},
19};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22#[cfg(feature = "api")]
23use crate::{api, internal_events::ApiStarted};
24use crate::{
25 cli::{LogFormat, Opts, RootOpts, WatchConfigMethod, handle_config_errors},
26 config::{self, ComponentConfig, ComponentType, Config, ConfigPath},
27 extra_context::ExtraContext,
28 heartbeat,
29 internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
30 signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
31 topology::{
32 ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
33 TopologyController,
34 },
35 trace,
36};
37
/// Worker thread count recorded by `build_runtime`; stays zero until a runtime has been built.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the number of Tokio worker threads recorded when the runtime was built, or `None`
/// if nothing has been recorded yet.
pub fn worker_threads() -> Option<NonZeroUsize> {
    let recorded = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(recorded)
}
43
/// Loaded configuration state for a Vector process: the config paths it came from, the main
/// running topology, and any additional internal topologies.
pub struct ApplicationConfig {
    /// Config paths the configuration was loaded from.
    pub config_paths: Vec<config::ConfigPath>,
    /// The main topology, already started from the loaded configuration.
    pub topology: RunningTopology,
    /// Receives errors from the main topology; the main loop turns one into a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Extra topologies started via `add_internal_config`; they are stopped on shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// API options read from the loaded configuration.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Extra context handed to the topologies this application starts.
    pub extra_context: ExtraContext,
}
53
/// An application whose configuration is loaded and whose main topology is running, but whose
/// main loop has not been started yet (see `Application::start`).
pub struct Application {
    /// Root command-line options the application was prepared from.
    pub root_opts: RootOpts,
    /// Loaded configuration and running topologies.
    pub config: ApplicationConfig,
    /// Signal handler/receiver pair used to drive reloads and shutdown.
    pub signals: SignalPair,
}
59
60impl ApplicationConfig {
61 pub async fn from_opts(
62 opts: &RootOpts,
63 signal_handler: &mut SignalHandler,
64 extra_context: ExtraContext,
65 ) -> Result<Self, ExitCode> {
66 let config_paths = opts.config_paths_with_formats();
67
68 let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
69 .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
70
71 let watcher_conf = if opts.watch_config {
72 Some(watcher_config(
73 opts.watch_config_method,
74 opts.watch_config_poll_interval_seconds,
75 ))
76 } else {
77 None
78 };
79
80 let config = load_configs(
81 &config_paths,
82 watcher_conf,
83 opts.require_healthy,
84 opts.allow_empty_config,
85 !opts.disable_env_var_interpolation,
86 graceful_shutdown_duration,
87 signal_handler,
88 )
89 .await?;
90
91 Self::from_config(config_paths, config, extra_context).await
92 }
93
94 pub async fn from_config(
95 config_paths: Vec<ConfigPath>,
96 config: Config,
97 extra_context: ExtraContext,
98 ) -> Result<Self, ExitCode> {
99 #[cfg(feature = "api")]
100 let api = config.api;
101
102 let (topology, graceful_crash_receiver) =
103 RunningTopology::start_init_validated(config, extra_context.clone())
104 .await
105 .ok_or(exitcode::CONFIG)?;
106
107 Ok(Self {
108 config_paths,
109 topology,
110 graceful_crash_receiver,
111 internal_topologies: Vec::new(),
112 #[cfg(feature = "api")]
113 api,
114 extra_context,
115 })
116 }
117
118 pub async fn add_internal_config(
119 &mut self,
120 config: Config,
121 extra_context: ExtraContext,
122 ) -> Result<(), ExitCode> {
123 let Some((topology, _)) =
124 RunningTopology::start_init_validated(config, extra_context).await
125 else {
126 return Err(exitcode::CONFIG);
127 };
128 self.internal_topologies.push(topology);
129 Ok(())
130 }
131
132 #[cfg(feature = "api")]
134 pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
135 if self.api.enabled {
136 match api::Server::start(
137 self.topology.config(),
138 self.topology.watch(),
139 std::sync::Arc::clone(&self.topology.running),
140 handle,
141 ) {
142 Ok(api_server) => {
143 emit!(ApiStarted {
144 addr: self.api.address.unwrap(),
145 playground: self.api.playground,
146 graphql: self.api.graphql
147 });
148
149 Some(api_server)
150 }
151 Err(error) => {
152 let error = error.to_string();
153 error!(message = "An error occurred that Vector couldn't handle.", %error, internal_log_rate_limit = false);
154 _ = self
155 .topology
156 .abort_tx
157 .send(crate::signal::ShutdownError::ApiFailed { error });
158 None
159 }
160 }
161 } else {
162 info!(
163 message = "API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`."
164 );
165 None
166 }
167 }
168}
169
170impl Application {
171 pub fn run(extra_context: ExtraContext) -> ExitStatus {
172 let (runtime, app) =
173 Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
174
175 runtime.block_on(app.run())
176 }
177
178 pub fn prepare_start(
179 extra_context: ExtraContext,
180 ) -> Result<(Runtime, StartedApplication), ExitCode> {
181 Self::prepare(extra_context)
182 .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
183 }
184
185 pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
186 let opts = Opts::get_matches().map_err(|error| {
187 _ = error.print();
189 exitcode::USAGE
190 })?;
191
192 Self::prepare_from_opts(opts, extra_context)
193 }
194
195 pub fn prepare_from_opts(
196 opts: Opts,
197 extra_context: ExtraContext,
198 ) -> Result<(Runtime, Self), ExitCode> {
199 opts.root.init_global();
200
201 let color = opts.root.color.use_color();
202
203 init_logging(
204 color,
205 opts.root.log_format,
206 opts.log_level(),
207 opts.root.internal_log_rate_limit,
208 );
209
210 crate::set_global_color(color);
212
213 if opts.root.openssl_no_probe {
215 debug!(
216 message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."
217 );
218 }
219
220 let runtime = build_runtime(opts.root.threads, "vector-worker")?;
221
222 let mut signals = SignalPair::new(&runtime);
224
225 if let Some(sub_command) = &opts.sub_command {
226 return Err(runtime.block_on(sub_command.execute(signals, color)));
227 }
228
229 let config = runtime.block_on(ApplicationConfig::from_opts(
230 &opts.root,
231 &mut signals.handler,
232 extra_context,
233 ))?;
234
235 Ok((
236 runtime,
237 Self {
238 root_opts: opts.root,
239 config,
240 signals,
241 },
242 ))
243 }
244
245 pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
246 crate::trace::stop_early_buffering();
249
250 emit!(VectorStarted);
251 handle.spawn(heartbeat::heartbeat());
252
253 let Self {
254 root_opts,
255 config,
256 signals,
257 } = self;
258
259 let topology_controller = SharedTopologyController::new(TopologyController {
260 #[cfg(feature = "api")]
261 api_server: config.setup_api(handle),
262 topology: config.topology,
263 config_paths: config.config_paths.clone(),
264 require_healthy: root_opts.require_healthy,
265 extra_context: config.extra_context,
266 });
267
268 Ok(StartedApplication {
269 config_paths: config.config_paths,
270 internal_topologies: config.internal_topologies,
271 graceful_crash_receiver: config.graceful_crash_receiver,
272 signals,
273 topology_controller,
274 allow_empty_config: root_opts.allow_empty_config,
275 interpolate_env: !root_opts.disable_env_var_interpolation,
276 })
277 }
278}
279
/// A running application whose main loop can be driven with `StartedApplication::run`.
pub struct StartedApplication {
    /// Config paths used when reloading the configuration.
    pub config_paths: Vec<ConfigPath>,
    /// Internal topologies that are stopped after the main topology during shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receives errors from the main topology; one arriving ends the main loop with a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler and receiver driving reloads and shutdown.
    pub signals: SignalPair,
    /// Shared handle to the controller that owns the main topology.
    pub topology_controller: SharedTopologyController,
    /// Whether an empty configuration is accepted when reloading.
    pub allow_empty_config: bool,
    /// Whether environment variables are interpolated when reloading the configuration.
    pub interpolate_env: bool,
}
289
impl StartedApplication {
    /// Runs the main loop until a terminal signal arrives, then shuts down.
    ///
    /// Returns the resulting process exit status.
    pub async fn run(self) -> ExitStatus {
        self.main().await.shutdown().await
    }

    /// Drives the application's main loop.
    ///
    /// Concurrently waits for three things:
    /// - control signals, which are passed to `handle_signal` (reloads happen there);
    /// - an error from the topology's graceful-crash channel;
    /// - all sources finishing.
    ///
    /// Returns as soon as one of them yields a terminal `SignalTo`.
    pub async fn main(self) -> FinishedApplication {
        let Self {
            config_paths,
            graceful_crash_receiver,
            signals,
            topology_controller,
            internal_topologies,
            allow_empty_config,
            interpolate_env,
        } = self;

        // Wrap the receiver as a `Stream` so it can be polled with `StreamExt::next`.
        let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);

        let mut signal_handler = signals.handler;
        let mut signal_rx = signals.receiver;

        let signal = loop {
            // Recomputed every iteration, so it reflects a config reloaded by an earlier signal.
            // NOTE(review): derived from `Config::is_empty()`; presumably that checks sources —
            // confirm against `Config`.
            let has_sources = !topology_controller.lock().await.topology.config.is_empty();
            tokio::select! {
                // Non-terminal signals (e.g. reloads) yield `None` and the loop continues.
                signal = signal_rx.recv() => if let Some(signal) = handle_signal(
                    signal,
                    &topology_controller,
                    &config_paths,
                    &mut signal_handler,
                    allow_empty_config,
                    interpolate_env,
                ).await {
                    break signal;
                },
                // A reported crash error, or the stream ending (`None`), shuts down.
                error = graceful_crash.next() => break SignalTo::Shutdown(error),
                // This branch is disabled when `has_sources` is false.
                _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
                    info!("All sources have finished.");
                    break SignalTo::Shutdown(None)
                } ,
                else => unreachable!("Signal streams never end"),
            }
        };

        FinishedApplication {
            signal,
            signal_rx,
            topology_controller,
            internal_topologies,
        }
    }
}
342
343async fn handle_signal(
344 signal: Result<SignalTo, RecvError>,
345 topology_controller: &SharedTopologyController,
346 config_paths: &[ConfigPath],
347 signal_handler: &mut SignalHandler,
348 allow_empty_config: bool,
349 interpolate_env: bool,
350) -> Option<SignalTo> {
351 match signal {
352 Ok(SignalTo::ReloadComponents(components_to_reload)) => {
353 let mut topology_controller = topology_controller.lock().await;
354 topology_controller
355 .topology
356 .extend_reload_set(components_to_reload);
357
358 if let Some(paths) = config::process_paths(config_paths) {
360 topology_controller.config_paths = paths;
361 }
362
363 let new_config = config::load_from_paths_with_provider_and_secrets(
365 &topology_controller.config_paths,
366 signal_handler,
367 allow_empty_config,
368 interpolate_env,
369 )
370 .await;
371
372 reload_config_from_result(topology_controller, new_config).await
373 }
374 Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
375 let topology_controller = topology_controller.lock().await;
376 reload_config_from_result(topology_controller, config_builder.build()).await
377 }
378 Ok(SignalTo::ReloadFromDisk) => {
379 let mut topology_controller = topology_controller.lock().await;
380
381 if let Some(paths) = config::process_paths(config_paths) {
383 topology_controller.config_paths = paths;
384 }
385
386 let new_config = config::load_from_paths_with_provider_and_secrets(
388 &topology_controller.config_paths,
389 signal_handler,
390 allow_empty_config,
391 interpolate_env,
392 )
393 .await;
394
395 if let Ok(ref config) = new_config {
396 let transform_keys_to_reload = config.transform_keys_with_external_files();
398
399 if !transform_keys_to_reload.is_empty() {
401 info!(
402 message = "Reloading transforms with external files.",
403 count = transform_keys_to_reload.len()
404 );
405 topology_controller
406 .topology
407 .extend_reload_set(transform_keys_to_reload);
408 }
409 }
410
411 reload_config_from_result(topology_controller, new_config).await
412 }
413 Ok(SignalTo::ReloadEnrichmentTables) => {
414 let topology_controller = topology_controller.lock().await;
415
416 topology_controller
417 .topology
418 .reload_enrichment_tables()
419 .await;
420 None
421 }
422 Err(RecvError::Lagged(amt)) => {
423 warn!("Overflow, dropped {} signals.", amt);
424 None
425 }
426 Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
427 Ok(signal) => Some(signal),
428 }
429}
430
431async fn reload_config_from_result(
432 mut topology_controller: MutexGuard<'_, TopologyController>,
433 config: Result<Config, Vec<String>>,
434) -> Option<SignalTo> {
435 match config {
436 Ok(new_config) => match topology_controller.reload(new_config).await {
437 ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
438 _ => None,
439 },
440 Err(errors) => {
441 handle_config_errors(errors);
442 emit!(VectorConfigLoadError);
443 None
444 }
445 }
446}
447
/// State left over once the main loop has ended, used to perform the final shutdown.
pub struct FinishedApplication {
    /// The terminal signal that ended the main loop.
    pub signal: SignalTo,
    /// Signal receiver, still listened to during shutdown so that a further signal forces a quit.
    pub signal_rx: SignalRx,
    /// Shared controller owning the main topology.
    pub topology_controller: SharedTopologyController,
    /// Internal topologies to stop after the main topology.
    pub internal_topologies: Vec<RunningTopology>,
}
454
impl FinishedApplication {
    /// Stops the application according to the signal that ended the main loop.
    ///
    /// On `Shutdown`, the main topology is stopped gracefully; on `Quit`, the application quits
    /// immediately. Internal topologies are stopped afterwards in both cases.
    ///
    /// # Panics
    ///
    /// Panics if the shared topology controller cannot be unwrapped. Presumably this happens
    /// when other handles to it are still alive — confirm against `SharedTopologyController`.
    pub async fn shutdown(self) -> ExitStatus {
        let FinishedApplication {
            signal,
            signal_rx,
            topology_controller,
            internal_topologies,
        } = self;

        let topology_controller = topology_controller
            .try_into_inner()
            .expect("fail to unwrap topology controller")
            .into_inner();

        // NOTE(review): `handle_signal` returns every signal other than the reload ones, so this
        // assumes `SignalTo` has no other terminal variants besides `Shutdown` and `Quit`.
        let status = match signal {
            SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
            SignalTo::Quit => Self::quit(),
            _ => unreachable!(),
        };

        for topology in internal_topologies {
            topology.stop().await;
        }

        status
    }

    /// Stops the topology gracefully.
    ///
    /// Any signal received while stopping abandons the graceful stop and quits instead.
    async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
        emit!(VectorStopped);
        tokio::select! {
            _ = topology_controller.stop() => ExitStatus::from_raw({
                #[cfg(windows)]
                {
                    exitcode::OK as u32
                }
                #[cfg(unix)]
                exitcode::OK
            }),
            _ = signal_rx.recv() => Self::quit(),
        }
    }

    /// Quits without waiting for the topology to stop.
    ///
    /// NOTE(review): Windows reports `UNAVAILABLE` while Unix reports `OK`. On Unix,
    /// `ExitStatusExt::from_raw` takes a raw wait status rather than an exit code. A non-zero
    /// code there would need encoding (e.g. `code << 8`). Confirm the asymmetry is intended.
    fn quit() -> ExitStatus {
        emit!(VectorQuit);
        ExitStatus::from_raw({
            #[cfg(windows)]
            {
                exitcode::UNAVAILABLE as u32
            }
            #[cfg(unix)]
            exitcode::OK
        })
    }
}
512
513fn get_log_levels(default: &str) -> String {
514 std::env::var("VECTOR_LOG")
515 .or_else(|_| {
516 std::env::var("LOG").inspect(|_log| {
517 warn!(
518 message =
519 "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
520 );
521 })
522 })
523 .unwrap_or_else(|_| default.into())
524}
525
526pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
527 let mut rt_builder = runtime::Builder::new_multi_thread();
528 rt_builder.max_blocking_threads(20_000);
529 rt_builder.enable_all().thread_name(thread_name);
530
531 let threads = threads.unwrap_or_else(crate::num_threads);
532 if threads == 0 {
533 error!("The `threads` argument must be greater or equal to 1.");
534 return Err(exitcode::CONFIG);
535 }
536 WORKER_THREADS
537 .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
538 .unwrap_or_else(|_| panic!("double thread initialization"));
539 rt_builder.worker_threads(threads);
540
541 debug!(message = "Building runtime.", worker_threads = threads);
542 Ok(rt_builder.build().expect("Unable to create async runtime"))
543}
544
545pub async fn load_configs(
546 config_paths: &[ConfigPath],
547 watcher_conf: Option<config::watcher::WatcherConfig>,
548 require_healthy: Option<bool>,
549 allow_empty_config: bool,
550 interpolate_env: bool,
551 graceful_shutdown_duration: Option<Duration>,
552 signal_handler: &mut SignalHandler,
553) -> Result<Config, ExitCode> {
554 let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
555
556 let watched_paths = config_paths
557 .iter()
558 .map(<&PathBuf>::from)
559 .collect::<Vec<_>>();
560
561 info!(
562 message = "Loading configs.",
563 paths = ?watched_paths
564 );
565
566 let mut config = config::load_from_paths_with_provider_and_secrets(
567 &config_paths,
568 signal_handler,
569 allow_empty_config,
570 interpolate_env,
571 )
572 .await
573 .map_err(handle_config_errors)?;
574
575 let mut watched_component_paths = Vec::new();
576
577 if let Some(watcher_conf) = watcher_conf {
578 for (name, transform) in config.transforms() {
579 let files = transform.inner.files_to_watch();
580 let component_config = ComponentConfig::new(
581 files.into_iter().cloned().collect(),
582 name.clone(),
583 ComponentType::Transform,
584 );
585 watched_component_paths.push(component_config);
586 }
587
588 for (name, sink) in config.sinks() {
589 let files = sink.inner.files_to_watch();
590 let component_config = ComponentConfig::new(
591 files.into_iter().cloned().collect(),
592 name.clone(),
593 ComponentType::Sink,
594 );
595 watched_component_paths.push(component_config);
596 }
597
598 for (name, table) in config.enrichment_tables() {
599 let files = table.inner.files_to_watch();
600 let component_config = ComponentConfig::new(
601 files.into_iter().cloned().collect(),
602 name.clone(),
603 ComponentType::EnrichmentTable,
604 );
605 watched_component_paths.push(component_config);
606 }
607
608 info!(
609 message = "Starting watcher.",
610 paths = ?watched_paths
611 );
612 info!(
613 message = "Components to watch.",
614 paths = ?watched_component_paths
615 );
616
617 config::watcher::spawn_thread(
619 watcher_conf,
620 signal_handler.clone_tx(),
621 watched_paths,
622 watched_component_paths,
623 None,
624 )
625 .map_err(|error| {
626 error!(message = "Unable to start config watcher.", %error);
627 exitcode::CONFIG
628 })?;
629 }
630
631 config::init_log_schema(config.global.log_schema.clone(), true);
632 config::init_telemetry(config.global.telemetry.clone(), true);
633
634 if !config.healthchecks.enabled {
635 info!("Health checks are disabled.");
636 }
637 config.healthchecks.set_require_healthy(require_healthy);
638 config.graceful_shutdown_duration = graceful_shutdown_duration;
639
640 Ok(config)
641}
642
643pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
644 let level = get_log_levels(log_level);
645 let json = match format {
646 LogFormat::Text => false,
647 LogFormat::Json => true,
648 };
649
650 trace::init(color, json, &level, rate);
651 debug!(
652 message = "Internal log rate limit configured.",
653 internal_log_rate_secs = rate,
654 );
655 info!(message = "Log level is enabled.", level = ?level);
656}
657
658pub fn watcher_config(
659 method: WatchConfigMethod,
660 interval: NonZeroU64,
661) -> config::watcher::WatcherConfig {
662 match method {
663 WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
664 WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
665 }
666}