vector/sources/kubernetes_logs/
mod.rs

//! This module implements the `kubernetes_logs` source.
2//! The scope of this source is to consume the log files that a kubelet keeps
3//! at "/var/log/pods" on the host of the Kubernetes Node when Vector itself is
4//! running inside the cluster as a DaemonSet.
5
6#![deny(missing_docs)]
7use std::{cmp::min, path::PathBuf, time::Duration};
8
9use bytes::Bytes;
10use chrono::Utc;
11use futures::{future::FutureExt, stream::StreamExt};
12use futures_util::Stream;
13use http_1::{HeaderName, HeaderValue};
14use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
15use k8s_paths_provider::K8sPathsProvider;
16use kube::{
17    Client, Config as ClientConfig,
18    api::Api,
19    config::{self, KubeConfigOptions},
20    runtime::{WatchStreamExt, reflector, watcher},
21};
22use lifecycle::Lifecycle;
23use serde_with::serde_as;
24use vector_lib::{
25    EstimatedJsonEncodedSizeOf, TimeZone,
26    codecs::{BytesDeserializer, BytesDeserializerConfig},
27    config::{LegacyKey, LogNamespace},
28    configurable::configurable_component,
29    file_source::file_server::{
30        FileServer, Line, Shutdown as FileServerShutdown, calculate_ignore_before,
31    },
32    file_source_common::{
33        Checkpointer, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
34    },
35    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
36    lookup::{OwnedTargetPath, lookup_v2::OptionalTargetPath, owned_value_path, path},
37};
38use vrl::value::{Kind, kind::Collection};
39
40use crate::{
41    SourceSender,
42    built_info::{PKG_NAME, PKG_VERSION},
43    config::{
44        ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig, SourceContext,
45        SourceOutput, log_schema,
46    },
47    event::Event,
48    internal_events::{
49        FileInternalMetricsConfig, FileSourceInternalEventsEmitter, KubernetesLifecycleError,
50        KubernetesLogsEventAnnotationError, KubernetesLogsEventNamespaceAnnotationError,
51        KubernetesLogsEventNodeAnnotationError, KubernetesLogsEventsReceived,
52        KubernetesLogsPodInfo, StreamClosedError,
53    },
54    kubernetes::{custom_reflector, meta_cache::MetaCache},
55    shutdown::ShutdownSignal,
56    sources,
57    sources::kubernetes_logs::partial_events_merger::merge_partial_events,
58    transforms::{FunctionTransform, OutputBuffer},
59};
60
61mod k8s_paths_provider;
62mod lifecycle;
63mod namespace_metadata_annotator;
64mod node_metadata_annotator;
65mod parser;
66mod partial_events_merger;
67mod path_helpers;
68mod pod_metadata_annotator;
69mod transform_utils;
70mod util;
71
72use self::{
73    namespace_metadata_annotator::NamespaceMetadataAnnotator,
74    node_metadata_annotator::NodeMetadataAnnotator, parser::Parser,
75    pod_metadata_annotator::PodMetadataAnnotator,
76};
77
/// Name of the environment variable that supplies the `self_node_name` value.
///
/// `Source::new` reads this variable when the `self_node_name` option is empty
/// or still equal to its default template.
const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME";
80
/// Configuration for the `kubernetes_logs` source.
#[serde_as]
#[configurable_component(source("kubernetes_logs", "Collect Pod logs from Kubernetes Nodes."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
    /// Specifies the [label selector][label_selector] to filter [Pods][pods] with, to be used in
    /// addition to the built-in [exclude][exclude] filter.
    ///
    /// [label_selector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
    /// [pods]: https://kubernetes.io/docs/concepts/workloads/pods/
    /// [exclude]: https://vector.dev/docs/reference/configuration/sources/kubernetes_logs/#pod-exclusion
    #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
    #[configurable(metadata(
        docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
    ))]
    extra_label_selector: String,

    /// Specifies the [label selector][label_selector] to filter [Namespaces][namespaces] with, to
    /// be used in addition to the built-in [exclude][exclude] filter.
    ///
    /// [label_selector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
    /// [namespaces]: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/
    /// [exclude]: https://vector.dev/docs/reference/configuration/sources/kubernetes_logs/#namespace-exclusion
    #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
    #[configurable(metadata(
        docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
    ))]
    extra_namespace_label_selector: String,

    /// Specifies whether or not to enrich logs with namespace fields.
    ///
    /// Setting to `false` prevents Vector from pulling in namespaces and thus namespace label fields will not
    /// be available. This helps reduce load on the `kube-apiserver` and lowers daemonset memory usage in clusters
    /// with many namespaces.
    #[serde(default = "default_insert_namespace_fields")]
    insert_namespace_fields: bool,

    /// The name of the Kubernetes [Node][node] that is running.
    ///
    /// Configured to use an environment variable by default, to be evaluated to a value provided by
    /// Kubernetes at Pod creation.
    ///
    /// [node]: https://kubernetes.io/docs/concepts/architecture/nodes/
    self_node_name: String,

    /// Specifies the [field selector][field_selector] to filter Pods with, to be used in addition
    /// to the built-in [Node][node] filter.
    ///
    /// The built-in Node filter uses `self_node_name` to only watch Pods located on the same Node.
    ///
    /// [field_selector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
    /// [node]: https://kubernetes.io/docs/concepts/architecture/nodes/
    #[configurable(metadata(docs::examples = "metadata.name!=pod-name-to-exclude"))]
    #[configurable(metadata(
        docs::examples = "metadata.name!=pod-name-to-exclude,metadata.name=mypod"
    ))]
    extra_field_selector: String,

    /// Whether or not to automatically merge partial events.
    ///
    /// Partial events are messages that were split by the Kubernetes Container Runtime
    /// log driver.
    auto_partial_merge: bool,

    /// The directory used to persist file checkpoint positions.
    ///
    /// By default, the [global `data_dir` option][global_data_dir] is used.
    /// Make sure the running user has write permissions to this directory.
    ///
    /// If this directory is specified, then Vector will attempt to create it.
    ///
    /// [global_data_dir]: https://vector.dev/docs/reference/configuration/global-options/#data_dir
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    data_dir: Option<PathBuf>,

    #[configurable(derived)]
    #[serde(alias = "annotation_fields")]
    pod_annotation_fields: pod_metadata_annotator::FieldsSpec,

    #[configurable(derived)]
    namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec,

    #[configurable(derived)]
    node_annotation_fields: node_metadata_annotator::FieldsSpec,

    /// A list of glob patterns to include while reading the files.
    #[configurable(metadata(docs::examples = "**/include/**"))]
    include_paths_glob_patterns: Vec<PathBuf>,

    /// A list of glob patterns to exclude from reading the files.
    #[configurable(metadata(docs::examples = "**/exclude/**"))]
    exclude_paths_glob_patterns: Vec<PathBuf>,

    #[configurable(derived)]
    #[serde(default = "default_read_from")]
    read_from: ReadFromConfig,

    /// Ignore files with a data modification date older than the specified number of seconds.
    #[serde(default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Files Older Than"))]
    ignore_older_secs: Option<u64>,

    /// Max amount of bytes to read from a single file before switching over to the next file.
    /// **Note:** This does not apply when `oldest_first` is `true`.
    ///
    /// This allows distributing the reads more or less evenly across
    /// the files.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_read_bytes: usize,

    /// Instead of balancing read capacity fairly across all watched files, prioritize draining the oldest files before moving on to read data from more recent files.
    #[serde(default = "default_oldest_first")]
    pub oldest_first: bool,

    /// The maximum number of bytes a line can contain before being discarded.
    ///
    /// This protects against malformed lines or tailing incorrect files.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_line_bytes: usize,

    /// The maximum number of bytes a line can contain - after merging - before being discarded.
    ///
    /// This protects against malformed lines or tailing incorrect files.
    ///
    /// Note that, if `auto_partial_merge` is false, this config will be ignored. Also, if `max_line_bytes` is too small to reach the continuation character, then this
    /// config will have no practical impact (the same is true of `auto_partial_merge`). Finally, the smaller of `max_merged_line_bytes` and `max_line_bytes` will apply
    /// if `auto_partial_merge` is true, so if this is set to be 1 MiB, for example, but `max_line_bytes` is set to ~2.5 MiB, then every line greater than 1 MiB will be dropped.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_merged_line_bytes: Option<usize>,

    /// The number of lines to read for generating the checksum.
    ///
    /// If your files share a common header that is not always a fixed size, a larger value
    /// may be needed so that the checksum extends past the header.
    ///
    /// If the file has fewer lines than this, it won't be read at all.
    #[configurable(metadata(docs::type_unit = "lines"))]
    fingerprint_lines: usize,

    /// The interval at which the file system is polled to identify new files to read from.
    ///
    /// This is quite efficient, yet might still create some load on the
    /// file system; in addition, it is currently coupled with checksum dumping
    /// in the underlying file server, so setting it too low may introduce
    /// a significant overhead.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    glob_minimum_cooldown_ms: Duration,

    /// Overrides the name of the log field used to add the ingestion timestamp to each event.
    ///
    /// This is useful to compute the latency between important event processing
    /// stages. For example, the time delta between when a log line was written and when it was
    /// processed by the `kubernetes_logs` source.
    #[configurable(metadata(docs::examples = ".ingest_timestamp", docs::examples = "ingest_ts"))]
    ingestion_timestamp_field: Option<OptionalTargetPath>,

    /// The default time zone for timestamps without an explicit zone.
    timezone: Option<TimeZone>,

    /// Optional path to a readable [kubeconfig][kubeconfig] file.
    ///
    /// If not set, a connection to Kubernetes is made using the in-cluster configuration.
    ///
    /// [kubeconfig]: https://kubernetes.io/docs/concepts/configuration/organize-cluster-access-kubeconfig/
    #[configurable(metadata(docs::examples = "/path/to/.kube/config"))]
    kube_config_file: Option<PathBuf>,

    /// Determines if requests to the kube-apiserver can be served by a cache.
    use_apiserver_cache: bool,

    /// How long to delay removing metadata entries from the cache when a pod deletion
    /// event is received from the watch stream.
    ///
    /// A longer delay allows for continued enrichment of logs after the originating Pod is
    /// removed. If relevant metadata has been removed, the log is forwarded un-enriched and a
    /// warning is emitted.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Delay Deletion"))]
    delay_deletion_ms: Duration,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// How long to keep an open handle to a rotated log file.
    /// The default value represents "no limit".
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    rotate_wait: Duration,
}
282
/// Default for the `read_from` option: [`ReadFromConfig::Beginning`].
///
/// Referenced by the `#[serde(default = "default_read_from")]` attribute on
/// `Config::read_from` and by `Config::default`.
const fn default_read_from() -> ReadFromConfig {
    ReadFromConfig::Beginning
}
286
287impl GenerateConfig for Config {
288    fn generate_config() -> toml::Value {
289        toml::Value::try_from(Self {
290            self_node_name: default_self_node_name_env_template(),
291            auto_partial_merge: true,
292            ..Default::default()
293        })
294        .unwrap()
295    }
296}
297
298impl Default for Config {
299    fn default() -> Self {
300        Self {
301            extra_label_selector: "".to_string(),
302            extra_namespace_label_selector: "".to_string(),
303            insert_namespace_fields: true,
304            self_node_name: default_self_node_name_env_template(),
305            extra_field_selector: "".to_string(),
306            auto_partial_merge: true,
307            data_dir: None,
308            pod_annotation_fields: pod_metadata_annotator::FieldsSpec::default(),
309            namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec::default(),
310            node_annotation_fields: node_metadata_annotator::FieldsSpec::default(),
311            include_paths_glob_patterns: default_path_inclusion(),
312            exclude_paths_glob_patterns: default_path_exclusion(),
313            read_from: default_read_from(),
314            ignore_older_secs: None,
315            max_read_bytes: default_max_read_bytes(),
316            oldest_first: default_oldest_first(),
317            max_line_bytes: default_max_line_bytes(),
318            max_merged_line_bytes: None,
319            fingerprint_lines: default_fingerprint_lines(),
320            glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
321            ingestion_timestamp_field: None,
322            timezone: None,
323            kube_config_file: None,
324            use_apiserver_cache: false,
325            delay_deletion_ms: default_delay_deletion_ms(),
326            log_namespace: None,
327            internal_metrics: Default::default(),
328            rotate_wait: default_rotate_wait(),
329        }
330    }
331}
332
333#[async_trait::async_trait]
334#[typetag::serde(name = "kubernetes_logs")]
335impl SourceConfig for Config {
336    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
337        let log_namespace = cx.log_namespace(self.log_namespace);
338        let source = Source::new(self, &cx.globals, &cx.key).await?;
339
340        Ok(Box::pin(
341            source
342                .run(cx.out, cx.shutdown, log_namespace)
343                .map(|result| {
344                    result.map_err(|error| {
345                        error!(message = "Source future failed.", %error);
346                    })
347                }),
348        ))
349    }
350
351    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
352        let log_namespace = global_log_namespace.merge(self.log_namespace);
353        let schema_definition = BytesDeserializerConfig
354            .schema_definition(log_namespace)
355            .with_source_metadata(
356                Self::NAME,
357                Some(LegacyKey::Overwrite(owned_value_path!("file"))),
358                &owned_value_path!("file"),
359                Kind::bytes(),
360                None,
361            )
362            .with_source_metadata(
363                Self::NAME,
364                self.pod_annotation_fields
365                    .container_id
366                    .path
367                    .clone()
368                    .map(|k| k.path)
369                    .map(LegacyKey::Overwrite),
370                &owned_value_path!("container_id"),
371                Kind::bytes().or_undefined(),
372                None,
373            )
374            .with_source_metadata(
375                Self::NAME,
376                self.pod_annotation_fields
377                    .container_image
378                    .path
379                    .clone()
380                    .map(|k| k.path)
381                    .map(LegacyKey::Overwrite),
382                &owned_value_path!("container_image"),
383                Kind::bytes().or_undefined(),
384                None,
385            )
386            .with_source_metadata(
387                Self::NAME,
388                self.pod_annotation_fields
389                    .container_name
390                    .path
391                    .clone()
392                    .map(|k| k.path)
393                    .map(LegacyKey::Overwrite),
394                &owned_value_path!("container_name"),
395                Kind::bytes().or_undefined(),
396                None,
397            )
398            .with_source_metadata(
399                Self::NAME,
400                self.namespace_annotation_fields
401                    .namespace_labels
402                    .path
403                    .clone()
404                    .map(|x| LegacyKey::Overwrite(x.path)),
405                &owned_value_path!("namespace_labels"),
406                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
407                None,
408            )
409            .with_source_metadata(
410                Self::NAME,
411                self.node_annotation_fields
412                    .node_labels
413                    .path
414                    .clone()
415                    .map(|x| LegacyKey::Overwrite(x.path)),
416                &owned_value_path!("node_labels"),
417                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
418                None,
419            )
420            .with_source_metadata(
421                Self::NAME,
422                self.pod_annotation_fields
423                    .pod_annotations
424                    .path
425                    .clone()
426                    .map(|k| k.path)
427                    .map(LegacyKey::Overwrite),
428                &owned_value_path!("pod_annotations"),
429                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
430                None,
431            )
432            .with_source_metadata(
433                Self::NAME,
434                self.pod_annotation_fields
435                    .pod_ip
436                    .path
437                    .clone()
438                    .map(|k| k.path)
439                    .map(LegacyKey::Overwrite),
440                &owned_value_path!("pod_ip"),
441                Kind::bytes().or_undefined(),
442                None,
443            )
444            .with_source_metadata(
445                Self::NAME,
446                self.pod_annotation_fields
447                    .pod_ips
448                    .path
449                    .clone()
450                    .map(|k| k.path)
451                    .map(LegacyKey::Overwrite),
452                &owned_value_path!("pod_ips"),
453                Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
454                None,
455            )
456            .with_source_metadata(
457                Self::NAME,
458                self.pod_annotation_fields
459                    .pod_labels
460                    .path
461                    .clone()
462                    .map(|k| k.path)
463                    .map(LegacyKey::Overwrite),
464                &owned_value_path!("pod_labels"),
465                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
466                None,
467            )
468            .with_source_metadata(
469                Self::NAME,
470                self.pod_annotation_fields
471                    .pod_name
472                    .path
473                    .clone()
474                    .map(|k| k.path)
475                    .map(LegacyKey::Overwrite),
476                &owned_value_path!("pod_name"),
477                Kind::bytes().or_undefined(),
478                None,
479            )
480            .with_source_metadata(
481                Self::NAME,
482                self.pod_annotation_fields
483                    .pod_namespace
484                    .path
485                    .clone()
486                    .map(|k| k.path)
487                    .map(LegacyKey::Overwrite),
488                &owned_value_path!("pod_namespace"),
489                Kind::bytes().or_undefined(),
490                None,
491            )
492            .with_source_metadata(
493                Self::NAME,
494                self.pod_annotation_fields
495                    .pod_node_name
496                    .path
497                    .clone()
498                    .map(|k| k.path)
499                    .map(LegacyKey::Overwrite),
500                &owned_value_path!("pod_node_name"),
501                Kind::bytes().or_undefined(),
502                None,
503            )
504            .with_source_metadata(
505                Self::NAME,
506                self.pod_annotation_fields
507                    .pod_owner
508                    .path
509                    .clone()
510                    .map(|k| k.path)
511                    .map(LegacyKey::Overwrite),
512                &owned_value_path!("pod_owner"),
513                Kind::bytes().or_undefined(),
514                None,
515            )
516            .with_source_metadata(
517                Self::NAME,
518                self.pod_annotation_fields
519                    .pod_uid
520                    .path
521                    .clone()
522                    .map(|k| k.path)
523                    .map(LegacyKey::Overwrite),
524                &owned_value_path!("pod_uid"),
525                Kind::bytes().or_undefined(),
526                None,
527            )
528            .with_source_metadata(
529                Self::NAME,
530                Some(LegacyKey::Overwrite(owned_value_path!("stream"))),
531                &owned_value_path!("stream"),
532                Kind::bytes(),
533                None,
534            )
535            .with_source_metadata(
536                Self::NAME,
537                log_schema()
538                    .timestamp_key()
539                    .cloned()
540                    .map(LegacyKey::Overwrite),
541                &owned_value_path!("timestamp"),
542                Kind::timestamp(),
543                Some("timestamp"),
544            )
545            .with_standard_vector_source_metadata();
546
547        vec![SourceOutput::new_maybe_logs(
548            DataType::Log,
549            schema_definition,
550        )]
551    }
552
553    fn can_acknowledge(&self) -> bool {
554        false
555    }
556}
557
/// Resolved state of the `kubernetes_logs` source.
///
/// Built by `Source::new` from a [`Config`] and consumed by `Source::run`.
#[derive(Clone)]
struct Source {
    /// Kubernetes API client, built from the custom kubeconfig file or the inferred configuration.
    client: Client,
    /// Data directory resolved (and created if needed) through the global options.
    data_dir: PathBuf,
    /// Copied from `Config::auto_partial_merge`.
    auto_partial_merge: bool,
    /// Copied from `Config::pod_annotation_fields`.
    pod_fields_spec: pod_metadata_annotator::FieldsSpec,
    /// Copied from `Config::namespace_annotation_fields`.
    namespace_fields_spec: namespace_metadata_annotator::FieldsSpec,
    /// Copied from `Config::node_annotation_fields`.
    node_field_spec: node_metadata_annotator::FieldsSpec,
    /// Field selector for the Pod watcher, produced by `prepare_field_selector`.
    field_selector: String,
    /// Label selector for the Pod watcher, produced from `Config::extra_label_selector`.
    label_selector: String,
    /// Label selector for the Namespace watcher, produced from
    /// `Config::extra_namespace_label_selector`.
    namespace_label_selector: String,
    /// Whether a Namespace watcher is started at all.
    insert_namespace_fields: bool,
    /// Field selector for the Node watcher, produced by `prepare_node_selector`.
    node_selector: String,
    /// Node name taken from the configuration or, failing that, from the
    /// `VECTOR_SELF_NODE_NAME` environment variable.
    self_node_name: String,
    /// Patterns produced by `prepare_include_paths`.
    include_paths: Vec<glob::Pattern>,
    /// Patterns produced by `prepare_exclude_paths`.
    exclude_paths: Vec<glob::Pattern>,
    /// Converted from `Config::read_from`.
    read_from: ReadFrom,
    /// Copied from `Config::ignore_older_secs`.
    ignore_older_secs: Option<u64>,
    /// Copied from `Config::max_read_bytes`.
    max_read_bytes: usize,
    /// Copied from `Config::oldest_first`.
    oldest_first: bool,
    /// Copied from `Config::max_line_bytes`.
    max_line_bytes: usize,
    /// Copied from `Config::max_merged_line_bytes`.
    max_merged_line_bytes: Option<usize>,
    /// Copied from `Config::fingerprint_lines`.
    fingerprint_lines: usize,
    /// Copied from `Config::glob_minimum_cooldown_ms`.
    glob_minimum_cooldown: Duration,
    /// Selects the watcher list semantic: `Any` when `true`, `MostRecent` otherwise.
    use_apiserver_cache: bool,
    /// Target path extracted from `Config::ingestion_timestamp_field`, if one was given.
    ingestion_timestamp_field: Option<OwnedTargetPath>,
    /// Copied from `Config::delay_deletion_ms`; handed to each reflector task.
    delay_deletion: Duration,
    /// Copied from `Config::internal_metrics.include_file_tag`.
    include_file_metric_tag: bool,
    /// Copied from `Config::rotate_wait`.
    rotate_wait: Duration,
}
588
589impl Source {
590    async fn new(
591        config: &Config,
592        globals: &GlobalOptions,
593        key: &ComponentKey,
594    ) -> crate::Result<Self> {
595        let self_node_name = if config.self_node_name.is_empty()
596            || config.self_node_name == default_self_node_name_env_template()
597        {
598            std::env::var(SELF_NODE_NAME_ENV_KEY).map_err(|_| {
599                format!(
600                    "self_node_name config value or {SELF_NODE_NAME_ENV_KEY} env var is not set"
601                )
602            })?
603        } else {
604            config.self_node_name.clone()
605        };
606
607        let field_selector = prepare_field_selector(config, self_node_name.as_str())?;
608        let label_selector = prepare_label_selector(config.extra_label_selector.as_ref());
609        let namespace_label_selector =
610            prepare_label_selector(config.extra_namespace_label_selector.as_ref());
611        let node_selector = prepare_node_selector(self_node_name.as_str())?;
612
613        // If the user passed a custom Kubeconfig use it, otherwise
614        // we attempt to load the local kubeconfig, followed by the
615        // in-cluster environment variables
616        let mut client_config = match &config.kube_config_file {
617            Some(kc) => {
618                ClientConfig::from_custom_kubeconfig(
619                    config::Kubeconfig::read_from(kc)?,
620                    &KubeConfigOptions::default(),
621                )
622                .await?
623            }
624            None => ClientConfig::infer().await?,
625        };
626        if let Ok(user_agent) = HeaderValue::from_str(&format!("{PKG_NAME}/{PKG_VERSION}")) {
627            client_config
628                .headers
629                .push((HeaderName::from_static("user-agent"), user_agent));
630        }
631        let client = Client::try_from(client_config)?;
632
633        let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?;
634
635        let include_paths = prepare_include_paths(config)?;
636
637        let exclude_paths = prepare_exclude_paths(config)?;
638
639        let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
640
641        let delay_deletion = config.delay_deletion_ms;
642
643        let ingestion_timestamp_field = config
644            .ingestion_timestamp_field
645            .clone()
646            .and_then(|k| k.path);
647
648        Ok(Self {
649            client,
650            data_dir,
651            auto_partial_merge: config.auto_partial_merge,
652            pod_fields_spec: config.pod_annotation_fields.clone(),
653            namespace_fields_spec: config.namespace_annotation_fields.clone(),
654            node_field_spec: config.node_annotation_fields.clone(),
655            field_selector,
656            label_selector,
657            namespace_label_selector,
658            insert_namespace_fields: config.insert_namespace_fields,
659            node_selector,
660            self_node_name,
661            include_paths,
662            exclude_paths,
663            read_from: ReadFrom::from(config.read_from),
664            ignore_older_secs: config.ignore_older_secs,
665            max_read_bytes: config.max_read_bytes,
666            oldest_first: config.oldest_first,
667            max_line_bytes: config.max_line_bytes,
668            max_merged_line_bytes: config.max_merged_line_bytes,
669            fingerprint_lines: config.fingerprint_lines,
670            glob_minimum_cooldown,
671            use_apiserver_cache: config.use_apiserver_cache,
672            ingestion_timestamp_field,
673            delay_deletion,
674            include_file_metric_tag: config.internal_metrics.include_file_tag,
675            rotate_wait: config.rotate_wait,
676        })
677    }
678
679    async fn run(
680        self,
681        mut out: SourceSender,
682        global_shutdown: ShutdownSignal,
683        log_namespace: LogNamespace,
684    ) -> crate::Result<()> {
685        let Self {
686            client,
687            data_dir,
688            auto_partial_merge,
689            pod_fields_spec,
690            namespace_fields_spec,
691            node_field_spec,
692            field_selector,
693            label_selector,
694            namespace_label_selector,
695            insert_namespace_fields,
696            node_selector,
697            self_node_name,
698            include_paths,
699            exclude_paths,
700            read_from,
701            ignore_older_secs,
702            max_read_bytes,
703            oldest_first,
704            max_line_bytes,
705            max_merged_line_bytes,
706            fingerprint_lines,
707            glob_minimum_cooldown,
708            use_apiserver_cache,
709            ingestion_timestamp_field,
710            delay_deletion,
711            include_file_metric_tag,
712            rotate_wait,
713        } = self;
714
715        let mut reflectors = Vec::new();
716
717        let pods = Api::<Pod>::all(client.clone());
718
719        let list_semantic = if use_apiserver_cache {
720            watcher::ListSemantic::Any
721        } else {
722            watcher::ListSemantic::MostRecent
723        };
724
725        let pod_watcher = watcher(
726            pods,
727            watcher::Config {
728                field_selector: Some(field_selector),
729                label_selector: Some(label_selector),
730                list_semantic: list_semantic.clone(),
731                page_size: get_page_size(use_apiserver_cache),
732                ..Default::default()
733            },
734        )
735        .backoff(watcher::DefaultBackoff::default());
736
737        let pod_store_w = reflector::store::Writer::default();
738        let pod_state = pod_store_w.as_reader();
739        let pod_cacher = MetaCache::new();
740
741        reflectors.push(tokio::spawn(custom_reflector(
742            pod_store_w,
743            pod_cacher,
744            pod_watcher,
745            delay_deletion,
746        )));
747
748        // -----------------------------------------------------------------
749
750        let ns_store_w = reflector::store::Writer::default();
751        let ns_state = ns_store_w.as_reader();
752        if insert_namespace_fields {
753            let namespaces = Api::<Namespace>::all(client.clone());
754            let ns_watcher = watcher(
755                namespaces,
756                watcher::Config {
757                    label_selector: Some(namespace_label_selector),
758                    list_semantic: list_semantic.clone(),
759                    page_size: get_page_size(use_apiserver_cache),
760                    ..Default::default()
761                },
762            )
763            .backoff(watcher::DefaultBackoff::default());
764
765            reflectors.push(tokio::spawn(custom_reflector(
766                ns_store_w,
767                MetaCache::new(),
768                ns_watcher,
769                delay_deletion,
770            )));
771        }
772
773        // -----------------------------------------------------------------
774
775        let nodes = Api::<Node>::all(client);
776        let node_watcher = watcher(
777            nodes,
778            watcher::Config {
779                field_selector: Some(node_selector),
780                list_semantic,
781                page_size: get_page_size(use_apiserver_cache),
782                ..Default::default()
783            },
784        )
785        .backoff(watcher::DefaultBackoff::default());
786        let node_store_w = reflector::store::Writer::default();
787        let node_state = node_store_w.as_reader();
788        let node_cacher = MetaCache::new();
789
790        reflectors.push(tokio::spawn(custom_reflector(
791            node_store_w,
792            node_cacher,
793            node_watcher,
794            delay_deletion,
795        )));
796
797        let paths_provider = K8sPathsProvider::new(
798            pod_state.clone(),
799            ns_state.clone(),
800            include_paths,
801            exclude_paths,
802            insert_namespace_fields,
803        );
804        let annotator = PodMetadataAnnotator::new(pod_state, pod_fields_spec, log_namespace);
805        let ns_annotator =
806            NamespaceMetadataAnnotator::new(ns_state, namespace_fields_spec, log_namespace);
807        let node_annotator = NodeMetadataAnnotator::new(node_state, node_field_spec, log_namespace);
808
809        let ignore_before = calculate_ignore_before(ignore_older_secs);
810
811        let mut resolved_max_line_bytes = max_line_bytes;
812        if auto_partial_merge {
813            resolved_max_line_bytes = min(
814                max_line_bytes,
815                max_merged_line_bytes.unwrap_or(max_line_bytes),
816            );
817        }
818
819        // TODO: maybe more of the parameters have to be configurable.
820
821        let checkpointer = Checkpointer::new(&data_dir);
822        let file_server = FileServer {
823            // Use our special paths provider.
824            paths_provider,
825            // Max amount of bytes to read from a single file before switching
826            // over to the next file.
827            // This allows distributing the reads more or less evenly across
828            // the files.
829            max_read_bytes,
830            // We want to use checkpointing mechanism, and resume from where we
831            // left off.
832            ignore_checkpoints: false,
833            // Match the default behavior
834            read_from,
            // Cut-off computed above from `ignore_older_secs` via
            // `calculate_ignore_before`. There may be other, more sound ways
            // for users considering the use of this option to solve their use
            // case, so take that into consideration.
840            ignore_before,
841            // The maximum number of bytes a line can contain before being discarded. This
842            // protects against malformed lines or tailing incorrect files.
843            max_line_bytes: resolved_max_line_bytes,
            // Delimiter bytes that are used to read the file line-by-line.
845            line_delimiter: Bytes::from("\n"),
846            // The directory where to keep the checkpoints.
847            data_dir,
            // This value specifies not exactly the globbing, but the interval
            // between polls of the files to watch from the `paths_provider`.
850            glob_minimum_cooldown,
            // The shape of the log files is well-known in the Kubernetes
            // environment, so we pick a specially crafted fingerprinter
            // for the log files.
854            fingerprinter: Fingerprinter {
855                strategy: FingerprintStrategy::FirstLinesChecksum {
856                    // Max line length to expect during fingerprinting, see the
857                    // explanation above.
858                    ignored_header_bytes: 0,
859                    lines: fingerprint_lines,
860                },
861                max_line_length: resolved_max_line_bytes,
862                ignore_not_found: true,
863            },
864            oldest_first,
865            // We do not remove the log files, `kubelet` is responsible for it.
866            remove_after: None,
867            // The standard emitter.
868            emitter: FileSourceInternalEventsEmitter {
869                include_file_metric_tag,
870            },
            // Rotation wait, taken from the source's `rotate_wait` setting.
872            rotate_wait,
873        };
874
875        let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
876
877        let checkpoints = checkpointer.view();
878        let events = file_source_rx.flat_map(futures::stream::iter);
879        let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
880        let events = events.map(move |line| {
881            let byte_size = line.text.len();
882            bytes_received.emit(ByteSize(byte_size));
883
884            let mut event = create_event(
885                line.text,
886                &line.filename,
887                ingestion_timestamp_field.as_ref(),
888                log_namespace,
889            );
890
891            let file_info = annotator.annotate(&mut event, &line.filename);
892
893            emit!(KubernetesLogsEventsReceived {
894                file: &line.filename,
895                byte_size: event.estimated_json_encoded_size_of(),
896                pod_info: file_info.as_ref().map(|info| KubernetesLogsPodInfo {
897                    name: info.pod_name.to_owned(),
898                    namespace: info.pod_namespace.to_owned(),
899                }),
900            });
901
902            if file_info.is_none() {
903                emit!(KubernetesLogsEventAnnotationError { event: &event });
904            } else {
905                let namespace = file_info.as_ref().map(|info| info.pod_namespace);
906
907                if insert_namespace_fields
908                    && let Some(name) = namespace
909                    && ns_annotator.annotate(&mut event, name).is_none()
910                {
911                    emit!(KubernetesLogsEventNamespaceAnnotationError { event: &event });
912                }
913
914                let node_info = node_annotator.annotate(&mut event, self_node_name.as_str());
915
916                if node_info.is_none() {
917                    emit!(KubernetesLogsEventNodeAnnotationError { event: &event });
918                }
919            }
920
921            checkpoints.update(line.file_id, line.end_offset);
922            event
923        });
924
925        let mut parser = Parser::new(log_namespace);
926        let events = events.flat_map(move |event| {
927            let mut buf = OutputBuffer::with_capacity(1);
928            parser.transform(&mut buf, event);
929            futures::stream::iter(buf.into_events())
930        });
931
932        let (events_count, _) = events.size_hint();
933
934        let mut stream = if auto_partial_merge {
935            merge_partial_events(events, log_namespace, max_merged_line_bytes).left_stream()
936        } else {
937            events.right_stream()
938        };
939
940        let event_processing_loop = out.send_event_stream(&mut stream);
941
942        let mut lifecycle = Lifecycle::new();
943        {
944            let (slot, shutdown) = lifecycle.add();
945            let fut = util::run_file_server(file_server, file_source_tx, shutdown, checkpointer)
946                .map(|result| match result {
947                    Ok(FileServerShutdown) => info!(message = "File server completed gracefully."),
948                    Err(error) => emit!(KubernetesLifecycleError {
949                        message: "File server exited with an error.",
950                        error,
951                        count: events_count,
952                    }),
953                });
954            slot.bind(Box::pin(fut));
955        }
956        {
957            let (slot, shutdown) = lifecycle.add();
958            let fut = util::complete_with_deadline_on_signal(
959                event_processing_loop,
960                shutdown,
961                Duration::from_secs(30), // more than enough time to propagate
962            )
963            .map(|result| {
964                match result {
965                    Ok(Ok(())) => info!(message = "Event processing loop completed gracefully."),
966                    Ok(Err(_)) => emit!(StreamClosedError {
967                        count: events_count
968                    }),
969                    Err(error) => emit!(KubernetesLifecycleError {
970                        error,
971                        message: "Event processing loop timed out during the shutdown.",
972                        count: events_count,
973                    }),
974                };
975            });
976            slot.bind(Box::pin(fut));
977        }
978
979        lifecycle.run(global_shutdown).await;
980        // Stop Kubernetes object reflectors to avoid their leak on vector reload.
981        for reflector in reflectors {
982            reflector.abort();
983        }
984        info!(message = "Done.");
985        Ok(())
986    }
987}
988
989// Set page size to None if use_apiserver_cache is true, to make the list requests containing `resourceVersion=0`` parameters.
990fn get_page_size(use_apiserver_cache: bool) -> Option<u32> {
991    if use_apiserver_cache {
992        None
993    } else {
994        watcher::Config::default().page_size
995    }
996}
997
998fn create_event(
999    line: Bytes,
1000    file: &str,
1001    ingestion_timestamp_field: Option<&OwnedTargetPath>,
1002    log_namespace: LogNamespace,
1003) -> Event {
1004    let deserializer = BytesDeserializer;
1005    let mut log = deserializer.parse_single(line, log_namespace);
1006
1007    log_namespace.insert_source_metadata(
1008        Config::NAME,
1009        &mut log,
1010        Some(LegacyKey::Overwrite(path!("file"))),
1011        path!("file"),
1012        file,
1013    );
1014
1015    log_namespace.insert_vector_metadata(
1016        &mut log,
1017        log_schema().source_type_key(),
1018        path!("source_type"),
1019        Bytes::from(Config::NAME),
1020    );
1021    match (log_namespace, ingestion_timestamp_field) {
1022        // When using LogNamespace::Vector always set the ingest_timestamp.
1023        (LogNamespace::Vector, _) => {
1024            log.metadata_mut()
1025                .value_mut()
1026                .insert(path!("vector", "ingest_timestamp"), Utc::now());
1027        }
1028        // When LogNamespace::Legacy, only set when the `ingestion_timestamp_field` is configured.
1029        (LogNamespace::Legacy, Some(ingestion_timestamp_field)) => {
1030            log.try_insert(ingestion_timestamp_field, Utc::now())
1031        }
1032        // The CRI/Docker parsers handle inserting the `log_schema().timestamp_key()` value.
1033        (LogNamespace::Legacy, None) => (),
1034    };
1035
1036    log.into()
1037}
1038
1039/// This function returns the default value for `self_node_name` variable
1040/// as it should be at the generated config file.
1041fn default_self_node_name_env_template() -> String {
1042    format!("${{{}}}", SELF_NODE_NAME_ENV_KEY.to_owned())
1043}
1044
// Default include globs: a single pattern, `**/*`.
fn default_path_inclusion() -> Vec<PathBuf> {
    ["**/*"].iter().copied().map(PathBuf::from).collect()
}
1048
// Default exclude globs: `**/*.gz` and `**/*.tmp`, in that order.
fn default_path_exclusion() -> Vec<PathBuf> {
    ["**/*.gz", "**/*.tmp"]
        .iter()
        .copied()
        .map(PathBuf::from)
        .collect()
}
1052
// Default read budget in bytes (presumably the default of the `max_read_bytes` option).
const fn default_max_read_bytes() -> usize {
    2 * 1024 // 2 KiB
}
1056
// Rotated pod log files should be consumed first by default, so that our file handle
// gets released and the space can be reclaimed.
const fn default_oldest_first() -> bool {
    const OLDEST_FIRST: bool = true;
    OLDEST_FIRST
}
1062
// Namespace fields are inserted by default. Disabling this might make sense for clusters
// with a very large number of namespaces.
const fn default_insert_namespace_fields() -> bool {
    const INSERT_NAMESPACE_FIELDS: bool = true;
    INSERT_NAMESPACE_FIELDS
}
1067
// Default upper bound, in bytes, for a single line.
const fn default_max_line_bytes() -> usize {
    // NOTE: The reasoning below documents an incorrect assumption, see
    // https://github.com/vectordotdev/vector/issues/6967
    //
    // 16KB is the maximum size of the payload of a single line for both the
    // docker and CRI log formats. That is doubled to account for metadata and
    // padding, and to round to a power of two. Line splitting is countered at
    // the parsers, see the `partial_events_merger` logic.
    const KIB: usize = 1024;

    32 * KIB
}
1080
// Default glob cooldown: one minute.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(60)
}
1084
// Default number of lines used for fingerprinting: one.
const fn default_fingerprint_lines() -> usize {
    const LINES: usize = 1;
    LINES
}
1088
// Default deletion delay: one minute.
const fn default_delay_deletion_ms() -> Duration {
    Duration::from_secs(60)
}
1092
// Default rotate wait: half of the largest number of seconds a `u64` can hold.
const fn default_rotate_wait() -> Duration {
    const HALF_OF_MAX_SECS: u64 = u64::MAX >> 1;
    Duration::from_secs(HALF_OF_MAX_SECS)
}
1096
1097// This function constructs the patterns we include for file watching, created
1098// from the defaults or user provided configuration.
1099fn prepare_include_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1100    prepare_glob_patterns(&config.include_paths_glob_patterns, "Including")
1101}
1102
1103// This function constructs the patterns we exclude from file watching, created
1104// from the defaults or user provided configuration.
1105fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1106    prepare_glob_patterns(&config.exclude_paths_glob_patterns, "Excluding")
1107}
1108
1109// This function constructs the patterns for file watching, created
1110// from the defaults or user provided configuration.
1111fn prepare_glob_patterns(paths: &[PathBuf], op: &str) -> crate::Result<Vec<glob::Pattern>> {
1112    let ret = paths
1113        .iter()
1114        .map(|pattern| {
1115            let pattern = pattern
1116                .to_str()
1117                .ok_or("glob pattern is not a valid UTF-8 string")?;
1118            Ok(glob::Pattern::new(pattern)?)
1119        })
1120        .collect::<crate::Result<Vec<_>>>()?;
1121
1122    info!(
1123        message = format!("{op} matching files."),
1124        ret = ?ret
1125            .iter()
1126            .map(glob::Pattern::as_str)
1127            .collect::<Vec<_>>()
1128    );
1129
1130    Ok(ret)
1131}
1132
1133// This function constructs the effective field selector to use, based on
1134// the specified configuration.
1135fn prepare_field_selector(config: &Config, self_node_name: &str) -> crate::Result<String> {
1136    info!(
1137        message = "Obtained Kubernetes Node name to collect logs for (self).",
1138        ?self_node_name
1139    );
1140
1141    let field_selector = format!("spec.nodeName={self_node_name}");
1142
1143    if config.extra_field_selector.is_empty() {
1144        return Ok(field_selector);
1145    }
1146
1147    Ok(format!(
1148        "{},{}",
1149        field_selector, config.extra_field_selector
1150    ))
1151}
1152
1153// This function constructs the selector for a node to annotate entries with a node metadata.
1154fn prepare_node_selector(self_node_name: &str) -> crate::Result<String> {
1155    Ok(format!("metadata.name={self_node_name}"))
1156}
1157
// Builds the effective label selector: the built-in exclusion clause, followed by the
// configured `selector` when that one is not empty.
fn prepare_label_selector(selector: &str) -> String {
    const BUILT_IN: &str = "vector.dev/exclude!=true";

    match selector {
        "" => BUILT_IN.to_owned(),
        extra => [BUILT_IN, extra].join(","),
    }
}
1169
#[cfg(test)]
mod tests {
    use similar_asserts::assert_eq;
    use vector_lib::{
        config::LogNamespace,
        lookup::{OwnedTargetPath, owned_value_path},
        schema::Definition,
    };
    use vrl::value::{Kind, kind::Collection};

