vector/sources/
internal_metrics.rs

1use std::time::Duration;
2
3use futures::StreamExt;
4use serde_with::serde_as;
5use tokio::time;
6use tokio_stream::wrappers::IntervalStream;
7use vector_lib::configurable::configurable_component;
8use vector_lib::internal_event::{
9    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
10};
11use vector_lib::lookup::lookup_v2::OptionalValuePath;
12use vector_lib::{config::LogNamespace, ByteSizeOf, EstimatedJsonEncodedSizeOf};
13
14use crate::{
15    config::{log_schema, SourceConfig, SourceContext, SourceOutput},
16    internal_events::{EventsReceived, StreamClosedError},
17    metrics::Controller,
18    shutdown::ShutdownSignal,
19    SourceSender,
20};
21
/// Configuration for the `internal_metrics` source.
// NOTE(review): the `///` text on this type and its fields is presumably consumed by
// `configurable_component` for generated documentation — confirm before rewording any of it.
#[serde_as]
#[configurable_component(source(
    "internal_metrics",
    "Expose internal metrics emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
// Unknown keys are rejected; keys that are absent fall back to the defaults below.
#[serde(deny_unknown_fields, default)]
pub struct InternalMetricsConfig {
    /// The interval between metric gathering, in seconds.
    // (De)serialized through `DurationSecondsWithFrac<f64>`, i.e. as a possibly fractional
    // number of seconds. Defaults to `default_scrape_interval()`.
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(default = "default_scrape_interval")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub scrape_interval_secs: Duration,

    // Names of the host and process-ID tags attached to each emitted metric; see `TagsConfig`.
    #[configurable(derived)]
    pub tags: TagsConfig,

