vector/sources/host_metrics/
mod.rs

1use std::{
2    path::{Path, PathBuf},
3    time::Duration,
4};
5
6use chrono::{DateTime, Utc};
7use futures::StreamExt;
8use glob::{Pattern, PatternError};
9#[cfg(not(windows))]
10use heim::units::ratio::ratio;
11use heim::units::time::second;
12use serde_with::serde_as;
13use sysinfo::System;
14use tokio::time;
15use tokio_stream::wrappers::IntervalStream;
16use vector_lib::{
17    EstimatedJsonEncodedSizeOf,
18    config::LogNamespace,
19    configurable::configurable_component,
20    internal_event::{
21        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
22    },
23};
24
25use crate::{
26    SourceSender,
27    config::{SourceConfig, SourceContext, SourceOutput},
28    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
29    internal_events::{EventsReceived, HostMetricsScrapeDetailError, StreamClosedError},
30    shutdown::ShutdownSignal,
31};
32
33#[cfg(target_os = "linux")]
34mod cgroups;
35mod cpu;
36mod disk;
37mod filesystem;
38mod memory;
39mod network;
40mod process;
41#[cfg(target_os = "linux")]
42mod tcp;
43
44/// Collector types.
45#[serde_as]
46#[configurable_component]
47#[derive(Clone, Copy, Debug, Eq, PartialEq)]
48#[serde(rename_all = "lowercase")]
49pub enum Collector {
50    /// Metrics related to Linux control groups.
51    ///
52    /// Only available on Linux.
53    CGroups,
54
55    /// Metrics related to CPU utilization.
56    Cpu,
57
58    /// Metrics related to Process utilization.
59    Process,
60
61    /// Metrics related to disk I/O utilization.
62    Disk,
63
64    /// Metrics related to filesystem space utilization.
65    Filesystem,
66
67    /// Metrics related to the system load average.
68    Load,
69
70    /// Metrics related to the host.
71    Host,
72
73    /// Metrics related to memory utilization.
74    Memory,
75
76    /// Metrics related to network utilization.
77    Network,
78
79    /// Metrics related to TCP connections.
80    TCP,
81}
82
/// Filtering configuration.
// Both lists are optional. As evaluated by `FilterList::contains`, a missing
// `includes` list admits every value and a missing `excludes` list rejects none.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct FilterList {
    /// Any patterns which should be included.
    ///
    /// The patterns are matched using globbing.
    includes: Option<Vec<PatternWrapper>>,

    /// Any patterns which should be excluded.
    ///
    /// The patterns are matched using globbing.
    excludes: Option<Vec<PatternWrapper>>,
}
97
/// Configuration for the `host_metrics` source.
// Unknown keys are rejected at deserialization time (`deny_unknown_fields`).
#[serde_as]
#[configurable_component(source("host_metrics", "Collect metric data from the local system."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct HostMetricsConfig {
    /// The interval between metric gathering, in seconds.
    // Deserialized from a whole number of seconds (`DurationSeconds<u64>`).
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_scrape_interval")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub scrape_interval_secs: Duration,

    /// The list of host metric collector services to use.
    ///
    /// Defaults to all collectors.
    // `None` is treated by `has_collector` as "every collector is enabled".
    #[configurable(metadata(docs::examples = "example_collectors()"))]
    #[derivative(Default(value = "default_collectors()"))]
    #[serde(default = "default_collectors")]
    pub collectors: Option<Vec<Collector>>,

    /// Overrides the default namespace for the metrics emitted by the source.
    // `build` turns an empty string into `None` before the source starts running.
    #[derivative(Default(value = "default_namespace()"))]
    #[serde(default = "default_namespace")]
    pub namespace: Option<String>,

    // Settings for the cgroups collector; `build` rejects `Some(_)` on non-Linux targets.
    #[configurable(derived)]
    #[derivative(Default(value = "default_cgroups_config()"))]
    #[serde(default = "default_cgroups_config")]
    pub cgroups: Option<CGroupsConfig>,

    // Settings for the disk collector (type defined in the `disk` submodule).
    #[configurable(derived)]
    #[serde(default)]
    pub disk: disk::DiskConfig,

    // Settings for the filesystem collector (type defined in the `filesystem` submodule).
    #[configurable(derived)]
    #[serde(default)]
    pub filesystem: filesystem::FilesystemConfig,

    // Settings for the network collector (type defined in the `network` submodule).
    #[configurable(derived)]
    #[serde(default)]
    pub network: network::NetworkConfig,

    // Settings for the process collector (type defined in the `process` submodule).
    #[configurable(derived)]
    #[serde(default)]
    pub process: process::ProcessConfig,
}
145
146/// Options for the cgroups (controller groups) metrics collector.
147///
148/// This collector is only available on Linux systems, and only supports either version 2 or hybrid cgroups.
149#[configurable_component]
150#[derive(Clone, Debug, Derivative)]
151#[derivative(Default)]
152#[serde(default)]
153pub struct CGroupsConfig {
154    /// The number of levels of the cgroups hierarchy for which to report metrics.
155    ///
156    /// A value of `1` means the root or named cgroup.
157    #[derivative(Default(value = "default_levels()"))]
158    #[serde(default = "default_levels")]
159    #[configurable(metadata(docs::examples = 1))]
160    #[configurable(metadata(docs::examples = 3))]
161    levels: usize,
162
163    /// The base cgroup name to provide metrics for.
164    #[configurable(metadata(docs::examples = "/"))]
165    #[configurable(metadata(docs::examples = "system.slice/snapd.service"))]
166    pub(super) base: Option<PathBuf>,
167
168    /// Lists of cgroup name patterns to include or exclude in gathering
169    /// usage metrics.
170    #[configurable(metadata(docs::examples = "example_cgroups()"))]
171    #[serde(default = "default_all_devices")]
172    groups: FilterList,
173
174    /// Base cgroup directory, for testing use only
175    #[serde(skip_serializing)]
176    #[configurable(metadata(docs::hidden))]
177    #[configurable(metadata(docs::human_name = "Base Directory"))]
178    base_dir: Option<PathBuf>,
179}
180
/// Returns the scrape interval used when the configuration does not set one: 15 seconds.
const fn default_scrape_interval() -> Duration {
    const DEFAULT_INTERVAL_SECS: u64 = 15;
    Duration::from_secs(DEFAULT_INTERVAL_SECS)
}
184
/// Returns the namespace applied to emitted metrics when none is configured: `"host"`.
pub fn default_namespace() -> Option<String> {
    Some("host".to_owned())
}
188
/// Example collector names shown in the generated documentation.
///
/// Lists the lowercase serialized name of every `Collector` variant, in declaration order
/// for the first three and in the pre-existing order for the rest. `"process"` was
/// previously missing even though the process collector is enabled by default.
const fn example_collectors() -> [&'static str; 10] {
    [
        "cgroups",
        "cpu",
        "process",
        "disk",
        "filesystem",
        "load",
        "host",
        "memory",
        "network",
        "tcp",
    ]
}
202
203fn default_collectors() -> Option<Vec<Collector>> {
204    let mut collectors = vec![
205        Collector::Cpu,
206        Collector::Disk,
207        Collector::Filesystem,
208        Collector::Load,
209        Collector::Host,
210        Collector::Memory,
211        Collector::Network,
212        Collector::Process,
213    ];
214
215    #[cfg(target_os = "linux")]
216    {
217        collectors.push(Collector::CGroups);
218        collectors.push(Collector::TCP);
219    }
220    #[cfg(not(target_os = "linux"))]
221    if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
222        collectors.push(Collector::CGroups);
223        collectors.push(Collector::TCP);
224    }
225
226    Some(collectors)
227}
228
229fn example_devices() -> FilterList {
230    FilterList {
231        includes: Some(vec!["sda".try_into().unwrap()]),
232        excludes: Some(vec!["dm-*".try_into().unwrap()]),
233    }
234}
235
236fn default_all_devices() -> FilterList {
237    FilterList {
238        includes: Some(vec!["*".try_into().unwrap()]),
239        excludes: None,
240    }
241}
242
243fn example_processes() -> FilterList {
244    FilterList {
245        includes: Some(vec!["docker".try_into().unwrap()]),
246        excludes: None,
247    }
248}
249
250fn default_all_processes() -> FilterList {
251    FilterList {
252        includes: Some(vec!["*".try_into().unwrap()]),
253        excludes: None,
254    }
255}
256
/// Returns the default number of cgroup hierarchy levels to report on: 100.
const fn default_levels() -> usize {
    const DEFAULT_LEVELS: usize = 100;
    DEFAULT_LEVELS
}
260
261fn example_cgroups() -> FilterList {
262    FilterList {
263        includes: Some(vec!["user.slice/*".try_into().unwrap()]),
264        excludes: Some(vec!["*.service".try_into().unwrap()]),
265    }
266}
267
268fn default_cgroups_config() -> Option<CGroupsConfig> {
269    // Check env variable to allow generating docs on non-linux systems.
270    if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
271        return Some(CGroupsConfig::default());
272    }
273
274    #[cfg(not(target_os = "linux"))]
275    {
276        None
277    }
278
279    #[cfg(target_os = "linux")]
280    {
281        Some(CGroupsConfig::default())
282    }
283}
284
285impl_generate_config_from_default!(HostMetricsConfig);
286
287#[async_trait::async_trait]
288#[typetag::serde(name = "host_metrics")]
289impl SourceConfig for HostMetricsConfig {
290    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
291        init_roots();
292
293        #[cfg(not(target_os = "linux"))]
294        {
295            if self.cgroups.is_some() || self.has_collector(Collector::CGroups) {
296                return Err("CGroups collector is only available on Linux systems".into());
297            }
298            if self.has_collector(Collector::TCP) {
299                return Err("TCP collector is only available on Linux systems".into());
300            }
301        }
302
303        let mut config = self.clone();
304        config.namespace = config.namespace.filter(|namespace| !namespace.is_empty());
305
306        Ok(Box::pin(config.run(cx.out, cx.shutdown)))
307    }
308
309    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
310        vec![SourceOutput::new_metrics()]
311    }
312
313    fn can_acknowledge(&self) -> bool {
314        false
315    }
316}
317
318impl HostMetricsConfig {
319    /// Set the interval to collect internal metrics.
320    pub fn scrape_interval_secs(&mut self, value: f64) {
321        self.scrape_interval_secs = Duration::from_secs_f64(value);
322    }
323
324    async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
325        let duration = self.scrape_interval_secs;
326        let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
327
328        let mut generator = HostMetrics::new(self);
329
330        let bytes_received = register!(BytesReceived::from(Protocol::NONE));
331
332        while interval.next().await.is_some() {
333            bytes_received.emit(ByteSize(0));
334            let metrics = generator.capture_metrics().await;
335            let count = metrics.len();
336            if (out.send_batch(metrics).await).is_err() {
337                emit!(StreamClosedError { count });
338                return Err(());
339            }
340        }
341
342        Ok(())
343    }
344
345    fn has_collector(&self, collector: Collector) -> bool {
346        match &self.collectors {
347            None => true,
348            Some(collectors) => collectors.contains(&collector),
349        }
350    }
351}
352
353pub struct HostMetrics {
354    config: HostMetricsConfig,
355    system: System,
356    #[cfg(target_os = "linux")]
357    root_cgroup: Option<cgroups::CGroupRoot>,
358    events_received: Registered<EventsReceived>,
359}
360
361impl HostMetrics {
362    #[cfg(not(target_os = "linux"))]
363    pub fn new(config: HostMetricsConfig) -> Self {
364        Self {
365            config,
366            system: System::new(),
367            events_received: register!(EventsReceived),
368        }
369    }
370
371    #[cfg(target_os = "linux")]
372    pub fn new(config: HostMetricsConfig) -> Self {
373        let cgroups = config.cgroups.clone().unwrap_or_default();
374        let root_cgroup = cgroups::CGroupRoot::new(&cgroups);
375        Self {
376            config,
377            system: System::new(),
378            root_cgroup,
379            events_received: register!(EventsReceived),
380        }
381    }
382
383    pub fn buffer(&self) -> MetricsBuffer {
384        MetricsBuffer::new(self.config.namespace.clone())
385    }
386
387    async fn capture_metrics(&mut self) -> Vec<Metric> {
388        let mut buffer = self.buffer();
389
390        #[cfg(target_os = "linux")]
391        if self.config.has_collector(Collector::CGroups) {
392            self.cgroups_metrics(&mut buffer).await;
393        }
394        if self.config.has_collector(Collector::Cpu) {
395            self.cpu_metrics(&mut buffer).await;
396        }
397        if self.config.has_collector(Collector::Process) {
398            self.process_metrics(&mut buffer).await;
399        }
400        if self.config.has_collector(Collector::Disk) {
401            self.disk_metrics(&mut buffer).await;
402        }
403        if self.config.has_collector(Collector::Filesystem) {
404            self.filesystem_metrics(&mut buffer).await;
405        }
406        if self.config.has_collector(Collector::Load) {
407            self.loadavg_metrics(&mut buffer).await;
408        }
409        if self.config.has_collector(Collector::Host) {
410            self.host_metrics(&mut buffer).await;
411        }
412        if self.config.has_collector(Collector::Memory) {
413            self.memory_metrics(&mut buffer).await;
414            self.swap_metrics(&mut buffer).await;
415        }
416        if self.config.has_collector(Collector::Network) {
417            self.network_metrics(&mut buffer).await;
418        }
419        #[cfg(target_os = "linux")]
420        if self.config.has_collector(Collector::TCP) {
421            self.tcp_metrics(&mut buffer).await;
422        }
423
424        let metrics = buffer.metrics;
425        self.events_received.emit(CountByteSize(
426            metrics.len(),
427            metrics.estimated_json_encoded_size_of(),
428        ));
429        metrics
430    }
431
432    pub async fn loadavg_metrics(&self, output: &mut MetricsBuffer) {
433        output.name = "load";
434        #[cfg(unix)]
435        match heim::cpu::os::unix::loadavg().await {
436            Ok(loadavg) => {
437                output.gauge(
438                    "load1",
439                    loadavg.0.get::<ratio>() as f64,
440                    MetricTags::default(),
441                );
442                output.gauge(
443                    "load5",
444                    loadavg.1.get::<ratio>() as f64,
445                    MetricTags::default(),
446                );
447                output.gauge(
448                    "load15",
449                    loadavg.2.get::<ratio>() as f64,
450                    MetricTags::default(),
451                );
452            }
453            Err(error) => {
454                emit!(HostMetricsScrapeDetailError {
455                    message: "Failed to load average info",
456                    error,
457                });
458            }
459        }
460    }
461
462    pub async fn host_metrics(&self, output: &mut MetricsBuffer) {
463        output.name = "host";
464        match heim::host::uptime().await {
465            Ok(time) => output.gauge("uptime", time.get::<second>(), MetricTags::default()),
466            Err(error) => {
467                emit!(HostMetricsScrapeDetailError {
468                    message: "Failed to load host uptime info",
469                    error,
470                });
471            }
472        }
473
474        match heim::host::boot_time().await {
475            Ok(time) => output.gauge("boot_time", time.get::<second>(), MetricTags::default()),
476            Err(error) => {
477                emit!(HostMetricsScrapeDetailError {
478                    message: "Failed to load host boot time info",
479                    error,
480                });
481            }
482        }
483    }
484}
485
486#[derive(Default)]
487pub struct MetricsBuffer {
488    pub metrics: Vec<Metric>,
489    name: &'static str,
490    host: Option<String>,
491    timestamp: DateTime<Utc>,
492    namespace: Option<String>,
493}
494
495impl MetricsBuffer {
496    fn new(namespace: Option<String>) -> Self {
497        Self {
498            metrics: Vec::new(),
499            name: "",
500            host: crate::get_hostname().ok(),
501            timestamp: Utc::now(),
502            namespace,
503        }
504    }
505
506    fn tags(&self, mut tags: MetricTags) -> MetricTags {
507        tags.replace("collector".into(), self.name.to_string());
508        if let Some(host) = &self.host {
509            tags.replace("host".into(), host.clone());
510        }
511        tags
512    }
513
514    fn counter(&mut self, name: &str, value: f64, tags: MetricTags) {
515        self.metrics.push(
516            Metric::new(name, MetricKind::Absolute, MetricValue::Counter { value })
517                .with_namespace(self.namespace.clone())
518                .with_tags(Some(self.tags(tags)))
519                .with_timestamp(Some(self.timestamp)),
520        )
521    }
522
523    fn gauge(&mut self, name: &str, value: f64, tags: MetricTags) {
524        self.metrics.push(
525            Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
526                .with_namespace(self.namespace.clone())
527                .with_tags(Some(self.tags(tags)))
528                .with_timestamp(Some(self.timestamp)),
529        )
530    }
531}
532
533fn filter_result_sync<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
534where
535    E: std::error::Error,
536{
537    result
538        .map_err(|error| emit!(HostMetricsScrapeDetailError { message, error }))
539        .ok()
540}
541
542async fn filter_result<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
543where
544    E: std::error::Error,
545{
546    filter_result_sync(result, message)
547}
548
549#[allow(clippy::missing_const_for_fn)]
550fn init_roots() {
551    #[cfg(target_os = "linux")]
552    {
553        use std::sync::Once;
554
555        static INIT: Once = Once::new();
556
557        INIT.call_once(|| {
558            match std::env::var_os("PROCFS_ROOT") {
559                Some(procfs_root) => {
560                    info!(
561                        message = "PROCFS_ROOT is set in envvars. Using custom for procfs.",
562                        custom = ?procfs_root
563                    );
564                    heim::os::linux::set_procfs_root(std::path::PathBuf::from(&procfs_root));
565                }
566                None => info!("PROCFS_ROOT is unset. Using default '/proc' for procfs root."),
567            };
568
569            match std::env::var_os("SYSFS_ROOT") {
570                Some(sysfs_root) => {
571                    info!(
572                        message = "SYSFS_ROOT is set in envvars. Using custom for sysfs.",
573                        custom = ?sysfs_root
574                    );
575                    heim::os::linux::set_sysfs_root(std::path::PathBuf::from(&sysfs_root));
576                }
577                None => info!("SYSFS_ROOT is unset. Using default '/sys' for sysfs root."),
578            }
579        });
580    };
581}
582
583impl FilterList {
584    fn contains<T, M>(&self, value: &Option<T>, matches: M) -> bool
585    where
586        M: Fn(&PatternWrapper, &T) -> bool,
587    {
588        (match (&self.includes, value) {
589            // No includes list includes everything
590            (None, _) => true,
591            // Includes list matched against empty value returns false
592            (Some(_), None) => false,
593            // Otherwise find the given value
594            (Some(includes), Some(value)) => includes.iter().any(|pattern| matches(pattern, value)),
595        }) && match (&self.excludes, value) {
596            // No excludes, list excludes nothing
597            (None, _) => true,
598            // No value, never excluded
599            (Some(_), None) => true,
600            // Otherwise find the given value
601            (Some(excludes), Some(value)) => {
602                !excludes.iter().any(|pattern| matches(pattern, value))
603            }
604        }
605    }
606
607    fn contains_str(&self, value: Option<&str>) -> bool {
608        self.contains(&value, |pattern, s| pattern.matches_str(s))
609    }
610
611    fn contains_path(&self, value: Option<&Path>) -> bool {
612        self.contains(&value, |pattern, path| pattern.matches_path(path))
613    }
614
615    #[cfg(test)]
616    fn contains_test(&self, value: Option<&str>) -> bool {
617        let result = self.contains_str(value);
618        assert_eq!(result, self.contains_path(value.map(std::path::Path::new)));
619        result
620    }
621}
622
623/// A compiled Unix shell-style pattern.
624///
625/// - `?` matches any single character.
626/// - `*` matches any (possibly empty) sequence of characters.
627/// - `**` matches the current directory and arbitrary subdirectories. This sequence must form a single path component,
628///   so both `**a` and `b**` are invalid and will result in an error. A sequence of more than two consecutive `*`
629///   characters is also invalid.
630/// - `[...]` matches any character inside the brackets. Character sequences can also specify ranges of characters, as
631///   ordered by Unicode, so e.g. `[0-9]` specifies any character between 0 and 9 inclusive. An unclosed bracket is
632///   invalid.
633/// - `[!...]` is the negation of `[...]`, i.e. it matches any characters not in the brackets.
634///
635/// The metacharacters `?`, `*`, `[`, `]` can be matched by using brackets (e.g. `[?]`). When a `]` occurs immediately
636/// following `[` or `[!` then it is interpreted as being part of, rather then ending, the character set, so `]` and NOT
637/// `]` can be matched by `[]]` and `[!]]` respectively. The `-` character can be specified inside a character sequence
638/// pattern by placing it at the start or the end, e.g. `[abc-]`.
639#[configurable_component]
640#[derive(Clone, Debug)]
641#[serde(try_from = "String", into = "String")]
642struct PatternWrapper(Pattern);
643
644impl PatternWrapper {
645    fn matches_str(&self, s: &str) -> bool {
646        self.0.matches(s)
647    }
648
649    fn matches_path(&self, p: &Path) -> bool {
650        self.0.matches_path(p)
651    }
652}
653
654impl TryFrom<String> for PatternWrapper {
655    type Error = PatternError;
656
657    fn try_from(value: String) -> Result<Self, Self::Error> {
658        Pattern::new(value.as_ref()).map(PatternWrapper)
659    }
660}
661
662impl TryFrom<&str> for PatternWrapper {
663    type Error = PatternError;
664
665    fn try_from(value: &str) -> Result<Self, Self::Error> {
666        value.to_string().try_into()
667    }
668}
669
670impl From<PatternWrapper> for String {
671    fn from(pattern: PatternWrapper) -> Self {
672        pattern.0.to_string()
673    }
674}
675
#[cfg(test)]
mod tests {
    use std::{collections::HashSet, future::Future, time::Duration};

