1use std::{
2 path::{Path, PathBuf},
3 time::Duration,
4};
5
6use chrono::{DateTime, Utc};
7use futures::StreamExt;
8use glob::{Pattern, PatternError};
9#[cfg(not(windows))]
10use heim::units::ratio::ratio;
11use heim::units::time::second;
12use serde_with::serde_as;
13use sysinfo::System;
14use tokio::time;
15use tokio_stream::wrappers::IntervalStream;
16use vector_lib::config::LogNamespace;
17use vector_lib::configurable::configurable_component;
18use vector_lib::internal_event::{
19 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
20};
21use vector_lib::EstimatedJsonEncodedSizeOf;
22
23use crate::{
24 config::{SourceConfig, SourceContext, SourceOutput},
25 event::metric::{Metric, MetricKind, MetricTags, MetricValue},
26 internal_events::{EventsReceived, HostMetricsScrapeDetailError, StreamClosedError},
27 shutdown::ShutdownSignal,
28 SourceSender,
29};
30
// Per-collector submodules. They are expected to provide the `*_metrics`
// methods that `HostMetrics::capture_metrics` calls and the per-collector
// config types referenced by `HostMetricsConfig` — TODO confirm against the
// submodule sources, which are not visible from this file.
// `cgroups` and `tcp` are only compiled on Linux.
#[cfg(target_os = "linux")]
mod cgroups;
mod cpu;
mod disk;
mod filesystem;
mod memory;
mod network;
mod process;
#[cfg(target_os = "linux")]
mod tcp;
41
/// Collector types.
///
/// Each variant selects one group of host metrics. Variants are
/// (de)serialized in lowercase, for example `cgroups` or `tcp`.
#[serde_as]
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Collector {
    /// Metrics related to Linux control groups.
    ///
    /// Only available on Linux.
    CGroups,

    /// Metrics related to the CPU.
    Cpu,

    /// Metrics related to processes.
    Process,

    /// Metrics related to disks.
    Disk,

    /// Metrics related to filesystems.
    Filesystem,

    /// Metrics related to the system load average.
    Load,

    /// Metrics related to the host (uptime and boot time).
    Host,

    /// Metrics related to memory and swap.
    Memory,

    /// Metrics related to the network.
    Network,

    /// Metrics related to TCP.
    ///
    /// Only available on Linux.
    TCP,
}
80
/// Filtering configuration.
///
/// A value passes the filter when it matches at least one `includes` pattern
/// (or `includes` is unset) and matches no `excludes` pattern.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct FilterList {
    /// Any patterns which should be included.
    ///
    /// When unset, everything is included. When set, items without a value to
    /// match against are not included.
    includes: Option<Vec<PatternWrapper>>,

    /// Any patterns which should be excluded.
    ///
    /// When unset, nothing is excluded. Items without a value to match against
    /// are never excluded.
    excludes: Option<Vec<PatternWrapper>>,
}
95
96#[serde_as]
98#[configurable_component(source("host_metrics", "Collect metric data from the local system."))]
99#[derive(Clone, Debug, Derivative)]
100#[derivative(Default)]
101#[serde(deny_unknown_fields)]
102pub struct HostMetricsConfig {
103 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
105 #[serde(default = "default_scrape_interval")]
106 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
107 pub scrape_interval_secs: Duration,
108
109 #[configurable(metadata(docs::examples = "example_collectors()"))]
113 #[derivative(Default(value = "default_collectors()"))]
114 #[serde(default = "default_collectors")]
115 pub collectors: Option<Vec<Collector>>,
116
117 #[derivative(Default(value = "default_namespace()"))]
119 #[serde(default = "default_namespace")]
120 pub namespace: Option<String>,
121
122 #[configurable(derived)]
123 #[derivative(Default(value = "default_cgroups_config()"))]
124 #[serde(default = "default_cgroups_config")]
125 pub cgroups: Option<CGroupsConfig>,
126
127 #[configurable(derived)]
128 #[serde(default)]
129 pub disk: disk::DiskConfig,
130
131 #[configurable(derived)]
132 #[serde(default)]
133 pub filesystem: filesystem::FilesystemConfig,
134
135 #[configurable(derived)]
136 #[serde(default)]
137 pub network: network::NetworkConfig,
138
139 #[configurable(derived)]
140 #[serde(default)]
141 pub process: process::ProcessConfig,
142}
143
/// Options for the cgroups (control groups) metrics collector.
///
/// This collector is only available on Linux systems.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default)]
pub struct CGroupsConfig {
    /// The number of levels of the cgroups hierarchy for which to report metrics.
    #[derivative(Default(value = "default_levels()"))]
    #[serde(default = "default_levels")]
    #[configurable(metadata(docs::examples = 1))]
    #[configurable(metadata(docs::examples = 3))]
    levels: usize,

