1use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc};
2
3use bitmask_enum::bitmask;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6
7mod global_options;
8mod log_schema;
9pub(crate) mod metrics_expiration;
10pub mod output_id;
11pub mod proxy;
12mod telemetry;
13
14pub use global_options::{GlobalOptions, WildcardMatching};
15pub use log_schema::{LogSchema, init_log_schema, log_schema};
16use lookup::{PathPrefix, lookup_v2::ValuePath, path};
17pub use output_id::OutputId;
18use serde::{Deserialize, Serialize};
19pub use telemetry::{Tags, Telemetry, init_telemetry, telemetry};
20pub use vector_common::config::ComponentKey;
21use vector_config::configurable_component;
22use vrl::value::Value;
23
24use crate::{event::LogEvent, schema};
25
26pub const MEMORY_BUFFER_DEFAULT_MAX_EVENTS: NonZeroUsize =
27 vector_buffers::config::memory_buffer_default_max_events();
28
29#[bitmask(u8)]
32#[bitmask_config(flags_iter)]
33pub enum DataType {
34 Log,
35 Metric,
36 Trace,
37}
38
39impl fmt::Display for DataType {
40 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
41 f.debug_list()
42 .entries(
43 Self::flags().filter_map(|&(name, value)| self.contains(value).then_some(name)),
44 )
45 .finish()
46 }
47}
48
49#[derive(Debug, Clone, PartialEq)]
50pub struct Input {
51 ty: DataType,
52 log_schema_requirement: schema::Requirement,
53}
54
55impl Input {
56 pub fn data_type(&self) -> DataType {
57 self.ty
58 }
59
60 pub fn schema_requirement(&self) -> &schema::Requirement {
61 &self.log_schema_requirement
62 }
63
64 pub fn new(ty: DataType) -> Self {
65 Self {
66 ty,
67 log_schema_requirement: schema::Requirement::empty(),
68 }
69 }
70
71 pub fn log() -> Self {
72 Self {
73 ty: DataType::Log,
74 log_schema_requirement: schema::Requirement::empty(),
75 }
76 }
77
78 pub fn metric() -> Self {
79 Self {
80 ty: DataType::Metric,
81 log_schema_requirement: schema::Requirement::empty(),
82 }
83 }
84
85 pub fn trace() -> Self {
86 Self {
87 ty: DataType::Trace,
88 log_schema_requirement: schema::Requirement::empty(),
89 }
90 }
91
92 pub fn all() -> Self {
93 Self {
94 ty: DataType::all_bits(),
95 log_schema_requirement: schema::Requirement::empty(),
96 }
97 }
98
99 #[must_use]
101 pub fn with_schema_requirement(mut self, schema_requirement: schema::Requirement) -> Self {
102 self.log_schema_requirement = schema_requirement;
103 self
104 }
105}
106
107#[derive(Debug, Clone, PartialEq)]
108pub struct SourceOutput {
109 pub port: Option<String>,
110 pub ty: DataType,
111
112 pub schema_definition: Option<Arc<schema::Definition>>,
116}
117
118impl SourceOutput {
119 #[must_use]
123 pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
124 let schema_definition = ty
125 .contains(DataType::Log)
126 .then(|| Arc::new(schema_definition));
127
128 Self {
129 port: None,
130 ty,
131 schema_definition,
132 }
133 }
134
135 #[must_use]
140 pub fn new_metrics() -> Self {
141 Self {
142 port: None,
143 ty: DataType::Metric,
144 schema_definition: None,
145 }
146 }
147
148 #[must_use]
153 pub fn new_traces() -> Self {
154 Self {
155 port: None,
156 ty: DataType::Trace,
157 schema_definition: None,
158 }
159 }
160
161 #[must_use]
169 pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
170 use std::ops::Deref;
171
172 self.schema_definition.as_ref().map(|definition| {
173 if schema_enabled {
174 definition.deref().clone()
175 } else {
176 let mut new_definition =
177 schema::Definition::default_for_namespace(definition.log_namespaces());
178 new_definition.add_meanings(definition.meanings());
179 new_definition
180 }
181 })
182 }
183}
184
185impl SourceOutput {
186 #[must_use]
188 pub fn with_port(mut self, name: impl Into<String>) -> Self {
189 self.port = Some(name.into());
190 self
191 }
192}
193
194fn fmt_helper(
195 f: &mut fmt::Formatter<'_>,
196 maybe_port: Option<&String>,
197 data_type: DataType,
198) -> fmt::Result {
199 match maybe_port {
200 Some(port) => write!(f, "port: \"{port}\",",),
201 None => write!(f, "port: None,"),
202 }?;
203 write!(f, " types: {data_type}")
204}
205
206impl fmt::Display for SourceOutput {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 fmt_helper(f, self.port.as_ref(), self.ty)
209 }
210}
211
212#[derive(Debug, Clone, PartialEq)]
213pub struct TransformOutput {
214 pub port: Option<String>,
215 pub ty: DataType,
216
217 pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
222}
223
224impl fmt::Display for TransformOutput {
225 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226 fmt_helper(f, self.port.as_ref(), self.ty)
227 }
228}
229
230impl TransformOutput {
231 #[must_use]
234 pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
235 Self {
236 port: None,
237 ty,
238 log_schema_definitions: schema_definitions,
239 }
240 }
241
242 #[must_use]
244 pub fn with_port(mut self, name: impl Into<String>) -> Self {
245 self.port = Some(name.into());
246 self
247 }
248
249 #[must_use]
257 pub fn schema_definitions(
258 &self,
259 schema_enabled: bool,
260 ) -> HashMap<OutputId, schema::Definition> {
261 if schema_enabled {
262 self.log_schema_definitions.clone()
263 } else {
264 self.log_schema_definitions
265 .iter()
266 .map(|(output, definition)| {
267 let mut new_definition =
268 schema::Definition::default_for_namespace(definition.log_namespaces());
269 new_definition.add_meanings(definition.meanings());
270 (output.clone(), new_definition)
271 })
272 .collect()
273 }
274 }
275}
276
277pub fn clone_input_definitions(
281 input_definitions: &[(OutputId, schema::Definition)],
282) -> HashMap<OutputId, schema::Definition> {
283 input_definitions
284 .iter()
285 .map(|(output, definition)| (output.clone(), definition.clone()))
286 .collect()
287}
288
// Source-level acknowledgement setting. Its title/description are supplied
// explicitly via `#[configurable(...)]` below and are emitted as schema text.
#[configurable_component]
#[configurable(deprecated)]
#[configurable(title = "Controls how acknowledgements are handled by this source.")]
#[configurable(
    description = "This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level.

Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior.