    use super::*;
    use crate::test_util::components::{SOURCE_TAGS, run_and_assert_source_compliance};

    /// Compiles a list of glob strings into the optional pattern list used by `FilterList`.
    fn patterns(globs: &[&str]) -> Option<Vec<PatternWrapper>> {
        Some(
            globs
                .iter()
                .map(|glob| PatternWrapper::try_from(*glob).unwrap())
                .collect(),
        )
    }

    /// Asserts that `filters` returns the expected verdict for every `(value, expected)` pair.
    fn assert_verdicts(filters: &FilterList, cases: &[(Option<&str>, bool)]) {
        for (value, expected) in cases {
            assert_eq!(filters.contains_test(*value), *expected, "value={value:?}");
        }
    }

    #[test]
    fn filterlist_default_includes_everything() {
        let filters = FilterList::default();
        assert_verdicts(
            &filters,
            &[
                (Some("anything"), true),
                (Some("should"), true),
                (Some("work"), true),
                (None, true),
            ],
        );
    }

    #[test]
    fn filterlist_includes_works() {
        let filters = FilterList {
            includes: patterns(&["sda", "dm-*"]),
            excludes: None,
        };
        assert_verdicts(
            &filters,
            &[
                (Some("sd"), false),
                (Some("sda"), true),
                (Some("sda1"), false),
                (Some("dm-"), true),
                (Some("dm-5"), true),
                (Some("xda"), false),
                (None, false),
            ],
        );
    }