    use super::Config;
    use crate::config::SourceConfig;

    // A config that differs from the default only in `insert_namespace_fields`.
    fn config_with_namespace_fields(enabled: bool) -> Config {
        Config {
            insert_namespace_fields: enabled,
            ..Default::default()
        }
    }

    // Mirrors the condition the `run` method uses to decide on watching namespaces.
    fn should_watch_namespaces(config: &Config) -> bool {
        config.insert_namespace_fields
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<Config>();
    }

    #[test]
    fn test_default_config_insert_namespace_fields() {
        let defaults = Config::default();
        assert_eq!(defaults.insert_namespace_fields, true);
    }

    #[test]
    fn test_config_insert_namespace_fields_disabled() {
        let disabled = config_with_namespace_fields(false);
        assert_eq!(disabled.insert_namespace_fields, false);
    }

    #[test]
    fn test_config_serialization_insert_namespace_fields() {
        // An explicit `false` in TOML has to survive deserialization...
        let explicit_toml = r#"
            insert_namespace_fields = false
        "#;
        let explicit: Config = toml::from_str(explicit_toml).unwrap();
        assert_eq!(explicit.insert_namespace_fields, false);

        // ...while an empty document falls back to the default.
        let empty_toml = "";
        let implicit: Config = toml::from_str(empty_toml).unwrap();
        assert_eq!(implicit.insert_namespace_fields, true);
    }