See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SourceAcknowledgementsConfig {
    /// Whether or not end-to-end acknowledgements are enabled for this source.
    //
    // `None` means "not set here": `merge_default` then falls back to the other
    // config's value, and `enabled()` reports `false`.
    enabled: Option<bool>,
}
312
313impl SourceAcknowledgementsConfig {
314 pub const DEFAULT: Self = Self { enabled: None };
315
316 #[must_use]
317 pub fn merge_default(&self, other: &Self) -> Self {
318 let enabled = self.enabled.or(other.enabled);
319 Self { enabled }
320 }
321
322 pub fn enabled(&self) -> bool {
323 self.enabled.unwrap_or(false)
324 }
325}
326
327impl From<Option<bool>> for SourceAcknowledgementsConfig {
328 fn from(enabled: Option<bool>) -> Self {
329 Self { enabled }
330 }
331}
332
333impl From<bool> for SourceAcknowledgementsConfig {
334 fn from(enabled: bool) -> Self {
335 Some(enabled).into()
336 }
337}
338
339impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
340 fn from(config: SourceAcknowledgementsConfig) -> Self {
341 Self {
342 enabled: config.enabled,
343 }
344 }
345}
346
// Sink-level acknowledgement setting. Its title/description are supplied
// explicitly via `#[configurable(...)]` below and are emitted as schema text.
#[configurable_component]
#[configurable(title = "Controls how acknowledgements are handled for this sink.")]
#[configurable(
    description = "See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct AcknowledgementsConfig {
    /// Whether or not end-to-end acknowledgements are enabled.
    //
    // `None` means "not set here": `merge_default` then falls back to the other
    // config's value, and `enabled()` reports `false`.
    enabled: Option<bool>,
}
369
370impl AcknowledgementsConfig {
371 pub const DEFAULT: Self = Self { enabled: None };
372
373 #[must_use]
374 pub fn merge_default(&self, other: &Self) -> Self {
375 let enabled = self.enabled.or(other.enabled);
376 Self { enabled }
377 }
378
379 pub fn enabled(&self) -> bool {
380 self.enabled.unwrap_or(false)
381 }
382}
383
384impl From<Option<bool>> for AcknowledgementsConfig {
385 fn from(enabled: Option<bool>) -> Self {
386 Self { enabled }
387 }
388}
389
390impl From<bool> for AcknowledgementsConfig {
391 fn from(enabled: bool) -> Self {
392 Some(enabled).into()
393 }
394}
395
396#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq, Default)]
397pub enum LogNamespace {
398 Vector,
403
404 #[default]
409 Legacy,
410}
411
412impl From<bool> for LogNamespace {
415 fn from(x: bool) -> Self {
416 if x {
417 LogNamespace::Vector
418 } else {
419 LogNamespace::Legacy
420 }
421 }
422}
423
424impl From<LogNamespace> for bool {
425 fn from(x: LogNamespace) -> Self {
426 x == LogNamespace::Vector
427 }
428}
429
/// A typed "no legacy key" value for `LogNamespace::insert_source_metadata`.
/// With it, the Legacy namespace stores nothing for that call.
pub const NO_LEGACY_KEY: Option<LegacyKey<&'static str>> = None;