    /// The base cgroup name to provide metrics for.
    #[configurable(metadata(docs::examples = "/"))]
    #[configurable(metadata(docs::examples = "system.slice/snapd.service"))]
    pub(super) base: Option<PathBuf>,

    /// Lists of cgroup name patterns to include or exclude in gathering
    /// usage metrics.
    // NOTE(review): the serde default is `default_all_devices()` (include `*`)
    // while the derived `Default` uses `FilterList::default()` (no patterns).
    #[configurable(metadata(docs::examples = "example_cgroups()"))]
    #[serde(default = "default_all_devices")]
    groups: FilterList,

    /// Base cgroup directory.
    ///
    /// Hidden from the generated documentation and never serialized.
    #[serde(skip_serializing)]
    #[configurable(metadata(docs::hidden))]
    #[configurable(metadata(docs::human_name = "Base Directory"))]
    base_dir: Option<PathBuf>,
}
178
/// Returns the scrape interval used when none is configured: 15 seconds.
const fn default_scrape_interval() -> Duration {
    const DEFAULT_SCRAPE_INTERVAL_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_SCRAPE_INTERVAL_SECS)
}
182
/// Returns the metric namespace used when none is configured: `"host"`.
pub fn default_namespace() -> Option<String> {
    Some("host".to_owned())
}
186
/// Returns the collector names shown as the documentation example for the
/// `collectors` option.
const fn example_collectors() -> [&'static str; 9] {
    const EXAMPLES: [&str; 9] = [
        "cgroups",
        "cpu",
        "disk",
        "filesystem",
        "load",
        "host",
        "memory",
        "network",
        "tcp",
    ];
    EXAMPLES
}
200
201fn default_collectors() -> Option<Vec<Collector>> {
202 let mut collectors = vec![
203 Collector::Cpu,
204 Collector::Disk,
205 Collector::Filesystem,
206 Collector::Load,
207 Collector::Host,
208 Collector::Memory,
209 Collector::Network,
210 Collector::Process,
211 ];
212
213 #[cfg(target_os = "linux")]
214 {
215 collectors.push(Collector::CGroups);
216 collectors.push(Collector::TCP);
217 }
218 #[cfg(not(target_os = "linux"))]
219 if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
220 collectors.push(Collector::CGroups);
221 collectors.push(Collector::TCP);
222 }
223
224 Some(collectors)
225}
226
227fn example_devices() -> FilterList {
228 FilterList {
229 includes: Some(vec!["sda".try_into().unwrap()]),
230 excludes: Some(vec!["dm-*".try_into().unwrap()]),
231 }
232}
233
234fn default_all_devices() -> FilterList {
235 FilterList {
236 includes: Some(vec!["*".try_into().unwrap()]),
237 excludes: None,
238 }
239}
240
241fn example_processes() -> FilterList {
242 FilterList {
243 includes: Some(vec!["docker".try_into().unwrap()]),
244 excludes: None,
245 }
246}
247
248fn default_all_processes() -> FilterList {
249 FilterList {
250 includes: Some(vec!["*".try_into().unwrap()]),
251 excludes: None,
252 }
253}
254
/// Returns the default for `CGroupsConfig::levels`: 100.
const fn default_levels() -> usize {
    const DEFAULT_LEVELS: usize = 100;
    DEFAULT_LEVELS
}
258
259fn example_cgroups() -> FilterList {
260 FilterList {
261 includes: Some(vec!["user.slice/*".try_into().unwrap()]),
262 excludes: Some(vec!["*.service".try_into().unwrap()]),
263 }
264}
265
266fn default_cgroups_config() -> Option<CGroupsConfig> {
267 if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
269 return Some(CGroupsConfig::default());
270 }
271
272 #[cfg(not(target_os = "linux"))]
273 {
274 None
275 }
276
277 #[cfg(target_os = "linux")]
278 {
279 Some(CGroupsConfig::default())
280 }
281}
282
// Presumably implements config generation for this source from
// `HostMetricsConfig::default()` — the macro is defined elsewhere; confirm there.
impl_generate_config_from_default!(HostMetricsConfig);
284
285#[async_trait::async_trait]
286#[typetag::serde(name = "host_metrics")]
287impl SourceConfig for HostMetricsConfig {
288 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
289 init_roots();
290
291 #[cfg(not(target_os = "linux"))]
292 {
293 if self.cgroups.is_some() || self.has_collector(Collector::CGroups) {
294 return Err("CGroups collector is only available on Linux systems".into());
295 }
296 if self.has_collector(Collector::TCP) {
297 return Err("TCP collector is only available on Linux systems".into());
298 }
299 }
300
301 let mut config = self.clone();
302 config.namespace = config.namespace.filter(|namespace| !namespace.is_empty());
303
304 Ok(Box::pin(config.run(cx.out, cx.shutdown)))
305 }
306
307 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
308 vec![SourceOutput::new_metrics()]
309 }
310
311 fn can_acknowledge(&self) -> bool {
312 false
313 }
314}
315
316impl HostMetricsConfig {
317 pub fn scrape_interval_secs(&mut self, value: f64) {
319 self.scrape_interval_secs = Duration::from_secs_f64(value);
320 }
321
322 async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
323 let duration = self.scrape_interval_secs;
324 let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
325
326 let mut generator = HostMetrics::new(self);
327
328 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
329
330 while interval.next().await.is_some() {
331 bytes_received.emit(ByteSize(0));
332 let metrics = generator.capture_metrics().await;
333 let count = metrics.len();
334 if (out.send_batch(metrics).await).is_err() {
335 emit!(StreamClosedError { count });
336 return Err(());
337 }
338 }
339
340 Ok(())
341 }
342
343 fn has_collector(&self, collector: Collector) -> bool {
344 match &self.collectors {
345 None => true,
346 Some(collectors) => collectors.contains(&collector),
347 }
348 }
349}
350
/// Collects host metrics according to a `HostMetricsConfig`.
pub struct HostMetrics {
    /// The source configuration; decides which collectors run and the namespace.
    config: HostMetricsConfig,
    /// `sysinfo` handle created once in `new`; not used in this file,
    /// presumably by a collector submodule — confirm there.
    system: System,
    /// Root cgroup built from the cgroups options in `new` (Linux only).
    #[cfg(target_os = "linux")]
    root_cgroup: Option<cgroups::CGroupRoot>,
    /// Handle used to report the count and estimated size of captured metrics.
    events_received: Registered<EventsReceived>,
}
358
359impl HostMetrics {
360 #[cfg(not(target_os = "linux"))]
361 pub fn new(config: HostMetricsConfig) -> Self {
362 Self {
363 config,
364 system: System::new(),
365 events_received: register!(EventsReceived),
366 }
367 }
368
369 #[cfg(target_os = "linux")]
370 pub fn new(config: HostMetricsConfig) -> Self {
371 let cgroups = config.cgroups.clone().unwrap_or_default();
372 let root_cgroup = cgroups::CGroupRoot::new(&cgroups);
373 Self {
374 config,
375 system: System::new(),
376 root_cgroup,
377 events_received: register!(EventsReceived),
378 }
379 }
380
381 pub fn buffer(&self) -> MetricsBuffer {
382 MetricsBuffer::new(self.config.namespace.clone())
383 }
384
385 async fn capture_metrics(&mut self) -> Vec<Metric> {
386 let mut buffer = self.buffer();
387
388 #[cfg(target_os = "linux")]
389 if self.config.has_collector(Collector::CGroups) {
390 self.cgroups_metrics(&mut buffer).await;
391 }
392 if self.config.has_collector(Collector::Cpu) {
393 self.cpu_metrics(&mut buffer).await;
394 }
395 if self.config.has_collector(Collector::Process) {
396 self.process_metrics(&mut buffer).await;
397 }
398 if self.config.has_collector(Collector::Disk) {
399 self.disk_metrics(&mut buffer).await;
400 }
401 if self.config.has_collector(Collector::Filesystem) {
402 self.filesystem_metrics(&mut buffer).await;
403 }
404 if self.config.has_collector(Collector::Load) {
405 self.loadavg_metrics(&mut buffer).await;
406 }
407 if self.config.has_collector(Collector::Host) {
408 self.host_metrics(&mut buffer).await;
409 }
410 if self.config.has_collector(Collector::Memory) {
411 self.memory_metrics(&mut buffer).await;
412 self.swap_metrics(&mut buffer).await;
413 }
414 if self.config.has_collector(Collector::Network) {
415 self.network_metrics(&mut buffer).await;
416 }
417 #[cfg(target_os = "linux")]
418 if self.config.has_collector(Collector::TCP) {
419 self.tcp_metrics(&mut buffer).await;
420 }
421
422 let metrics = buffer.metrics;
423 self.events_received.emit(CountByteSize(
424 metrics.len(),
425 metrics.estimated_json_encoded_size_of(),
426 ));
427 metrics
428 }
429
430 pub async fn loadavg_metrics(&self, output: &mut MetricsBuffer) {
431 output.name = "load";
432 #[cfg(unix)]
433 match heim::cpu::os::unix::loadavg().await {
434 Ok(loadavg) => {
435 output.gauge(
436 "load1",
437 loadavg.0.get::<ratio>() as f64,
438 MetricTags::default(),
439 );
440 output.gauge(
441 "load5",
442 loadavg.1.get::<ratio>() as f64,
443 MetricTags::default(),
444 );
445 output.gauge(
446 "load15",
447 loadavg.2.get::<ratio>() as f64,
448 MetricTags::default(),
449 );
450 }
451 Err(error) => {
452 emit!(HostMetricsScrapeDetailError {
453 message: "Failed to load average info",
454 error,
455 });
456 }
457 }
458 }
459
460 pub async fn host_metrics(&self, output: &mut MetricsBuffer) {
461 output.name = "host";
462 match heim::host::uptime().await {
463 Ok(time) => output.gauge("uptime", time.get::<second>(), MetricTags::default()),
464 Err(error) => {
465 emit!(HostMetricsScrapeDetailError {
466 message: "Failed to load host uptime info",
467 error,
468 });
469 }
470 }
471
472 match heim::host::boot_time().await {
473 Ok(time) => output.gauge("boot_time", time.get::<second>(), MetricTags::default()),
474 Err(error) => {
475 emit!(HostMetricsScrapeDetailError {
476 message: "Failed to load host boot time info",
477 error,
478 });
479 }
480 }
481 }
482}
483
/// Accumulates the metrics produced during one capture pass.
#[derive(Default)]
pub struct MetricsBuffer {
    /// The metrics collected so far.
    pub metrics: Vec<Metric>,
    /// Name of the collector currently writing; becomes the `collector` tag.
    name: &'static str,
    /// Local hostname, if it could be determined; becomes the `host` tag.
    host: Option<String>,
    /// Timestamp taken when the buffer was created and stamped on every metric.
    timestamp: DateTime<Utc>,
    /// Namespace applied to every metric.
    namespace: Option<String>,
}
492
493impl MetricsBuffer {
494 fn new(namespace: Option<String>) -> Self {
495 Self {
496 metrics: Vec::new(),
497 name: "",
498 host: crate::get_hostname().ok(),
499 timestamp: Utc::now(),
500 namespace,
501 }
502 }
503
504 fn tags(&self, mut tags: MetricTags) -> MetricTags {
505 tags.replace("collector".into(), self.name.to_string());
506 if let Some(host) = &self.host {
507 tags.replace("host".into(), host.clone());
508 }
509 tags
510 }
511
512 fn counter(&mut self, name: &str, value: f64, tags: MetricTags) {
513 self.metrics.push(
514 Metric::new(name, MetricKind::Absolute, MetricValue::Counter { value })
515 .with_namespace(self.namespace.clone())
516 .with_tags(Some(self.tags(tags)))
517 .with_timestamp(Some(self.timestamp)),
518 )
519 }
520
521 fn gauge(&mut self, name: &str, value: f64, tags: MetricTags) {
522 self.metrics.push(
523 Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
524 .with_namespace(self.namespace.clone())
525 .with_tags(Some(self.tags(tags)))
526 .with_timestamp(Some(self.timestamp)),
527 )
528 }
529}
530
531fn filter_result_sync<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
532where
533 E: std::error::Error,
534{
535 result
536 .map_err(|error| emit!(HostMetricsScrapeDetailError { message, error }))
537 .ok()
538}
539
540async fn filter_result<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
541where
542 E: std::error::Error,
543{
544 filter_result_sync(result, message)
545}
546
547#[allow(clippy::missing_const_for_fn)]
548fn init_roots() {
549 #[cfg(target_os = "linux")]
550 {
551 use std::sync::Once;
552
553 static INIT: Once = Once::new();
554
555 INIT.call_once(|| {
556 match std::env::var_os("PROCFS_ROOT") {
557 Some(procfs_root) => {
558 info!(
559 message = "PROCFS_ROOT is set in envvars. Using custom for procfs.",
560 custom = ?procfs_root
561 );
562 heim::os::linux::set_procfs_root(std::path::PathBuf::from(&procfs_root));
563 }
564 None => info!("PROCFS_ROOT is unset. Using default '/proc' for procfs root."),
565 };
566
567 match std::env::var_os("SYSFS_ROOT") {
568 Some(sysfs_root) => {
569 info!(
570 message = "SYSFS_ROOT is set in envvars. Using custom for sysfs.",
571 custom = ?sysfs_root
572 );
573 heim::os::linux::set_sysfs_root(std::path::PathBuf::from(&sysfs_root));
574 }
575 None => info!("SYSFS_ROOT is unset. Using default '/sys' for sysfs root."),
576 }
577 });
578 };
579}
580
581impl FilterList {
582 fn contains<T, M>(&self, value: &Option<T>, matches: M) -> bool
583 where
584 M: Fn(&PatternWrapper, &T) -> bool,
585 {
586 (match (&self.includes, value) {
587 (None, _) => true,
589 (Some(_), None) => false,
591 (Some(includes), Some(value)) => includes.iter().any(|pattern| matches(pattern, value)),
593 }) && match (&self.excludes, value) {
594 (None, _) => true,
596 (Some(_), None) => true,
598 (Some(excludes), Some(value)) => {
600 !excludes.iter().any(|pattern| matches(pattern, value))
601 }
602 }
603 }
604
605 fn contains_str(&self, value: Option<&str>) -> bool {
606 self.contains(&value, |pattern, s| pattern.matches_str(s))
607 }
608
609 fn contains_path(&self, value: Option<&Path>) -> bool {
610 self.contains(&value, |pattern, path| pattern.matches_path(path))
611 }
612
613 #[cfg(test)]
614 fn contains_test(&self, value: Option<&str>) -> bool {
615 let result = self.contains_str(value);
616 assert_eq!(result, self.contains_path(value.map(std::path::Path::new)));
617 result
618 }
619}
620
/// A compiled glob pattern.
///
/// Wraps `glob::Pattern`; see the `glob` crate documentation for the supported
/// pattern syntax. It is deserialized from, and serialized to, its string
/// form, so an invalid pattern is rejected when the configuration is loaded.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(try_from = "String", into = "String")]
struct PatternWrapper(Pattern);
641
642impl PatternWrapper {
643 fn matches_str(&self, s: &str) -> bool {
644 self.0.matches(s)
645 }
646
647 fn matches_path(&self, p: &Path) -> bool {
648 self.0.matches_path(p)
649 }
650}
651
652impl TryFrom<String> for PatternWrapper {
653 type Error = PatternError;
654
655 fn try_from(value: String) -> Result<Self, Self::Error> {
656 Pattern::new(value.as_ref()).map(PatternWrapper)
657 }
658}
659
660impl TryFrom<&str> for PatternWrapper {
661 type Error = PatternError;
662
663 fn try_from(value: &str) -> Result<Self, Self::Error> {
664 value.to_string().try_into()
665 }
666}
667
668impl From<PatternWrapper> for String {
669 fn from(pattern: PatternWrapper) -> Self {
670 pattern.0.to_string()
671 }
672}
673
#[cfg(test)]
mod tests {
    use crate::test_util::components::{run_and_assert_source_compliance, SOURCE_TAGS};
    use std::{collections::HashSet, future::Future, time::Duration};

