1use std::{
2 path::{Path, PathBuf},
3 time::Duration,
4};
5
6use chrono::{DateTime, Utc};
7use futures::StreamExt;
8use glob::{Pattern, PatternError};
9#[cfg(not(windows))]
10use heim::units::ratio::ratio;
11use heim::units::time::second;
12use serde_with::serde_as;
13use sysinfo::System;
14use tokio::time;
15use tokio_stream::wrappers::IntervalStream;
16use vector_lib::{
17 EstimatedJsonEncodedSizeOf,
18 config::LogNamespace,
19 configurable::configurable_component,
20 internal_event::{
21 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
22 },
23};
24
25use crate::{
26 SourceSender,
27 config::{SourceConfig, SourceContext, SourceOutput},
28 event::metric::{Metric, MetricKind, MetricTags, MetricValue},
29 internal_events::{EventsReceived, HostMetricsScrapeDetailError, StreamClosedError},
30 shutdown::ShutdownSignal,
31};
32
33#[cfg(target_os = "linux")]
34mod cgroups;
35mod cpu;
36mod disk;
37mod filesystem;
38mod memory;
39mod network;
40mod process;
41#[cfg(target_os = "linux")]
42mod tcp;
43
#[serde_as]
/// The individual metric collectors this source can run.
///
/// Variant names are (de)serialized in lowercase (for example `cgroups`, `tcp`).
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Collector {
    /// Metrics related to Linux control groups.
    ///
    /// Only available on Linux; `build` rejects it on other platforms.
    CGroups,

    /// Metrics gathered by `cpu_metrics`.
    Cpu,

    /// Metrics gathered by `process_metrics`.
    Process,

    /// Metrics gathered by `disk_metrics`.
    Disk,

    /// Metrics gathered by `filesystem_metrics`.
    Filesystem,

    /// Load average metrics (`load1`, `load5`, `load15`).
    Load,

    /// Host metrics (`uptime`, `boot_time`).
    Host,

    /// Metrics gathered by `memory_metrics` and `swap_metrics`.
    Memory,

    /// Metrics gathered by `network_metrics`.
    Network,

    /// Metrics gathered by `tcp_metrics`.
    ///
    /// Only available on Linux; `build` rejects it on other platforms.
    TCP,
}
82
/// Filtering configuration made of optional include and exclude glob patterns.
///
/// A value passes when it matches at least one include pattern (or no include
/// list is set) and matches no exclude pattern; see `FilterList::contains`.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct FilterList {
    /// Patterns a value must match to be kept.
    ///
    /// When unset, every value is included. When set, an absent value (`None`)
    /// is never included.
    includes: Option<Vec<PatternWrapper>>,

    /// Patterns that remove a value even if it was included.
    ///
    /// When unset, nothing is excluded. An absent value (`None`) is never excluded.
    excludes: Option<Vec<PatternWrapper>>,
}
97
98#[serde_as]
100#[configurable_component(source("host_metrics", "Collect metric data from the local system."))]
101#[derive(Clone, Debug, Derivative)]
102#[derivative(Default)]
103#[serde(deny_unknown_fields)]
104pub struct HostMetricsConfig {
105 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
107 #[serde(default = "default_scrape_interval")]
108 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
109 pub scrape_interval_secs: Duration,
110
111 #[configurable(metadata(docs::examples = "example_collectors()"))]
115 #[derivative(Default(value = "default_collectors()"))]
116 #[serde(default = "default_collectors")]
117 pub collectors: Option<Vec<Collector>>,
118
119 #[derivative(Default(value = "default_namespace()"))]
121 #[serde(default = "default_namespace")]
122 pub namespace: Option<String>,
123
124 #[configurable(derived)]
125 #[derivative(Default(value = "default_cgroups_config()"))]
126 #[serde(default = "default_cgroups_config")]
127 pub cgroups: Option<CGroupsConfig>,
128
129 #[configurable(derived)]
130 #[serde(default)]
131 pub disk: disk::DiskConfig,
132
133 #[configurable(derived)]
134 #[serde(default)]
135 pub filesystem: filesystem::FilesystemConfig,
136
137 #[configurable(derived)]
138 #[serde(default)]
139 pub network: network::NetworkConfig,
140
141 #[configurable(derived)]
142 #[serde(default)]
143 pub process: process::ProcessConfig,
144}
145
/// Options for the cgroups (controller groups) metrics collector.
///
/// `build` only accepts this configuration on Linux.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default)]
pub struct CGroupsConfig {
    /// Level setting for the cgroups collector; defaults to 100 (`default_levels`).
    ///
    /// NOTE(review): presumably the number of hierarchy levels to descend when
    /// collecting — confirm against the `cgroups` module.
    #[derivative(Default(value = "default_levels()"))]
    #[serde(default = "default_levels")]
    #[configurable(metadata(docs::examples = 1))]
    #[configurable(metadata(docs::examples = 3))]
    levels: usize,