/// How a value is written at its legacy (event-body) key in the Legacy namespace.
pub enum LegacyKey<T> {
    /// Written with `LogEvent::insert`.
    Overwrite(T),
    /// Written with `LogEvent::try_insert`. Per the variant name, this
    /// presumably skips the write if the key already holds a value.
    InsertIfEmpty(T),
}
439
440impl LogNamespace {
441 pub fn insert_source_metadata<'a>(
445 &self,
446 source_name: &'a str,
447 log: &mut LogEvent,
448 legacy_key: Option<LegacyKey<impl ValuePath<'a>>>,
449 metadata_key: impl ValuePath<'a>,
450 value: impl Into<Value>,
451 ) {
452 match self {
453 LogNamespace::Vector => {
454 log.metadata_mut()
455 .value_mut()
456 .insert(path!(source_name).concat(metadata_key), value);
457 }
458 LogNamespace::Legacy => match legacy_key {
459 None => { }
460 Some(LegacyKey::Overwrite(key)) => {
461 log.insert((PathPrefix::Event, key), value);
462 }
463 Some(LegacyKey::InsertIfEmpty(key)) => {
464 log.try_insert((PathPrefix::Event, key), value);
465 }
466 },
467 }
468 }
469
470 pub fn get_source_metadata<'a, 'b>(
474 &self,
475 source_name: &'a str,
476 log: &'b LogEvent,
477 legacy_key: impl ValuePath<'a>,
478 metadata_key: impl ValuePath<'a>,
479 ) -> Option<&'b Value> {
480 match self {
481 LogNamespace::Vector => log
482 .metadata()
483 .value()
484 .get(path!(source_name).concat(metadata_key)),
485 LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
486 }
487 }
488
489 pub fn insert_standard_vector_source_metadata(
495 &self,
496 log: &mut LogEvent,
497 source_name: &'static str,
498 now: DateTime<Utc>,
499 ) {
500 self.insert_vector_metadata(
501 log,
502 log_schema().source_type_key(),
503 path!("source_type"),
504 Bytes::from_static(source_name.as_bytes()),
505 );
506 self.insert_vector_metadata(
507 log,
508 log_schema().timestamp_key(),
509 path!("ingest_timestamp"),
510 now,
511 );
512 }
513
514 pub fn insert_vector_metadata<'a>(
519 &self,
520 log: &mut LogEvent,
521 legacy_key: Option<impl ValuePath<'a>>,
522 metadata_key: impl ValuePath<'a>,
523 value: impl Into<Value>,
524 ) {
525 match self {
526 LogNamespace::Vector => {
527 log.metadata_mut()
528 .value_mut()
529 .insert(path!("vector").concat(metadata_key), value);
530 }
531 LogNamespace::Legacy => {
532 if let Some(legacy_key) = legacy_key {
533 log.try_insert((PathPrefix::Event, legacy_key), value);
534 }
535 }
536 }
537 }
538
539 pub fn get_vector_metadata<'a, 'b>(
543 &self,
544 log: &'b LogEvent,
545 legacy_key: impl ValuePath<'a>,
546 metadata_key: impl ValuePath<'a>,
547 ) -> Option<&'b Value> {
548 match self {
549 LogNamespace::Vector => log
550 .metadata()
551 .value()
552 .get(path!("vector").concat(metadata_key)),
553 LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
554 }
555 }
556
557 pub fn new_log_from_data(&self, value: impl Into<Value>) -> LogEvent {
558 match self {
559 LogNamespace::Vector | LogNamespace::Legacy => LogEvent::from(value.into()),
560 }
561 }
562
563 #[must_use]
565 pub fn merge(&self, override_value: Option<impl Into<LogNamespace>>) -> LogNamespace {
566 override_value.map_or(*self, Into::into)
567 }
568}
569
#[cfg(test)]
mod test {
    use chrono::Utc;
    use lookup::{OwnedTargetPath, event_path, owned_value_path};
    use vector_common::btreemap;
    use vrl::value::Kind;

