1use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc};
2
3use bitmask_enum::bitmask;
4use bytes::Bytes;
5use chrono::{DateTime, Utc};
6
7mod global_options;
8mod log_schema;
9pub(crate) mod metrics_expiration;
10pub mod output_id;
11pub mod proxy;
12mod telemetry;
13
14pub use global_options::{GlobalOptions, WildcardMatching};
15pub use log_schema::{LogSchema, init_log_schema, log_schema};
16use lookup::{PathPrefix, lookup_v2::ValuePath, path};
17pub use output_id::OutputId;
18use serde::{Deserialize, Serialize};
19pub use telemetry::{Tags, Telemetry, init_telemetry, telemetry};
20pub use vector_common::config::ComponentKey;
21use vector_config::configurable_component;
22use vrl::value::Value;
23
24use crate::{event::LogEvent, schema};
25
/// Default maximum event count for memory buffers.
///
/// The value is not defined here: it is whatever
/// `vector_buffers::config::memory_buffer_default_max_events()` returns, evaluated at compile
/// time. It is typed as [`NonZeroUsize`], so a zero capacity is unrepresentable.
pub const MEMORY_BUFFER_DEFAULT_MAX_EVENTS: NonZeroUsize =
    vector_buffers::config::memory_buffer_default_max_events();
