vector/
app.rs

1#![allow(missing_docs)]
2#[cfg(unix)]
3use std::os::unix::process::ExitStatusExt;
4#[cfg(windows)]
5use std::os::windows::process::ExitStatusExt;
6use std::{
7    num::{NonZeroU64, NonZeroUsize},
8    path::PathBuf,
9    process::ExitStatus,
10    sync::atomic::{AtomicUsize, Ordering},
11    time::Duration,
12};
13
14use exitcode::ExitCode;
15use futures::StreamExt;
16use tokio::{
17    runtime::{self, Handle, Runtime},
18    sync::{MutexGuard, broadcast::error::RecvError},
19};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21
22#[cfg(feature = "api")]
23use crate::{api, internal_events::ApiStarted};
24use crate::{
25    cli::{LogFormat, Opts, RootOpts, WatchConfigMethod, handle_config_errors},
26    config::{self, ComponentConfig, ComponentType, Config, ConfigPath},
27    extra_context::ExtraContext,
28    heartbeat,
29    internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
30    signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
31    topology::{
32        ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
33        TopologyController,
34    },
35    trace,
36};
37
/// Worker-thread count recorded by [`build_runtime`]; stays `0` until a runtime has been built.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the number of runtime worker threads, or `None` if [`build_runtime`] has not yet
/// recorded a count.
pub fn worker_threads() -> Option<NonZeroUsize> {
    let configured = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(configured)
}
43
/// Loaded configuration together with the already-started main topology, as produced by
/// `ApplicationConfig::from_opts` / `ApplicationConfig::from_config`.
pub struct ApplicationConfig {
    /// Config paths the application was started with.
    pub config_paths: Vec<config::ConfigPath>,
    /// The main running topology.
    pub topology: RunningTopology,
    /// Receives errors from the main topology; the main loop turns each one into a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Extra topologies started through `add_internal_config`; stopped during shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// API server options, consulted by `setup_api`.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Extra context handed to the topology and, later, to the topology controller.
    pub extra_context: ExtraContext,
}
53
/// A prepared application: configuration is loaded and the main topology is running, but
/// `Application::start` (heartbeat, API server, topology controller) has not been called yet.
pub struct Application {
    /// Root command-line options.
    pub root_opts: RootOpts,
    /// Loaded configuration and the started main topology.
    pub config: ApplicationConfig,
    /// Signal handler and receiver for OS and provider messages.
    pub signals: SignalPair,
}
59
60impl ApplicationConfig {
61    pub async fn from_opts(
62        opts: &RootOpts,
63        signal_handler: &mut SignalHandler,
64        extra_context: ExtraContext,
65    ) -> Result<Self, ExitCode> {
66        let config_paths = opts.config_paths_with_formats();
67
68        let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
69            .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
70
71        let watcher_conf = if opts.watch_config {
72            Some(watcher_config(
73                opts.watch_config_method,
74                opts.watch_config_poll_interval_seconds,
75            ))
76        } else {
77            None
78        };
79
80        let config = load_configs(
81            &config_paths,
82            watcher_conf,
83            opts.require_healthy,
84            opts.allow_empty_config,
85            graceful_shutdown_duration,
86            signal_handler,
87        )
88        .await?;
89
90        Self::from_config(config_paths, config, extra_context).await
91    }
92
93    pub async fn from_config(
94        config_paths: Vec<ConfigPath>,
95        config: Config,
96        extra_context: ExtraContext,
97    ) -> Result<Self, ExitCode> {
98        #[cfg(feature = "api")]
99        let api = config.api;
100
101        let (topology, graceful_crash_receiver) =
102            RunningTopology::start_init_validated(config, extra_context.clone())
103                .await
104                .ok_or(exitcode::CONFIG)?;
105
106        Ok(Self {
107            config_paths,
108            topology,
109            graceful_crash_receiver,
110            internal_topologies: Vec::new(),
111            #[cfg(feature = "api")]
112            api,
113            extra_context,
114        })
115    }
116
117    pub async fn add_internal_config(
118        &mut self,
119        config: Config,
120        extra_context: ExtraContext,
121    ) -> Result<(), ExitCode> {
122        let Some((topology, _)) =
123            RunningTopology::start_init_validated(config, extra_context).await
124        else {
125            return Err(exitcode::CONFIG);
126        };
127        self.internal_topologies.push(topology);
128        Ok(())
129    }
130
131    /// Configure the API server, if applicable
132    #[cfg(feature = "api")]
133    pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
134        if self.api.enabled {
135            match api::Server::start(
136                self.topology.config(),
137                self.topology.watch(),
138                std::sync::Arc::clone(&self.topology.running),
139                handle,
140            ) {
141                Ok(api_server) => {
142                    emit!(ApiStarted {
143                        addr: self.api.address.unwrap(),
144                        playground: self.api.playground,
145                        graphql: self.api.graphql
146                    });
147
148                    Some(api_server)
149                }
150                Err(error) => {
151                    let error = error.to_string();
152                    error!(message = "An error occurred that Vector couldn't handle.", %error, internal_log_rate_limit = false);
153                    _ = self
154                        .topology
155                        .abort_tx
156                        .send(crate::signal::ShutdownError::ApiFailed { error });
157                    None
158                }
159            }
160        } else {
161            info!(
162                message = "API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`."
163            );
164            None
165        }
166    }
167}
168
169impl Application {
170    pub fn run(extra_context: ExtraContext) -> ExitStatus {
171        let (runtime, app) =
172            Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
173
174        runtime.block_on(app.run())
175    }
176
177    pub fn prepare_start(
178        extra_context: ExtraContext,
179    ) -> Result<(Runtime, StartedApplication), ExitCode> {
180        Self::prepare(extra_context)
181            .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
182    }
183
184    pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
185        let opts = Opts::get_matches().map_err(|error| {
186            // Printing to stdout/err can itself fail; ignore it.
187            _ = error.print();
188            exitcode::USAGE
189        })?;
190
191        Self::prepare_from_opts(opts, extra_context)
192    }
193
194    pub fn prepare_from_opts(
195        opts: Opts,
196        extra_context: ExtraContext,
197    ) -> Result<(Runtime, Self), ExitCode> {
198        opts.root.init_global();
199
200        let color = opts.root.color.use_color();
201
202        init_logging(
203            color,
204            opts.root.log_format,
205            opts.log_level(),
206            opts.root.internal_log_rate_limit,
207        );
208
209        // Set global color preference for downstream modules
210        crate::set_global_color(color);
211
212        // Can only log this after initializing the logging subsystem
213        if opts.root.openssl_no_probe {
214            debug!(
215                message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL."
216            );
217        }
218
219        let runtime = build_runtime(opts.root.threads, "vector-worker")?;
220
221        // Signal handler for OS and provider messages.
222        let mut signals = SignalPair::new(&runtime);
223
224        if let Some(sub_command) = &opts.sub_command {
225            return Err(runtime.block_on(sub_command.execute(signals, color)));
226        }
227
228        let config = runtime.block_on(ApplicationConfig::from_opts(
229            &opts.root,
230            &mut signals.handler,
231            extra_context,
232        ))?;
233
234        Ok((
235            runtime,
236            Self {
237                root_opts: opts.root,
238                config,
239                signals,
240            },
241        ))
242    }
243
244    pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
245        // Any internal_logs sources will have grabbed a copy of the
246        // early buffer by this point and set up a subscriber.
247        crate::trace::stop_early_buffering();
248
249        emit!(VectorStarted);
250        handle.spawn(heartbeat::heartbeat());
251
252        let Self {
253            root_opts,
254            config,
255            signals,
256        } = self;
257
258        let topology_controller = SharedTopologyController::new(TopologyController {
259            #[cfg(feature = "api")]
260            api_server: config.setup_api(handle),
261            topology: config.topology,
262            config_paths: config.config_paths.clone(),
263            require_healthy: root_opts.require_healthy,
264            extra_context: config.extra_context,
265        });
266
267        Ok(StartedApplication {
268            config_paths: config.config_paths,
269            internal_topologies: config.internal_topologies,
270            graceful_crash_receiver: config.graceful_crash_receiver,
271            signals,
272            topology_controller,
273            allow_empty_config: root_opts.allow_empty_config,
274        })
275    }
276}
277
/// An application returned by `Application::start`, ready to run its main loop.
pub struct StartedApplication {
    /// Config paths the application was started with; re-processed on reload signals.
    pub config_paths: Vec<ConfigPath>,
    /// Extra topologies, stopped during shutdown after the main topology has been handled.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receives errors from the main topology; each one ends the main loop with a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler and receiver that drive reloads and shutdown.
    pub signals: SignalPair,
    /// Shared controller that owns the main running topology.
    pub topology_controller: SharedTopologyController,
    /// Passed to config loading on reloads; controls whether an empty config is accepted.
    pub allow_empty_config: bool,
}
286
impl StartedApplication {
    /// Run the main loop until it stops, then shut the application down and return the
    /// resulting process exit status.
    pub async fn run(self) -> ExitStatus {
        self.main().await.shutdown().await
    }