    use super::*;

    #[test]
    fn filterlist_default_includes_everything() {
        let filters = FilterList::default();
        assert!(filters.contains_test(Some("anything")));
        assert!(filters.contains_test(Some("should")));
        assert!(filters.contains_test(Some("work")));
        assert!(filters.contains_test(None));
    }

    #[test]
    fn filterlist_includes_works() {
        let filters = FilterList {
            includes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
            excludes: None,
        };
        assert!(!filters.contains_test(Some("sd")));
        assert!(filters.contains_test(Some("sda")));
        assert!(!filters.contains_test(Some("sda1")));
        assert!(filters.contains_test(Some("dm-")));
        assert!(filters.contains_test(Some("dm-5")));
        assert!(!filters.contains_test(Some("xda")));
        assert!(!filters.contains_test(None));
    }

    #[test]
    fn filterlist_excludes_works() {
        let filters = FilterList {
            includes: None,
            excludes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
        };
        assert!(filters.contains_test(Some("sd")));
        assert!(!filters.contains_test(Some("sda")));
        assert!(filters.contains_test(Some("sda1")));
        assert!(!filters.contains_test(Some("dm-")));
        assert!(!filters.contains_test(Some("dm-5")));
        assert!(filters.contains_test(Some("xda")));
        assert!(filters.contains_test(None));
    }

