1#![deny(missing_docs)]
7use std::{cmp::min, path::PathBuf, time::Duration};
8
9use bytes::Bytes;
10use chrono::Utc;
11use futures::{future::FutureExt, stream::StreamExt};
12use futures_util::Stream;
13use http_1::{HeaderName, HeaderValue};
14use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
15use k8s_paths_provider::K8sPathsProvider;
16use kube::{
17 Client, Config as ClientConfig,
18 api::Api,
19 config::{self, KubeConfigOptions},
20 runtime::{WatchStreamExt, reflector, watcher},
21};
22use lifecycle::Lifecycle;
23use serde_with::serde_as;
24use vector_lib::{
25 EstimatedJsonEncodedSizeOf, TimeZone,
26 codecs::{BytesDeserializer, BytesDeserializerConfig},
27 config::{LegacyKey, LogNamespace},
28 configurable::configurable_component,
29 file_source::file_server::{
30 FileServer, Line, Shutdown as FileServerShutdown, calculate_ignore_before,
31 },
32 file_source_common::{
33 Checkpointer, FingerprintStrategy, Fingerprinter, ReadFrom, ReadFromConfig,
34 },
35 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
36 lookup::{OwnedTargetPath, lookup_v2::OptionalTargetPath, owned_value_path, path},
37};
38use vrl::value::{Kind, kind::Collection};
39
40use crate::{
41 SourceSender,
42 built_info::{PKG_NAME, PKG_VERSION},
43 config::{
44 ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig, SourceContext,
45 SourceOutput, log_schema,
46 },
47 event::Event,
48 internal_events::{
49 FileInternalMetricsConfig, FileSourceInternalEventsEmitter, KubernetesLifecycleError,
50 KubernetesLogsEventAnnotationError, KubernetesLogsEventNamespaceAnnotationError,
51 KubernetesLogsEventNodeAnnotationError, KubernetesLogsEventsReceived,
52 KubernetesLogsPodInfo, StreamClosedError,
53 },
54 kubernetes::{custom_reflector, meta_cache::MetaCache},
55 shutdown::ShutdownSignal,
56 sources,
57 sources::kubernetes_logs::partial_events_merger::merge_partial_events,
58 transforms::{FunctionTransform, OutputBuffer},
59};
60
61mod k8s_paths_provider;
62mod lifecycle;
63mod namespace_metadata_annotator;
64mod node_metadata_annotator;
65mod parser;
66mod partial_events_merger;
67mod path_helpers;
68mod pod_metadata_annotator;
69mod transform_utils;
70mod util;
71
72use self::{
73 namespace_metadata_annotator::NamespaceMetadataAnnotator,
74 node_metadata_annotator::NodeMetadataAnnotator, parser::Parser,
75 pod_metadata_annotator::PodMetadataAnnotator,
76};
77
/// Name of the environment variable read to discover the Node this instance collects logs for.
///
/// It is consulted when `self_node_name` is empty or still equals the
/// `${VECTOR_SELF_NODE_NAME}` template produced by `default_self_node_name_env_template`.
const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME";
80
/// Configuration for the `kubernetes_logs` source.
#[serde_as]
#[configurable_component(source("kubernetes_logs", "Collect Pod logs from Kubernetes Nodes."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
    /// Label selector used to filter Pods, applied in addition to the built-in
    /// `vector.dev/exclude!=true` selector.
    #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
    #[configurable(metadata(
        docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
    ))]
    extra_label_selector: String,

    /// Label selector used to filter Namespaces, applied in addition to the built-in
    /// `vector.dev/exclude!=true` selector.
    #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
    #[configurable(metadata(
        docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
    ))]
    extra_namespace_label_selector: String,

    /// Whether to watch Namespaces and add Namespace metadata to events.
    ///
    /// When disabled, no Namespace watcher is started.
    #[serde(default = "default_insert_namespace_fields")]
    insert_namespace_fields: bool,

    /// The name of the Kubernetes Node whose Pod logs are collected.
    ///
    /// If empty, or left as `${VECTOR_SELF_NODE_NAME}`, the value is read from the
    /// `VECTOR_SELF_NODE_NAME` environment variable.
    self_node_name: String,

    /// Field selector used to filter Pods, applied in addition to the built-in
    /// `spec.nodeName=<self_node_name>` selector.
    #[configurable(metadata(docs::examples = "metadata.name!=pod-name-to-exclude"))]
    #[configurable(metadata(
        docs::examples = "metadata.name!=pod-name-to-exclude,metadata.name=mypod"
    ))]
    extra_field_selector: String,

    /// Whether to merge partial events into complete events.
    ///
    /// When enabled, `max_merged_line_bytes` (if set) also caps the length of each read line.
    auto_partial_merge: bool,

    /// The directory used to persist file checkpoints.
    // NOTE(review): resolution against the global options happens in
    // `GlobalOptions::resolve_and_make_data_subdir`; confirm the fallback semantics there.
    #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
    #[configurable(metadata(docs::human_name = "Data Directory"))]
    data_dir: Option<PathBuf>,

    // Which Pod metadata fields are added to events, and under which paths.
    #[configurable(derived)]
    #[serde(alias = "annotation_fields")]
    pod_annotation_fields: pod_metadata_annotator::FieldsSpec,

    // Which Namespace metadata fields are added to events, and under which paths.
    #[configurable(derived)]
    namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec,

    // Which Node metadata fields are added to events, and under which paths.
    #[configurable(derived)]
    node_annotation_fields: node_metadata_annotator::FieldsSpec,

    /// Glob patterns selecting which Pod log files to read.
    ///
    /// Defaults to `**/*`.
    #[configurable(metadata(docs::examples = "**/include/**"))]
    include_paths_glob_patterns: Vec<PathBuf>,

    /// Glob patterns selecting which Pod log files to skip.
    ///
    /// Defaults to `**/*.gz` and `**/*.tmp`.
    #[configurable(metadata(docs::examples = "**/exclude/**"))]
    exclude_paths_glob_patterns: Vec<PathBuf>,

    // Where to start reading files; defaults to `ReadFromConfig::Beginning`.
    #[configurable(derived)]
    #[serde(default = "default_read_from")]
    read_from: ReadFromConfig,

    /// Ignore files older than this many seconds.
    #[serde(default)]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::examples = 600))]
    #[configurable(metadata(docs::human_name = "Ignore Files Older Than"))]
    ignore_older_secs: Option<u64>,

    /// The maximum number of bytes read from a file in one read.
    ///
    /// Defaults to 2048.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_read_bytes: usize,

    /// Whether to read the oldest files first.
    ///
    /// Defaults to `true`.
    #[serde(default = "default_oldest_first")]
    pub oldest_first: bool,

    /// The maximum length of a single line read from a file.
    ///
    /// Defaults to 32 KiB.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_line_bytes: usize,

    /// The maximum length of an event built by merging partial events.
    ///
    /// Only used when `auto_partial_merge` is enabled, in which case it also caps
    /// `max_line_bytes`.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    max_merged_line_bytes: Option<usize>,

    /// The number of lines read from the start of a file to compute its fingerprint.
    ///
    /// Defaults to 1.
    #[configurable(metadata(docs::type_unit = "lines"))]
    fingerprint_lines: usize,

    /// The minimum time, in milliseconds, to wait between file discovery (glob) passes.
    ///
    /// Defaults to 60000.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
    glob_minimum_cooldown_ms: Duration,

    /// The event field that receives the ingestion timestamp when using the legacy log
    /// namespace.
    ///
    /// With the Vector log namespace, the timestamp is always stored as
    /// `vector.ingest_timestamp` metadata instead.
    #[configurable(metadata(docs::examples = ".ingest_timestamp", docs::examples = "ingest_ts"))]
    ingestion_timestamp_field: Option<OptionalTargetPath>,

    /// Time zone setting for this source.
    // NOTE(review): this field is not read by any code in this file; confirm where it is used.
    timezone: Option<TimeZone>,

    /// Path to a kubeconfig file.
    ///
    /// If unset, the Kubernetes client configuration is inferred.
    #[configurable(metadata(docs::examples = "/path/to/.kube/config"))]
    kube_config_file: Option<PathBuf>,

    /// Whether Pod, Namespace, and Node list requests may be served from the API server cache.
    ///
    /// When enabled, list requests use the `Any` list semantic without paging. Otherwise they
    /// use the `MostRecent` list semantic with the default page size.
    use_apiserver_cache: bool,

    /// The deletion delay, in milliseconds, applied by the metadata reflectors.
    ///
    /// Defaults to 60000.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Delay Deletion"))]
    delay_deletion_ms: Duration,

    /// The log namespace for this source, overriding the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Internal metric options; `include_file_tag` is forwarded to the file source's internal
    // events emitter.
    #[configurable(derived)]
    #[serde(default)]
    internal_metrics: FileInternalMetricsConfig,

    /// The rotate-wait duration handed to the file server, configured in seconds as
    /// `rotate_wait_secs`.
    ///
    /// Defaults to `u64::MAX / 2` seconds, which is effectively unbounded.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
    rotate_wait: Duration,
}
282
/// Default for `read_from`: start reading files from their beginning.
const fn default_read_from() -> ReadFromConfig {
    ReadFromConfig::Beginning
}
286
287impl GenerateConfig for Config {
288 fn generate_config() -> toml::Value {
289 toml::Value::try_from(Self {
290 self_node_name: default_self_node_name_env_template(),
291 auto_partial_merge: true,
292 ..Default::default()
293 })
294 .unwrap()
295 }
296}
297
298impl Default for Config {
299 fn default() -> Self {
300 Self {
301 extra_label_selector: "".to_string(),
302 extra_namespace_label_selector: "".to_string(),
303 insert_namespace_fields: true,
304 self_node_name: default_self_node_name_env_template(),
305 extra_field_selector: "".to_string(),
306 auto_partial_merge: true,
307 data_dir: None,
308 pod_annotation_fields: pod_metadata_annotator::FieldsSpec::default(),
309 namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec::default(),
310 node_annotation_fields: node_metadata_annotator::FieldsSpec::default(),
311 include_paths_glob_patterns: default_path_inclusion(),
312 exclude_paths_glob_patterns: default_path_exclusion(),
313 read_from: default_read_from(),
314 ignore_older_secs: None,
315 max_read_bytes: default_max_read_bytes(),
316 oldest_first: default_oldest_first(),
317 max_line_bytes: default_max_line_bytes(),
318 max_merged_line_bytes: None,
319 fingerprint_lines: default_fingerprint_lines(),
320 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
321 ingestion_timestamp_field: None,
322 timezone: None,
323 kube_config_file: None,
324 use_apiserver_cache: false,
325 delay_deletion_ms: default_delay_deletion_ms(),
326 log_namespace: None,
327 internal_metrics: Default::default(),
328 rotate_wait: default_rotate_wait(),
329 }
330 }
331}
332
333#[async_trait::async_trait]
334#[typetag::serde(name = "kubernetes_logs")]
335impl SourceConfig for Config {
336 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
337 let log_namespace = cx.log_namespace(self.log_namespace);
338 let source = Source::new(self, &cx.globals, &cx.key).await?;
339
340 Ok(Box::pin(
341 source
342 .run(cx.out, cx.shutdown, log_namespace)
343 .map(|result| {
344 result.map_err(|error| {
345 error!(message = "Source future failed.", %error);
346 })
347 }),
348 ))
349 }
350
351 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
352 let log_namespace = global_log_namespace.merge(self.log_namespace);
353 let schema_definition = BytesDeserializerConfig
354 .schema_definition(log_namespace)
355 .with_source_metadata(
356 Self::NAME,
357 Some(LegacyKey::Overwrite(owned_value_path!("file"))),
358 &owned_value_path!("file"),
359 Kind::bytes(),
360 None,
361 )
362 .with_source_metadata(
363 Self::NAME,
364 self.pod_annotation_fields
365 .container_id
366 .path
367 .clone()
368 .map(|k| k.path)
369 .map(LegacyKey::Overwrite),
370 &owned_value_path!("container_id"),
371 Kind::bytes().or_undefined(),
372 None,
373 )
374 .with_source_metadata(
375 Self::NAME,
376 self.pod_annotation_fields
377 .container_image
378 .path
379 .clone()
380 .map(|k| k.path)
381 .map(LegacyKey::Overwrite),
382 &owned_value_path!("container_image"),
383 Kind::bytes().or_undefined(),
384 None,
385 )
386 .with_source_metadata(
387 Self::NAME,
388 self.pod_annotation_fields
389 .container_name
390 .path
391 .clone()
392 .map(|k| k.path)
393 .map(LegacyKey::Overwrite),
394 &owned_value_path!("container_name"),
395 Kind::bytes().or_undefined(),
396 None,
397 )
398 .with_source_metadata(
399 Self::NAME,
400 self.namespace_annotation_fields
401 .namespace_labels
402 .path
403 .clone()
404 .map(|x| LegacyKey::Overwrite(x.path)),
405 &owned_value_path!("namespace_labels"),
406 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
407 None,
408 )
409 .with_source_metadata(
410 Self::NAME,
411 self.node_annotation_fields
412 .node_labels
413 .path
414 .clone()
415 .map(|x| LegacyKey::Overwrite(x.path)),
416 &owned_value_path!("node_labels"),
417 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
418 None,
419 )
420 .with_source_metadata(
421 Self::NAME,
422 self.pod_annotation_fields
423 .pod_annotations
424 .path
425 .clone()
426 .map(|k| k.path)
427 .map(LegacyKey::Overwrite),
428 &owned_value_path!("pod_annotations"),
429 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
430 None,
431 )
432 .with_source_metadata(
433 Self::NAME,
434 self.pod_annotation_fields
435 .pod_ip
436 .path
437 .clone()
438 .map(|k| k.path)
439 .map(LegacyKey::Overwrite),
440 &owned_value_path!("pod_ip"),
441 Kind::bytes().or_undefined(),
442 None,
443 )
444 .with_source_metadata(
445 Self::NAME,
446 self.pod_annotation_fields
447 .pod_ips
448 .path
449 .clone()
450 .map(|k| k.path)
451 .map(LegacyKey::Overwrite),
452 &owned_value_path!("pod_ips"),
453 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
454 None,
455 )
456 .with_source_metadata(
457 Self::NAME,
458 self.pod_annotation_fields
459 .pod_labels
460 .path
461 .clone()
462 .map(|k| k.path)
463 .map(LegacyKey::Overwrite),
464 &owned_value_path!("pod_labels"),
465 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
466 None,
467 )
468 .with_source_metadata(
469 Self::NAME,
470 self.pod_annotation_fields
471 .pod_name
472 .path
473 .clone()
474 .map(|k| k.path)
475 .map(LegacyKey::Overwrite),
476 &owned_value_path!("pod_name"),
477 Kind::bytes().or_undefined(),
478 None,
479 )
480 .with_source_metadata(
481 Self::NAME,
482 self.pod_annotation_fields
483 .pod_namespace
484 .path
485 .clone()
486 .map(|k| k.path)
487 .map(LegacyKey::Overwrite),
488 &owned_value_path!("pod_namespace"),
489 Kind::bytes().or_undefined(),
490 None,
491 )
492 .with_source_metadata(
493 Self::NAME,
494 self.pod_annotation_fields
495 .pod_node_name
496 .path
497 .clone()
498 .map(|k| k.path)
499 .map(LegacyKey::Overwrite),
500 &owned_value_path!("pod_node_name"),
501 Kind::bytes().or_undefined(),
502 None,
503 )
504 .with_source_metadata(
505 Self::NAME,
506 self.pod_annotation_fields
507 .pod_owner
508 .path
509 .clone()
510 .map(|k| k.path)
511 .map(LegacyKey::Overwrite),
512 &owned_value_path!("pod_owner"),
513 Kind::bytes().or_undefined(),
514 None,
515 )
516 .with_source_metadata(
517 Self::NAME,
518 self.pod_annotation_fields
519 .pod_uid
520 .path
521 .clone()
522 .map(|k| k.path)
523 .map(LegacyKey::Overwrite),
524 &owned_value_path!("pod_uid"),
525 Kind::bytes().or_undefined(),
526 None,
527 )
528 .with_source_metadata(
529 Self::NAME,
530 Some(LegacyKey::Overwrite(owned_value_path!("stream"))),
531 &owned_value_path!("stream"),
532 Kind::bytes(),
533 None,
534 )
535 .with_source_metadata(
536 Self::NAME,
537 log_schema()
538 .timestamp_key()
539 .cloned()
540 .map(LegacyKey::Overwrite),
541 &owned_value_path!("timestamp"),
542 Kind::timestamp(),
543 Some("timestamp"),
544 )
545 .with_standard_vector_source_metadata();
546
547 vec![SourceOutput::new_maybe_logs(
548 DataType::Log,
549 schema_definition,
550 )]
551 }
552
553 fn can_acknowledge(&self) -> bool {
554 false
555 }
556}
557
558#[derive(Clone)]
559struct Source {
560 client: Client,
561 data_dir: PathBuf,
562 auto_partial_merge: bool,
563 pod_fields_spec: pod_metadata_annotator::FieldsSpec,
564 namespace_fields_spec: namespace_metadata_annotator::FieldsSpec,
565 node_field_spec: node_metadata_annotator::FieldsSpec,
566 field_selector: String,
567 label_selector: String,
568 namespace_label_selector: String,
569 insert_namespace_fields: bool,
570 node_selector: String,
571 self_node_name: String,
572 include_paths: Vec<glob::Pattern>,
573 exclude_paths: Vec<glob::Pattern>,
574 read_from: ReadFrom,
575 ignore_older_secs: Option<u64>,
576 max_read_bytes: usize,
577 oldest_first: bool,
578 max_line_bytes: usize,
579 max_merged_line_bytes: Option<usize>,
580 fingerprint_lines: usize,
581 glob_minimum_cooldown: Duration,
582 use_apiserver_cache: bool,
583 ingestion_timestamp_field: Option<OwnedTargetPath>,
584 delay_deletion: Duration,
585 include_file_metric_tag: bool,
586 rotate_wait: Duration,
587}
588
589impl Source {
590 async fn new(
591 config: &Config,
592 globals: &GlobalOptions,
593 key: &ComponentKey,
594 ) -> crate::Result<Self> {
595 let self_node_name = if config.self_node_name.is_empty()
596 || config.self_node_name == default_self_node_name_env_template()
597 {
598 std::env::var(SELF_NODE_NAME_ENV_KEY).map_err(|_| {
599 format!(
600 "self_node_name config value or {SELF_NODE_NAME_ENV_KEY} env var is not set"
601 )
602 })?
603 } else {
604 config.self_node_name.clone()
605 };
606
607 let field_selector = prepare_field_selector(config, self_node_name.as_str())?;
608 let label_selector = prepare_label_selector(config.extra_label_selector.as_ref());
609 let namespace_label_selector =
610 prepare_label_selector(config.extra_namespace_label_selector.as_ref());
611 let node_selector = prepare_node_selector(self_node_name.as_str())?;
612
613 let mut client_config = match &config.kube_config_file {
617 Some(kc) => {
618 ClientConfig::from_custom_kubeconfig(
619 config::Kubeconfig::read_from(kc)?,
620 &KubeConfigOptions::default(),
621 )
622 .await?
623 }
624 None => ClientConfig::infer().await?,
625 };
626 if let Ok(user_agent) = HeaderValue::from_str(&format!("{PKG_NAME}/{PKG_VERSION}")) {
627 client_config
628 .headers
629 .push((HeaderName::from_static("user-agent"), user_agent));
630 }
631 let client = Client::try_from(client_config)?;
632
633 let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?;
634
635 let include_paths = prepare_include_paths(config)?;
636
637 let exclude_paths = prepare_exclude_paths(config)?;
638
639 let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
640
641 let delay_deletion = config.delay_deletion_ms;
642
643 let ingestion_timestamp_field = config
644 .ingestion_timestamp_field
645 .clone()
646 .and_then(|k| k.path);
647
648 Ok(Self {
649 client,
650 data_dir,
651 auto_partial_merge: config.auto_partial_merge,
652 pod_fields_spec: config.pod_annotation_fields.clone(),
653 namespace_fields_spec: config.namespace_annotation_fields.clone(),
654 node_field_spec: config.node_annotation_fields.clone(),
655 field_selector,
656 label_selector,
657 namespace_label_selector,
658 insert_namespace_fields: config.insert_namespace_fields,
659 node_selector,
660 self_node_name,
661 include_paths,
662 exclude_paths,
663 read_from: ReadFrom::from(config.read_from),
664 ignore_older_secs: config.ignore_older_secs,
665 max_read_bytes: config.max_read_bytes,
666 oldest_first: config.oldest_first,
667 max_line_bytes: config.max_line_bytes,
668 max_merged_line_bytes: config.max_merged_line_bytes,
669 fingerprint_lines: config.fingerprint_lines,
670 glob_minimum_cooldown,
671 use_apiserver_cache: config.use_apiserver_cache,
672 ingestion_timestamp_field,
673 delay_deletion,
674 include_file_metric_tag: config.internal_metrics.include_file_tag,
675 rotate_wait: config.rotate_wait,
676 })
677 }
678
679 async fn run(
680 self,
681 mut out: SourceSender,
682 global_shutdown: ShutdownSignal,
683 log_namespace: LogNamespace,
684 ) -> crate::Result<()> {
685 let Self {
686 client,
687 data_dir,
688 auto_partial_merge,
689 pod_fields_spec,
690 namespace_fields_spec,
691 node_field_spec,
692 field_selector,
693 label_selector,
694 namespace_label_selector,
695 insert_namespace_fields,
696 node_selector,
697 self_node_name,
698 include_paths,
699 exclude_paths,
700 read_from,
701 ignore_older_secs,
702 max_read_bytes,
703 oldest_first,
704 max_line_bytes,
705 max_merged_line_bytes,
706 fingerprint_lines,
707 glob_minimum_cooldown,
708 use_apiserver_cache,
709 ingestion_timestamp_field,
710 delay_deletion,
711 include_file_metric_tag,
712 rotate_wait,
713 } = self;
714
715 let mut reflectors = Vec::new();
716
717 let pods = Api::<Pod>::all(client.clone());
718
719 let list_semantic = if use_apiserver_cache {
720 watcher::ListSemantic::Any
721 } else {
722 watcher::ListSemantic::MostRecent
723 };
724
725 let pod_watcher = watcher(
726 pods,
727 watcher::Config {
728 field_selector: Some(field_selector),
729 label_selector: Some(label_selector),
730 list_semantic: list_semantic.clone(),
731 page_size: get_page_size(use_apiserver_cache),
732 ..Default::default()
733 },
734 )
735 .backoff(watcher::DefaultBackoff::default());
736
737 let pod_store_w = reflector::store::Writer::default();
738 let pod_state = pod_store_w.as_reader();
739 let pod_cacher = MetaCache::new();
740
741 reflectors.push(tokio::spawn(custom_reflector(
742 pod_store_w,
743 pod_cacher,
744 pod_watcher,
745 delay_deletion,
746 )));
747
748 let ns_store_w = reflector::store::Writer::default();
751 let ns_state = ns_store_w.as_reader();
752 if insert_namespace_fields {
753 let namespaces = Api::<Namespace>::all(client.clone());
754 let ns_watcher = watcher(
755 namespaces,
756 watcher::Config {
757 label_selector: Some(namespace_label_selector),
758 list_semantic: list_semantic.clone(),
759 page_size: get_page_size(use_apiserver_cache),
760 ..Default::default()
761 },
762 )
763 .backoff(watcher::DefaultBackoff::default());
764
765 reflectors.push(tokio::spawn(custom_reflector(
766 ns_store_w,
767 MetaCache::new(),
768 ns_watcher,
769 delay_deletion,
770 )));
771 }
772
773 let nodes = Api::<Node>::all(client);
776 let node_watcher = watcher(
777 nodes,
778 watcher::Config {
779 field_selector: Some(node_selector),
780 list_semantic,
781 page_size: get_page_size(use_apiserver_cache),
782 ..Default::default()
783 },
784 )
785 .backoff(watcher::DefaultBackoff::default());
786 let node_store_w = reflector::store::Writer::default();
787 let node_state = node_store_w.as_reader();
788 let node_cacher = MetaCache::new();
789
790 reflectors.push(tokio::spawn(custom_reflector(
791 node_store_w,
792 node_cacher,
793 node_watcher,
794 delay_deletion,
795 )));
796
797 let paths_provider = K8sPathsProvider::new(
798 pod_state.clone(),
799 ns_state.clone(),
800 include_paths,
801 exclude_paths,
802 insert_namespace_fields,
803 );
804 let annotator = PodMetadataAnnotator::new(pod_state, pod_fields_spec, log_namespace);
805 let ns_annotator =
806 NamespaceMetadataAnnotator::new(ns_state, namespace_fields_spec, log_namespace);
807 let node_annotator = NodeMetadataAnnotator::new(node_state, node_field_spec, log_namespace);
808
809 let ignore_before = calculate_ignore_before(ignore_older_secs);
810
811 let mut resolved_max_line_bytes = max_line_bytes;
812 if auto_partial_merge {
813 resolved_max_line_bytes = min(
814 max_line_bytes,
815 max_merged_line_bytes.unwrap_or(max_line_bytes),
816 );
817 }
818
819 let checkpointer = Checkpointer::new(&data_dir);
822 let file_server = FileServer {
823 paths_provider,
825 max_read_bytes,
830 ignore_checkpoints: false,
833 read_from,
835 ignore_before,
841 max_line_bytes: resolved_max_line_bytes,
844 line_delimiter: Bytes::from("\n"),
846 data_dir,
848 glob_minimum_cooldown,
851 fingerprinter: Fingerprinter {
855 strategy: FingerprintStrategy::FirstLinesChecksum {
856 ignored_header_bytes: 0,
859 lines: fingerprint_lines,
860 },
861 max_line_length: resolved_max_line_bytes,
862 ignore_not_found: true,
863 },
864 oldest_first,
865 remove_after: None,
867 emitter: FileSourceInternalEventsEmitter {
869 include_file_metric_tag,
870 },
871 rotate_wait,
873 };
874
875 let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
876
877 let checkpoints = checkpointer.view();
878 let events = file_source_rx.flat_map(futures::stream::iter);
879 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
880 let events = events.map(move |line| {
881 let byte_size = line.text.len();
882 bytes_received.emit(ByteSize(byte_size));
883
884 let mut event = create_event(
885 line.text,
886 &line.filename,
887 ingestion_timestamp_field.as_ref(),
888 log_namespace,
889 );
890
891 let file_info = annotator.annotate(&mut event, &line.filename);
892
893 emit!(KubernetesLogsEventsReceived {
894 file: &line.filename,
895 byte_size: event.estimated_json_encoded_size_of(),
896 pod_info: file_info.as_ref().map(|info| KubernetesLogsPodInfo {
897 name: info.pod_name.to_owned(),
898 namespace: info.pod_namespace.to_owned(),
899 }),
900 });
901
902 if file_info.is_none() {
903 emit!(KubernetesLogsEventAnnotationError { event: &event });
904 } else {
905 let namespace = file_info.as_ref().map(|info| info.pod_namespace);
906
907 if insert_namespace_fields
908 && let Some(name) = namespace
909 && ns_annotator.annotate(&mut event, name).is_none()
910 {
911 emit!(KubernetesLogsEventNamespaceAnnotationError { event: &event });
912 }
913
914 let node_info = node_annotator.annotate(&mut event, self_node_name.as_str());
915
916 if node_info.is_none() {
917 emit!(KubernetesLogsEventNodeAnnotationError { event: &event });
918 }
919 }
920
921 checkpoints.update(line.file_id, line.end_offset);
922 event
923 });
924
925 let mut parser = Parser::new(log_namespace);
926 let events = events.flat_map(move |event| {
927 let mut buf = OutputBuffer::with_capacity(1);
928 parser.transform(&mut buf, event);
929 futures::stream::iter(buf.into_events())
930 });
931
932 let (events_count, _) = events.size_hint();
933
934 let mut stream = if auto_partial_merge {
935 merge_partial_events(events, log_namespace, max_merged_line_bytes).left_stream()
936 } else {
937 events.right_stream()
938 };
939
940 let event_processing_loop = out.send_event_stream(&mut stream);
941
942 let mut lifecycle = Lifecycle::new();
943 {
944 let (slot, shutdown) = lifecycle.add();
945 let fut = util::run_file_server(file_server, file_source_tx, shutdown, checkpointer)
946 .map(|result| match result {
947 Ok(FileServerShutdown) => info!(message = "File server completed gracefully."),
948 Err(error) => emit!(KubernetesLifecycleError {
949 message: "File server exited with an error.",
950 error,
951 count: events_count,
952 }),
953 });
954 slot.bind(Box::pin(fut));
955 }
956 {
957 let (slot, shutdown) = lifecycle.add();
958 let fut = util::complete_with_deadline_on_signal(
959 event_processing_loop,
960 shutdown,
961 Duration::from_secs(30), )
963 .map(|result| {
964 match result {
965 Ok(Ok(())) => info!(message = "Event processing loop completed gracefully."),
966 Ok(Err(_)) => emit!(StreamClosedError {
967 count: events_count
968 }),
969 Err(error) => emit!(KubernetesLifecycleError {
970 error,
971 message: "Event processing loop timed out during the shutdown.",
972 count: events_count,
973 }),
974 };
975 });
976 slot.bind(Box::pin(fut));
977 }
978
979 lifecycle.run(global_shutdown).await;
980 for reflector in reflectors {
982 reflector.abort();
983 }
984 info!(message = "Done.");
985 Ok(())
986 }
987}
988
989fn get_page_size(use_apiserver_cache: bool) -> Option<u32> {
991 if use_apiserver_cache {
992 None
993 } else {
994 watcher::Config::default().page_size
995 }
996}
997
998fn create_event(
999 line: Bytes,
1000 file: &str,
1001 ingestion_timestamp_field: Option<&OwnedTargetPath>,
1002 log_namespace: LogNamespace,
1003) -> Event {
1004 let deserializer = BytesDeserializer;
1005 let mut log = deserializer.parse_single(line, log_namespace);
1006
1007 log_namespace.insert_source_metadata(
1008 Config::NAME,
1009 &mut log,
1010 Some(LegacyKey::Overwrite(path!("file"))),
1011 path!("file"),
1012 file,
1013 );
1014
1015 log_namespace.insert_vector_metadata(
1016 &mut log,
1017 log_schema().source_type_key(),
1018 path!("source_type"),
1019 Bytes::from(Config::NAME),
1020 );
1021 match (log_namespace, ingestion_timestamp_field) {
1022 (LogNamespace::Vector, _) => {
1024 log.metadata_mut()
1025 .value_mut()
1026 .insert(path!("vector", "ingest_timestamp"), Utc::now());
1027 }
1028 (LogNamespace::Legacy, Some(ingestion_timestamp_field)) => {
1030 log.try_insert(ingestion_timestamp_field, Utc::now())
1031 }
1032 (LogNamespace::Legacy, None) => (),
1034 };
1035
1036 log.into()
1037}
1038
1039fn default_self_node_name_env_template() -> String {
1042 format!("${{{}}}", SELF_NODE_NAME_ENV_KEY.to_owned())
1043}
1044
/// Default for `include_paths_glob_patterns`: match every file.
fn default_path_inclusion() -> Vec<PathBuf> {
    ["**/*"].into_iter().map(PathBuf::from).collect()
}

