1use std::sync::Arc;
2use std::{collections::HashMap, fmt, num::NonZeroUsize};
3
4use bitmask_enum::bitmask;
5use bytes::Bytes;
6use chrono::{DateTime, Utc};
7
8mod global_options;
9mod log_schema;
10pub(crate) mod metrics_expiration;
11pub mod output_id;
12pub mod proxy;
13mod telemetry;
14
15use crate::event::LogEvent;
16pub use global_options::{GlobalOptions, WildcardMatching};
17pub use log_schema::{init_log_schema, log_schema, LogSchema};
18use lookup::{lookup_v2::ValuePath, path, PathPrefix};
19pub use output_id::OutputId;
20use serde::{Deserialize, Serialize};
21pub use telemetry::{init_telemetry, telemetry, Tags, Telemetry};
22pub use vector_common::config::ComponentKey;
23use vector_config::configurable_component;
24use vrl::value::Value;
25
26use crate::schema;
27
/// Default `max_events` value for memory buffers.
///
/// The value is taken from `vector_buffers::config::memory_buffer_default_max_events()` and is
/// guaranteed by its type to be non-zero.
pub const MEMORY_BUFFER_DEFAULT_MAX_EVENTS: NonZeroUsize =
    vector_buffers::config::memory_buffer_default_max_events();
30
// The kinds of event data a component can accept or emit.
//
// The enum is turned into a `u8` bitmask by `bitmask_enum`, so a single `DataType` value may
// carry any combination of the flags below. `flags_iter` enables `DataType::flags()`, which the
// `Display` implementation uses to list the flags that are set.
#[bitmask(u8)]
#[bitmask_config(flags_iter)]
pub enum DataType {
    // Log events.
    Log,
    // Metric events.
    Metric,
    // Trace events.
    Trace,
}
40
41impl fmt::Display for DataType {
42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
43 f.debug_list()
44 .entries(
45 Self::flags().filter_map(|&(name, value)| self.contains(value).then_some(name)),
46 )
47 .finish()
48 }
49}
50
/// Describes what a component accepts as input: the data type(s) and a schema requirement.
#[derive(Debug, Clone, PartialEq)]
pub struct Input {
    /// The data type(s) accepted (a bitmask, so it may hold several types at once).
    ty: DataType,
    /// The schema requirement attached to this input. Every constructor starts with
    /// `schema::Requirement::empty()`; use `with_schema_requirement` to replace it.
    log_schema_requirement: schema::Requirement,
}
56
57impl Input {
58 pub fn data_type(&self) -> DataType {
59 self.ty
60 }
61
62 pub fn schema_requirement(&self) -> &schema::Requirement {
63 &self.log_schema_requirement
64 }
65
66 pub fn new(ty: DataType) -> Self {
67 Self {
68 ty,
69 log_schema_requirement: schema::Requirement::empty(),
70 }
71 }
72
73 pub fn log() -> Self {
74 Self {
75 ty: DataType::Log,
76 log_schema_requirement: schema::Requirement::empty(),
77 }
78 }
79
80 pub fn metric() -> Self {
81 Self {
82 ty: DataType::Metric,
83 log_schema_requirement: schema::Requirement::empty(),
84 }
85 }
86
87 pub fn trace() -> Self {
88 Self {
89 ty: DataType::Trace,
90 log_schema_requirement: schema::Requirement::empty(),
91 }
92 }
93
94 pub fn all() -> Self {
95 Self {
96 ty: DataType::all_bits(),
97 log_schema_requirement: schema::Requirement::empty(),
98 }
99 }
100
101 #[must_use]
103 pub fn with_schema_requirement(mut self, schema_requirement: schema::Requirement) -> Self {
104 self.log_schema_requirement = schema_requirement;
105 self
106 }
107}
108
/// Describes a single output of a source.
#[derive(Debug, Clone, PartialEq)]
pub struct SourceOutput {
    /// Name of the output port; `None` until one is set with `with_port`.
    pub port: Option<String>,
    /// The data type(s) emitted on this output.
    pub ty: DataType,

