1use std::{
2    io,
3    num::ParseIntError,
4    path::{Path, PathBuf},
5    str::FromStr,
6};
7
8use futures::future::BoxFuture;
9use snafu::{ResultExt, Snafu};
10use tokio::{
11    fs::{self, File},
12    io::AsyncReadExt,
13};
14use vector_lib::metric_tags;
15
16use super::{CGroupsConfig, HostMetrics, MetricsBuffer, filter_result_sync};
17use crate::event::MetricTags;
18
/// Seconds per microsecond: scales the `*_usec` counters read from `cpu.stat` to seconds.
const MICROSECONDS: f64 = 1e-6;
20
/// Errors raised while reading and parsing a single cgroup data file.
#[derive(Debug, Snafu)]
enum CGroupsError {
    /// Opening the file failed for a reason other than the file not existing
    /// (`CGroup::open_read` maps a missing file to `Ok(None)` instead).
    #[snafu(display("Could not open cgroup data file {:?}.", filename))]
    Opening {
        filename: PathBuf,
        source: io::Error,
    },
    /// The file was opened, but reading its contents into the buffer failed.
    #[snafu(display("Could not read cgroup data file {:?}.", filename))]
    Reading {
        filename: PathBuf,
        source: io::Error,
    },
    /// The file contents could not be parsed as the expected integer value(s).
    #[snafu(display("Could not parse cgroup data file {:?}.", filename))]
    Parsing {
        filename: PathBuf,
        source: ParseIntError,
    },
}

