vector_core/config/
telemetry.rs

1use cfg_if::cfg_if;
2use vector_common::request_metadata::GroupedCountByteSize;
3use vector_config::configurable_component;
4
5cfg_if! {
6    // The telemetry code assumes a process wide singleton. When running `cargo test`,
7    // multiple threads might try to read/write this global.
8    if #[cfg(any(test, feature = "test"))] {
9        use std::sync::{Arc, Mutex};
10
11        thread_local! {
12            static TELEMETRY: Arc<Mutex<Option<Telemetry>>> = Arc::new(Mutex::new(None));
13        }
14
15        /// Test implementation.
16        ///
17        /// # Panics
18        ///
19        /// If deny is set, will panic if telemetry has already been set.
20        /// Also, panics if the lock is poisoned.
21        pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
22            TELEMETRY.with(|tl| {
23                let mut tl = tl.lock().expect("telemetry lock poisoned");
24                assert!(!(tl.is_some() && deny_if_set), "Couldn't set telemetry");
25                *tl = Some(telemetry);
26            });
27        }
28
29        /// Test implementation.
30        ///
31        /// # Panics
32        ///
33        /// If the lock is poisoned.
34         pub fn telemetry() -> Telemetry {
35            TELEMETRY.with(|tl| {
36               let mut tl = tl.lock().expect("telemetry lock poisoned");
37                // For non-test code we return `TELEMETRY_DEFAULT`.
38                // For test code, we will instantiate a default instance per thread.
39                if tl.is_none() {
40                    *tl = Some(Telemetry::default());
41                }
42                tl.clone().unwrap()
43            })
44        }
45    }
46    else {
47        use std::sync::{LazyLock, OnceLock};
48
49        static TELEMETRY: OnceLock<Telemetry> = OnceLock::new();
50        static TELEMETRY_DEFAULT: LazyLock<Telemetry> = LazyLock::new(Telemetry::default);
51
52        /// Loads the telemetry options from configurations and sets the global options.
53        /// Once this is done, configurations can be correctly loaded using configured
54        /// log schema defaults.
55        ///
56        /// # Errors
57        ///
58        /// This function will fail if the `builder` fails.
59        ///
60        /// # Panics
61        ///
62        /// If deny is set, will panic if telemetry has already been set.
63        pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
64            assert!(
65                !(TELEMETRY.set(telemetry).is_err() && deny_if_set),
66                "Couldn't set telemetry"
67            );
68        }
69
70        /// Returns the telemetry configuration options.
71        pub fn telemetry() -> &'static Telemetry {
72            TELEMETRY.get().unwrap_or(&TELEMETRY_DEFAULT)
73        }
74    }
75}
76
77/// Sets options for the telemetry that Vector emits.
78#[configurable_component]
79#[derive(Clone, Debug, Eq, PartialEq, Default)]
80#[serde(default)]
81pub struct Telemetry {
82    #[configurable(derived)]
83    pub tags: Tags,
84}
85
86impl Telemetry {
87    /// Merge two `Telemetry` instances together.
88    pub fn merge(&mut self, other: &Telemetry) {
89        self.tags.emit_service = self.tags.emit_service || other.tags.emit_service;
90        self.tags.emit_source = self.tags.emit_source || other.tags.emit_source;
91    }
92
93    /// Returns true if any of the tag options are true.
94    pub fn has_tags(&self) -> bool {
95        self.tags.emit_service || self.tags.emit_source
96    }
97
98    pub fn tags(&self) -> &Tags {
99        &self.tags
100    }
101
102    /// The variant of `GroupedCountByteSize`
103    pub fn create_request_count_byte_size(&self) -> GroupedCountByteSize {
104        if self.has_tags() {
105            GroupedCountByteSize::new_tagged()
106        } else {
107            GroupedCountByteSize::new_untagged()
108        }
109    }
110}
111
112/// Configures whether to emit certain tags
113#[configurable_component]
114#[derive(Clone, Debug, Eq, PartialEq, Default)]
115#[serde(default)]
116pub struct Tags {
117    /// True if the `service` tag should be emitted
118    /// in the `component_received_*` and `component_sent_*`
119    /// telemetry.
120    pub emit_service: bool,
121
122    /// True if the `source` tag should be emitted
123    /// in the `component_received_*` and `component_sent_*`
124    /// telemetry.
125    pub emit_source: bool,
126}
127
128#[cfg(test)]
129mod test {
130    use super::*;
131
132    #[test]
133    fn partial_telemetry() {
134        let toml = r"
135            emit_source = true
136        ";
137        toml::from_str::<Telemetry>(toml).unwrap();
138    }
139}