    /// Schema definition of the emitted logs.
    ///
    /// `new_maybe_logs` only stores a definition when `ty` contains `DataType::Log`;
    /// `new_metrics` and `new_traces` always leave this as `None`.
    pub schema_definition: Option<Arc<schema::Definition>>,
}
119
120impl SourceOutput {
121 #[must_use]
125 pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
126 let schema_definition = ty
127 .contains(DataType::Log)
128 .then(|| Arc::new(schema_definition));
129
130 Self {
131 port: None,
132 ty,
133 schema_definition,
134 }
135 }
136
137 #[must_use]
142 pub fn new_metrics() -> Self {
143 Self {
144 port: None,
145 ty: DataType::Metric,
146 schema_definition: None,
147 }
148 }
149
150 #[must_use]
155 pub fn new_traces() -> Self {
156 Self {
157 port: None,
158 ty: DataType::Trace,
159 schema_definition: None,
160 }
161 }
162
163 #[must_use]
171 pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
172 use std::ops::Deref;
173
174 self.schema_definition.as_ref().map(|definition| {
175 if schema_enabled {
176 definition.deref().clone()
177 } else {
178 let mut new_definition =
179 schema::Definition::default_for_namespace(definition.log_namespaces());
180 new_definition.add_meanings(definition.meanings());
181 new_definition
182 }
183 })
184 }
185}
186
187impl SourceOutput {
188 #[must_use]
190 pub fn with_port(mut self, name: impl Into<String>) -> Self {
191 self.port = Some(name.into());
192 self
193 }
194}
195
196fn fmt_helper(
197 f: &mut fmt::Formatter<'_>,
198 maybe_port: Option<&String>,
199 data_type: DataType,
200) -> fmt::Result {
201 match maybe_port {
202 Some(port) => write!(f, "port: \"{port}\",",),
203 None => write!(f, "port: None,"),
204 }?;
205 write!(f, " types: {data_type}")
206}
207
208impl fmt::Display for SourceOutput {
209 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
210 fmt_helper(f, self.port.as_ref(), self.ty)
211 }
212}
213
/// Describes a single output of a transform.
#[derive(Debug, Clone, PartialEq)]
pub struct TransformOutput {
    /// Name of the output port; `None` until one is set with `with_port`.
    pub port: Option<String>,
    /// The data type(s) emitted on this output.
    pub ty: DataType,

    /// Schema definitions for this output, keyed by `OutputId`.
    ///
    /// Read them through `schema_definitions`, which takes into account whether schemas are
    /// enabled.
    pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
}
225
226impl fmt::Display for TransformOutput {
227 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
228 fmt_helper(f, self.port.as_ref(), self.ty)
229 }
230}
231
232impl TransformOutput {
233 #[must_use]
236 pub fn new(ty: DataType, schema_definitions: HashMap<OutputId, schema::Definition>) -> Self {
237 Self {
238 port: None,
239 ty,
240 log_schema_definitions: schema_definitions,
241 }
242 }
243
244 #[must_use]
246 pub fn with_port(mut self, name: impl Into<String>) -> Self {
247 self.port = Some(name.into());
248 self
249 }
250
251 #[must_use]
259 pub fn schema_definitions(
260 &self,
261 schema_enabled: bool,
262 ) -> HashMap<OutputId, schema::Definition> {
263 if schema_enabled {
264 self.log_schema_definitions.clone()
265 } else {
266 self.log_schema_definitions
267 .iter()
268 .map(|(output, definition)| {
269 let mut new_definition =
270 schema::Definition::default_for_namespace(definition.log_namespaces());
271 new_definition.add_meanings(definition.meanings());
272 (output.clone(), new_definition)
273 })
274 .collect()
275 }
276 }
277}
278
279pub fn clone_input_definitions(
283 input_definitions: &[(OutputId, schema::Definition)],
284) -> HashMap<OutputId, schema::Definition> {
285 input_definitions
286 .iter()
287 .map(|(output, definition)| (output.clone(), definition.clone()))
288 .collect()
289}
290
// Source-level acknowledgements setting.
//
// The title and description shown to users are supplied through the `configurable` attributes
// below, which also mark the setting as deprecated.
#[configurable_component]
#[configurable(deprecated)]
#[configurable(title = "Controls how acknowledgements are handled by this source.")]
#[configurable(
    description = "This setting is **deprecated** in favor of enabling `acknowledgements` at the [global][global_acks] or sink level.