    /// Drive the main event loop until something ends it.
    ///
    /// Each iteration waits for the first of:
    /// - a message on the signal channel, handled by `handle_signal` (reloads are applied in
    ///   place and the loop continues unless a stop signal is returned);
    /// - an item from the graceful-crash stream, turned into `SignalTo::Shutdown` carrying
    ///   that item (`None` once the stream has ended);
    /// - completion of `TopologyController::sources_finished`. This branch is only enabled
    ///   while the current topology config is non-empty.
    ///
    /// Returns a [`FinishedApplication`] holding the signal that ended the loop.
    pub async fn main(self) -> FinishedApplication {
        let Self {
            config_paths,
            graceful_crash_receiver,
            signals,
            topology_controller,
            internal_topologies,
            allow_empty_config,
        } = self;

        let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);

        let mut signal_handler = signals.handler;
        let mut signal_rx = signals.receiver;

        let signal = loop {
            // Recomputed every iteration because reloads handled below can replace the config.
            // The lock guard is a temporary, so it is released at the end of this statement.
            let has_sources = !topology_controller.lock().await.topology.config.is_empty();
            tokio::select! {
                signal = signal_rx.recv() => if let Some(signal) = handle_signal(
                    signal,
                    &topology_controller,
                    &config_paths,
                    &mut signal_handler,
                    allow_empty_config,
                ).await {
                    break signal;
                },
                // Trigger graceful shutdown if a component crashed, or all sources have ended.
                error = graceful_crash.next() => break SignalTo::Shutdown(error),
                _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
                    info!("All sources have finished.");
                    break SignalTo::Shutdown(None)
                } ,
                // `else` only runs when every branch is disabled. The signal branch's pattern
                // always matches and has no precondition, so it is never disabled.
                else => unreachable!("Signal streams never end"),
            }
        };

        FinishedApplication {
            signal,
            signal_rx,
            topology_controller,
            internal_topologies,
        }
    }
}
337
338async fn handle_signal(
339    signal: Result<SignalTo, RecvError>,
340    topology_controller: &SharedTopologyController,
341    config_paths: &[ConfigPath],
342    signal_handler: &mut SignalHandler,
343    allow_empty_config: bool,
344) -> Option<SignalTo> {
345    match signal {
346        Ok(SignalTo::ReloadComponents(components_to_reload)) => {
347            let mut topology_controller = topology_controller.lock().await;
348            topology_controller
349                .topology
350                .extend_reload_set(components_to_reload);
351
352            // Reload paths
353            if let Some(paths) = config::process_paths(config_paths) {
354                topology_controller.config_paths = paths;
355            }
356
357            // Reload config
358            let new_config = config::load_from_paths_with_provider_and_secrets(
359                &topology_controller.config_paths,
360                signal_handler,
361                allow_empty_config,
362            )
363            .await;
364
365            reload_config_from_result(topology_controller, new_config).await
366        }
367        Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
368            let topology_controller = topology_controller.lock().await;
369            reload_config_from_result(topology_controller, config_builder.build()).await
370        }
371        Ok(SignalTo::ReloadFromDisk) => {
372            let mut topology_controller = topology_controller.lock().await;
373
374            // Reload paths
375            if let Some(paths) = config::process_paths(config_paths) {
376                topology_controller.config_paths = paths;
377            }
378
379            // Reload config
380            let new_config = config::load_from_paths_with_provider_and_secrets(
381                &topology_controller.config_paths,
382                signal_handler,
383                allow_empty_config,
384            )
385            .await;
386
387            if let Ok(ref config) = new_config {
388                // Find all transforms that have external files to watch
389                let transform_keys_to_reload = config.transform_keys_with_external_files();
390
391                // Add these transforms to reload set
392                if !transform_keys_to_reload.is_empty() {
393                    info!(
394                        message = "Reloading transforms with external files.",
395                        count = transform_keys_to_reload.len()
396                    );
397                    topology_controller
398                        .topology
399                        .extend_reload_set(transform_keys_to_reload);
400                }
401            }
402
403            reload_config_from_result(topology_controller, new_config).await
404        }
405        Ok(SignalTo::ReloadEnrichmentTables) => {
406            let topology_controller = topology_controller.lock().await;
407
408            topology_controller
409                .topology
410                .reload_enrichment_tables()
411                .await;
412            None
413        }
414        Err(RecvError::Lagged(amt)) => {
415            warn!("Overflow, dropped {} signals.", amt);
416            None
417        }
418        Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
419        Ok(signal) => Some(signal),
420    }
421}
422
423async fn reload_config_from_result(
424    mut topology_controller: MutexGuard<'_, TopologyController>,
425    config: Result<Config, Vec<String>>,
426) -> Option<SignalTo> {
427    match config {
428        Ok(new_config) => match topology_controller.reload(new_config).await {
429            ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
430            _ => None,
431        },
432        Err(errors) => {
433            handle_config_errors(errors);
434            emit!(VectorConfigLoadError);
435            None
436        }
437    }
438}
439
/// State left over once the main loop has exited; consumed by `FinishedApplication::shutdown`.
pub struct FinishedApplication {
    /// The signal that ended the main loop.
    pub signal: SignalTo,
    /// Signal receiver. It is still watched during graceful shutdown, so a further signal
    /// forces a quit.
    pub signal_rx: SignalRx,
    /// Shared controller owning the main topology; expected to be the last reference to it.
    pub topology_controller: SharedTopologyController,
    /// Extra topologies to stop after the main topology has been handled.
    pub internal_topologies: Vec<RunningTopology>,
}
446
impl FinishedApplication {
    /// Shut the application down after the main loop has exited and return the process exit
    /// status.
    ///
    /// `SignalTo::Shutdown` stops the topology gracefully via [`Self::stop`]. `SignalTo::Quit`
    /// skips that and uses [`Self::quit`]. Internal topologies are stopped afterwards in both
    /// cases.
    ///
    /// # Panics
    ///
    /// Panics in two cases:
    /// - the shared topology controller cannot be unwrapped (presumably because another
    ///   reference to it is still alive);
    /// - `signal` is neither `Shutdown` nor `Quit`.
    pub async fn shutdown(self) -> ExitStatus {
        let FinishedApplication {
            signal,
            signal_rx,
            topology_controller,
            internal_topologies,
        } = self;

        // At this point, we'll have the only reference to the shared topology controller and can
        // safely remove it from the wrapper to shut down the topology.
        let topology_controller = topology_controller
            .try_into_inner()
            .expect("fail to unwrap topology controller")
            .into_inner();

        let status = match signal {
            SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
            SignalTo::Quit => Self::quit(),
            // NOTE(review): assumes the main loop only ever ends with `Shutdown` or `Quit`.
            _ => unreachable!(),
        };

        for topology in internal_topologies {
            topology.stop().await;
        }

        status
    }

