vector/sources/host_metrics/mod.rs

1use std::{
2    path::{Path, PathBuf},
3    time::Duration,
4};
5
6use chrono::{DateTime, Utc};
7use futures::StreamExt;
8use glob::{Pattern, PatternError};
9#[cfg(not(windows))]
10use heim::units::ratio::ratio;
11use heim::units::time::second;
12use serde_with::serde_as;
13use sysinfo::System;
14use tokio::time;
15use tokio_stream::wrappers::IntervalStream;
16use vector_lib::config::LogNamespace;
17use vector_lib::configurable::configurable_component;
18use vector_lib::internal_event::{
19    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
20};
21use vector_lib::EstimatedJsonEncodedSizeOf;
22
23use crate::{
24    config::{SourceConfig, SourceContext, SourceOutput},
25    event::metric::{Metric, MetricKind, MetricTags, MetricValue},
26    internal_events::{EventsReceived, HostMetricsScrapeDetailError, StreamClosedError},
27    shutdown::ShutdownSignal,
28    SourceSender,
29};
30
31#[cfg(target_os = "linux")]
32mod cgroups;
33mod cpu;
34mod disk;
35mod filesystem;
36mod memory;
37mod network;
38mod process;
39#[cfg(target_os = "linux")]
40mod tcp;
41
/// The collectors the `host_metrics` source can run.
///
/// Each variant is (de)serialized as its lowercase name, e.g. `cgroups`, `cpu`, or `tcp`.
#[serde_as]
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Collector {
    /// Metrics related to Linux control groups.
    ///
    /// Only available on Linux.
    CGroups,

    /// Metrics related to CPU utilization.
    Cpu,

    /// Metrics related to process utilization.
    Process,

    /// Metrics related to disk I/O utilization.
    Disk,

    /// Metrics related to filesystem space utilization.
    Filesystem,

    /// Metrics related to the system load average.
    Load,

    /// Metrics related to the host, such as uptime and boot time.
    Host,

    /// Metrics related to memory and swap utilization.
    Memory,

    /// Metrics related to network utilization.
    Network,

    /// Metrics related to TCP connections.
    ///
    /// Only available on Linux.
    TCP,
}
80
/// Filtering configuration.
///
/// A value passes the filter when it matches at least one `includes` pattern (or `includes`
/// is unset) and matches no `excludes` pattern (or `excludes` is unset).
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct FilterList {
    /// Any patterns which should be included.
    ///
    /// The patterns are matched using globbing.
    // When this is set, a missing (`None`) value never passes the filter.
    includes: Option<Vec<PatternWrapper>>,

    /// Any patterns which should be excluded.
    ///
    /// The patterns are matched using globbing.
    // A missing (`None`) value is never excluded.
    excludes: Option<Vec<PatternWrapper>>,
}
95
96/// Configuration for the `host_metrics` source.
97#[serde_as]
98#[configurable_component(source("host_metrics", "Collect metric data from the local system."))]
99#[derive(Clone, Debug, Derivative)]
100#[derivative(Default)]
101#[serde(deny_unknown_fields)]
102pub struct HostMetricsConfig {
103    /// The interval between metric gathering, in seconds.
104    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
105    #[serde(default = "default_scrape_interval")]
106    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
107    pub scrape_interval_secs: Duration,
108
109    /// The list of host metric collector services to use.
110    ///
111    /// Defaults to all collectors.
112    #[configurable(metadata(docs::examples = "example_collectors()"))]
113    #[derivative(Default(value = "default_collectors()"))]
114    #[serde(default = "default_collectors")]
115    pub collectors: Option<Vec<Collector>>,
116
117    /// Overrides the default namespace for the metrics emitted by the source.
118    #[derivative(Default(value = "default_namespace()"))]
119    #[serde(default = "default_namespace")]
120    pub namespace: Option<String>,
121
122    #[configurable(derived)]
123    #[derivative(Default(value = "default_cgroups_config()"))]
124    #[serde(default = "default_cgroups_config")]
125    pub cgroups: Option<CGroupsConfig>,
126
127    #[configurable(derived)]
128    #[serde(default)]
129    pub disk: disk::DiskConfig,
130
131    #[configurable(derived)]
132    #[serde(default)]
133    pub filesystem: filesystem::FilesystemConfig,
134
135    #[configurable(derived)]
136    #[serde(default)]
137    pub network: network::NetworkConfig,
138
139    #[configurable(derived)]
140    #[serde(default)]
141    pub process: process::ProcessConfig,
142}
143
/// Options for the cgroups (control groups) metrics collector.
///
/// This collector is only available on Linux systems, and only supports either version 2 or hybrid cgroups.
#[configurable_component]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(default)]
pub struct CGroupsConfig {
    /// The number of levels of the cgroups hierarchy for which to report metrics.
    ///
    /// A value of `1` means the root or named cgroup.
    #[derivative(Default(value = "default_levels()"))]
    #[serde(default = "default_levels")]
    #[configurable(metadata(docs::examples = 1))]
    #[configurable(metadata(docs::examples = 3))]
    levels: usize,

