vector/sources/kubernetes_logs/
k8s_paths_provider.rs

1//! A paths provider for k8s logs.
2
3#![deny(missing_docs)]
4
5use std::path::PathBuf;
6
7use k8s_openapi::api::core::v1::{Namespace, Pod};
8use kube::runtime::reflector::{ObjectRef, store::Store};
9use vector_lib::file_source::paths_provider::PathsProvider;
10
11use super::path_helpers::build_pod_logs_directory;
12use crate::kubernetes::pod_manager_logic::extract_static_pod_config_hashsum;
13
14/// A paths provider implementation that uses the state obtained from the
15/// the k8s API.
16pub struct K8sPathsProvider {
17    pod_state: Store<Pod>,
18    namespace_state: Store<Namespace>,
19    include_paths: Vec<glob::Pattern>,
20    exclude_paths: Vec<glob::Pattern>,
21    insert_namespace_fields: bool,
22}
23
24impl K8sPathsProvider {
25    /// Create a new [`K8sPathsProvider`].
26    pub const fn new(
27        pod_state: Store<Pod>,
28        namespace_state: Store<Namespace>,
29        include_paths: Vec<glob::Pattern>,
30        exclude_paths: Vec<glob::Pattern>,
31        insert_namespace_fields: bool,
32    ) -> Self {
33        Self {
34            pod_state,
35            namespace_state,
36            include_paths,
37            exclude_paths,
38            insert_namespace_fields,
39        }
40    }
41}
42
43impl PathsProvider for K8sPathsProvider {
44    type IntoIter = Vec<PathBuf>;
45
46    fn paths(&self) -> Vec<PathBuf> {
47        let state = self.pod_state.state();
48
49        state
50            .into_iter()
51            // filter out pods where we haven't fetched the namespace metadata yet
52            // they will be picked up on a later run
53            // Only check namespace metadata if insert_namespace_fields is enabled
54            .filter(|pod| {
55                if !self.insert_namespace_fields {
56                    // Skip namespace metadata check when namespace fields are disabled
57                    return true;
58                }
59                trace!(message = "Verifying Namespace metadata for pod.", pod = ?pod.metadata.name);
60                if let Some(namespace) = pod.metadata.namespace.as_ref() {
61                    self.namespace_state
62                        .get(&ObjectRef::<Namespace>::new(namespace))
63                        .is_some()
64                } else {
65                    false
66                }
67            })
68            .flat_map(|pod| {
69                trace!(message = "Providing log paths for pod.", pod = ?pod.metadata.name);
70                let paths_iter = list_pod_log_paths(real_glob, pod.as_ref());
71                filter_paths(
72                    filter_paths(paths_iter, &self.include_paths, true),
73                    &self.exclude_paths,
74                    false,
75                )
76                .collect::<Vec<_>>()
77            })
78            .collect()
79    }
80}
81
82/// This function takes a `Pod` resource and returns the path to where the logs
83/// for the said `Pod` are expected to be found.
84///
85/// In the common case, the effective path is built using the `namespace`,
86/// `name` and `uid` of the Pod. However, there's a special case for
87/// `Static Pod`s: they keep their logs at the path that consists of config
88/// hashsum instead of the `Pod` `uid`. The reason for this is `kubelet` is
89/// locally authoritative over those `Pod`s, and the API only has
90/// `Monitor Pod`s - the "dummy" entries useful for discovery and association.
91/// Their UIDs are generated at the Kubernetes API side, and do not represent
92/// the actual config hashsum as one would expect.
93///
94/// To work around this, we use the mirror pod annotations (if any) to obtain
95/// the effective config hashsum, see the `extract_static_pod_config_hashsum`
96/// function that does this.
97///
98/// See <https://github.com/vectordotdev/vector/issues/6001>
99/// See <https://github.com/kubernetes/kubernetes/blob/ef3337a443b402756c9f0bfb1f844b1b45ce289d/pkg/kubelet/pod/pod_manager.go#L30-L44>
100/// See <https://github.com/kubernetes/kubernetes/blob/cea1d4e20b4a7886d8ff65f34c6d4f95efcb4742/pkg/kubelet/pod/mirror_client.go#L80-L81>
101fn extract_pod_logs_directory(pod: &Pod) -> Option<PathBuf> {
102    let metadata = &pod.metadata;
103    let namespace = metadata.namespace.as_ref()?;
104    let name = metadata.name.as_ref()?;
105
106    let uid = if let Some(static_pod_config_hashsum) = extract_static_pod_config_hashsum(metadata) {
107        // If there's a static pod config hashsum - use it instead of uid.
108        static_pod_config_hashsum
109    } else {
110        // In the common case - just fallback to the real pod uid.
111        metadata.uid.as_ref()?
112    };
113
114    Some(build_pod_logs_directory(namespace, name, uid))
115}
116
117const CONTAINER_EXCLUSION_ANNOTATION_KEY: &str = "vector.dev/exclude-containers";
118
119fn extract_excluded_containers_for_pod(pod: &Pod) -> impl Iterator<Item = &str> {
120    let metadata = &pod.metadata;
121    metadata.annotations.iter().flat_map(|annotations| {
122        annotations
123            .iter()
124            .filter_map(|(key, value)| {
125                if key != CONTAINER_EXCLUSION_ANNOTATION_KEY {
126                    return None;
127                }
128                Some(value)
129            })
130            .flat_map(|containers| containers.split(','))
131            .map(|container| container.trim())
132    })
133}
134
135fn build_container_exclusion_patterns<'a>(
136    pod_logs_dir: &'a str,
137    containers: impl Iterator<Item = &'a str> + 'a,
138) -> impl Iterator<Item = glob::Pattern> + 'a {
139    containers.filter_map(move |container| {
140        let escaped_container_name = glob::Pattern::escape(container);
141        glob::Pattern::new(&[pod_logs_dir, &escaped_container_name, "**"].join("/")).ok()
142    })
143}
144
145fn list_pod_log_paths<'a, G, GI>(
146    mut glob_impl: G,
147    pod: &'a Pod,
148) -> impl Iterator<Item = PathBuf> + 'a
149where
150    G: FnMut(&str) -> GI + 'a,
151    GI: Iterator<Item = PathBuf> + 'a,
152{
153    extract_pod_logs_directory(pod)
154        .into_iter()
155        .flat_map(move |dir| {
156            let dir = dir
157                .to_str()
158                .expect("non-utf8 path to pod logs dir is not supported");
159
160            // Run the glob to get a list of unfiltered paths.
161            let path_iter = glob_impl(
162                // We seek to match the paths like
163                // `<pod_logs_dir>/<container_name>/<n>.log` - paths managed by
164                // the `kubelet` as part of Kubernetes core logging
165                // architecture.
166                // In some setups, there will also be paths like
167                // `<pod_logs_dir>/<hash>.log` - those we want to skip.
168                &[dir, "*/*.log*"].join("/"),
169            );
170
171            // Extract the containers to exclude, then build patterns from them
172            // and cache the results into a Vec.
173            let excluded_containers = extract_excluded_containers_for_pod(pod);
174            let exclusion_patterns: Vec<_> =
175                build_container_exclusion_patterns(dir, excluded_containers).collect();
176
177            // Return paths filtered with container exclusion.
178            filter_paths(path_iter, exclusion_patterns, false)
179        })
180}
181
182fn real_glob(pattern: &str) -> impl Iterator<Item = PathBuf> + use<> {
183    glob::glob_with(
184        pattern,
185        glob::MatchOptions {
186            require_literal_separator: true,
187            ..Default::default()
188        },
189    )
190    .expect("the pattern is supposed to always be correct")
191    .flat_map(|paths| paths.into_iter())
192}
193
194fn filter_paths<'a>(
195    iter: impl Iterator<Item = PathBuf> + 'a,
196    patterns: impl AsRef<[glob::Pattern]> + 'a,
197    include: bool,
198) -> impl Iterator<Item = PathBuf> + 'a {
199    iter.filter(move |path| {
200        let m = patterns.as_ref().iter().any(|pattern| {
201            pattern.matches_path_with(
202                path,
203                glob::MatchOptions {
204                    require_literal_separator: true,
205                    ..Default::default()
206                },
207            )
208        });
209        if include { m } else { !m }
210    })
211}
212
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};

    use super::{
        CONTAINER_EXCLUSION_ANNOTATION_KEY, build_container_exclusion_patterns,
        extract_excluded_containers_for_pod, extract_pod_logs_directory, filter_paths,
        list_pod_log_paths,
    };

    /// Builds a `Pod` whose metadata holds just the given fields; everything
    /// else is left at its default.
    fn make_pod(
        namespace: Option<&str>,
        name: Option<&str>,
        uid: Option<&str>,
        annotations: Option<&[(&str, &str)]>,
    ) -> Pod {
        Pod {
            metadata: ObjectMeta {
                namespace: namespace.map(str::to_owned),
                name: name.map(str::to_owned),
                uid: uid.map(str::to_owned),
                annotations: annotations.map(|pairs| {
                    pairs
                        .iter()
                        .map(|&(key, value)| (key.to_owned(), value.to_owned()))
                        .collect()
                }),
                ..ObjectMeta::default()
            },
            ..Pod::default()
        }
    }

    /// Runs `filter_paths` over `input_paths` with the patterns parsed from
    /// `str_patterns` and asserts that exactly `expected_paths` remain.
    #[track_caller]
    fn assert_filter_paths(
        input_paths: Vec<&str>,
        str_patterns: Vec<&str>,
        include: bool,
        expected_paths: Vec<&str>,
    ) {
        let patterns: Vec<_> = str_patterns
            .iter()
            .map(|pattern| glob::Pattern::new(pattern).unwrap())
            .collect();
        let actual_paths: Vec<PathBuf> =
            filter_paths(input_paths.into_iter().map(Into::into), &patterns, include).collect();
        let expected_paths: Vec<PathBuf> = expected_paths.into_iter().map(PathBuf::from).collect();
        assert_eq!(
            actual_paths, expected_paths,
            "failed for patterns {:?}",
            &str_patterns
        )
    }

    #[test]
    fn test_extract_pod_logs_directory() {
        let namespace = Some("sandbox0-ns");
        let name = Some("sandbox0-name");
        let uid = Some("sandbox0-uid");

        let cases = vec![
            // Empty pod.
            (Pod::default(), None),
            // Happy path.
            (
                make_pod(namespace, name, uid, None),
                Some("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid"),
            ),
            // No uid.
            (make_pod(namespace, name, None, None), None),
            // No name.
            (make_pod(namespace, None, uid, None), None),
            // No namespace.
            (make_pod(None, name, uid, None), None),
            // Static pod config hashsum as uid.
            (
                make_pod(
                    namespace,
                    name,
                    uid,
                    Some(&[("kubernetes.io/config.mirror", "sandbox0-config-hashsum")]),
                ),
                Some("/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-config-hashsum"),
            ),
        ];

        for (pod, expected) in cases {
            assert_eq!(
                extract_pod_logs_directory(&pod),
                expected.map(PathBuf::from)
            );
        }
    }

    #[test]
    fn test_extract_excluded_containers_for_pod() {
        let cases = vec![
            // No annotations.
            (Pod::default(), vec![]),
            // Empty annotations.
            (make_pod(None, None, None, Some(&[])), vec![]),
            // Irrelevant annotations.
            (
                make_pod(
                    None,
                    None,
                    None,
                    Some(&[("some-other-annotation", "some value")]),
                ),
                vec![],
            ),
            // Proper annotation without spaces.
            (
                make_pod(
                    None,
                    None,
                    None,
                    Some(&[(CONTAINER_EXCLUSION_ANNOTATION_KEY, "container1,container4")]),
                ),
                vec!["container1", "container4"],
            ),
            // Proper annotation with spaces.
            (
                make_pod(
                    None,
                    None,
                    None,
                    Some(&[(CONTAINER_EXCLUSION_ANNOTATION_KEY, "container1, container4")]),
                ),
                vec!["container1", "container4"],
            ),
        ];

        for (pod, expected) in cases {
            let actual: Vec<&str> = extract_excluded_containers_for_pod(&pod).collect();
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_list_pod_log_paths() {
        let namespace = Some("sandbox0-ns");
        let name = Some("sandbox0-name");
        let uid = Some("sandbox0-uid");

        let cases = vec![
            // Pod exists and has some containers that write logs, and some of
            // the containers are excluded.
            (
                make_pod(
                    namespace,
                    name,
                    uid,
                    Some(&[(CONTAINER_EXCLUSION_ANNOTATION_KEY, "excluded1,excluded2")]),
                ),
                // Calls to the glob mock.
                vec![(
                    // The pattern to expect at the mock.
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/*/*.log*",
                    // The paths to return from the mock.
                    vec![
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/excluded1/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container3/qwe.log",
                        "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/excluded2/qwe.log",
                    ],
                )],
                // Expected result.
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/qwe.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/qwe.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container3/qwe.log",
                ],
            ),
            // Pod doesn't have the metadata set.
            (Pod::default(), vec![], vec![]),
            // Pod has proper metadata, but doesn't have log files.
            (
                make_pod(namespace, name, uid, None),
                vec![(
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/*/*.log*",
                    vec![],
                )],
                vec![],
            ),
        ];

        for (pod, glob_calls, expected_paths) in cases {
            // Prepare the mock fn: every call consumes the next expected
            // (pattern, paths) pair.
            let mut remaining_calls = glob_calls.into_iter();
            let mock_glob = move |pattern: &str| {
                let (expected_pattern, paths_to_return) = remaining_calls
                    .next()
                    .expect("implementation did a call that wasn't expected");

                assert_eq!(pattern, expected_pattern);
                paths_to_return.into_iter().map(PathBuf::from)
            };

            let actual_paths: Vec<_> = list_pod_log_paths(mock_glob, &pod).collect();
            let expected_paths: Vec<_> = expected_paths.into_iter().map(PathBuf::from).collect();
            assert_eq!(actual_paths, expected_paths)
        }
    }

    #[test]
    fn test_exclude_paths() {
        let cases = vec![
            // No exclusion pattern allows everything.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
                vec![],
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
            ),
            // Test a filter that doesn't apply to anything.
            (
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
                vec!["notmatched"],
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
            ),
            // Multiple filters.
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log",
                ],
                vec!["notmatched", "**/b.log", "**/c.log"],
                vec!["/var/log/pods/a.log"],
            ),
            // Requires literal path separator (`*` does not include dirs).
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log",
                ],
                vec!["*/b.log", "**/c.log"],
                vec!["/var/log/pods/a.log", "/var/log/pods/b.log"],
            ),
            // Filtering by container name with a real-life-like file path.
            (
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/1.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/2.log",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/1.log",
                ],
                vec!["**/container1/**"],
                vec!["/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/1.log"],
            ),
        ];

        for (input_paths, str_patterns, expected_paths) in cases {
            assert_filter_paths(input_paths, str_patterns, false, expected_paths);
        }
    }

    #[test]
    fn test_include_paths() {
        let cases = vec![
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                    "/tmp/foo",
                ],
                vec!["/var/log/pods/*"],
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
            ),
            (
                vec![
                    "/var/log/pods/a.log",
                    "/var/log/pods/b.log",
                    "/var/log/pods/c.log.foo",
                    "/var/log/pods/d.logbar",
                ],
                vec!["/tmp/*"],
                vec![],
            ),
            (
                vec!["/var/log/pods/a.log", "/tmp/foo"],
                vec!["**/*"],
                vec!["/var/log/pods/a.log", "/tmp/foo"],
            ),
        ];

        for (input_paths, str_patterns, expected_paths) in cases {
            assert_filter_paths(input_paths, str_patterns, true, expected_paths);
        }
    }

    #[test]
    fn test_build_container_exclusion_patterns() {
        let cases = vec![
            // No excluded containers - no exclusion patterns.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec![],
                vec![],
            ),
            // Ensure the paths are concatenated correctly and look good.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec!["container1", "container2"],
                vec![
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container1/**",
                    "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/container2/**",
                ],
            ),
            // Ensure control characters are escaped properly.
            (
                "/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid",
                vec!["*[]"],
                vec!["/var/log/pods/sandbox0-ns_sandbox0-name_sandbox0-uid/[*][[][]]/**"],
            ),
        ];

        for (pod_logs_dir, containers, expected_patterns) in cases {
            let actual_patterns: Vec<_> =
                build_container_exclusion_patterns(pod_logs_dir, containers.iter().copied())
                    .collect();
            let expected_patterns: Vec<_> = expected_patterns
                .into_iter()
                .map(|pattern| glob::Pattern::new(pattern).unwrap())
                .collect();
            assert_eq!(
                actual_patterns, expected_patterns,
                "failed for dir {:?} and containers {:?}",
                &pod_logs_dir, &containers,
            )
        }
    }
}