Enabling or disabling acknowledgements at the source level has **no effect** on acknowledgement behavior.

See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct SourceAcknowledgementsConfig {
    /// Whether or not end-to-end acknowledgements are enabled for this source.
    enabled: Option<bool>,
}
314
315impl SourceAcknowledgementsConfig {
316 pub const DEFAULT: Self = Self { enabled: None };
317
318 #[must_use]
319 pub fn merge_default(&self, other: &Self) -> Self {
320 let enabled = self.enabled.or(other.enabled);
321 Self { enabled }
322 }
323
324 pub fn enabled(&self) -> bool {
325 self.enabled.unwrap_or(false)
326 }
327}
328
329impl From<Option<bool>> for SourceAcknowledgementsConfig {
330 fn from(enabled: Option<bool>) -> Self {
331 Self { enabled }
332 }
333}
334
335impl From<bool> for SourceAcknowledgementsConfig {
336 fn from(enabled: bool) -> Self {
337 Some(enabled).into()
338 }
339}
340
341impl From<SourceAcknowledgementsConfig> for AcknowledgementsConfig {
342 fn from(config: SourceAcknowledgementsConfig) -> Self {
343 Self {
344 enabled: config.enabled,
345 }
346 }
347}
348
// Sink-level acknowledgements setting.
//
// The title and description shown to users are supplied through the `configurable` attributes
// below.
#[configurable_component]
#[configurable(title = "Controls how acknowledgements are handled for this sink.")]
#[configurable(
    description = "See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.

[e2e_acks]: https://vector.dev/docs/architecture/end-to-end-acknowledgements/"
)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct AcknowledgementsConfig {
    /// Whether or not end-to-end acknowledgements are enabled.
    enabled: Option<bool>,
}
371
372impl AcknowledgementsConfig {
373 pub const DEFAULT: Self = Self { enabled: None };
374
375 #[must_use]
376 pub fn merge_default(&self, other: &Self) -> Self {
377 let enabled = self.enabled.or(other.enabled);
378 Self { enabled }
379 }
380
381 pub fn enabled(&self) -> bool {
382 self.enabled.unwrap_or(false)
383 }
384}
385
386impl From<Option<bool>> for AcknowledgementsConfig {
387 fn from(enabled: Option<bool>) -> Self {
388 Self { enabled }
389 }
390}
391
392impl From<bool> for AcknowledgementsConfig {
393 fn from(enabled: bool) -> Self {
394 Some(enabled).into()
395 }
396}
397
/// Selects where the helper methods on this type read and write log metadata.
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Eq)]
pub enum LogNamespace {
    /// Metadata is stored in the event's metadata value, separate from the event data:
    /// source metadata under `<source_name>.<key>` and Vector metadata under `vector.<key>`.
    Vector,