28
// Bit-flag set (backed by a `u8`) naming the kinds of events a component handles.
//
// Values can hold any combination of `Log`, `Metric` and `Trace`; elsewhere in this file
// `DataType::all_bits()` is used for "everything" and `contains` for membership checks.
//
// NOTE(review): `flags_iter` presumably is what generates the `Self::flags()` iterator that the
// `Display` impl below relies on — confirm against the `bitmask_enum` docs. Plain `//` comments
// are used so the input handed to the `bitmask` macro is unchanged.
#[bitmask(u8)]
#[bitmask_config(flags_iter)]
pub enum DataType {
    // Log events.
    Log,
    // Metric events.
    Metric,
    // Trace events.
    Trace,
}
38
39impl fmt::Display for DataType {
40 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
41 f.debug_list()
42 .entries(
43 Self::flags().filter_map(|&(name, value)| self.contains(value).then_some(name)),
44 )
45 .finish()
46 }
47}
48
/// Describes what a component accepts as input: a set of data types plus a schema requirement.
///
/// Instances are built through the constructors on `Input` (`new`, `log`, `metric`, `trace`,
/// `all`), all of which start with an empty schema requirement.
#[derive(Debug, Clone, PartialEq)]
pub struct Input {
    /// The accepted data type(s); returned by `Input::data_type`.
    ty: DataType,
    /// The schema requirement returned by `Input::schema_requirement`. Starts out as
    /// `schema::Requirement::empty()` and is replaced by `Input::with_schema_requirement`.
    log_schema_requirement: schema::Requirement,
}
54
55impl Input {
56 pub fn data_type(&self) -> DataType {
57 self.ty
58 }
59
60 pub fn schema_requirement(&self) -> &schema::Requirement {
61 &self.log_schema_requirement
62 }
63
64 pub fn new(ty: DataType) -> Self {
65 Self {
66 ty,
67 log_schema_requirement: schema::Requirement::empty(),
68 }
69 }
70
71 pub fn log() -> Self {
72 Self {
73 ty: DataType::Log,
74 log_schema_requirement: schema::Requirement::empty(),
75 }
76 }
77
78 pub fn metric() -> Self {
79 Self {
80 ty: DataType::Metric,
81 log_schema_requirement: schema::Requirement::empty(),
82 }
83 }
84
85 pub fn trace() -> Self {
86 Self {
87 ty: DataType::Trace,
88 log_schema_requirement: schema::Requirement::empty(),
89 }
90 }
91
92 pub fn all() -> Self {
93 Self {
94 ty: DataType::all_bits(),
95 log_schema_requirement: schema::Requirement::empty(),
96 }
97 }
98
99 #[must_use]
101 pub fn with_schema_requirement(mut self, schema_requirement: schema::Requirement) -> Self {
102 self.log_schema_requirement = schema_requirement;
103 self
104 }
105}
106
/// Describes a single output of a source: an optional port name, the data type(s) emitted, and
/// an optional schema definition.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceOutput {
    /// Name of the output port; `None` until `with_port` is called on a freshly constructed
    /// value (presumably meaning the default output — confirm with callers).
    pub port: Option<String>,
    /// The data type(s) emitted on this output.
    pub ty: DataType,

    /// Schema definition for this output, shared behind an `Arc`.
    ///
    /// The constructors in this file only populate it when `ty` contains `DataType::Log`;
    /// `new_metrics` and `new_traces` always leave it as `None`.
    pub schema_definition: Option<Arc<schema::Definition>>,
}
117
118impl SourceOutput {
119 #[must_use]
123 pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
124 let schema_definition = ty
125 .contains(DataType::Log)
126 .then(|| Arc::new(schema_definition));
127
128 Self {
129 port: None,
130 ty,
131 schema_definition,
132 }
133 }
134
135 #[must_use]
140 pub fn new_metrics() -> Self {
141 Self {
142 port: None,
143 ty: DataType::Metric,
144 schema_definition: None,
145 }
146 }
147
148 #[must_use]
153 pub fn new_traces() -> Self {
154 Self {
155 port: None,
156 ty: DataType::Trace,
157 schema_definition: None,
158 }
159 }
160
161 #[must_use]
169 pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
170 use std::ops::Deref;
171
172 self.schema_definition.as_ref().map(|definition| {
173 if schema_enabled {
174 definition.deref().clone()
175 } else {
176 let mut new_definition =
177 schema::Definition::default_for_namespace(definition.log_namespaces());
178 new_definition.add_meanings(definition.meanings());
179 new_definition
180 }
181 })
182 }
183}
184
185impl SourceOutput {
186 #[must_use]
188 pub fn with_port(mut self, name: impl Into<String>) -> Self {
189 self.port = Some(name.into());
190 self
191 }
192}
193
194fn fmt_helper(
195 f: &mut fmt::Formatter<'_>,
196 maybe_port: Option<&String>,
197 data_type: DataType,
198) -> fmt::Result {
199 match maybe_port {
200 Some(port) => write!(f, "port: \"{port}\",",),
201 None => write!(f, "port: None,"),
202 }?;
203 write!(f, " types: {data_type}")
204}
205
206impl fmt::Display for SourceOutput {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 fmt_helper(f, self.port.as_ref(), self.ty)
209 }
210}
211
/// Describes a single output of a transform: an optional port name, the data type(s) emitted,
/// and a set of log schema definitions.
#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
    /// Name of the output port; `None` until `with_port` is called on a freshly constructed
    /// value (presumably meaning the default output — confirm with callers).
    pub port: Option<String>,
    /// The data type(s) emitted on this output.
    pub ty: DataType,

    /// Schema definitions keyed by `OutputId`.
    ///
    /// NOTE(review): the key presumably identifies the upstream output each definition was
    /// derived from — confirm against callers of `TransformOutput::new`.
    pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}
