vector_core/config/
telemetry.rs1use cfg_if::cfg_if;
2use vector_common::request_metadata::GroupedCountByteSize;
3use vector_config::configurable_component;
4
5cfg_if! {
6 if #[cfg(any(test, feature = "test"))] {
9 use std::sync::{Arc, Mutex};
10
11 thread_local! {
12 static TELEMETRY: Arc<Mutex<Option<Telemetry>>> = Arc::new(Mutex::new(None));
13 }
14
15 pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
22 TELEMETRY.with(|tl| {
23 let mut tl = tl.lock().expect("telemetry lock poisoned");
24 assert!(!(tl.is_some() && deny_if_set), "Couldn't set telemetry");
25 *tl = Some(telemetry);
26 });
27 }
28
29 pub fn telemetry() -> Telemetry {
35 TELEMETRY.with(|tl| {
36 let mut tl = tl.lock().expect("telemetry lock poisoned");
37 if tl.is_none() {
40 *tl = Some(Telemetry::default());
41 }
42 tl.clone().unwrap()
43 })
44 }
45 }
46 else {
47 use std::sync::{LazyLock, OnceLock};
48
49 static TELEMETRY: OnceLock<Telemetry> = OnceLock::new();
50 static TELEMETRY_DEFAULT: LazyLock<Telemetry> = LazyLock::new(Telemetry::default);
51
52 pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
64 assert!(
65 !(TELEMETRY.set(telemetry).is_err() && deny_if_set),
66 "Couldn't set telemetry"
67 );
68 }
69
70 pub fn telemetry() -> &'static Telemetry {
72 TELEMETRY.get().unwrap_or(&TELEMETRY_DEFAULT)
73 }
74 }
75}
76
/// Telemetry options, stored in this module's `TELEMETRY` global.
///
/// Because of `#[serde(default)]`, any field missing from the input takes the
/// value it has in `Telemetry::default()`.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq, Default)]
#[serde(default)]
pub struct Telemetry {
    // Tag switches; the field's description is derived from the `Tags` type.
    #[configurable(derived)]
    pub tags: Tags,
}
85
86impl Telemetry {
87 pub fn merge(&mut self, other: &Telemetry) {
89 self.tags.emit_service = self.tags.emit_service || other.tags.emit_service;
90 self.tags.emit_source = self.tags.emit_source || other.tags.emit_source;
91 }
92
93 pub fn has_tags(&self) -> bool {
95 self.tags.emit_service || self.tags.emit_source
96 }
97
98 pub fn tags(&self) -> &Tags {
99 &self.tags
100 }
101
102 pub fn create_request_count_byte_size(&self) -> GroupedCountByteSize {
104 if self.has_tags() {
105 GroupedCountByteSize::new_tagged()
106 } else {
107 GroupedCountByteSize::new_untagged()
108 }
109 }
110}
111
/// Switches selecting which tags are emitted with telemetry.
///
/// Both switches are `false` in `Tags::default()`, and `#[serde(default)]` makes a
/// switch missing from the input take that default.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq, Default)]
#[serde(default)]
pub struct Tags {
    /// Whether the service tag should be emitted.
    pub emit_service: bool,

    /// Whether the source tag should be emitted.
    pub emit_source: bool,
}
127
#[cfg(test)]
mod test {
    use super::*;

    /// A config that sets only one flag of the `tags` table must deserialize,
    /// with the omitted flag falling back to its default.
    #[test]
    fn partial_telemetry() {
        // The flags live under `tags`; a top-level `emit_source` key would not
        // correspond to any field of `Telemetry`.
        let toml = r"
            [tags]
            emit_source = true
        ";
        let telemetry = toml::from_str::<Telemetry>(toml).unwrap();

        assert!(telemetry.tags.emit_source);
        assert!(!telemetry.tags.emit_service);
    }
}