1#![deny(missing_docs)]
4
5use std::path::PathBuf;
6
7use k8s_openapi::api::core::v1::{Namespace, Pod};
8use kube::runtime::reflector::{ObjectRef, store::Store};
9use vector_lib::file_source::paths_provider::PathsProvider;
10
11use super::path_helpers::build_pod_logs_directory;
12use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum;
13
14pub struct K8sPathsProvider {
17 pod_state: Store<Pod>,
18 namespace_state: Store<Namespace>,
19 include_paths: Vec<glob::Pattern>,
20 exclude_paths: Vec<glob::Pattern>,
21 insert_namespace_fields: bool,
22}
23
24impl K8sPathsProvider {
25 pub const fn new(
27 pod_state: Store<Pod>,
28 namespace_state: Store<Namespace>,
29 include_paths: Vec<glob::Pattern>,
30 exclude_paths: Vec<glob::Pattern>,
31 insert_namespace_fields: bool,
32 ) -> Self {
33 Self {
34 pod_state,
35 namespace_state,
36 include_paths,
37 exclude_paths,
38 insert_namespace_fields,
39 }
40 }
41}
42
43impl PathsProvider for K8sPathsProvider {
44 type IntoIter = Vec<PathBuf>;
45
46 fn paths(&self) -> Vec<PathBuf> {
47 let state = self.pod_state.state();
48
49 state
50 .into_iter()
51 .filter(|pod| {
55 if !self.insert_namespace_fields {
56 return true;
58 }
59 trace!(message = "Verifying Namespace metadata for pod.", pod = ?pod.metadata.name);
60 if let Some(namespace) = pod.metadata.namespace.as_ref() {
61 self.namespace_state
62 .get(&ObjectRef::<Namespace>::new(namespace))
63 .is_some()
64 } else {
65 false
66 }
67 })
68 .flat_map(|pod| {
69 trace!(message = "Providing log paths for pod.", pod = ?pod.metadata.name);
70 let paths_iter = list_pod_log_paths(real_glob, pod.as_ref());
71 filter_paths(
72 filter_paths(paths_iter, &self.include_paths, true),
73 &self.exclude_paths,
74 false,
75 )
76 .collect::<Vec<_>>()
77 })
78 .collect()
79 }
80}
81
82fn extract_pod_logs_directory(pod: &Pod) -> Option<PathBuf> {
102 let metadata = &pod.metadata;
103 let namespace = metadata.namespace.as_ref()?;
104 let name = metadata.name.as_ref()?;
105
106 let uid = if let Some(static_pod_config_hashsum) = extract_static_pod_config_hashsum(metadata) {
107 static_pod_config_hashsum
109 } else {
110 metadata.uid.as_ref()?
112 };
113
114 Some(build_pod_logs_directory(namespace, name, uid))
115}
116
117const CONTAINER_EXCLUSION_ANNOTATION_KEY: &str = "vector.dev/exclude-containers";
118
119fn extract_excluded_containers_for_pod(pod: &Pod) -> impl Iterator<Item = &str> {
120 let metadata = &pod.metadata;
121 metadata.annotations.iter().flat_map(|annotations| {
122 annotations
123 .iter()
124 .filter_map(|(key, value)| {
125 if key != CONTAINER_EXCLUSION_ANNOTATION_KEY {
126 return None;
127 }
128 Some(value)
129 })
130 .flat_map(|containers| containers.split(','))
131 .map(|container| container.trim())
132 })
133}
134
135fn build_container_exclusion_patterns<'a>(
136 pod_logs_dir: &'a str,
137 containers: impl Iterator<Item = &'a str> + 'a,
138) -> impl Iterator<Item = glob::Pattern> + 'a {
139 containers.filter_map(move |container| {
140 let escaped_container_name = glob::Pattern::escape(container);
141 glob::Pattern::new(&[pod_logs_dir, &escaped_container_name, "**"].join("/")).ok()
142 })
143}
144
145fn list_pod_log_paths<'a, G, GI>(
146 mut glob_impl: G,
147 pod: &'a Pod,
148) -> impl Iterator<Item = PathBuf> + 'a
149where
150 G: FnMut(&str) -> GI + 'a,
151 GI: Iterator<Item = PathBuf> + 'a,
152{
153 extract_pod_logs_directory(pod)
154 .into_iter()
155 .flat_map(move |dir| {
156 let dir = dir
157 .to_str()
158 .expect("non-utf8 path to pod logs dir is not supported");
159
160 let path_iter = glob_impl(
162 &[dir, "*/*.log*"].join("/"),
169 );
170
171 let excluded_containers = extract_excluded_containers_for_pod(pod);
174 let exclusion_patterns: Vec<_> =
175 build_container_exclusion_patterns(dir, excluded_containers).collect();
176
177 filter_paths(path_iter, exclusion_patterns, false)
179 })
180}
181
182fn real_glob(pattern: &str) -> impl Iterator<Item = PathBuf> + use<> {
183 glob::glob_with(
184 pattern,
185 glob::MatchOptions {
186 require_literal_separator: true,
187 ..Default::default()
188 },
189 )
190 .expect("the pattern is supposed to always be correct")
191 .flat_map(|paths| paths.into_iter())
192}
193
194fn filter_paths<'a>(
195 iter: impl Iterator<Item = PathBuf> + 'a,
196 patterns: impl AsRef<[glob::Pattern]> + 'a,
197 include: bool,
198) -> impl Iterator<Item = PathBuf> + 'a {
199 iter.filter(move |path| {
200 let m = patterns.as_ref().iter().any(|pattern| {
201 pattern.matches_path_with(
202 path,
203 glob::MatchOptions {
204 require_literal_separator: true,
205 ..Default::default()
206 },
207 )
208 });
209 if include { m } else { !m }
210 })
211}
212
#[cfg(test)]
mod tests {
    // Table-driven unit tests for the path helpers. `list_pod_log_paths` is
    // exercised with a mock glob function, and the remaining helpers operate
    // on in-memory paths and patterns only.
    use std::path::PathBuf;

