1use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc};
2
3use bitmask_enum::bitmask;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6
7mod global_options;
8mod log_schema;
9pub(crate) mod metrics_expiration;
10pub mod output_id;
11pub mod proxy;
12mod telemetry;
13
14pub use global_options::{GlobalOptions, WildcardMatching};
15pub use log_schema::{LogSchema, init_log_schema, log_schema};
16use lookup::{PathPrefix, lookup_v2::ValuePath, path};
17pub use output_id::OutputId;
18use serde::{Deserialize, Serialize};
19pub use telemetry::{Tags, Telemetry, init_telemetry, telemetry};
20pub use vector_common::config::ComponentKey;
21use vector_config::configurable_component;
22use vrl::value::Value;
23
24use crate::{event::LogEvent, schema};
25
/// Default maximum number of events held by a memory buffer.
///
/// Computed at compile time from `vector_buffers::config::memory_buffer_default_max_events()`,
/// so this constant always equals the buffer crate's default.
pub const MEMORY_BUFFER_DEFAULT_MAX_EVENTS: NonZeroUsize =
    vector_buffers::config::memory_buffer_default_max_events();
28
// Set of event data types, generated by `bitmask_enum` as a `u8`-backed bit set so that
// several types can be combined in one value (e.g. `DataType::all_bits()` selects every
// variant). `flags_iter` enables the `DataType::flags()` iterator of `(name, flag)` pairs
// that `impl fmt::Display for DataType` walks.
#[bitmask(u8)]
#[bitmask_config(flags_iter)]
pub enum DataType {
    // Log events.
    Log,
    // Metric events.
    Metric,
    // Trace events.
    Trace,
}
38
39impl fmt::Display for DataType {
40 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
41 f.debug_list()
42 .entries(
43 Self::flags().filter_map(|&(name, value)| self.contains(value).then_some(name)),
44 )
45 .finish()
46 }
47}
48
/// An input specification: the data types accepted, plus a schema requirement for log events.
#[derive(Debug, Clone, PartialEq)]
pub struct Input {
    /// Data types accepted on this input.
    ty: DataType,
    /// Schema requirement for log events; every constructor starts it as
    /// `schema::Requirement::empty()` and `with_schema_requirement` replaces it.
    log_schema_requirement: schema::Requirement,
}
54
55impl Input {
56 pub fn data_type(&self) -> DataType {
57 self.ty
58 }
59
60 pub fn schema_requirement(&self) -> &schema::Requirement {
61 &self.log_schema_requirement
62 }
63
64 pub fn new(ty: DataType) -> Self {
65 Self {
66 ty,
67 log_schema_requirement: schema::Requirement::empty(),
68 }
69 }
70
71 pub fn log() -> Self {
72 Self {
73 ty: DataType::Log,
74 log_schema_requirement: schema::Requirement::empty(),
75 }
76 }
77
78 pub fn metric() -> Self {
79 Self {
80 ty: DataType::Metric,
81 log_schema_requirement: schema::Requirement::empty(),
82 }
83 }
84
85 pub fn trace() -> Self {
86 Self {
87 ty: DataType::Trace,
88 log_schema_requirement: schema::Requirement::empty(),
89 }
90 }
91
92 pub fn all() -> Self {
93 Self {
94 ty: DataType::all_bits(),
95 log_schema_requirement: schema::Requirement::empty(),
96 }
97 }
98
99 #[must_use]
101 pub fn with_schema_requirement(mut self, schema_requirement: schema::Requirement) -> Self {
102 self.log_schema_requirement = schema_requirement;
103 self
104 }
105}
106
/// Description of one output of a source.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceOutput {
    /// Port name; `None` unless set via `with_port` (every constructor starts with `None`).
    pub port: Option<String>,
    /// Data types emitted on this output.
    pub ty: DataType,

    /// Schema definition for emitted log events. `new_maybe_logs` only fills this in when
    /// `ty` contains `DataType::Log`; `new_metrics` and `new_traces` leave it `None`.
    pub schema_definition: Option<Arc<schema::Definition>>,
}
117
118impl SourceOutput {
119 #[must_use]
123 pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
124 let schema_definition = ty
125 .contains(DataType::Log)
126 .then(|| Arc::new(schema_definition));
127
128 Self {
129 port: None,
130 ty,
131 schema_definition,
132 }
133 }
134
135 #[must_use]
140 pub fn new_metrics() -> Self {
141 Self {
142 port: None,
143 ty: DataType::Metric,
144 schema_definition: None,
145 }
146 }
147
148 #[must_use]
153 pub fn new_traces() -> Self {
154 Self {
155 port: None,
156 ty: DataType::Trace,
157 schema_definition: None,
158 }
159 }
160
161 #[must_use]
169 pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
170 use std::ops::Deref;
171
172 self.schema_definition.as_ref().map(|definition| {
173 if schema_enabled {
174 definition.deref().clone()
175 } else {
176 let mut new_definition =
177 schema::Definition::default_for_namespace(definition.log_namespaces());
178 new_definition.add_meanings(definition.meanings());
179 new_definition
180 }
181 })
182 }
183}
184
185impl SourceOutput {
186 #[must_use]
188 pub fn with_port(mut self, name: impl Into<String>) -> Self {
189 self.port = Some(name.into());
190 self
191 }
192}
193
194fn fmt_helper(
195 f: &mut fmt::Formatter<'_>,
196 maybe_port: Option<&String>,
197 data_type: DataType,
198) -> fmt::Result {
199 match maybe_port {
200 Some(port) => write!(f, "port: \"{port}\",",),
201 None => write!(f, "port: None,"),
202 }?;
203 write!(f, " types: {data_type}")
204}
205
206impl fmt::Display for SourceOutput {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 fmt_helper(f, self.port.as_ref(), self.ty)
209 }
210}
211
/// Description of one output of a transform.
#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
    /// Port name; `None` unless set via `with_port` (`new` starts with `None`).
    pub port: Option<String>,
    /// Data types emitted on this output.
    pub ty: DataType,

    /// Log schema definitions keyed by [`OutputId`]. Presumably one entry per upstream
    /// output feeding this transform (compare `clone_input_definitions`) — TODO confirm
    /// against callers.
    pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}
