vector/sources/kubernetes_logs/
k8s_paths_provider.rs

1//! A paths provider for k8s logs.
2
3#![deny(missing_docs)]
4
5use std::path::PathBuf;
6
7use k8s_openapi::api::core::v1::{Namespace, Pod};
8use kube::runtime::reflector::{store::Store, ObjectRef};
9use vector_lib::file_source::paths_provider::PathsProvider;
10
11use super::path_helpers::build_pod_logs_directory;
12use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum;
13
/// A paths provider implementation that uses the state obtained from the
/// k8s API.
pub struct K8sPathsProvider {
    /// Store of the `Pod`s whose log files are to be discovered.
    pod_state: Store<Pod>,
    /// Store of the `Namespace`s; a pod is only considered once its namespace
    /// can be found here.
    namespace_state: Store<Namespace>,
    /// Glob patterns a discovered path has to match (at least one of them) to
    /// be provided.
    include_paths: Vec<glob::Pattern>,
    /// Glob patterns that remove a discovered path from the result when any
    /// of them matches.
    exclude_paths: Vec<glob::Pattern>,
}
22
23impl K8sPathsProvider {
24    /// Create a new [`K8sPathsProvider`].
25    pub const fn new(
26        pod_state: Store<Pod>,
27        namespace_state: Store<Namespace>,
28        include_paths: Vec<glob::Pattern>,
29        exclude_paths: Vec<glob::Pattern>,
30    ) -> Self {
31        Self {
32            pod_state,
33            namespace_state,
34            include_paths,
35            exclude_paths,
36        }
37    }
38}
39
40impl PathsProvider for K8sPathsProvider {
41    type IntoIter = Vec<PathBuf>;
42
43    fn paths(&self) -> Vec<PathBuf> {
44        let state = self.pod_state.state();
45
46        state
47            .into_iter()
48            // filter out pods where we haven't fetched the namespace metadata yet
49            // they will be picked up on a later run
50            .filter(|pod| {
51                trace!(message = "Verifying Namespace metadata for pod.", pod = ?pod.metadata.name);
52                if let Some(namespace) = pod.metadata.namespace.as_ref() {
53                    self.namespace_state
54                        .get(&ObjectRef::<Namespace>::new(namespace))
55                        .is_some()
56                } else {
57                    false
58                }
59            })
60            .flat_map(|pod| {
61                trace!(message = "Providing log paths for pod.", pod = ?pod.metadata.name);
62                let paths_iter = list_pod_log_paths(real_glob, pod.as_ref());
63                filter_paths(
64                    filter_paths(paths_iter, &self.include_paths, true),
65                    &self.exclude_paths,
66                    false,
67                )
68                .collect::<Vec<_>>()
69            })
70            .collect()
71    }
72}
73
74/// This function takes a `Pod` resource and returns the path to where the logs
75/// for the said `Pod` are expected to be found.
76///
77/// In the common case, the effective path is built using the `namespace`,
78/// `name` and `uid` of the Pod. However, there's a special case for
79/// `Static Pod`s: they keep their logs at the path that consists of config
80/// hashsum instead of the `Pod` `uid`. The reason for this is `kubelet` is
81/// locally authoritative over those `Pod`s, and the API only has
82/// `Monitor Pod`s - the "dummy" entries useful for discovery and association.
83/// Their UIDs are generated at the Kubernetes API side, and do not represent
84/// the actual config hashsum as one would expect.
85///
86/// To work around this, we use the mirror pod annotations (if any) to obtain
87/// the effective config hashsum, see the `extract_static_pod_config_hashsum`
88/// function that does this.
89///
90/// See <https://github.com/vectordotdev/vector/issues/6001>
91/// See <https://github.com/kubernetes/kubernetes/blob/ef3337a443b402756c9f0bfb1f844b1b45ce289d/pkg/kubelet/pod/pod_manager.go#L30-L44>
92/// See <https://github.com/kubernetes/kubernetes/blob/cea1d4e20b4a7886d8ff65f34c6d4f95efcb4742/pkg/kubelet/pod/mirror_client.go#L80-L81>
93fn extract_pod_logs_directory(pod: &Pod) -> Option<PathBuf> {
94    let metadata = &pod.metadata;
95    let namespace = metadata.namespace.as_ref()?;
96    let name = metadata.name.as_ref()?;
97
98    let uid = if let Some(static_pod_config_hashsum) = extract_static_pod_config_hashsum(metadata) {
99        // If there's a static pod config hashsum - use it instead of uid.
100        static_pod_config_hashsum
101    } else {
102        // In the common case - just fallback to the real pod uid.
103        metadata.uid.as_ref()?
104    };
105
106    Some(build_pod_logs_directory(namespace, name, uid))
107}
108
/// The key of the `Pod` annotation whose value is a comma-separated list of
/// container names to exclude from log collection.
const CONTAINER_EXCLUSION_ANNOTATION_KEY: &str = "vector.dev/exclude-containers";
110
111fn extract_excluded_containers_for_pod(pod: &Pod) -> impl Iterator<Item = &str> {
112    let metadata = &pod.metadata;
113    metadata.annotations.iter().flat_map(|annotations| {
114        annotations
115            .iter()
116            .filter_map(|(key, value)| {
117                if key != CONTAINER_EXCLUSION_ANNOTATION_KEY {
118                    return None;
119                }
120                Some(value)
121            })
122            .flat_map(|containers| containers.split(','))
123            .map(|container| container.trim())
124    })
125}
126
127fn build_container_exclusion_patterns<'a>(
128    pod_logs_dir: &'a str,
129    containers: impl Iterator<Item = &'a str> + 'a,
130) -> impl Iterator<Item = glob::Pattern> + 'a {
131    containers.filter_map(move |container| {
132        let escaped_container_name = glob::Pattern::escape(container);
133        glob::Pattern::new(&[pod_logs_dir, &escaped_container_name, "**"].join("/")).ok()
134    })
135}
136
137fn list_pod_log_paths<'a, G, GI>(
138    mut glob_impl: G,
139    pod: &'a Pod,
140) -> impl Iterator<Item = PathBuf> + 'a
141where
142    G: FnMut(&str) -> GI + 'a,
143    GI: Iterator<Item = PathBuf> + 'a,
144{
145    extract_pod_logs_directory(pod)
146        .into_iter()
147        .flat_map(move |dir| {
148            let dir = dir
149                .to_str()
150                .expect("non-utf8 path to pod logs dir is not supported");
151
152            // Run the glob to get a list of unfiltered paths.
153            let path_iter = glob_impl(
154                // We seek to match the paths like
155                // `<pod_logs_dir>/<container_name>/<n>.log` - paths managed by
156                // the `kubelet` as part of Kubernetes core logging
157                // architecture.
158                // In some setups, there will also be paths like
159                // `<pod_logs_dir>/<hash>.log` - those we want to skip.
160                &[dir, "*/*.log*"].join("/"),
161            );
162
163            // Extract the containers to exclude, then build patterns from them
164            // and cache the results into a Vec.
165            let excluded_containers = extract_excluded_containers_for_pod(pod);
166            let exclusion_patterns: Vec<_> =
167                build_container_exclusion_patterns(dir, excluded_containers).collect();
168
169            // Return paths filtered with container exclusion.
170            filter_paths(path_iter, exclusion_patterns, false)
171        })
172}
173
174fn real_glob(pattern: &str) -> impl Iterator<Item = PathBuf> + use<> {
175    glob::glob_with(
176        pattern,
177        glob::MatchOptions {
178            require_literal_separator: true,
179            ..Default::default()
180        },
181    )
182    .expect("the pattern is supposed to always be correct")
183    .flat_map(|paths| paths.into_iter())
184}
185
186fn filter_paths<'a>(
187    iter: impl Iterator<Item = PathBuf> + 'a,
188    patterns: impl AsRef<[glob::Pattern]> + 'a,
189    include: bool,
190) -> impl Iterator<Item = PathBuf> + 'a {
191    iter.filter(move |path| {
192        let m = patterns.as_ref().iter().any(|pattern| {
193            pattern.matches_path_with(
194                path,
195                glob::MatchOptions {
196                    require_literal_separator: true,
197                    ..Default::default()
198                },
199            )
200        });
201        if include {
202            m
203        } else {
204            !m
205        }
206    })
207}
208
209#[cfg(test)]
210mod tests {
211    use std::path::PathBuf;
212
213    use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};
214
215    use super::{
216        build_container_exclusion_patterns, extract_excluded_containers_for_pod,
217        extract_pod_logs_directory, filter_paths, list_pod_log_paths,
218    };
219
220    #[test]
221    fn test_extract_pod_logs_directory() {
222        let cases = vec![
223            // Empty pod.
224            (Pod::default(), None),
225            // Happy path.
226            (
227                Pod {
228                    metadata: ObjectMeta {
229                        namespace: Some("sandbox0-ns".to_owned()),
230                        name: Some("sandbox0-name".to_owned()),
231                        uid: Some("sandbox0-uid".to_owned()),
232                        ..ObjectMeta::default()
233                    },
234                    ..Pod::default()
235                },
236                Some("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid"),
237            ),
238            // No uid.
239            (
240                Pod {
241                    metadata: ObjectMeta {
242                        namespace: Some("sandbox0-ns".to_owned()),
243                        name: Some("sandbox0-name".to_owned()),
244                        ..ObjectMeta::default()
245                    },
246                    ..Pod::default()
247                },
248                None,
249            ),
250            // No name.
251            (
252                Pod {
253                    metadata: ObjectMeta {
254                        namespace: Some("sandbox0-ns".to_owned()),
255                        uid: Some("sandbox0-uid".to_owned()),
256                        ..ObjectMeta::default()
257                    },
258                    ..Pod::default()
259                },
260                None,
261            ),
262            // No namespace.
263            (
264                Pod {
265                    metadata: ObjectMeta {
266                        name: Some("sandbox0-name".to_owned()),
267                        uid: Some("sandbox0-uid".to_owned()),
268                        ..ObjectMeta::default()
269                    },
270                    ..Pod::default()
271                },
272                None,
273            ),
274            // Static pod config hashsum as uid.
275            (
276                Pod {
277                    metadata: ObjectMeta {
278                        namespace: Some("sandbox0-ns".to_owned()),
279                        name: Some("sandbox0-name".to_owned()),
280                        uid: Some("sandbox0-uid".to_owned()),
281                        annotations: Some(
282                            vec![(
283                                "kubernetes.io/config.mirror".to_owned(),
284                                "sandbox0-config-hashsum".to_owned(),
285                            )]
286                            .into_iter()
287                            .collect(),
288                        ),
289                        ..ObjectMeta::default()
290                    },
291                    ..Pod::default()
292                },
293                Some("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-config-hashsum"),
294            ),
295        ];
296
297        for (pod, expected) in cases {
298            assert_eq!(
299                extract_pod_logs_directory(&pod),
300                expected.map(PathBuf::from)
301            );
302        }
303    }
304
305    #[test]
306    fn test_extract_excluded_containers_for_pod() {
307        let cases = vec![
308            // No annotations.
309            (Pod::default(), vec![]),
310            // Empty annotations.
311            (
312                Pod {
313                    metadata: ObjectMeta {
314                        annotations: Some(vec![].into_iter().collect()),
315                        ..ObjectMeta::default()
316                    },
317                    ..Pod::default()
318                },
319                vec![],
320            ),
321            // Irrelevant annotations.
322            (
323                Pod {
324                    metadata: ObjectMeta {
325                        annotations: Some(
326                            vec![("some-other-annotation".to_owned(), "some value".to_owned())]
327                                .into_iter()
328                                .collect(),
329                        ),
330                        ..ObjectMeta::default()
331                    },
332                    ..Pod::default()
333                },
334                vec![],
335            ),
336            // Proper annotation without spaces.
337            (
338                Pod {
339                    metadata: ObjectMeta {
340                        annotations: Some(
341                            vec![(
342                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
343                                "container1,container4".to_owned(),
344                            )]
345                            .into_iter()
346                            .collect(),
347                        ),
348                        ..ObjectMeta::default()
349                    },
350                    ..Pod::default()
351                },
352                vec!["container1", "container4"],
353            ),
354            // Proper annotation with spaces.
355            (
356                Pod {
357                    metadata: ObjectMeta {
358                        annotations: Some(
359                            vec![(
360                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
361                                "container1, container4".to_owned(),
362                            )]
363                            .into_iter()
364                            .collect(),
365                        ),
366                        ..ObjectMeta::default()
367                    },
368                    ..Pod::default()
369                },
370                vec!["container1", "container4"],
371            ),
372        ];
373
374        for (pod, expected) in cases {
375            let actual: Vec<&str> = extract_excluded_containers_for_pod(&pod).collect();
376            assert_eq!(actual, expected);
377        }
378    }
379
380    #[test]
381    fn test_list_pod_log_paths() {
382        let cases = vec![
383            // Pod exists and has some containers that write logs, and some of
384            // the containers are excluded.
385            (
386                Pod {
387                    metadata: ObjectMeta {
388                        namespace: Some("sandbox0-ns".to_owned()),
389                        name: Some("sandbox0-name".to_owned()),
390                        uid: Some("sandbox0-uid".to_owned()),
391                        annotations: Some(
392                            vec![(
393                                super::CONTAINER_EXCLUSION_ANNOTATION_KEY.to_owned(),
394                                "excluded1,excluded2".to_owned(),
395                            )]
396                            .into_iter()
397                            .collect(),
398                        ),
399                        ..ObjectMeta::default()
400                    },
401                    ..Pod::default()
402                },
403                // Calls to the glob mock.
404                vec![(
405                    // The pattern to expect at the mock.
406                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/*/*.log*",
407                    // The paths to return from the mock.
408                    vec![
409                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/qwe.log",
410                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/qwe.log",
411                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/excluded1/qwe.log",
412                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container3/qwe.log",
413                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/excluded2/qwe.log",
414                    ],
415                )],
416                // Expected result.
417                vec![
418                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/qwe.log",
419                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/qwe.log",
420                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container3/qwe.log",
421                ],
422            ),
423            // Pod doesn't have the metadata set.
424            (Pod::default(), vec![], vec![]),
425            // Pod has proper metadata, but doesn't have log files.
426            (
427                Pod {
428                    metadata: ObjectMeta {
429                        namespace: Some("sandbox0-ns".to_owned()),
430                        name: Some("sandbox0-name".to_owned()),
431                        uid: Some("sandbox0-uid".to_owned()),
432                        ..ObjectMeta::default()
433                    },
434                    ..Pod::default()
435                },
436                vec![(
437                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/*/*.log*",
438                    vec![],
439                )],
440                vec![],
441            ),
442        ];
443
444        for (pod, expected_calls, expected_paths) in cases {
445            // Prepare the mock fn.
446            let mut expected_calls = expected_calls.into_iter();
447            let mock_glob = move |pattern: &str| {
448                let (expected_pattern, paths_to_return) = expected_calls
449                    .next()
450                    .expect("implementation did a call that wasn't expected");
451
452                assert_eq!(pattern, expected_pattern);
453                paths_to_return.into_iter().map(PathBuf::from)
454            };
455
456            let actual_paths: Vec<_> = list_pod_log_paths(mock_glob, &pod).collect();
457            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
458            assert_eq!(actual_paths, expected_paths)
459        }
460    }
461
462    #[test]
463    fn test_exclude_paths() {
464        let cases = vec![
465            // No exclusion pattern allows everything.
466            (
467                vec![
468                    "/var/log/pods/a.log",
469                    "/var/log/pods/b.log",
470                    "/var/log/pods/c.log.foo",
471                    "/var/log/pods/d.logbar",
472                ],
473                vec![],
474                vec![
475                    "/var/log/pods/a.log",
476                    "/var/log/pods/b.log",
477                    "/var/log/pods/c.log.foo",
478                    "/var/log/pods/d.logbar",
479                ],
480            ),
481            // Test a filter that doesn't apply to anything.
482            (
483                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
484                vec!["notmatched"],
485                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
486            ),
487            // Multiple filters.
488            (
489                vec![
490                    "/var/log/pods/a.log",
491                    "/var/log/pods/b.log",
492                    "/var/log/pods/c.log",
493                ],
494                vec!["notmatched", "**/b.log", "**/c.log"],
495                vec!["/var/log/pods/a.log"],
496            ),
497            // Requires literal path separator (`*` does not include dirs).
498            (
499                vec![
500                    "/var/log/pods/a.log",
501                    "/var/log/pods/b.log",
502                    "/var/log/pods/c.log",
503                ],
504                vec!["*/b.log", "**/c.log"],
505                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
506            ),
507            // Filtering by container name with a real-life-like file path.
508            (
509                vec![
510                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/1.log",
511                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/2.log",
512                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/1.log",
513                ],
514                vec!["**/container1/**"],
515                vec!["/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/1.log"],
516            ),
517        ];
518
519        for (input_paths, str_patterns, expected_paths) in cases {
520            let patterns: Vec<_> = str_patterns
521                .iter()
522                .map(|pattern| glob::Pattern::new(pattern).unwrap())
523                .collect();
524            let actual_paths: Vec<_> =
525                filter_paths(input_paths.into_iter().map(Into::into), &patterns, false).collect();
526            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
527            assert_eq!(
528                actual_paths, expected_paths,
529                "failed for patterns {:?}",
530                &str_patterns
531            )
532        }
533    }
534
535    #[test]
536    fn test_include_paths() {
537        let cases = vec![
538            (
539                vec![
540                    "/var/log/pods/a.log",
541                    "/var/log/pods/b.log",
542                    "/var/log/pods/c.log.foo",
543                    "/var/log/pods/d.logbar",
544                    "/tmp/foo",
545                ],
546                vec!["/var/log/pods/*"],
547                vec![
548                    "/var/log/pods/a.log",
549                    "/var/log/pods/b.log",
550                    "/var/log/pods/c.log.foo",
551                    "/var/log/pods/d.logbar",
552                ],
553            ),
554            (
555                vec![
556                    "/var/log/pods/a.log",
557                    "/var/log/pods/b.log",
558                    "/var/log/pods/c.log.foo",
559                    "/var/log/pods/d.logbar",
560                ],
561                vec!["/tmp/*"],
562                vec![],
563            ),
564            (
565                vec!["/var/log/pods/a.log", "/tmp/foo"],
566                vec!["**/*"],
567                vec!["/var/log/pods/a.log", "/tmp/foo"],
568            ),
569        ];
570
571        for (input_paths, str_patterns, expected_paths) in cases {
572            let patterns: Vec<_> = str_patterns
573                .iter()
574                .map(|pattern| glob::Pattern::new(pattern).unwrap())
575                .collect();
576            let actual_paths: Vec<_> =
577                filter_paths(input_paths.into_iter().map(Into::into), &patterns, true).collect();
578            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
579            assert_eq!(
580                actual_paths, expected_paths,
581                "failed for patterns {:?}",
582                &str_patterns
583            )
584        }
585    }
586
587    #[test]
588    fn test_build_container_exclusion_patterns() {
589        let cases = vec![
590            // No excluded containers - no exclusion patterns.
591            (
592                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
593                vec![],
594                vec![],
595            ),
596            // Ensure the paths are concatenated correctly and look good.
597            (
598                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
599                vec!["container1", "container2"],
600                vec![
601                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/**",
602                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/**",
603                ],
604            ),
605            // Ensure control characters are escaped properly.
606            (
607                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
608                vec!["*[]"],
609                vec!["/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/[*][[][]]/**"],
610            ),
611        ];
612
613        for (pod_logs_dir, containers, expected_patterns) in cases {
614            let actual_patterns: Vec<_> =
615                build_container_exclusion_patterns(pod_logs_dir, containers.clone().into_iter())
616                    .collect();
617            let expected_patterns: Vec<_> = expected_patterns
618                .into_iter()
619                .map(|pattern| glob::Pattern::new(pattern).unwrap())
620                .collect();
621            assert_eq!(
622                actual_patterns, expected_patterns,
623                "failed for dir {:?} and containers {:?}",
624                &pod_logs_dir, &containers,
625            )
626        }
627    }
628}