    #[test]
    fn test_insert_namespace_fields_affects_behavior() {
        // The flag has to be passed through unchanged so that it can drive the
        // conditional namespace watching.
        let enabled = config_with_namespace_fields(true);
        let disabled = config_with_namespace_fields(false);

        assert!(should_watch_namespaces(&enabled));
        assert!(!should_watch_namespaces(&disabled));
    }

    #[test]
    fn prepare_exclude_paths() {
        fn pattern(raw: &str) -> glob::Pattern {
            glob::Pattern::new(raw).unwrap()
        }

        fn excluding(globs: &[&str]) -> Config {
            Config {
                exclude_paths_glob_patterns: globs
                    .iter()
                    .map(|glob| std::path::PathBuf::from(*glob))
                    .collect(),
                ..Default::default()
            }
        }

        let cases = [
            (
                Config::default(),
                vec![pattern("**/*.gz"), pattern("**/*.tmp")],
            ),
            (excluding(&["**/*.tmp"]), vec![pattern("**/*.tmp")]),
            (
                excluding(&["**/kube-system_*/**", "**/*.gz", "**/*.tmp"]),
                vec![
                    pattern("**/kube-system_*/**"),
                    pattern("**/*.gz"),
                    pattern("**/*.tmp"),
                ],
            ),
        ];

        // Order is irrelevant here, hence both sides are sorted before comparing.
        for (config, mut expected) in cases {
            let mut actual = super::prepare_exclude_paths(&config).unwrap();
            expected.sort();
            actual.sort();
            assert_eq!(expected, actual, "expected left, actual right");
        }
    }