/// Result type returned by the cgroup data-file readers.
type CGroupsResult<T> = Result<T, CGroupsError>;
41
42impl HostMetrics {
43    pub(super) async fn cgroups_metrics(&self, output: &mut MetricsBuffer) {
44        if let Some(root) = &self.root_cgroup {
45            output.name = "cgroups";
46            let mut recurser = CGroupRecurser::new(self, output);
47            match &root.mode {
48                Mode::Modern(base) => recurser.scan_modern(root, base).await,
49                Mode::Legacy(base) => recurser.scan_legacy(root, base).await,
50                Mode::Hybrid(v1base, v2base) => {
51                    recurser.scan_legacy(root, v1base).await;
59                    recurser.scan_modern(root, v2base).await;
60                }
61            }
62        }
63    }
64}
65
66struct CGroupRecurser<'a> {
67    output: &'a mut MetricsBuffer,
68    buffer: String,
69    load_cpu: bool,
70    load_memory: bool,
71    config: CGroupsConfig,
72}
73
74impl<'a> CGroupRecurser<'a> {
75    fn new(host: &'a HostMetrics, output: &'a mut MetricsBuffer) -> Self {
76        let cgroups = host.config.cgroups.clone().unwrap_or_default();
77
78        Self {
79            output,
80            buffer: String::new(),
81            load_cpu: true,
82            load_memory: true,
83            config: cgroups,
84        }
85    }
86
87    async fn scan_modern(&mut self, root: &CGroupRoot, base: &Path) {
88        let cgroup = CGroup {
89            path: join_path(base, &root.path),
90            name: root.name.clone(),
91        };
92        self.load_cpu = true;
93        self.load_memory = true;
94        self.recurse(cgroup, 1).await;
95    }
96
97    async fn scan_legacy(&mut self, root: &CGroupRoot, base: &Path) {
98        let memory_base = join_path(base, "memory");
99        let cgroup = CGroup {
100            path: join_path(memory_base, &root.path),
101            name: root.name.clone(),
102        };
103        self.load_cpu = false;
104        self.load_memory = true;
105        self.recurse(cgroup, 1).await;
106
107        let cpu_base = join_path(base, "cpu");
108        let cgroup = CGroup {
109            path: join_path(cpu_base, &root.path),
110            name: root.name.clone(),
111        };
112        self.load_cpu = true;
113        self.load_memory = false;
114        self.recurse(cgroup, 1).await;
115    }
116
117    fn recurse(&mut self, cgroup: CGroup, level: usize) -> BoxFuture<'_, ()> {
118        Box::pin(async move {
119            let tags = cgroup.tags();
120
121            if self.load_cpu {
122                self.load_cpu(&cgroup, &tags).await;
123            }
124            if self.load_memory {
125                self.load_memory(&cgroup, &tags).await;
126            }
127
128            if level < self.config.levels {
129                let groups = self.config.groups.clone();
130                if let Some(children) =
131                    filter_result_sync(cgroup.children().await, "Failed to load cgroups children.")
132                {
133                    for child in children {
134                        if groups.contains_path(Some(&child.name)) {
135                            self.recurse(child, level + 1).await;
136                        }
137                    }
138                }
139            }
140        })
141    }
142
143    async fn load_cpu(&mut self, cgroup: &CGroup, tags: &MetricTags) {
145        if let Some(Some(cpu)) = filter_result_sync(
146            cgroup.load_cpu(&mut self.buffer).await,
147            "Failed to load cgroups CPU statistics.",
148        ) {
149            self.output.counter(
150                "cgroup_cpu_usage_seconds_total",
151                cpu.usage_usec as f64 * MICROSECONDS,
152                tags.clone(),
153            );
154            self.output.counter(
155                "cgroup_cpu_user_seconds_total",
156                cpu.user_usec as f64 * MICROSECONDS,
157                tags.clone(),
158            );
159            self.output.counter(
160                "cgroup_cpu_system_seconds_total",
161                cpu.system_usec as f64 * MICROSECONDS,
162                tags.clone(),
163            );
164        }
165    }
166
167    async fn load_memory(&mut self, cgroup: &CGroup, tags: &MetricTags) {
169        if let Some(Some(current)) = filter_result_sync(
170            cgroup.load_memory_current(&mut self.buffer).await,
171            "Failed to load cgroups current memory.",
172        ) {
173            self.output
174                .gauge("cgroup_memory_current_bytes", current as f64, tags.clone());
175        }
176
177        if let Some(Some(stat)) = filter_result_sync(
178            cgroup.load_memory_stat(&mut self.buffer).await,
179            "Failed to load cgroups memory statistics.",
180        ) {
181            self.output
182                .gauge("cgroup_memory_anon_bytes", stat.anon as f64, tags.clone());
183            self.output
184                .gauge("cgroup_memory_file_bytes", stat.file as f64, tags.clone());
185            self.output.gauge(
186                "cgroup_memory_anon_active_bytes",
187                stat.active_anon as f64,
188                tags.clone(),
189            );
190            self.output.gauge(
191                "cgroup_memory_anon_inactive_bytes",
192                stat.inactive_anon as f64,
193                tags.clone(),
194            );
195            self.output.gauge(
196                "cgroup_memory_file_active_bytes",
197                stat.active_file as f64,
198                tags.clone(),
199            );
200            self.output.gauge(
201                "cgroup_memory_file_inactive_bytes",
202                stat.inactive_file as f64,
203                tags.clone(),
204            );
205        }
206    }
207}
208
/// Starting point of a cgroup scan: the detected hierarchy layout plus the
/// group within it at which scanning begins.
#[derive(Clone, Debug)]
pub(super) struct CGroupRoot {
    // Value reported in the `cgroup` tag for the starting group; set from the
    // configured `base`, or `/` when none is configured.
    name: PathBuf,
    // Location of the starting group inside each hierarchy directory; set from
    // the configured `base`, or `/` (meaning the hierarchy directory itself).
    path: PathBuf,
    // Detected layout, carrying the directory (or directories) to scan.
    mode: Mode,
}
215
/// Layout of the cgroup filesystem found under the configured base directory.
#[derive(Clone, Debug)]
enum Mode {
    // Detected when `<base>/cpu` is a directory. Statistics are read from the
    // per-controller `memory` and `cpu` subdirectories of the given path.
    Legacy(PathBuf),
    // Detected when `<base>/unified/cgroup.controllers` exists. The first path
    // is scanned as a legacy layout, the second (`<base>/unified`) as a modern one.
    Hybrid(PathBuf, PathBuf),
    // Detected when `<base>/cgroup.controllers` exists. A single hierarchy at
    // the given path holds both CPU and memory data files.
    Modern(PathBuf),
}

