vector/
app.rs

1#![allow(missing_docs)]
2#[cfg(unix)]
3use std::os::unix::process::ExitStatusExt;
4#[cfg(windows)]
5use std::os::windows::process::ExitStatusExt;
6use std::{
7    num::{NonZeroU64, NonZeroUsize},
8    path::PathBuf,
9    process::ExitStatus,
10    sync::atomic::{AtomicUsize, Ordering},
11    time::Duration,
12};
13
14use exitcode::ExitCode;
15use futures::StreamExt;
16use tokio::{
17    runtime::{self, Handle, Runtime},
18    sync::{MutexGuard, broadcast::error::RecvError},
19};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22#[cfg(feature = "api")]
23use crate::api;
24#[cfg(feature = "api")]
25use crate::internal_events::ApiStarted;
26use crate::{
27    cli::{LogFormat, Opts, RootOpts, WatchConfigMethod, handle_config_errors},
28    config::{self, ComponentConfig, ComponentType, Config, ConfigPath},
29    extra_context::ExtraContext,
30    heartbeat,
31    internal_events::{
32        VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped, VectorStopping,
33    },
34    signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
35    topology::{
36        ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
37        TopologyController,
38    },
39    trace,
40};
41
/// Worker thread count recorded by `build_runtime`; stays at `0` until a count is stored.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the recorded worker thread count, or `None` while the stored value is still zero.
pub fn worker_threads() -> Option<NonZeroUsize> {
    let recorded = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(recorded)
}
47
/// State produced by loading the configuration and starting a topology from it.
pub struct ApplicationConfig {
    /// Config paths the configuration was loaded from.
    pub config_paths: Vec<config::ConfigPath>,
    /// Topology started from the loaded configuration.
    pub topology: RunningTopology,
    /// Receiver returned together with `topology` by `RunningTopology::start_init_validated`.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Topologies added through `add_internal_config`; empty right after `from_config`.
    pub internal_topologies: Vec<RunningTopology>,
    /// API options taken from the loaded configuration.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Extra context that was handed to the topology when it was started.
    pub extra_context: ExtraContext,
}
57
/// A prepared application: options parsed, configuration loaded, not yet started.
pub struct Application {
    /// Root command-line options the application was prepared with.
    pub root_opts: RootOpts,
    /// Loaded configuration together with the topology started from it.
    pub config: ApplicationConfig,
    /// Signal handler/receiver pair created for the runtime.
    pub signals: SignalPair,
}
63
64impl ApplicationConfig {
65    pub async fn from_opts(
66        opts: &RootOpts,
67        signal_handler: &mut SignalHandler,
68        extra_context: ExtraContext,
69    ) -> Result<Self, ExitCode> {
70        let config_paths = opts.config_paths_with_formats();
71
72        let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
73            .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
74
75        let watcher_conf = if opts.watch_config {
76            Some(watcher_config(
77                opts.watch_config_method,
78                opts.watch_config_poll_interval_seconds,
79            ))
80        } else {
81            None
82        };
83
84        let config = load_configs(
85            &config_paths,
86            watcher_conf,
87            opts.require_healthy,
88            opts.allow_empty_config,
89            !opts.disable_env_var_interpolation,
90            graceful_shutdown_duration,
91            signal_handler,
92        )
93        .await?;
94
95        Self::from_config(config_paths, config, extra_context).await
96    }
97
98    pub async fn from_config(
99        config_paths: Vec<ConfigPath>,
100        config: Config,
101        extra_context: ExtraContext,
102    ) -> Result<Self, ExitCode> {
103        #[cfg(feature = "api")]
104        let api = config.api;
105
106        let (topology, graceful_crash_receiver) =
107            RunningTopology::start_init_validated(config, extra_context.clone())
108                .await
109                .ok_or(exitcode::CONFIG)?;
110
111        Ok(Self {
112            config_paths,
113            topology,
114            graceful_crash_receiver,
115            internal_topologies: Vec::new(),
116            #[cfg(feature = "api")]
117            api,
118            extra_context,
119        })
120    }
121
122    pub async fn add_internal_config(
123        &mut self,
124        config: Config,
125        extra_context: ExtraContext,
126    ) -> Result<(), ExitCode> {
127        let Some((topology, _)) =
128            RunningTopology::start_init_validated(config, extra_context).await
129        else {
130            return Err(exitcode::CONFIG);
131        };
132        self.internal_topologies.push(topology);
133        Ok(())
134    }
135
136    /// Configure the gRPC API server, if applicable
137    #[cfg(feature = "api")]
138    pub fn setup_api(&self, handle: &Handle) -> Option<api::GrpcServer> {
139        if self.api.enabled {
140            // Start gRPC server
141            let api_server = handle.block_on(api::GrpcServer::start(
142                self.topology.config(),
143                self.topology.watch(),
144            ));
145            match api_server {
146                Ok(server) => {
147                    emit!(ApiStarted {
148                        addr: server.addr()
149                    });
150                    Some(server)
151                }
152                Err(error) => {
153                    let error = error.to_string();
154                    error!(
155                        message = "An error occurred that Vector couldn't handle.",
156                        %error,
157                        internal_log_rate_limit = false
158                    );
159                    // Trigger shutdown because the API was explicitly enabled but failed to start
160                    // This ensures users don't run Vector thinking top/tap will work when they won't
161                    _ = self
162                        .topology
163                        .abort_tx
164                        .send(crate::signal::ShutdownError::ApiFailed { error });
165                    None
166                }
167            }
168        } else {
169            info!(
170                message = "API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`."
171            );
172            None
173        }
174    }
175}
176
177impl Application {
178    pub fn run(extra_context: ExtraContext) -> ExitStatus {
179        let (runtime, app) =
180            Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
181
182        runtime.block_on(app.run())
183    }
184
185    pub fn prepare_start(
186        extra_context: ExtraContext,
187    ) -> Result<(Runtime, StartedApplication), ExitCode> {
188        Self::prepare(extra_context)
189            .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
190    }
191
192    pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
193        let opts = Opts::get_matches().map_err(|error| {
194            // Printing to stdout/err can itself fail; ignore it.
195            _ = error.print();
196            exitcode::USAGE
197        })?;
198
199        Self::prepare_from_opts(opts, extra_context)
200    }
201
202    pub fn prepare_from_opts(
203        opts: Opts,
204        extra_context: ExtraContext,
205    ) -> Result<(Runtime, Self), ExitCode> {
206        opts.root.init_global();
207
208        let color = opts.root.color.use_color();
209
210        init_logging(
211            color,
212            opts.root.log_format,
213            opts.log_level(),
214            opts.root.internal_log_rate_limit,
215        );
216
217        // Set global color preference for downstream modules
218        crate::set_global_color(color);
219
220        // Can only log this after initializing the logging subsystem
221        if opts.root.openssl_no_probe {
222            debug!(
223                message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."
224            );
225        }
226
227        let runtime = build_runtime(opts.root.threads, "vector-worker")?;
228
229        // Signal handler for OS and provider messages.
230        let mut signals = SignalPair::new(&runtime);
231
232        if let Some(sub_command) = &opts.sub_command {
233            return Err(runtime.block_on(sub_command.execute(signals, color)));
234        }
235
236        let config = runtime.block_on(ApplicationConfig::from_opts(
237            &opts.root,
238            &mut signals.handler,
239            extra_context,
240        ))?;
241
242        Ok((
243            runtime,
244            Self {
245                root_opts: opts.root,
246                config,
247                signals,
248            },
249        ))
250    }
251
252    pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
253        // Any internal_logs sources will have grabbed a copy of the
254        // early buffer by this point and set up a subscriber.
255        crate::trace::stop_early_buffering();
256
257        emit!(VectorStarted);
258        handle.spawn(heartbeat::heartbeat());
259
260        let Self {
261            root_opts,
262            config,
263            signals,
264        } = self;
265
266        #[cfg(feature = "api")]
267        let api_server = config.setup_api(handle);
268
269        let topology_controller = SharedTopologyController::new(TopologyController {
270            #[cfg(feature = "api")]
271            api_server,
272            topology: config.topology,
273            config_paths: config.config_paths.clone(),
274            require_healthy: root_opts.require_healthy,
275            extra_context: config.extra_context,
276        });
277
278        Ok(StartedApplication {
279            config_paths: config.config_paths,
280            internal_topologies: config.internal_topologies,
281            graceful_crash_receiver: config.graceful_crash_receiver,
282            signals,
283            topology_controller,
284            allow_empty_config: root_opts.allow_empty_config,
285            interpolate_env: !root_opts.disable_env_var_interpolation,
286        })
287    }
288}
289
/// A started application, ready to run its main loop.
pub struct StartedApplication {
    /// Config paths passed to `handle_signal` when a reload is requested.
    pub config_paths: Vec<ConfigPath>,
    /// Internal topologies carried over from `ApplicationConfig`; stopped during shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receiver polled in the main loop; any item it yields triggers a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler/receiver pair driving the main loop.
    pub signals: SignalPair,
    /// Shared controller wrapping the running topology.
    pub topology_controller: SharedTopologyController,
    /// Forwarded to config loading when reloading.
    pub allow_empty_config: bool,
    /// Forwarded to config loading when reloading; the negation of
    /// `disable_env_var_interpolation`.
    pub interpolate_env: bool,
}
299
300impl StartedApplication {
301    pub async fn run(self) -> ExitStatus {
302        self.main().await.shutdown().await
303    }
304
305    pub async fn main(self) -> FinishedApplication {
306        let Self {
307            config_paths,
308            graceful_crash_receiver,
309            signals,
310            topology_controller,
311            internal_topologies,
312            allow_empty_config,
313            interpolate_env,
314        } = self;
315
316        let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
317
318        let mut signal_handler = signals.handler;
319        let mut signal_rx = signals.receiver;
320
321        let signal = loop {
322            let has_sources = !topology_controller.lock().await.topology.config.is_empty();
323            tokio::select! {
324                signal = signal_rx.recv() => if let Some(signal) = handle_signal(
325                    signal,
326                    &topology_controller,
327                    &config_paths,
328                    &mut signal_handler,
329                    allow_empty_config,
330                    interpolate_env,
331                ).await {
332                    break signal;
333                },
334                // Trigger graceful shutdown if a component crashed, or all sources have ended.
335                error = graceful_crash.next() => break SignalTo::Shutdown(error),
336                _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
337                    info!("All sources have finished.");
338                    break SignalTo::Shutdown(None)
339                } ,
340                else => unreachable!("Signal streams never end"),
341            }
342        };
343
344        FinishedApplication {
345            signal,
346            signal_rx,
347            topology_controller,
348            internal_topologies,
349        }
350    }
351}
352
353async fn handle_signal(
354    signal: Result<SignalTo, RecvError>,
355    topology_controller: &SharedTopologyController,
356    config_paths: &[ConfigPath],
357    signal_handler: &mut SignalHandler,
358    allow_empty_config: bool,
359    interpolate_env: bool,
360) -> Option<SignalTo> {
361    match signal {
362        Ok(SignalTo::ReloadComponents(components_to_reload)) => {
363            let mut topology_controller = topology_controller.lock().await;
364            topology_controller
365                .topology
366                .extend_reload_set(components_to_reload);
367
368            // Reload paths
369            if let Some(paths) = config::process_paths(config_paths) {
370                topology_controller.config_paths = paths;
371            }
372
373            // Reload config
374            let new_config = config::load_from_paths_with_provider_and_secrets(
375                &topology_controller.config_paths,
376                signal_handler,
377                allow_empty_config,
378                interpolate_env,
379            )
380            .await;
381
382            reload_config_from_result(topology_controller, new_config).await
383        }
384        Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
385            let topology_controller = topology_controller.lock().await;
386            reload_config_from_result(topology_controller, config_builder.build()).await
387        }
388        Ok(SignalTo::ReloadFromDisk) => {
389            let mut topology_controller = topology_controller.lock().await;
390
391            // Reload paths
392            if let Some(paths) = config::process_paths(config_paths) {
393                topology_controller.config_paths = paths;
394            }
395
396            // Reload config
397            let new_config = config::load_from_paths_with_provider_and_secrets(
398                &topology_controller.config_paths,
399                signal_handler,
400                allow_empty_config,
401                interpolate_env,
402            )
403            .await;
404
405            if let Ok(ref config) = new_config {
406                // Find all transforms that have external files to watch
407                let transform_keys_to_reload = config.transform_keys_with_external_files();
408
409                // Add these transforms to reload set
410                if !transform_keys_to_reload.is_empty() {
411                    info!(
412                        message = "Reloading transforms with external files.",
413                        count = transform_keys_to_reload.len()
414                    );
415                    topology_controller
416                        .topology
417                        .extend_reload_set(transform_keys_to_reload);
418                }
419            }
420
421            reload_config_from_result(topology_controller, new_config).await
422        }
423        Ok(SignalTo::ReloadEnrichmentTables) => {
424            let topology_controller = topology_controller.lock().await;
425
426            topology_controller
427                .topology
428                .reload_enrichment_tables()
429                .await;
430            None
431        }
432        Err(RecvError::Lagged(amt)) => {
433            warn!("Overflow, dropped {} signals.", amt);
434            None
435        }
436        Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
437        Ok(signal) => Some(signal),
438    }
439}
440
441async fn reload_config_from_result(
442    mut topology_controller: MutexGuard<'_, TopologyController>,
443    config: Result<Config, Vec<String>>,
444) -> Option<SignalTo> {
445    match config {
446        Ok(new_config) => match topology_controller.reload(new_config).await {
447            ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
448            _ => None,
449        },
450        Err(errors) => {
451            handle_config_errors(errors);
452            emit!(VectorConfigLoadError);
453            None
454        }
455    }
456}
457
/// An application whose main loop has ended and which is ready to be shut down.
pub struct FinishedApplication {
    /// Signal that ended the main loop.
    pub signal: SignalTo,
    /// Signal receiver; a further signal received while stopping turns the stop into a quit.
    pub signal_rx: SignalRx,
    /// Shared controller wrapping the running topology; unwrapped during shutdown.
    pub topology_controller: SharedTopologyController,
    /// Internal topologies, stopped after the main topology has been handled.
    pub internal_topologies: Vec<RunningTopology>,
}
464
465impl FinishedApplication {
466    pub async fn shutdown(self) -> ExitStatus {
467        let FinishedApplication {
468            signal,
469            signal_rx,
470            topology_controller,
471            internal_topologies,
472        } = self;
473
474        // At this point, we'll have the only reference to the shared topology controller and can
475        // safely remove it from the wrapper to shut down the topology.
476        let topology_controller = topology_controller
477            .try_into_inner()
478            .expect("fail to unwrap topology controller")
479            .into_inner();
480
481        let status = match signal {
482            SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
483            SignalTo::Quit => Self::quit(),
484            _ => unreachable!(),
485        };
486
487        for topology in internal_topologies {
488            topology.stop().await;
489        }
490
491        status
492    }
493
494    async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
495        emit!(VectorStopping);
496        tokio::select! {
497            _ = topology_controller.stop() => {
498                emit!(VectorStopped);
499                ExitStatus::from_raw({
500                    #[cfg(windows)]
501                    {
502                        exitcode::OK as u32
503                    }
504                    #[cfg(unix)]
505                    exitcode::OK
506                })
507            }, // Graceful shutdown finished
508            _ = signal_rx.recv() => Self::quit(),
509        }
510    }
511
512    fn quit() -> ExitStatus {
513        // It is highly unlikely that this event will exit from topology.
514        emit!(VectorQuit);
515        ExitStatus::from_raw({
516            #[cfg(windows)]
517            {
518                exitcode::UNAVAILABLE as u32
519            }
520            #[cfg(unix)]
521            exitcode::OK
522        })
523    }
524}
525
526fn get_log_levels(default: &str) -> String {
527    std::env::var("VECTOR_LOG")
528        .or_else(|_| {
529            std::env::var("LOG").inspect(|_log| {
530                warn!(
531                    message =
532                        "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
533                );
534            })
535        })
536        .unwrap_or_else(|_| default.into())
537}
538
539pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
540    let mut rt_builder = runtime::Builder::new_multi_thread();
541    rt_builder.max_blocking_threads(20_000);
542    rt_builder.enable_all().thread_name(thread_name);
543
544    let threads = threads.unwrap_or_else(crate::num_threads);
545    if threads == 0 {
546        error!("The `threads` argument must be greater or equal to 1.");
547        return Err(exitcode::CONFIG);
548    }
549    WORKER_THREADS
550        .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
551        .unwrap_or_else(|_| panic!("double thread initialization"));
552    rt_builder.worker_threads(threads);
553
554    debug!(message = "Building runtime.", worker_threads = threads);
555    Ok(rt_builder.build().expect("Unable to create async runtime"))
556}
557
558pub async fn load_configs(
559    config_paths: &[ConfigPath],
560    watcher_conf: Option<config::watcher::WatcherConfig>,
561    require_healthy: Option<bool>,
562    allow_empty_config: bool,
563    interpolate_env: bool,
564    graceful_shutdown_duration: Option<Duration>,
565    signal_handler: &mut SignalHandler,
566) -> Result<Config, ExitCode> {
567    let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
568
569    let watched_paths = config_paths
570        .iter()
571        .map(<&PathBuf>::from)
572        .collect::<Vec<_>>();
573
574    info!(
575        message = "Loading configs.",
576        paths = ?watched_paths
577    );
578
579    let mut config = config::load_from_paths_with_provider_and_secrets(
580        &config_paths,
581        signal_handler,
582        allow_empty_config,
583        interpolate_env,
584    )
585    .await
586    .map_err(handle_config_errors)?;
587
588    let mut watched_component_paths = Vec::new();
589
590    if let Some(watcher_conf) = watcher_conf {
591        for (name, transform) in config.transforms() {
592            let files = transform.inner.files_to_watch();
593            let component_config = ComponentConfig::new(
594                files.into_iter().cloned().collect(),
595                name.clone(),
596                ComponentType::Transform,
597            );
598            watched_component_paths.push(component_config);
599        }
600
601        for (name, sink) in config.sinks() {
602            let files = sink.inner.files_to_watch();
603            let component_config = ComponentConfig::new(
604                files.into_iter().cloned().collect(),
605                name.clone(),
606                ComponentType::Sink,
607            );
608            watched_component_paths.push(component_config);
609        }
610
611        for (name, table) in config.enrichment_tables() {
612            let files = table.inner.files_to_watch();
613            let component_config = ComponentConfig::new(
614                files.into_iter().cloned().collect(),
615                name.clone(),
616                ComponentType::EnrichmentTable,
617            );
618            watched_component_paths.push(component_config);
619        }
620
621        info!(
622            message = "Starting watcher.",
623            paths = ?watched_paths
624        );
625        info!(
626            message = "Components to watch.",
627            paths = ?watched_component_paths
628        );
629
630        // Start listening for config changes.
631        config::watcher::spawn_thread(
632            watcher_conf,
633            signal_handler.clone_tx(),
634            watched_paths,
635            watched_component_paths,
636            None,
637        )
638        .map_err(|error| {
639            error!(message = "Unable to start config watcher.", %error);
640            exitcode::CONFIG
641        })?;
642    }
643
644    config::init_log_schema(config.global.log_schema.clone(), true);
645    config::init_telemetry(config.global.telemetry.clone(), true);
646
647    if !config.healthchecks.enabled {
648        info!("Health checks are disabled.");
649    }
650    config.healthchecks.set_require_healthy(require_healthy);
651    config.graceful_shutdown_duration = graceful_shutdown_duration;
652
653    Ok(config)
654}
655
656pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
657    let level = get_log_levels(log_level);
658    let json = match format {
659        LogFormat::Text => false,
660        LogFormat::Json => true,
661    };
662
663    trace::init(color, json, &level, rate);
664    debug!(
665        message = "Internal log rate limit configured.",
666        internal_log_rate_secs = rate,
667    );
668    info!(message = "Log level is enabled.", level = ?level);
669}
670
671pub fn watcher_config(
672    method: WatchConfigMethod,
673    interval: NonZeroU64,
674) -> config::watcher::WatcherConfig {
675    match method {
676        WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
677        WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
678    }
679}