vector/sources/
internal_metrics.rs

1use std::time::Duration;
2
3use futures::StreamExt;
4use serde_with::serde_as;
5use tokio::time;
6use tokio_stream::wrappers::IntervalStream;
7use vector_lib::{
8    ByteSizeOf, EstimatedJsonEncodedSizeOf,
9    config::LogNamespace,
10    configurable::configurable_component,
11    internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
12    lookup::lookup_v2::OptionalValuePath,
13};
14
15use crate::{
16    SourceSender,
17    config::{SourceConfig, SourceContext, SourceOutput, log_schema},
18    internal_events::{EventsReceived, StreamClosedError},
19    metrics::Controller,
20    shutdown::ShutdownSignal,
21};
22
/// Configuration for the `internal_metrics` source.
// NOTE: `#[serde(default)]` on the struct plus `deny_unknown_fields` means every field is
// optional in user configuration, while unknown keys are rejected.
#[serde_as]
#[configurable_component(source(
    "internal_metrics",
    "Expose internal metrics emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct InternalMetricsConfig {
    // Deserialized from a (possibly fractional) number of seconds; falls back to
    // `default_scrape_interval()` when omitted.
    /// The interval between metric gathering, in seconds.
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(default = "default_scrape_interval")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub scrape_interval_secs: Duration,

    // Names of the optional host/PID tags added to every emitted metric; see `TagsConfig`.
    #[configurable(derived)]
    pub tags: TagsConfig,

    // Falls back to `default_namespace()` when omitted.
    /// Overrides the default namespace for the metrics emitted by the source.
    #[serde(default = "default_namespace")]
    pub namespace: String,
}
45
46impl Default for InternalMetricsConfig {
47    fn default() -> Self {
48        Self {
49            scrape_interval_secs: default_scrape_interval(),
50            tags: TagsConfig::default(),
51            namespace: default_namespace(),
52        }
53    }
54}
55
/// Tag configuration for the `internal_metrics` source.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields, default)]
pub struct TagsConfig {
    // NOTE(review): the tag value is taken from `crate::get_hostname()` in `InternalMetrics::run`,
    // not from any peer connection — confirm that helper returns the local hostname.
    /// Overrides the name of the tag used to add the host to each metric.
    ///
    /// The value is the hostname of the machine on which Vector is running.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// Set to `""` to suppress this key.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    pub host_key: Option<OptionalValuePath>,