    /// The base cgroup name to provide metrics for.
    #[configurable(metadata(docs::examples = "/"))]
    #[configurable(metadata(docs::examples = "system.slice/snapd.service"))]
    pub(super) base: Option<PathBuf>,

    /// Lists of cgroup name patterns to include or exclude in gathering
    /// usage metrics.
    #[configurable(metadata(docs::examples = "example_cgroups()"))]
    // NOTE(review): the serde default is an includes list of `*`, while `Default::default()`
    // (derivative, no value given) uses `FilterList::default()` with no includes list. These can
    // differ, at least for absent (`None`) values. Confirm this is intended.
    #[serde(default = "default_all_devices")]
    groups: FilterList,

    /// Base cgroup directory, for testing use only.
    #[serde(skip_serializing)]
    #[configurable(metadata(docs::hidden))]
    #[configurable(metadata(docs::human_name = "Base Directory"))]
    base_dir: Option<PathBuf>,
}
178
/// Default time between two scrapes: fifteen seconds.
const fn default_scrape_interval() -> Duration {
    const FIFTEEN_SECONDS_IN_MILLIS: u64 = 15_000;
    Duration::from_millis(FIFTEEN_SECONDS_IN_MILLIS)
}
182
/// Namespace applied to emitted metrics when none is configured: `host`.
pub fn default_namespace() -> Option<String> {
    let namespace = "host".to_owned();
    Some(namespace)
}
186
/// Example values for the `collectors` option, shown in the generated documentation.
///
/// Lists every `Collector` variant by its serialized (lowercase) name, including the
/// Linux-only `cgroups` and `tcp` collectors and the `process` collector, which the
/// previous list omitted even though it is enabled by default.
const fn example_collectors() -> [&'static str; 10] {
    [
        "cgroups",
        "cpu",
        "disk",
        "filesystem",
        "load",
        "host",
        "memory",
        "network",
        "process",
        "tcp",
    ]
}
200
201fn default_collectors() -> Option<Vec<Collector>> {
202    let mut collectors = vec![
203        Collector::Cpu,
204        Collector::Disk,
205        Collector::Filesystem,
206        Collector::Load,
207        Collector::Host,
208        Collector::Memory,
209        Collector::Network,
210        Collector::Process,
211    ];
212
213    #[cfg(target_os = "linux")]
214    {
215        collectors.push(Collector::CGroups);
216        collectors.push(Collector::TCP);
217    }
218    #[cfg(not(target_os = "linux"))]
219    if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
220        collectors.push(Collector::CGroups);
221        collectors.push(Collector::TCP);
222    }
223
224    Some(collectors)
225}
226
227fn example_devices() -> FilterList {
228    FilterList {
229        includes: Some(vec!["sda".try_into().unwrap()]),
230        excludes: Some(vec!["dm-*".try_into().unwrap()]),
231    }
232}
233
234fn default_all_devices() -> FilterList {
235    FilterList {
236        includes: Some(vec!["*".try_into().unwrap()]),
237        excludes: None,
238    }
239}
240
241fn example_processes() -> FilterList {
242    FilterList {
243        includes: Some(vec!["docker".try_into().unwrap()]),
244        excludes: None,
245    }
246}
247
248fn default_all_processes() -> FilterList {
249    FilterList {
250        includes: Some(vec!["*".try_into().unwrap()]),
251        excludes: None,
252    }
253}
254
/// Default number of cgroup hierarchy levels to report on.
const fn default_levels() -> usize {
    const LEVELS: usize = 100;
    LEVELS
}
258
259fn example_cgroups() -> FilterList {
260    FilterList {
261        includes: Some(vec!["user.slice/*".try_into().unwrap()]),
262        excludes: Some(vec!["*.service".try_into().unwrap()]),
263    }
264}
265
266fn default_cgroups_config() -> Option<CGroupsConfig> {
267    // Check env variable to allow generating docs on non-linux systems.
268    if std::env::var("VECTOR_GENERATE_SCHEMA").is_ok() {
269        return Some(CGroupsConfig::default());
270    }
271
272    #[cfg(not(target_os = "linux"))]
273    {
274        None
275    }
276
277    #[cfg(target_os = "linux")]
278    {
279        Some(CGroupsConfig::default())
280    }
281}
282
// Provides this source's example configuration from `HostMetricsConfig::default()`.
// NOTE(review): presumably this serializes the `Default` impl, so every field's derivative
// default ends up in generated configs. Confirm against the macro definition.
impl_generate_config_from_default!(HostMetricsConfig);
284
285#[async_trait::async_trait]
286#[typetag::serde(name = "host_metrics")]
287impl SourceConfig for HostMetricsConfig {
288    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
289        init_roots();
290
291        #[cfg(not(target_os = "linux"))]
292        {
293            if self.cgroups.is_some() || self.has_collector(Collector::CGroups) {
294                return Err("CGroups collector is only available on Linux systems".into());
295            }
296            if self.has_collector(Collector::TCP) {
297                return Err("TCP collector is only available on Linux systems".into());
298            }
299        }
300
301        let mut config = self.clone();
302        config.namespace = config.namespace.filter(|namespace| !namespace.is_empty());
303
304        Ok(Box::pin(config.run(cx.out, cx.shutdown)))
305    }
306
307    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
308        vec![SourceOutput::new_metrics()]
309    }
310
311    fn can_acknowledge(&self) -> bool {
312        false
313    }
314}
315
316impl HostMetricsConfig {
317    /// Set the interval to collect internal metrics.
318    pub fn scrape_interval_secs(&mut self, value: f64) {
319        self.scrape_interval_secs = Duration::from_secs_f64(value);
320    }
321
322    async fn run(self, mut out: SourceSender, shutdown: ShutdownSignal) -> Result<(), ()> {
323        let duration = self.scrape_interval_secs;
324        let mut interval = IntervalStream::new(time::interval(duration)).take_until(shutdown);
325
326        let mut generator = HostMetrics::new(self);
327
328        let bytes_received = register!(BytesReceived::from(Protocol::NONE));
329
330        while interval.next().await.is_some() {
331            bytes_received.emit(ByteSize(0));
332            let metrics = generator.capture_metrics().await;
333            let count = metrics.len();
334            if (out.send_batch(metrics).await).is_err() {
335                emit!(StreamClosedError { count });
336                return Err(());
337            }
338        }
339
340        Ok(())
341    }
342
343    fn has_collector(&self, collector: Collector) -> bool {
344        match &self.collectors {
345            None => true,
346            Some(collectors) => collectors.contains(&collector),
347        }
348    }
349}
350
/// Scrapes host metrics according to a [`HostMetricsConfig`].
pub struct HostMetrics {
    config: HostMetricsConfig,
    // NOTE(review): not read in this file; presumably used by the collector submodules.
    system: System,
    // Result of `cgroups::CGroupRoot::new`, computed once in `new` from `config.cgroups`.
    #[cfg(target_os = "linux")]
    root_cgroup: Option<cgroups::CGroupRoot>,
    // Reports the count and estimated JSON size of each scrape's metrics.
    events_received: Registered<EventsReceived>,
}
358
impl HostMetrics {
    /// Creates a metrics generator for `config` (non-Linux build: no cgroup root exists).
    #[cfg(not(target_os = "linux"))]
    pub fn new(config: HostMetricsConfig) -> Self {
        Self {
            config,
            system: System::new(),
            events_received: register!(EventsReceived),
        }
    }