    #[test]
    fn filterlist_excludes_works() {
        let filters = FilterList {
            includes: None,
            excludes: patterns(&["sda", "dm-*"]),
        };
        assert_verdicts(
            &filters,
            &[
                (Some("sd"), true),
                (Some("sda"), false),
                (Some("sda1"), true),
                (Some("dm-"), false),
                (Some("dm-5"), false),
                (Some("xda"), true),
                (None, true),
            ],
        );
    }

    #[test]
    fn filterlist_includes_and_excludes_works() {
        let filters = FilterList {
            includes: patterns(&["sda", "dm-*"]),
            excludes: patterns(&["dm-5"]),
        };
        assert_verdicts(
            &filters,
            &[
                (Some("sd"), false),
                (Some("sda"), true),
                (Some("sda1"), false),
                (Some("dm-"), true),
                (Some("dm-1"), true),
                (Some("dm-5"), false),
                (Some("xda"), false),
                (None, false),
            ],
        );
    }

    /// Enabling a single collector must yield fewer metrics than the default set.
    #[tokio::test]
    async fn filters_on_collectors() {
        let all_metrics_count = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await
            .len();

        for collector in &[
            #[cfg(target_os = "linux")]
            Collector::CGroups,
            Collector::Cpu,
            Collector::Process,
            Collector::Disk,
            Collector::Filesystem,
            Collector::Load,
            Collector::Host,
            Collector::Memory,
            Collector::Network,
        ] {
            let config = HostMetricsConfig {
                collectors: Some(vec![*collector]),
                ..Default::default()
            };
            let some_metrics = HostMetrics::new(config).capture_metrics().await;

            assert!(
                all_metrics_count > some_metrics.len(),
                "collector={collector:?}"
            );
        }
    }

