1#![deny(missing_docs)]
7use std::{cmp::min, path::PathBuf, time::Duration};
8
9use bytes::Bytes;
10use chrono::Utc;
11use futures::{future::FutureExt, stream::StreamExt};
12use futures_util::Stream;
13use http_1::{HeaderName, HeaderValue};
14use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
15use k8s_paths_provider::K8sPathsProvider;
16use kube::{
17 api::Api,
18 config::{self, KubeConfigOptions},
19 runtime::{reflector, watcher, WatchStreamExt},
20 Client, Config as ClientConfig,
21};
22use lifecycle::Lifecycle;
23use serde_with::serde_as;
24use vector_lib::codecs::{BytesDeserializer, BytesDeserializerConfig};
25use vector_lib::configurable::configurable_component;
26use vector_lib::file_source::{
27 calculate_ignore_before, Checkpointer, FileServer, FileServerShutdown, FingerprintStrategy,
28 Fingerprinter, Line, ReadFrom, ReadFromConfig,
29};
30use vector_lib::lookup::{lookup_v2::OptionalTargetPath, owned_value_path, path, OwnedTargetPath};
31use vector_lib::{config::LegacyKey, config::LogNamespace, EstimatedJsonEncodedSizeOf};
32use vector_lib::{
33 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
34 TimeZone,
35};
36use vrl::value::{kind::Collection, Kind};
37
38use crate::{
39 built_info::{PKG_NAME, PKG_VERSION},
40 sources::kubernetes_logs::partial_events_merger::merge_partial_events,
41};
42use crate::{
43 config::{
44 log_schema, ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig,
45 SourceContext, SourceOutput,
46 },
47 event::Event,
48 internal_events::{
49 FileInternalMetricsConfig, FileSourceInternalEventsEmitter, KubernetesLifecycleError,
50 KubernetesLogsEventAnnotationError, KubernetesLogsEventNamespaceAnnotationError,
51 KubernetesLogsEventNodeAnnotationError, KubernetesLogsEventsReceived,
52 KubernetesLogsPodInfo, StreamClosedError,
53 },
54 kubernetes::{custom_reflector, meta_cache::MetaCache},
55 shutdown::ShutdownSignal,
56 sources,
57 transforms::{FunctionTransform, OutputBuffer},
58 SourceSender,
59};
60
61mod k8s_paths_provider;
62mod lifecycle;
63mod namespace_metadata_annotator;
64mod node_metadata_annotator;
65mod parser;
66mod partial_events_merger;
67mod path_helpers;
68mod pod_metadata_annotator;
69mod transform_utils;
70mod util;
71
72use self::namespace_metadata_annotator::NamespaceMetadataAnnotator;
73use self::node_metadata_annotator::NodeMetadataAnnotator;
74use self::parser::Parser;
75use self::pod_metadata_annotator::PodMetadataAnnotator;
76
/// Name of the environment variable that is read to obtain the node name when the
/// `self_node_name` option is empty or left at its default template value.
const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME";
79
80#[serde_as]
82#[configurable_component(source("kubernetes_logs", "Collect Pod logs from Kubernetes Nodes."))]
83#[derive(Clone, Debug)]
84#[serde(deny_unknown_fields, default)]
85pub struct Config {
86 #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
93 #[configurable(metadata(
94 docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
95 ))]
96 extra_label_selector: String,
97
98 #[configurable(metadata(docs::examples = "my_custom_label!=my_value"))]
105 #[configurable(metadata(
106 docs::examples = "my_custom_label!=my_value,my_other_custom_label=my_value"
107 ))]
108 extra_namespace_label_selector: String,
109
110 self_node_name: String,
117
118 #[configurable(metadata(docs::examples = "metadata.name!=pod-name-to-exclude"))]
126 #[configurable(metadata(
127 docs::examples = "metadata.name!=pod-name-to-exclude,metadata.name=mypod"
128 ))]
129 extra_field_selector: String,
130
131 auto_partial_merge: bool,
136
137 #[configurable(metadata(docs::examples = "/var/local/lib/vector/"))]
146 #[configurable(metadata(docs::human_name = "Data Directory"))]
147 data_dir: Option<PathBuf>,
148
149 #[configurable(derived)]
150 #[serde(alias = "annotation_fields")]
151 pod_annotation_fields: pod_metadata_annotator::FieldsSpec,
152
153 #[configurable(derived)]
154 namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec,
155
156 #[configurable(derived)]
157 node_annotation_fields: node_metadata_annotator::FieldsSpec,
158
159 #[configurable(metadata(docs::examples = "**/include/**"))]
161 include_paths_glob_patterns: Vec<PathBuf>,
162
163 #[configurable(metadata(docs::examples = "**/exclude/**"))]
165 exclude_paths_glob_patterns: Vec<PathBuf>,
166
167 #[configurable(derived)]
168 #[serde(default = "default_read_from")]
169 read_from: ReadFromConfig,
170
171 #[serde(default)]
173 #[configurable(metadata(docs::type_unit = "seconds"))]
174 #[configurable(metadata(docs::examples = 600))]
175 #[configurable(metadata(docs::human_name = "Ignore Files Older Than"))]
176 ignore_older_secs: Option<u64>,
177
178 #[configurable(metadata(docs::type_unit = "bytes"))]
184 max_read_bytes: usize,
185
186 #[serde(default = "default_oldest_first")]
188 pub oldest_first: bool,
189
190 #[configurable(metadata(docs::type_unit = "bytes"))]
194 max_line_bytes: usize,
195
196 #[configurable(metadata(docs::type_unit = "bytes"))]
204 max_merged_line_bytes: Option<usize>,
205
206 #[configurable(metadata(docs::type_unit = "lines"))]
212 fingerprint_lines: usize,
213
214 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
221 #[configurable(metadata(docs::human_name = "Glob Minimum Cooldown"))]
222 glob_minimum_cooldown_ms: Duration,
223
224 #[configurable(metadata(docs::examples = ".ingest_timestamp", docs::examples = "ingest_ts"))]
230 ingestion_timestamp_field: Option<OptionalTargetPath>,
231
232 timezone: Option<TimeZone>,
234
235 #[configurable(metadata(docs::examples = "/path/to/.kube/config"))]
241 kube_config_file: Option<PathBuf>,
242
243 use_apiserver_cache: bool,
245
246 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
253 #[configurable(metadata(docs::human_name = "Delay Deletion"))]
254 delay_deletion_ms: Duration,
255
256 #[configurable(metadata(docs::hidden))]
258 #[serde(default)]
259 log_namespace: Option<bool>,
260
261 #[configurable(derived)]
262 #[serde(default)]
263 internal_metrics: FileInternalMetricsConfig,
264
265 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
268 #[configurable(metadata(docs::type_unit = "seconds"))]
269 #[serde(default = "default_rotate_wait", rename = "rotate_wait_secs")]
270 rotate_wait: Duration,
271}
272
273const fn default_read_from() -> ReadFromConfig {
274 ReadFromConfig::Beginning
275}
276
277impl GenerateConfig for Config {
278 fn generate_config() -> toml::Value {
279 toml::Value::try_from(Self {
280 self_node_name: default_self_node_name_env_template(),
281 auto_partial_merge: true,
282 ..Default::default()
283 })
284 .unwrap()
285 }
286}
287
288impl Default for Config {
289 fn default() -> Self {
290 Self {
291 extra_label_selector: "".to_string(),
292 extra_namespace_label_selector: "".to_string(),
293 self_node_name: default_self_node_name_env_template(),
294 extra_field_selector: "".to_string(),
295 auto_partial_merge: true,
296 data_dir: None,
297 pod_annotation_fields: pod_metadata_annotator::FieldsSpec::default(),
298 namespace_annotation_fields: namespace_metadata_annotator::FieldsSpec::default(),
299 node_annotation_fields: node_metadata_annotator::FieldsSpec::default(),
300 include_paths_glob_patterns: default_path_inclusion(),
301 exclude_paths_glob_patterns: default_path_exclusion(),
302 read_from: default_read_from(),
303 ignore_older_secs: None,
304 max_read_bytes: default_max_read_bytes(),
305 oldest_first: default_oldest_first(),
306 max_line_bytes: default_max_line_bytes(),
307 max_merged_line_bytes: None,
308 fingerprint_lines: default_fingerprint_lines(),
309 glob_minimum_cooldown_ms: default_glob_minimum_cooldown_ms(),
310 ingestion_timestamp_field: None,
311 timezone: None,
312 kube_config_file: None,
313 use_apiserver_cache: false,
314 delay_deletion_ms: default_delay_deletion_ms(),
315 log_namespace: None,
316 internal_metrics: Default::default(),
317 rotate_wait: default_rotate_wait(),
318 }
319 }
320}
321
322#[async_trait::async_trait]
323#[typetag::serde(name = "kubernetes_logs")]
324impl SourceConfig for Config {
325 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
326 let log_namespace = cx.log_namespace(self.log_namespace);
327 let source = Source::new(self, &cx.globals, &cx.key).await?;
328
329 Ok(Box::pin(
330 source
331 .run(cx.out, cx.shutdown, log_namespace)
332 .map(|result| {
333 result.map_err(|error| {
334 error!(message = "Source future failed.", %error);
335 })
336 }),
337 ))
338 }
339
340 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
341 let log_namespace = global_log_namespace.merge(self.log_namespace);
342 let schema_definition = BytesDeserializerConfig
343 .schema_definition(log_namespace)
344 .with_source_metadata(
345 Self::NAME,
346 Some(LegacyKey::Overwrite(owned_value_path!("file"))),
347 &owned_value_path!("file"),
348 Kind::bytes(),
349 None,
350 )
351 .with_source_metadata(
352 Self::NAME,
353 self.pod_annotation_fields
354 .container_id
355 .path
356 .clone()
357 .map(|k| k.path)
358 .map(LegacyKey::Overwrite),
359 &owned_value_path!("container_id"),
360 Kind::bytes().or_undefined(),
361 None,
362 )
363 .with_source_metadata(
364 Self::NAME,
365 self.pod_annotation_fields
366 .container_image
367 .path
368 .clone()
369 .map(|k| k.path)
370 .map(LegacyKey::Overwrite),
371 &owned_value_path!("container_image"),
372 Kind::bytes().or_undefined(),
373 None,
374 )
375 .with_source_metadata(
376 Self::NAME,
377 self.pod_annotation_fields
378 .container_name
379 .path
380 .clone()
381 .map(|k| k.path)
382 .map(LegacyKey::Overwrite),
383 &owned_value_path!("container_name"),
384 Kind::bytes().or_undefined(),
385 None,
386 )
387 .with_source_metadata(
388 Self::NAME,
389 self.namespace_annotation_fields
390 .namespace_labels
391 .path
392 .clone()
393 .map(|x| LegacyKey::Overwrite(x.path)),
394 &owned_value_path!("namespace_labels"),
395 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
396 None,
397 )
398 .with_source_metadata(
399 Self::NAME,
400 self.node_annotation_fields
401 .node_labels
402 .path
403 .clone()
404 .map(|x| LegacyKey::Overwrite(x.path)),
405 &owned_value_path!("node_labels"),
406 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
407 None,
408 )
409 .with_source_metadata(
410 Self::NAME,
411 self.pod_annotation_fields
412 .pod_annotations
413 .path
414 .clone()
415 .map(|k| k.path)
416 .map(LegacyKey::Overwrite),
417 &owned_value_path!("pod_annotations"),
418 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
419 None,
420 )
421 .with_source_metadata(
422 Self::NAME,
423 self.pod_annotation_fields
424 .pod_ip
425 .path
426 .clone()
427 .map(|k| k.path)
428 .map(LegacyKey::Overwrite),
429 &owned_value_path!("pod_ip"),
430 Kind::bytes().or_undefined(),
431 None,
432 )
433 .with_source_metadata(
434 Self::NAME,
435 self.pod_annotation_fields
436 .pod_ips
437 .path
438 .clone()
439 .map(|k| k.path)
440 .map(LegacyKey::Overwrite),
441 &owned_value_path!("pod_ips"),
442 Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
443 None,
444 )
445 .with_source_metadata(
446 Self::NAME,
447 self.pod_annotation_fields
448 .pod_labels
449 .path
450 .clone()
451 .map(|k| k.path)
452 .map(LegacyKey::Overwrite),
453 &owned_value_path!("pod_labels"),
454 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
455 None,
456 )
457 .with_source_metadata(
458 Self::NAME,
459 self.pod_annotation_fields
460 .pod_name
461 .path
462 .clone()
463 .map(|k| k.path)
464 .map(LegacyKey::Overwrite),
465 &owned_value_path!("pod_name"),
466 Kind::bytes().or_undefined(),
467 None,
468 )
469 .with_source_metadata(
470 Self::NAME,
471 self.pod_annotation_fields
472 .pod_namespace
473 .path
474 .clone()
475 .map(|k| k.path)
476 .map(LegacyKey::Overwrite),
477 &owned_value_path!("pod_namespace"),
478 Kind::bytes().or_undefined(),
479 None,
480 )
481 .with_source_metadata(
482 Self::NAME,
483 self.pod_annotation_fields
484 .pod_node_name
485 .path
486 .clone()
487 .map(|k| k.path)
488 .map(LegacyKey::Overwrite),
489 &owned_value_path!("pod_node_name"),
490 Kind::bytes().or_undefined(),
491 None,
492 )
493 .with_source_metadata(
494 Self::NAME,
495 self.pod_annotation_fields
496 .pod_owner
497 .path
498 .clone()
499 .map(|k| k.path)
500 .map(LegacyKey::Overwrite),
501 &owned_value_path!("pod_owner"),
502 Kind::bytes().or_undefined(),
503 None,
504 )
505 .with_source_metadata(
506 Self::NAME,
507 self.pod_annotation_fields
508 .pod_uid
509 .path
510 .clone()
511 .map(|k| k.path)
512 .map(LegacyKey::Overwrite),
513 &owned_value_path!("pod_uid"),
514 Kind::bytes().or_undefined(),
515 None,
516 )
517 .with_source_metadata(
518 Self::NAME,
519 Some(LegacyKey::Overwrite(owned_value_path!("stream"))),
520 &owned_value_path!("stream"),
521 Kind::bytes(),
522 None,
523 )
524 .with_source_metadata(
525 Self::NAME,
526 log_schema()
527 .timestamp_key()
528 .cloned()
529 .map(LegacyKey::Overwrite),
530 &owned_value_path!("timestamp"),
531 Kind::timestamp(),
532 Some("timestamp"),
533 )
534 .with_standard_vector_source_metadata();
535
536 vec![SourceOutput::new_maybe_logs(
537 DataType::Log,
538 schema_definition,
539 )]
540 }
541
542 fn can_acknowledge(&self) -> bool {
543 false
544 }
545}
546
/// Runtime state of the `kubernetes_logs` source, assembled from [`Config`] by `Source::new`
/// and consumed by `Source::run`.
#[derive(Clone)]
struct Source {
    // Kubernetes API client used by the Pod, Namespace and Node watchers.
    client: Client,
    // Resolved directory where file checkpoints are stored.
    data_dir: PathBuf,
    auto_partial_merge: bool,
    // Metadata annotation settings, one per resource kind.
    pod_fields_spec: pod_metadata_annotator::FieldsSpec,
    namespace_fields_spec: namespace_metadata_annotator::FieldsSpec,
    node_field_spec: node_metadata_annotator::FieldsSpec,
    // Fully prepared selectors (built-in part plus any user-provided extras).
    field_selector: String,
    label_selector: String,
    namespace_label_selector: String,
    node_selector: String,
    // Resolved node name (from the config or the environment variable).
    self_node_name: String,
    // Compiled include/exclude glob patterns for log file paths.
    include_paths: Vec<glob::Pattern>,
    exclude_paths: Vec<glob::Pattern>,
    // File reading options forwarded to the file server.
    read_from: ReadFrom,
    ignore_older_secs: Option<u64>,
    max_read_bytes: usize,
    oldest_first: bool,
    max_line_bytes: usize,
    max_merged_line_bytes: Option<usize>,
    fingerprint_lines: usize,
    glob_minimum_cooldown: Duration,
    use_apiserver_cache: bool,
    // Target path for the ingestion timestamp; `None` when not configured.
    ingestion_timestamp_field: Option<OwnedTargetPath>,
    delay_deletion: Duration,
    // Taken from `internal_metrics.include_file_tag`.
    include_file_metric_tag: bool,
    rotate_wait: Duration,
}
576
577impl Source {
578 async fn new(
579 config: &Config,
580 globals: &GlobalOptions,
581 key: &ComponentKey,
582 ) -> crate::Result<Self> {
583 let self_node_name = if config.self_node_name.is_empty()
584 || config.self_node_name == default_self_node_name_env_template()
585 {
586 std::env::var(SELF_NODE_NAME_ENV_KEY).map_err(|_| {
587 format!(
588 "self_node_name config value or {SELF_NODE_NAME_ENV_KEY} env var is not set"
589 )
590 })?
591 } else {
592 config.self_node_name.clone()
593 };
594
595 let field_selector = prepare_field_selector(config, self_node_name.as_str())?;
596 let label_selector = prepare_label_selector(config.extra_label_selector.as_ref());
597 let namespace_label_selector =
598 prepare_label_selector(config.extra_namespace_label_selector.as_ref());
599 let node_selector = prepare_node_selector(self_node_name.as_str())?;
600
601 let mut client_config = match &config.kube_config_file {
605 Some(kc) => {
606 ClientConfig::from_custom_kubeconfig(
607 config::Kubeconfig::read_from(kc)?,
608 &KubeConfigOptions::default(),
609 )
610 .await?
611 }
612 None => ClientConfig::infer().await?,
613 };
614 if let Ok(user_agent) = HeaderValue::from_str(&format!("{PKG_NAME}/{PKG_VERSION}")) {
615 client_config
616 .headers
617 .push((HeaderName::from_static("user-agent"), user_agent));
618 }
619 let client = Client::try_from(client_config)?;
620
621 let data_dir = globals.resolve_and_make_data_subdir(config.data_dir.as_ref(), key.id())?;
622
623 let include_paths = prepare_include_paths(config)?;
624
625 let exclude_paths = prepare_exclude_paths(config)?;
626
627 let glob_minimum_cooldown = config.glob_minimum_cooldown_ms;
628
629 let delay_deletion = config.delay_deletion_ms;
630
631 let ingestion_timestamp_field = config
632 .ingestion_timestamp_field
633 .clone()
634 .and_then(|k| k.path);
635
636 Ok(Self {
637 client,
638 data_dir,
639 auto_partial_merge: config.auto_partial_merge,
640 pod_fields_spec: config.pod_annotation_fields.clone(),
641 namespace_fields_spec: config.namespace_annotation_fields.clone(),
642 node_field_spec: config.node_annotation_fields.clone(),
643 field_selector,
644 label_selector,
645 namespace_label_selector,
646 node_selector,
647 self_node_name,
648 include_paths,
649 exclude_paths,
650 read_from: ReadFrom::from(config.read_from),
651 ignore_older_secs: config.ignore_older_secs,
652 max_read_bytes: config.max_read_bytes,
653 oldest_first: config.oldest_first,
654 max_line_bytes: config.max_line_bytes,
655 max_merged_line_bytes: config.max_merged_line_bytes,
656 fingerprint_lines: config.fingerprint_lines,
657 glob_minimum_cooldown,
658 use_apiserver_cache: config.use_apiserver_cache,
659 ingestion_timestamp_field,
660 delay_deletion,
661 include_file_metric_tag: config.internal_metrics.include_file_tag,
662 rotate_wait: config.rotate_wait,
663 })
664 }
665
666 async fn run(
667 self,
668 mut out: SourceSender,
669 global_shutdown: ShutdownSignal,
670 log_namespace: LogNamespace,
671 ) -> crate::Result<()> {
672 let Self {
673 client,
674 data_dir,
675 auto_partial_merge,
676 pod_fields_spec,
677 namespace_fields_spec,
678 node_field_spec,
679 field_selector,
680 label_selector,
681 namespace_label_selector,
682 node_selector,
683 self_node_name,
684 include_paths,
685 exclude_paths,
686 read_from,
687 ignore_older_secs,
688 max_read_bytes,
689 oldest_first,
690 max_line_bytes,
691 max_merged_line_bytes,
692 fingerprint_lines,
693 glob_minimum_cooldown,
694 use_apiserver_cache,
695 ingestion_timestamp_field,
696 delay_deletion,
697 include_file_metric_tag,
698 rotate_wait,
699 } = self;
700
701 let mut reflectors = Vec::new();
702
703 let pods = Api::<Pod>::all(client.clone());
704
705 let list_semantic = if use_apiserver_cache {
706 watcher::ListSemantic::Any
707 } else {
708 watcher::ListSemantic::MostRecent
709 };
710
711 let pod_watcher = watcher(
712 pods,
713 watcher::Config {
714 field_selector: Some(field_selector),
715 label_selector: Some(label_selector),
716 list_semantic: list_semantic.clone(),
717 page_size: get_page_size(use_apiserver_cache),
718 ..Default::default()
719 },
720 )
721 .backoff(watcher::DefaultBackoff::default());
722
723 let pod_store_w = reflector::store::Writer::default();
724 let pod_state = pod_store_w.as_reader();
725 let pod_cacher = MetaCache::new();
726
727 reflectors.push(tokio::spawn(custom_reflector(
728 pod_store_w,
729 pod_cacher,
730 pod_watcher,
731 delay_deletion,
732 )));
733
734 let namespaces = Api::<Namespace>::all(client.clone());
737 let ns_watcher = watcher(
738 namespaces,
739 watcher::Config {
740 label_selector: Some(namespace_label_selector),
741 list_semantic: list_semantic.clone(),
742 page_size: get_page_size(use_apiserver_cache),
743 ..Default::default()
744 },
745 )
746 .backoff(watcher::DefaultBackoff::default());
747 let ns_store_w = reflector::store::Writer::default();
748 let ns_state = ns_store_w.as_reader();
749 let ns_cacher = MetaCache::new();
750
751 reflectors.push(tokio::spawn(custom_reflector(
752 ns_store_w,
753 ns_cacher,
754 ns_watcher,
755 delay_deletion,
756 )));
757
758 let nodes = Api::<Node>::all(client);
761 let node_watcher = watcher(
762 nodes,
763 watcher::Config {
764 field_selector: Some(node_selector),
765 list_semantic,
766 page_size: get_page_size(use_apiserver_cache),
767 ..Default::default()
768 },
769 )
770 .backoff(watcher::DefaultBackoff::default());
771 let node_store_w = reflector::store::Writer::default();
772 let node_state = node_store_w.as_reader();
773 let node_cacher = MetaCache::new();
774
775 reflectors.push(tokio::spawn(custom_reflector(
776 node_store_w,
777 node_cacher,
778 node_watcher,
779 delay_deletion,
780 )));
781
782 let paths_provider = K8sPathsProvider::new(
783 pod_state.clone(),
784 ns_state.clone(),
785 include_paths,
786 exclude_paths,
787 );
788 let annotator = PodMetadataAnnotator::new(pod_state, pod_fields_spec, log_namespace);
789 let ns_annotator =
790 NamespaceMetadataAnnotator::new(ns_state, namespace_fields_spec, log_namespace);
791 let node_annotator = NodeMetadataAnnotator::new(node_state, node_field_spec, log_namespace);
792
793 let ignore_before = calculate_ignore_before(ignore_older_secs);
794
795 let mut resolved_max_line_bytes = max_line_bytes;
796 if auto_partial_merge {
797 resolved_max_line_bytes = min(
798 max_line_bytes,
799 max_merged_line_bytes.unwrap_or(max_line_bytes),
800 );
801 }
802
803 let checkpointer = Checkpointer::new(&data_dir);
806 let file_server = FileServer {
807 paths_provider,
809 max_read_bytes,
814 ignore_checkpoints: false,
817 read_from,
819 ignore_before,
825 max_line_bytes: resolved_max_line_bytes,
828 line_delimiter: Bytes::from("\n"),
830 data_dir,
832 glob_minimum_cooldown,
835 fingerprinter: Fingerprinter {
839 strategy: FingerprintStrategy::FirstLinesChecksum {
840 ignored_header_bytes: 0,
843 lines: fingerprint_lines,
844 },
845 max_line_length: resolved_max_line_bytes,
846 ignore_not_found: true,
847 },
848 oldest_first,
849 remove_after: None,
851 emitter: FileSourceInternalEventsEmitter {
853 include_file_metric_tag,
854 },
855 handle: tokio::runtime::Handle::current(),
857 rotate_wait,
858 };
859
860 let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
861
862 let checkpoints = checkpointer.view();
863 let events = file_source_rx.flat_map(futures::stream::iter);
864 let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
865 let events = events.map(move |line| {
866 let byte_size = line.text.len();
867 bytes_received.emit(ByteSize(byte_size));
868
869 let mut event = create_event(
870 line.text,
871 &line.filename,
872 ingestion_timestamp_field.as_ref(),
873 log_namespace,
874 );
875
876 let file_info = annotator.annotate(&mut event, &line.filename);
877
878 emit!(KubernetesLogsEventsReceived {
879 file: &line.filename,
880 byte_size: event.estimated_json_encoded_size_of(),
881 pod_info: file_info.as_ref().map(|info| KubernetesLogsPodInfo {
882 name: info.pod_name.to_owned(),
883 namespace: info.pod_namespace.to_owned(),
884 }),
885 });
886
887 if file_info.is_none() {
888 emit!(KubernetesLogsEventAnnotationError { event: &event });
889 } else {
890 let namespace = file_info.as_ref().map(|info| info.pod_namespace);
891
892 if let Some(name) = namespace {
893 let ns_info = ns_annotator.annotate(&mut event, name);
894
895 if ns_info.is_none() {
896 emit!(KubernetesLogsEventNamespaceAnnotationError { event: &event });
897 }
898 }
899
900 let node_info = node_annotator.annotate(&mut event, self_node_name.as_str());
901
902 if node_info.is_none() {
903 emit!(KubernetesLogsEventNodeAnnotationError { event: &event });
904 }
905 }
906
907 checkpoints.update(line.file_id, line.end_offset);
908 event
909 });
910
911 let mut parser = Parser::new(log_namespace);
912 let events = events.flat_map(move |event| {
913 let mut buf = OutputBuffer::with_capacity(1);
914 parser.transform(&mut buf, event);
915 futures::stream::iter(buf.into_events())
916 });
917
918 let (events_count, _) = events.size_hint();
919
920 let mut stream = if auto_partial_merge {
921 merge_partial_events(events, log_namespace, max_merged_line_bytes).left_stream()
922 } else {
923 events.right_stream()
924 };
925
926 let event_processing_loop = out.send_event_stream(&mut stream);
927
928 let mut lifecycle = Lifecycle::new();
929 {
930 let (slot, shutdown) = lifecycle.add();
931 let fut = util::run_file_server(file_server, file_source_tx, shutdown, checkpointer)
932 .map(|result| match result {
933 Ok(FileServerShutdown) => info!(message = "File server completed gracefully."),
934 Err(error) => emit!(KubernetesLifecycleError {
935 message: "File server exited with an error.",
936 error,
937 count: events_count,
938 }),
939 });
940 slot.bind(Box::pin(fut));
941 }
942 {
943 let (slot, shutdown) = lifecycle.add();
944 let fut = util::complete_with_deadline_on_signal(
945 event_processing_loop,
946 shutdown,
947 Duration::from_secs(30), )
949 .map(|result| {
950 match result {
951 Ok(Ok(())) => info!(message = "Event processing loop completed gracefully."),
952 Ok(Err(_)) => emit!(StreamClosedError {
953 count: events_count
954 }),
955 Err(error) => emit!(KubernetesLifecycleError {
956 error,
957 message: "Event processing loop timed out during the shutdown.",
958 count: events_count,
959 }),
960 };
961 });
962 slot.bind(Box::pin(fut));
963 }
964
965 lifecycle.run(global_shutdown).await;
966 for reflector in reflectors {
968 reflector.abort();
969 }
970 info!(message = "Done.");
971 Ok(())
972 }
973}
974
975fn get_page_size(use_apiserver_cache: bool) -> Option<u32> {
977 if use_apiserver_cache {
978 None
979 } else {
980 watcher::Config::default().page_size
981 }
982}
983
984fn create_event(
985 line: Bytes,
986 file: &str,
987 ingestion_timestamp_field: Option<&OwnedTargetPath>,
988 log_namespace: LogNamespace,
989) -> Event {
990 let deserializer = BytesDeserializer;
991 let mut log = deserializer.parse_single(line, log_namespace);
992
993 log_namespace.insert_source_metadata(
994 Config::NAME,
995 &mut log,
996 Some(LegacyKey::Overwrite(path!("file"))),
997 path!("file"),
998 file,
999 );
1000
1001 log_namespace.insert_vector_metadata(
1002 &mut log,
1003 log_schema().source_type_key(),
1004 path!("source_type"),
1005 Bytes::from(Config::NAME),
1006 );
1007 match (log_namespace, ingestion_timestamp_field) {
1008 (LogNamespace::Vector, _) => {
1010 log.metadata_mut()
1011 .value_mut()
1012 .insert(path!("vector", "ingest_timestamp"), Utc::now());
1013 }
1014 (LogNamespace::Legacy, Some(ingestion_timestamp_field)) => {
1016 log.try_insert(ingestion_timestamp_field, Utc::now())
1017 }
1018 (LogNamespace::Legacy, None) => (),
1020 };
1021
1022 log.into()
1023}
1024
1025fn default_self_node_name_env_template() -> String {
1028 format!("${{{}}}", SELF_NODE_NAME_ENV_KEY.to_owned())
1029}
1030
/// Default include patterns: a single `**/*` pattern.
fn default_path_inclusion() -> Vec<PathBuf> {
    ["**/*"].iter().map(PathBuf::from).collect()
}
1034
/// Default exclude patterns: `**/*.gz` and `**/*.tmp`, in that order.
fn default_path_exclusion() -> Vec<PathBuf> {
    ["**/*.gz", "**/*.tmp"].iter().map(PathBuf::from).collect()
}
1038
/// Default for the `max_read_bytes` option: 2 KiB.
const fn default_max_read_bytes() -> usize {
    2 * 1024
}
1042
/// Default for the `oldest_first` option: enabled.
const fn default_oldest_first() -> bool {
    const OLDEST_FIRST: bool = true;
    OLDEST_FIRST
}
1048
/// Default for the `max_line_bytes` option: 32 KiB.
const fn default_max_line_bytes() -> usize {
    const KIB: usize = 1024;
    32 * KIB
}
1061
/// Default for the `glob_minimum_cooldown_ms` option: 60 seconds.
const fn default_glob_minimum_cooldown_ms() -> Duration {
    Duration::from_secs(60)
}
1065
/// Default for the `fingerprint_lines` option: a single line.
const fn default_fingerprint_lines() -> usize {
    const LINES: usize = 1;
    LINES
}
1069
/// Default for the `delay_deletion_ms` option: 60 seconds.
const fn default_delay_deletion_ms() -> Duration {
    Duration::from_secs(60)
}
1073
/// Default for the `rotate_wait_secs` option: half of `u64::MAX` seconds, i.e. an
/// effectively unbounded wait.
const fn default_rotate_wait() -> Duration {
    const HALF_MAX_SECS: u64 = u64::MAX / 2;
    Duration::from_secs(HALF_MAX_SECS)
}
1077
1078fn prepare_include_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1081 prepare_glob_patterns(&config.include_paths_glob_patterns, "Including")
1082}
1083
1084fn prepare_exclude_paths(config: &Config) -> crate::Result<Vec<glob::Pattern>> {
1087 prepare_glob_patterns(&config.exclude_paths_glob_patterns, "Excluding")
1088}
1089
1090fn prepare_glob_patterns(paths: &[PathBuf], op: &str) -> crate::Result<Vec<glob::Pattern>> {
1093 let ret = paths
1094 .iter()
1095 .map(|pattern| {
1096 let pattern = pattern
1097 .to_str()
1098 .ok_or("glob pattern is not a valid UTF-8 string")?;
1099 Ok(glob::Pattern::new(pattern)?)
1100 })
1101 .collect::<crate::Result<Vec<_>>>()?;
1102
1103 info!(
1104 message = format!("{op} matching files."),
1105 ret = ?ret
1106 .iter()
1107 .map(glob::Pattern::as_str)
1108 .collect::<Vec<_>>()
1109 );
1110
1111 Ok(ret)
1112}
1113
1114fn prepare_field_selector(config: &Config, self_node_name: &str) -> crate::Result<String> {
1117 info!(
1118 message = "Obtained Kubernetes Node name to collect logs for (self).",
1119 ?self_node_name
1120 );
1121
1122 let field_selector = format!("spec.nodeName={self_node_name}");
1123
1124 if config.extra_field_selector.is_empty() {
1125 return Ok(field_selector);
1126 }
1127
1128 Ok(format!(
1129 "{},{}",
1130 field_selector, config.extra_field_selector
1131 ))
1132}
1133
1134fn prepare_node_selector(self_node_name: &str) -> crate::Result<String> {
1136 Ok(format!("metadata.name={self_node_name}"))
1137}
1138
/// Builds a label selector from the built-in exclusion rule and an optional extra selector.
///
/// An empty `selector` yields just `vector.dev/exclude!=true`; anything else is appended to
/// it after a comma.
fn prepare_label_selector(selector: &str) -> String {
    const BUILT_IN: &str = "vector.dev/exclude!=true";

    match selector {
        "" => BUILT_IN.to_owned(),
        extra => [BUILT_IN, extra].join(","),
    }
}
1150
1151#[cfg(test)]
1152mod tests {
1153 use similar_asserts::assert_eq;
1154 use vector_lib::lookup::{owned_value_path, OwnedTargetPath};
1155 use vector_lib::{config::LogNamespace, schema::Definition};
1156 use vrl::value::{kind::Collection, Kind};
1157
1158 use crate::config::SourceConfig;
1159
1160 use super::Config;
1161
1162 #[test]
1163 fn generate_config() {
1164 crate::test_util::test_generate_config::<Config>();
1165 }
1166
1167 #[test]
1168 fn prepare_exclude_paths() {
1169 let cases = vec![
1170 (
1171 Config::default(),
1172 vec![
1173 glob::Pattern::new("**/*.gz").unwrap(),
1174 glob::Pattern::new("**/*.tmp").unwrap(),
1175 ],
1176 ),
1177 (
1178 Config {
1179 exclude_paths_glob_patterns: vec![std::path::PathBuf::from("**/*.tmp")],
1180 ..Default::default()
1181 },
1182 vec![glob::Pattern::new("**/*.tmp").unwrap()],
1183 ),
1184 (
1185 Config {
1186 exclude_paths_glob_patterns: vec![
1187 std::path::PathBuf::from("**/kube-system_*/**"),
1188 std::path::PathBuf::from("**/*.gz"),
1189 std::path::PathBuf::from("**/*.tmp"),
1190 ],
1191 ..Default::default()
1192 },
1193 vec![
1194 glob::Pattern::new("**/kube-system_*/**").unwrap(),
1195 glob::Pattern::new("**/*.gz").unwrap(),
1196 glob::Pattern::new("**/*.tmp").unwrap(),
1197 ],
1198 ),
1199 ];
1200
1201 for (input, mut expected) in cases {
1202 let mut output = super::prepare_exclude_paths(&input).unwrap();
1203 expected.sort();
1204 output.sort();
1205 assert_eq!(expected, output, "expected left, actual right");
1206 }
1207 }
1208
1209 #[test]
1210 fn prepare_field_selector() {
1211 let cases = vec![
1212 (
1215 Config {
1216 self_node_name: "qwe".to_owned(),
1217 ..Default::default()
1218 },
1219 "spec.nodeName=qwe",
1220 ),
1221 (
1222 Config {
1223 self_node_name: "qwe".to_owned(),
1224 extra_field_selector: "".to_owned(),
1225 ..Default::default()
1226 },
1227 "spec.nodeName=qwe",
1228 ),
1229 (
1230 Config {
1231 self_node_name: "qwe".to_owned(),
1232 extra_field_selector: "foo=bar".to_owned(),
1233 ..Default::default()
1234 },
1235 "spec.nodeName=qwe,foo=bar",
1236 ),
1237 ];
1238
1239 for (input, expected) in cases {
1240 let output = super::prepare_field_selector(&input, "qwe").unwrap();
1241 assert_eq!(expected, output, "expected left, actual right");
1242 }
1243 }
1244
1245 #[test]
1246 fn prepare_label_selector() {
1247 let cases = vec![
1248 (
1249 Config::default().extra_label_selector,
1250 "vector.dev/exclude!=true",
1251 ),
1252 (
1253 Config::default().extra_namespace_label_selector,
1254 "vector.dev/exclude!=true",
1255 ),
1256 (
1257 Config {
1258 extra_label_selector: "".to_owned(),
1259 ..Default::default()
1260 }
1261 .extra_label_selector,
1262 "vector.dev/exclude!=true",
1263 ),
1264 (
1265 Config {
1266 extra_namespace_label_selector: "".to_owned(),
1267 ..Default::default()
1268 }
1269 .extra_namespace_label_selector,
1270 "vector.dev/exclude!=true",
1271 ),
1272 (
1273 Config {
1274 extra_label_selector: "qwe".to_owned(),
1275 ..Default::default()
1276 }
1277 .extra_label_selector,
1278 "vector.dev/exclude!=true,qwe",
1279 ),
1280 (
1281 Config {
1282 extra_namespace_label_selector: "qwe".to_owned(),
1283 ..Default::default()
1284 }
1285 .extra_namespace_label_selector,
1286 "vector.dev/exclude!=true,qwe",
1287 ),
1288 ];
1289
1290 for (input, expected) in cases {
1291 let output = super::prepare_label_selector(&input);
1292 assert_eq!(expected, output, "expected left, actual right");
1293 }
1294 }
1295
/// Compares the schema definition of the first output of a config parsed from
/// empty TOML, under `LogNamespace::Vector`, against the expected set of
/// metadata fields.
#[test]
fn test_output_schema_definition_vector_namespace() {
    let definitions = toml::from_str::<Config>("")
        .unwrap()
        .outputs(LogNamespace::Vector)
        .remove(0)
        .schema_definition(true);

    // Shorthand constructors for the kinds that repeat across many fields.
    let optional_bytes = || Kind::bytes().or_undefined();
    let optional_bytes_map =
        || Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();
    let optional_bytes_list =
        || Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    // (path, kind, meaning) triples, registered in this exact order.
    let metadata_fields = [
        (
            owned_value_path!("kubernetes_logs", "file"),
            Kind::bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "container_id"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "container_image"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "container_name"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "namespace_labels"),
            optional_bytes_map(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "node_labels"),
            optional_bytes_map(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_annotations"),
            optional_bytes_map(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_ip"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_ips"),
            optional_bytes_list(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_labels"),
            optional_bytes_map(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_name"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_namespace"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_node_name"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_owner"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "pod_uid"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "stream"),
            Kind::bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes_logs", "timestamp"),
            Kind::timestamp(),
            Some("timestamp"),
        ),
        (
            owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        ),
        (
            owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        ),
    ];

    let expected = metadata_fields
        .into_iter()
        .fold(
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]),
            |definition, (field_path, kind, meaning)| {
                definition.with_metadata_field(&field_path, kind, meaning)
            },
        )
        .with_meaning(OwnedTargetPath::event_root(), "message");

    assert_eq!(definitions, Some(expected));
}
1411
/// Compares the schema definition of the first output of a config parsed from
/// empty TOML, under `LogNamespace::Legacy`, against the expected set of event
/// fields.
#[test]
fn test_output_schema_definition_legacy_namespace() {
    let definitions = toml::from_str::<Config>("")
        .unwrap()
        .outputs(LogNamespace::Legacy)
        .remove(0)
        .schema_definition(true);

    // Shorthand constructors for the kinds that repeat across many fields.
    let optional_bytes = || Kind::bytes().or_undefined();
    let optional_bytes_map =
        || Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();
    let optional_bytes_list =
        || Kind::array(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    // (path, kind, meaning) triples, registered in this exact order.
    let event_fields = [
        (owned_value_path!("file"), Kind::bytes(), None),
        (
            owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        ),
        (
            owned_value_path!("kubernetes", "container_id"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "container_image"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "container_name"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "namespace_labels"),
            optional_bytes_map(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "node_labels"),
            optional_bytes_map(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_annotations"),
            optional_bytes_map(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_ip"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_ips"),
            optional_bytes_list(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_labels"),
            optional_bytes_map(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_name"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_namespace"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_node_name"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_owner"),
            optional_bytes(),
            None,
        ),
        (
            owned_value_path!("kubernetes", "pod_uid"),
            optional_bytes(),
            None,
        ),
        (owned_value_path!("stream"), Kind::bytes(), None),
        (
            owned_value_path!("timestamp"),
            Kind::timestamp(),
            Some("timestamp"),
        ),
        (owned_value_path!("source_type"), Kind::bytes(), None),
    ];

    let expected = event_fields.into_iter().fold(
        Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        ),
        |definition, (field_path, kind, meaning)| {
            definition.with_event_field(&field_path, kind, meaning)
        },
    );

    assert_eq!(definitions, Some(expected));
}
1517}