/// File whose presence marks the root of a unified (cgroup v2) hierarchy.
const CGROUP_CONTROLLERS: &str = "cgroup.controllers";
224
225impl CGroupRoot {
226    pub(super) fn new(config: &CGroupsConfig) -> Option<Self> {
227        let base_dir = config
248            .base_dir
249            .clone()
250            .unwrap_or_else(|| join_path(heim::os::linux::sysfs_root(), "fs/cgroup"));
251
252        let mode = {
253            let hybrid_root = join_path(&base_dir, "unified");
254            let hybrid_test_file = join_path(&hybrid_root, CGROUP_CONTROLLERS);
255            let modern_test_file = join_path(&base_dir, CGROUP_CONTROLLERS);
256            let cpu_dir = join_path(&base_dir, "cpu");
257            if is_file(hybrid_test_file) {
258                debug!(
259                    message = "Detected hybrid cgroup base directory.",
260                    ?base_dir
261                );
262                Mode::Hybrid(base_dir, hybrid_root)
263            } else if is_file(modern_test_file) {
264                debug!(
265                    message = "Detected modern cgroup base directory.",
266                    ?base_dir
267                );
268                Mode::Modern(base_dir)
269            } else if is_dir(cpu_dir) {
270                debug!(
271                    message = "Detected legacy cgroup base directory.",
272                    ?base_dir
273                );
274                Mode::Legacy(base_dir)
275            } else {
276                warn!(
277                    message = "Could not detect cgroup base directory.",
278                    ?base_dir
279                );
280                return None;
281            }
282        };
283
284        let (path, name) = match &config.base {
285            Some(base) => (base.to_path_buf(), base.to_path_buf()),
286            None => ("/".into(), "/".into()),
287        };
288        Some(Self { name, path, mode })
289    }
290}
291
/// A single cgroup visited during a scan.
#[derive(Clone, Debug)]
struct CGroup {
    // Filesystem directory holding this group's data files and child groups.
    path: PathBuf,
    // Value reported in the `cgroup` tag; children extend their parent's name
    // via `join_name`.
    name: PathBuf,
}
297
298impl CGroup {
299    fn tags(&self) -> MetricTags {
300        metric_tags! {
301            "cgroup" => self.name.to_string_lossy(),
302            "collector" => "cgroups",
303        }
304    }
305
306    fn make_path(&self, filename: impl AsRef<Path>) -> PathBuf {
307        join_path(&self.path, filename)
308    }
309
310    async fn open_read(
314        &self,
315        filename: impl AsRef<Path>,
316        buffer: &mut String,
317    ) -> CGroupsResult<Option<PathBuf>> {
318        buffer.clear();
319        let filename = self.make_path(filename);
320        match File::open(&filename).await {
321            Ok(mut file) => {
322                file.read_to_string(buffer)
323                    .await
324                    .with_context(|_| ReadingSnafu {
325                        filename: filename.clone(),
326                    })?;
327                Ok(Some(filename))
328            }
329            Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
330            Err(source) => Err(CGroupsError::Opening { source, filename }),
331        }
332    }
333
334    async fn open_read_parse<T: FromStr<Err = ParseIntError>>(
337        &self,
338        filename: impl AsRef<Path>,
339        buffer: &mut String,
340    ) -> CGroupsResult<Option<T>> {
341        self.open_read(filename, buffer)
342            .await?
343            .map(|filename| {
344                buffer
345                    .trim()
346                    .parse()
347                    .with_context(|_| ParsingSnafu { filename })
348            })
349            .transpose()
350    }
351
352    async fn load_cpu(&self, buffer: &mut String) -> CGroupsResult<Option<CpuStat>> {
353        self.open_read_parse("cpu.stat", buffer).await
354    }
355
356    async fn load_memory_current(&self, buffer: &mut String) -> CGroupsResult<Option<u64>> {
357        self.open_read_parse("memory.current", buffer).await
358    }
359
360    async fn load_memory_stat(&self, buffer: &mut String) -> CGroupsResult<Option<MemoryStat>> {
361        self.open_read_parse("memory.stat", buffer).await
362    }
363
364    async fn children(&self) -> io::Result<Vec<CGroup>> {
365        let mut result = Vec::new();
366        let mut dir = fs::read_dir(&self.path).await?;
367        while let Some(entry) = dir.next_entry().await? {
368            let path = entry.path();
369            if is_dir(&path) {
370                let name = join_name(&self.name, entry.file_name());
371                result.push(CGroup { path, name });
372            }
373        }
374        Ok(result)
375    }
376}
377
/// Declares a `u64`-field struct `$name` plus a `FromStr` impl that reads
/// `key value` lines, as found in cgroup `*.stat` files.
///
/// A line is applied to a field only when its text before the first space is
/// exactly the field name. Unknown keys and lines without a space are ignored,
/// and fields absent from the input stay 0. A value that fails to parse as
/// `u64` aborts with `ParseIntError`.
macro_rules! define_stat_struct {
    ($name:ident ( $( $field:ident, )* )) => {
        #[derive(Clone, Copy, Debug, Default)]
        struct $name {
            $( $field: u64, )*
        }

        impl FromStr for $name {
            type Err = ParseIntError;

            fn from_str(text: &str) -> Result<Self, Self::Err> {
                let mut stat = Self::default();
                for (key, value) in text.lines().filter_map(|line| line.split_once(' ')) {
                    match key {
                        $( stringify!($field) => stat.$field = value.parse()?, )*
                        _ => {}
                    }
                }
                Ok(stat)
            }
        }
    };
}
402
// Fields read from a group's `cpu.stat` file (all in microseconds); any other
// keys in the file are ignored by the generated parser.
define_stat_struct! { CpuStat(
    usage_usec,
    user_usec,
    system_usec,
)}

