vector/
app.rs

1#![allow(missing_docs)]
2use std::{
3    num::{NonZeroU64, NonZeroUsize},
4    path::PathBuf,
5    process::ExitStatus,
6    sync::atomic::{AtomicUsize, Ordering},
7    time::Duration,
8};
9
10use exitcode::ExitCode;
11use futures::StreamExt;
12use tokio::runtime::{self, Runtime};
13use tokio::sync::{broadcast::error::RecvError, MutexGuard};
14use tokio_stream::wrappers::UnboundedReceiverStream;
15
16#[cfg(feature = "api")]
17use crate::{api, internal_events::ApiStarted};
18use crate::{
19    cli::{handle_config_errors, LogFormat, Opts, RootOpts, WatchConfigMethod},
20    config::{self, ComponentConfig, Config, ConfigPath},
21    heartbeat,
22    internal_events::{VectorConfigLoadError, VectorQuit, VectorStarted, VectorStopped},
23    signal::{SignalHandler, SignalPair, SignalRx, SignalTo},
24    topology::{
25        ReloadOutcome, RunningTopology, SharedTopologyController, ShutdownErrorReceiver,
26        TopologyController,
27    },
28    trace,
29};
30use crate::{config::ComponentType, extra_context::ExtraContext};
31
32#[cfg(unix)]
33use std::os::unix::process::ExitStatusExt;
34#[cfg(windows)]
35use std::os::windows::process::ExitStatusExt;
36use tokio::runtime::Handle;
37
/// Worker thread count recorded by [`build_runtime`]; stays `0` until a runtime has been built.
static WORKER_THREADS: AtomicUsize = AtomicUsize::new(0);