/// Default for `exclude_paths_glob_patterns`: skip `.gz` and `.tmp` files.
fn default_path_exclusion() -> Vec<PathBuf> {
    ["**/*.gz", "**/*.tmp"]
        .into_iter()
        .map(PathBuf::from)
        .collect()
}
1052
/// Default for `max_read_bytes`: 2 KiB per read.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}

/// Default for `oldest_first`: read the oldest files first.
const fn default_oldest_first() -> bool {
    true
}

/// Default for `insert_namespace_fields`: Namespace metadata is added to events.
const fn default_insert_namespace_fields() -> bool {
    true
}

/// Default for `max_line_bytes`: 32 KiB.
const fn default_max_line_bytes() -> usize {
    32 * 1024
}

/// Default for `glob_minimum_cooldown_ms`: one minute.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(60)
}

/// Default for `fingerprint_lines`: fingerprint files by their first line.
const fn default_fingerprint_lines() -> usize {
    1
}

/// Default for `delay_deletion_ms`: one minute.
const fn default_delay_deletion_ms() -> Duration {
    Duration::from_secs(60)
}

/// Default for `rotate_wait_secs`: `u64::MAX / 2` seconds, which is effectively unbounded.
const fn default_rotate_wait() -> Duration {
    Duration::from_secs(u64::MAX / 2)
}
1096
1097fn prepare_include_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1100 prepare_glob_patterns(&config.include_paths_glob_patterns, "Including")
1101}
1102
1103fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1106 prepare_glob_patterns(&config.exclude_paths_glob_patterns, "Excluding")
1107}
1108
1109fn prepare_glob_patterns(paths: &[PathBuf], op: &str) -> crate::Result<Vec<glob::Pattern>> {
1112 let ret = paths
1113 .iter()
1114 .map(|pattern| {
1115 let pattern = pattern
1116 .to_str()
1117 .ok_or("glob pattern is not a valid UTF-8 string")?;
1118 Ok(glob::Pattern::new(pattern)?)
1119 })
1120 .collect::<crate::Result<Vec<_>>>()?;
1121
1122 info!(
1123 message = format!("{op} matching files."),
1124 ret = ?ret
1125 .iter()
1126 .map(glob::Pattern::as_str)
1127 .collect::<Vec<_>>()
1128 );
1129
1130 Ok(ret)
1131}
1132
1133fn prepare_field_selector(config: &Config, self_node_name: &str) -> crate::Result<String> {
1136 info!(
1137 message = "Obtained Kubernetes Node name to collect logs for (self).",
1138 ?self_node_name
1139 );
1140
1141 let field_selector = format!("spec.nodeName={self_node_name}");
1142
1143 if config.extra_field_selector.is_empty() {
1144 return Ok(field_selector);
1145 }
1146
1147 Ok(format!(
1148 "{},{}",
1149 field_selector, config.extra_field_selector
1150 ))
1151}
1152
1153fn prepare_node_selector(self_node_name: &str) -> crate::Result<String> {
1155 Ok(format!("metadata.name={self_node_name}"))
1156}
1157
/// Combines the built-in `vector.dev/exclude!=true` label selector with an optional extra
/// selector.
///
/// An empty `selector` yields only the built-in selector.
fn prepare_label_selector(selector: &str) -> String {
    const BUILT_IN: &str = "vector.dev/exclude!=true";

    match selector {
        "" => BUILT_IN.to_owned(),
        extra => format!("{BUILT_IN},{extra}"),
    }
}
1169
1170#[cfg(test)]
1171mod tests {
1172 use similar_asserts::assert_eq;
1173 use vector_lib::{
1174 config::LogNamespace,
1175 lookup::{OwnedTargetPath, owned_value_path},
1176 schema::Definition,
1177 };
1178 use vrl::value::{Kind, kind::Collection};
1179
1180 use super::Config;
1181 use crate::config::SourceConfig;
1182
1183 #[test]
1184 fn generate_config() {
1185 crate::test_util::test_generate_config::<Config>();
1186 }
1187
1188 #[test]
1189 fn test_default_config_insert_namespace_fields() {
1190 let config = Config::default();
1191 assert_eq!(config.insert_namespace_fields, true);
1192 }
1193
1194 #[test]
1195 fn test_config_insert_namespace_fields_disabled() {
1196 let config = Config {
1197 insert_namespace_fields: false,
1198 ..Default::default()
1199 };
1200 assert_eq!(config.insert_namespace_fields, false);
1201 }
1202
1203 #[test]
1204 fn test_config_serialization_insert_namespace_fields() {
1205 let toml_config = r#"
1207 insert_namespace_fields = false
1208 "#;
1209 let config: Config = toml::from_str(toml_config).unwrap();
1210 assert_eq!(config.insert_namespace_fields, false);
1211
1212 let default_toml = "";
1213 let default_config: Config = toml::from_str(default_toml).unwrap();
1214 assert_eq!(default_config.insert_namespace_fields, true);
1215 }
1216
1217 #[test]
1218 fn test_insert_namespace_fields_affects_behavior() {
1219 let enabled_config = Config {
1222 insert_namespace_fields: true,
1223 ..Default::default()
1224 };
1225 let disabled_config = Config {
1226 insert_namespace_fields: false,
1227 ..Default::default()
1228 };
1229
1230 assert!(should_watch_namespaces(&enabled_config));
1233 assert!(!should_watch_namespaces(&disabled_config));
1234 }
1235
1236 fn should_watch_namespaces(config: &Config) -> bool {
1238 config.insert_namespace_fields
1239 }
1240
1241 #[test]
1242 fn prepare_exclude_paths() {
1243 let cases = vec![
1244 (
1245 Config::default(),
1246 vec![
1247 glob::Pattern::new("**/*.gz").unwrap(),
1248 glob::Pattern::new("**/*.tmp").unwrap(),
1249 ],
1250 ),
1251 (
1252 Config {
1253 exclude_paths_glob_patterns: vec![std::path::PathBuf::from("**/*.tmp")],
1254 ..Default::default()
1255 },
1256 vec![glob::Pattern::new("**/*.tmp").unwrap()],
1257 ),
1258 (
1259 Config {
1260 exclude_paths_glob_patterns: vec![
1261 std::path::PathBuf::from("**/kube-system_*/**"),
1262 std::path::PathBuf::from("**/*.gz"),
1263 std::path::PathBuf::from("**/*.tmp"),
1264 ],
1265 ..Default::default()
1266 },
1267 vec![
1268 glob::Pattern::new("**/kube-system_*/**").unwrap(),
1269 glob::Pattern::new("**/*.gz").unwrap(),
1270 glob::Pattern::new("**/*.tmp").unwrap(),
1271 ],
1272 ),
1273 ];
1274
1275 for (input, mut expected) in cases {
1276 let mut output = super::prepare_exclude_paths(&input).unwrap();
1277 expected.sort();
1278 output.sort();
1279 assert_eq!(expected, output, "expected left, actual right");
1280 }
1281 }
1282
/// The field selector always pins `spec.nodeName` to the node. An extra field
/// selector, when non-empty, is appended after a comma.
///
/// NOTE(review): the config's `self_node_name` and the node name argument are
/// both `qwe`, so this test cannot tell which of the two the function uses.
#[test]
fn prepare_field_selector() {
    // `None` leaves `extra_field_selector` at its default value.
    // `Some(_)` overrides it explicitly.
    let config_for = |extra: Option<&str>| {
        let mut config = Config {
            self_node_name: "qwe".to_owned(),
            ..Default::default()
        };
        if let Some(selector) = extra {
            config.extra_field_selector = selector.to_owned();
        }
        config
    };

    let cases = [
        (config_for(None), "spec.nodeName=qwe"),
        (config_for(Some("")), "spec.nodeName=qwe"),
        (config_for(Some("foo=bar")), "spec.nodeName=qwe,foo=bar"),
    ];

    for (config, want) in cases {
        let got = super::prepare_field_selector(&config, "qwe").unwrap();
        assert_eq!(want, got, "expected left, actual right");
    }
}
1318
/// The prepared label selector is always the `vector.dev/exclude!=true`
/// exclusion, followed by `,<extra>` when an extra selector is supplied.
/// The pod-label and namespace-label selectors go through the same function,
/// so each input is checked once per config field.
#[test]
fn prepare_label_selector() {
    let defaults = Config::default();

    // (extra selector as taken from the config, expected combined selector)
    let cases = [
        (defaults.extra_label_selector, "vector.dev/exclude!=true"),
        (
            defaults.extra_namespace_label_selector,
            "vector.dev/exclude!=true",
        ),
        (String::new(), "vector.dev/exclude!=true"),
        (String::new(), "vector.dev/exclude!=true"),
        ("qwe".to_owned(), "vector.dev/exclude!=true,qwe"),
        ("qwe".to_owned(), "vector.dev/exclude!=true,qwe"),
    ];

    for (extra, want) in cases {
        let got = super::prepare_label_selector(&extra);
        assert_eq!(want, got, "expected left, actual right");
    }
}
1369
/// With the Vector log namespace, the event root is the raw message bytes.
/// All Kubernetes details live in metadata under `kubernetes_logs`, alongside
/// the standard `vector` metadata fields.
#[test]
fn test_output_schema_definition_vector_namespace() {
    let actual = toml::from_str::<Config>("")
        .unwrap()
        .outputs(LogNamespace::Vector)
        .remove(0)
        .schema_definition(true);

    let optional_bytes = || Kind::bytes().or_undefined();
    let optional_string_map =
        || Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    // (metadata path, kind, semantic meaning), applied in this order.
    let metadata_fields = [
        (owned_value_path!("kubernetes_logs", "file"), Kind::bytes(), None),
        (owned_value_path!("kubernetes_logs", "container_id"), optional_bytes(), None),
        (owned_value_path!("kubernetes_logs", "container_image"), optional_bytes(), None),
        (owned_value_path!("kubernetes_logs", "container_name"), optional_bytes(), None),
        (owned_value_path!("kubernetes_logs", "namespace_labels"), optional_string_map(), None),
        (owned_value_path!("kubernetes_logs", "node_labels"), optional_string_map(), None),
        (owned_value_path!("kubernetes_logs", "pod_annotations"), optional_string_map(), None),
        (owned_value_path!("kubernetes_logs", "pod_ip"), optional_bytes(), None),
        (
            owned_value_path!("kubernetes_logs", "pod_ips"),
            Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
            None,
        ),
        (owned_value_path!("kubernetes_logs", "pod_labels"), optional_string_map(), None),
        (owned_value_path!("kubernetes_logs", "pod_name"), optional_bytes(), None),
        (owned_value_path!("kubernetes_logs", "pod_namespace"), optional_bytes(), None),
        (owned_value_path!("kubernetes_logs", "pod_node_name"), optional_bytes(), None),
        (owned_value_path!("kubernetes_logs", "pod_owner"), optional_bytes(), None),
        (owned_value_path!("kubernetes_logs", "pod_uid"), optional_bytes(), None),
        (owned_value_path!("kubernetes_logs", "stream"), Kind::bytes(), None),
        (
            owned_value_path!("kubernetes_logs", "timestamp"),
            Kind::timestamp(),
            Some("timestamp"),
        ),
        (owned_value_path!("vector", "source_type"), Kind::bytes(), None),
        (owned_value_path!("vector", "ingest_timestamp"), Kind::timestamp(), None),
    ];

    let expected = metadata_fields
        .into_iter()
        .fold(
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]),
            |definition, (path, kind, meaning)| definition.with_metadata_field(&path, kind, meaning),
        )
        .with_meaning(OwnedTargetPath::event_root(), "message");

    assert_eq!(actual, Some(expected));
}
1485
/// With the Legacy log namespace, everything is an event field.
/// `file`, `message`, `stream`, `timestamp` and `source_type` sit at the top
/// level; Kubernetes details are nested under `kubernetes`.
#[test]
fn test_output_schema_definition_legacy_namespace() {
    let actual = toml::from_str::<Config>("")
        .unwrap()
        .outputs(LogNamespace::Legacy)
        .remove(0)
        .schema_definition(true);

    let optional_bytes = || Kind::bytes().or_undefined();
    let optional_string_map =
        || Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    // (event path, kind, semantic meaning), applied in this order.
    let event_fields = [
        (owned_value_path!("file"), Kind::bytes(), None),
        (owned_value_path!("message"), Kind::bytes(), Some("message")),
        (owned_value_path!("kubernetes", "container_id"), optional_bytes(), None),
        (owned_value_path!("kubernetes", "container_image"), optional_bytes(), None),
        (owned_value_path!("kubernetes", "container_name"), optional_bytes(), None),
        (owned_value_path!("kubernetes", "namespace_labels"), optional_string_map(), None),
        (owned_value_path!("kubernetes", "node_labels"), optional_string_map(), None),
        (owned_value_path!("kubernetes", "pod_annotations"), optional_string_map(), None),
        (owned_value_path!("kubernetes", "pod_ip"), optional_bytes(), None),
        (
            owned_value_path!("kubernetes", "pod_ips"),
            Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
            None,
        ),
        (owned_value_path!("kubernetes", "pod_labels"), optional_string_map(), None),
        (owned_value_path!("kubernetes", "pod_name"), optional_bytes(), None),
        (owned_value_path!("kubernetes", "pod_namespace"), optional_bytes(), None),
        (owned_value_path!("kubernetes", "pod_node_name"), optional_bytes(), None),
        (owned_value_path!("kubernetes", "pod_owner"), optional_bytes(), None),
        (owned_value_path!("kubernetes", "pod_uid"), optional_bytes(), None),
        (owned_value_path!("stream"), Kind::bytes(), None),
        (owned_value_path!("timestamp"), Kind::timestamp(), Some("timestamp")),
        (owned_value_path!("source_type"), Kind::bytes(), None),
    ];

    let expected = event_fields.into_iter().fold(
        Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        ),
        |definition, (path, kind, meaning)| definition.with_event_field(&path, kind, meaning),
    );

    assert_eq!(actual, Some(expected));
}
1591}