    /// Optional base cgroup path, for example `/` or `system.slice/snapd.service`.
    ///
    /// NOTE(review): presumably the cgroup from which collection starts — confirm
    /// against the `cgroups` module.
    #[configurable(metadata(docs::examples = "/"))]
    #[configurable(metadata(docs::examples = "system.slice/snapd.service"))]
    pub(super) base: Option<PathBuf>,

    /// Include/exclude filter for cgroup names.
    ///
    /// When omitted during deserialization, `default_all_devices` supplies a
    /// filter that includes `*`.
    #[configurable(metadata(docs::examples = "example_cgroups()"))]
    #[serde(default = "default_all_devices")]
    groups: FilterList,

    /// Optional base directory override; hidden from the generated docs and
    /// never serialized.
    ///
    /// NOTE(review): presumably the cgroups filesystem root — confirm against
    /// the `cgroups` module.
    #[serde(skip_serializing)]
    #[configurable(metadata(docs::hidden))]
    #[configurable(metadata(docs::human_name = "Base Directory"))]
    base_dir: Option<PathBuf>,
}
180
/// Returns the default scrape interval of 15 seconds.
const fn default_scrape_interval() -> Duration {
    const SECONDS: u64 = 15;
    Duration::from_secs(SECONDS)
}
184
/// Returns the default metric namespace, `host`.
pub fn default_namespace() -> Option<String> {
    let namespace = "host".to_owned();
    Some(namespace)
}
188
/// Returns the collector names used as documentation examples for the
/// `collectors` option.
const fn example_collectors() -> [&'static str; 9] {
    const NAMES: [&str; 9] = [
        "cgroups",
        "cpu",
        "disk",
        "filesystem",
        "load",
        "host",
        "memory",
        "network",
        "tcp",
    ];
    NAMES
}
202
203fn default_collectors() -> Option<Vec<Collector>> {
204 let mut collectors = vec![
205 Collector::Cpu,
206 Collector::Disk,
207 Collector::Filesystem,
208 Collector::Load,
209 Collector::Host,
210 Collector::Memory,
211 Collector::Network,
212 Collector::Process,
213 ];
214
215 #[cfg(target_os = "linux")]
216 {
217 collectors.push(Collector::CGroups);
218 collectors.push(Collector::TCP);
219 }
220 #[cfg(not(target_os = "linux"))]
221 if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
222 collectors.push(Collector::CGroups);
223 collectors.push(Collector::TCP);
224 }
225
226 Some(collectors)
227}
228
229fn example_devices() -> FilterList {
230 FilterList {
231 includes: Some(vec!["sda".try_into().unwrap()]),
232 excludes: Some(vec!["dm-*".try_into().unwrap()]),
233 }
234}
235
236fn default_all_devices() -> FilterList {
237 FilterList {
238 includes: Some(vec!["*".try_into().unwrap()]),
239 excludes: None,
240 }
241}
242
243fn example_processes() -> FilterList {
244 FilterList {
245 includes: Some(vec!["docker".try_into().unwrap()]),
246 excludes: None,
247 }
248}
249
250fn default_all_processes() -> FilterList {
251 FilterList {
252 includes: Some(vec!["*".try_into().unwrap()]),
253 excludes: None,
254 }
255}
256
/// Returns the default value of `CGroupsConfig::levels`.
const fn default_levels() -> usize {
    const DEFAULT_LEVELS: usize = 100;
    DEFAULT_LEVELS
}
260
261fn example_cgroups() -> FilterList {
262 FilterList {
263 includes: Some(vec!["user.slice/*".try_into().unwrap()]),
264 excludes: Some(vec!["*.service".try_into().unwrap()]),
265 }
266}
267
268fn default_cgroups_config() -> Option<CGroupsConfig> {
269 if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
271 return Some(CGroupsConfig::default());
272 }
273
274 #[cfg(not(target_os = "linux"))]
275 {
276 None
277 }
278
279 #[cfg(target_os = "linux")]
280 {
281 Some(CGroupsConfig::default())
282 }
283}
284
// NOTE(review): the macro is defined elsewhere in the crate; presumably it
// implements config generation for `HostMetricsConfig` from its `Default`
// value — confirm against the macro definition.
impl_generate_config_from_default!(HostMetricsConfig);
286
287#[async_trait::async_trait]
288#[typetag::serde(name = "host_metrics")]
289impl SourceConfig for HostMetricsConfig {
290 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
291 init_roots();
292
293 #[cfg(not(target_os = "linux"))]
294 {
295 if self.cgroups.is_some() || self.has_collector(Collector::CGroups) {
296 return Err("CGroups collector is only available on Linux systems".into());
297 }
298 if self.has_collector(Collector::TCP) {
299 return Err("TCP collector is only available on Linux systems".into());
300 }
301 }
302
303 let mut config = self.clone();
304 config.namespace = config.namespace.filter(|namespace| !namespace.is_empty());
305
306 Ok(Box::pin(config.run(cx.out, cx.shutdown)))
307 }
308
309 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
310 vec![SourceOutput::new_metrics()]
311 }
312
313 fn can_acknowledge(&self) -> bool {
314 false
315 }
316}
317
318impl HostMetricsConfig {
319 pub fn scrape_interval_secs(&mut self, value: f64) {
321 self.scrape_interval_secs = Duration::from_secs_f64(value);
322 }
323
324 async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
325 let duration = self.scrape_interval_secs;
326 let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
327
328 let mut generator = HostMetrics::new(self);
329
330 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
331
332 while interval.next().await.is_some() {
333 bytes_received.emit(ByteSize(0));
334 let metrics = generator.capture_metrics().await;
335 let count = metrics.len();
336 if (out.send_batch(metrics).await).is_err() {
337 emit!(StreamClosedError { count });
338 return Err(());
339 }
340 }
341
342 Ok(())
343 }
344
345 fn has_collector(&self, collector: Collector) -> bool {
346 match &self.collectors {
347 None => true,
348 Some(collectors) => collectors.contains(&collector),
349 }
350 }
351}
352
/// Captures host metrics according to a [`HostMetricsConfig`].
pub struct HostMetrics {
    /// Source configuration; consulted for the enabled collectors and the namespace.
    config: HostMetricsConfig,
    /// `sysinfo` handle, created with `System::new()`.
    system: System,
    /// Result of `cgroups::CGroupRoot::new` for the configured cgroups options (Linux only).
    #[cfg(target_os = "linux")]
    root_cgroup: Option<cgroups::CGroupRoot>,
    /// Registered `EventsReceived` handle, emitted once per capture.
    events_received: Registered<EventsReceived>,
}
360
361impl HostMetrics {
362 #[cfg(not(target_os = "linux"))]
363 pub fn new(config: HostMetricsConfig) -> Self {
364 Self {
365 config,
366 system: System::new(),
367 events_received: register!(EventsReceived),
368 }
369 }
370
371 #[cfg(target_os = "linux")]
372 pub fn new(config: HostMetricsConfig) -> Self {
373 let cgroups = config.cgroups.clone().unwrap_or_default();
374 let root_cgroup = cgroups::CGroupRoot::new(&cgroups);
375 Self {
376 config,
377 system: System::new(),
378 root_cgroup,
379 events_received: register!(EventsReceived),
380 }
381 }
382
383 pub fn buffer(&self) -> MetricsBuffer {
384 MetricsBuffer::new(self.config.namespace.clone())
385 }
386
387 async fn capture_metrics(&mut self) -> Vec<Metric> {
388 let mut buffer = self.buffer();
389
390 #[cfg(target_os = "linux")]
391 if self.config.has_collector(Collector::CGroups) {
392 self.cgroups_metrics(&mut buffer).await;
393 }
394 if self.config.has_collector(Collector::Cpu) {
395 self.cpu_metrics(&mut buffer).await;
396 }
397 if self.config.has_collector(Collector::Process) {
398 self.process_metrics(&mut buffer).await;
399 }
400 if self.config.has_collector(Collector::Disk) {
401 self.disk_metrics(&mut buffer).await;
402 }
403 if self.config.has_collector(Collector::Filesystem) {
404 self.filesystem_metrics(&mut buffer).await;
405 }
406 if self.config.has_collector(Collector::Load) {
407 self.loadavg_metrics(&mut buffer).await;
408 }
409 if self.config.has_collector(Collector::Host) {
410 self.host_metrics(&mut buffer).await;
411 }
412 if self.config.has_collector(Collector::Memory) {
413 self.memory_metrics(&mut buffer).await;
414 self.swap_metrics(&mut buffer).await;
415 }
416 if self.config.has_collector(Collector::Network) {
417 self.network_metrics(&mut buffer).await;
418 }
419 #[cfg(target_os = "linux")]
420 if self.config.has_collector(Collector::TCP) {
421 self.tcp_metrics(&mut buffer).await;
422 }
423
424 let metrics = buffer.metrics;
425 self.events_received.emit(CountByteSize(
426 metrics.len(),
427 metrics.estimated_json_encoded_size_of(),
428 ));
429 metrics
430 }
431
432 pub async fn loadavg_metrics(&self, output: &mut MetricsBuffer) {
433 output.name = "load";
434 #[cfg(unix)]
435 match heim::cpu::os::unix::loadavg().await {
436 Ok(loadavg) => {
437 output.gauge(
438 "load1",
439 loadavg.0.get::<ratio>() as f64,
440 MetricTags::default(),
441 );
442 output.gauge(
443 "load5",
444 loadavg.1.get::<ratio>() as f64,
445 MetricTags::default(),
446 );
447 output.gauge(
448 "load15",
449 loadavg.2.get::<ratio>() as f64,
450 MetricTags::default(),
451 );
452 }
453 Err(error) => {
454 emit!(HostMetricsScrapeDetailError {
455 message: "Failed to load average info",
456 error,
457 });
458 }
459 }
460 }
461
462 pub async fn host_metrics(&self, output: &mut MetricsBuffer) {
463 output.name = "host";
464 match heim::host::uptime().await {
465 Ok(time) => output.gauge("uptime", time.get::<second>(), MetricTags::default()),
466 Err(error) => {
467 emit!(HostMetricsScrapeDetailError {
468 message: "Failed to load host uptime info",
469 error,
470 });
471 }
472 }
473
474 match heim::host::boot_time().await {
475 Ok(time) => output.gauge("boot_time", time.get::<second>(), MetricTags::default()),
476 Err(error) => {
477 emit!(HostMetricsScrapeDetailError {
478 message: "Failed to load host boot time info",
479 error,
480 });
481 }
482 }
483 }
484}
485
/// Accumulates the metrics produced during a single capture.
#[derive(Default)]
pub struct MetricsBuffer {
    /// Metrics collected so far.
    pub metrics: Vec<Metric>,
    /// Name of the current collector; written to each metric's `collector` tag.
    name: &'static str,
    /// Local hostname, if it could be determined; written to the `host` tag.
    host: Option<String>,
    /// Timestamp taken when the buffer was created; applied to every metric.
    timestamp: DateTime<Utc>,
    /// Namespace applied to every metric.
    namespace: Option<String>,
}
494
495impl MetricsBuffer {
496 fn new(namespace: Option<String>) -> Self {
497 Self {
498 metrics: Vec::new(),
499 name: "",
500 host: crate::get_hostname().ok(),
501 timestamp: Utc::now(),
502 namespace,
503 }
504 }
505
506 fn tags(&self, mut tags: MetricTags) -> MetricTags {
507 tags.replace("collector".into(), self.name.to_string());
508 if let Some(host) = &self.host {
509 tags.replace("host".into(), host.clone());
510 }
511 tags
512 }
513
514 fn counter(&mut self, name: &str, value: f64, tags: MetricTags) {
515 self.metrics.push(
516 Metric::new(name, MetricKind::Absolute, MetricValue::Counter { value })
517 .with_namespace(self.namespace.clone())
518 .with_tags(Some(self.tags(tags)))
519 .with_timestamp(Some(self.timestamp)),
520 )
521 }
522
523 fn gauge(&mut self, name: &str, value: f64, tags: MetricTags) {
524 self.metrics.push(
525 Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
526 .with_namespace(self.namespace.clone())
527 .with_tags(Some(self.tags(tags)))
528 .with_timestamp(Some(self.timestamp)),
529 )
530 }
531}
532
533fn filter_result_sync<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
534where
535 E: std::error::Error,
536{
537 result
538 .map_err(|error| emit!(HostMetricsScrapeDetailError { message, error }))
539 .ok()
540}
541
542async fn filter_result<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
543where
544 E: std::error::Error,
545{
546 filter_result_sync(result, message)
547}
548
549#[allow(clippy::missing_const_for_fn)]
550fn init_roots() {
551 #[cfg(target_os = "linux")]
552 {
553 use std::sync::Once;
554
555 static INIT: Once = Once::new();
556
557 INIT.call_once(|| {
558 match std::env::var_os("PROCFS_ROOT") {
559 Some(procfs_root) => {
560 info!(
561 message = "PROCFS_ROOT is set in envvars. Using custom for procfs.",
562 custom = ?procfs_root
563 );
564 heim::os::linux::set_procfs_root(std::path::PathBuf::from(&procfs_root));
565 }
566 None => info!("PROCFS_ROOT is unset. Using default '/proc' for procfs root."),
567 };
568
569 match std::env::var_os("SYSFS_ROOT") {
570 Some(sysfs_root) => {
571 info!(
572 message = "SYSFS_ROOT is set in envvars. Using custom for sysfs.",
573 custom = ?sysfs_root
574 );
575 heim::os::linux::set_sysfs_root(std::path::PathBuf::from(&sysfs_root));
576 }
577 None => info!("SYSFS_ROOT is unset. Using default '/sys' for sysfs root."),
578 }
579 });
580 };
581}
582
583impl FilterList {
584 fn contains<T, M>(&self, value: &Option<T>, matches: M) -> bool
585 where
586 M: Fn(&PatternWrapper, &T) -> bool,
587 {
588 (match (&self.includes, value) {
589 (None, _) => true,
591 (Some(_), None) => false,
593 (Some(includes), Some(value)) => includes.iter().any(|pattern| matches(pattern, value)),
595 }) && match (&self.excludes, value) {
596 (None, _) => true,
598 (Some(_), None) => true,
600 (Some(excludes), Some(value)) => {
602 !excludes.iter().any(|pattern| matches(pattern, value))
603 }
604 }
605 }
606
607 fn contains_str(&self, value: Option<&str>) -> bool {
608 self.contains(&value, |pattern, s| pattern.matches_str(s))
609 }
610
611 fn contains_path(&self, value: Option<&Path>) -> bool {
612 self.contains(&value, |pattern, path| pattern.matches_path(path))
613 }
614
615 #[cfg(test)]
616 fn contains_test(&self, value: Option<&str>) -> bool {
617 let result = self.contains_str(value);
618 assert_eq!(result, self.contains_path(value.map(std::path::Path::new)));
619 result
620 }
621}
622
/// A compiled glob pattern.
///
/// Wraps `glob::Pattern` so it can be used in configuration: it is
/// deserialized from a `String` (which fails for an invalid pattern) and
/// serialized back into a `String`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(try_from = "String", into = "String")]
struct PatternWrapper(Pattern);
643
644impl PatternWrapper {
645 fn matches_str(&self, s: &str) -> bool {
646 self.0.matches(s)
647 }
648
649 fn matches_path(&self, p: &Path) -> bool {
650 self.0.matches_path(p)
651 }
652}
653
654impl TryFrom<String> for PatternWrapper {
655 type Error = PatternError;
656
657 fn try_from(value: String) -> Result<Self, Self::Error> {
658 Pattern::new(value.as_ref()).map(PatternWrapper)
659 }
660}
661
662impl TryFrom<&str> for PatternWrapper {
663 type Error = PatternError;
664
665 fn try_from(value: &str) -> Result<Self, Self::Error> {
666 value.to_string().try_into()
667 }
668}
669
670impl From<PatternWrapper> for String {
671 fn from(pattern: PatternWrapper) -> Self {
672 pattern.0.to_string()
673 }
674}
675
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, future::Future, time::Duration};

    use super::*;
    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};

    /// A default filter has no include or exclude list and accepts everything,
    /// including an absent value.
    #[test]
    fn filterlist_default_includes_everything() {
        let filters = FilterList::default();
        assert!(filters.contains_test(Some("anything")));
        assert!(filters.contains_test(Some("should")));
        assert!(filters.contains_test(Some("work")));
        assert!(filters.contains_test(None));
    }

    /// With only an include list, a value must match one of the patterns.
    #[test]
    fn filterlist_includes_works() {
        let filters = FilterList {
            includes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
            excludes: None,
        };
        assert!(!filters.contains_test(Some("sd")));
        assert!(filters.contains_test(Some("sda")));
        assert!(!filters.contains_test(Some("sda1")));
        assert!(filters.contains_test(Some("dm-")));
        assert!(filters.contains_test(Some("dm-5")));
        assert!(!filters.contains_test(Some("xda")));
        assert!(!filters.contains_test(None));
    }

    /// With only an exclude list, a value must match none of the patterns.
    #[test]
    fn filterlist_excludes_works() {
        let filters = FilterList {
            includes: None,
            excludes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
        };
        assert!(filters.contains_test(Some("sd")));
        assert!(!filters.contains_test(Some("sda")));
        assert!(filters.contains_test(Some("sda1")));
        assert!(!filters.contains_test(Some("dm-")));
        assert!(!filters.contains_test(Some("dm-5")));
        assert!(filters.contains_test(Some("xda")));
        assert!(filters.contains_test(None));
    }

    /// With both lists, excludes are applied on top of includes.
    #[test]
    fn filterlist_includes_and_excludes_works() {
        let filters = FilterList {
            includes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
            excludes: Some(vec![PatternWrapper::try_from("dm-5".to_string()).unwrap()]),
        };
        assert!(!filters.contains_test(Some("sd")));
        assert!(filters.contains_test(Some("sda")));
        assert!(!filters.contains_test(Some("sda1")));
        assert!(filters.contains_test(Some("dm-")));
        assert!(filters.contains_test(Some("dm-1")));
        assert!(!filters.contains_test(Some("dm-5")));
        assert!(!filters.contains_test(Some("xda")));
        assert!(!filters.contains_test(None));
    }

    /// Enabling a single collector must yield fewer metrics than the default set.
    #[tokio::test]
    async fn filters_on_collectors() {
        let all_metrics_count = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await
            .len();

        for collector in &[
            #[cfg(target_os = "linux")]
            Collector::CGroups,
            Collector::Cpu,
            Collector::Process,
            Collector::Disk,
            Collector::Filesystem,
            Collector::Load,
            Collector::Host,
            Collector::Memory,
            Collector::Network,
        ] {
            let some_metrics = HostMetrics::new(HostMetricsConfig {
                collectors: Some(vec![*collector]),
                ..Default::default()
            })
            .capture_metrics()
            .await;

            assert!(
                all_metrics_count > some_metrics.len(),
                "collector={collector:?}"
            );
        }
    }

    /// Every captured metric carries a `host` tag equal to the local hostname.
    #[tokio::test]
    async fn are_tagged_with_hostname() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;
        let hostname = crate::get_hostname().expect("Broken hostname");
        assert!(metrics.into_iter().all(|event| {
            event
                .tags()
                .expect("Missing tags")
                .get("host")
                .expect("Missing \"host\" tag")
                == hostname
        }));
    }

    /// A configured namespace is applied to every metric.
    #[tokio::test]
    async fn uses_custom_namespace() {
        let metrics = HostMetrics::new(HostMetricsConfig {
            namespace: Some("other".into()),
            ..Default::default()
        })
        .capture_metrics()
        .await;

        assert!(
            metrics
                .into_iter()
                .all(|event| event.namespace() == Some("other"))
        );
    }

    /// Without an explicit namespace, metrics use `host`.
    #[tokio::test]
    async fn uses_default_namespace() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;

        assert!(
            metrics
                .iter()
                .all(|event| event.namespace() == Some("host"))
        );
    }

    /// The load collector produces exactly three gauges whose names start with `load`.
    #[cfg(not(windows))]
    #[tokio::test]
    async fn generates_loadavg_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .loadavg_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 3);
        assert!(all_gauges(&metrics));

        assert!(
            metrics
                .iter()
                .all(|metric| metric.name().starts_with("load"))
        );
    }

    /// The host collector produces exactly two gauges.
    #[tokio::test]
    async fn generates_host_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .host_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 2);
        assert!(all_gauges(&metrics));
    }

    /// Returns `true` when every metric is a counter (vacuously true when empty).
    pub(super) fn all_counters(metrics: &[Metric]) -> bool {
        metrics
            .iter()
            .all(|metric| matches!(metric.value(), &MetricValue::Counter { .. }))
    }

    /// Returns `true` when every metric is a gauge (vacuously true when empty).
    pub(super) fn all_gauges(metrics: &[Metric]) -> bool {
        metrics
            .iter()
            .all(|metric| matches!(metric.value(), &MetricValue::Gauge { .. }))
    }

    /// Returns `true` when every metric that has `tag` satisfies `matches` on
    /// its value. Metrics without the tag are ignored.
    fn all_tags_match(metrics: &[Metric], tag: &str, matches: impl Fn(&str) -> bool) -> bool {
        metrics
            .iter()
            .all(|metric| match metric.tags().unwrap().get(tag) {
                Some(value) => matches(value),
                None => true,
            })
    }

    /// Counts the metrics named `name`.
    pub(super) fn count_name(metrics: &[Metric], name: &str) -> usize {
        metrics
            .iter()
            .filter(|metric| metric.name() == name)
            .count()
    }

    /// Counts the metrics that carry `tag`.
    pub(super) fn count_tag(metrics: &[Metric], tag: &str) -> usize {
        metrics
            .iter()
            .filter(|metric| {
                metric
                    .tags()
                    .expect("Metric is missing tags")
                    .contains_key(tag)
            })
            .count()
    }

    /// Collects the distinct values of `tag` across `metrics`.
    fn collect_tag_values(metrics: &[Metric], tag: &str) -> HashSet<String> {
        metrics
            .iter()
            .filter_map(|metric| metric.tags().unwrap().get(tag).map(ToOwned::to_owned))
            .collect::<HashSet<_>>()
    }

    /// Checks that filtering on `tag` behaves consistently for a collector.
    ///
    /// `get_metrics` runs the collector with the given filter. One observed tag
    /// value is used to build an exact pattern and a prefix pattern, and the
    /// results of including/excluding each are compared with the unfiltered
    /// set. Nothing is checked when no metric carries `tag`.
    pub(super) async fn assert_filtered_metrics<Get, Fut>(tag: &str, get_metrics: Get)
    where
        Get: Fn(FilterList) -> Fut,
        Fut: Future<Output = Vec<Metric>>,
    {
        let all_metrics = get_metrics(FilterList::default()).await;

        let keys = collect_tag_values(&all_metrics, tag);
        if let Some(key) = keys.into_iter().next() {
            // Drop the last character with `pop`, which respects character
            // boundaries and is a no-op on an empty string. Byte slicing with
            // `key.len() - 1` would panic in both of those cases.
            let mut key_prefix = key.clone();
            key_prefix.pop();
            let key_prefix_pattern = PatternWrapper::try_from(format!("{key_prefix}*")).unwrap();
            let key_pattern = PatternWrapper::try_from(key.clone()).unwrap();

            // Including the exact key keeps only metrics tagged with it.
            let filter = FilterList {
                includes: Some(vec![key_pattern.clone()]),
                excludes: None,
            };
            let filtered_metrics_with = get_metrics(filter).await;

            assert!(filtered_metrics_with.len() <= all_metrics.len());
            assert!(!filtered_metrics_with.is_empty());
            assert!(all_tags_match(&filtered_metrics_with, tag, |s| s == key));

            // Including the prefix pattern keeps at least as many metrics.
            let filter = FilterList {
                includes: Some(vec![key_prefix_pattern.clone()]),
                excludes: None,
            };
            let filtered_metrics_with_match = get_metrics(filter).await;

            assert!(filtered_metrics_with_match.len() >= filtered_metrics_with.len());
            assert!(all_tags_match(&filtered_metrics_with_match, tag, |s| {
                s.starts_with(key_prefix.as_str())
            }));

            // Excluding the exact key removes every metric tagged with it.
            let filter = FilterList {
                includes: None,
                excludes: Some(vec![key_pattern]),
            };
            let filtered_metrics_without = get_metrics(filter).await;

            assert!(filtered_metrics_without.len() <= all_metrics.len());
            assert!(all_tags_match(&filtered_metrics_without, tag, |s| s != key));

            // Excluding the prefix pattern removes at least as many metrics.
            let filter = FilterList {
                includes: None,
                excludes: Some(vec![key_prefix_pattern]),
            };
            let filtered_metrics_without_match = get_metrics(filter).await;

            assert!(filtered_metrics_without_match.len() <= filtered_metrics_without.len());
            assert!(all_tags_match(&filtered_metrics_without_match, tag, |s| {
                !s.starts_with(key_prefix.as_str())
            }));

            assert!(
                filtered_metrics_with.len() + filtered_metrics_without.len() <= all_metrics.len()
            );
        }
    }

    /// Runs the source for two seconds with a one-second scrape interval and
    /// checks that it emits events and satisfies the source compliance checks.
    #[tokio::test]
    async fn source_compliance() {
        let config = HostMetricsConfig {
            scrape_interval_secs: Duration::from_secs(1),
            ..Default::default()
        };

        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(2), &SOURCE_TAGS).await;

        assert!(!events.is_empty());
    }
}