vector/
app.rs

1#![allow(missing_docs)]
2#[cfg(unix)]
3use std::os::unix::process::ExitStatusExt;
4#[cfg(windows)]
5use std::os::windows::process::ExitStatusExt;
6use std::{
7    num::{NonZeroU64, NonZeroUsize},
8    path::PathBuf,
9    process::ExitStatus,
10    sync::atomic::{AtomicUsize, Ordering},
11    time::Duration,
12};
13
14use exitcode::ExitCode;
15use futures::StreamExt;
16use tokio::{
17    runtime::{self, Handle, Runtime},
18    sync::{MutexGuard, broadcast::error::RecvError},
19};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22#[cfg(feature = "api")]
23use crate::{api, internal_events::ApiStarted};
24use crate::{
25    cli::{LogFormat, Opts, RootOpts, WatchConfigMethod, handle_config_errors},
26    config::{self, ComponentConfig, ComponentType, Config, ConfigPath},
27    extra_context::ExtraContext,
28    heartbeat,
29    internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
30    signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
31    topology::{
32        ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
33        TopologyController,
34    },
35    trace,
36};
37
/// Worker-thread count recorded for the async runtime.
///
/// Holds `0` until [`build_runtime`] stores the configured number of threads.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the recorded worker-thread count, or `None` while the stored value is still `0`.
pub fn worker_threads() -> Option<NonZeroUsize> {
    let recorded = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(recorded)
}
43
/// Result of loading the configuration and starting the topology, held until the
/// application is started.
pub struct ApplicationConfig {
    /// Configuration paths (with their formats) the configuration was loaded from.
    pub config_paths: Vec<config::ConfigPath>,
    /// The running topology returned by `RunningTopology::start_init_validated`.
    pub topology: RunningTopology,
    /// Receiver returned alongside the topology; the main loop starts a shutdown
    /// when a value arrives on it.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Additional topologies registered through `add_internal_config`; they are
    /// stopped at the end of `FinishedApplication::shutdown`.
    pub internal_topologies: Vec<RunningTopology>,
    /// API options copied from the loaded configuration.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Extra context handed to the topology when it is started.
    pub extra_context: ExtraContext,
}
53
/// A prepared but not yet started application: parsed root options, the loaded
/// configuration with its running topology, and the signal pair.
pub struct Application {
    /// Root command-line options the application was prepared from.
    pub root_opts: RootOpts,
    /// Loaded configuration together with the running topology.
    pub config: ApplicationConfig,
    /// Signal handler/receiver pair created for the runtime.
    pub signals: SignalPair,
}
59
60impl ApplicationConfig {
61    pub async fn from_opts(
62        opts: &RootOpts,
63        signal_handler: &mut SignalHandler,
64        extra_context: ExtraContext,
65    ) -> Result<Self, ExitCode> {
66        let config_paths = opts.config_paths_with_formats();
67
68        let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
69            .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
70
71        let watcher_conf = if opts.watch_config {
72            Some(watcher_config(
73                opts.watch_config_method,
74                opts.watch_config_poll_interval_seconds,
75            ))
76        } else {
77            None
78        };
79
80        let config = load_configs(
81            &config_paths,
82            watcher_conf,
83            opts.require_healthy,
84            opts.allow_empty_config,
85            !opts.disable_env_var_interpolation,
86            graceful_shutdown_duration,
87            signal_handler,
88        )
89        .await?;
90
91        Self::from_config(config_paths, config, extra_context).await
92    }
93
94    pub async fn from_config(
95        config_paths: Vec<ConfigPath>,
96        config: Config,
97        extra_context: ExtraContext,
98    ) -> Result<Self, ExitCode> {
99        #[cfg(feature = "api")]
100        let api = config.api;
101
102        let (topology, graceful_crash_receiver) =
103            RunningTopology::start_init_validated(config, extra_context.clone())
104                .await
105                .ok_or(exitcode::CONFIG)?;
106
107        Ok(Self {
108            config_paths,
109            topology,
110            graceful_crash_receiver,
111            internal_topologies: Vec::new(),
112            #[cfg(feature = "api")]
113            api,
114            extra_context,
115        })
116    }
117
118    pub async fn add_internal_config(
119        &mut self,
120        config: Config,
121        extra_context: ExtraContext,
122    ) -> Result<(), ExitCode> {
123        let Some((topology, _)) =
124            RunningTopology::start_init_validated(config, extra_context).await
125        else {
126            return Err(exitcode::CONFIG);
127        };
128        self.internal_topologies.push(topology);
129        Ok(())
130    }
131
132    /// Configure the API server, if applicable
133    #[cfg(feature = "api")]
134    pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
135        if self.api.enabled {
136            match api::Server::start(
137                self.topology.config(),
138                self.topology.watch(),
139                std::sync::Arc::clone(&self.topology.running),
140                handle,
141            ) {
142                Ok(api_server) => {
143                    emit!(ApiStarted {
144                        addr: self.api.address.unwrap(),
145                        playground: self.api.playground,
146                        graphql: self.api.graphql
147                    });
148
149                    Some(api_server)
150                }
151                Err(error) => {
152                    let error = error.to_string();
153                    error!(message = "An error occurred that Vector couldn't handle.", %error, internal_log_rate_limit = false);
154                    _ = self
155                        .topology
156                        .abort_tx
157                        .send(crate::signal::ShutdownError::ApiFailed { error });
158                    None
159                }
160            }
161        } else {
162            info!(
163                message = "API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`."
164            );
165            None
166        }
167    }
168}
169
170impl Application {
171    pub fn run(extra_context: ExtraContext) -> ExitStatus {
172        let (runtime, app) =
173            Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
174
175        runtime.block_on(app.run())
176    }
177
178    pub fn prepare_start(
179        extra_context: ExtraContext,
180    ) -> Result<(Runtime, StartedApplication), ExitCode> {
181        Self::prepare(extra_context)
182            .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
183    }
184
185    pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
186        let opts = Opts::get_matches().map_err(|error| {
187            // Printing to stdout/err can itself fail; ignore it.
188            _ = error.print();
189            exitcode::USAGE
190        })?;
191
192        Self::prepare_from_opts(opts, extra_context)
193    }
194
195    pub fn prepare_from_opts(
196        opts: Opts,
197        extra_context: ExtraContext,
198    ) -> Result<(Runtime, Self), ExitCode> {
199        opts.root.init_global();
200
201        let color = opts.root.color.use_color();
202
203        init_logging(
204            color,
205            opts.root.log_format,
206            opts.log_level(),
207            opts.root.internal_log_rate_limit,
208        );
209
210        // Set global color preference for downstream modules
211        crate::set_global_color(color);
212
213        // Can only log this after initializing the logging subsystem
214        if opts.root.openssl_no_probe {
215            debug!(
216                message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."
217            );
218        }
219
220        let runtime = build_runtime(opts.root.threads, "vector-worker")?;
221
222        // Signal handler for OS and provider messages.
223        let mut signals = SignalPair::new(&runtime);
224
225        if let Some(sub_command) = &opts.sub_command {
226            return Err(runtime.block_on(sub_command.execute(signals, color)));
227        }
228
229        let config = runtime.block_on(ApplicationConfig::from_opts(
230            &opts.root,
231            &mut signals.handler,
232            extra_context,
233        ))?;
234
235        Ok((
236            runtime,
237            Self {
238                root_opts: opts.root,
239                config,
240                signals,
241            },
242        ))
243    }
244
245    pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
246        // Any internal_logs sources will have grabbed a copy of the
247        // early buffer by this point and set up a subscriber.
248        crate::trace::stop_early_buffering();
249
250        emit!(VectorStarted);
251        handle.spawn(heartbeat::heartbeat());
252
253        let Self {
254            root_opts,
255            config,
256            signals,
257        } = self;
258
259        let topology_controller = SharedTopologyController::new(TopologyController {
260            #[cfg(feature = "api")]
261            api_server: config.setup_api(handle),
262            topology: config.topology,
263            config_paths: config.config_paths.clone(),
264            require_healthy: root_opts.require_healthy,
265            extra_context: config.extra_context,
266        });
267
268        Ok(StartedApplication {
269            config_paths: config.config_paths,
270            internal_topologies: config.internal_topologies,
271            graceful_crash_receiver: config.graceful_crash_receiver,
272            signals,
273            topology_controller,
274            allow_empty_config: root_opts.allow_empty_config,
275            interpolate_env: !root_opts.disable_env_var_interpolation,
276        })
277    }
278}
279
/// A started application, ready to run its main loop.
pub struct StartedApplication {
    /// Configuration paths re-processed when a reload signal is handled.
    pub config_paths: Vec<ConfigPath>,
    /// Additional topologies carried through to shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receiver whose values make the main loop start a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler/receiver pair consumed by the main loop.
    pub signals: SignalPair,
    /// Shared controller wrapping the main topology.
    pub topology_controller: SharedTopologyController,
    /// Forwarded to configuration loading when reloading.
    pub allow_empty_config: bool,
    /// Forwarded to configuration loading when reloading; set to the negation of
    /// the `disable_env_var_interpolation` root option.
    pub interpolate_env: bool,
}
289
290impl StartedApplication {
291    pub async fn run(self) -> ExitStatus {
292        self.main().await.shutdown().await
293    }
294
295    pub async fn main(self) -> FinishedApplication {
296        let Self {
297            config_paths,
298            graceful_crash_receiver,
299            signals,
300            topology_controller,
301            internal_topologies,
302            allow_empty_config,
303            interpolate_env,
304        } = self;
305
306        let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);
307
308        let mut signal_handler = signals.handler;
309        let mut signal_rx = signals.receiver;
310
311        let signal = loop {
312            let has_sources = !topology_controller.lock().await.topology.config.is_empty();
313            tokio::select! {
314                signal = signal_rx.recv() => if let Some(signal) = handle_signal(
315                    signal,
316                    &topology_controller,
317                    &config_paths,
318                    &mut signal_handler,
319                    allow_empty_config,
320                    interpolate_env,
321                ).await {
322                    break signal;
323                },
324                // Trigger graceful shutdown if a component crashed, or all sources have ended.
325                error = graceful_crash.next() => break SignalTo::Shutdown(error),
326                _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
327                    info!("All sources have finished.");
328                    break SignalTo::Shutdown(None)
329                } ,
330                else => unreachable!("Signal streams never end"),
331            }
332        };
333
334        FinishedApplication {
335            signal,
336            signal_rx,
337            topology_controller,
338            internal_topologies,
339        }
340    }
341}
342
343async fn handle_signal(
344    signal: Result<SignalTo, RecvError>,
345    topology_controller: &SharedTopologyController,
346    config_paths: &[ConfigPath],
347    signal_handler: &mut SignalHandler,
348    allow_empty_config: bool,
349    interpolate_env: bool,
350) -> Option<SignalTo> {
351    match signal {
352        Ok(SignalTo::ReloadComponents(components_to_reload)) => {
353            let mut topology_controller = topology_controller.lock().await;
354            topology_controller
355                .topology
356                .extend_reload_set(components_to_reload);
357
358            // Reload paths
359            if let Some(paths) = config::process_paths(config_paths) {
360                topology_controller.config_paths = paths;
361            }
362
363            // Reload config
364            let new_config = config::load_from_paths_with_provider_and_secrets(
365                &topology_controller.config_paths,
366                signal_handler,
367                allow_empty_config,
368                interpolate_env,
369            )
370            .await;
371
372            reload_config_from_result(topology_controller, new_config).await
373        }
374        Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
375            let topology_controller = topology_controller.lock().await;
376            reload_config_from_result(topology_controller, config_builder.build()).await
377        }
378        Ok(SignalTo::ReloadFromDisk) => {
379            let mut topology_controller = topology_controller.lock().await;
380
381            // Reload paths
382            if let Some(paths) = config::process_paths(config_paths) {
383                topology_controller.config_paths = paths;
384            }
385
386            // Reload config
387            let new_config = config::load_from_paths_with_provider_and_secrets(
388                &topology_controller.config_paths,
389                signal_handler,
390                allow_empty_config,
391                interpolate_env,
392            )
393            .await;
394
395            if let Ok(ref config) = new_config {
396                // Find all transforms that have external files to watch
397                let transform_keys_to_reload = config.transform_keys_with_external_files();
398
399                // Add these transforms to reload set
400                if !transform_keys_to_reload.is_empty() {
401                    info!(
402                        message = "Reloading transforms with external files.",
403                        count = transform_keys_to_reload.len()
404                    );
405                    topology_controller
406                        .topology
407                        .extend_reload_set(transform_keys_to_reload);
408                }
409            }
410
411            reload_config_from_result(topology_controller, new_config).await
412        }
413        Ok(SignalTo::ReloadEnrichmentTables) => {
414            let topology_controller = topology_controller.lock().await;
415
416            topology_controller
417                .topology
418                .reload_enrichment_tables()
419                .await;
420            None
421        }
422        Err(RecvError::Lagged(amt)) => {
423            warn!("Overflow, dropped {} signals.", amt);
424            None
425        }
426        Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
427        Ok(signal) => Some(signal),
428    }
429}
430
431async fn reload_config_from_result(
432    mut topology_controller: MutexGuard<'_, TopologyController>,
433    config: Result<Config, Vec<String>>,
434) -> Option<SignalTo> {
435    match config {
436        Ok(new_config) => match topology_controller.reload(new_config).await {
437            ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
438            _ => None,
439        },
440        Err(errors) => {
441            handle_config_errors(errors);
442            emit!(VectorConfigLoadError);
443            None
444        }
445    }
446}
447
/// State left over once the main loop has ended, consumed by `shutdown`.
pub struct FinishedApplication {
    /// The signal that ended the main loop.
    pub signal: SignalTo,
    /// Signal receiver; a further message during a graceful stop makes the
    /// application quit instead of waiting.
    pub signal_rx: SignalRx,
    /// Shared controller wrapping the main topology.
    pub topology_controller: SharedTopologyController,
    /// Additional topologies stopped at the end of `shutdown`.
    pub internal_topologies: Vec<RunningTopology>,
}
454
455impl FinishedApplication {
456    pub async fn shutdown(self) -> ExitStatus {
457        let FinishedApplication {
458            signal,
459            signal_rx,
460            topology_controller,
461            internal_topologies,
462        } = self;
463
464        // At this point, we'll have the only reference to the shared topology controller and can
465        // safely remove it from the wrapper to shut down the topology.
466        let topology_controller = topology_controller
467            .try_into_inner()
468            .expect("fail to unwrap topology controller")
469            .into_inner();
470
471        let status = match signal {
472            SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
473            SignalTo::Quit => Self::quit(),
474            _ => unreachable!(),
475        };
476
477        for topology in internal_topologies {
478            topology.stop().await;
479        }
480
481        status
482    }
483
484    async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
485        emit!(VectorStopped);
486        tokio::select! {
487            _ = topology_controller.stop() => ExitStatus::from_raw({
488                #[cfg(windows)]
489                {
490                    exitcode::OK as u32
491                }
492                #[cfg(unix)]
493                exitcode::OK
494            }), // Graceful shutdown finished
495            _ = signal_rx.recv() => Self::quit(),
496        }
497    }
498
499    fn quit() -> ExitStatus {
500        // It is highly unlikely that this event will exit from topology.
501        emit!(VectorQuit);
502        ExitStatus::from_raw({
503            #[cfg(windows)]
504            {
505                exitcode::UNAVAILABLE as u32
506            }
507            #[cfg(unix)]
508            exitcode::OK
509        })
510    }
511}
512
513fn get_log_levels(default: &str) -> String {
514    std::env::var("VECTOR_LOG")
515        .or_else(|_| {
516            std::env::var("LOG").inspect(|_log| {
517                warn!(
518                    message =
519                        "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
520                );
521            })
522        })
523        .unwrap_or_else(|_| default.into())
524}
525
526pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
527    let mut rt_builder = runtime::Builder::new_multi_thread();
528    rt_builder.max_blocking_threads(20_000);
529    rt_builder.enable_all().thread_name(thread_name);
530
531    let threads = threads.unwrap_or_else(crate::num_threads);
532    if threads == 0 {
533        error!("The `threads` argument must be greater or equal to 1.");
534        return Err(exitcode::CONFIG);
535    }
536    WORKER_THREADS
537        .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
538        .unwrap_or_else(|_| panic!("double thread initialization"));
539    rt_builder.worker_threads(threads);
540
541    debug!(message = "Building runtime.", worker_threads = threads);
542    Ok(rt_builder.build().expect("Unable to create async runtime"))
543}
544
545pub async fn load_configs(
546    config_paths: &[ConfigPath],
547    watcher_conf: Option<config::watcher::WatcherConfig>,
548    require_healthy: Option<bool>,
549    allow_empty_config: bool,
550    interpolate_env: bool,
551    graceful_shutdown_duration: Option<Duration>,
552    signal_handler: &mut SignalHandler,
553) -> Result<Config, ExitCode> {
554    let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
555
556    let watched_paths = config_paths
557        .iter()
558        .map(<&PathBuf>::from)
559        .collect::<Vec<_>>();
560
561    info!(
562        message = "Loading configs.",
563        paths = ?watched_paths
564    );
565
566    let mut config = config::load_from_paths_with_provider_and_secrets(
567        &config_paths,
568        signal_handler,
569        allow_empty_config,
570        interpolate_env,
571    )
572    .await
573    .map_err(handle_config_errors)?;
574
575    let mut watched_component_paths = Vec::new();
576
577    if let Some(watcher_conf) = watcher_conf {
578        for (name, transform) in config.transforms() {
579            let files = transform.inner.files_to_watch();
580            let component_config = ComponentConfig::new(
581                files.into_iter().cloned().collect(),
582                name.clone(),
583                ComponentType::Transform,
584            );
585            watched_component_paths.push(component_config);
586        }
587
588        for (name, sink) in config.sinks() {
589            let files = sink.inner.files_to_watch();
590            let component_config = ComponentConfig::new(
591                files.into_iter().cloned().collect(),
592                name.clone(),
593                ComponentType::Sink,
594            );
595            watched_component_paths.push(component_config);
596        }
597
598        for (name, table) in config.enrichment_tables() {
599            let files = table.inner.files_to_watch();
600            let component_config = ComponentConfig::new(
601                files.into_iter().cloned().collect(),
602                name.clone(),
603                ComponentType::EnrichmentTable,
604            );
605            watched_component_paths.push(component_config);
606        }
607
608        info!(
609            message = "Starting watcher.",
610            paths = ?watched_paths
611        );
612        info!(
613            message = "Components to watch.",
614            paths = ?watched_component_paths
615        );
616
617        // Start listening for config changes.
618        config::watcher::spawn_thread(
619            watcher_conf,
620            signal_handler.clone_tx(),
621            watched_paths,
622            watched_component_paths,
623            None,
624        )
625        .map_err(|error| {
626            error!(message = "Unable to start config watcher.", %error);
627            exitcode::CONFIG
628        })?;
629    }
630
631    config::init_log_schema(config.global.log_schema.clone(), true);
632    config::init_telemetry(config.global.telemetry.clone(), true);
633
634    if !config.healthchecks.enabled {
635        info!("Health checks are disabled.");
636    }
637    config.healthchecks.set_require_healthy(require_healthy);
638    config.graceful_shutdown_duration = graceful_shutdown_duration;
639
640    Ok(config)
641}
642
643pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
644    let level = get_log_levels(log_level);
645    let json = match format {
646        LogFormat::Text => false,
647        LogFormat::Json => true,
648    };
649
650    trace::init(color, json, &level, rate);
651    debug!(
652        message = "Internal log rate limit configured.",
653        internal_log_rate_secs = rate,
654    );
655    info!(message = "Log level is enabled.", level = ?level);
656}
657
658pub fn watcher_config(
659    method: WatchConfigMethod,
660    interval: NonZeroU64,
661) -> config::watcher::WatcherConfig {
662    match method {
663        WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
664        WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
665    }
666}