    /// Emit `VectorStopped` and stop the topology gracefully, reporting exit code `OK`.
    ///
    /// If a message (or receive error) arrives on `signal_rx` first, the pending stop future
    /// is dropped and [`Self::quit`]'s status is returned instead.
    async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
        emit!(VectorStopped);
        tokio::select! {
            _ = topology_controller.stop() => ExitStatus::from_raw({
                #[cfg(windows)]
                {
                    exitcode::OK as u32
                }
                #[cfg(unix)]
                exitcode::OK
            }), // Graceful shutdown finished
            _ = signal_rx.recv() => Self::quit(),
        }
    }

    /// Emit `VectorQuit` and build the exit status for an immediate quit.
    ///
    /// NOTE(review): on Unix, `ExitStatus::from_raw` takes a raw wait status, and this passes
    /// `exitcode::OK`, so a quit reports exit code 0 on Unix. On Windows it reports
    /// `exitcode::UNAVAILABLE`. Confirm the asymmetry is intended.
    fn quit() -> ExitStatus {
        // It is highly unlikely that this event will exit from topology.
        emit!(VectorQuit);
        ExitStatus::from_raw({
            #[cfg(windows)]
            {
                exitcode::UNAVAILABLE as u32
            }
            #[cfg(unix)]
            exitcode::OK
        })
    }
}
504
505fn get_log_levels(default: &str) -> String {
506    std::env::var("VECTOR_LOG")
507        .or_else(|_| {
508            std::env::var("LOG").inspect(|_log| {
509                warn!(
510                    message =
511                        "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
512                );
513            })
514        })
515        .unwrap_or_else(|_| default.into())
516}
517
518pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
519    let mut rt_builder = runtime::Builder::new_multi_thread();
520    rt_builder.max_blocking_threads(20_000);
521    rt_builder.enable_all().thread_name(thread_name);
522
523    let threads = threads.unwrap_or_else(crate::num_threads);
524    if threads == 0 {
525        error!("The `threads` argument must be greater or equal to 1.");
526        return Err(exitcode::CONFIG);
527    }
528    WORKER_THREADS
529        .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
530        .unwrap_or_else(|_| panic!("double thread initialization"));
531    rt_builder.worker_threads(threads);
532
533    debug!(message = "Building runtime.", worker_threads = threads);
534    Ok(rt_builder.build().expect("Unable to create async runtime"))
535}
536
537pub async fn load_configs(
538    config_paths: &[ConfigPath],
539    watcher_conf: Option<config::watcher::WatcherConfig>,
540    require_healthy: Option<bool>,
541    allow_empty_config: bool,
542    graceful_shutdown_duration: Option<Duration>,
543    signal_handler: &mut SignalHandler,
544) -> Result<Config, ExitCode> {
545    let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
546
547    let watched_paths = config_paths
548        .iter()
549        .map(<&PathBuf>::from)
550        .collect::<Vec<_>>();
551
552    info!(
553        message = "Loading configs.",
554        paths = ?watched_paths
555    );
556
557    let mut config = config::load_from_paths_with_provider_and_secrets(
558        &config_paths,
559        signal_handler,
560        allow_empty_config,
561    )
562    .await
563    .map_err(handle_config_errors)?;
564
565    let mut watched_component_paths = Vec::new();
566
567    if let Some(watcher_conf) = watcher_conf {
568        for (name, transform) in config.transforms() {
569            let files = transform.inner.files_to_watch();
570            let component_config = ComponentConfig::new(
571                files.into_iter().cloned().collect(),
572                name.clone(),
573                ComponentType::Transform,
574            );
575            watched_component_paths.push(component_config);
576        }
577
578        for (name, sink) in config.sinks() {
579            let files = sink.inner.files_to_watch();
580            let component_config = ComponentConfig::new(
581                files.into_iter().cloned().collect(),
582                name.clone(),
583                ComponentType::Sink,
584            );
585            watched_component_paths.push(component_config);
586        }
587
588        for (name, table) in config.enrichment_tables() {
589            let files = table.inner.files_to_watch();
590            let component_config = ComponentConfig::new(
591                files.into_iter().cloned().collect(),
592                name.clone(),
593                ComponentType::EnrichmentTable,
594            );
595            watched_component_paths.push(component_config);
596        }
597
598        info!(
599            message = "Starting watcher.",
600            paths = ?watched_paths
601        );
602        info!(
603            message = "Components to watch.",
604            paths = ?watched_component_paths
605        );
606
607        // Start listening for config changes.
608        config::watcher::spawn_thread(
609            watcher_conf,
610            signal_handler.clone_tx(),
611            watched_paths,
612            watched_component_paths,
613            None,
614        )
615        .map_err(|error| {
616            error!(message = "Unable to start config watcher.", %error);
617            exitcode::CONFIG
618        })?;
619    }
620
621    config::init_log_schema(config.global.log_schema.clone(), true);
622    config::init_telemetry(config.global.telemetry.clone(), true);
623
624    if !config.healthchecks.enabled {
625        info!("Health checks are disabled.");
626    }
627    config.healthchecks.set_require_healthy(require_healthy);
628    config.graceful_shutdown_duration = graceful_shutdown_duration;
629
630    Ok(config)
631}
632
633pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
634    let level = get_log_levels(log_level);
635    let json = match format {
636        LogFormat::Text => false,
637        LogFormat::Json => true,
638    };
639
640    trace::init(color, json, &level, rate);
641    debug!(
642        message = "Internal log rate limit configured.",
643        internal_log_rate_secs = rate,
644    );
645    info!(message = "Log level is enabled.", level = ?level);
646}
647
648pub fn watcher_config(
649    method: WatchConfigMethod,
650    interval: NonZeroU64,
651) -> config::watcher::WatcherConfig {
652    match method {
653        WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
654        WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
655    }
656}