1#![deny(missing_docs)]
4
5use std::path::PathBuf;
6
7use k8s_openapi::api::core::v1::{Namespace, Pod};
8use kube::runtime::reflector::{store::Store, ObjectRef};
9use vector_lib::file_source::paths_provider::PathsProvider;
10
11use super::path_helpers::build_pod_logs_directory;
12use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum;
13
/// A [`PathsProvider`] that lists the log files of the pods currently held in
/// the pod reflector store.
pub struct K8sPathsProvider {
    /// Reflector store of the pods whose log paths are provided.
    pod_state: Store<Pod>,
    /// Reflector store of namespaces; a pod whose namespace is not (yet) present
    /// here is skipped by `paths`.
    namespace_state: Store<Namespace>,
    /// Glob patterns; a log path is provided only if it matches at least one of them.
    include_paths: Vec<glob::Pattern>,
    /// Glob patterns; a log path matching any of them is not provided.
    exclude_paths: Vec<glob::Pattern>,
}
22
23impl K8sPathsProvider {
24 pub const fn new(
26 pod_state: Store<Pod>,
27 namespace_state: Store<Namespace>,
28 include_paths: Vec<glob::Pattern>,
29 exclude_paths: Vec<glob::Pattern>,
30 ) -> Self {
31 Self {
32 pod_state,
33 namespace_state,
34 include_paths,
35 exclude_paths,
36 }
37 }
38}
39
40impl PathsProvider for K8sPathsProvider {
41 type IntoIter = Vec<PathBuf>;
42
43 fn paths(&self) -> Vec<PathBuf> {
44 let state = self.pod_state.state();
45
46 state
47 .into_iter()
48 .filter(|pod| {
51 trace!(message = "Verifying Namespace metadata for pod.", pod = ?pod.metadata.name);
52 if let Some(namespace) = pod.metadata.namespace.as_ref() {
53 self.namespace_state
54 .get(&ObjectRef::<Namespace>::new(namespace))
55 .is_some()
56 } else {
57 false
58 }
59 })
60 .flat_map(|pod| {
61 trace!(message = "Providing log paths for pod.", pod = ?pod.metadata.name);
62 let paths_iter = list_pod_log_paths(real_glob, pod.as_ref());
63 filter_paths(
64 filter_paths(paths_iter, &self.include_paths, true),
65 &self.exclude_paths,
66 false,
67 )
68 .collect::<Vec<_>>()
69 })
70 .collect()
71 }
72}
73
74fn extract_pod_logs_directory(pod: &Pod) -> Option<PathBuf> {
94 let metadata = &pod.metadata;
95 let namespace = metadata.namespace.as_ref()?;
96 let name = metadata.name.as_ref()?;
97
98 let uid = if let Some(static_pod_config_hashsum) = extract_static_pod_config_hashsum(metadata) {
99 static_pod_config_hashsum
101 } else {
102 metadata.uid.as_ref()?
104 };
105
106 Some(build_pod_logs_directory(namespace, name, uid))
107}
108
109const CONTAINER_EXCLUSION_ANNOTATION_KEY: &str = "vector.dev/exclude-containers";
110
111fn extract_excluded_containers_for_pod(pod: &Pod) -> impl Iterator<Item = &str> {
112 let metadata = &pod.metadata;
113 metadata.annotations.iter().flat_map(|annotations| {
114 annotations
115 .iter()
116 .filter_map(|(key, value)| {
117 if key != CONTAINER_EXCLUSION_ANNOTATION_KEY {
118 return None;
119 }
120 Some(value)
121 })
122 .flat_map(|containers| containers.split(','))
123 .map(|container| container.trim())
124 })
125}
126
127fn build_container_exclusion_patterns<'a>(
128 pod_logs_dir: &'a str,
129 containers: impl Iterator<Item = &'a str> + 'a,
130) -> impl Iterator<Item = glob::Pattern> + 'a {
131 containers.filter_map(move |container| {
132 let escaped_container_name = glob::Pattern::escape(container);
133 glob::Pattern::new(&[pod_logs_dir, &escaped_container_name, "**"].join("/")).ok()
134 })
135}
136
137fn list_pod_log_paths<'a, G, GI>(
138 mut glob_impl: G,
139 pod: &'a Pod,
140) -> impl Iterator<Item = PathBuf> + 'a
141where
142 G: FnMut(&str) -> GI + 'a,
143 GI: Iterator<Item = PathBuf> + 'a,
144{
145 extract_pod_logs_directory(pod)
146 .into_iter()
147 .flat_map(move |dir| {
148 let dir = dir
149 .to_str()
150 .expect("non-utf8 path to pod logs dir is not supported");
151
152 let path_iter = glob_impl(
154 &[dir, "*/*.log*"].join("/"),
161 );
162
163 let excluded_containers = extract_excluded_containers_for_pod(pod);
166 let exclusion_patterns: Vec<_> =
167 build_container_exclusion_patterns(dir, excluded_containers).collect();
168
169 filter_paths(path_iter, exclusion_patterns, false)
171 })
172}
173
174fn real_glob(pattern: &str) -> impl Iterator<Item = PathBuf> + use<> {
175 glob::glob_with(
176 pattern,
177 glob::MatchOptions {
178 require_literal_separator: true,
179 ..Default::default()
180 },
181 )
182 .expect("the pattern is supposed to always be correct")
183 .flat_map(|paths| paths.into_iter())
184}
185
186fn filter_paths<'a>(
187 iter: impl Iterator<Item = PathBuf> + 'a,
188 patterns: impl AsRef<[glob::Pattern]> + 'a,
189 include: bool,
190) -> impl Iterator<Item = PathBuf> + 'a {
191 iter.filter(move |path| {
192 let m = patterns.as_ref().iter().any(|pattern| {
193 pattern.matches_path_with(
194 path,
195 glob::MatchOptions {
196 require_literal_separator: true,
197 ..Default::default()
198 },
199 )
200 });
201 if include {
202 m
203 } else {
204 !m
205 }
206 })
207}
208
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};

    use super::{
        build_container_exclusion_patterns, extract_excluded_containers_for_pod,
        extract_pod_logs_directory, filter_paths, list_pod_log_paths,
    };

    /// The pod logs directory is `/var/log/pods/<namespace>_<name>_<uid>`, and
    /// requires all three parts to be derivable from the metadata.
    #[test]
    fn test_extract_pod_logs_directory() {
        let cases = vec![
            // Empty pod: nothing to build a directory from.
            (Pod::default(), None),
            // Namespace, name and uid all present.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                Some("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid"),
            ),
            // Missing uid.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                None,
            ),
            // Missing name.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                None,
            ),
            // Missing namespace.
            (
                Pod {
                    metadata: ObjectMeta {
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                None,
            ),
            // The `kubernetes.io/config.mirror` annotation value is used in place of the uid.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        annotations: Some(
                            vec![(
                                "kubernetes.io/config.mirror".to_owned(),
                                "sandbox0-config-hashsum".to_owned(),
                            )]
                            .into_iter()
                            .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                Some("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-config-hashsum"),
            ),
        ];

        for (pod, expected) in cases {
            assert_eq!(
                extract_pod_logs_directory(&pod),
                expected.map(PathBuf::from)
            );
        }
    }

    /// Container names come from the comma-separated exclusion annotation, trimmed.
    #[test]
    fn test_extract_excluded_containers_for_pod() {
        let cases = vec![
            // No annotations at all.
            (Pod::default(), vec![]),
            // Annotations present but empty.
            (
                Pod {
                    metadata: ObjectMeta {
                        annotations: Some(vec![].into_iter().collect()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec![],
            ),
            // Only an unrelated annotation.
            (
                Pod {
                    metadata: ObjectMeta {
                        annotations: Some(
                            vec![("some-other-annotation".to_owned(), "some value".to_owned())]
                                .into_iter()
                                .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec![],
            ),
            // Exclusion annotation with two names.
            (
                Pod {
                    metadata: ObjectMeta {
                        annotations: Some(
                            vec![(
                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
                                "container1,container4".to_owned(),
                            )]
                            .into_iter()
                            .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec!["container1", "container4"],
            ),
            // Whitespace around names is trimmed.
            (
                Pod {
                    metadata: ObjectMeta {
                        annotations: Some(
                            vec![(
                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
                                "container1, container4".to_owned(),
                            )]
                            .into_iter()
                            .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec!["container1", "container4"],
            ),
        ];

        for (pod, expected) in cases {
            let actual: Vec<&str> = extract_excluded_containers_for_pod(&pod).collect();
            assert_eq!(actual, expected);
        }
    }

    /// Drives `list_pod_log_paths` with a mock glob. Each case lists the expected
    /// `(pattern, paths returned)` glob calls and the final expected paths.
    #[test]
    fn test_list_pod_log_paths() {
        let cases = vec![
            // Paths under the annotated `excluded1`/`excluded2` containers are dropped.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        annotations: Some(
                            vec![(
                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
                                "excluded1,excluded2".to_owned(),
                            )]
                            .into_iter()
                            .collect(),
                        ),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec![(
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/*/*.log*",
                    vec![
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/excluded1/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container3/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/excluded2/qwe.log",
                    ],
                )],
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/qwe.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/qwe.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container3/qwe.log",
                ],
            ),
            // Empty pod: no logs directory, so glob must not be called at all.
            (Pod::default(), vec![], vec![]),
            // Glob finds nothing.
            (
                Pod {
                    metadata: ObjectMeta {
                        namespace: Some("sandbox0-ns".to_owned()),
                        name: Some("sandbox0-name".to_owned()),
                        uid: Some("sandbox0-uid".to_owned()),
                        ..ObjectMeta::default()
                    },
                    ..Pod::default()
                },
                vec![(
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/*/*.log*",
                    vec![],
                )],
                vec![],
            ),
        ];

        for (pod, expected_calls, expected_paths) in cases {
            let mut expected_calls = expected_calls.into_iter();
            // Mock glob: panics on an unexpected extra call, asserts the requested
            // pattern, and returns the canned paths for that call.
            let mock_glob = move |pattern: &str| {
                let (expected_pattern, paths_to_return) = expected_calls
                    .next()
                    .expect("implementation did a call that wasn't expected");

                assert_eq!(pattern, expected_pattern);
                paths_to_return.into_iter().map(PathBuf::from)
            };

            let actual_paths: Vec<_> = list_pod_log_paths(mock_glob, &pod).collect();
            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
            assert_eq!(actual_paths, expected_paths)
        }
    }

    /// `filter_paths(.., false)` drops every path matching any pattern.
    #[test]
    fn test_exclude_paths() {
        let cases = vec![
            // No patterns: everything is kept.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
                vec![],
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
            ),
            // A pattern that matches nothing.
            (
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
                vec!["notmatched"],
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
            ),
            // Several patterns; any match excludes.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log",
                ],
                vec!["notmatched", "**/b.log", "**/c.log"],
                vec!["/var/log/pods/a.log"],
            ),
            // `*` cannot span `/` (literal separator), so `*/b.log` does not match
            // the multi-component absolute path, while `**/c.log` does.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log",
                ],
                vec!["*/b.log", "**/c.log"],
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
            ),
            // Excluding a whole container directory.
            (
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/1.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/2.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/1.log",
                ],
                vec!["**/container1/**"],
                vec!["/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/1.log"],
            ),
        ];

        for (input_paths, str_patterns, expected_paths) in cases {
            let patterns: Vec<_> = str_patterns
                .iter()
                .map(|pattern| glob::Pattern::new(pattern).unwrap())
                .collect();
            let actual_paths: Vec<_> =
                filter_paths(input_paths.into_iter().map(Into::into), &patterns, false).collect();
            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
            assert_eq!(
                actual_paths, expected_paths,
                "failed for patterns {:?}",
                &str_patterns
            )
        }
    }

    /// `filter_paths(.., true)` keeps only paths matching at least one pattern.
    #[test]
    fn test_include_paths() {
        let cases = vec![
            // Only the paths under `/var/log/pods` are kept.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                    "/tmp/foo",
                ],
                vec!["/var/log/pods/*"],
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
            ),
            // Pattern matching none of the inputs keeps nothing.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
                vec!["/tmp/*"],
                vec![],
            ),
            // `**/*` keeps everything.
            (
                vec!["/var/log/pods/a.log", "/tmp/foo"],
                vec!["**/*"],
                vec!["/var/log/pods/a.log", "/tmp/foo"],
            ),
        ];

        for (input_paths, str_patterns, expected_paths) in cases {
            let patterns: Vec<_> = str_patterns
                .iter()
                .map(|pattern| glob::Pattern::new(pattern).unwrap())
                .collect();
            let actual_paths: Vec<_> =
                filter_paths(input_paths.into_iter().map(Into::into), &patterns, true).collect();
            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
            assert_eq!(
                actual_paths, expected_paths,
                "failed for patterns {:?}",
                &str_patterns
            )
        }
    }

    /// One `<dir>/<container>/**` pattern per container name.
    #[test]
    fn test_build_container_exclusion_patterns() {
        let cases = vec![
            // No containers: no patterns.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec![],
                vec![],
            ),
            // Two containers.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec!["container1", "container2"],
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/**",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/**",
                ],
            ),
            // Glob metacharacters in the container name are escaped into `[c]` classes.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec!["*[]"],
                vec!["/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/[*][[][]]/**"],
            ),
        ];

        for (pod_logs_dir, containers, expected_patterns) in cases {
            let actual_patterns: Vec<_> =
                build_container_exclusion_patterns(pod_logs_dir, containers.clone().into_iter())
                    .collect();
            let expected_patterns: Vec<_> = expected_patterns
                .into_iter()
                .map(|pattern| glob::Pattern::new(pattern).unwrap())
                .collect();
            assert_eq!(
                actual_patterns, expected_patterns,
                "failed for dir {:?} and containers {:?}",
                &pod_logs_dir, &containers,
            )
        }
    }
}