    use super::*;
    use crate::event::LogEvent;

    #[test]
    fn test_insert_standard_vector_source_metadata() {
        // NOTE(review): this installs a process-wide log schema. If another test
        // initializes the schema first, this override may not take effect.
        // Confirm `init_log_schema(_, false)` semantics before relying on test order.
        let mut schema = LogSchema::default();
        schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
            "a", "b", "c", "d"
        ))));
        init_log_schema(schema, false);

        let namespace = LogNamespace::Legacy;
        let mut event = LogEvent::from("log");
        namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());

        // Legacy namespace: the source type lands at the configured nested key.
        assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
    }

    #[test]
    fn test_source_definitions_legacy() {
        let definition = schema::Definition::empty_legacy_namespace()
            .with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let valid_event = LogEvent::from(Value::from(btreemap! {
            "zork" => "norknoog",
            "nork" => 32
        }))
        .into();

        let invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }))
        .into();

        // With schemas enabled, the full definition is returned.
        let new_definition = output.schema_definition(true).unwrap();

        // The "zork" meaning is preserved.
        assert_eq!(
            Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
            new_definition.meaning_path("zork")
        );

        // Field definitions are enforced: an event missing "zork" is invalid.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // With schemas disabled, only the meanings survive on a default legacy definition.
        assert_eq!(
            Some(
                schema::Definition::default_legacy_namespace()
                    .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
            ),
            output.schema_definition(false)
        );
    }

    #[test]
    fn test_source_definitions_vector() {
        let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
            .with_metadata_field(
                &owned_value_path!("vector", "zork"),
                Kind::integer(),
                Some("zork"),
            )
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let mut valid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        valid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), 32);

        let valid_event = valid_event.into();

        let mut invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // Wrong type for the integer metadata field.
        invalid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), "noog");

        let invalid_event = invalid_event.into();

        // With schemas enabled, the full definition is returned.
        let new_definition = output.schema_definition(true).unwrap();

        // The "zork" meaning points at the metadata field.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // Metadata field types are enforced.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // With schemas disabled, the meaning is kept ...
        let new_definition = output.schema_definition(false).unwrap();

        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // ... but field types are no longer enforced, so both events pass.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_valid_for_event(&invalid_event);
    }

    #[test]
    fn test_new_log_source_ignores_definition_with_metric_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
        assert_eq!(output.schema_definition(true), None);
    }

    #[test]
    fn test_new_log_source_uses_definition_with_log_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
        assert_eq!(output.schema_definition(true), Some(definition));
    }
}