1use std::{io, num::ParseIntError, path::Path, path::PathBuf, str::FromStr};
2
3use futures::future::BoxFuture;
4use snafu::{ResultExt, Snafu};
5use tokio::{
6 fs::{self, File},
7 io::AsyncReadExt,
8};
9use vector_lib::metric_tags;
10
11use super::{filter_result_sync, CGroupsConfig, HostMetrics, MetricsBuffer};
12use crate::event::MetricTags;
13
/// Multiplier that converts a count of microseconds into seconds.
const MICROSECONDS: f64 = 1.0e-6;
15
16#[derive(Debug, Snafu)]
17enum CGroupsError {
18 #[snafu(display("Could not open cgroup data file {:?}.", filename))]
19 Opening {
20 filename: PathBuf,
21 source: io::Error,
22 },
23 #[snafu(display("Could not read cgroup data file {:?}.", filename))]
24 Reading {
25 filename: PathBuf,
26 source: io::Error,
27 },
28 #[snafu(display("Could not parse cgroup data file {:?}.", filename))]
29 Parsing {
30 filename: PathBuf,
31 source: ParseIntError,
32 },
33}
34
35type CGroupsResult<T> = Result<T, CGroupsError>;
36
37impl HostMetrics {
38 pub(super) async fn cgroups_metrics(&self, output: &mut MetricsBuffer) {
39 if let Some(root) = &self.root_cgroup {
40 output.name = "cgroups";
41 let mut recurser = CGroupRecurser::new(self, output);
42 match &root.mode {
43 Mode::Modern(base) => recurser.scan_modern(root, base).await,
44 Mode::Legacy(base) => recurser.scan_legacy(root, base).await,
45 Mode::Hybrid(v1base, v2base) => {
46 recurser.scan_legacy(root, v1base).await;
54 recurser.scan_modern(root, v2base).await;
55 }
56 }
57 }
58 }
59}
60
61struct CGroupRecurser<'a> {
62 output: &'a mut MetricsBuffer,
63 buffer: String,
64 load_cpu: bool,
65 load_memory: bool,
66 config: CGroupsConfig,
67}
68
69impl<'a> CGroupRecurser<'a> {
70 fn new(host: &'a HostMetrics, output: &'a mut MetricsBuffer) -> Self {
71 let cgroups = host.config.cgroups.clone().unwrap_or_default();
72
73 Self {
74 output,
75 buffer: String::new(),
76 load_cpu: true,
77 load_memory: true,
78 config: cgroups,
79 }
80 }
81
82 async fn scan_modern(&mut self, root: &CGroupRoot, base: &Path) {
83 let cgroup = CGroup {
84 path: join_path(base, &root.path),
85 name: root.name.clone(),
86 };
87 self.load_cpu = true;
88 self.load_memory = true;
89 self.recurse(cgroup, 1).await;
90 }
91
92 async fn scan_legacy(&mut self, root: &CGroupRoot, base: &Path) {
93 let memory_base = join_path(base, "memory");
94 let cgroup = CGroup {
95 path: join_path(memory_base, &root.path),
96 name: root.name.clone(),
97 };
98 self.load_cpu = false;
99 self.load_memory = true;
100 self.recurse(cgroup, 1).await;
101
102 let cpu_base = join_path(base, "cpu");
103 let cgroup = CGroup {
104 path: join_path(cpu_base, &root.path),
105 name: root.name.clone(),
106 };
107 self.load_cpu = true;
108 self.load_memory = false;
109 self.recurse(cgroup, 1).await;
110 }
111
112 fn recurse(&mut self, cgroup: CGroup, level: usize) -> BoxFuture<'_, ()> {
113 Box::pin(async move {
114 let tags = cgroup.tags();
115
116 if self.load_cpu {
117 self.load_cpu(&cgroup, &tags).await;
118 }
119 if self.load_memory {
120 self.load_memory(&cgroup, &tags).await;
121 }
122
123 if level < self.config.levels {
124 let groups = self.config.groups.clone();
125 if let Some(children) =
126 filter_result_sync(cgroup.children().await, "Failed to load cgroups children.")
127 {
128 for child in children {
129 if groups.contains_path(Some(&child.name)) {
130 self.recurse(child, level + 1).await;
131 }
132 }
133 }
134 }
135 })
136 }
137
138 async fn load_cpu(&mut self, cgroup: &CGroup, tags: &MetricTags) {
140 if let Some(Some(cpu)) = filter_result_sync(
141 cgroup.load_cpu(&mut self.buffer).await,
142 "Failed to load cgroups CPU statistics.",
143 ) {
144 self.output.counter(
145 "cgroup_cpu_usage_seconds_total",
146 cpu.usage_usec as f64 * MICROSECONDS,
147 tags.clone(),
148 );
149 self.output.counter(
150 "cgroup_cpu_user_seconds_total",
151 cpu.user_usec as f64 * MICROSECONDS,
152 tags.clone(),
153 );
154 self.output.counter(
155 "cgroup_cpu_system_seconds_total",
156 cpu.system_usec as f64 * MICROSECONDS,
157 tags.clone(),
158 );
159 }
160 }
161
162 async fn load_memory(&mut self, cgroup: &CGroup, tags: &MetricTags) {
164 if let Some(Some(current)) = filter_result_sync(
165 cgroup.load_memory_current(&mut self.buffer).await,
166 "Failed to load cgroups current memory.",
167 ) {
168 self.output
169 .gauge("cgroup_memory_current_bytes", current as f64, tags.clone());
170 }
171
172 if let Some(Some(stat)) = filter_result_sync(
173 cgroup.load_memory_stat(&mut self.buffer).await,
174 "Failed to load cgroups memory statistics.",
175 ) {
176 self.output
177 .gauge("cgroup_memory_anon_bytes", stat.anon as f64, tags.clone());
178 self.output
179 .gauge("cgroup_memory_file_bytes", stat.file as f64, tags.clone());
180 self.output.gauge(
181 "cgroup_memory_anon_active_bytes",
182 stat.active_anon as f64,
183 tags.clone(),
184 );
185 self.output.gauge(
186 "cgroup_memory_anon_inactive_bytes",
187 stat.inactive_anon as f64,
188 tags.clone(),
189 );
190 self.output.gauge(
191 "cgroup_memory_file_active_bytes",
192 stat.active_file as f64,
193 tags.clone(),
194 );
195 self.output.gauge(
196 "cgroup_memory_file_inactive_bytes",
197 stat.inactive_file as f64,
198 tags.clone(),
199 );
200 }
201 }
202}
203
204#[derive(Clone, Debug)]
205pub(super) struct CGroupRoot {
206 name: PathBuf,
207 path: PathBuf,
208 mode: Mode,
209}
210
211#[derive(Clone, Debug)]
212enum Mode {
213 Legacy(PathBuf),
214 Hybrid(PathBuf, PathBuf),
215 Modern(PathBuf),
216}
217
218const CGROUP_CONTROLLERS: &str = "cgroup.controllers";
219
220impl CGroupRoot {
221 pub(super) fn new(config: &CGroupsConfig) -> Option<Self> {
222 let base_dir = config
243 .base_dir
244 .clone()
245 .unwrap_or_else(|| join_path(heim::os::linux::sysfs_root(), "fs/cgroup"));
246
247 let mode = {
248 let hybrid_root = join_path(&base_dir, "unified");
249 let hybrid_test_file = join_path(&hybrid_root, CGROUP_CONTROLLERS);
250 let modern_test_file = join_path(&base_dir, CGROUP_CONTROLLERS);
251 let cpu_dir = join_path(&base_dir, "cpu");
252 if is_file(hybrid_test_file) {
253 debug!(
254 message = "Detected hybrid cgroup base directory.",
255 ?base_dir
256 );
257 Mode::Hybrid(base_dir, hybrid_root)
258 } else if is_file(modern_test_file) {
259 debug!(
260 message = "Detected modern cgroup base directory.",
261 ?base_dir
262 );
263 Mode::Modern(base_dir)
264 } else if is_dir(cpu_dir) {
265 debug!(
266 message = "Detected legacy cgroup base directory.",
267 ?base_dir
268 );
269 Mode::Legacy(base_dir)
270 } else {
271 warn!(
272 message = "Could not detect cgroup base directory.",
273 ?base_dir
274 );
275 return None;
276 }
277 };
278
279 let (path, name) = match &config.base {
280 Some(base) => (base.to_path_buf(), base.to_path_buf()),
281 None => ("/".into(), "/".into()),
282 };
283 Some(Self { name, path, mode })
284 }
285}
286
287#[derive(Clone, Debug)]
288struct CGroup {
289 path: PathBuf,
290 name: PathBuf,
291}
292
293impl CGroup {
294 fn tags(&self) -> MetricTags {
295 metric_tags! {
296 "cgroup" => self.name.to_string_lossy(),
297 "collector" => "cgroups",
298 }
299 }
300
301 fn make_path(&self, filename: impl AsRef<Path>) -> PathBuf {
302 join_path(&self.path, filename)
303 }
304
305 async fn open_read(
309 &self,
310 filename: impl AsRef<Path>,
311 buffer: &mut String,
312 ) -> CGroupsResult<Option<PathBuf>> {
313 buffer.clear();
314 let filename = self.make_path(filename);
315 match File::open(&filename).await {
316 Ok(mut file) => {
317 file.read_to_string(buffer)
318 .await
319 .with_context(|_| ReadingSnafu {
320 filename: filename.clone(),
321 })?;
322 Ok(Some(filename))
323 }
324 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
325 Err(source) => Err(CGroupsError::Opening { source, filename }),
326 }
327 }
328
329 async fn open_read_parse<T: FromStr<Err = ParseIntError>>(
332 &self,
333 filename: impl AsRef<Path>,
334 buffer: &mut String,
335 ) -> CGroupsResult<Option<T>> {
336 self.open_read(filename, buffer)
337 .await?
338 .map(|filename| {
339 buffer
340 .trim()
341 .parse()
342 .with_context(|_| ParsingSnafu { filename })
343 })
344 .transpose()
345 }
346
347 async fn load_cpu(&self, buffer: &mut String) -> CGroupsResult<Option<CpuStat>> {
348 self.open_read_parse("cpu.stat", buffer).await
349 }
350
351 async fn load_memory_current(&self, buffer: &mut String) -> CGroupsResult<Option<u64>> {
352 self.open_read_parse("memory.current", buffer).await
353 }
354
355 async fn load_memory_stat(&self, buffer: &mut String) -> CGroupsResult<Option<MemoryStat>> {
356 self.open_read_parse("memory.stat", buffer).await
357 }
358
359 async fn children(&self) -> io::Result<Vec<CGroup>> {
360 let mut result = Vec::new();
361 let mut dir = fs::read_dir(&self.path).await?;
362 while let Some(entry) = dir.next_entry().await? {
363 let path = entry.path();
364 if is_dir(&path) {
365 let name = join_name(&self.name, entry.file_name());
366 result.push(CGroup { path, name });
367 }
368 }
369 Ok(result)
370 }
371}
372
/// Define a struct made of `u64` fields along with a `FromStr` parser for it.
///
/// The parser reads its input line by line. A line of the form
/// `<field> <value>` — the field name, one space, then the value — assigns
/// the parsed value to the field of the same name. All other lines are
/// ignored, and fields never mentioned keep their default of zero. A value
/// that does not parse as `u64` fails the whole parse.
macro_rules! define_stat_struct {
    ($name:ident ( $( $field:ident, )* )) => {
        #[derive(Clone, Copy, Debug, Default)]
        struct $name {
            $( $field: u64, )*
        }

        impl FromStr for $name {
            type Err = ParseIntError;

            fn from_str(text: &str) -> Result<Self, Self::Err> {
                let mut stats = Self::default();
                for line in text.lines() {
                    $(
                        // The trailing space in the prefix keeps a field from
                        // matching longer keys that merely start with its name.
                        if let Some(value) = line.strip_prefix(concat!(stringify!($field), ' ')) {
                            stats.$field = value.parse()?;
                            continue;
                        }
                    )*
                }
                Ok(stats)
            }
        }
    };
}