    /// Metadata is stored inside the event itself, at the legacy key supplied by the caller.
    /// This is the default namespace.
    Legacy,
}
412
413impl From<bool> for LogNamespace {
416 fn from(x: bool) -> Self {
417 if x {
418 LogNamespace::Vector
419 } else {
420 LogNamespace::Legacy
421 }
422 }
423}
424
425impl Default for LogNamespace {
426 fn default() -> Self {
427 Self::Legacy
428 }
429}
430
/// A typed `None` to pass as the `legacy_key` of `LogNamespace::insert_source_metadata` when
/// there is no legacy key. That parameter is generic over the key type, so this constant
/// supplies a concrete type (`&'static str`) where a bare `None` would presumably leave the
/// type uninferred.
pub const NO_LEGACY_KEY: Option<LegacyKey<&'static str>> = None;
433
/// A legacy-namespace key together with the way a value should be written to it.
///
/// Used by `LogNamespace::insert_source_metadata`.
pub enum LegacyKey<T> {
    /// Write the value with `LogEvent::insert`.
    Overwrite(T),
    /// Write the value with `LogEvent::try_insert` (going by the variant name, this only
    /// inserts when the key holds no value yet).
    InsertIfEmpty(T),
}
440
441impl LogNamespace {
442 pub fn insert_source_metadata<'a>(
446 &self,
447 source_name: &'a str,
448 log: &mut LogEvent,
449 legacy_key: Option<LegacyKey<impl ValuePath<'a>>>,
450 metadata_key: impl ValuePath<'a>,
451 value: impl Into<Value>,
452 ) {
453 match self {
454 LogNamespace::Vector => {
455 log.metadata_mut()
456 .value_mut()
457 .insert(path!(source_name).concat(metadata_key), value);
458 }
459 LogNamespace::Legacy => match legacy_key {
460 None => { }
461 Some(LegacyKey::Overwrite(key)) => {
462 log.insert((PathPrefix::Event, key), value);
463 }
464 Some(LegacyKey::InsertIfEmpty(key)) => {
465 log.try_insert((PathPrefix::Event, key), value);
466 }
467 },
468 }
469 }
470
471 pub fn get_source_metadata<'a, 'b>(
475 &self,
476 source_name: &'a str,
477 log: &'b LogEvent,
478 legacy_key: impl ValuePath<'a>,
479 metadata_key: impl ValuePath<'a>,
480 ) -> Option<&'b Value> {
481 match self {
482 LogNamespace::Vector => log
483 .metadata()
484 .value()
485 .get(path!(source_name).concat(metadata_key)),
486 LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
487 }
488 }
489
490 pub fn insert_standard_vector_source_metadata(
496 &self,
497 log: &mut LogEvent,
498 source_name: &'static str,
499 now: DateTime<Utc>,
500 ) {
501 self.insert_vector_metadata(
502 log,
503 log_schema().source_type_key(),
504 path!("source_type"),
505 Bytes::from_static(source_name.as_bytes()),
506 );
507 self.insert_vector_metadata(
508 log,
509 log_schema().timestamp_key(),
510 path!("ingest_timestamp"),
511 now,
512 );
513 }
514
515 pub fn insert_vector_metadata<'a>(
520 &self,
521 log: &mut LogEvent,
522 legacy_key: Option<impl ValuePath<'a>>,
523 metadata_key: impl ValuePath<'a>,
524 value: impl Into<Value>,
525 ) {
526 match self {
527 LogNamespace::Vector => {
528 log.metadata_mut()
529 .value_mut()
530 .insert(path!("vector").concat(metadata_key), value);
531 }
532 LogNamespace::Legacy => {
533 if let Some(legacy_key) = legacy_key {
534 log.try_insert((PathPrefix::Event, legacy_key), value);
535 }
536 }
537 }
538 }
539
540 pub fn get_vector_metadata<'a, 'b>(
544 &self,
545 log: &'b LogEvent,
546 legacy_key: impl ValuePath<'a>,
547 metadata_key: impl ValuePath<'a>,
548 ) -> Option<&'b Value> {
549 match self {
550 LogNamespace::Vector => log
551 .metadata()
552 .value()
553 .get(path!("vector").concat(metadata_key)),
554 LogNamespace::Legacy => log.get((PathPrefix::Event, legacy_key)),
555 }
556 }
557
558 pub fn new_log_from_data(&self, value: impl Into<Value>) -> LogEvent {
559 match self {
560 LogNamespace::Vector | LogNamespace::Legacy => LogEvent::from(value.into()),
561 }
562 }
563
564 #[must_use]
566 pub fn merge(&self, override_value: Option<impl Into<LogNamespace>>) -> LogNamespace {
567 override_value.map_or(*self, Into::into)
568 }
569}
570
// Unit tests for `LogNamespace` metadata insertion and `SourceOutput` schema definitions.
#[cfg(test)]
mod test {
    use super::*;
    use crate::event::LogEvent;
    use chrono::Utc;
    use lookup::{event_path, owned_value_path, OwnedTargetPath};
    use vector_common::btreemap;
    use vrl::value::Kind;