    #[test]
    fn prepare_field_selector() {
        // We're not testing `Config::default()` or empty `self_node_name`
        // as passing env vars in the concurrent tests is difficult.
        let for_node_qwe = || Config {
            self_node_name: "qwe".to_owned(),
            ..Default::default()
        };
        let with_extra = |extra: &str| Config {
            extra_field_selector: extra.to_owned(),
            ..for_node_qwe()
        };

        let cases = [
            (for_node_qwe(), "spec.nodeName=qwe"),
            (with_extra(""), "spec.nodeName=qwe"),
            (with_extra("foo=bar"), "spec.nodeName=qwe,foo=bar"),
        ];

        for (config, expected) in cases {
            let actual = super::prepare_field_selector(&config, "qwe").unwrap();
            assert_eq!(expected, actual, "expected left, actual right");
        }
    }

    #[test]
    fn prepare_label_selector() {
        const BUILT_IN_ONLY: &str = "vector.dev/exclude!=true";
        const BUILT_IN_AND_QWE: &str = "vector.dev/exclude!=true,qwe";

        // Extra selector for pods, taken from an otherwise default config.
        let pod_selector = |extra: &str| {
            let config = Config {
                extra_label_selector: extra.to_owned(),
                ..Default::default()
            };
            config.extra_label_selector
        };
        // Extra selector for namespaces, taken from an otherwise default config.
        let namespace_selector = |extra: &str| {
            let config = Config {
                extra_namespace_label_selector: extra.to_owned(),
                ..Default::default()
            };
            config.extra_namespace_label_selector
        };

        let cases = [
            (Config::default().extra_label_selector, BUILT_IN_ONLY),
            (
                Config::default().extra_namespace_label_selector,
                BUILT_IN_ONLY,
            ),
            (pod_selector(""), BUILT_IN_ONLY),
            (namespace_selector(""), BUILT_IN_ONLY),
            (pod_selector("qwe"), BUILT_IN_AND_QWE),
            (namespace_selector("qwe"), BUILT_IN_AND_QWE),
        ];

        for (selector, expected) in cases {
            let actual = super::prepare_label_selector(&selector);
            assert_eq!(expected, actual, "expected left, actual right");
        }
    }

