vector_core/config/
telemetry.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
use cfg_if::cfg_if;
use vector_common::request_metadata::GroupedCountByteSize;
use vector_config::configurable_component;

cfg_if! {
    // The telemetry code assumes a process wide singleton. When running `cargo test`,
    // multiple threads might try to read/write this global.
    if #[cfg(any(test, feature = "test"))] {
        use std::sync::{Arc, Mutex};

        thread_local! {
            static TELEMETRY: Arc<Mutex<Option<Telemetry>>> = Arc::new(Mutex::new(None));
        }

        /// Test implementation.
        ///
        /// # Panics
        ///
        /// If deny is set, will panic if telemetry has already been set.
        /// Also, panics if the lock is poisoned.
        pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
            TELEMETRY.with(|tl| {
                let mut tl = tl.lock().expect("telemetry lock poisoned");
                assert!(!(tl.is_some() && deny_if_set), "Couldn't set telemetry");
                *tl = Some(telemetry);
            });
        }

        /// Test implementation.
        ///
        /// # Panics
        ///
        /// If the lock is poisoned.
         pub fn telemetry() -> Telemetry {
            TELEMETRY.with(|tl| {
               let mut tl = tl.lock().expect("telemetry lock poisoned");
                // For non-test code we return `TELEMETRY_DEFAULT`.
                // For test code, we will instantiate a default instance per thread.
                if tl.is_none() {
                    *tl = Some(Telemetry::default());
                }
                tl.clone().unwrap()
            })
        }
    }
    else {
        use std::sync::{LazyLock, OnceLock};

        static TELEMETRY: OnceLock<Telemetry> = OnceLock::new();
        static TELEMETRY_DEFAULT: LazyLock<Telemetry> = LazyLock::new(Telemetry::default);

        /// Loads the telemetry options from configurations and sets the global options.
        /// Once this is done, configurations can be correctly loaded using configured
        /// log schema defaults.
        ///
        /// # Errors
        ///
        /// This function will fail if the `builder` fails.
        ///
        /// # Panics
        ///
        /// If deny is set, will panic if telemetry has already been set.
        pub fn init_telemetry(telemetry: Telemetry, deny_if_set: bool) {
            assert!(
                !(TELEMETRY.set(telemetry).is_err() && deny_if_set),
                "Couldn't set telemetry"
            );
        }

        /// Returns the telemetry configuration options.
        pub fn telemetry() -> &'static Telemetry {
            TELEMETRY.get().unwrap_or(&TELEMETRY_DEFAULT)
        }
    }
}

/// Sets options for the telemetry that Vector emits.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq, Default)]
#[serde(default)]
pub struct Telemetry {
    #[configurable(derived)]
    pub tags: Tags,
}

impl Telemetry {
    /// Merge two `Telemetry` instances together.
    pub fn merge(&mut self, other: &Telemetry) {
        self.tags.emit_service = self.tags.emit_service || other.tags.emit_service;
        self.tags.emit_source = self.tags.emit_source || other.tags.emit_source;
    }

    /// Returns true if any of the tag options are true.
    pub fn has_tags(&self) -> bool {
        self.tags.emit_service || self.tags.emit_source
    }

    pub fn tags(&self) -> &Tags {
        &self.tags
    }

    /// The variant of `GroupedCountByteSize`
    pub fn create_request_count_byte_size(&self) -> GroupedCountByteSize {
        if self.has_tags() {
            GroupedCountByteSize::new_tagged()
        } else {
            GroupedCountByteSize::new_untagged()
        }
    }
}

/// Configures whether to emit certain tags
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq, Default)]
#[serde(default)]
pub struct Tags {
    /// True if the `service` tag should be emitted
    /// in the `component_received_*` and `component_sent_*`
    /// telemetry.
    pub emit_service: bool,

    /// True if the `source` tag should be emitted
    /// in the `component_received_*` and `component_sent_*`
    /// telemetry.
    pub emit_source: bool,
}

#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn partial_telemetry() {
        let toml = r"
            emit_source = true
        ";
        toml::from_str::<Telemetry>(toml).unwrap();
    }
}