223
224impl fmt::Display for TransformOutput {
225 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226 fmt_helper(f, self.port.as_ref(), self.ty)
227 }
228}
229
230impl TransformOutput {
231 #[must_use]
234 pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
235 Self {
236 port: None,
237 ty,
238 log_schema_definitions: schema_definitions,
239 }
240 }
241
242 #[must_use]
244 pub fn with_port(mut self, name: impl Into<String>) -> Self {
245 self.port = Some(name.into());
246 self
247 }
248
249 #[must_use]
257 pub fn schema_definitions(
258 &self,
259 schema_enabled: bool,
260 ) -> HashMap<OutputId, schema::Definition> {
261 if schema_enabled {
262 self.log_schema_definitions.clone()
263 } else {
264 self.log_schema_definitions
265 .iter()
266 .map(|(output, definition)| {
267 let mut new_definition =
268 schema::Definition::default_for_namespace(definition.log_namespaces());
269 new_definition.add_meanings(definition.meanings());
270 (output.clone(), new_definition)
271 })
272 .collect()
273 }
274 }
275}
276
277pub fn clone_input_definitions(
281 input_definitions: &[(OutputId, schema::Definition)],
282) -> HashMap<OutputId, schema::Definition> {
283 input_definitions
284 .iter()
285 .map(|(output, definition)| (output.clone(), definition.clone()))
286 .collect()
287}
288
// Deprecated source-level acknowledgement setting. As its `description` states, enabling or
// disabling it has no effect on acknowledgement behavior. It can be converted to the
// sink-level `AcknowledgementsConfig` through the `From` impl.
#[configurable_component]
#[configurable(deprecated)]
#[configurable(title = "Controls how acknowledgements are handled by this source.")]
#[configurable(
    description = "This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level.

Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior.