// Fields read from a group's `memory.stat` file. The kernel file may list many
// more keys; only the ones reported as metrics are declared here, and the
// generated parser ignores the rest.
define_stat_struct! { MemoryStat(
    anon,
    file,
    active_anon,
    inactive_anon,
    active_file,
    inactive_file,
)}
422
/// Returns `true` when `path` exists and its (symlink-followed) metadata
/// reports a directory. Any error while fetching metadata yields `false`.
fn is_dir(path: impl AsRef<Path>) -> bool {
    match std::fs::metadata(path) {
        Ok(meta) => meta.is_dir(),
        Err(_) => false,
    }
}
426
/// Returns `true` when `path` exists and its (symlink-followed) metadata
/// reports a regular file. Any error while fetching metadata yields `false`.
fn is_file(path: impl AsRef<Path>) -> bool {
    match std::fs::metadata(path) {
        Ok(meta) => meta.is_file(),
        Err(_) => false,
    }
}
430
/// Appends `filename` to `base_path`, always treating `filename` as relative.
///
/// `Path::join` and `PathBuf::push` replace the whole base when the appended
/// path is absolute. For example, a configured cgroup `base` of
/// `/system.slice` would otherwise resolve to `/system.slice` on the host
/// filesystem instead of inside the cgroup base directory. A leading `/` is
/// therefore stripped before joining, and a `filename` of exactly `/` yields
/// `base_path` unchanged.
fn join_path(base_path: impl AsRef<Path>, filename: impl AsRef<Path>) -> PathBuf {
    let base_path = base_path.as_ref();
    let filename = filename.as_ref();
    let relative = filename.strip_prefix("/").unwrap_or(filename);
    if relative.as_os_str().is_empty() {
        base_path.to_path_buf()
    } else {
        base_path.join(relative)
    }
}
442
/// Builds a child cgroup's tag name from its parent's name.
///
/// Children of the `/` root are named by their bare file name, without a
/// leading slash. Deeper groups get `parent/child`.
fn join_name(base_name: &Path, filename: impl AsRef<Path>) -> PathBuf {
    let leaf = filename.as_ref();
    if base_name != Path::new("/") {
        return base_name.join(leaf);
    }
    leaf.to_path_buf()
}
455
#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeSet,
        fs::{self, File},
        io::Write,
        path::{Path, PathBuf},
    };

    use rand::{Rng, rngs::ThreadRng};
    use similar_asserts::assert_eq;
    use tempfile::TempDir;
    use vector_lib::event::Metric;

    use super::{
        super::{
            HostMetrics, HostMetricsConfig,
            tests::{count_name, count_tag},
        },
        MetricsBuffer, join_name, join_path,
    };

    #[test]
    fn joins_names_and_paths() {
        assert_eq!(join_name(Path::new("/"), "foo"), PathBuf::from("foo"));
        assert_eq!(join_name(Path::new("/"), "/"), PathBuf::from("/"));
        assert_eq!(join_name(Path::new("foo"), "bar"), PathBuf::from("foo/bar"));

        assert_eq!(join_path("/sys", "foo"), PathBuf::from("/sys/foo"));
        assert_eq!(join_path("/sys", "/"), PathBuf::from("/sys"));
    }

    #[tokio::test]
    async fn generates_cgroups_metrics() {
        let config: HostMetricsConfig = toml::from_str(r#"collectors = ["cgroups"]"#).unwrap();
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(config).cgroups_metrics(&mut buffer).await;
        let metrics = buffer.metrics;

        assert!(!metrics.is_empty());
        assert_eq!(count_tag(&metrics, "cgroup"), metrics.len());
        assert_eq!(count_tag(&metrics, "collector"), metrics.len());
        assert_ne!(count_name(&metrics, "cgroup_cpu_usage_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_cpu_user_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_cpu_system_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_active_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_inactive_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_active_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_inactive_bytes"), 0);
    }

    #[tokio::test]
    async fn parses_modern_cgroups() {
        let mut base = Setup::new();
        for subdir in SUBDIRS {
            base.group(
                subdir,
                CPU_STAT | MEMORY_STAT,
                Some(if subdir == "." {
                    "cpuset cpu memory pids\n"
                } else {
                    "memory pids\n"
                }),
            );
        }
        base.test().await;
    }

    #[tokio::test]
    async fn parses_hybrid_cgroups_1() {
        let mut base = Setup::new();
        base.d("memory");
        base.d("unified");
        for subdir in SUBDIRS {
            base.group(&format!("unified/{subdir}"), CPU_STAT, Some(""));
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        base.test().await;
    }

    #[tokio::test]
    async fn parses_hybrid_cgroups_2() {
        let mut base = Setup::new();
        // The top-level CPU stats live under the legacy `cpu` controller, while
        // the child groups' CPU stats live in the unified hierarchy.
        base.group("cpu", CPU_STAT, None);
        base.d("memory");
        base.d("unified");
        for subdir in SUBDIRS {
            base.group(
                &format!("unified/{subdir}"),
                if subdir == "." { NONE } else { CPU_STAT },
                Some(""),
            );
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        base.test().await;
    }

    #[tokio::test]
    async fn parses_legacy_cgroups() {
        let mut base = Setup::new();
        base.d("cpu");
        base.d("memory");
        for subdir in SUBDIRS {
            base.group(&format!("cpu/{subdir}"), CPU_STAT, None);
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        // Without this call the fixture is built but the legacy layout is never checked.
        base.test().await;
    }

    const SUBDIRS: [&str; 5] = [
        ".",
        "system.slice",
        "user.slice",
        "user.slice/user-1000.slice",
        "user.slice/user-1000.slice/session-40.scope",
    ];

    const GROUPS: [&str; 5] = ["/", SUBDIRS[1], SUBDIRS[2], SUBDIRS[3], SUBDIRS[4]];

    /// Temporary cgroup-like directory tree plus an RNG for stat values.
    struct Setup(TempDir, ThreadRng);

    const NONE: usize = 0;
    const CPU_STAT: usize = 1 << 1;
    const MEMORY_STAT: usize = 1 << 2;

    impl Setup {
        fn new() -> Self {
            Self(tempfile::tempdir().unwrap(), rand::rng())
        }

        /// Runs the cgroups collector against the temporary tree and checks that
        /// every group in `GROUPS` produced exactly one of each metric.
        async fn test(&self) {
            let path = self.0.path();
            let config: HostMetricsConfig = toml::from_str(&format!(
                r#"
                collectors = ["cgroups"]
                cgroups.base_dir = {path:?}
                "#
            ))
            .unwrap();
            let mut buffer = MetricsBuffer::new(None);
            HostMetrics::new(config).cgroups_metrics(&mut buffer).await;
            let metrics = buffer.metrics;

            assert_ne!(metrics.len(), 0);

            assert_eq!(&all_tags(&metrics, "collector"), &["cgroups"]);
            assert_eq!(&all_tags(&metrics, "cgroup"), &GROUPS);

            assert_eq!(
                count_name(&metrics, "cgroup_cpu_usage_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_cpu_user_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_cpu_system_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_active_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_inactive_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_active_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_inactive_bytes"),
                SUBDIRS.len()
            );
        }

        /// Creates `subdir` and optionally its `cgroup.controllers`, `cpu.stat`
        /// and `memory.stat` files, as selected by `flags` and `controllers`.
        fn group(&mut self, subdir: &str, flags: usize, controllers: Option<&str>) {
            self.d(subdir);
            if let Some(controllers) = controllers {
                self.f(subdir, "cgroup.controllers", controllers);
            }
            if (flags & CPU_STAT) != 0 {
                self.cpu_stat(subdir);
            }
            if (flags & MEMORY_STAT) != 0 {
                self.memory_stat(subdir);
            }
        }

        fn cpu_stat(&mut self, subdir: &str) {
            let a = self.1.random_range(1000000..1000000000);
            let b = self.1.random_range(1000000..1000000000);
            let c = self.1.random_range(1000000..1000000000);
            self.f(
                subdir,
                "cpu.stat",
                &format!("usage_usec {a}\nuser_usec {b}\nsystem_usec {c}\nnr_periods 0\nnr_throttled 0\nthrottled_usec 0\n"),
            );
        }

        fn memory_stat(&mut self, subdir: &str) {
            let anon = self.1.random_range(1000000..1000000000);
            let file = self.1.random_range(1000000..1000000000);
            self.f(
                subdir,
                "memory.stat",
                &format!("anon {anon}\nfile {file}\n",),
            );
        }

        fn d(&self, subdir: &str) {
            let path: PathBuf = [self.0.path(), subdir.as_ref()].iter().collect();
            fs::create_dir_all(path).unwrap();
        }

        fn f(&self, subdir: &str, filename: &str, contents: &str) {
            let path: PathBuf = [self.0.path(), subdir.as_ref(), filename.as_ref()]
                .iter()
                .collect();
            let mut file = File::options()
                .write(true)
                .create(true)
                .truncate(true)
                .open(path)
                .unwrap();
            file.write_all(contents.as_bytes()).unwrap();
        }
    }

    /// Sorted, de-duplicated values of `tag` across all `metrics`.
    fn all_tags(metrics: &[Metric], tag: &str) -> Vec<String> {
        metrics
            .iter()
            .map(|metric| {
                metric
                    .tags()
                    .expect("The metrics should have tags")
                    .get(tag)
                    .expect("The metric is missing the specified tag")
                    .to_string()
            })
            .collect::<BTreeSet<String>>()
            .into_iter()
            .collect()
    }
}
725}