    #[tokio::test]
    async fn are_tagged_with_hostname() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;
        let hostname = crate::get_hostname().expect("Broken hostname");
        assert!(metrics.into_iter().all(|event| {
            event
                .tags()
                .expect("Missing tags")
                .get("host")
                .expect("Missing \"host\" tag")
                == hostname
        }));
    }

    #[tokio::test]
    async fn uses_custom_namespace() {
        let config = HostMetricsConfig {
            namespace: Some("other".into()),
            ..Default::default()
        };
        let metrics = HostMetrics::new(config).capture_metrics().await;

        assert!(
            metrics
                .into_iter()
                .all(|event| event.namespace() == Some("other"))
        );
    }

    #[tokio::test]
    async fn uses_default_namespace() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;

        assert!(
            metrics
                .iter()
                .all(|event| event.namespace() == Some("host"))
        );
    }

    // Windows does not produce load average metrics.
    #[cfg(not(windows))]
    #[tokio::test]
    async fn generates_loadavg_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .loadavg_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 3);
        assert!(all_gauges(&metrics));

        // All metrics are named load*
        assert!(
            metrics
                .iter()
                .all(|metric| metric.name().starts_with("load"))
        );
    }

    #[tokio::test]
    async fn generates_host_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .host_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 2);
        assert!(all_gauges(&metrics));
    }

    /// Returns true when every metric is a counter (vacuously true for an empty slice).
    pub(super) fn all_counters(metrics: &[Metric]) -> bool {
        metrics
            .iter()
            .all(|metric| matches!(metric.value(), MetricValue::Counter { .. }))
    }

    /// Returns true when every metric is a gauge (vacuously true for an empty slice).
    pub(super) fn all_gauges(metrics: &[Metric]) -> bool {
        metrics
            .iter()
            .all(|metric| matches!(metric.value(), MetricValue::Gauge { .. }))
    }

    /// Returns true when every metric that carries `tag` has a value accepted by `matches`.
    ///
    /// Metrics without the tag are ignored; metrics without any tags cause a panic.
    fn all_tags_match(metrics: &[Metric], tag: &str, matches: impl Fn(&str) -> bool) -> bool {
        metrics
            .iter()
            .all(|metric| match metric.tags().unwrap().get(tag) {
                Some(value) => matches(value),
                None => true,
            })
    }

    /// Counts the metrics whose name is exactly `name`.
    pub(super) fn count_name(metrics: &[Metric], name: &str) -> usize {
        metrics
            .iter()
            .filter(|metric| metric.name() == name)
            .count()
    }

    /// Counts the metrics that carry `tag`; panics if a metric has no tags at all.
    pub(super) fn count_tag(metrics: &[Metric], tag: &str) -> usize {
        metrics
            .iter()
            .filter(|metric| {
                metric
                    .tags()
                    .expect("Metric is missing tags")
                    .contains_key(tag)
            })
            .count()
    }

    /// Collects the distinct values of `tag` across `metrics`.
    fn collect_tag_values(metrics: &[Metric], tag: &str) -> HashSet<String> {
        metrics
            .iter()
            .filter_map(|metric| metric.tags().unwrap().get(tag).map(ToOwned::to_owned))
            .collect()
    }

    // Run a series of tests using filters to ensure they are obeyed
    pub(super) async fn assert_filtered_metrics<Get, Fut>(tag: &str, get_metrics: Get)
    where
        Get: Fn(FilterList) -> Fut,
        Fut: Future<Output = Vec<Metric>>,
    {
        let all_metrics = get_metrics(FilterList::default()).await;

        let keys = collect_tag_values(&all_metrics, tag);
        // Pick an arbitrary key value
        if let Some(key) = keys.into_iter().next() {
            // Drop the final character on a character boundary. Slicing at
            // `key.len() - 1` would panic for an empty key or a multi-byte last character.
            let key_prefix = match key.char_indices().next_back() {
                Some((last_char_start, _)) => &key[..last_char_start],
                None => "",
            };
            let key_prefix_pattern = PatternWrapper::try_from(format!("{key_prefix}*")).unwrap();
            let key_pattern = PatternWrapper::try_from(key.clone()).unwrap();

            // Including only the exact key keeps just the metrics tagged with it.
            let filter = FilterList {
                includes: Some(vec![key_pattern.clone()]),
                excludes: None,
            };
            let filtered_metrics_with = get_metrics(filter).await;

            assert!(filtered_metrics_with.len() <= all_metrics.len());
            assert!(!filtered_metrics_with.is_empty());
            assert!(all_tags_match(&filtered_metrics_with, tag, |s| s == key));

            // Including by prefix glob keeps at least as many metrics as the exact key.
            let filter = FilterList {
                includes: Some(vec![key_prefix_pattern.clone()]),
                excludes: None,
            };
            let filtered_metrics_with_match = get_metrics(filter).await;

            assert!(filtered_metrics_with_match.len() >= filtered_metrics_with.len());
            assert!(all_tags_match(&filtered_metrics_with_match, tag, |s| {
                s.starts_with(key_prefix)
            }));

            // Excluding the exact key removes every metric tagged with it.
            let filter = FilterList {
                includes: None,
                excludes: Some(vec![key_pattern]),
            };
            let filtered_metrics_without = get_metrics(filter).await;

            assert!(filtered_metrics_without.len() <= all_metrics.len());
            assert!(all_tags_match(&filtered_metrics_without, tag, |s| s != key));

            // Excluding by prefix glob removes at least as much as excluding the exact key.
            let filter = FilterList {
                includes: None,
                excludes: Some(vec![key_prefix_pattern]),
            };
            let filtered_metrics_without_match = get_metrics(filter).await;

            assert!(filtered_metrics_without_match.len() <= filtered_metrics_without.len());
            assert!(all_tags_match(&filtered_metrics_without_match, tag, |s| {
                !s.starts_with(key_prefix)
            }));

            assert!(
                filtered_metrics_with.len() + filtered_metrics_without.len() <= all_metrics.len()
            );
        }
    }

    #[tokio::test]
    async fn source_compliance() {
        let config = HostMetricsConfig {
            scrape_interval_secs: Duration::from_secs(1),
            ..Default::default()
        };

        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(2), &SOURCE_TAGS).await;

        assert!(!events.is_empty());
    }
}