1use std::time::Duration;
2
3use futures::StreamExt;
4use serde_with::serde_as;
5use tokio::time;
6use tokio_stream::wrappers::IntervalStream;
7use vector_lib::configurable::configurable_component;
8use vector_lib::internal_event::{
9 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
10};
11use vector_lib::lookup::lookup_v2::OptionalValuePath;
12use vector_lib::{config::LogNamespace, ByteSizeOf, EstimatedJsonEncodedSizeOf};
13
14use crate::{
15 config::{log_schema, SourceConfig, SourceContext, SourceOutput},
16 internal_events::{EventsReceived, StreamClosedError},
17 metrics::Controller,
18 shutdown::ShutdownSignal,
19 SourceSender,
20};
21
22#[serde_as]
24#[configurable_component(source(
25 "internal_metrics",
26 "Expose internal metrics emitted by the running Vector instance."
27))]
28#[derive(Clone, Debug)]
29#[serde(deny_unknown_fields, default)]
30pub struct InternalMetricsConfig {
31 #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
33 #[serde(default = "default_scrape_interval")]
34 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
35 pub scrape_interval_secs: Duration,
36
37 #[configurable(derived)]
38 pub tags: TagsConfig,
39
40 #[serde(default = "default_namespace")]
42 pub namespace: String,
43}
44
45impl Default for InternalMetricsConfig {
46 fn default() -> Self {
47 Self {
48 scrape_interval_secs: default_scrape_interval(),
49 tags: TagsConfig::default(),
50 namespace: default_namespace(),
51 }
52 }
53}
54
55#[configurable_component]
57#[derive(Clone, Debug, Default)]
58#[serde(deny_unknown_fields, default)]
59pub struct TagsConfig {
60 pub host_key: Option<OptionalValuePath>,
70
71 #[configurable(metadata(docs::examples = "pid"))]
76 pub pid_key: Option<String>,
77}
78
/// Default time between metric scrapes: exactly one second.
fn default_scrape_interval() -> Duration {
    Duration::from_secs(1)
}
82
/// Default namespace attached to emitted metrics: `vector`.
fn default_namespace() -> String {
    String::from("vector")
}
86
// Provides this source's generated example configuration from `InternalMetricsConfig::default()`
// (as the macro name indicates); exercised by the `generate_config` test below.
impl_generate_config_from_default!(InternalMetricsConfig);
88
89#[async_trait::async_trait]
90#[typetag::serde(name = "internal_metrics")]
91impl SourceConfig for InternalMetricsConfig {
92 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
93 if self.scrape_interval_secs.is_zero() {
94 warn!(
95 "Interval set to 0 secs, this could result in high CPU utilization. It is suggested to use interval >= 1 secs.",
96 );
97 }
98 let interval = self.scrape_interval_secs;
99
100 let namespace = self.namespace.clone();
102
103 let host_key = self
104 .tags
105 .host_key
106 .clone()
107 .unwrap_or(log_schema().host_key().cloned().into());
108
109 let pid_key = self
110 .tags
111 .pid_key
112 .as_deref()
113 .and_then(|tag| (!tag.is_empty()).then(|| tag.to_owned()));
114
115 Ok(Box::pin(
116 InternalMetrics {
117 namespace,
118 host_key,
119 pid_key,
120 controller: Controller::get()?,
121 interval,
122 out: cx.out,
123 shutdown: cx.shutdown,
124 }
125 .run(),
126 ))
127 }
128
129 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
130 vec![SourceOutput::new_metrics()]
131 }
132
133 fn can_acknowledge(&self) -> bool {
134 false
135 }
136}
137
138struct InternalMetrics<'a> {
139 namespace: String,
140 host_key: OptionalValuePath,
141 pid_key: Option<String>,
142 controller: &'a Controller,
143 interval: time::Duration,
144 out: SourceSender,
145 shutdown: ShutdownSignal,
146}
147
148impl InternalMetrics<'_> {
149 async fn run(mut self) -> Result<(), ()> {
150 let events_received = register!(EventsReceived);
151 let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
152 let mut interval =
153 IntervalStream::new(time::interval(self.interval)).take_until(self.shutdown);
154 while interval.next().await.is_some() {
155 let hostname = crate::get_hostname();
156 let pid = std::process::id().to_string();
157
158 let metrics = self.controller.capture_metrics();
159 let count = metrics.len();
160 let byte_size = metrics.size_of();
161 let json_size = metrics.estimated_json_encoded_size_of();
162
163 bytes_received.emit(ByteSize(byte_size));
164 events_received.emit(CountByteSize(count, json_size));
165
166 let batch = metrics.into_iter().map(|mut metric| {
167 if self.namespace != "vector" {
170 metric = metric.with_namespace(Some(self.namespace.clone()));
171 }
172
173 if let Some(host_key) = &self.host_key.path {
174 if let Ok(hostname) = &hostname {
175 metric.replace_tag(host_key.to_string(), hostname.to_owned());
176 }
177 }
178 if let Some(pid_key) = &self.pid_key {
179 metric.replace_tag(pid_key.to_owned(), pid.clone());
180 }
181 metric
182 });
183
184 if (self.out.send_batch(batch).await).is_err() {
185 emit!(StreamClosedError { count });
186 return Err(());
187 }
188 }
189
190 Ok(())
191 }
192}
193
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use metrics::{counter, gauge, histogram};
    use vector_lib::{metric_tags, metrics::Controller};

    use super::*;
    use crate::{
        event::{
            metric::{Metric, MetricValue},
            Event,
        },
        test_util::{
            self,
            components::{run_and_assert_source_compliance, SOURCE_TAGS},
        },
    };

    #[test]
    fn generate_config() {
        test_util::test_generate_config::<InternalMetricsConfig>();
    }

    #[test]
    fn captures_internal_metrics() {
        test_util::trace_init();

        // NOTE(review): these sleeps appear to guard against a timing-dependent race with the
        // global metrics recorder; confirm before removing them.
        std::thread::sleep(std::time::Duration::from_millis(300));

        gauge!("foo").set(1.0);
        gauge!("foo").set(2.0);
        counter!("bar").increment(3);
        counter!("bar").increment(4);
        histogram!("baz").record(5.0);
        histogram!("baz").record(6.0);
        histogram!("quux", "host" => "foo").record(8.0);
        histogram!("quux", "host" => "foo").record(8.1);

        let controller = Controller::get().expect("no controller");

        std::thread::sleep(std::time::Duration::from_millis(300));

        let output = controller
            .capture_metrics()
            .into_iter()
            .map(|metric| (metric.name().to_string(), metric))
            .collect::<BTreeMap<String, Metric>>();

        assert_eq!(&MetricValue::Gauge { value: 2.0 }, output["foo"].value());
        assert_eq!(&MetricValue::Counter { value: 7.0 }, output["bar"].value());

        // The bucket indices below depend on the histogram bucket boundaries used by the metrics
        // controller. NOTE(review): update them if those boundaries change.
        match &output["baz"].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                assert_eq!(buckets[9].count, 2);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 11.0);
            }
            _ => panic!("wrong type"),
        }

        match &output["quux"].value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                assert_eq!(buckets[9].count, 1);
                assert_eq!(buckets[10].count, 1);
                assert_eq!(*count, 2);
                assert_eq!(*sum, 16.1);
            }
            _ => panic!("wrong type"),
        }

        let labels = metric_tags!("host" => "foo");
        assert_eq!(Some(&labels), output["quux"].tags());
    }

    /// Runs the source with `config` through the source compliance harness for 100ms and
    /// returns the first event it emitted, panicking if it emitted none.
    async fn event_from_config(config: InternalMetricsConfig) -> Event {
        let events = run_and_assert_source_compliance(
            config,
            time::Duration::from_millis(100),
            &SOURCE_TAGS,
        )
        .await;

        events
            .into_iter()
            .next()
            .expect("source emitted no events")
    }

    #[tokio::test]
    async fn default_namespace() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        assert_eq!(event.as_metric().namespace(), Some("vector"));
    }

    #[tokio::test]
    async fn sets_tags() {
        let event = event_from_config(InternalMetricsConfig {
            tags: TagsConfig {
                host_key: Some(OptionalValuePath::new("my_host_key")),
                pid_key: Some(String::from("my_pid_key")),
            },
            ..Default::default()
        })
        .await;

        let metric = event.as_metric();

        assert!(metric.tag_value("my_host_key").is_some());
        // The pid tag must carry this process's ID, not just be present.
        let pid = std::process::id().to_string();
        assert_eq!(
            metric.tag_value("my_pid_key").as_deref(),
            Some(pid.as_str())
        );
    }

    #[tokio::test]
    async fn empty_pid_key_adds_no_pid_tag() {
        let event = event_from_config(InternalMetricsConfig {
            tags: TagsConfig {
                host_key: None,
                pid_key: Some(String::new()),
            },
            ..Default::default()
        })
        .await;

        let metric = event.as_metric();

        // `build` treats an empty `pid_key` as unset, so neither an empty-named nor a
        // default-named pid tag may appear.
        assert!(metric.tag_value("").is_none());
        assert!(metric.tag_value("pid").is_none());
    }

    #[tokio::test]
    async fn only_host_tags_by_default() {
        let event = event_from_config(InternalMetricsConfig::default()).await;

        let metric = event.as_metric();

        assert!(metric.tag_value("host").is_some());
        assert!(metric.tag_value("pid").is_none());
    }

    #[tokio::test]
    async fn namespace() {
        let namespace = "totally_custom";

        let config = InternalMetricsConfig {
            namespace: namespace.to_owned(),
            ..InternalMetricsConfig::default()
        };

        let event = event_from_config(config).await;

        assert_eq!(event.as_metric().namespace(), Some(namespace));
    }
}