vector/
lib.rs

1#![recursion_limit = "256"] // for async-stream
2#![deny(unreachable_pub)]
3#![deny(unused_extern_crates)]
4#![deny(unused_allocation)]
5#![deny(unused_assignments)]
6#![deny(unused_comparisons)]
7#![deny(warnings)]
8#![deny(missing_docs)]
9#![cfg_attr(docsrs, feature(doc_cfg), deny(rustdoc::broken_intra_doc_links))]
10#![allow(async_fn_in_trait)]
11#![allow(clippy::approx_constant)]
12#![allow(clippy::float_cmp)]
13#![allow(clippy::match_wild_err_arm)]
14#![allow(clippy::new_ret_no_self)]
15#![allow(clippy::type_complexity)]
16#![allow(clippy::unit_arg)]
17#![deny(clippy::clone_on_ref_ptr)]
18#![deny(clippy::trivially_copy_pass_by_ref)]
19#![deny(clippy::disallowed_methods)] // [nursery] mark some functions as verboten
20#![deny(clippy::missing_const_for_fn)] // [nursery] valuable to the optimizer, but may produce false positives
21
22//! The main library to support building Vector.
23
24#[cfg(all(unix, feature = "sinks-socket"))]
25#[macro_use]
26extern crate cfg_if;
27#[macro_use]
28extern crate derivative;
29#[macro_use]
30extern crate tracing;
31#[macro_use]
32extern crate vector_lib;
33
34pub use indoc::indoc;
35// re-export codecs for convenience
36pub use vector_lib::codecs;
37
// Global allocator for Unix builds that enable the `tikv-jemallocator` feature but not
// `allocation-tracing`: jemalloc is installed directly, with no wrapper around it.
38#[cfg(all(
39    unix,
40    feature = "tikv-jemallocator",
41    not(feature = "allocation-tracing")
42))]
43#[global_allocator]
44static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
45
// Global allocator for Unix builds that enable both `tikv-jemallocator` and
// `allocation-tracing`: jemalloc is passed to `get_grouped_tracing_allocator` and the
// returned `Allocator<Jemalloc>` is installed instead of jemalloc itself.
// NOTE(review): presumably the wrapper records allocations for internal telemetry —
// confirm in `internal_telemetry::allocations`.
46#[cfg(all(unix, feature = "tikv-jemallocator", feature = "allocation-tracing"))]
47#[global_allocator]
48static ALLOC: self::internal_telemetry::allocations::Allocator<tikv_jemallocator::Jemalloc> =
49    self::internal_telemetry::allocations::get_grouped_tracing_allocator(
50        tikv_jemallocator::Jemalloc,
51    );
52
53#[allow(unreachable_pub)]
54pub mod internal_telemetry;
55
56#[macro_use]
57#[allow(unreachable_pub)]
58pub mod config;
59pub mod cli;
60#[allow(unreachable_pub)]
61pub mod components;
62pub mod conditions;
63pub mod dns;
64#[cfg(feature = "docker")]
65pub mod docker;
66pub mod expiring_hash_map;
67pub mod generate;
68pub mod generate_schema;
69#[macro_use]
70#[allow(unreachable_pub)]
71pub mod internal_events;
72#[cfg(feature = "lapin")]
73pub mod amqp;
74#[cfg(feature = "api")]
75#[allow(unreachable_pub)]
76pub mod api;
77pub mod app;
78pub mod async_read;
79#[cfg(feature = "aws-config")]
80pub mod aws;
81pub mod common;
82pub mod completion;
83mod convert_config;
84pub mod encoding_transcode;
85pub mod enrichment_tables;
86pub mod extra_context;
87#[cfg(feature = "gcp")]
88pub mod gcp;
89pub(crate) mod graph;
90pub mod heartbeat;
91pub mod http;
92#[allow(unreachable_pub)]
93#[cfg(any(feature = "sources-kafka", feature = "sinks-kafka"))]
94pub mod kafka;
95#[allow(unreachable_pub)]
96pub mod kubernetes;
97pub mod line_agg;
98pub mod list;
99#[cfg(any(feature = "sources-nats", feature = "sinks-nats"))]
100pub mod nats;
101pub mod net;
102#[allow(unreachable_pub)]
103pub(crate) mod proto;
104pub mod providers;
105pub mod secrets;
106pub mod serde;
107#[cfg(windows)]
108pub mod service;
109pub mod signal;
110pub(crate) mod sink_ext;
111#[allow(unreachable_pub)]
112pub mod sinks;
113#[allow(unreachable_pub)]
114pub mod sources;
115#[cfg(feature = "api-client")]
116#[allow(unreachable_pub)]
117pub mod tap;
118pub mod template;
119pub mod test_util;
120#[cfg(feature = "top")]
121pub mod top;
122#[allow(unreachable_pub)]
123pub mod topology;
124pub mod trace;
125#[allow(unreachable_pub)]
126pub mod transforms;
127pub mod types;
128pub mod unit_test;
129pub(crate) mod utilization;
130pub mod validate;
131#[cfg(windows)]
132pub mod vector_windows;
133
134pub use vector_lib::{
135    Error, Result, event, metrics, schema, shutdown, source_sender::SourceSender, tcp, tls,
136};
137
// Write-once cache for the slugified application name; filled by `get_slugified_app_name`.
138static APP_NAME_SLUG: std::sync::OnceLock<String> = std::sync::OnceLock::new();
// Process-wide color preference. A `OnceLock` can be written at most once, so the first
// stored value is final; `set_global_color` writes it and `use_color` reads it.
139static USE_COLOR: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
140
/// Returns the human-readable name of this Vector application.
///
/// The name is fixed at compile time: if the `VECTOR_APP_NAME` environment variable is set
/// while building, its value is used; otherwise the name is "Vector".
pub fn get_app_name() -> &'static str {
    const DEFAULT_APP_NAME: &str = "Vector";

    let configured: Option<&'static str> = option_env!("VECTOR_APP_NAME");
    configured.unwrap_or(DEFAULT_APP_NAME)
}
148
149/// Returns a slugified version of the name used to identify this Vector application.
150///
151/// Defaults to "vector".
152pub fn get_slugified_app_name() -> String {
153    APP_NAME_SLUG
154        .get_or_init(|| get_app_name().to_lowercase().replace(' ', "-"))
155        .clone()
156}
157
158/// Sets the global color preference for diagnostics and CLI output.
159/// This should be called once during application startup.
160pub fn set_global_color(enabled: bool) {
161    if let Err(e) = USE_COLOR.set(enabled) {
162        error!(message = "Failed to set global color.", %e);
163    }
164}
165
166/// Returns true if color output is globally enabled.
167/// Defaults to false if not set.
168pub fn use_color() -> bool {
169    *USE_COLOR.get_or_init(|| false)
170}
171
172/// Formats VRL diagnostics honoring the global color setting.
173pub fn format_vrl_diagnostics(
174    source: &str,
175    diagnostics: impl Into<vrl::diagnostic::DiagnosticList>,
176) -> String {
177    let formatter = vrl::diagnostic::Formatter::new(source, diagnostics);
178    if use_color() {
179        formatter.colored().to_string()
180    } else {
181        formatter.to_string()
182    }
183}
184
185/// The current version of Vector in simplified format.
186/// `<version-number>-nightly`.
187pub fn vector_version() -> impl std::fmt::Display {
188    #[cfg(feature = "nightly")]
189    let pkg_version = format!("{}-nightly", built_info::PKG_VERSION);
190
191    #[cfg(not(feature = "nightly"))]
192    let pkg_version = match built_info::DEBUG {
193        // If any debug info is included, consider it a non-release build.
194        "1" | "2" | "true" => {
195            format!(
196                "{}-custom-{}",
197                built_info::PKG_VERSION,
198                built_info::GIT_SHORT_HASH
199            )
200        }
201        _ => built_info::PKG_VERSION.to_string(),
202    };
203
204    pkg_version
205}
206
207/// Returns a string containing full version information of the current build.
208pub fn get_version() -> String {
209    let pkg_version = vector_version();
210    let build_desc = built_info::VECTOR_BUILD_DESC;
211    let build_string = match build_desc {
212        Some(desc) => format!("{} {}", built_info::TARGET, desc),
213        None => built_info::TARGET.into(),
214    };
215
216    // We do not add 'debug' to the BUILD_DESC unless the caller has flagged on line
217    // or full debug symbols. See the Cargo Book profiling section for value meaning:
218    // https://doc.rust-lang.org/cargo/reference/profiles.html#debug
219    let build_string = match built_info::DEBUG {
220        "1" => format!("{build_string} debug=line"),
221        "2" | "true" => format!("{build_string} debug=full"),
222        _ => build_string,
223    };
224
225    format!("{pkg_version} ({build_string})")
226}
227
228/// Includes information about the current build.
///
/// The module body is the `built.rs` file located in the build's `OUT_DIR`, textually
/// included at compile time.
// NOTE(review): all warnings are allowed here, presumably because the included file is
// generated rather than hand-written — confirm against the build script.
229#[allow(warnings)]
230pub mod built_info {
231    include!(concat!(env!("OUT_DIR"), "/built.rs"));
232}
233
234/// Returns the host name of the current system.
235/// The hostname can be overridden by setting the VECTOR_HOSTNAME environment variable.
236pub fn get_hostname() -> std::io::Result<String> {
237    Ok(if let Ok(hostname) = std::env::var("VECTOR_HOSTNAME") {
238        hostname.to_string()
239    } else {
240        hostname::get()?.to_string_lossy().into_owned()
241    })
242}
243
244/// Spawn a task with the given name. The name is only used if
245/// built with [`tokio_unstable`][tokio_unstable].
246///
247/// [tokio_unstable]: https://docs.rs/tokio/latest/tokio/#unstable-features
248#[track_caller]
249pub(crate) fn spawn_named<T>(
250    task: impl std::future::Future<Output = T> + Send + 'static,
251    _name: &str,
252) -> tokio::task::JoinHandle<T>
253where
254    T: Send + 'static,
255{
256    #[cfg(tokio_unstable)]
257    return tokio::task::Builder::new()
258        .name(_name)
259        .spawn(task)
260        .expect("tokio task should spawn");
261
262    #[cfg(not(tokio_unstable))]
263    tokio::spawn(task)
264}
265
266/// Returns an estimate of the number of recommended threads that Vector should spawn.
267pub fn num_threads() -> usize {
268    let count = match std::thread::available_parallelism() {
269        Ok(count) => count,
270        Err(error) => {
271            warn!(message = "Failed to determine available parallelism for thread count, defaulting to 1.", %error);
272            std::num::NonZeroUsize::new(1).unwrap()
273        }
274    };
275    usize::from(count)
276}