vector/sources/kubernetes_logs/
mod.rs

//! This module implements the `kubernetes_logs` source.
//! The scope of this source is to consume the log files that a kubelet keeps
//! at "/var/log/pods" on the host of the Kubernetes Node when Vector itself is
//! running inside the cluster as a DaemonSet.
5
6#![deny(missing_docs)]
7use std::{cmp::min, path::PathBuf, time::Duration};
8
9use bytes::Bytes;
10use chrono::Utc;
11use futures::{future::FutureExt, stream::StreamExt};
12use futures_util::Stream;
13use http_1::{HeaderName, HeaderValue};
14use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
15use k8s_paths_provider::K8sPathsProvider;
16use kube::{
17    api::Api,
18    config::{self, KubeConfigOptions},
19    runtime::{reflector, watcher, WatchStreamExt},
20    Client, Config as ClientConfig,
21};
22use lifecycle::Lifecycle;
23use serde_with::serde_as;
24use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
25use vector_lib::configurable::configurable_component;
26use vector_lib::file_source::{
27    calculate_ignore_before, Checkpointer, FileServer, FileServerShutdown, FingerprintStrategy,
28    Fingerprinter, Line, ReadFrom, ReadFromConfig,
29};
30use vector_lib::lookup::{lookup_v2::OptionalTargetPath, owned_value_path, path, OwnedTargetPath};
31use vector_lib::{config::LegacyKey, config::LogNamespace, EstimatedJsonEncodedSizeOf};
32use vector_lib::{
33    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
34    TimeZone,
35};
36use vrl::value::{kind::Collection, Kind};
37
38use crate::{
39    built_info::{PKG_NAME, PKG_VERSION},
40    sources::kubernetes_logs::partial_events_merger::merge_partial_events,
41};
42use crate::{
43    config::{
44        log_schema, ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig,
45        SourceContext, SourceOutput,
46    },
47    event::Event,
48    internal_events::{
49        FileInternalMetricsConfig, FileSourceInternalEventsEmitter, KubernetesLifecycleError,
50        KubernetesLogsEventAnnotationError, KubernetesLogsEventNamespaceAnnotationError,
51        KubernetesLogsEventNodeAnnotationError, KubernetesLogsEventsReceived,
52        KubernetesLogsPodInfo, StreamClosedError,
53    },
54    kubernetes::{custom_reflector, meta_cache::MetaCache},
55    shutdown::ShutdownSignal,
56    sources,
57    transforms::{FunctionTransform, OutputBuffer},
58    SourceSender,
59};
60
61mod k8s_paths_provider;
62mod lifecycle;
63mod namespace_metadata_annotator;
64mod node_metadata_annotator;
65mod parser;
66mod partial_events_merger;
67mod path_helpers;
68mod pod_metadata_annotator;
69mod transform_utils;
70mod util;
71
72use self::namespace_metadata_annotator::NamespaceMetadataAnnotator;
73use self::node_metadata_annotator::NodeMetadataAnnotator;
74use self::parser::Parser;
75use self::pod_metadata_annotator::PodMetadataAnnotator;
76
/// Name of the environment variable that supplies the node name.
///
/// It is read when the `self_node_name` option is empty or still holds the
/// default environment template.
const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME";
79
/// Configuration for the `kubernetes_logs` source.
#[serde_as]
#[configurable_component(source("kubernetes_logs", "Collect Pod logs from Kubernetes Nodes."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
    /// Specifies the [label selector][label_selector] to filter [Pods][pods] with, to be used in
    /// addition to the built-in [exclude][exclude] filter.
    ///
    /// [label_selector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
    /// [pods]: https://kubernetes.io/docs/concepts/workloads/pods/
    /// [exclude]: https://vector.dev/docs/reference/configuration/sources/kubernetes_logs/#pod-exclusion
    #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
    #[configurable(metadata(
        docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
    ))]
    extra_label_selector: String,

    /// Specifies the [label selector][label_selector] to filter [Namespaces][namespaces] with, to
    /// be used in addition to the built-in [exclude][exclude] filter.
    ///
    /// [label_selector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
    /// [namespaces]: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
    /// [exclude]: https://vector.dev/docs/reference/configuration/sources/kubernetes_logs/#namespace-exclusion
    #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
    #[configurable(metadata(
        docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
    ))]
    extra_namespace_label_selector: String,

    /// The name of the Kubernetes [Node][node] that is running.
    ///
    /// Configured to use an environment variable by default, to be evaluated to a value provided by
    /// Kubernetes at Pod creation.
    ///
    /// [node]: https://kubernetes.io/docs/concepts/architecture/nodes/
    self_node_name: String,

    /// Specifies the [field selector][field_selector] to filter Pods with, to be used in addition
    /// to the built-in [Node][node] filter.
    ///
    /// The built-in Node filter uses `self_node_name` to only watch Pods located on the same Node.
    ///
    /// [field_selector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
    /// [node]: https://kubernetes.io/docs/concepts/architecture/nodes/
    #[configurable(metadata(docs::examples = "metadata.name!=pod-name-to-exclude"))]
    #[configurable(metadata(
        docs::examples = "metadata.name!=pod-name-to-exclude,metadata.name=mypod"
    ))]
    extra_field_selector: String,

    /// Whether or not to automatically merge partial events.
    ///
    /// Partial events are messages that were split by the Kubernetes Container Runtime
    /// log driver.
    auto_partial_merge: bool,

    /// The directory used to persist file checkpoint positions.
    ///
    /// By default, the [global `data_dir` option][global_data_dir] is used.
    /// Make sure the running user has write permissions to this directory.
    ///
    /// If this directory is specified, then Vector will attempt to create it.
    ///
    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    data_dir: Option<PathBuf>,

    #[configurable(derived)]
    #[serde(alias = "annotation_fields")]
    pod_annotation_fields: pod_metadata_annotator::FieldsSpec,

    #[configurable(derived)]
    namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec,

    #[configurable(derived)]
    node_annotation_fields: node_metadata_annotator::FieldsSpec,

    /// A list of glob patterns to include while reading the files.
    #[configurable(metadata(docs::examples = "**/include/**"))]
    include_paths_glob_patterns: Vec<PathBuf>,

    /// A list of glob patterns to exclude from reading the files.
    #[configurable(metadata(docs::examples = "**/exclude/**"))]
    exclude_paths_glob_patterns: Vec<PathBuf>,

    #[configurable(derived)]
    #[serde(default = "default_read_from")]
    read_from: ReadFromConfig,

    /// Ignore files with a data modification date older than the specified number of seconds.
    #[serde(default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Files Older Than"))]
    ignore_older_secs: Option<u64>,

    /// Max amount of bytes to read from a single file before switching over to the next file.
    /// **Note:** This does not apply when `oldest_first` is `true`.
    ///
    /// This allows distributing the reads more or less evenly across
    /// the files.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_read_bytes: usize,

    /// Instead of balancing read capacity fairly across all watched files, prioritize draining the oldest files before moving on to read data from more recent files.
    #[serde(default = "default_oldest_first")]
    pub oldest_first: bool,

    /// The maximum number of bytes a line can contain before being discarded.
    ///
    /// This protects against malformed lines or tailing incorrect files.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_line_bytes: usize,

    /// The maximum number of bytes a line can contain - after merging - before being discarded.
    ///
    /// This protects against malformed lines or tailing incorrect files.
    ///
    /// Note that, if `auto_partial_merge` is false, this config will be ignored. Also, if `max_line_bytes` is too small to reach the continuation character, then this
    /// config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply
    /// if `auto_partial_merge` is true, so if this is set to be 1 MiB, for example, but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_merged_line_bytes: Option<usize>,

    // NOTE(review): the second paragraph was cut off after "fixed size," in the
    // original text; the completion below is inferred from the first paragraph — confirm.
    /// The number of lines to read for generating the checksum.
    ///
    /// If your files share a common header that is not always a fixed size,
    /// increase this value so that the checksum covers lines beyond the header.
    ///
    /// If the file has less than this amount of lines, it won't be read at all.
    #[configurable(metadata(docs::type_unit = "lines"))]
    fingerprint_lines: usize,

    /// The interval at which the file system is polled to identify new files to read from.
    ///
    /// This is quite efficient, yet might still create some load on the
    /// file system; in addition, it is currently coupled with checksum dumping
    /// in the underlying file server, so setting it too low may introduce
    /// a significant overhead.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    glob_minimum_cooldown_ms: Duration,

    /// Overrides the name of the log field used to add the ingestion timestamp to each event.
    ///
    /// This is useful to compute the latency between important event processing
    /// stages. For example, the time delta between when a log line was written and when it was
    /// processed by the `kubernetes_logs` source.
    #[configurable(metadata(docs::examples = ".ingest_timestamp", docs::examples = "ingest_ts"))]
    ingestion_timestamp_field: Option<OptionalTargetPath>,

    /// The default time zone for timestamps without an explicit zone.
    timezone: Option<TimeZone>,

    /// Optional path to a readable [kubeconfig][kubeconfig] file.
    ///
    /// If not set, a connection to Kubernetes is made using the in-cluster configuration.
    ///
    /// [kubeconfig]: https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/
    #[configurable(metadata(docs::examples = "/path/to/.kube/config"))]
    kube_config_file: Option<PathBuf>,

    /// Determines if requests to the kube-apiserver can be served by a cache.
    use_apiserver_cache: bool,

    /// How long to delay removing metadata entries from the cache when a pod deletion
    /// event is received from the watch stream.
    ///
    /// A longer delay allows for continued enrichment of logs after the originating Pod is
    /// removed. If relevant metadata has been removed, the log is forwarded un-enriched and a
    /// warning is emitted.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Delay Deletion"))]
    delay_deletion_ms: Duration,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// How long to keep an open handle to a rotated log file.
    /// The default value represents "no limit".
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    rotate_wait: Duration,
}
272
273const fn default_read_from() -> ReadFromConfig {
274    ReadFromConfig::Beginning
275}
276
277impl GenerateConfig for Config {
278    fn generate_config() -> toml::Value {
279        toml::Value::try_from(Self {
280            self_node_name: default_self_node_name_env_template(),
281            auto_partial_merge: true,
282            ..Default::default()
283        })
284        .unwrap()
285    }
286}
287
288impl Default for Config {
289    fn default() -> Self {
290        Self {
291            extra_label_selector: "".to_string(),
292            extra_namespace_label_selector: "".to_string(),
293            self_node_name: default_self_node_name_env_template(),
294            extra_field_selector: "".to_string(),
295            auto_partial_merge: true,
296            data_dir: None,
297            pod_annotation_fields: pod_metadata_annotator::FieldsSpec::default(),
298            namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec::default(),
299            node_annotation_fields: node_metadata_annotator::FieldsSpec::default(),
300            include_paths_glob_patterns: default_path_inclusion(),
301            exclude_paths_glob_patterns: default_path_exclusion(),
302            read_from: default_read_from(),
303            ignore_older_secs: None,
304            max_read_bytes: default_max_read_bytes(),
305            oldest_first: default_oldest_first(),
306            max_line_bytes: default_max_line_bytes(),
307            max_merged_line_bytes: None,
308            fingerprint_lines: default_fingerprint_lines(),
309            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
310            ingestion_timestamp_field: None,
311            timezone: None,
312            kube_config_file: None,
313            use_apiserver_cache: false,
314            delay_deletion_ms: default_delay_deletion_ms(),
315            log_namespace: None,
316            internal_metrics: Default::default(),
317            rotate_wait: default_rotate_wait(),
318        }
319    }
320}
321
322#[async_trait::async_trait]
323#[typetag::serde(name = "kubernetes_logs")]
324impl SourceConfig for Config {
325    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
326        let log_namespace = cx.log_namespace(self.log_namespace);
327        let source = Source::new(self, &cx.globals, &cx.key).await?;
328
329        Ok(Box::pin(
330            source
331                .run(cx.out, cx.shutdown, log_namespace)
332                .map(|result| {
333                    result.map_err(|error| {
334                        error!(message = "Source future failed.", %error);
335                    })
336                }),
337        ))
338    }
339
340    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
341        let log_namespace = global_log_namespace.merge(self.log_namespace);
342        let schema_definition = BytesDeserializerConfig
343            .schema_definition(log_namespace)
344            .with_source_metadata(
345                Self::NAME,
346                Some(LegacyKey::Overwrite(owned_value_path!("file"))),
347                &owned_value_path!("file"),
348                Kind::bytes(),
349                None,
350            )
351            .with_source_metadata(
352                Self::NAME,
353                self.pod_annotation_fields
354                    .container_id
355                    .path
356                    .clone()
357                    .map(|k| k.path)
358                    .map(LegacyKey::Overwrite),
359                &owned_value_path!("container_id"),
360                Kind::bytes().or_undefined(),
361                None,
362            )
363            .with_source_metadata(
364                Self::NAME,
365                self.pod_annotation_fields
366                    .container_image
367                    .path
368                    .clone()
369                    .map(|k| k.path)
370                    .map(LegacyKey::Overwrite),
371                &owned_value_path!("container_image"),
372                Kind::bytes().or_undefined(),
373                None,
374            )
375            .with_source_metadata(
376                Self::NAME,
377                self.pod_annotation_fields
378                    .container_name
379                    .path
380                    .clone()
381                    .map(|k| k.path)
382                    .map(LegacyKey::Overwrite),
383                &owned_value_path!("container_name"),
384                Kind::bytes().or_undefined(),
385                None,
386            )
387            .with_source_metadata(
388                Self::NAME,
389                self.namespace_annotation_fields
390                    .namespace_labels
391                    .path
392                    .clone()
393                    .map(|x| LegacyKey::Overwrite(x.path)),
394                &owned_value_path!("namespace_labels"),
395                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
396                None,
397            )
398            .with_source_metadata(
399                Self::NAME,
400                self.node_annotation_fields
401                    .node_labels
402                    .path
403                    .clone()
404                    .map(|x| LegacyKey::Overwrite(x.path)),
405                &owned_value_path!("node_labels"),
406                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
407                None,
408            )
409            .with_source_metadata(
410                Self::NAME,
411                self.pod_annotation_fields
412                    .pod_annotations
413                    .path
414                    .clone()
415                    .map(|k| k.path)
416                    .map(LegacyKey::Overwrite),
417                &owned_value_path!("pod_annotations"),
418                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
419                None,
420            )
421            .with_source_metadata(
422                Self::NAME,
423                self.pod_annotation_fields
424                    .pod_ip
425                    .path
426                    .clone()
427                    .map(|k| k.path)
428                    .map(LegacyKey::Overwrite),
429                &owned_value_path!("pod_ip"),
430                Kind::bytes().or_undefined(),
431                None,
432            )
433            .with_source_metadata(
434                Self::NAME,
435                self.pod_annotation_fields
436                    .pod_ips
437                    .path
438                    .clone()
439                    .map(|k| k.path)
440                    .map(LegacyKey::Overwrite),
441                &owned_value_path!("pod_ips"),
442                Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
443                None,
444            )
445            .with_source_metadata(
446                Self::NAME,
447                self.pod_annotation_fields
448                    .pod_labels
449                    .path
450                    .clone()
451                    .map(|k| k.path)
452                    .map(LegacyKey::Overwrite),
453                &owned_value_path!("pod_labels"),
454                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
455                None,
456            )
457            .with_source_metadata(
458                Self::NAME,
459                self.pod_annotation_fields
460                    .pod_name
461                    .path
462                    .clone()
463                    .map(|k| k.path)
464                    .map(LegacyKey::Overwrite),
465                &owned_value_path!("pod_name"),
466                Kind::bytes().or_undefined(),
467                None,
468            )
469            .with_source_metadata(
470                Self::NAME,
471                self.pod_annotation_fields
472                    .pod_namespace
473                    .path
474                    .clone()
475                    .map(|k| k.path)
476                    .map(LegacyKey::Overwrite),
477                &owned_value_path!("pod_namespace"),
478                Kind::bytes().or_undefined(),
479                None,
480            )
481            .with_source_metadata(
482                Self::NAME,
483                self.pod_annotation_fields
484                    .pod_node_name
485                    .path
486                    .clone()
487                    .map(|k| k.path)
488                    .map(LegacyKey::Overwrite),
489                &owned_value_path!("pod_node_name"),
490                Kind::bytes().or_undefined(),
491                None,
492            )
493            .with_source_metadata(
494                Self::NAME,
495                self.pod_annotation_fields
496                    .pod_owner
497                    .path
498                    .clone()
499                    .map(|k| k.path)
500                    .map(LegacyKey::Overwrite),
501                &owned_value_path!("pod_owner"),
502                Kind::bytes().or_undefined(),
503                None,
504            )
505            .with_source_metadata(
506                Self::NAME,
507                self.pod_annotation_fields
508                    .pod_uid
509                    .path
510                    .clone()
511                    .map(|k| k.path)
512                    .map(LegacyKey::Overwrite),
513                &owned_value_path!("pod_uid"),
514                Kind::bytes().or_undefined(),
515                None,
516            )
517            .with_source_metadata(
518                Self::NAME,
519                Some(LegacyKey::Overwrite(owned_value_path!("stream"))),
520                &owned_value_path!("stream"),
521                Kind::bytes(),
522                None,
523            )
524            .with_source_metadata(
525                Self::NAME,
526                log_schema()
527                    .timestamp_key()
528                    .cloned()
529                    .map(LegacyKey::Overwrite),
530                &owned_value_path!("timestamp"),
531                Kind::timestamp(),
532                Some("timestamp"),
533            )
534            .with_standard_vector_source_metadata();
535
536        vec![SourceOutput::new_maybe_logs(
537            DataType::Log,
538            schema_definition,
539        )]
540    }
541
542    fn can_acknowledge(&self) -> bool {
543        false
544    }
545}
546
/// Resolved runtime settings for the `kubernetes_logs` source.
///
/// Built from [`Config`] by `Source::new` and consumed by `Source::run`.
#[derive(Clone)]
struct Source {
    /// Kubernetes API client created from the kubeconfig file or the inferred configuration.
    client: Client,
    /// Data directory resolved (and created) through the global options.
    data_dir: PathBuf,
    auto_partial_merge: bool,
    pod_fields_spec: pod_metadata_annotator::FieldsSpec,
    namespace_fields_spec: namespace_metadata_annotator::FieldsSpec,
    node_field_spec: node_metadata_annotator::FieldsSpec,
    /// Pod field selector prepared from the configuration and the node name.
    field_selector: String,
    /// Pod label selector prepared from `extra_label_selector`.
    label_selector: String,
    /// Namespace label selector prepared from `extra_namespace_label_selector`.
    namespace_label_selector: String,
    /// Node field selector prepared from the node name.
    node_selector: String,
    /// Node name taken from the configuration or the environment.
    self_node_name: String,
    include_paths: Vec<glob::Pattern>,
    exclude_paths: Vec<glob::Pattern>,
    read_from: ReadFrom,
    ignore_older_secs: Option<u64>,
    max_read_bytes: usize,
    oldest_first: bool,
    max_line_bytes: usize,
    max_merged_line_bytes: Option<usize>,
    fingerprint_lines: usize,
    /// Copied from `glob_minimum_cooldown_ms`.
    glob_minimum_cooldown: Duration,
    use_apiserver_cache: bool,
    /// Target path of the ingestion timestamp field, if one is configured.
    ingestion_timestamp_field: Option<OwnedTargetPath>,
    /// Copied from `delay_deletion_ms`.
    delay_deletion: Duration,
    /// Copied from `internal_metrics.include_file_tag`.
    include_file_metric_tag: bool,
    rotate_wait: Duration,
}
576
577impl Source {
578    async fn new(
579        config: &Config,
580        globals: &GlobalOptions,
581        key: &ComponentKey,
582    ) -> crate::Result<Self> {
583        let self_node_name = if config.self_node_name.is_empty()
584            || config.self_node_name == default_self_node_name_env_template()
585        {
586            std::env::var(SELF_NODE_NAME_ENV_KEY).map_err(|_| {
587                format!(
588                    "self_node_name config value or {SELF_NODE_NAME_ENV_KEY} env var is not set"
589                )
590            })?
591        } else {
592            config.self_node_name.clone()
593        };
594
595        let field_selector = prepare_field_selector(config, self_node_name.as_str())?;
596        let label_selector = prepare_label_selector(config.extra_label_selector.as_ref());
597        let namespace_label_selector =
598            prepare_label_selector(config.extra_namespace_label_selector.as_ref());
599        let node_selector = prepare_node_selector(self_node_name.as_str())?;
600
601        // If the user passed a custom Kubeconfig use it, otherwise
602        // we attempt to load the local kubeconfig, followed by the
603        // in-cluster environment variables
604        let mut client_config = match &config.kube_config_file {
605            Some(kc) => {
606                ClientConfig::from_custom_kubeconfig(
607                    config::Kubeconfig::read_from(kc)?,
608                    &KubeConfigOptions::default(),
609                )
610                .await?
611            }
612            None => ClientConfig::infer().await?,
613        };
614        if let Ok(user_agent) = HeaderValue::from_str(&format!("{PKG_NAME}/{PKG_VERSION}")) {
615            client_config
616                .headers
617                .push((HeaderName::from_static("user-agent"), user_agent));
618        }
619        let client = Client::try_from(client_config)?;
620
621        let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?;
622
623        let include_paths = prepare_include_paths(config)?;
624
625        let exclude_paths = prepare_exclude_paths(config)?;
626
627        let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
628
629        let delay_deletion = config.delay_deletion_ms;
630
631        let ingestion_timestamp_field = config
632            .ingestion_timestamp_field
633            .clone()
634            .and_then(|k| k.path);
635
636        Ok(Self {
637            client,
638            data_dir,
639            auto_partial_merge: config.auto_partial_merge,
640            pod_fields_spec: config.pod_annotation_fields.clone(),
641            namespace_fields_spec: config.namespace_annotation_fields.clone(),
642            node_field_spec: config.node_annotation_fields.clone(),
643            field_selector,
644            label_selector,
645            namespace_label_selector,
646            node_selector,
647            self_node_name,
648            include_paths,
649            exclude_paths,
650            read_from: ReadFrom::from(config.read_from),
651            ignore_older_secs: config.ignore_older_secs,
652            max_read_bytes: config.max_read_bytes,
653            oldest_first: config.oldest_first,
654            max_line_bytes: config.max_line_bytes,
655            max_merged_line_bytes: config.max_merged_line_bytes,
656            fingerprint_lines: config.fingerprint_lines,
657            glob_minimum_cooldown,
658            use_apiserver_cache: config.use_apiserver_cache,
659            ingestion_timestamp_field,
660            delay_deletion,
661            include_file_metric_tag: config.internal_metrics.include_file_tag,
662            rotate_wait: config.rotate_wait,
663        })
664    }
665
666    async fn run(
667        self,
668        mut out: SourceSender,
669        global_shutdown: ShutdownSignal,
670        log_namespace: LogNamespace,
671    ) -> crate::Result<()> {
672        let Self {
673            client,
674            data_dir,
675            auto_partial_merge,
676            pod_fields_spec,
677            namespace_fields_spec,
678            node_field_spec,
679            field_selector,
680            label_selector,
681            namespace_label_selector,
682            node_selector,
683            self_node_name,
684            include_paths,
685            exclude_paths,
686            read_from,
687            ignore_older_secs,
688            max_read_bytes,
689            oldest_first,
690            max_line_bytes,
691            max_merged_line_bytes,
692            fingerprint_lines,
693            glob_minimum_cooldown,
694            use_apiserver_cache,
695            ingestion_timestamp_field,
696            delay_deletion,
697            include_file_metric_tag,
698            rotate_wait,
699        } = self;
700
701        let mut reflectors = Vec::new();
702
703        let pods = Api::<Pod>::all(client.clone());
704
705        let list_semantic = if use_apiserver_cache {
706            watcher::ListSemantic::Any
707        } else {
708            watcher::ListSemantic::MostRecent
709        };
710
711        let pod_watcher = watcher(
712            pods,
713            watcher::Config {
714                field_selector: Some(field_selector),
715                label_selector: Some(label_selector),
716                list_semantic: list_semantic.clone(),
717                page_size: get_page_size(use_apiserver_cache),
718                ..Default::default()
719            },
720        )
721        .backoff(watcher::DefaultBackoff::default());
722
723        let pod_store_w = reflector::store::Writer::default();
724        let pod_state = pod_store_w.as_reader();
725        let pod_cacher = MetaCache::new();
726
727        reflectors.push(tokio::spawn(custom_reflector(
728            pod_store_w,
729            pod_cacher,
730            pod_watcher,
731            delay_deletion,
732        )));
733
734        // -----------------------------------------------------------------
735
736        let namespaces = Api::<Namespace>::all(client.clone());
737        let ns_watcher = watcher(
738            namespaces,
739            watcher::Config {
740                label_selector: Some(namespace_label_selector),
741                list_semantic: list_semantic.clone(),
742                page_size: get_page_size(use_apiserver_cache),
743                ..Default::default()
744            },
745        )
746        .backoff(watcher::DefaultBackoff::default());
747        let ns_store_w = reflector::store::Writer::default();
748        let ns_state = ns_store_w.as_reader();
749        let ns_cacher = MetaCache::new();
750
751        reflectors.push(tokio::spawn(custom_reflector(
752            ns_store_w,
753            ns_cacher,
754            ns_watcher,
755            delay_deletion,
756        )));
757
758        // -----------------------------------------------------------------
759
760        let nodes = Api::<Node>::all(client);
761        let node_watcher = watcher(
762            nodes,
763            watcher::Config {
764                field_selector: Some(node_selector),
765                list_semantic,
766                page_size: get_page_size(use_apiserver_cache),
767                ..Default::default()
768            },
769        )
770        .backoff(watcher::DefaultBackoff::default());
771        let node_store_w = reflector::store::Writer::default();
772        let node_state = node_store_w.as_reader();
773        let node_cacher = MetaCache::new();
774
775        reflectors.push(tokio::spawn(custom_reflector(
776            node_store_w,
777            node_cacher,
778            node_watcher,
779            delay_deletion,
780        )));
781
782        let paths_provider = K8sPathsProvider::new(
783            pod_state.clone(),
784            ns_state.clone(),
785            include_paths,
786            exclude_paths,
787        );
788        let annotator = PodMetadataAnnotator::new(pod_state, pod_fields_spec, log_namespace);
789        let ns_annotator =
790            NamespaceMetadataAnnotator::new(ns_state, namespace_fields_spec, log_namespace);
791        let node_annotator = NodeMetadataAnnotator::new(node_state, node_field_spec, log_namespace);
792
793        let ignore_before = calculate_ignore_before(ignore_older_secs);
794
795        let mut resolved_max_line_bytes = max_line_bytes;
796        if auto_partial_merge {
797            resolved_max_line_bytes = min(
798                max_line_bytes,
799                max_merged_line_bytes.unwrap_or(max_line_bytes),
800            );
801        }
802
803        // TODO: maybe more of the parameters have to be configurable.
804
805        let checkpointer = Checkpointer::new(&data_dir);
806        let file_server = FileServer {
807            // Use our special paths provider.
808            paths_provider,
809            // Max amount of bytes to read from a single file before switching
810            // over to the next file.
811            // This allows distributing the reads more or less evenly across
812            // the files.
813            max_read_bytes,
814            // We want to use checkpointing mechanism, and resume from where we
815            // left off.
816            ignore_checkpoints: false,
817            // Match the default behavior
818            read_from,
819            // We're now aware of the use cases that would require specifying
820            // the starting point in time since when we should collect the logs,
821            // so we just disable it. If users ask, we can expose it. There may
822            // be other, more sound ways for users considering the use of this
823            // option to solve their use case, so take consideration.
824            ignore_before,
825            // The maximum number of bytes a line can contain before being discarded. This
826            // protects against malformed lines or tailing incorrect files.
827            max_line_bytes: resolved_max_line_bytes,
828            // Delimiter bytes that is used to read the file line-by-line
829            line_delimiter: Bytes::from("\n"),
830            // The directory where to keep the checkpoints.
831            data_dir,
832            // This value specifies not exactly the globbing, but interval
833            // between the polling the files to watch from the `paths_provider`.
834            glob_minimum_cooldown,
            // The shape of the log files is well-known in the Kubernetes
            // environment, so we pick a specially crafted fingerprinter
            // for the log files.
838            fingerprinter: Fingerprinter {
839                strategy: FingerprintStrategy::FirstLinesChecksum {
840                    // Max line length to expect during fingerprinting, see the
841                    // explanation above.
842                    ignored_header_bytes: 0,
843                    lines: fingerprint_lines,
844                },
845                max_line_length: resolved_max_line_bytes,
846                ignore_not_found: true,
847            },
848            oldest_first,
849            // We do not remove the log files, `kubelet` is responsible for it.
850            remove_after: None,
851            // The standard emitter.
852            emitter: FileSourceInternalEventsEmitter {
853                include_file_metric_tag,
854            },
855            // A handle to the current tokio runtime
856            handle: tokio::runtime::Handle::current(),
857            rotate_wait,
858        };
859
860        let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
861
862        let checkpoints = checkpointer.view();
863        let events = file_source_rx.flat_map(futures::stream::iter);
864        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
865        let events = events.map(move |line| {
866            let byte_size = line.text.len();
867            bytes_received.emit(ByteSize(byte_size));
868
869            let mut event = create_event(
870                line.text,
871                &line.filename,
872                ingestion_timestamp_field.as_ref(),
873                log_namespace,
874            );
875
876            let file_info = annotator.annotate(&mut event, &line.filename);
877
878            emit!(KubernetesLogsEventsReceived {
879                file: &line.filename,
880                byte_size: event.estimated_json_encoded_size_of(),
881                pod_info: file_info.as_ref().map(|info| KubernetesLogsPodInfo {
882                    name: info.pod_name.to_owned(),
883                    namespace: info.pod_namespace.to_owned(),
884                }),
885            });
886
887            if file_info.is_none() {
888                emit!(KubernetesLogsEventAnnotationError { event: &event });
889            } else {
890                let namespace = file_info.as_ref().map(|info| info.pod_namespace);
891
892                if let Some(name) = namespace {
893                    let ns_info = ns_annotator.annotate(&mut event, name);
894
895                    if ns_info.is_none() {
896                        emit!(KubernetesLogsEventNamespaceAnnotationError { event: &event });
897                    }
898                }
899
900                let node_info = node_annotator.annotate(&mut event, self_node_name.as_str());
901
902                if node_info.is_none() {
903                    emit!(KubernetesLogsEventNodeAnnotationError { event: &event });
904                }
905            }
906
907            checkpoints.update(line.file_id, line.end_offset);
908            event
909        });
910
911        let mut parser = Parser::new(log_namespace);
912        let events = events.flat_map(move |event| {
913            let mut buf = OutputBuffer::with_capacity(1);
914            parser.transform(&mut buf, event);
915            futures::stream::iter(buf.into_events())
916        });
917
918        let (events_count, _) = events.size_hint();
919
920        let mut stream = if auto_partial_merge {
921            merge_partial_events(events, log_namespace, max_merged_line_bytes).left_stream()
922        } else {
923            events.right_stream()
924        };
925
926        let event_processing_loop = out.send_event_stream(&mut stream);
927
928        let mut lifecycle = Lifecycle::new();
929        {
930            let (slot, shutdown) = lifecycle.add();
931            let fut = util::run_file_server(file_server, file_source_tx, shutdown, checkpointer)
932                .map(|result| match result {
933                    Ok(FileServerShutdown) => info!(message = "File server completed gracefully."),
934                    Err(error) => emit!(KubernetesLifecycleError {
935                        message: "File server exited with an error.",
936                        error,
937                        count: events_count,
938                    }),
939                });
940            slot.bind(Box::pin(fut));
941        }
942        {
943            let (slot, shutdown) = lifecycle.add();
944            let fut = util::complete_with_deadline_on_signal(
945                event_processing_loop,
946                shutdown,
947                Duration::from_secs(30), // more than enough time to propagate
948            )
949            .map(|result| {
950                match result {
951                    Ok(Ok(())) => info!(message = "Event processing loop completed gracefully."),
952                    Ok(Err(_)) => emit!(StreamClosedError {
953                        count: events_count
954                    }),
955                    Err(error) => emit!(KubernetesLifecycleError {
956                        error,
957                        message: "Event processing loop timed out during the shutdown.",
958                        count: events_count,
959                    }),
960                };
961            });
962            slot.bind(Box::pin(fut));
963        }
964
965        lifecycle.run(global_shutdown).await;
966        // Stop Kubernetes object reflectors to avoid their leak on vector reload.
967        for reflector in reflectors {
968            reflector.abort();
969        }
970        info!(message = "Done.");
971        Ok(())
972    }
973}
974
975// Set page size to None if use_apiserver_cache is true, to make the list requests containing `resourceVersion=0`` parameters.
976fn get_page_size(use_apiserver_cache: bool) -> Option<u32> {
977    if use_apiserver_cache {
978        None
979    } else {
980        watcher::Config::default().page_size
981    }
982}
983
984fn create_event(
985    line: Bytes,
986    file: &str,
987    ingestion_timestamp_field: Option<&OwnedTargetPath>,
988    log_namespace: LogNamespace,
989) -> Event {
990    let deserializer = BytesDeserializer;
991    let mut log = deserializer.parse_single(line, log_namespace);
992
993    log_namespace.insert_source_metadata(
994        Config::NAME,
995        &mut log,
996        Some(LegacyKey::Overwrite(path!("file"))),
997        path!("file"),
998        file,
999    );
1000
1001    log_namespace.insert_vector_metadata(
1002        &mut log,
1003        log_schema().source_type_key(),
1004        path!("source_type"),
1005        Bytes::from(Config::NAME),
1006    );
1007    match (log_namespace, ingestion_timestamp_field) {
1008        // When using LogNamespace::Vector always set the ingest_timestamp.
1009        (LogNamespace::Vector, _) => {
1010            log.metadata_mut()
1011                .value_mut()
1012                .insert(path!("vector", "ingest_timestamp"), Utc::now());
1013        }
1014        // When LogNamespace::Legacy, only set when the `ingestion_timestamp_field` is configured.
1015        (LogNamespace::Legacy, Some(ingestion_timestamp_field)) => {
1016            log.try_insert(ingestion_timestamp_field, Utc::now())
1017        }
1018        // The CRI/Docker parsers handle inserting the `log_schema().timestamp_key()` value.
1019        (LogNamespace::Legacy, None) => (),
1020    };
1021
1022    log.into()
1023}
1024
1025/// This function returns the default value for `self_node_name` variable
1026/// as it should be at the generated config file.
1027fn default_self_node_name_env_template() -> String {
1028    format!("${{{}}}", SELF_NODE_NAME_ENV_KEY.to_owned())
1029}
1030
/// Default include globs: a single `**/*` pattern.
fn default_path_inclusion() -> Vec<PathBuf> {
    ["**/*"].iter().map(PathBuf::from).collect()
}
1034
/// Default exclude globs: `**/*.gz` and `**/*.tmp`, in that order.
fn default_path_exclusion() -> Vec<PathBuf> {
    ["**/*.gz", "**/*.tmp"]
        .iter()
        .map(PathBuf::from)
        .collect()
}
1038
/// Default for the `max_read_bytes` option: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
1042
/// Default for the `oldest_first` option.
///
/// Enabled so that rotated pod log files are consumed first, which lets us
/// release our file handle and allows the space to be reclaimed.
const fn default_oldest_first() -> bool {
    true
}
1048
/// Default for the `max_line_bytes` option: 32 KiB.
const fn default_max_line_bytes() -> usize {
    // NOTE: The rationale below documents an incorrect assumption, see
    // https://github.com/vectordotdev/vector/issues/6967
    //
    // 16KB is the maximum payload size of a single line for both the docker
    // and CRI log formats. That is doubled to account for metadata and
    // padding, and to land on a power of two. Line splitting is countered at
    // the parsers, see the `partial_events_merger` logic.
    const KIB: usize = 1024;

    32 * KIB
}
1061
/// Default glob minimum cooldown: 60 seconds (60 000 ms).
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(60)
}
1065
/// Default number of lines used when fingerprinting a file: one.
const fn default_fingerprint_lines() -> usize {
    1
}
1069
/// Default deletion delay: 60 seconds (60 000 ms).
const fn default_delay_deletion_ms() -> Duration {
    Duration::from_secs(60)
}
1073
/// Default for the `rotate_wait` option: `u64::MAX / 2` seconds.
///
/// NOTE(review): presumably halved rather than `u64::MAX` to leave headroom
/// for later time arithmetic — confirm against the file server.
const fn default_rotate_wait() -> Duration {
    const HALF_OF_MAX_SECS: u64 = u64::MAX / 2;

    Duration::from_secs(HALF_OF_MAX_SECS)
}
1077
1078// This function constructs the patterns we include for file watching, created
1079// from the defaults or user provided configuration.
1080fn prepare_include_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1081    prepare_glob_patterns(&config.include_paths_glob_patterns, "Including")
1082}
1083
1084// This function constructs the patterns we exclude from file watching, created
1085// from the defaults or user provided configuration.
1086fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1087    prepare_glob_patterns(&config.exclude_paths_glob_patterns, "Excluding")
1088}
1089
1090// This function constructs the patterns for file watching, created
1091// from the defaults or user provided configuration.
1092fn prepare_glob_patterns(paths: &[PathBuf], op: &str) -> crate::Result<Vec<glob::Pattern>> {
1093    let ret = paths
1094        .iter()
1095        .map(|pattern| {
1096            let pattern = pattern
1097                .to_str()
1098                .ok_or("glob pattern is not a valid UTF-8 string")?;
1099            Ok(glob::Pattern::new(pattern)?)
1100        })
1101        .collect::<crate::Result<Vec<_>>>()?;
1102
1103    info!(
1104        message = format!("{op} matching files."),
1105        ret = ?ret
1106            .iter()
1107            .map(glob::Pattern::as_str)
1108            .collect::<Vec<_>>()
1109    );
1110
1111    Ok(ret)
1112}
1113
1114// This function constructs the effective field selector to use, based on
1115// the specified configuration.
1116fn prepare_field_selector(config: &Config, self_node_name: &str) -> crate::Result<String> {
1117    info!(
1118        message = "Obtained Kubernetes Node name to collect logs for (self).",
1119        ?self_node_name
1120    );
1121
1122    let field_selector = format!("spec.nodeName={self_node_name}");
1123
1124    if config.extra_field_selector.is_empty() {
1125        return Ok(field_selector);
1126    }
1127
1128    Ok(format!(
1129        "{},{}",
1130        field_selector, config.extra_field_selector
1131    ))
1132}
1133
1134// This function constructs the selector for a node to annotate entries with a node metadata.
1135fn prepare_node_selector(self_node_name: &str) -> crate::Result<String> {
1136    Ok(format!("metadata.name={self_node_name}"))
1137}
1138
// Builds the effective label selector from the configured extra `selector`.
//
// The built-in `vector.dev/exclude!=true` requirement always comes first; a
// non-empty `selector` is appended after a comma.
fn prepare_label_selector(selector: &str) -> String {
    const BUILT_IN: &str = "vector.dev/exclude!=true";

    match selector {
        "" => BUILT_IN.to_string(),
        extra => format!("{BUILT_IN},{extra}"),
    }
}
1150
1151#[cfg(test)]
1152mod tests {
1153    use similar_asserts::assert_eq;
1154    use vector_lib::lookup::{owned_value_path, OwnedTargetPath};
1155    use vector_lib::{config::LogNamespace, schema::Definition};
1156    use vrl::value::{kind::Collection, Kind};
1157
1158    use crate::config::SourceConfig;
1159
1160    use super::Config;
1161
1162    #[test]
1163    fn generate_config() {
1164        crate::test_util::test_generate_config::<Config>();
1165    }
1166
1167    #[test]
1168    fn prepare_exclude_paths() {
1169        let cases = vec![
1170            (
1171                Config::default(),
1172                vec![
1173                    glob::Pattern::new("**/*.gz").unwrap(),
1174                    glob::Pattern::new("**/*.tmp").unwrap(),
1175                ],
1176            ),
1177            (
1178                Config {
1179                    exclude_paths_glob_patterns: vec![std::path::PathBuf::from("**/*.tmp")],
1180                    ..Default::default()
1181                },
1182                vec![glob::Pattern::new("**/*.tmp").unwrap()],
1183            ),
1184            (
1185                Config {
1186                    exclude_paths_glob_patterns: vec![
1187                        std::path::PathBuf::from("**/kube-system_*/**"),
1188                        std::path::PathBuf::from("**/*.gz"),
1189                        std::path::PathBuf::from("**/*.tmp"),
1190                    ],
1191                    ..Default::default()
1192                },
1193                vec![
1194                    glob::Pattern::new("**/kube-system_*/**").unwrap(),
1195                    glob::Pattern::new("**/*.gz").unwrap(),
1196                    glob::Pattern::new("**/*.tmp").unwrap(),
1197                ],
1198            ),
1199        ];
1200
1201        for (input, mut expected) in cases {
1202            let mut output = super::prepare_exclude_paths(&input).unwrap();
1203            expected.sort();
1204            output.sort();
1205            assert_eq!(expected, output, "expected left, actual right");
1206        }
1207    }
1208
1209    #[test]
1210    fn prepare_field_selector() {
1211        let cases = vec![
1212            // We're not testing `Config::default()` or empty `self_node_name`
1213            // as passing env vars in the concurrent tests is difficult.
1214            (
1215                Config {
1216                    self_node_name: "qwe".to_owned(),
1217                    ..Default::default()
1218                },
1219                "spec.nodeName=qwe",
1220            ),
1221            (
1222                Config {
1223                    self_node_name: "qwe".to_owned(),
1224                    extra_field_selector: "".to_owned(),
1225                    ..Default::default()
1226                },
1227                "spec.nodeName=qwe",
1228            ),
1229            (
1230                Config {
1231                    self_node_name: "qwe".to_owned(),
1232                    extra_field_selector: "foo=bar".to_owned(),
1233                    ..Default::default()
1234                },
1235                "spec.nodeName=qwe,foo=bar",
1236            ),
1237        ];
1238
1239        for (input, expected) in cases {
1240            let output = super::prepare_field_selector(&input, "qwe").unwrap();
1241            assert_eq!(expected, output, "expected left, actual right");
1242        }
1243    }
1244
1245    #[test]
1246    fn prepare_label_selector() {
1247        let cases = vec![
1248            (
1249                Config::default().extra_label_selector,
1250                "vector.dev/exclude!=true",
1251            ),
1252            (
1253                Config::default().extra_namespace_label_selector,
1254                "vector.dev/exclude!=true",
1255            ),
1256            (
1257                Config {
1258                    extra_label_selector: "".to_owned(),
1259                    ..Default::default()
1260                }
1261                .extra_label_selector,
1262                "vector.dev/exclude!=true",
1263            ),
1264            (
1265                Config {
1266                    extra_namespace_label_selector: "".to_owned(),
1267                    ..Default::default()
1268                }
1269                .extra_namespace_label_selector,
1270                "vector.dev/exclude!=true",
1271            ),
1272            (
1273                Config {
1274                    extra_label_selector: "qwe".to_owned(),
1275                    ..Default::default()
1276                }
1277                .extra_label_selector,
1278                "vector.dev/exclude!=true,qwe",
1279            ),
1280            (
1281                Config {
1282                    extra_namespace_label_selector: "qwe".to_owned(),
1283                    ..Default::default()
1284                }
1285                .extra_namespace_label_selector,
1286                "vector.dev/exclude!=true,qwe",
1287            ),
1288        ];
1289
1290        for (input, expected) in cases {
1291            let output = super::prepare_label_selector(&input);
1292            assert_eq!(expected, output, "expected left, actual right");
1293        }
1294    }
1295
1296    #[test]
1297    fn test_output_schema_definition_vector_namespace() {
1298        let definitions = toml::from_str::<Config>("")
1299            .unwrap()
1300            .outputs(LogNamespace::Vector)
1301            .remove(0)
1302            .schema_definition(true);
1303
1304        assert_eq!(
1305            definitions,
1306            Some(
1307                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1308                    .with_metadata_field(
1309                        &owned_value_path!("kubernetes_logs", "file"),
1310                        Kind::bytes(),
1311                        None
1312                    )
1313                    .with_metadata_field(
1314                        &owned_value_path!("kubernetes_logs", "container_id"),
1315                        Kind::bytes().or_undefined(),
1316                        None
1317                    )
1318                    .with_metadata_field(
1319                        &owned_value_path!("kubernetes_logs", "container_image"),
1320                        Kind::bytes().or_undefined(),
1321                        None
1322                    )
1323                    .with_metadata_field(
1324                        &owned_value_path!("kubernetes_logs", "container_name"),
1325                        Kind::bytes().or_undefined(),
1326                        None
1327                    )
1328                    .with_metadata_field(
1329                        &owned_value_path!("kubernetes_logs", "namespace_labels"),
1330                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
1331                            .or_undefined(),
1332                        None
1333                    )
1334                    .with_metadata_field(
1335                        &owned_value_path!("kubernetes_logs", "node_labels"),
1336                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
1337                            .or_undefined(),
1338                        None
1339                    )
1340                    .with_metadata_field(
1341                        &owned_value_path!("kubernetes_logs", "pod_annotations"),
1342                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
1343                            .or_undefined(),
1344                        None
1345                    )
1346                    .with_metadata_field(
1347                        &owned_value_path!("kubernetes_logs", "pod_ip"),
1348                        Kind::bytes().or_undefined(),
1349                        None
1350                    )
1351                    .with_metadata_field(
1352                        &owned_value_path!("kubernetes_logs", "pod_ips"),
1353                        Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1354                        None
1355                    )
1356                    .with_metadata_field(
1357                        &owned_value_path!("kubernetes_logs", "pod_labels"),
1358                        Kind::object(Collection::empty().with_unknown(Kind::bytes()))
1359                            .or_undefined(),
1360                        None
1361                    )
1362                    .with_metadata_field(
1363                        &owned_value_path!("kubernetes_logs", "pod_name"),
1364                        Kind::bytes().or_undefined(),
1365                        None
1366                    )
1367                    .with_metadata_field(
1368                        &owned_value_path!("kubernetes_logs", "pod_namespace"),
1369                        Kind::bytes().or_undefined(),
1370                        None
1371                    )
1372                    .with_metadata_field(
1373                        &owned_value_path!("kubernetes_logs", "pod_node_name"),
1374                        Kind::bytes().or_undefined(),
1375                        None
1376                    )
1377                    .with_metadata_field(
1378                        &owned_value_path!("kubernetes_logs", "pod_owner"),
1379                        Kind::bytes().or_undefined(),
1380                        None
1381                    )
1382                    .with_metadata_field(
1383                        &owned_value_path!("kubernetes_logs", "pod_uid"),
1384                        Kind::bytes().or_undefined(),
1385                        None
1386                    )
1387                    .with_metadata_field(
1388                        &owned_value_path!("kubernetes_logs", "stream"),
1389                        Kind::bytes(),
1390                        None
1391                    )
1392                    .with_metadata_field(
1393                        &owned_value_path!("kubernetes_logs", "timestamp"),
1394                        Kind::timestamp(),
1395                        Some("timestamp")
1396                    )
1397                    .with_metadata_field(
1398                        &owned_value_path!("vector", "source_type"),
1399                        Kind::bytes(),
1400                        None
1401                    )
1402                    .with_metadata_field(
1403                        &owned_value_path!("vector", "ingest_timestamp"),
1404                        Kind::timestamp(),
1405                        None
1406                    )
1407                    .with_meaning(OwnedTargetPath::event_root(), "message")
1408            )
1409        )
1410    }
1411
1412    #[test]
1413    fn test_output_schema_definition_legacy_namespace() {
1414        let definitions = toml::from_str::<Config>("")
1415            .unwrap()
1416            .outputs(LogNamespace::Legacy)
1417            .remove(0)
1418            .schema_definition(true);
1419
1420        assert_eq!(
1421            definitions,
1422            Some(
1423                Definition::new_with_default_metadata(
1424                    Kind::object(Collection::empty()),
1425                    [LogNamespace::Legacy]
1426                )
1427                .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
1428                .with_event_field(
1429                    &owned_value_path!("message"),
1430                    Kind::bytes(),
1431                    Some("message")
1432                )
1433                .with_event_field(
1434                    &owned_value_path!("kubernetes", "container_id"),
1435                    Kind::bytes().or_undefined(),
1436                    None
1437                )
1438                .with_event_field(
1439                    &owned_value_path!("kubernetes", "container_image"),
1440                    Kind::bytes().or_undefined(),
1441                    None
1442                )
1443                .with_event_field(
1444                    &owned_value_path!("kubernetes", "container_name"),
1445                    Kind::bytes().or_undefined(),
1446                    None
1447                )
1448                .with_event_field(
1449                    &owned_value_path!("kubernetes", "namespace_labels"),
1450                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1451                    None
1452                )
1453                .with_event_field(
1454                    &owned_value_path!("kubernetes", "node_labels"),
1455                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1456                    None
1457                )
1458                .with_event_field(
1459                    &owned_value_path!("kubernetes", "pod_annotations"),
1460                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1461                    None
1462                )
1463                .with_event_field(
1464                    &owned_value_path!("kubernetes", "pod_ip"),
1465                    Kind::bytes().or_undefined(),
1466                    None
1467                )
1468                .with_event_field(
1469                    &owned_value_path!("kubernetes", "pod_ips"),
1470                    Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1471                    None
1472                )
1473                .with_event_field(
1474                    &owned_value_path!("kubernetes", "pod_labels"),
1475                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1476                    None
1477                )
1478                .with_event_field(
1479                    &owned_value_path!("kubernetes", "pod_name"),
1480                    Kind::bytes().or_undefined(),
1481                    None
1482                )
1483                .with_event_field(
1484                    &owned_value_path!("kubernetes", "pod_namespace"),
1485                    Kind::bytes().or_undefined(),
1486                    None
1487                )
1488                .with_event_field(
1489                    &owned_value_path!("kubernetes", "pod_node_name"),
1490                    Kind::bytes().or_undefined(),
1491                    None
1492                )
1493                .with_event_field(
1494                    &owned_value_path!("kubernetes", "pod_owner"),
1495                    Kind::bytes().or_undefined(),
1496                    None
1497                )
1498                .with_event_field(
1499                    &owned_value_path!("kubernetes", "pod_uid"),
1500                    Kind::bytes().or_undefined(),
1501                    None
1502                )
1503                .with_event_field(&owned_value_path!("stream"), Kind::bytes(), None)
1504                .with_event_field(
1505                    &owned_value_path!("timestamp"),
1506                    Kind::timestamp(),
1507                    Some("timestamp")
1508                )
1509                .with_event_field(
1510                    &owned_value_path!("source_type"),
1511                    Kind::bytes(),
1512                    None
1513                )
1514            )
1515        )
1516    }
1517}