// Values read from a group's `cpu.stat` file.
define_stat_struct! { CpuStat(
    usage_usec,
    user_usec,
    system_usec,
)}

// Values read from a group's `memory.stat` file. Keys in the file that are
// not listed here are skipped by the parser.
define_stat_struct! { MemoryStat(
    anon,
    file,
    active_anon,
    inactive_anon,
    active_file,
    inactive_file,
)}
417
/// Report whether `path` exists and refers to a directory.
///
/// Any failure to query the path yields `false`.
fn is_dir(path: impl AsRef<Path>) -> bool {
    path.as_ref().is_dir()
}
421
/// Report whether `path` exists and refers to a regular file.
///
/// Any failure to query the path yields `false`.
fn is_file(path: impl AsRef<Path>) -> bool {
    path.as_ref().is_file()
}
425
/// Join a base directory with a cgroup name.
///
/// The name `/` denotes the base group itself, in which case the base
/// directory is returned unchanged.
fn join_path(base_path: impl AsRef<Path>, filename: impl AsRef<Path>) -> PathBuf {
    let base = base_path.as_ref();
    let name = filename.as_ref();
    if name == Path::new("/") {
        return base.to_path_buf();
    }
    base.join(name)
}
437
/// Join a base cgroup name with the name of a group beneath it.
///
/// Unlike joining filesystem paths, a base name of `/` is dropped rather than
/// kept as a prefix, so children of the top-level group get bare names.
fn join_name(base_name: &Path, filename: impl AsRef<Path>) -> PathBuf {
    let child = filename.as_ref();
    let base_is_root = base_name == Path::new("/");
    if base_is_root {
        child.to_path_buf()
    } else {
        base_name.join(child)
    }
}
450
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::fs::{self, File};
    use std::io::Write;
    use std::path::{Path, PathBuf};

    use rand::{rngs::ThreadRng, Rng};
    use similar_asserts::assert_eq;
    use tempfile::TempDir;
    use vector_lib::event::Metric;

    use super::{
        super::{
            tests::{count_name, count_tag},
            HostMetrics, HostMetricsConfig,
        },
        join_name, join_path, MetricsBuffer,
    };

    #[test]
    fn joins_names_and_paths() {
        assert_eq!(join_name(Path::new("/"), "foo"), PathBuf::from("foo"));
        assert_eq!(join_name(Path::new("/"), "/"), PathBuf::from("/"));
        assert_eq!(join_name(Path::new("foo"), "bar"), PathBuf::from("foo/bar"));

        assert_eq!(join_path("/sys", "foo"), PathBuf::from("/sys/foo"));
        assert_eq!(join_path("/sys", "/"), PathBuf::from("/sys"));
    }

    /// Runs the collector against the default base directory of the machine
    /// executing the test, rather than a generated fixture tree.
    #[tokio::test]
    async fn generates_cgroups_metrics() {
        let config: HostMetricsConfig = toml::from_str(r#"collectors = ["cgroups"]"#).unwrap();
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(config).cgroups_metrics(&mut buffer).await;
        let metrics = buffer.metrics;

        assert!(!metrics.is_empty());
        assert_eq!(count_tag(&metrics, "cgroup"), metrics.len());
        assert_eq!(count_tag(&metrics, "collector"), metrics.len());
        assert_ne!(count_name(&metrics, "cgroup_cpu_usage_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_cpu_user_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_cpu_system_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_active_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_inactive_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_active_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_inactive_bytes"), 0);
    }

    /// Modern layout: every group sits directly under the base directory and
    /// carries a controllers file plus both data files.
    #[tokio::test]
    async fn parses_modern_cgroups() {
        let mut base = Setup::new();
        for subdir in SUBDIRS {
            base.group(
                subdir,
                CPU_STAT | MEMORY_STAT,
                Some(if subdir == "." {
                    "cpuset cpu memory pids\n"
                } else {
                    "memory pids\n"
                }),
            );
        }
        base.test().await;
    }

    /// Hybrid layout: CPU data lives under `unified`, memory data under the
    /// legacy `memory` controller directory.
    #[tokio::test]
    async fn parses_hybrid_cgroups_1() {
        let mut base = Setup::new();
        base.d("memory");
        base.d("unified");
        for subdir in SUBDIRS {
            base.group(&format!("unified/{subdir}"), CPU_STAT, Some(""));
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        base.test().await;
    }

    /// Hybrid layout variant: the top-level CPU data file is provided by the
    /// legacy `cpu` controller directory instead of the root of `unified`.
    #[tokio::test]
    async fn parses_hybrid_cgroups_2() {
        let mut base = Setup::new();
        base.group("cpu", CPU_STAT, None);
        base.d("memory");
        base.d("unified");
        for subdir in SUBDIRS {
            base.group(
                &format!("unified/{subdir}"),
                if subdir == "." { NONE } else { CPU_STAT },
                Some(""),
            );
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        base.test().await;
    }

    /// Legacy layout: one directory per controller, with the groups nested
    /// beneath each of them.
    #[tokio::test]
    async fn parses_legacy_cgroups() {
        let mut base = Setup::new();
        base.d("cpu");
        base.d("memory");
        for subdir in SUBDIRS {
            base.group(&format!("cpu/{subdir}"), CPU_STAT, None);
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        // Without this call the fixture tree is built but never scanned, so
        // the test would pass without checking anything.
        base.test().await;
    }

    /// Group directories created by the fixtures, relative to a hierarchy root.
    const SUBDIRS: [&str; 5] = [
        ".",
        "system.slice",
        "user.slice",
        "user.slice/user-1000.slice",
        "user.slice/user-1000.slice/session-40.scope",
    ];

    /// Expected `cgroup` tag values: the same groups as `SUBDIRS`, with the
    /// hierarchy root reported as `/`.
    const GROUPS: [&str; 5] = ["/", SUBDIRS[1], SUBDIRS[2], SUBDIRS[3], SUBDIRS[4]];

    /// Fixture: a temporary base directory plus a random source for file values.
    struct Setup(TempDir, ThreadRng);

    // Flags selecting which data files `Setup::group` writes.
    const NONE: usize = 0;
    const CPU_STAT: usize = 1 << 1;
    const MEMORY_STAT: usize = 1 << 2;

    impl Setup {
        fn new() -> Self {
            Self(tempfile::tempdir().unwrap(), rand::rng())
        }

        /// Run the collector with the fixture directory as its base directory
        /// and check that every group reports every CPU and memory statistic
        /// exactly once.
        async fn test(&self) {
            let path = self.0.path();
            let config: HostMetricsConfig = toml::from_str(&format!(
                r#"
                collectors = ["cgroups"]
                cgroups.base_dir = {path:?}
                "#
            ))
            .unwrap();
            let mut buffer = MetricsBuffer::new(None);
            HostMetrics::new(config).cgroups_metrics(&mut buffer).await;
            let metrics = buffer.metrics;

            assert_ne!(metrics.len(), 0);

            assert_eq!(&all_tags(&metrics, "collector"), &["cgroups"]);
            assert_eq!(&all_tags(&metrics, "cgroup"), &GROUPS);

            assert_eq!(
                count_name(&metrics, "cgroup_cpu_usage_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_cpu_user_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_cpu_system_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_active_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_inactive_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_active_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_inactive_bytes"),
                SUBDIRS.len()
            );
        }

        /// Create the group directory `subdir`, optionally with a controllers
        /// file, and write the data files selected by `flags`.
        fn group(&mut self, subdir: &str, flags: usize, controllers: Option<&str>) {
            self.d(subdir);
            if let Some(controllers) = controllers {
                self.f(subdir, "cgroup.controllers", controllers);
            }
            if (flags & CPU_STAT) != 0 {
                self.cpu_stat(subdir);
            }
            if (flags & MEMORY_STAT) != 0 {
                self.memory_stat(subdir);
            }
        }

        /// Write a `cpu.stat` file with random usage values into `subdir`.
        fn cpu_stat(&mut self, subdir: &str) {
            let a = self.1.random_range(1000000..1000000000);
            let b = self.1.random_range(1000000..1000000000);
            let c = self.1.random_range(1000000..1000000000);
            self.f(
                subdir,
                "cpu.stat",
                &format!("usage_usec {a}\nuser_usec {b}\nsystem_usec {c}\nnr_periods 0\nnr_throttled 0\nthrottled_usec 0\n"),
            );
        }

        /// Write a `memory.stat` file with random `anon` and `file` values
        /// into `subdir`.
        fn memory_stat(&mut self, subdir: &str) {
            let anon = self.1.random_range(1000000..1000000000);
            let file = self.1.random_range(1000000..1000000000);
            self.f(
                subdir,
                "memory.stat",
                &format!("anon {anon}\nfile {file}\n",),
            );
        }

        /// Create the directory `subdir` (and any missing parents) under the
        /// fixture root.
        fn d(&self, subdir: &str) {
            let path: PathBuf = [self.0.path(), subdir.as_ref()].iter().collect();
            fs::create_dir_all(path).unwrap();
        }

        /// Write `contents` to `filename` inside `subdir`, replacing any
        /// existing file.
        fn f(&self, subdir: &str, filename: &str, contents: &str) {
            let path: PathBuf = [self.0.path(), subdir.as_ref(), filename.as_ref()]
                .iter()
                .collect();
            let mut file = File::options()
                .write(true)
                .create(true)
                .truncate(true)
                .open(path)
                .unwrap();
            file.write_all(contents.as_bytes()).unwrap();
        }
    }

    /// Collect the distinct values of `tag` across `metrics`, in sorted order.
    ///
    /// Panics if any metric has no tags or lacks the requested tag.
    fn all_tags(metrics: &[Metric], tag: &str) -> Vec<String> {
        metrics
            .iter()
            .map(|metric| {
                metric
                    .tags()
                    .expect("The metrics should have tags")
                    .get(tag)
                    .expect("The metric is missing the specified tag")
                    .to_string()
            })
            .collect::<BTreeSet<String>>()
            .into_iter()
            .collect()
    }
}