    #[test]
    fn test_output_schema_definition_vector_namespace() {
        let config: Config = toml::from_str("").unwrap();
        let mut outputs = config.outputs(LogNamespace::Vector);
        let definitions = outputs.remove(0).schema_definition(true);

        // Kinds shared by most of the optional annotation fields.
        let optional_bytes = Kind::bytes().or_undefined();
        let optional_bytes_map =
            Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();
        let optional_bytes_array =
            Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "file"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "container_id"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "container_image"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "container_name"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "namespace_labels"),
                    optional_bytes_map.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "node_labels"),
                    optional_bytes_map.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_annotations"),
                    optional_bytes_map.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_ip"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_ips"),
                    optional_bytes_array.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_labels"),
                    optional_bytes_map.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_name"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_namespace"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_node_name"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_owner"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "pod_uid"),
                    optional_bytes.clone(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "stream"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("kubernetes_logs", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_meaning(OwnedTargetPath::event_root(), "message");

        assert_eq!(definitions, Some(expected));
    }

    #[test]
    fn test_output_schema_definition_legacy_namespace() {
        let config: Config = toml::from_str("").unwrap();
        let mut outputs = config.outputs(LogNamespace::Legacy);
        let definitions = outputs.remove(0).schema_definition(true);

        // Kinds shared by most of the optional annotation fields.
        let optional_bytes = Kind::bytes().or_undefined();
        let optional_bytes_map =
            Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();
        let optional_bytes_array =
            Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

        let expected = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("file"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "container_id"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "container_image"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "container_name"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "namespace_labels"),
            optional_bytes_map.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "node_labels"),
            optional_bytes_map.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_annotations"),
            optional_bytes_map.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_ip"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_ips"),
            optional_bytes_array.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_labels"),
            optional_bytes_map.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_name"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_namespace"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_node_name"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_owner"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(
            &owned_value_path!("kubernetes", "pod_uid"),
            optional_bytes.clone(),
            None,
        )
        .with_event_field(&owned_value_path!("stream"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("timestamp"),
            Kind::timestamp(),
            Some("timestamp"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None);

        assert_eq!(definitions, Some(expected));
    }
}