    #[test]
    fn filterlist_includes_and_excludes_works() {
        let filters = FilterList {
            includes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
            excludes: Some(vec![PatternWrapper::try_from("dm-5".to_string()).unwrap()]),
        };
        assert!(!filters.contains_test(Some("sd")));
        assert!(filters.contains_test(Some("sda")));
        assert!(!filters.contains_test(Some("sda1")));
        assert!(filters.contains_test(Some("dm-")));
        assert!(filters.contains_test(Some("dm-1")));
        assert!(!filters.contains_test(Some("dm-5")));
        assert!(!filters.contains_test(Some("xda")));
        assert!(!filters.contains_test(None));
    }

    /// Every single collector must yield fewer metrics than the default set.
    #[tokio::test]
    async fn filters_on_collectors() {
        let all_metrics_count = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await
            .len();

        for collector in &[
            #[cfg(target_os = "linux")]
            Collector::CGroups,
            Collector::Cpu,
            Collector::Process,
            Collector::Disk,
            Collector::Filesystem,
            Collector::Load,
            Collector::Host,
            Collector::Memory,
            Collector::Network,
        ] {
            let some_metrics = HostMetrics::new(HostMetricsConfig {
                collectors: Some(vec![*collector]),
                ..Default::default()
            })
            .capture_metrics()
            .await;

            assert!(
                all_metrics_count > some_metrics.len(),
                "collector={collector:?}"
            );
        }
    }