    /// Creates a metrics generator for `config` (Linux build).
    ///
    /// The cgroup root is resolved once here from `config.cgroups`. When that option is unset,
    /// it falls back to `CGroupsConfig::default()`.
    #[cfg(target_os = "linux")]
    pub fn new(config: HostMetricsConfig) -> Self {
        let cgroups = config.cgroups.clone().unwrap_or_default();
        let root_cgroup = cgroups::CGroupRoot::new(&cgroups);
        Self {
            config,
            system: System::new(),
            root_cgroup,
            events_received: register!(EventsReceived),
        }
    }

    /// Returns a fresh, empty [`MetricsBuffer`] that uses the configured namespace.
    pub fn buffer(&self) -> MetricsBuffer {
        MetricsBuffer::new(self.config.namespace.clone())
    }

    /// Runs every enabled collector once and returns all gathered metrics.
    ///
    /// Collectors run sequentially in the fixed order below, so it also determines the order
    /// of the returned metrics. The `Memory` collector also emits swap metrics. After
    /// collection, an `EventsReceived` event is emitted with the metric count and estimated
    /// JSON size.
    async fn capture_metrics(&mut self) -> Vec<Metric> {
        let mut buffer = self.buffer();

        #[cfg(target_os = "linux")]
        if self.config.has_collector(Collector::CGroups) {
            self.cgroups_metrics(&mut buffer).await;
        }
        if self.config.has_collector(Collector::Cpu) {
            self.cpu_metrics(&mut buffer).await;
        }
        if self.config.has_collector(Collector::Process) {
            self.process_metrics(&mut buffer).await;
        }
        if self.config.has_collector(Collector::Disk) {
            self.disk_metrics(&mut buffer).await;
        }
        if self.config.has_collector(Collector::Filesystem) {
            self.filesystem_metrics(&mut buffer).await;
        }
        if self.config.has_collector(Collector::Load) {
            self.loadavg_metrics(&mut buffer).await;
        }
        if self.config.has_collector(Collector::Host) {
            self.host_metrics(&mut buffer).await;
        }
        if self.config.has_collector(Collector::Memory) {
            self.memory_metrics(&mut buffer).await;
            self.swap_metrics(&mut buffer).await;
        }
        if self.config.has_collector(Collector::Network) {
            self.network_metrics(&mut buffer).await;
        }
        #[cfg(target_os = "linux")]
        if self.config.has_collector(Collector::TCP) {
            self.tcp_metrics(&mut buffer).await;
        }

        let metrics = buffer.metrics;
        self.events_received.emit(CountByteSize(
            metrics.len(),
            metrics.estimated_json_encoded_size_of(),
        ));
        metrics
    }