    use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};

    use super::{
        build_container_exclusion_patterns, extract_excluded_containers_for_pod,
        extract_pod_logs_directory, filter_paths, list_pod_log_paths,
    };

    #[test]
    fn test_extract_pod_logs_directory() {
        // (input pod, expected logs directory or `None` if it can't be derived)
        let cases = vec![
            // Pod without any metadata.
            (Pod::default(), None),
            // Namespace, name and uid all present.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                Some("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid"),
            ),
            // Missing uid.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                None,
            ),
            // Missing name.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                None,
            ),
            // Missing namespace.
            (
                Pod {
                    metadata: ObjectMeta {
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                None,
            ),
            // `kubernetes.io/config.mirror` annotation: its value replaces the
            // uid in the directory name.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        annotations: Some(
                            vec![(
                                "kubernetes.io/config.mirror".to_owned(),
                                "sandbox0-config-hashsum".to_owned(),
                            )]
                            .into_iter()
                            .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                Some("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-config-hashsum"),
            ),
        ];

        for (pod, expected) in cases {
            assert_eq!(
                extract_pod_logs_directory(&pod),
                expected.map(PathBuf::from)
            );
        }
    }

    #[test]
    fn test_extract_excluded_containers_for_pod() {
        // (input pod, expected excluded container names in order)
        let cases = vec![
            // No annotations at all.
            (Pod::default(), vec![]),
            // Annotations present but empty.
            (
                Pod {
                    metadata: ObjectMeta {
                        annotations: Some(vec![].into_iter().collect()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec![],
            ),
            // Only unrelated annotations.
            (
                Pod {
                    metadata: ObjectMeta {
                        annotations: Some(
                            vec![("some-other-annotation".to_owned(), "some value".to_owned())]
                                .into_iter()
                                .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec![],
            ),
            // Exclusion annotation with a plain comma-separated list.
            (
                Pod {
                    metadata: ObjectMeta {
                        annotations: Some(
                            vec![(
                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
                                "container1,container4".to_owned(),
                            )]
                            .into_iter()
                            .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec!["container1", "container4"],
            ),
            // Whitespace around names is trimmed.
            (
                Pod {
                    metadata: ObjectMeta {
                        annotations: Some(
                            vec![(
                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
                                "container1, container4".to_owned(),
                            )]
                            .into_iter()
                            .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec!["container1", "container4"],
            ),
        ];

        for (pod, expected) in cases {
            let actual: Vec<&str> = extract_excluded_containers_for_pod(&pod).collect();
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_list_pod_log_paths() {
        // (input pod, expected glob calls as (pattern, paths to return),
        //  expected resulting paths)
        let cases = vec![
            // Pod with two excluded containers: the glob results for those
            // containers are filtered out, the rest pass through in order.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        annotations: Some(
                            vec![(
                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
                                "excluded1,excluded2".to_owned(),
                            )]
                            .into_iter()
                            .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec![(
                    // Pattern the implementation is expected to glob with.
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/*/*.log*",
                    // Paths the mock glob returns for it.
                    vec![
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/excluded1/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container3/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/excluded2/qwe.log",
                    ],
                )],
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/qwe.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/qwe.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container3/qwe.log",
                ],
            ),
            // No logs directory can be derived: glob must not be called.
            (Pod::default(), vec![], vec![]),
            // Glob finds nothing: result is empty.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec![(
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/*/*.log*",
                    vec![],
                )],
                vec![],
            ),
        ];

        for (pod, expected_calls, expected_paths) in cases {
            let mut expected_calls = expected_calls.into_iter();
            // Mock glob: each call must match the next expected pattern and
            // returns the canned paths for it. An unexpected extra call panics.
            let mock_glob = move |pattern: &str| {
                let (expected_pattern, paths_to_return) = expected_calls
                    .next()
                    .expect("implementation did a call that wasn't expected");

                assert_eq!(pattern, expected_pattern);
                paths_to_return.into_iter().map(PathBuf::from)
            };

            let actual_paths: Vec<_> = list_pod_log_paths(mock_glob, &pod).collect();
            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
            assert_eq!(actual_paths, expected_paths)
        }
    }

    #[test]
    fn test_exclude_paths() {
        // (input paths, exclusion patterns, expected surviving paths)
        let cases = vec![
            // No patterns: everything is kept.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
                vec![],
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
            ),
            // A pattern that matches nothing: everything is kept.
            (
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
                vec!["notmatched"],
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
            ),
            // Any matching pattern excludes the path.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log",
                ],
                vec!["notmatched", "**/b.log", "**/c.log"],
                vec!["/var/log/pods/a.log"],
            ),
            // `*` does not cross `/` (literal separator required), so `*/b.log`
            // does not match the deeper path and b.log is kept.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log",
                ],
                vec!["*/b.log", "**/c.log"],
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
            ),
            // Excluding a whole container directory.
            (
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/1.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/2.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/1.log",
                ],
                vec!["**/container1/**"],
                vec!["/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/1.log"],
            ),
        ];

        for (input_paths, str_patterns, expected_paths) in cases {
            let patterns: Vec<_> = str_patterns
                .iter()
                .map(|pattern| glob::Pattern::new(pattern).unwrap())
                .collect();
            // `false` selects exclude mode.
            let actual_paths: Vec<_> =
                filter_paths(input_paths.into_iter().map(Into::into), &patterns, false).collect();
            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
            assert_eq!(
                actual_paths, expected_paths,
                "failed for patterns {:?}",
                &str_patterns
            )
        }
    }

    #[test]
    fn test_include_paths() {
        // (input paths, inclusion patterns, expected surviving paths)
        let cases = vec![
            // Only paths directly under /var/log/pods are kept.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                    "/tmp/foo",
                ],
                vec!["/var/log/pods/*"],
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
            ),
            // No input path matches: nothing is kept.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
                vec!["/tmp/*"],
                vec![],
            ),
            // `**/*` matches every path.
            (
                vec!["/var/log/pods/a.log", "/tmp/foo"],
                vec!["**/*"],
                vec!["/var/log/pods/a.log", "/tmp/foo"],
            ),
        ];

        for (input_paths, str_patterns, expected_paths) in cases {
            let patterns: Vec<_> = str_patterns
                .iter()
                .map(|pattern| glob::Pattern::new(pattern).unwrap())
                .collect();
            // `true` selects include mode.
            let actual_paths: Vec<_> =
                filter_paths(input_paths.into_iter().map(Into::into), &patterns, true).collect();
            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
            assert_eq!(
                actual_paths, expected_paths,
                "failed for patterns {:?}",
                &str_patterns
            )
        }
    }

    #[test]
    fn test_build_container_exclusion_patterns() {
        // (pod logs dir, container names, expected patterns)
        let cases = vec![
            // No containers: no patterns.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec![],
                vec![],
            ),
            // One `<dir>/<container>/**` pattern per container, in order.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec!["container1", "container2"],
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/**",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/**",
                ],
            ),
            // Glob metacharacters in container names are escaped.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec!["*[]"],
                vec!["/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/[*][[][]]/**"],
            ),
        ];

        for (pod_logs_dir, containers, expected_patterns) in cases {
            let actual_patterns: Vec<_> =
                build_container_exclusion_patterns(pod_logs_dir, containers.clone().into_iter())
                    .collect();
            let expected_patterns: Vec<_> = expected_patterns
                .into_iter()
                .map(|pattern| glob::Pattern::new(pattern).unwrap())
                .collect();
            assert_eq!(
                actual_patterns, expected_patterns,
                "failed for dir {:?} and containers {:?}",
                &pod_logs_dir, &containers,
            )
        }
    }
}