See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SourceAcknowledgementsConfig {
    /// Whether or not end-to-end acknowledgements are enabled for this source.
    enabled: Option<bool>,
}
312
313impl SourceAcknowledgementsConfig {
314 pub const DEFAULT: Self = Self { enabled: None };
315
316 #[must_use]
317 pub fn merge_default(&self, other: &Self) -> Self {
318 let enabled = self.enabled.or(other.enabled);
319 Self { enabled }
320 }
321
322 pub fn enabled(&self) -> bool {
323 self.enabled.unwrap_or(false)
324 }
325}
326
327impl From<Option<bool>> for SourceAcknowledgementsConfig {
328 fn from(enabled: Option<bool>) -> Self {
329 Self { enabled }
330 }
331}
332
333impl From<bool> for SourceAcknowledgementsConfig {
334 fn from(enabled: bool) -> Self {
335 Some(enabled).into()
336 }
337}
338
339impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
340 fn from(config: SourceAcknowledgementsConfig) -> Self {
341 Self {
342 enabled: config.enabled,
343 }
344 }
345}
346
// Sink-level acknowledgement setting. `enabled` stays `None` when unset, so it can be layered
// over another configuration with `merge_default`; `enabled()` treats an unset value as
// disabled.
#[configurable_component]
#[configurable(title = "Controls how acknowledgements are handled for this sink.")]
#[configurable(
    description = "See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct AcknowledgementsConfig {
    /// Whether or not end-to-end acknowledgements are enabled.
    enabled: Option<bool>,
}
369
370impl AcknowledgementsConfig {
371 pub const DEFAULT: Self = Self { enabled: None };
372
373 #[must_use]
374 pub fn merge_default(&self, other: &Self) -> Self {
375 let enabled = self.enabled.or(other.enabled);
376 Self { enabled }
377 }
378
379 pub fn enabled(&self) -> bool {
380 self.enabled.unwrap_or(false)
381 }
382}
383
384impl From<Option<bool>> for AcknowledgementsConfig {
385 fn from(enabled: Option<bool>) -> Self {
386 Self { enabled }
387 }
388}
389
390impl From<bool> for AcknowledgementsConfig {
391 fn from(enabled: bool) -> Self {
392 Some(enabled).into()
393 }
394}
395
396#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq)]
397pub enum LogNamespace {
398 Vector,
403
404 Legacy,
409}
410
411impl From<bool> for LogNamespace {
414 fn from(x: bool) -> Self {
415 if x {
416 LogNamespace::Vector
417 } else {
418 LogNamespace::Legacy
419 }
420 }
421}
422
423impl Default for LogNamespace {
424 fn default() -> Self {
425 Self::Legacy
426 }
427}
428
/// `None` typed as `Option<LegacyKey<&'static str>>`.
///
/// Callers of `LogNamespace::insert_source_metadata` that have no legacy key can pass this
/// instead of spelling out the key's type parameter.
pub const NO_LEGACY_KEY: Option<LegacyKey<&'static str>> = None;

/// How a value is written into the event body under the legacy log namespace.
///
/// Derives the standard value traits, so keys can be debug-printed, compared, and copied
/// whenever the wrapped path type allows it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegacyKey<T> {
    /// Insert at the key, replacing any existing value.
    Overwrite(T),
    /// Insert at the key only if it does not already hold a value.
    InsertIfEmpty(T),
}
438
439impl LogNamespace {
440 pub fn insert_source_metadata<'a>(
444 &self,
445 source_name: &'a str,
446 log: &mut LogEvent,
447 legacy_key: Option<LegacyKey<impl ValuePath<'a>>>,
448 metadata_key: impl ValuePath<'a>,
449 value: impl Into<Value>,
450 ) {
451 match self {
452 LogNamespace::Vector => {
453 log.metadata_mut()
454 .value_mut()
455 .insert(path!(source_name).concat(metadata_key), value);
456 }
457 LogNamespace::Legacy => match legacy_key {
458 None => { }
459 Some(LegacyKey::Overwrite(key)) => {
460 log.insert((PathPrefix::Event, key), value);
461 }
462 Some(LegacyKey::InsertIfEmpty(key)) => {
463 log.try_insert((PathPrefix::Event, key), value);
464 }
465 },
466 }
467 }
468
469 pub fn get_source_metadata<'a, 'b>(
473 &self,
474 source_name: &'a str,
475 log: &'b LogEvent,
476 legacy_key: impl ValuePath<'a>,
477 metadata_key: impl ValuePath<'a>,
478 ) -> Option<&'b Value> {
479 match self {
480 LogNamespace::Vector => log
481 .metadata()
482 .value()
483 .get(path!(source_name).concat(metadata_key)),
484 LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
485 }
486 }
487
488 pub fn insert_standard_vector_source_metadata(
494 &self,
495 log: &mut LogEvent,
496 source_name: &'static str,
497 now: DateTime<Utc>,
498 ) {
499 self.insert_vector_metadata(
500 log,
501 log_schema().source_type_key(),
502 path!("source_type"),
503 Bytes::from_static(source_name.as_bytes()),
504 );
505 self.insert_vector_metadata(
506 log,
507 log_schema().timestamp_key(),
508 path!("ingest_timestamp"),
509 now,
510 );
511 }
512
513 pub fn insert_vector_metadata<'a>(
518 &self,
519 log: &mut LogEvent,
520 legacy_key: Option<impl ValuePath<'a>>,
521 metadata_key: impl ValuePath<'a>,
522 value: impl Into<Value>,
523 ) {
524 match self {
525 LogNamespace::Vector => {
526 log.metadata_mut()
527 .value_mut()
528 .insert(path!("vector").concat(metadata_key), value);
529 }
530 LogNamespace::Legacy => {
531 if let Some(legacy_key) = legacy_key {
532 log.try_insert((PathPrefix::Event, legacy_key), value);
533 }
534 }
535 }
536 }
537
538 pub fn get_vector_metadata<'a, 'b>(
542 &self,
543 log: &'b LogEvent,
544 legacy_key: impl ValuePath<'a>,
545 metadata_key: impl ValuePath<'a>,
546 ) -> Option<&'b Value> {
547 match self {
548 LogNamespace::Vector => log
549 .metadata()
550 .value()
551 .get(path!("vector").concat(metadata_key)),
552 LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
553 }
554 }
555
556 pub fn new_log_from_data(&self, value: impl Into<Value>) -> LogEvent {
557 match self {
558 LogNamespace::Vector | LogNamespace::Legacy => LogEvent::from(value.into()),
559 }
560 }
561
562 #[must_use]
564 pub fn merge(&self, override_value: Option<impl Into<LogNamespace>>) -> LogNamespace {
565 override_value.map_or(*self, Into::into)
566 }
567}
568
#[cfg(test)]
mod test {
    use chrono::Utc;
    use lookup::{OwnedTargetPath, event_path, owned_value_path};
    use vector_common::btreemap;
    use vrl::value::Kind;

