1use std::{
2 io,
3 num::ParseIntError,
4 path::{Path, PathBuf},
5 str::FromStr,
6};
7
8use futures::future::BoxFuture;
9use snafu::{ResultExt, Snafu};
10use tokio::{
11 fs::{self, File},
12 io::AsyncReadExt,
13};
14use vector_lib::metric_tags;
15
16use super::{CGroupsConfig, HostMetrics, MetricsBuffer, filter_result_sync};
17use crate::event::MetricTags;
18
/// Scale factor that converts a value expressed in microseconds into seconds.
const MICROSECONDS: f64 = 1.0 / 1_000_000.0;
20
/// Errors raised while loading a cgroup data file.
#[derive(Debug, Snafu)]
enum CGroupsError {
    /// The data file could not be opened (for a reason other than the file not existing,
    /// which is reported as "no data" rather than as an error).
    #[snafu(display("Could not open cgroup data file {:?}.", filename))]
    Opening {
        filename: PathBuf,
        source: io::Error,
    },
    /// The data file was opened but its contents could not be read into a string.
    #[snafu(display("Could not read cgroup data file {:?}.", filename))]
    Reading {
        filename: PathBuf,
        source: io::Error,
    },
    /// The data file was read but its contents were not the expected integer value(s).
    #[snafu(display("Could not parse cgroup data file {:?}.", filename))]
    Parsing {
        filename: PathBuf,
        source: ParseIntError,
    },
}

