1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use super::{default_all_processes, example_processes, FilterList, HostMetrics};
use std::ffi::OsStr;
use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System, UpdateKind};
use vector_lib::configurable::configurable_component;
#[cfg(target_os = "linux")]
use vector_lib::metric_tags;

/// Options for the process metrics collector.
// NOTE(review): the `///` comments on this type and its field are presumably picked up by
// `configurable_component` for generated configuration docs — confirm before rewording them.
//
// The derived `Default` builds `processes` from `FilterList::default()`, whereas
// deserialization falls back to `default_all_processes()` (see the serde attribute below).
// NOTE(review): confirm those two defaults agree.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct ProcessConfig {
    /// Lists of process name patterns to include or exclude.
    // When this field is absent from the deserialized input, serde fills it in by calling
    // `default_all_processes`; `example_processes()` supplies the documented example value.
    // `HostMetrics::process_metrics` matches each process name against this list.
    #[serde(default = "default_all_processes")]
    #[configurable(metadata(docs::examples = "example_processes()"))]
    processes: FilterList,
}

// Metric names emitted by `HostMetrics::process_metrics`. The unit test in this file asserts
// that every emitted metric name starts with `process_`, so keep that prefix on new names.

/// Name of the counter fed from `Process::run_time()`.
const RUNTIME: &str = "process_runtime";
/// Name of the gauge fed from `Process::cpu_usage()`.
const CPU_USAGE: &str = "process_cpu_usage";
/// Name of the gauge fed from `Process::memory()`.
const MEMORY_USAGE: &str = "process_memory_usage";

impl HostMetrics {
    /// Collects per-process metrics into `output`.
    ///
    /// A new `sysinfo::System` is created and refreshed once for all processes (memory, CPU and
    /// command line), and the buffer's `name` is set to `"process"`. For every process whose name
    /// is accepted by the configured `processes` filter, three metrics are emitted:
    ///
    /// - gauge `process_cpu_usage` from `Process::cpu_usage()`,
    /// - gauge `process_memory_usage` from `Process::memory()`,
    /// - counter `process_runtime` from `Process::run_time()`.
    ///
    /// Each metric is tagged with `pid`, `name` (falling back to `"unknown"` when the name is not
    /// valid UTF-8) and `command` (the arguments joined by single spaces, or `""` when the joined
    /// command line is not valid UTF-8).
    pub async fn process_metrics(&self, output: &mut super::MetricsBuffer) {
        let refresh_kind = ProcessRefreshKind::new()
            .with_memory()
            .with_cpu()
            .with_cmd(UpdateKind::OnlyIfNotSet);

        // NOTE(review): the snapshot is rebuilt from scratch on every call and refreshed only
        // once. sysinfo's docs describe CPU usage as a delta between two refreshes — confirm the
        // `process_cpu_usage` values are meaningful with this single-refresh pattern.
        let mut snapshot = System::new();
        snapshot.refresh_processes_specifics(ProcessesToUpdate::All, true, refresh_kind);

        output.name = "process";
        let filter = &self.config.process.processes;
        let arg_separator = OsStr::new(" ");

        for (pid, process) in snapshot.processes() {
            // Skip anything the include/exclude lists reject. The name is handed over as an
            // `Option<&str>`, so non-UTF-8 names reach the filter as `None`.
            if !filter.contains_str(process.name().to_str()) {
                continue;
            }

            // Compute the tag inputs once per process; the closure below only assembles them.
            let pid_label = pid.as_u32().to_string();
            let name_label = process.name().to_str().unwrap_or("unknown");
            let command_line = process.cmd().join(arg_separator);
            let command_label = command_line.to_str().unwrap_or("");

            // Each metric needs its own tag set, hence a closure rather than a single value.
            let make_tags = || {
                metric_tags!(
                    "pid" => pid_label.clone(),
                    "name" => name_label,
                    "command" => command_label)
            };

            output.gauge(CPU_USAGE, f64::from(process.cpu_usage()), make_tags());
            output.gauge(MEMORY_USAGE, process.memory() as f64, make_tags());
            output.counter(RUNTIME, process.run_time() as f64, make_tags());
        }
    }
}

#[cfg(test)]
mod tests {
    use super::super::{HostMetrics, HostMetricsConfig, MetricsBuffer};
    use crate::sources::host_metrics::tests::count_tag;

    /// Runs the process collector with the default configuration and checks that it emits
    /// at least one metric, that every metric name starts with `process_`, and that every
    /// metric carries the `pid`, `name` and `command` tags.
    #[tokio::test]
    async fn generates_process_metrics() {
        let mut sink = MetricsBuffer::new(None);
        let collector = HostMetrics::new(HostMetricsConfig::default());
        collector.process_metrics(&mut sink).await;

        let collected = sink.metrics;
        assert!(!collected.is_empty());

        // Every emitted metric name carries the `process_` prefix.
        assert!(collected
            .iter()
            .all(|metric| metric.name().starts_with("process_")));

        // Each of the identifying tags must be present on every metric.
        let total = collected.len();
        for tag in ["pid", "name", "command"] {
            assert_eq!(count_tag(&collected, tag), total);
        }
    }
}