    /// Appends the `load1`, `load5` and `load15` gauges (collector tag `load`) to `output`.
    ///
    /// On non-Unix targets the scrape is compiled out: only `output.name` is set and no
    /// metrics are added. A failed scrape emits `HostMetricsScrapeDetailError` instead.
    pub async fn loadavg_metrics(&self, output: &mut MetricsBuffer) {
        output.name = "load";
        #[cfg(unix)]
        match heim::cpu::os::unix::loadavg().await {
            Ok(loadavg) => {
                output.gauge(
                    "load1",
                    loadavg.0.get::<ratio>() as f64,
                    MetricTags::default(),
                );
                output.gauge(
                    "load5",
                    loadavg.1.get::<ratio>() as f64,
                    MetricTags::default(),
                );
                output.gauge(
                    "load15",
                    loadavg.2.get::<ratio>() as f64,
                    MetricTags::default(),
                );
            }
            Err(error) => {
                emit!(HostMetricsScrapeDetailError {
                    message: "Failed to load average info",
                    error,
                });
            }
        }
    }

    /// Appends the `uptime` and `boot_time` gauges, both in seconds (collector tag `host`),
    /// to `output`.
    ///
    /// Each value is scraped independently. A failure emits `HostMetricsScrapeDetailError`
    /// and skips only that gauge.
    pub async fn host_metrics(&self, output: &mut MetricsBuffer) {
        output.name = "host";
        match heim::host::uptime().await {
            Ok(time) => output.gauge("uptime", time.get::<second>(), MetricTags::default()),
            Err(error) => {
                emit!(HostMetricsScrapeDetailError {
                    message: "Failed to load host uptime info",
                    error,
                });
            }
        }

        match heim::host::boot_time().await {
            Ok(time) => output.gauge("boot_time", time.get::<second>(), MetricTags::default()),
            Err(error) => {
                emit!(HostMetricsScrapeDetailError {
                    message: "Failed to load host boot time info",
                    error,
                });
            }
        }
    }
}
483
/// Accumulates the metrics produced by one scrape.
///
/// Every metric added through `counter`/`gauge` carries this buffer's namespace and
/// timestamp. It is also tagged with the current collector `name` and, when known, the
/// hostname.
#[derive(Default)]
pub struct MetricsBuffer {
    pub metrics: Vec<Metric>,
    // Name of the collector currently writing; emitted as the `collector` tag.
    name: &'static str,
    // Hostname, or `None` if `crate::get_hostname()` failed; emitted as the `host` tag.
    host: Option<String>,
    // Timestamp stamped on every metric in this buffer.
    timestamp: DateTime<Utc>,
    namespace: Option<String>,
}
492
493impl MetricsBuffer {
494    fn new(namespace: Option<String>) -> Self {
495        Self {
496            metrics: Vec::new(),
497            name: "",
498            host: crate::get_hostname().ok(),
499            timestamp: Utc::now(),
500            namespace,
501        }
502    }
503
504    fn tags(&self, mut tags: MetricTags) -> MetricTags {
505        tags.replace("collector".into(), self.name.to_string());
506        if let Some(host) = &self.host {
507            tags.replace("host".into(), host.clone());
508        }
509        tags
510    }
511
512    fn counter(&mut self, name: &str, value: f64, tags: MetricTags) {
513        self.metrics.push(
514            Metric::new(name, MetricKind::Absolute, MetricValue::Counter { value })
515                .with_namespace(self.namespace.clone())
516                .with_tags(Some(self.tags(tags)))
517                .with_timestamp(Some(self.timestamp)),
518        )
519    }
520
521    fn gauge(&mut self, name: &str, value: f64, tags: MetricTags) {
522        self.metrics.push(
523            Metric::new(name, MetricKind::Absolute, MetricValue::Gauge { value })
524                .with_namespace(self.namespace.clone())
525                .with_tags(Some(self.tags(tags)))
526                .with_timestamp(Some(self.timestamp)),
527        )
528    }
529}
530
531fn filter_result_sync<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
532where
533    E: std::error::Error,
534{
535    result
536        .map_err(|error| emit!(HostMetricsScrapeDetailError { message, error }))
537        .ok()
538}
539
/// Async counterpart of [`filter_result_sync`]. Returns the `Ok` value, or emits a
/// `HostMetricsScrapeDetailError` with `message` and returns `None`.
///
/// The body never awaits. NOTE(review): presumably it is `async` so callers can use it
/// uniformly in `.await` chains. Confirm with the collector submodules.
async fn filter_result<T, E>(result: Result<T, E>, message: &'static str) -> Option<T>
where
    E: std::error::Error,
{
    filter_result_sync(result, message)
}
546
547#[allow(clippy::missing_const_for_fn)]
548fn init_roots() {
549    #[cfg(target_os = "linux")]
550    {
551        use std::sync::Once;
552
553        static INIT: Once = Once::new();
554
555        INIT.call_once(|| {
556            match std::env::var_os("PROCFS_ROOT") {
557                Some(procfs_root) => {
558                    info!(
559                        message = "PROCFS_ROOT is set in envvars. Using custom for procfs.",
560                        custom = ?procfs_root
561                    );
562                    heim::os::linux::set_procfs_root(std::path::PathBuf::from(&procfs_root));
563                }
564                None => info!("PROCFS_ROOT is unset. Using default '/proc' for procfs root."),
565            };
566
567            match std::env::var_os("SYSFS_ROOT") {
568                Some(sysfs_root) => {
569                    info!(
570                        message = "SYSFS_ROOT is set in envvars. Using custom for sysfs.",
571                        custom = ?sysfs_root
572                    );
573                    heim::os::linux::set_sysfs_root(std::path::PathBuf::from(&sysfs_root));
574                }
575                None => info!("SYSFS_ROOT is unset. Using default '/sys' for sysfs root."),
576            }
577        });
578    };
579}
580
581impl FilterList {
582    fn contains<T, M>(&self, value: &Option<T>, matches: M) -> bool
583    where
584        M: Fn(&PatternWrapper, &T) -> bool,
585    {
586        (match (&self.includes, value) {
587            // No includes list includes everything
588            (None, _) => true,
589            // Includes list matched against empty value returns false
590            (Some(_), None) => false,
591            // Otherwise find the given value
592            (Some(includes), Some(value)) => includes.iter().any(|pattern| matches(pattern, value)),
593        }) && match (&self.excludes, value) {
594            // No excludes, list excludes nothing
595            (None, _) => true,
596            // No value, never excluded
597            (Some(_), None) => true,
598            // Otherwise find the given value
599            (Some(excludes), Some(value)) => {
600                !excludes.iter().any(|pattern| matches(pattern, value))
601            }
602        }
603    }
604
605    fn contains_str(&self, value: Option<&str>) -> bool {
606        self.contains(&value, |pattern, s| pattern.matches_str(s))
607    }
608
609    fn contains_path(&self, value: Option<&Path>) -> bool {
610        self.contains(&value, |pattern, path| pattern.matches_path(path))
611    }
612
613    #[cfg(test)]
614    fn contains_test(&self, value: Option<&str>) -> bool {
615        let result = self.contains_str(value);
616        assert_eq!(result, self.contains_path(value.map(std::path::Path::new)));
617        result
618    }
619}
620
/// A compiled Unix shell-style pattern.
///
/// - `?` matches any single character.
/// - `*` matches any (possibly empty) sequence of characters.
/// - `**` matches the current directory and arbitrary subdirectories. This sequence must form a single path component,
///   so both `**a` and `b**` are invalid and will result in an error. A sequence of more than two consecutive `*`
///   characters is also invalid.
/// - `[...]` matches any character inside the brackets. Character sequences can also specify ranges of characters, as
///   ordered by Unicode, so e.g. `[0-9]` specifies any character between 0 and 9 inclusive. An unclosed bracket is
///   invalid.
/// - `[!...]` is the negation of `[...]`, i.e. it matches any characters not in the brackets.
///
/// The metacharacters `?`, `*`, `[`, `]` can be matched by using brackets (e.g. `[?]`). When a `]` occurs immediately
/// following `[` or `[!` then it is interpreted as being part of, rather than ending, the character set, so `]` and NOT
/// `]` can be matched by `[]]` and `[!]]` respectively. The `-` character can be specified inside a character sequence
/// pattern by placing it at the start or the end, e.g. `[abc-]`.
// (De)serialized as a plain string. Deserialization compiles it through `TryFrom<String>`,
// rejecting malformed patterns; serialization converts back through `From<PatternWrapper> for String`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(try_from = "String", into = "String")]
struct PatternWrapper(Pattern);
641
642impl PatternWrapper {
643    fn matches_str(&self, s: &str) -> bool {
644        self.0.matches(s)
645    }
646
647    fn matches_path(&self, p: &Path) -> bool {
648        self.0.matches_path(p)
649    }
650}
651
652impl TryFrom<String> for PatternWrapper {
653    type Error = PatternError;
654
655    fn try_from(value: String) -> Result<Self, Self::Error> {
656        Pattern::new(value.as_ref()).map(PatternWrapper)
657    }
658}
659
660impl TryFrom<&str> for PatternWrapper {
661    type Error = PatternError;
662
663    fn try_from(value: &str) -> Result<Self, Self::Error> {
664        value.to_string().try_into()
665    }
666}
667
668impl From<PatternWrapper> for String {
669    fn from(pattern: PatternWrapper) -> Self {
670        pattern.0.to_string()
671    }
672}
673
// Unit tests for `FilterList` and `HostMetrics`, plus `pub(super)` helpers that are
// presumably shared with the collector submodules' tests.
#[cfg(test)]
mod tests {
    use crate::test_util::components::{run_and_assert_source_compliance, SOURCE_TAGS};
    use std::{collections::HashSet, future::Future, time::Duration};

    use super::*;

    // With neither includes nor excludes, every value (even a missing one) passes.
    #[test]
    fn filterlist_default_includes_everything() {
        let filters = FilterList::default();
        assert!(filters.contains_test(Some("anything")));
        assert!(filters.contains_test(Some("should")));
        assert!(filters.contains_test(Some("work")));
        assert!(filters.contains_test(None));
    }

    // Only values matching an include pattern pass; a missing value is rejected.
    #[test]
    fn filterlist_includes_works() {
        let filters = FilterList {
            includes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
            excludes: None,
        };
        assert!(!filters.contains_test(Some("sd")));
        assert!(filters.contains_test(Some("sda")));
        assert!(!filters.contains_test(Some("sda1")));
        assert!(filters.contains_test(Some("dm-")));
        assert!(filters.contains_test(Some("dm-5")));
        assert!(!filters.contains_test(Some("xda")));
        assert!(!filters.contains_test(None));
    }

    // Values matching an exclude pattern are rejected; a missing value still passes.
    #[test]
    fn filterlist_excludes_works() {
        let filters = FilterList {
            includes: None,
            excludes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
        };
        assert!(filters.contains_test(Some("sd")));
        assert!(!filters.contains_test(Some("sda")));
        assert!(filters.contains_test(Some("sda1")));
        assert!(!filters.contains_test(Some("dm-")));
        assert!(!filters.contains_test(Some("dm-5")));
        assert!(filters.contains_test(Some("xda")));
        assert!(filters.contains_test(None));
    }

    // Excludes are applied on top of includes.
    #[test]
    fn filterlist_includes_and_excludes_works() {
        let filters = FilterList {
            includes: Some(vec![
                PatternWrapper::try_from("sda".to_string()).unwrap(),
                PatternWrapper::try_from("dm-*".to_string()).unwrap(),
            ]),
            excludes: Some(vec![PatternWrapper::try_from("dm-5".to_string()).unwrap()]),
        };
        assert!(!filters.contains_test(Some("sd")));
        assert!(filters.contains_test(Some("sda")));
        assert!(!filters.contains_test(Some("sda1")));
        assert!(filters.contains_test(Some("dm-")));
        assert!(filters.contains_test(Some("dm-1")));
        assert!(!filters.contains_test(Some("dm-5")));
        assert!(!filters.contains_test(Some("xda")));
        assert!(!filters.contains_test(None));
    }

    // Enabling a single collector must yield strictly fewer metrics than the default set.
    // NOTE(review): `Collector::TCP` is not in the list below, so it is not checked here.
    #[tokio::test]
    async fn filters_on_collectors() {
        let all_metrics_count = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await
            .len();

        for collector in &[
            #[cfg(target_os = "linux")]
            Collector::CGroups,
            Collector::Cpu,
            Collector::Process,
            Collector::Disk,
            Collector::Filesystem,
            Collector::Load,
            Collector::Host,
            Collector::Memory,
            Collector::Network,
        ] {
            let some_metrics = HostMetrics::new(HostMetricsConfig {
                collectors: Some(vec![*collector]),
                ..Default::default()
            })
            .capture_metrics()
            .await;

            assert!(
                all_metrics_count > some_metrics.len(),
                "collector={collector:?}"
            );
        }
    }

    // Every metric must carry a `host` tag equal to the local hostname.
    #[tokio::test]
    async fn are_tagged_with_hostname() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;
        let hostname = crate::get_hostname().expect("Broken hostname");
        assert!(!metrics.into_iter().any(|event| event
            .tags()
            .expect("Missing tags")
            .get("host")
            .expect("Missing \"host\" tag")
            != hostname));
    }

    // A configured namespace is applied to every metric.
    #[tokio::test]
    async fn uses_custom_namespace() {
        let metrics = HostMetrics::new(HostMetricsConfig {
            namespace: Some("other".into()),
            ..Default::default()
        })
        .capture_metrics()
        .await;

        assert!(metrics
            .into_iter()
            .all(|event| event.namespace() == Some("other")));
    }

    // Without configuration, every metric is in the `host` namespace.
    #[tokio::test]
    async fn uses_default_namespace() {
        let metrics = HostMetrics::new(HostMetricsConfig::default())
            .capture_metrics()
            .await;

        assert!(metrics
            .iter()
            .all(|event| event.namespace() == Some("host")));
    }

    // Windows does not produce load average metrics.
    #[cfg(not(windows))]
    #[tokio::test]
    async fn generates_loadavg_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .loadavg_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 3);
        assert!(all_gauges(&metrics));

        // All metrics are named load*
        assert!(!metrics
            .iter()
            .any(|metric| !metric.name().starts_with("load")));
    }

    // The host collector emits exactly two gauges (uptime and boot time).
    #[tokio::test]
    async fn generates_host_metrics() {
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(HostMetricsConfig::default())
            .host_metrics(&mut buffer)
            .await;
        let metrics = buffer.metrics;
        assert_eq!(metrics.len(), 2);
        assert!(all_gauges(&metrics));
    }

    // True if every metric is a counter (vacuously true for an empty slice).
    pub(super) fn all_counters(metrics: &[Metric]) -> bool {
        !metrics
            .iter()
            .any(|metric| !matches!(metric.value(), &MetricValue::Counter { .. }))
    }

    // True if every metric is a gauge (vacuously true for an empty slice).
    pub(super) fn all_gauges(metrics: &[Metric]) -> bool {
        !metrics
            .iter()
            .any(|metric| !matches!(metric.value(), &MetricValue::Gauge { .. }))
    }

    // True if every metric that has `tag` has a value satisfying `matches`. Metrics without
    // the tag are ignored; a metric with no tags at all panics on `unwrap`.
    fn all_tags_match(metrics: &[Metric], tag: &str, matches: impl Fn(&str) -> bool) -> bool {
        !metrics.iter().any(|metric| {
            metric
                .tags()
                .unwrap()
                .get(tag)
                .map(|value| !matches(value))
                .unwrap_or(false)
        })
    }

    // Number of metrics named exactly `name`.
    pub(super) fn count_name(metrics: &[Metric], name: &str) -> usize {
        metrics
            .iter()
            .filter(|metric| metric.name() == name)
            .count()
    }

    // Number of metrics carrying `tag`; panics if any metric has no tags.
    pub(super) fn count_tag(metrics: &[Metric], tag: &str) -> usize {
        metrics
            .iter()
            .filter(|metric| {
                metric
                    .tags()
                    .expect("Metric is missing tags")
                    .contains_key(tag)
            })
            .count()
    }

    // Distinct values of `tag` across `metrics`.
    fn collect_tag_values(metrics: &[Metric], tag: &str) -> HashSet<String> {
        metrics
            .iter()
            .filter_map(|metric| metric.tags().unwrap().get(tag).map(ToOwned::to_owned))
            .collect::<HashSet<_>>()
    }

    // Run a series of tests using filters to ensure they are obeyed
    //
    // Picks one value of `tag` from an unfiltered run and checks exact and prefix-glob
    // includes and excludes against it. Does nothing if no metric carries `tag`.
    // NOTE(review): `&key[..key.len() - 1]` slices by byte. It panics if the chosen value
    // is empty (usize underflow) or ends in a multi-byte UTF-8 character.
    pub(super) async fn assert_filtered_metrics<Get, Fut>(tag: &str, get_metrics: Get)
    where
        Get: Fn(FilterList) -> Fut,
        Fut: Future<Output = Vec<Metric>>,
    {
        let all_metrics = get_metrics(FilterList::default()).await;

        let keys = collect_tag_values(&all_metrics, tag);
        // Pick an arbitrary key value
        if let Some(key) = keys.into_iter().next() {
            let key_prefix = &key[..key.len() - 1].to_string();
            let key_prefix_pattern = PatternWrapper::try_from(format!("{key_prefix}*")).unwrap();
            let key_pattern = PatternWrapper::try_from(key.clone()).unwrap();

            let filter = FilterList {
                includes: Some(vec![key_pattern.clone()]),
                excludes: None,
            };
            let filtered_metrics_with = get_metrics(filter).await;

            assert!(filtered_metrics_with.len() <= all_metrics.len());
            assert!(!filtered_metrics_with.is_empty());
            assert!(all_tags_match(&filtered_metrics_with, tag, |s| s == key));

            let filter = FilterList {
                includes: Some(vec![key_prefix_pattern.clone()]),
                excludes: None,
            };
            let filtered_metrics_with_match = get_metrics(filter).await;

            assert!(filtered_metrics_with_match.len() >= filtered_metrics_with.len());
            assert!(all_tags_match(&filtered_metrics_with_match, tag, |s| {
                s.starts_with(key_prefix)
            }));

            let filter = FilterList {
                includes: None,
                excludes: Some(vec![key_pattern]),
            };
            let filtered_metrics_without = get_metrics(filter).await;

            assert!(filtered_metrics_without.len() <= all_metrics.len());
            assert!(all_tags_match(&filtered_metrics_without, tag, |s| s != key));

            let filter = FilterList {
                includes: None,
                excludes: Some(vec![key_prefix_pattern]),
            };
            let filtered_metrics_without_match = get_metrics(filter).await;

            assert!(filtered_metrics_without_match.len() <= filtered_metrics_without.len());
            assert!(all_tags_match(&filtered_metrics_without_match, tag, |s| {
                !s.starts_with(key_prefix)
            }));

            assert!(
                filtered_metrics_with.len() + filtered_metrics_without.len() <= all_metrics.len()
            );
        }
    }

    // Runs the whole source for 2s with a 1s scrape interval through
    // `run_and_assert_source_compliance`, and expects at least one event.
    #[tokio::test]
    async fn source_compliance() {
        let config = HostMetricsConfig {
            scrape_interval_secs: Duration::from_secs(1),
            ..Default::default()
        };

        let events =
            run_and_assert_source_compliance(config, Duration::from_secs(2), &SOURCE_TAGS).await;

        assert!(!events.is_empty());
    }
}