    // In the legacy namespace, the standard source metadata must be written to the (nested)
    // source type key configured in the global log schema.
    #[test]
    fn test_insert_standard_vector_source_metadata() {
        let mut schema = LogSchema::default();
        schema.set_source_type_key(Some(OwnedTargetPath::event(owned_value_path!(
            "a", "b", "c", "d"
        ))));
        init_log_schema(schema, false);

        let namespace = LogNamespace::Legacy;
        let mut event = LogEvent::from("log");
        namespace.insert_standard_vector_source_metadata(&mut event, "source", Utc::now());

        assert!(event.get(event_path!("a", "b", "c", "d")).is_some());
    }

    // Legacy-namespace definition: with schemas enabled the full definition is returned; with
    // schemas disabled only the default legacy definition plus the meanings remains.
    #[test]
    fn test_source_definitions_legacy() {
        let definition = schema::Definition::empty_legacy_namespace()
            .with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let valid_event = LogEvent::from(Value::from(btreemap! {
            "zork" => "norknoog",
            "nork" => 32
        }))
        .into();

        // Lacks the `zork` field declared in the definition.
        let invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }))
        .into();

        // Schemas enabled: the stored definition is returned unchanged.
        let new_definition = output.schema_definition(true).unwrap();

        // The meaning registered for `zork` is present.
        assert_eq!(
            Some(&OwnedTargetPath::event(owned_value_path!("zork"))),
            new_definition.meaning_path("zork")
        );

        // The definition accepts the valid event and rejects the invalid one.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // Schemas disabled: the default legacy definition with only the meaning carried over.
        assert_eq!(
            Some(
                schema::Definition::default_legacy_namespace()
                    .with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
            ),
            output.schema_definition(false)
        );
    }

    // Vector-namespace definition: with schemas enabled events are validated against the full
    // definition; with schemas disabled the meanings remain but both events are accepted.
    #[test]
    fn test_source_definitons_vector() {
        let definition = schema::Definition::default_for_namespace(&[LogNamespace::Vector].into())
            .with_metadata_field(
                &owned_value_path!("vector", "zork"),
                Kind::integer(),
                Some("zork"),
            )
            .with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

        let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

        let mut valid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // `vector.zork` metadata is an integer, as the definition declares.
        valid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), 32);

        let valid_event = valid_event.into();

        let mut invalid_event = LogEvent::from(Value::from(btreemap! {
            "nork" => 32
        }));

        // `vector.zork` metadata is a string here, which does not match the declared integer.
        invalid_event
            .metadata_mut()
            .value_mut()
            .insert(path!("vector").concat("zork"), "noog");

        let invalid_event = invalid_event.into();

        // Schemas enabled: the stored definition is returned unchanged.
        let new_definition = output.schema_definition(true).unwrap();

        // The meaning registered for `zork` is present.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // The definition accepts the valid event and rejects the invalid one.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_invalid_for_event(&invalid_event);

        // Schemas disabled: only the namespaces and meanings are kept.
        let new_definition = output.schema_definition(false).unwrap();

        // The meaning is still present.
        assert_eq!(
            Some(&OwnedTargetPath::metadata(owned_value_path!(
                "vector", "zork"
            ))),
            new_definition.meaning_path("zork")
        );

        // Both events now pass, including the one rejected above.
        new_definition.assert_valid_for_event(&valid_event);
        new_definition.assert_valid_for_event(&invalid_event);
    }

    // A definition passed together with a non-log data type must be discarded.
    #[test]
    fn test_new_log_source_ignores_definition_with_metric_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
        assert_eq!(output.schema_definition(true), None);
    }

    // A definition passed together with the log data type must be kept.
    #[test]
    fn test_new_log_source_uses_definition_with_log_data_type() {
        let definition = schema::Definition::any();
        let output = SourceOutput::new_maybe_logs(DataType::Log, definition.clone());
        assert_eq!(output.schema_definition(true), Some(definition));
    }
}