/// Result alias used by the cgroup data file loaders.
type CGroupsResult<T> = Result<T, CGroupsError>;
41
42impl HostMetrics {
43 pub(super) async fn cgroups_metrics(&self, output: &mut MetricsBuffer) {
44 if let Some(root) = &self.root_cgroup {
45 output.name = "cgroups";
46 let mut recurser = CGroupRecurser::new(self, output);
47 match &root.mode {
48 Mode::Modern(base) => recurser.scan_modern(root, base).await,
49 Mode::Legacy(base) => recurser.scan_legacy(root, base).await,
50 Mode::Hybrid(v1base, v2base) => {
51 recurser.scan_legacy(root, v1base).await;
59 recurser.scan_modern(root, v2base).await;
60 }
61 }
62 }
63 }
64}
65
/// Walks a cgroup hierarchy and records the metrics it finds into a `MetricsBuffer`.
struct CGroupRecurser<'a> {
    /// Destination for the generated metrics.
    output: &'a mut MetricsBuffer,
    /// Scratch buffer reused for the contents of every data file that is read.
    buffer: String,
    /// Whether CPU statistics are loaded for each visited cgroup.
    load_cpu: bool,
    /// Whether memory statistics are loaded for each visited cgroup.
    load_memory: bool,
    /// Collector settings: `levels` bounds the recursion depth, `groups` filters child cgroups.
    config: CGroupsConfig,
}
73
74impl<'a> CGroupRecurser<'a> {
75 fn new(host: &'a HostMetrics, output: &'a mut MetricsBuffer) -> Self {
76 let cgroups = host.config.cgroups.clone().unwrap_or_default();
77
78 Self {
79 output,
80 buffer: String::new(),
81 load_cpu: true,
82 load_memory: true,
83 config: cgroups,
84 }
85 }
86
87 async fn scan_modern(&mut self, root: &CGroupRoot, base: &Path) {
88 let cgroup = CGroup {
89 path: join_path(base, &root.path),
90 name: root.name.clone(),
91 };
92 self.load_cpu = true;
93 self.load_memory = true;
94 self.recurse(cgroup, 1).await;
95 }
96
97 async fn scan_legacy(&mut self, root: &CGroupRoot, base: &Path) {
98 let memory_base = join_path(base, "memory");
99 let cgroup = CGroup {
100 path: join_path(memory_base, &root.path),
101 name: root.name.clone(),
102 };
103 self.load_cpu = false;
104 self.load_memory = true;
105 self.recurse(cgroup, 1).await;
106
107 let cpu_base = join_path(base, "cpu");
108 let cgroup = CGroup {
109 path: join_path(cpu_base, &root.path),
110 name: root.name.clone(),
111 };
112 self.load_cpu = true;
113 self.load_memory = false;
114 self.recurse(cgroup, 1).await;
115 }
116
117 fn recurse(&mut self, cgroup: CGroup, level: usize) -> BoxFuture<'_, ()> {
118 Box::pin(async move {
119 let tags = cgroup.tags();
120
121 if self.load_cpu {
122 self.load_cpu(&cgroup, &tags).await;
123 }
124 if self.load_memory {
125 self.load_memory(&cgroup, &tags).await;
126 }
127
128 if level < self.config.levels {
129 let groups = self.config.groups.clone();
130 if let Some(children) =
131 filter_result_sync(cgroup.children().await, "Failed to load cgroups children.")
132 {
133 for child in children {
134 if groups.contains_path(Some(&child.name)) {
135 self.recurse(child, level + 1).await;
136 }
137 }
138 }
139 }
140 })
141 }
142
143 async fn load_cpu(&mut self, cgroup: &CGroup, tags: &MetricTags) {
145 if let Some(Some(cpu)) = filter_result_sync(
146 cgroup.load_cpu(&mut self.buffer).await,
147 "Failed to load cgroups CPU statistics.",
148 ) {
149 self.output.counter(
150 "cgroup_cpu_usage_seconds_total",
151 cpu.usage_usec as f64 * MICROSECONDS,
152 tags.clone(),
153 );
154 self.output.counter(
155 "cgroup_cpu_user_seconds_total",
156 cpu.user_usec as f64 * MICROSECONDS,
157 tags.clone(),
158 );
159 self.output.counter(
160 "cgroup_cpu_system_seconds_total",
161 cpu.system_usec as f64 * MICROSECONDS,
162 tags.clone(),
163 );
164 }
165 }
166
167 async fn load_memory(&mut self, cgroup: &CGroup, tags: &MetricTags) {
169 if let Some(Some(current)) = filter_result_sync(
170 cgroup.load_memory_current(&mut self.buffer).await,
171 "Failed to load cgroups current memory.",
172 ) {
173 self.output
174 .gauge("cgroup_memory_current_bytes", current as f64, tags.clone());
175 }
176
177 if let Some(Some(stat)) = filter_result_sync(
178 cgroup.load_memory_stat(&mut self.buffer).await,
179 "Failed to load cgroups memory statistics.",
180 ) {
181 self.output
182 .gauge("cgroup_memory_anon_bytes", stat.anon as f64, tags.clone());
183 self.output
184 .gauge("cgroup_memory_file_bytes", stat.file as f64, tags.clone());
185 self.output.gauge(
186 "cgroup_memory_anon_active_bytes",
187 stat.active_anon as f64,
188 tags.clone(),
189 );
190 self.output.gauge(
191 "cgroup_memory_anon_inactive_bytes",
192 stat.inactive_anon as f64,
193 tags.clone(),
194 );
195 self.output.gauge(
196 "cgroup_memory_file_active_bytes",
197 stat.active_file as f64,
198 tags.clone(),
199 );
200 self.output.gauge(
201 "cgroup_memory_file_inactive_bytes",
202 stat.inactive_file as f64,
203 tags.clone(),
204 );
205 }
206 }
207}
208
/// The starting point for cgroup scans: the base cgroup and the detected layout.
#[derive(Clone, Debug)]
pub(super) struct CGroupRoot {
    /// Name reported for the base cgroup (`/` when no base is configured).
    name: PathBuf,
    /// Path of the base cgroup, joined onto a hierarchy's base directory when scanning.
    path: PathBuf,
    /// Detected cgroup layout, together with the directories it is scanned from.
    mode: Mode,
}
215
/// The cgroup layout detected under the base directory.
#[derive(Clone, Debug)]
enum Mode {
    /// Holds the base directory; scanned through its `memory` and `cpu` subdirectories.
    Legacy(PathBuf),
    /// Holds the base directory (scanned as legacy) and its `unified` subdirectory
    /// (scanned as modern), in that order.
    Hybrid(PathBuf, PathBuf),
    /// Holds the base directory; scanned directly.
    Modern(PathBuf),
}
222
/// Name of the file whose presence is used to detect a modern hierarchy (or, when found
/// under `unified`, a hybrid one).
const CGROUP_CONTROLLERS: &str = "cgroup.controllers";
224
225impl CGroupRoot {
226 pub(super) fn new(config: &CGroupsConfig) -> Option<Self> {
227 let base_dir = config
248 .base_dir
249 .clone()
250 .unwrap_or_else(|| join_path(heim::os::linux::sysfs_root(), "fs/cgroup"));
251
252 let mode = {
253 let hybrid_root = join_path(&base_dir, "unified");
254 let hybrid_test_file = join_path(&hybrid_root, CGROUP_CONTROLLERS);
255 let modern_test_file = join_path(&base_dir, CGROUP_CONTROLLERS);
256 let cpu_dir = join_path(&base_dir, "cpu");
257 if is_file(hybrid_test_file) {
258 debug!(
259 message = "Detected hybrid cgroup base directory.",
260 ?base_dir
261 );
262 Mode::Hybrid(base_dir, hybrid_root)
263 } else if is_file(modern_test_file) {
264 debug!(
265 message = "Detected modern cgroup base directory.",
266 ?base_dir
267 );
268 Mode::Modern(base_dir)
269 } else if is_dir(cpu_dir) {
270 debug!(
271 message = "Detected legacy cgroup base directory.",
272 ?base_dir
273 );
274 Mode::Legacy(base_dir)
275 } else {
276 warn!(
277 message = "Could not detect cgroup base directory.",
278 ?base_dir
279 );
280 return None;
281 }
282 };
283
284 let (path, name) = match &config.base {
285 Some(base) => (base.to_path_buf(), base.to_path_buf()),
286 None => ("/".into(), "/".into()),
287 };
288 Some(Self { name, path, mode })
289 }
290}
291
/// A single cgroup: where its data files live and the name it is reported under.
#[derive(Clone, Debug)]
struct CGroup {
    /// Directory containing this cgroup's data files and its child cgroups.
    path: PathBuf,
    /// Name used as the value of the `cgroup` metric tag.
    name: PathBuf,
}
297
298impl CGroup {
299 fn tags(&self) -> MetricTags {
300 metric_tags! {
301 "cgroup" => self.name.to_string_lossy(),
302 "collector" => "cgroups",
303 }
304 }
305
306 fn make_path(&self, filename: impl AsRef<Path>) -> PathBuf {
307 join_path(&self.path, filename)
308 }
309
310 async fn open_read(
314 &self,
315 filename: impl AsRef<Path>,
316 buffer: &mut String,
317 ) -> CGroupsResult<Option<PathBuf>> {
318 buffer.clear();
319 let filename = self.make_path(filename);
320 match File::open(&filename).await {
321 Ok(mut file) => {
322 file.read_to_string(buffer)
323 .await
324 .with_context(|_| ReadingSnafu {
325 filename: filename.clone(),
326 })?;
327 Ok(Some(filename))
328 }
329 Err(error) if error.kind() == io::ErrorKind::NotFound => Ok(None),
330 Err(source) => Err(CGroupsError::Opening { source, filename }),
331 }
332 }
333
334 async fn open_read_parse<T: FromStr<Err = ParseIntError>>(
337 &self,
338 filename: impl AsRef<Path>,
339 buffer: &mut String,
340 ) -> CGroupsResult<Option<T>> {
341 self.open_read(filename, buffer)
342 .await?
343 .map(|filename| {
344 buffer
345 .trim()
346 .parse()
347 .with_context(|_| ParsingSnafu { filename })
348 })
349 .transpose()
350 }
351
352 async fn load_cpu(&self, buffer: &mut String) -> CGroupsResult<Option<CpuStat>> {
353 self.open_read_parse("cpu.stat", buffer).await
354 }
355
356 async fn load_memory_current(&self, buffer: &mut String) -> CGroupsResult<Option<u64>> {
357 self.open_read_parse("memory.current", buffer).await
358 }
359
360 async fn load_memory_stat(&self, buffer: &mut String) -> CGroupsResult<Option<MemoryStat>> {
361 self.open_read_parse("memory.stat", buffer).await
362 }
363
364 async fn children(&self) -> io::Result<Vec<CGroup>> {
365 let mut result = Vec::new();
366 let mut dir = fs::read_dir(&self.path).await?;
367 while let Some(entry) = dir.next_entry().await? {
368 let path = entry.path();
369 if is_dir(&path) {
370 let name = join_name(&self.name, entry.file_name());
371 result.push(CGroup { path, name });
372 }
373 }
374 Ok(result)
375 }
376}
377
/// Defines a struct of `u64` fields together with a `FromStr` implementation that parses
/// text made of `<field> <value>` lines.
///
/// Lines that do not start with one of the field names followed by a single space are
/// ignored, fields that never appear keep their default of zero, and a matching line whose
/// value is not a valid `u64` makes the whole parse fail.
macro_rules! define_stat_struct {
    ($name:ident ( $( $field:ident, )* )) => {
        #[derive(Clone, Copy, Debug, Default)]
        struct $name {
            $( $field: u64, )*
        }

        impl FromStr for $name {
            type Err = ParseIntError;

            fn from_str(text: &str) -> Result<Self, Self::Err> {
                let mut stats = Self::default();
                for line in text.lines() {
                    $(
                        // The trailing space keeps `anon` from matching e.g. `anon_thp`.
                        let prefix = concat!(stringify!($field), ' ');
                        if let Some(value) = line.strip_prefix(prefix) {
                            stats.$field = value.parse()?;
                            continue;
                        }
                    )*
                }
                Ok(stats)
            }
        }
    };
}
402
// The fields read from a cgroup's `cpu.stat` file. The values are in microseconds and are
// converted to seconds when the metrics are emitted.
define_stat_struct! { CpuStat(
    usage_usec,
    user_usec,
    system_usec,
)}
408
// The fields read from a cgroup's `memory.stat` file. Any other line in that file is
// ignored by the generated parser; only these values are turned into metrics.
define_stat_struct! { MemoryStat(
    anon,
    file,
    active_anon,
    inactive_anon,
    active_file,
    inactive_file,
)}
422
/// Returns `true` when the metadata of `path` can be read and describes a directory.
fn is_dir(path: impl AsRef<Path>) -> bool {
    match std::fs::metadata(path.as_ref()) {
        Ok(metadata) => metadata.is_dir(),
        Err(_) => false,
    }
}
426
/// Returns `true` when the metadata of `path` can be read and describes a regular file.
fn is_file(path: impl AsRef<Path>) -> bool {
    match std::fs::metadata(path.as_ref()) {
        Ok(metadata) => metadata.is_file(),
        Err(_) => false,
    }
}
430
/// Joins a cgroup name or data file name onto a base directory.
///
/// `filename` is always resolved *below* `base_path`:
///
/// * `/` is the base cgroup name and yields `base_path` unchanged;
/// * a relative name is appended as-is;
/// * an absolute name such as `/system.slice` has its leading root stripped before being
///   appended. A plain path join would let it replace `base_path` entirely and make the
///   scan escape the cgroup base directory.
fn join_path(base_path: impl AsRef<Path>, filename: impl AsRef<Path>) -> PathBuf {
    let base_path = base_path.as_ref();
    let filename = filename.as_ref();
    // `strip_prefix("/")` only succeeds for absolute names; relative ones are kept as-is.
    let relative = filename.strip_prefix("/").unwrap_or(filename);
    if relative.as_os_str().is_empty() {
        base_path.into()
    } else {
        base_path.join(relative)
    }
}
442
/// Joins a child's file name onto its parent's cgroup name.
///
/// The base cgroup is named by the literal `/`; its children are named by their file name
/// alone instead of being prefixed with it. Any other parent name is joined as a path.
fn join_name(base_name: &Path, filename: impl AsRef<Path>) -> PathBuf {
    let child = filename.as_ref();
    if base_name != Path::new("/") {
        return base_name.join(child);
    }
    child.to_path_buf()
}
455
#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeSet,
        fs::{self, File},
        io::Write,
        path::{Path, PathBuf},
    };

    use rand::{Rng, rngs::ThreadRng};
    use similar_asserts::assert_eq;
    use tempfile::TempDir;
    use vector_lib::event::Metric;

    use super::{
        super::{
            HostMetrics, HostMetricsConfig,
            tests::{count_name, count_tag},
        },
        MetricsBuffer, join_name, join_path,
    };

    #[test]
    fn joins_names_and_paths() {
        assert_eq!(join_name(Path::new("/"), "foo"), PathBuf::from("foo"));
        assert_eq!(join_name(Path::new("/"), "/"), PathBuf::from("/"));
        assert_eq!(join_name(Path::new("foo"), "bar"), PathBuf::from("foo/bar"));

        assert_eq!(join_path("/sys", "foo"), PathBuf::from("/sys/foo"));
        assert_eq!(join_path("/sys", "/"), PathBuf::from("/sys"));
    }

    /// Runs the collector against the cgroups of the machine executing the test.
    #[tokio::test]
    async fn generates_cgroups_metrics() {
        let config: HostMetricsConfig = toml::from_str(r#"collectors = ["cgroups"]"#).unwrap();
        let mut buffer = MetricsBuffer::new(None);
        HostMetrics::new(config).cgroups_metrics(&mut buffer).await;
        let metrics = buffer.metrics;

        assert!(!metrics.is_empty());
        assert_eq!(count_tag(&metrics, "cgroup"), metrics.len());
        assert_eq!(count_tag(&metrics, "collector"), metrics.len());
        assert_ne!(count_name(&metrics, "cgroup_cpu_usage_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_cpu_user_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_cpu_system_seconds_total"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_active_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_anon_inactive_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_active_bytes"), 0);
        assert_ne!(count_name(&metrics, "cgroup_memory_file_inactive_bytes"), 0);
    }

    /// Modern layout: every group directory holds `cgroup.controllers` and both data files.
    #[tokio::test]
    async fn parses_modern_cgroups() {
        let mut base = Setup::new();
        for subdir in SUBDIRS {
            base.group(
                subdir,
                CPU_STAT | MEMORY_STAT,
                Some(if subdir == "." {
                    "cpuset cpu memory pids\n"
                } else {
                    "memory pids\n"
                }),
            );
        }
        base.test().await;
    }

    /// Hybrid layout: CPU statistics under `unified`, memory statistics under `memory`.
    #[tokio::test]
    async fn parses_hybrid_cgroups_1() {
        let mut base = Setup::new();
        base.d("memory");
        base.d("unified");
        for subdir in SUBDIRS {
            base.group(&format!("unified/{subdir}"), CPU_STAT, Some(""));
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        base.test().await;
    }

    /// Hybrid layout where the root CPU statistics live in the legacy `cpu` directory and
    /// the CPU statistics of every other group live under `unified`.
    #[tokio::test]
    async fn parses_hybrid_cgroups_2() {
        let mut base = Setup::new();
        base.group("cpu", CPU_STAT, None);
        base.d("memory");
        base.d("unified");
        for subdir in SUBDIRS {
            base.group(
                &format!("unified/{subdir}"),
                if subdir == "." { NONE } else { CPU_STAT },
                Some(""),
            );
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        base.test().await;
    }

    /// Legacy layout: separate `cpu` and `memory` directories, no `cgroup.controllers`.
    #[tokio::test]
    async fn parses_legacy_cgroups() {
        let mut base = Setup::new();
        base.d("cpu");
        base.d("memory");
        for subdir in SUBDIRS {
            base.group(&format!("cpu/{subdir}"), CPU_STAT, None);
            base.group(&format!("memory/{subdir}"), MEMORY_STAT, None);
        }
        // Without this call the fixture is built but nothing is ever asserted.
        base.test().await;
    }

    /// Group directories created by the fixtures, relative to a hierarchy's root.
    const SUBDIRS: [&str; 5] = [
        ".",
        "system.slice",
        "user.slice",
        "user.slice/user-1000.slice",
        "user.slice/user-1000.slice/session-40.scope",
    ];

    /// The `cgroup` tag values expected for `SUBDIRS`; the root directory is reported as `/`.
    const GROUPS: [&str; 5] = ["/", SUBDIRS[1], SUBDIRS[2], SUBDIRS[3], SUBDIRS[4]];

    /// A temporary cgroup base directory plus the random source used to fill its data files.
    struct Setup(TempDir, ThreadRng);

    // Flags selecting which data files `Setup::group` creates.
    const NONE: usize = 0;
    const CPU_STAT: usize = 1 << 1;
    const MEMORY_STAT: usize = 1 << 2;

    impl Setup {
        fn new() -> Self {
            Self(tempfile::tempdir().unwrap(), rand::rng())
        }

        /// Runs the collector on the temporary base directory and checks that every group in
        /// `GROUPS` produced exactly one of each CPU and memory statistics metric.
        async fn test(&self) {
            let path = self.0.path();
            let config: HostMetricsConfig = toml::from_str(&format!(
                r#"
                collectors = ["cgroups"]
                cgroups.base_dir = {path:?}
                "#
            ))
            .unwrap();
            let mut buffer = MetricsBuffer::new(None);
            HostMetrics::new(config).cgroups_metrics(&mut buffer).await;
            let metrics = buffer.metrics;

            assert_ne!(metrics.len(), 0);

            assert_eq!(&all_tags(&metrics, "collector"), &["cgroups"]);
            assert_eq!(&all_tags(&metrics, "cgroup"), &GROUPS);

            assert_eq!(
                count_name(&metrics, "cgroup_cpu_usage_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_cpu_user_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_cpu_system_seconds_total"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_active_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_anon_inactive_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_active_bytes"),
                SUBDIRS.len()
            );
            assert_eq!(
                count_name(&metrics, "cgroup_memory_file_inactive_bytes"),
                SUBDIRS.len()
            );
        }

        /// Creates the group directory `subdir`, an optional `cgroup.controllers` file, and
        /// the data files selected by `flags`.
        fn group(&mut self, subdir: &str, flags: usize, controllers: Option<&str>) {
            self.d(subdir);
            if let Some(controllers) = controllers {
                self.f(subdir, "cgroup.controllers", controllers);
            }
            if (flags & CPU_STAT) != 0 {
                self.cpu_stat(subdir);
            }
            if (flags & MEMORY_STAT) != 0 {
                self.memory_stat(subdir);
            }
        }

        /// Writes a `cpu.stat` file with random usage values into `subdir`.
        fn cpu_stat(&mut self, subdir: &str) {
            let a = self.1.random_range(1000000..1000000000);
            let b = self.1.random_range(1000000..1000000000);
            let c = self.1.random_range(1000000..1000000000);
            self.f(
                subdir,
                "cpu.stat",
                &format!("usage_usec {a}\nuser_usec {b}\nsystem_usec {c}\nnr_periods 0\nnr_throttled 0\nthrottled_usec 0\n"),
            );
        }

        /// Writes a `memory.stat` file with random `anon` and `file` values into `subdir`.
        fn memory_stat(&mut self, subdir: &str) {
            let anon = self.1.random_range(1000000..1000000000);
            let file = self.1.random_range(1000000..1000000000);
            self.f(
                subdir,
                "memory.stat",
                &format!("anon {anon}\nfile {file}\n"),
            );
        }

        /// Creates the directory `subdir` (and any missing parents) under the base directory.
        fn d(&self, subdir: &str) {
            let path: PathBuf = [self.0.path(), subdir.as_ref()].iter().collect();
            fs::create_dir_all(path).unwrap();
        }

        /// Writes `contents` to `filename` inside `subdir`, replacing any previous contents.
        fn f(&self, subdir: &str, filename: &str, contents: &str) {
            let path: PathBuf = [self.0.path(), subdir.as_ref(), filename.as_ref()]
                .iter()
                .collect();
            let mut file = File::options()
                .write(true)
                .create(true)
                .truncate(true)
                .open(path)
                .unwrap();
            file.write_all(contents.as_bytes()).unwrap();
        }
    }

    /// Returns the sorted, de-duplicated values of `tag` across `metrics`.
    ///
    /// Panics when a metric has no tags or lacks `tag`.
    fn all_tags(metrics: &[Metric], tag: &str) -> Vec<String> {
        metrics
            .iter()
            .map(|metric| {
                metric
                    .tags()
                    .expect("The metrics should have tags")
                    .get(tag)
                    .expect("The metric is missing the specified tag")
                    .to_string()
            })
            .collect::<BTreeSet<String>>()
            .into_iter()
            .collect()
    }
}