1use std::time::Duration;
2
3use futures::StreamExt;
4use serde_with::serde_as;
5use tokio::time;
6use tokio_stream::wrappers::IntervalStream;
7use vector_lib::{
8 ByteSizeOf, EstimatedJsonEncodedSizeOf,
9 config::LogNamespace,
10 configurable::configurable_component,
11 internal_event::{ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol},
12 lookup::lookup_v2::OptionalValuePath,
13};
14
15use crate::{
16 SourceSender,
17 config::{SourceConfig, SourceContext, SourceOutput, log_schema},
18 internal_events::{EventsReceived, StreamClosedError},
19 metrics::Controller,
20 shutdown::ShutdownSignal,
21};
22
23#[serde_as]
25#[configurable_component(source(
26 "internal_metrics",
27 "Expose internal metrics emitted by the running Vector instance."
28))]
29#[derive(Clone, Debug)]
30#[serde(deny_unknown_fields, default)]
31pub struct InternalMetricsConfig {
32 #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
34 #[serde(default = "default_scrape_interval")]
35 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
36 pub scrape_interval_secs: Duration,
37
38 #[configurable(derived)]
39 pub tags: TagsConfig,
40
41 #[serde(default = "default_namespace")]
43 pub namespace: String,
44}
45
46impl Default for InternalMetricsConfig {
47 fn default() -> Self {
48 Self {
49 scrape_interval_secs: default_scrape_interval(),
50 tags: TagsConfig::default(),
51 namespace: default_namespace(),
52 }
53 }
54}
55
56#[configurable_component]
58#[derive(Clone, Debug, Default)]
59#[serde(deny_unknown_fields, default)]
60pub struct TagsConfig {
61 pub host_key: Option<OptionalValuePath>,
71
72 #[configurable(metadata(docs::examples = "pid"))]
77 pub pid_key: Option<String>,
78}
79
/// Default time between two captures of the internal metrics: one second.
fn default_scrape_interval() -> Duration {
    Duration::from_secs(1)
}
83
/// Default namespace applied to the emitted metrics.
fn default_namespace() -> String {
    String::from("vector")
}
87
88impl_generate_config_from_default!(InternalMetricsConfig);
89
90#[async_trait::async_trait]
91#[typetag::serde(name = "internal_metrics")]
92impl SourceConfig for InternalMetricsConfig {
93 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
94 if self.scrape_interval_secs.is_zero() {
95 warn!(
96 "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
97 );
98 }
99 let interval = self.scrape_interval_secs;
100
101 let namespace = self.namespace.clone();
103
104 let host_key = self
105 .tags
106 .host_key
107 .clone()
108 .unwrap_or(log_schema().host_key().cloned().into());
109
110 let pid_key = self
111 .tags
112 .pid_key
113 .as_deref()
114 .and_then(|tag| (!tag.is_empty()).then(|| tag.to_owned()));
115
116 Ok(Box::pin(
117 InternalMetrics {
118 namespace,
119 host_key,
120 pid_key,
121 controller: Controller::get()?,
122 interval,
123 out: cx.out,
124 shutdown: cx.shutdown,
125 }
126 .run(),
127 ))
128 }
129
130 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
131 vec![SourceOutput::new_metrics()]
132 }
133
134 fn can_acknowledge(&self) -> bool {
135 false
136 }
137}
138
139struct InternalMetrics<'a> {
140 namespace: String,
141 host_key: OptionalValuePath,
142 pid_key: Option<String>,
143 controller: &'a Controller,
144 interval: time::Duration,
145 out: SourceSender,
146 shutdown: ShutdownSignal,
147}
148
149impl InternalMetrics<'_> {
150 async fn run(mut self) -> Result<(), ()> {
151 let events_received = register!(EventsReceived);
152 let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
153 let mut interval =
154 IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
155 while interval.next().await.is_some() {
156 let hostname = crate::get_hostname();
157 let pid = std::process::id().to_string();
158
159 let metrics = self.controller.capture_metrics();
160 let count = metrics.len();
161 let byte_size = metrics.size_of();
162 let json_size = metrics.estimated_json_encoded_size_of();
163
164 bytes_received.emit(ByteSize(byte_size));
165 events_received.emit(CountByteSize(count, json_size));
166
167 let batch = metrics.into_iter().map(|mut metric| {
168 if self.namespace != "vector" {
171 metric = metric.with_namespace(Some(self.namespace.clone()));
172 }
173
174 if let Some(host_key) = &self.host_key.path
175 && let Ok(hostname) = &hostname
176 {
177 metric.replace_tag(host_key.to_string(), hostname.to_owned());
178 }
179 if let Some(pid_key) = &self.pid_key {
180 metric.replace_tag(pid_key.to_owned(), pid.clone());
181 }
182 metric
183 });
184
185 if (self.out.send_batch(batch).await).is_err() {
186 emit!(StreamClosedError { count });
187 return Err(());
188 }
189 }
190
191 Ok(())
192 }
193}
194
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use metrics::{counter, gauge, histogram};
    use vector_lib::{metric_tags, metrics::Controller};

    use super::*;
    use crate::{
        event::{
            Event,
            metric::{Metric, MetricValue},
        },
        test_util::{
            self,
            components::{SOURCE_TAGS, run_and_assert_source_compliance},
        },
    };

    /// The default configuration must be usable for config generation.
    #[test]
    fn generate_config() {
        test_util::test_generate_config::<InternalMetricsConfig>();
    }

    /// Values recorded through the `metrics` macros show up in a controller capture.
    #[test]
    fn captures_internal_metrics() {
        // NOTE(review): the pauses presumably give the metrics machinery time to settle
        // before recording and before capturing — confirm why they are needed.
        let settle = std::time::Duration::from_millis(300);

        test_util::trace_init();

        std::thread::sleep(settle);

        gauge!("foo").set(1.0);
        gauge!("foo").set(2.0);
        counter!("bar").increment(3);
        counter!("bar").increment(4);
        histogram!("baz").record(5.0);
        histogram!("baz").record(6.0);
        histogram!("quux", "host" => "foo").record(8.0);
        histogram!("quux", "host" => "foo").record(8.1);

        let controller = Controller::get().expect("no controller");

        std::thread::sleep(settle);

        // Index the captured metrics by name for the lookups below.
        let output: BTreeMap<String, Metric> = controller
            .capture_metrics()
            .into_iter()
            .map(|metric| (metric.name().to_owned(), metric))
            .collect();

        // The gauge keeps the last value, the counter accumulates.
        assert_eq!(&MetricValue::Gauge { value: 2.0 }, output["foo"].value());
        assert_eq!(&MetricValue::Counter { value: 7.0 }, output["bar"].value());

        // NOTE(review): the bucket indexes below depend on the histogram bucket layout,
        // which is defined elsewhere — confirm against it when changing recorded values.
        let MetricValue::AggregatedHistogram {
            buckets,
            count,
            sum,
        } = output["baz"].value()
        else {
            panic!("wrong type");
        };
        assert_eq!(buckets[9].count, 2);
        assert_eq!(*count, 2);
        assert_eq!(*sum, 11.0);

        let MetricValue::AggregatedHistogram {
            buckets,
            count,
            sum,
        } = output["quux"].value()
        else {
            panic!("wrong type");
        };
        assert_eq!(buckets[9].count, 1);
        assert_eq!(buckets[10].count, 1);
        assert_eq!(*count, 2);
        assert_eq!(*sum, 16.1);

        // Tags passed to the macro are preserved on the captured metric.
        let labels = metric_tags!("host" => "foo");
        assert_eq!(Some(&labels), output["quux"].tags());
    }

    /// Runs the source built from `config` for a short time and returns the first event
    /// it emitted.
    async fn event_from_config(config: InternalMetricsConfig) -> Event {
        let run_time = time::Duration::from_millis(100);
        let events = run_and_assert_source_compliance(config, run_time, &SOURCE_TAGS).await;

        assert!(!events.is_empty());
        events
            .into_iter()
            .next()
            .expect("events were just checked to be non-empty")
    }

    #[tokio::test]
    async fn default_namespace() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        assert_eq!(event.as_metric().namespace(), Some("vector"));
    }

    #[tokio::test]
    async fn sets_tags() {
        let tags = TagsConfig {
            host_key: Some(OptionalValuePath::new("my_host_key")),
            pid_key: Some("my_pid_key".to_owned()),
        };
        let config = InternalMetricsConfig {
            tags,
            ..Default::default()
        };

        let event = event_from_config(config).await;
        let metric = event.as_metric();

        assert!(metric.tag_value("my_host_key").is_some());
        assert!(metric.tag_value("my_pid_key").is_some());
    }

    #[tokio::test]
    async fn only_host_tags_by_default() {
        let event = event_from_config(InternalMetricsConfig::default()).await;
        let metric = event.as_metric();

        assert!(metric.tag_value("host").is_some());
        assert!(metric.tag_value("pid").is_none());
    }

    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";
        let config = InternalMetricsConfig {
            namespace: namespace.to_owned(),
            ..Default::default()
        };

        let event = event_from_config(config).await;

        assert_eq!(event.as_metric().namespace(), Some(namespace));
    }
}