    /// Sets the name of the tag to use to add the current process ID to each metric.
    ///
    /// By default, this is not set and the tag is not automatically added. An empty string is
    /// treated the same as not being set.
    #[configurable(metadata(docs::examples = "pid"))]
    pub pid_key: Option<String>,
}
79
/// Default scrape interval: metrics are gathered once per second.
fn default_scrape_interval() -> Duration {
    Duration::from_secs(1)
}
83
/// Default namespace applied to the metrics emitted by this source.
fn default_namespace() -> String {
    String::from("vector")
}
87
// Presumably implements example-config generation for `InternalMetricsConfig` on top of its
// `Default` impl (exercised by the `generate_config` test) — TODO confirm against the macro.
impl_generate_config_from_default!(InternalMetricsConfig);
89
90#[async_trait::async_trait]
91#[typetag::serde(name = "internal_metrics")]
92impl SourceConfig for InternalMetricsConfig {
93    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
94        if self.scrape_interval_secs.is_zero() {
95            warn!(
96                "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
97            );
98        }
99        let interval = self.scrape_interval_secs;
100
101        // namespace for created metrics is already "vector" by default.
102        let namespace = self.namespace.clone();
103
104        let host_key = self
105            .tags
106            .host_key
107            .clone()
108            .unwrap_or(log_schema().host_key().cloned().into());
109
110        let pid_key = self
111            .tags
112            .pid_key
113            .as_deref()
114            .and_then(|tag| (!tag.is_empty()).then(|| tag.to_owned()));
115
116        Ok(Box::pin(
117            InternalMetrics {
118                namespace,
119                host_key,
120                pid_key,
121                controller: Controller::get()?,
122                interval,
123                out: cx.out,
124                shutdown: cx.shutdown,
125            }
126            .run(),
127        ))
128    }
129
130    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
131        vec![SourceOutput::new_metrics()]
132    }
133
134    fn can_acknowledge(&self) -> bool {
135        false
136    }
137}
138
/// Runtime state of the `internal_metrics` source, consumed by `run`.
struct InternalMetrics<'a> {
    /// Namespace applied to each metric when it differs from `"vector"`.
    namespace: String,
    /// Tag name under which the hostname is attached; no tag is added when its `path` is `None`.
    host_key: OptionalValuePath,
    /// Tag name under which the process ID is attached, if any.
    pid_key: Option<String>,
    /// Metrics controller whose metrics are captured on every tick.
    controller: &'a Controller,
    /// Period between two consecutive captures.
    interval: time::Duration,
    /// Sender used to forward the captured metrics as a batch.
    out: SourceSender,
    /// Signal that terminates the tick stream, ending `run`.
    shutdown: ShutdownSignal,
}
148
149impl InternalMetrics<'_> {
150    async fn run(mut self) -> Result<(), ()> {
151        let events_received = register!(EventsReceived);
152        let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
153        let mut interval =
154            IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
155        while interval.next().await.is_some() {
156            let hostname = crate::get_hostname();
157            let pid = std::process::id().to_string();
158
159            let metrics = self.controller.capture_metrics();
160            let count = metrics.len();
161            let byte_size = metrics.size_of();
162            let json_size = metrics.estimated_json_encoded_size_of();
163
164            bytes_received.emit(ByteSize(byte_size));
165            events_received.emit(CountByteSize(count, json_size));
166
167            let batch = metrics.into_iter().map(|mut metric| {
168                // A metric starts out with a default "vector" namespace, but will be overridden
169                // if an explicit namespace is provided to this source.
170                if self.namespace != "vector" {
171                    metric = metric.with_namespace(Some(self.namespace.clone()));
172                }
173
174                if let Some(host_key) = &self.host_key.path
175                    && let Ok(hostname) = &hostname
176                {
177                    metric.replace_tag(host_key.to_string(), hostname.to_owned());
178                }
179                if let Some(pid_key) = &self.pid_key {
180                    metric.replace_tag(pid_key.to_owned(), pid.clone());
181                }
182                metric
183            });
184
185            if (self.out.send_batch(batch).await).is_err() {
186                emit!(StreamClosedError { count });
187                return Err(());
188            }
189        }
190
191        Ok(())
192    }
193}
194
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use metrics::{counter, gauge, histogram};
    use vector_lib::{metric_tags, metrics::Controller};

    use super::*;
    use crate::{
        event::{
            Event,
            metric::{Metric, MetricValue},
        },
        test_util::{
            self,
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
        },
    };

    /// Runs the shared config-generation check against this source's config type.
    #[test]
    fn generate_config() {
        test_util::test_generate_config::<InternalMetricsConfig>();
    }

    /// Emits a gauge, a counter and two histograms, then checks what the controller captured.
    #[test]
    fn captures_internal_metrics() {
        // There *seems* to be a race condition around the emissions (CI was flaky), so a slight
        // delay is inserted both before emitting and before capturing.
        let settle = std::time::Duration::from_millis(300);

        test_util::trace_init();
        std::thread::sleep(settle);

        gauge!("foo").set(1.0);
        gauge!("foo").set(2.0);
        counter!("bar").increment(3);
        counter!("bar").increment(4);
        histogram!("baz").record(5.0);
        histogram!("baz").record(6.0);
        histogram!("quux", "host" => "foo").record(8.0);
        histogram!("quux", "host" => "foo").record(8.1);

        let controller = Controller::get().expect("no controller");
        std::thread::sleep(settle);

        // Index the captured metrics by name; a later metric replaces an earlier one with the
        // same name.
        let mut captured = BTreeMap::<String, Metric>::new();
        for metric in controller.capture_metrics() {
            captured.insert(metric.name().to_string(), metric);
        }

        assert_eq!(&MetricValue::Gauge { value: 2.0 }, captured["foo"].value());
        assert_eq!(&MetricValue::Counter { value: 7.0 }, captured["bar"].value());

        // The bucket indices below are _only_ stable so long as the offsets in
        // [`metrics::handle::Histogram::new`] are hard-coded. If these checks fail you might
        // look there and see if we've allowed users to set their own bucket widths.
        let MetricValue::AggregatedHistogram {
            buckets,
            count,
            sum,
        } = captured["baz"].value()
        else {
            panic!("wrong type")
        };
        assert_eq!(buckets[9].count, 2);
        assert_eq!(*count, 2);
        assert_eq!(*sum, 11.0);

        let MetricValue::AggregatedHistogram {
            buckets,
            count,
            sum,
        } = captured["quux"].value()
        else {
            panic!("wrong type")
        };
        assert_eq!(buckets[9].count, 1);
        assert_eq!(buckets[10].count, 1);
        assert_eq!(*count, 2);
        assert_eq!(*sum, 16.1);

        let expected_tags = metric_tags!("host" => "foo");
        assert_eq!(Some(&expected_tags), captured["quux"].tags());
    }

    /// Runs the source built from `config` for 100 ms and returns the first event it produced.
    async fn event_from_config(config: InternalMetricsConfig) -> Event {
        let run_for = time::Duration::from_millis(100);
        let mut collected = run_and_assert_source_compliance(config, run_for, &SOURCE_TAGS).await;

        assert!(!collected.is_empty());
        // Only the first event is needed, so the order of the remainder is irrelevant.
        collected.swap_remove(0)
    }

    /// Without overrides, metrics keep the "vector" namespace.
    #[tokio::test]
    async fn default_namespace() {
        let config = InternalMetricsConfig::default();
        let first = event_from_config(config).await;

        assert_eq!(first.as_metric().namespace(), Some("vector"));
    }

    /// Custom host and PID tag names both show up on the emitted metric.
    #[tokio::test]
    async fn sets_tags() {
        let tags = TagsConfig {
            host_key: Some(OptionalValuePath::new("my_host_key")),
            pid_key: Some(String::from("my_pid_key")),
        };
        let config = InternalMetricsConfig {
            tags,
            ..Default::default()
        };

        let first = event_from_config(config).await;
        let metric = first.as_metric();

        assert!(metric.tag_value("my_host_key").is_some());
        assert!(metric.tag_value("my_pid_key").is_some());
    }

    /// The default configuration adds a "host" tag but no "pid" tag.
    #[tokio::test]
    async fn only_host_tags_by_default() {
        let first = event_from_config(InternalMetricsConfig::default()).await;
        let metric = first.as_metric();

        assert!(metric.tag_value("host").is_some());
        assert!(metric.tag_value("pid").is_none());
    }

    /// An explicit namespace replaces the default one on emitted metrics.
    #[tokio::test]
    async fn namespace() {
        let custom = "totally_custom";

        let first = event_from_config(InternalMetricsConfig {
            namespace: custom.to_owned(),
            ..Default::default()
        })
        .await;

        assert_eq!(first.as_metric().namespace(), Some(custom));
    }
}