    use super::*;
    use crate::event::LogEvent;

    /// Under the legacy namespace, the source type must be written to whatever source-type
    /// key the global log schema is configured with.
    #[test]
    fn test_insert_standard_vector_source_metadata() {
        let mut schema = LogSchema::default();
        schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
            "a", "b", "c", "d"
        ))));
        // Installs the schema that `log_schema()` returns (global state).
        init_log_schema(schema, false);

        let namespace = LogNamespace::Legacy;
        let mut event = LogEvent::from("log");
        namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());

        assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
    }

    /// Legacy-namespace source definitions: enforced as-is when schemas are enabled, and
    /// reduced to a default legacy definition that keeps only the meanings when disabled.
    #[test]
    fn test_source_definitions_legacy() {
        let definition = schema::Definition::empty_legacy_namespace()
            .with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let valid_event = LogEvent::from(Value::from(btreemap! {
            "zork" => "norknoog",
            "nork" => 32
        }))
        .into();

        // Missing the required `zork` field.
        let invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }))
        .into();

        // Schemas enabled: the full definition is returned and enforced.
        let new_definition = output.schema_definition(true).unwrap();

        assert_eq!(
            Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
            new_definition.meaning_path("zork")
        );

        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // Schemas disabled: only the meaning survives on a default legacy definition.
        assert_eq!(
            Some(
                schema::Definition::default_legacy_namespace()
                    .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
            ),
            output.schema_definition(false)
        );
    }

    /// Vector-namespace source definitions: metadata meanings are preserved whether or not
    /// schemas are enabled, but type checks only apply when schemas are enabled.
    #[test]
    fn test_source_definitions_vector() {
        let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
            .with_metadata_field(
                &owned_value_path!("vector", "zork"),
                Kind::integer(),
                Some("zork"),
            )
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let mut valid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        valid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), 32);

        let valid_event = valid_event.into();

        let mut invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // Wrong kind: the definition declares `vector.zork` as an integer.
        invalid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), "noog");

        let invalid_event = invalid_event.into();

        // Schemas enabled: the full definition is returned and enforced.
        let new_definition = output.schema_definition(true).unwrap();

        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // Schemas disabled: the meaning is kept, but both events are accepted.
        let new_definition = output.schema_definition(false).unwrap();

        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_valid_for_event(&invalid_event);
    }

    /// A metrics-only output must not retain a log schema definition.
    #[test]
    fn test_new_log_source_ignores_definition_with_metric_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
        assert_eq!(output.schema_definition(true), None);
    }

    /// A log output keeps the supplied definition unchanged when schemas are enabled.
    #[test]
    fn test_new_log_source_uses_definition_with_log_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
        assert_eq!(output.schema_definition(true), Some(definition));
    }
}