223
224impl fmt::Display for TransformOutput {
225 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
226 fmt_helper(f, self.port.as_ref(), self.ty)
227 }
228}
229
230impl TransformOutput {
231 #[must_use]
234 pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
235 Self {
236 port: None,
237 ty,
238 log_schema_definitions: schema_definitions,
239 }
240 }
241
242 #[must_use]
244 pub fn with_port(mut self, name: impl Into<String>) -> Self {
245 self.port = Some(name.into());
246 self
247 }
248
249 #[must_use]
257 pub fn schema_definitions(
258 &self,
259 schema_enabled: bool,
260 ) -> HashMap<OutputId, schema::Definition> {
261 if schema_enabled {
262 self.log_schema_definitions.clone()
263 } else {
264 self.log_schema_definitions
265 .iter()
266 .map(|(output, definition)| {
267 let mut new_definition =
268 schema::Definition::default_for_namespace(definition.log_namespaces());
269 new_definition.add_meanings(definition.meanings());
270 (output.clone(), new_definition)
271 })
272 .collect()
273 }
274 }
275}
276
277pub fn clone_input_definitions(
281 input_definitions: &[(OutputId, schema::Definition)],
282) -> HashMap<OutputId, schema::Definition> {
283 input_definitions
284 .iter()
285 .map(|(output, definition)| (output.clone(), definition.clone()))
286 .collect()
287}
288
// Deprecated, source-level acknowledgements setting. Per its own description below, toggling it
// has no effect; acknowledgements are configured globally or per sink instead.
//
// NOTE(review): plain `//` comments are used on purpose. Doc comments on a
// `#[configurable_component]` item presumably feed the generated title/description, which this
// item already sets explicitly — confirm before converting these to `///`.
#[configurable_component]
#[configurable(deprecated)]
#[configurable(title = "Controls how acknowledgements are handled by this source.")]
#[configurable(
    description = "This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level.

Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior.

See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SourceAcknowledgementsConfig {
    // Tri-state flag: `None` means "not set". `enabled()` treats unset as `false`, and
    // `merge_default` lets an unset value fall back to another config's value.
    enabled: Option<bool>,
}
312
313impl SourceAcknowledgementsConfig {
314 pub const DEFAULT: Self = Self { enabled: None };
315
316 #[must_use]
317 pub fn merge_default(&self, other: &Self) -> Self {
318 let enabled = self.enabled.or(other.enabled);
319 Self { enabled }
320 }
321
322 pub fn enabled(&self) -> bool {
323 self.enabled.unwrap_or(false)
324 }
325}
326
327impl From<Option<bool>> for SourceAcknowledgementsConfig {
328 fn from(enabled: Option<bool>) -> Self {
329 Self { enabled }
330 }
331}
332
333impl From<bool> for SourceAcknowledgementsConfig {
334 fn from(enabled: bool) -> Self {
335 Some(enabled).into()
336 }
337}
338
339impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
340 fn from(config: SourceAcknowledgementsConfig) -> Self {
341 Self {
342 enabled: config.enabled,
343 }
344 }
345}
346
// Sink-level acknowledgements setting.
//
// NOTE(review): plain `//` comments are used on purpose. Doc comments on a
// `#[configurable_component]` item presumably feed the generated title/description, which this
// item already sets explicitly — confirm before converting these to `///`.
#[configurable_component]
#[configurable(title = "Controls how acknowledgements are handled for this sink.")]
#[configurable(
    description = "See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct AcknowledgementsConfig {
    // Tri-state flag: `None` means "not set". `enabled()` treats unset as `false`, and
    // `merge_default` lets an unset value fall back to another config's value.
    enabled: Option<bool>,
}
369
370impl AcknowledgementsConfig {
371 pub const DEFAULT: Self = Self { enabled: None };
372
373 #[must_use]
374 pub fn merge_default(&self, other: &Self) -> Self {
375 let enabled = self.enabled.or(other.enabled);
376 Self { enabled }
377 }
378
379 pub fn enabled(&self) -> bool {
380 self.enabled.unwrap_or(false)
381 }
382}
383
384impl From<Option<bool>> for AcknowledgementsConfig {
385 fn from(enabled: Option<bool>) -> Self {
386 Self { enabled }
387 }
388}
389
390impl From<bool> for AcknowledgementsConfig {
391 fn from(enabled: bool) -> Self {
392 Some(enabled).into()
393 }
394}
395
/// Selects where source- and Vector-provided metadata is stored on a log event.
///
/// The methods on this type branch on the variant: one variant writes to the event's metadata,
/// the other to fields of the event itself.
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq, Default)]
pub enum LogNamespace {
    /// Metadata is written to the event's metadata value, nested under the source name (for
    /// source metadata) or under `"vector"` (for Vector-provided metadata).
    Vector,