    /// Overrides the default namespace for the metrics emitted by the source.
    // Defaults to `default_namespace()`.
    #[serde(default = "default_namespace")]
    pub namespace: String,
}
44
45impl Default for InternalMetricsConfig {
46    fn default() -> Self {
47        Self {
48            scrape_interval_secs: default_scrape_interval(),
49            tags: TagsConfig::default(),
50            namespace: default_namespace(),
51        }
52    }
53}
54
55/// Tag configuration for the `internal_metrics` source.
56#[configurable_component]
57#[derive(Clone, Debug, Default)]
58#[serde(deny_unknown_fields, default)]
59pub struct TagsConfig {
60    /// Overrides the name of the tag used to add the peer host to each metric.
61    ///
62    /// The value is the peer host's address, including the port. For example, `1.2.3.4:9000`.
63    ///
64    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
65    ///
66    /// Set to `""` to suppress this key.
67    ///
68    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
69    pub host_key: Option<OptionalValuePath>,
70
71    /// Sets the name of the tag to use to add the current process ID to each metric.
72    ///
73    ///
74    /// By default, this is not set and the tag is not automatically added.
75    #[configurable(metadata(docs::examples = "pid"))]
76    pub pid_key: Option<String>,
77}
78
/// Scrape interval used when the configuration does not specify one: one second.
fn default_scrape_interval() -> Duration {
    Duration::from_secs(1)
}
82
/// Namespace used when the configuration does not specify one.
fn default_namespace() -> String {
    String::from("vector")
}
86
// NOTE(review): presumably supplies the config-generation impl exercised by the
// `generate_config` test, derived from `InternalMetricsConfig::default()` — confirm against
// the macro definition, which is not visible from this file.
impl_generate_config_from_default!(InternalMetricsConfig);
88
89#[async_trait::async_trait]
90#[typetag::serde(name = "internal_metrics")]
91impl SourceConfig for InternalMetricsConfig {
92    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
93        if self.scrape_interval_secs.is_zero() {
94            warn!(
95                "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
96            );
97        }
98        let interval = self.scrape_interval_secs;
99
100        // namespace for created metrics is already "vector" by default.
101        let namespace = self.namespace.clone();
102
103        let host_key = self
104            .tags
105            .host_key
106            .clone()
107            .unwrap_or(log_schema().host_key().cloned().into());
108
109        let pid_key = self
110            .tags
111            .pid_key
112            .as_deref()
113            .and_then(|tag| (!tag.is_empty()).then(|| tag.to_owned()));
114
115        Ok(Box::pin(
116            InternalMetrics {
117                namespace,
118                host_key,
119                pid_key,
120                controller: Controller::get()?,
121                interval,
122                out: cx.out,
123                shutdown: cx.shutdown,
124            }
125            .run(),
126        ))
127    }
128
129    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
130        vec![SourceOutput::new_metrics()]
131    }
132
133    fn can_acknowledge(&self) -> bool {
134        false
135    }
136}
137
/// Runtime state of the `internal_metrics` source, assembled in
/// `InternalMetricsConfig::build` and consumed by `run`.
struct InternalMetrics<'a> {
    /// Namespace applied to every metric when it differs from `"vector"`.
    namespace: String,
    /// Name of the tag that receives the hostname; no tag is added when its `path` is `None`.
    host_key: OptionalValuePath,
    /// Name of the tag that receives the process ID; `None` disables the tag.
    pid_key: Option<String>,
    /// Metrics controller whose `capture_metrics` output is forwarded on each tick.
    controller: &'a Controller,
    /// Period between two consecutive captures.
    interval: time::Duration,
    /// Sender the captured metrics are batched into.
    out: SourceSender,
    /// Signal that ends the interval stream, and with it the source.
    shutdown: ShutdownSignal,
}
147
148impl InternalMetrics<'_> {
149    async fn run(mut self) -> Result<(), ()> {
150        let events_received = register!(EventsReceived);
151        let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
152        let mut interval =
153            IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
154        while interval.next().await.is_some() {
155            let hostname = crate::get_hostname();
156            let pid = std::process::id().to_string();
157
158            let metrics = self.controller.capture_metrics();
159            let count = metrics.len();
160            let byte_size = metrics.size_of();
161            let json_size = metrics.estimated_json_encoded_size_of();
162
163            bytes_received.emit(ByteSize(byte_size));
164            events_received.emit(CountByteSize(count, json_size));
165
166            let batch = metrics.into_iter().map(|mut metric| {
167                // A metric starts out with a default "vector" namespace, but will be overridden
168                // if an explicit namespace is provided to this source.
169                if self.namespace != "vector" {
170                    metric = metric.with_namespace(Some(self.namespace.clone()));
171                }
172
173                if let Some(host_key) = &self.host_key.path {
174                    if let Ok(hostname) = &hostname {
175                        metric.replace_tag(host_key.to_string(), hostname.to_owned());
176                    }
177                }
178                if let Some(pid_key) = &self.pid_key {
179                    metric.replace_tag(pid_key.to_owned(), pid.clone());
180                }
181                metric
182            });
183
184            if (self.out.send_batch(batch).await).is_err() {
185                emit!(StreamClosedError { count });
186                return Err(());
187            }
188        }
189
190        Ok(())
191    }
192}
193
// Unit tests for the `internal_metrics` source: config generation, what the metrics
// controller captures, and the namespace/tag handling of emitted events.
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use metrics::{counter, gauge, histogram};
    use vector_lib::{metric_tags, metrics::Controller};

    use super::*;
    use crate::{
        event::{
            metric::{Metric, MetricValue},
            Event,
        },
        test_util::{
            self,
            components::{run_and_assert_source_compliance, SOURCE_TAGS},
        },
    };

    // Runs the shared config-generation check against `InternalMetricsConfig`.
    #[test]
    fn generate_config() {
        test_util::test_generate_config::<InternalMetricsConfig>();
    }

    // Records a gauge, a counter, and two histograms through the `metrics` macros, then checks
    // the values reported by `Controller::capture_metrics`.
    #[test]
    fn captures_internal_metrics() {
        test_util::trace_init();

        // There *seems* to be a race condition here (CI was flaky), so add a slight delay.
        std::thread::sleep(std::time::Duration::from_millis(300));

        gauge!("foo").set(1.0);
        gauge!("foo").set(2.0);
        counter!("bar").increment(3);
        counter!("bar").increment(4);
        histogram!("baz").record(5.0);
        histogram!("baz").record(6.0);
        histogram!("quux", "host" => "foo").record(8.0);
        histogram!("quux", "host" => "foo").record(8.1);

        let controller = Controller::get().expect("no controller");

        // There *seems* to be a race condition here (CI was flaky), so add a slight delay.
        std::thread::sleep(std::time::Duration::from_millis(300));

        // Index the captured metrics by name so each one can be looked up directly.
        let output = controller
            .capture_metrics()
            .into_iter()
            .map(|metric| (metric.name().to_string(), metric))
            .collect::<BTreeMap<String, Metric>>();

        // The gauge reports the last value set; the counter reports the sum of its increments.
        assert_eq!(&MetricValue::Gauge { value: 2.0 }, output["foo"].value());
        assert_eq!(&MetricValue::Counter { value: 7.0 }, output["bar"].value());

        match &output["baz"].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                // This index is _only_ stable so long as the offsets in
                // [`metrics::handle::Histogram::new`] are hard-coded. If this
                // check fails you might look there and see if we've allowed
                // users to set their own bucket widths.
                assert_eq!(buckets[9].count, 2);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 11.0);
            }
            _ => panic!("wrong type"),
        }

        match &output["quux"].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                // This index is _only_ stable so long as the offsets in
                // [`metrics::handle::Histogram::new`] are hard-coded. If this
                // check fails you might look there and see if we've allowed
                // users to set their own bucket widths.
                // The two samples (8.0 and 8.1) are expected in adjacent buckets.
                assert_eq!(buckets[9].count, 1);
                assert_eq!(buckets[10].count, 1);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 16.1);
            }
            _ => panic!("wrong type"),
        }

        // Tags supplied when recording must be present on the captured metric.
        let labels = metric_tags!("host" => "foo");
        assert_eq!(Some(&labels), output["quux"].tags());
    }

    // Runs the source built from `config` through `run_and_assert_source_compliance` and
    // returns the first event it produced; panics if no event was produced.
    // NOTE(review): the 100 ms argument is presumably how long the source is left running —
    // confirm against the helper's signature.
    async fn event_from_config(config: InternalMetricsConfig) -> Event {
        let mut events = run_and_assert_source_compliance(
            config,
            time::Duration::from_millis(100),
            &SOURCE_TAGS,
        )
        .await;

        assert!(!events.is_empty());
        events.remove(0)
    }

    // With the default configuration, metrics carry the "vector" namespace.
    #[tokio::test]
    async fn default_namespace() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        assert_eq!(event.as_metric().namespace(), Some("vector"));
    }

    // Custom host and PID tag names both end up on the emitted metric.
    #[tokio::test]
    async fn sets_tags() {
        let event = event_from_config(InternalMetricsConfig {
            tags: TagsConfig {
                host_key: Some(OptionalValuePath::new("my_host_key")),
                pid_key: Some(String::from("my_pid_key")),
            },
            ..Default::default()
        })
        .await;

        let metric = event.as_metric();

        assert!(metric.tag_value("my_host_key").is_some());
        assert!(metric.tag_value("my_pid_key").is_some());
    }

    // By default only the host tag is added; the PID tag is opt-in.
    #[tokio::test]
    async fn only_host_tags_by_default() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        let metric = event.as_metric();

        assert!(metric.tag_value("host").is_some());
        assert!(metric.tag_value("pid").is_none());
    }

    // A configured namespace replaces the default one on emitted metrics.
    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";

        let config = InternalMetricsConfig {
            namespace: namespace.to_owned(),
            ..InternalMetricsConfig::default()
        };

        let event = event_from_config(config).await;

        assert_eq!(event.as_metric().namespace(), Some(namespace));
    }
}