/// Returns the recorded worker thread count, or `None` while it is still zero.
pub fn worker_threads() -> Option<NonZeroUsize> {
    let recorded = WORKER_THREADS.load(Ordering::Relaxed);
    NonZeroUsize::new(recorded)
}
43
/// A started main topology together with the inputs it was built from.
pub struct ApplicationConfig {
    /// Configuration paths supplied when this value was built.
    pub config_paths: Vec<config::ConfigPath>,
    /// The main topology, started by `from_config`.
    pub topology: RunningTopology,
    /// Receiver returned alongside the main topology when it was started.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Additional topologies registered through `add_internal_config`; empty after `from_config`.
    pub internal_topologies: Vec<RunningTopology>,
    /// API options copied out of the configuration before the topology was started.
    #[cfg(feature = "api")]
    pub api: config::api::Options,
    /// Context passed to the topology on start and later handed to the topology controller.
    pub extra_context: ExtraContext,
}
53
/// An application that has been prepared (options parsed, configuration loaded) but not started.
pub struct Application {
    /// Root command-line options the application was prepared with.
    pub root_opts: RootOpts,
    /// Loaded configuration and its running topology.
    pub config: ApplicationConfig,
    /// Signal handler/receiver pair created for the runtime.
    pub signals: SignalPair,
}
59
60impl ApplicationConfig {
61    pub async fn from_opts(
62        opts: &RootOpts,
63        signal_handler: &mut SignalHandler,
64        extra_context: ExtraContext,
65    ) -> Result<Self, ExitCode> {
66        let config_paths = opts.config_paths_with_formats();
67
68        let graceful_shutdown_duration = (!opts.no_graceful_shutdown_limit)
69            .then(|| Duration::from_secs(u64::from(opts.graceful_shutdown_limit_secs)));
70
71        let watcher_conf = if opts.watch_config {
72            Some(watcher_config(
73                opts.watch_config_method,
74                opts.watch_config_poll_interval_seconds,
75            ))
76        } else {
77            None
78        };
79
80        let config = load_configs(
81            &config_paths,
82            watcher_conf,
83            opts.require_healthy,
84            opts.allow_empty_config,
85            graceful_shutdown_duration,
86            signal_handler,
87        )
88        .await?;
89
90        Self::from_config(config_paths, config, extra_context).await
91    }
92
93    pub async fn from_config(
94        config_paths: Vec<ConfigPath>,
95        config: Config,
96        extra_context: ExtraContext,
97    ) -> Result<Self, ExitCode> {
98        #[cfg(feature = "api")]
99        let api = config.api;
100
101        let (topology, graceful_crash_receiver) =
102            RunningTopology::start_init_validated(config, extra_context.clone())
103                .await
104                .ok_or(exitcode::CONFIG)?;
105
106        Ok(Self {
107            config_paths,
108            topology,
109            graceful_crash_receiver,
110            internal_topologies: Vec::new(),
111            #[cfg(feature = "api")]
112            api,
113            extra_context,
114        })
115    }
116
117    pub async fn add_internal_config(
118        &mut self,
119        config: Config,
120        extra_context: ExtraContext,
121    ) -> Result<(), ExitCode> {
122        let Some((topology, _)) =
123            RunningTopology::start_init_validated(config, extra_context).await
124        else {
125            return Err(exitcode::CONFIG);
126        };
127        self.internal_topologies.push(topology);
128        Ok(())
129    }
130
131    /// Configure the API server, if applicable
132    #[cfg(feature = "api")]
133    pub fn setup_api(&self, handle: &Handle) -> Option<api::Server> {
134        if self.api.enabled {
135            match api::Server::start(
136                self.topology.config(),
137                self.topology.watch(),
138                std::sync::Arc::clone(&self.topology.running),
139                handle,
140            ) {
141                Ok(api_server) => {
142                    emit!(ApiStarted {
143                        addr: self.api.address.unwrap(),
144                        playground: self.api.playground,
145                        graphql: self.api.graphql
146                    });
147
148                    Some(api_server)
149                }
150                Err(error) => {
151                    let error = error.to_string();
152                    error!("An error occurred that Vector couldn't handle: {}.", error);
153                    _ = self
154                        .topology
155                        .abort_tx
156                        .send(crate::signal::ShutdownError::ApiFailed { error });
157                    None
158                }
159            }
160        } else {
161            info!(message="API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.");
162            None
163        }
164    }
165}
166
167impl Application {
168    pub fn run(extra_context: ExtraContext) -> ExitStatus {
169        let (runtime, app) =
170            Self::prepare_start(extra_context).unwrap_or_else(|code| std::process::exit(code));
171
172        runtime.block_on(app.run())
173    }
174
175    pub fn prepare_start(
176        extra_context: ExtraContext,
177    ) -> Result<(Runtime, StartedApplication), ExitCode> {
178        Self::prepare(extra_context)
179            .and_then(|(runtime, app)| app.start(runtime.handle()).map(|app| (runtime, app)))
180    }
181
182    pub fn prepare(extra_context: ExtraContext) -> Result<(Runtime, Self), ExitCode> {
183        let opts = Opts::get_matches().map_err(|error| {
184            // Printing to stdout/err can itself fail; ignore it.
185            _ = error.print();
186            exitcode::USAGE
187        })?;
188
189        Self::prepare_from_opts(opts, extra_context)
190    }
191
192    pub fn prepare_from_opts(
193        opts: Opts,
194        extra_context: ExtraContext,
195    ) -> Result<(Runtime, Self), ExitCode> {
196        opts.root.init_global();
197
198        let color = opts.root.color.use_color();
199
200        init_logging(
201            color,
202            opts.root.log_format,
203            opts.log_level(),
204            opts.root.internal_log_rate_limit,
205        );
206
207        // Can only log this after initializing the logging subsystem
208        if opts.root.openssl_no_probe {
209            debug!(message = "Disabled probing and configuration of root certificate locations on the system for OpenSSL.");
210        }
211
212        let runtime = build_runtime(opts.root.threads, "vector-worker")?;
213
214        // Signal handler for OS and provider messages.
215        let mut signals = SignalPair::new(&runtime);
216
217        if let Some(sub_command) = &opts.sub_command {
218            return Err(runtime.block_on(sub_command.execute(signals, color)));
219        }
220
221        let config = runtime.block_on(ApplicationConfig::from_opts(
222            &opts.root,
223            &mut signals.handler,
224            extra_context,
225        ))?;
226
227        Ok((
228            runtime,
229            Self {
230                root_opts: opts.root,
231                config,
232                signals,
233            },
234        ))
235    }
236
237    pub fn start(self, handle: &Handle) -> Result<StartedApplication, ExitCode> {
238        // Any internal_logs sources will have grabbed a copy of the
239        // early buffer by this point and set up a subscriber.
240        crate::trace::stop_early_buffering();
241
242        emit!(VectorStarted);
243        handle.spawn(heartbeat::heartbeat());
244
245        let Self {
246            root_opts,
247            config,
248            signals,
249        } = self;
250
251        let topology_controller = SharedTopologyController::new(TopologyController {
252            #[cfg(feature = "api")]
253            api_server: config.setup_api(handle),
254            topology: config.topology,
255            config_paths: config.config_paths.clone(),
256            require_healthy: root_opts.require_healthy,
257            extra_context: config.extra_context,
258        });
259
260        Ok(StartedApplication {
261            config_paths: config.config_paths,
262            internal_topologies: config.internal_topologies,
263            graceful_crash_receiver: config.graceful_crash_receiver,
264            signals,
265            topology_controller,
266            allow_empty_config: root_opts.allow_empty_config,
267        })
268    }
269}
270
/// An application whose topology is running and which is ready to enter its main loop.
pub struct StartedApplication {
    /// Configuration paths handed to `handle_signal` when a reload is requested.
    pub config_paths: Vec<ConfigPath>,
    /// Additional topologies; carried through to `FinishedApplication` to be stopped at shutdown.
    pub internal_topologies: Vec<RunningTopology>,
    /// Receiver polled by `main`; any item it yields triggers a shutdown.
    pub graceful_crash_receiver: ShutdownErrorReceiver,
    /// Signal handler/receiver pair consumed by `main`.
    pub signals: SignalPair,
    /// Shared controller around the main running topology.
    pub topology_controller: SharedTopologyController,
    /// Forwarded to the configuration loader on reloads.
    pub allow_empty_config: bool,
}
279
280impl StartedApplication {
281    pub async fn run(self) -> ExitStatus {
282        self.main().await.shutdown().await
283    }
284
    /// Runs the main event loop until something ends it, returning the state needed to shut down.
    ///
    /// Each iteration waits for whichever of these completes first:
    /// - a message from the signal receiver, dispatched through `handle_signal`; the loop only
    ///   ends when that returns `Some`;
    /// - an item from the graceful-crash receiver, which ends the loop with `SignalTo::Shutdown`;
    /// - `TopologyController::sources_finished`, only considered while the topology config is
    ///   non-empty, which ends the loop with `SignalTo::Shutdown(None)`.
    pub async fn main(self) -> FinishedApplication {
        let Self {
            config_paths,
            graceful_crash_receiver,
            signals,
            topology_controller,
            internal_topologies,
            allow_empty_config,
        } = self;

        let mut graceful_crash = UnboundedReceiverStream::new(graceful_crash_receiver);

        let mut signal_handler = signals.handler;
        let mut signal_rx = signals.receiver;

        let signal = loop {
            // Re-checked on every iteration; guards the `sources_finished` branch below.
            let has_sources = !topology_controller.lock().await.topology.config.is_empty();
            tokio::select! {
                signal = signal_rx.recv() => if let Some(signal) = handle_signal(
                    signal,
                    &topology_controller,
                    &config_paths,
                    &mut signal_handler,
                    allow_empty_config,
                ).await {
                    break signal;
                },
                // Trigger graceful shutdown if a component crashed, or all sources have ended.
                error = graceful_crash.next() => break SignalTo::Shutdown(error),
                _ = TopologyController::sources_finished(topology_controller.clone()), if has_sources => {
                    info!("All sources have finished.");
                    break SignalTo::Shutdown(None)
                } ,
                else => unreachable!("Signal streams never end"),
            }
        };

        FinishedApplication {
            signal,
            signal_rx,
            topology_controller,
            internal_topologies,
        }
    }
329}
330
331async fn handle_signal(
332    signal: Result<SignalTo, RecvError>,
333    topology_controller: &SharedTopologyController,
334    config_paths: &[ConfigPath],
335    signal_handler: &mut SignalHandler,
336    allow_empty_config: bool,
337) -> Option<SignalTo> {
338    match signal {
339        Ok(SignalTo::ReloadComponents(components_to_reload)) => {
340            let mut topology_controller = topology_controller.lock().await;
341            topology_controller
342                .topology
343                .extend_reload_set(components_to_reload);
344
345            // Reload paths
346            if let Some(paths) = config::process_paths(config_paths) {
347                topology_controller.config_paths = paths;
348            }
349
350            // Reload config
351            let new_config = config::load_from_paths_with_provider_and_secrets(
352                &topology_controller.config_paths,
353                signal_handler,
354                allow_empty_config,
355            )
356            .await;
357
358            reload_config_from_result(topology_controller, new_config).await
359        }
360        Ok(SignalTo::ReloadFromConfigBuilder(config_builder)) => {
361            let topology_controller = topology_controller.lock().await;
362            reload_config_from_result(topology_controller, config_builder.build()).await
363        }
364        Ok(SignalTo::ReloadFromDisk) => {
365            let mut topology_controller = topology_controller.lock().await;
366
367            // Reload paths
368            if let Some(paths) = config::process_paths(config_paths) {
369                topology_controller.config_paths = paths;
370            }
371
372            // Reload config
373            let new_config = config::load_from_paths_with_provider_and_secrets(
374                &topology_controller.config_paths,
375                signal_handler,
376                allow_empty_config,
377            )
378            .await;
379
380            reload_config_from_result(topology_controller, new_config).await
381        }
382        Ok(SignalTo::ReloadEnrichmentTables) => {
383            let topology_controller = topology_controller.lock().await;
384
385            topology_controller
386                .topology
387                .reload_enrichment_tables()
388                .await;
389            None
390        }
391        Err(RecvError::Lagged(amt)) => {
392            warn!("Overflow, dropped {} signals.", amt);
393            None
394        }
395        Err(RecvError::Closed) => Some(SignalTo::Shutdown(None)),
396        Ok(signal) => Some(signal),
397    }
398}
399
400async fn reload_config_from_result(
401    mut topology_controller: MutexGuard<'_, TopologyController>,
402    config: Result<Config, Vec<String>>,
403) -> Option<SignalTo> {
404    match config {
405        Ok(new_config) => match topology_controller.reload(new_config).await {
406            ReloadOutcome::FatalError(error) => Some(SignalTo::Shutdown(Some(error))),
407            _ => None,
408        },
409        Err(errors) => {
410            handle_config_errors(errors);
411            emit!(VectorConfigLoadError);
412            None
413        }
414    }
415}
416
/// State left over once the main loop has ended, ready to be shut down.
pub struct FinishedApplication {
    /// The signal that ended the main loop; `shutdown` expects `Shutdown` or `Quit`.
    pub signal: SignalTo,
    /// Signal receiver; a further message received while the topology is stopping forces a quit.
    pub signal_rx: SignalRx,
    /// Shared controller around the main topology; `shutdown` unwraps it.
    pub topology_controller: SharedTopologyController,
    /// Additional topologies, each stopped at the end of `shutdown`.
    pub internal_topologies: Vec<RunningTopology>,
}
423
424impl FinishedApplication {
425    pub async fn shutdown(self) -> ExitStatus {
426        let FinishedApplication {
427            signal,
428            signal_rx,
429            topology_controller,
430            internal_topologies,
431        } = self;
432
433        // At this point, we'll have the only reference to the shared topology controller and can
434        // safely remove it from the wrapper to shut down the topology.
435        let topology_controller = topology_controller
436            .try_into_inner()
437            .expect("fail to unwrap topology controller")
438            .into_inner();
439
440        let status = match signal {
441            SignalTo::Shutdown(_) => Self::stop(topology_controller, signal_rx).await,
442            SignalTo::Quit => Self::quit(),
443            _ => unreachable!(),
444        };
445
446        for topology in internal_topologies {
447            topology.stop().await;
448        }
449
450        status
451    }
452
453    async fn stop(topology_controller: TopologyController, mut signal_rx: SignalRx) -> ExitStatus {
454        emit!(VectorStopped);
455        tokio::select! {
456            _ = topology_controller.stop() => ExitStatus::from_raw({
457                #[cfg(windows)]
458                {
459                    exitcode::OK as u32
460                }
461                #[cfg(unix)]
462                exitcode::OK
463            }), // Graceful shutdown finished
464            _ = signal_rx.recv() => Self::quit(),
465        }
466    }
467
468    fn quit() -> ExitStatus {
469        // It is highly unlikely that this event will exit from topology.
470        emit!(VectorQuit);
471        ExitStatus::from_raw({
472            #[cfg(windows)]
473            {
474                exitcode::UNAVAILABLE as u32
475            }
476            #[cfg(unix)]
477            exitcode::OK
478        })
479    }
480}
481
482fn get_log_levels(default: &str) -> String {
483    std::env::var("VECTOR_LOG")
484        .or_else(|_| {
485            std::env::var("LOG").inspect(|_log| {
486                warn!(
487                    message =
488                        "DEPRECATED: Use of $LOG is deprecated. Please use $VECTOR_LOG instead."
489                );
490            })
491        })
492        .unwrap_or_else(|_| default.into())
493}
494
495pub fn build_runtime(threads: Option<usize>, thread_name: &str) -> Result<Runtime, ExitCode> {
496    let mut rt_builder = runtime::Builder::new_multi_thread();
497    rt_builder.max_blocking_threads(20_000);
498    rt_builder.enable_all().thread_name(thread_name);
499
500    let threads = threads.unwrap_or_else(crate::num_threads);
501    if threads == 0 {
502        error!("The `threads` argument must be greater or equal to 1.");
503        return Err(exitcode::CONFIG);
504    }
505    WORKER_THREADS
506        .compare_exchange(0, threads, Ordering::Acquire, Ordering::Relaxed)
507        .unwrap_or_else(|_| panic!("double thread initialization"));
508    rt_builder.worker_threads(threads);
509
510    debug!(messaged = "Building runtime.", worker_threads = threads);
511    Ok(rt_builder.build().expect("Unable to create async runtime"))
512}
513
514pub async fn load_configs(
515    config_paths: &[ConfigPath],
516    watcher_conf: Option<config::watcher::WatcherConfig>,
517    require_healthy: Option<bool>,
518    allow_empty_config: bool,
519    graceful_shutdown_duration: Option<Duration>,
520    signal_handler: &mut SignalHandler,
521) -> Result<Config, ExitCode> {
522    let config_paths = config::process_paths(config_paths).ok_or(exitcode::CONFIG)?;
523
524    let watched_paths = config_paths
525        .iter()
526        .map(<&PathBuf>::from)
527        .collect::<Vec<_>>();
528
529    info!(
530        message = "Loading configs.",
531        paths = ?watched_paths
532    );
533
534    let mut config = config::load_from_paths_with_provider_and_secrets(
535        &config_paths,
536        signal_handler,
537        allow_empty_config,
538    )
539    .await
540    .map_err(handle_config_errors)?;
541
542    let mut watched_component_paths = Vec::new();
543
544    if let Some(watcher_conf) = watcher_conf {
545        for (name, transform) in config.transforms() {
546            let files = transform.inner.files_to_watch();
547            let component_config = ComponentConfig::new(
548                files.into_iter().cloned().collect(),
549                name.clone(),
550                ComponentType::Transform,
551            );
552            watched_component_paths.push(component_config);
553        }
554
555        for (name, sink) in config.sinks() {
556            let files = sink.inner.files_to_watch();
557            let component_config = ComponentConfig::new(
558                files.into_iter().cloned().collect(),
559                name.clone(),
560                ComponentType::Sink,
561            );
562            watched_component_paths.push(component_config);
563        }
564
565        for (name, table) in config.enrichment_tables() {
566            let files = table.inner.files_to_watch();
567            let component_config = ComponentConfig::new(
568                files.into_iter().cloned().collect(),
569                name.clone(),
570                ComponentType::EnrichmentTable,
571            );
572            watched_component_paths.push(component_config);
573        }
574
575        info!(
576            message = "Starting watcher.",
577            paths = ?watched_paths
578        );
579        info!(
580            message = "Components to watch.",
581            paths = ?watched_component_paths
582        );
583
584        // Start listening for config changes.
585        config::watcher::spawn_thread(
586            watcher_conf,
587            signal_handler.clone_tx(),
588            watched_paths,
589            watched_component_paths,
590            None,
591        )
592        .map_err(|error| {
593            error!(message = "Unable to start config watcher.", %error);
594            exitcode::CONFIG
595        })?;
596    }
597
598    config::init_log_schema(config.global.log_schema.clone(), true);
599    config::init_telemetry(config.global.telemetry.clone(), true);
600
601    if !config.healthchecks.enabled {
602        info!("Health checks are disabled.");
603    }
604    config.healthchecks.set_require_healthy(require_healthy);
605    config.graceful_shutdown_duration = graceful_shutdown_duration;
606
607    Ok(config)
608}
609
610pub fn init_logging(color: bool, format: LogFormat, log_level: &str, rate: u64) {
611    let level = get_log_levels(log_level);
612    let json = match format {
613        LogFormat::Text => false,
614        LogFormat::Json => true,
615    };
616
617    trace::init(color, json, &level, rate);
618    debug!(
619        message = "Internal log rate limit configured.",
620        internal_log_rate_secs = rate,
621    );
622    info!(message = "Log level is enabled.", level = ?level);
623}
624
625pub fn watcher_config(
626    method: WatchConfigMethod,
627    interval: NonZeroU64,
628) -> config::watcher::WatcherConfig {
629    match method {
630        WatchConfigMethod::Recommended => config::watcher::WatcherConfig::RecommendedWatcher,
631        WatchConfigMethod::Poll => config::watcher::WatcherConfig::PollWatcher(interval.into()),
632    }
633}