    #[tokio::test]
    async fn are_tagged_with_hostname() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;
        let hostname = crate::get_hostname().expect("Broken hostname");
        assert!(metrics.into_iter().all(|event| event
            .tags()
            .expect("Missing tags")
            .get("host")
            .expect("Missing \"host\" tag")
            == hostname));
    }

    #[tokio::test]
    async fn uses_custom_namespace() {
        let metrics = HostMetrics::new(HostMetricsConfig {
            namespace: Some("other".into()),
            ..Default::default()
        })
        .capture_metrics()
        .await;

        assert!(metrics
            .into_iter()
            .all(|event| event.namespace() == Some("other")));
    }

    #[tokio::test]
    async fn uses_default_namespace() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;

        assert!(metrics
            .iter()
            .all(|event| event.namespace() == Some("host")));
    }

    // `loadavg_metrics` only reads the load average on Unix targets.
    #[cfg(not(windows))]
    #[tokio::test]
    async fn generates_loadavg_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .loadavg_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 3);
        assert!(all_gauges(&metrics));

        // All metrics are named load*
        assert!(metrics
            .iter()
            .all(|metric| metric.name().starts_with("load")));
    }

    #[tokio::test]
    async fn generates_host_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .host_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 2);
        assert!(all_gauges(&metrics));
    }

    /// Returns whether every metric is a counter.
    pub(super) fn all_counters(metrics: &[Metric]) -> bool {
        metrics
            .iter()
            .all(|metric| matches!(metric.value(), MetricValue::Counter { .. }))
    }

    /// Returns whether every metric is a gauge.
    pub(super) fn all_gauges(metrics: &[Metric]) -> bool {
        metrics
            .iter()
            .all(|metric| matches!(metric.value(), MetricValue::Gauge { .. }))
    }

    /// Returns whether every metric that carries `tag` has a value accepted by
    /// `matches`. Metrics without the tag are ignored.
    fn all_tags_match(metrics: &[Metric], tag: &str, matches: impl Fn(&str) -> bool) -> bool {
        metrics.iter().all(|metric| {
            metric
                .tags()
                .unwrap()
                .get(tag)
                .map_or(true, |value| matches(value))
        })
    }

    /// Counts the metrics named `name`.
    pub(super) fn count_name(metrics: &[Metric], name: &str) -> usize {
        metrics
            .iter()
            .filter(|metric| metric.name() == name)
            .count()
    }

    /// Counts the metrics that carry `tag`.
    pub(super) fn count_tag(metrics: &[Metric], tag: &str) -> usize {
        metrics
            .iter()
            .filter(|metric| {
                metric
                    .tags()
                    .expect("Metric is missing tags")
                    .contains_key(tag)
            })
            .count()
    }

    /// Collects the distinct values of `tag` across `metrics`.
    fn collect_tag_values(metrics: &[Metric], tag: &str) -> HashSet<String> {
        metrics
            .iter()
            .filter_map(|metric| metric.tags().unwrap().get(tag).map(ToOwned::to_owned))
            .collect::<HashSet<_>>()
    }

    /// Runs `get_metrics` under several filters derived from one observed value
    /// of `tag` and checks that each filter narrows the output as expected.
    ///
    /// Nothing is asserted when no metric carries `tag`.
    pub(super) async fn assert_filtered_metrics<Get, Fut>(tag: &str, get_metrics: Get)
    where
        Get: Fn(FilterList) -> Fut,
        Fut: Future<Output = Vec<Metric>>,
    {
        let all_metrics = get_metrics(FilterList::default()).await;

        let keys = collect_tag_values(&all_metrics, tag);
        // Pick an arbitrary key
        if let Some(key) = keys.into_iter().next() {
            // Drop the last *character* rather than the last byte. Byte slicing
            // underflows on an empty key and panics when the key ends in a
            // multi-byte character (e.g. a non-ASCII interface name).
            let key_prefix = {
                let mut chars = key.chars();
                chars.next_back();
                chars.as_str()
            };
            let key_prefix_pattern = PatternWrapper::try_from(format!("{key_prefix}*")).unwrap();
            let key_pattern = PatternWrapper::try_from(key.clone()).unwrap();

            // Include only the exact key.
            let filter = FilterList {
                includes: Some(vec![key_pattern.clone()]),
                excludes: None,
            };
            let filtered_metrics_with = get_metrics(filter).await;

            assert!(filtered_metrics_with.len() <= all_metrics.len());
            assert!(!filtered_metrics_with.is_empty());
            assert!(all_tags_match(&filtered_metrics_with, tag, |s| s == key));

            // Include everything sharing the key's prefix.
            let filter = FilterList {
                includes: Some(vec![key_prefix_pattern.clone()]),
                excludes: None,
            };
            let filtered_metrics_with_match = get_metrics(filter).await;

            assert!(filtered_metrics_with_match.len() >= filtered_metrics_with.len());
            assert!(all_tags_match(&filtered_metrics_with_match, tag, |s| {
                s.starts_with(key_prefix)
            }));

            // Exclude only the exact key.
            let filter = FilterList {
                includes: None,
                excludes: Some(vec![key_pattern]),
            };
            let filtered_metrics_without = get_metrics(filter).await;

            assert!(filtered_metrics_without.len() <= all_metrics.len());
            assert!(all_tags_match(&filtered_metrics_without, tag, |s| s != key));

            // Exclude everything sharing the key's prefix.
            let filter = FilterList {
                includes: None,
                excludes: Some(vec![key_prefix_pattern]),
            };
            let filtered_metrics_without_match = get_metrics(filter).await;

            assert!(filtered_metrics_without_match.len() <= filtered_metrics_without.len());
            assert!(all_tags_match(&filtered_metrics_without_match, tag, |s| {
                !s.starts_with(key_prefix)
            }));

            assert!(
                filtered_metrics_with.len() + filtered_metrics_without.len() <= all_metrics.len()
            );
        }
    }

    #[tokio::test]
    async fn source_compliance() {
        let config = HostMetricsConfig {
            scrape_interval_secs: Duration::from_secs(1),
            ..Default::default()
        };

        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(2), &SOURCE_TAGS).await;

        assert!(!events.is_empty());
    }
}