    /// Metadata is written into the event itself, at the "legacy" key supplied by the caller.
    /// This is the default variant.
    #[default]
    Legacy,
}
411
412impl From<bool> for LogNamespace {
415 fn from(x: bool) -> Self {
416 if x {
417 LogNamespace::Vector
418 } else {
419 LogNamespace::Legacy
420 }
421 }
422}
423
/// A `None` legacy key with a concrete type parameter.
///
/// NOTE(review): presumably intended for callers of `LogNamespace::insert_source_metadata` that
/// have no legacy key, so they need not spell out a type for the `impl ValuePath` parameter —
/// confirm against call sites.
pub const NO_LEGACY_KEY: Option<LegacyKey<&'static str>> = None;
426
/// A legacy-namespace key together with the policy for writing to it.
///
/// Consumed by `LogNamespace::insert_source_metadata`, which picks the insertion method based on
/// the variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LegacyKey<T> {
    /// Write the value with `LogEvent::insert`.
    Overwrite(T),
    /// Write the value with `LogEvent::try_insert` (presumably a no-op when the key is already
    /// populated — confirm against `LogEvent`).
    InsertIfEmpty(T),
}
433
434impl LogNamespace {
435 pub fn insert_source_metadata<'a>(
439 &self,
440 source_name: &'a str,
441 log: &mut LogEvent,
442 legacy_key: Option<LegacyKey<impl ValuePath<'a>>>,
443 metadata_key: impl ValuePath<'a>,
444 value: impl Into<Value>,
445 ) {
446 match self {
447 LogNamespace::Vector => {
448 log.metadata_mut()
449 .value_mut()
450 .insert(path!(source_name).concat(metadata_key), value);
451 }
452 LogNamespace::Legacy => match legacy_key {
453 None => { }
454 Some(LegacyKey::Overwrite(key)) => {
455 log.insert((PathPrefix::Event, key), value);
456 }
457 Some(LegacyKey::InsertIfEmpty(key)) => {
458 log.try_insert((PathPrefix::Event, key), value);
459 }
460 },
461 }
462 }
463
464 pub fn get_source_metadata<'a, 'b>(
468 &self,
469 source_name: &'a str,
470 log: &'b LogEvent,
471 legacy_key: impl ValuePath<'a>,
472 metadata_key: impl ValuePath<'a>,
473 ) -> Option<&'b Value> {
474 match self {
475 LogNamespace::Vector => log
476 .metadata()
477 .value()
478 .get(path!(source_name).concat(metadata_key)),
479 LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
480 }
481 }
482
483 pub fn insert_standard_vector_source_metadata(
489 &self,
490 log: &mut LogEvent,
491 source_name: &'static str,
492 now: DateTime<Utc>,
493 ) {
494 self.insert_vector_metadata(
495 log,
496 log_schema().source_type_key(),
497 path!("source_type"),
498 Bytes::from_static(source_name.as_bytes()),
499 );
500 self.insert_vector_metadata(
501 log,
502 log_schema().timestamp_key(),
503 path!("ingest_timestamp"),
504 now,
505 );
506 }
507
508 pub fn insert_vector_metadata<'a>(
513 &self,
514 log: &mut LogEvent,
515 legacy_key: Option<impl ValuePath<'a>>,
516 metadata_key: impl ValuePath<'a>,
517 value: impl Into<Value>,
518 ) {
519 match self {
520 LogNamespace::Vector => {
521 log.metadata_mut()
522 .value_mut()
523 .insert(path!("vector").concat(metadata_key), value);
524 }
525 LogNamespace::Legacy => {
526 if let Some(legacy_key) = legacy_key {
527 log.try_insert((PathPrefix::Event, legacy_key), value);
528 }
529 }
530 }
531 }
532
533 pub fn get_vector_metadata<'a, 'b>(
537 &self,
538 log: &'b LogEvent,
539 legacy_key: impl ValuePath<'a>,
540 metadata_key: impl ValuePath<'a>,
541 ) -> Option<&'b Value> {
542 match self {
543 LogNamespace::Vector => log
544 .metadata()
545 .value()
546 .get(path!("vector").concat(metadata_key)),
547 LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
548 }
549 }
550
551 pub fn new_log_from_data(&self, value: impl Into<Value>) -> LogEvent {
552 match self {
553 LogNamespace::Vector | LogNamespace::Legacy => LogEvent::from(value.into()),
554 }
555 }
556
557 #[must_use]
559 pub fn merge(&self, override_value: Option<impl Into<LogNamespace>>) -> LogNamespace {
560 override_value.map_or(*self, Into::into)
561 }
562}
563
// Unit tests for legacy-namespace metadata insertion and for how `SourceOutput` exposes its
// schema definition with schema checking enabled and disabled.
#[cfg(test)]
mod test {
    use chrono::Utc;
    use lookup::{OwnedTargetPath, event_path, owned_value_path};
    use vector_common::btreemap;
    use vrl::value::Kind;

    use super::*;
    use crate::event::LogEvent;

    // The legacy namespace must honour a nested, customised `source_type_key` from the global
    // log schema when inserting the standard Vector metadata.
    #[test]
    fn test_insert_standard_vector_source_metadata() {
        let mut schema = LogSchema::default();
        schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
            "a", "b", "c", "d"
        ))));
        // NOTE(review): this installs process-global state; the meaning of the `false` flag is
        // not visible from this file — confirm in `log_schema`.
        init_log_schema(schema, false);

        let namespace = LogNamespace::Legacy;
        let mut event = LogEvent::from("log");
        namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());

        // The source type must have landed at the customised nested path.
        assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
    }

    // Legacy namespace: with schema enabled the stored definition is returned as-is; with it
    // disabled only the meanings survive on top of the legacy default definition.
    #[test]
    fn test_source_definitions_legacy() {
        let definition = schema::Definition::empty_legacy_namespace()
            .with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        // Has both declared fields with matching kinds.
        let valid_event = LogEvent::from(Value::from(btreemap! {
            "zork" => "norknoog",
            "nork" => 32
        }))
        .into();

        // Lacks the `zork` field.
        let invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }))
        .into();

        // Schema enabled: the full definition is used.
        let new_definition = output.schema_definition(true).unwrap();

        // The `zork` meaning points at the event field of the same name.
        assert_eq!(
            Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
            new_definition.meaning_path("zork")
        );

        // The full definition accepts the valid event and rejects the invalid one.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // Schema disabled: expect the legacy default definition plus the preserved meaning.
        assert_eq!(
            Some(
                schema::Definition::default_legacy_namespace()
                    .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
            ),
            output.schema_definition(false)
        );
    }

    // Vector namespace: same checks as above, with the meaning attached to a metadata field.
    // NOTE(review): the test name contains a typo ("definitons"); left unchanged here.
    #[test]
    fn test_source_definitons_vector() {
        let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
            .with_metadata_field(
                &owned_value_path!("vector", "zork"),
                Kind::integer(),
                Some("zork"),
            )
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let mut valid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // Metadata `vector.zork` is an integer, matching the definition.
        valid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), 32);

        let valid_event = valid_event.into();

        let mut invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // Metadata `vector.zork` is a string, which does not match the declared integer kind.
        invalid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), "noog");

        let invalid_event = invalid_event.into();

        // Schema enabled: the full definition is used.
        let new_definition = output.schema_definition(true).unwrap();

        // The `zork` meaning points at the metadata path.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // The full definition accepts the valid event and rejects the invalid one.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // Schema disabled: the namespace default definition is used instead.
        let new_definition = output.schema_definition(false).unwrap();

        // The meaning is still preserved.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // With schema disabled, both events are accepted, including the mistyped one.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_valid_for_event(&invalid_event);
    }

    // A definition passed alongside a non-log data type is discarded.
    #[test]
    fn test_new_log_source_ignores_definition_with_metric_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
        assert_eq!(output.schema_definition(true), None);
    }

    // A definition passed alongside the log data type is kept and returned unchanged.
    #[test]
    fn test_new_log_source_uses_definition_with_log_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
        assert_eq!(output.schema_definition(true), Some(definition));
    }
}