vector/enrichment_tables/memory/
config.rs1use std::num::NonZeroU64;
2use std::sync::Arc;
3
4use crate::sinks::Healthcheck;
5use crate::sources::Source;
6use crate::{config::SinkContext, enrichment_tables::memory::Memory};
7use async_trait::async_trait;
8use futures::{future, FutureExt};
9use tokio::sync::Mutex;
10use vector_lib::config::{AcknowledgementsConfig, DataType, Input, LogNamespace};
11use vector_lib::enrichment::Table;
12use vector_lib::id::ComponentKey;
13use vector_lib::schema::{self};
14use vector_lib::{configurable::configurable_component, sink::VectorSink};
15use vrl::path::OwnedTargetPath;
16use vrl::value::Kind;
17
18use crate::config::{EnrichmentTableConfig, SinkConfig, SourceConfig, SourceContext, SourceOutput};
19
20use super::internal_events::InternalMetricsConfig;
21use super::source::MemorySourceConfig;
22
/// Configuration for the `memory` enrichment table.
///
/// The same configuration value is also handed out as a sink configuration and,
/// when `source_config` is set, as a source configuration. Every component built
/// from it obtains its [`Memory`] through one shared, lazily filled cache slot.
#[configurable_component(enrichment_table("memory"))]
#[derive(Clone)]
pub struct MemoryConfig {
    /// The `ttl` (time-to-live) setting of the table. Defaults to `600` when omitted.
    // NOTE(review): the unit is presumably seconds -- confirm against `Memory`.
    #[serde(default = "default_ttl")]
    pub ttl: u64,
    /// The scan interval of the table; must be non-zero. Defaults to `30` when omitted.
    // NOTE(review): presumably the period of the expiry scan, in seconds -- confirm.
    #[serde(default = "default_scan_interval")]
    pub scan_interval: NonZeroU64,
    /// Optional flush interval. Left out of the serialized configuration when it
    /// equals the default value.
    // NOTE(review): behaviour when unset is decided by `Memory`, not visible here.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub flush_interval: Option<u64>,
    /// Optional maximum byte size of the table. Left out of the serialized
    /// configuration when it equals the default value.
    // NOTE(review): how the limit is enforced is decided by `Memory` -- confirm.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub max_byte_size: Option<bool>,
    /// Log namespace setting for this component. When set, it is merged over the
    /// global log namespace when the source side of the table is built.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
    /// Internal metrics settings; falls back to `InternalMetricsConfig::default()`.
    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: InternalMetricsConfig,
    /// Optional source settings. When present, the table is additionally exposed
    /// as a source registered under the configured `source_key`.
    #[configurable(derived)]
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub source_config: Option<MemorySourceConfig>,

    /// Cache slot for the built [`Memory`], filled by the first call to
    /// `get_or_build_memory`. It is never (de)serialized, and clones of this
    /// configuration share the slot through the `Arc`.
    #[serde(skip)]
    memory: Arc<Mutex<Option<Box<Memory>>>>,
}
68
69impl PartialEq for MemoryConfig {
70 fn eq(&self, other: &Self) -> bool {
71 self.ttl == other.ttl
72 && self.scan_interval == other.scan_interval
73 && self.flush_interval == other.flush_interval
74 }
75}
76impl Eq for MemoryConfig {}
77
78impl Default for MemoryConfig {
79 fn default() -> Self {
80 Self {
81 ttl: default_ttl(),
82 scan_interval: default_scan_interval(),
83 flush_interval: None,
84 memory: Arc::new(Mutex::new(None)),
85 max_byte_size: None,
86 log_namespace: None,
87 source_config: None,
88 internal_metrics: InternalMetricsConfig::default(),
89 }
90 }
91}
92
/// Returns the value used for `ttl` when the setting is omitted.
const fn default_ttl() -> u64 {
    const DEFAULT_TTL: u64 = 600;
    DEFAULT_TTL
}
96
/// Returns the value used for `scan_interval` when the setting is omitted.
///
/// The value is built through the checked, `const`-capable
/// [`NonZeroU64::new`], so no `unsafe` is needed to uphold the non-zero
/// invariant.
const fn default_scan_interval() -> NonZeroU64 {
    match NonZeroU64::new(30) {
        Some(interval) => interval,
        // Only reachable if the literal above is changed to zero; in a const
        // context that turns into a compile-time error.
        None => panic!("default scan interval must be non-zero"),
    }
}
100
101impl MemoryConfig {
102 pub(super) async fn get_or_build_memory(&self) -> Memory {
103 let mut boxed_memory = self.memory.lock().await;
104 *boxed_memory
105 .get_or_insert_with(|| Box::new(Memory::new(self.clone())))
106 .clone()
107 }
108}
109
110impl EnrichmentTableConfig for MemoryConfig {
111 async fn build(
112 &self,
113 _globals: &crate::config::GlobalOptions,
114 ) -> crate::Result<Box<dyn Table + Send + Sync>> {
115 Ok(Box::new(self.get_or_build_memory().await))
116 }
117
118 fn sink_config(
119 &self,
120 default_key: &ComponentKey,
121 ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
122 Some((default_key.clone(), Box::new(self.clone())))
123 }
124
125 fn source_config(
126 &self,
127 _default_key: &ComponentKey,
128 ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
129 let Some(source_config) = &self.source_config else {
130 return None;
131 };
132 Some((
133 source_config.source_key.clone().into(),
134 Box::new(self.clone()),
135 ))
136 }
137}
138
139#[async_trait]
140#[typetag::serde(name = "memory_enrichment_table")]
141impl SinkConfig for MemoryConfig {
142 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
143 let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await);
144
145 Ok((sink, future::ok(()).boxed()))
146 }
147
148 fn input(&self) -> Input {
149 Input::log()
150 }
151
152 fn acknowledgements(&self) -> &AcknowledgementsConfig {
153 &AcknowledgementsConfig::DEFAULT
154 }
155}
156
157#[async_trait]
158#[typetag::serde(name = "memory_enrichment_table")]
159impl SourceConfig for MemoryConfig {
160 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
161 let memory = self.get_or_build_memory().await;
162
163 let log_namespace = cx.log_namespace(self.log_namespace);
164
165 Ok(Box::pin(
166 memory.as_source(cx.shutdown, cx.out, log_namespace).run(),
167 ))
168 }
169
170 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
171 let log_namespace = global_log_namespace.merge(self.log_namespace);
172 let schema_definition = match log_namespace {
173 LogNamespace::Legacy => schema::Definition::default_legacy_namespace(),
174 LogNamespace::Vector => {
175 schema::Definition::new_with_default_metadata(Kind::any_object(), [log_namespace])
176 .with_meaning(OwnedTargetPath::event_root(), "message")
177 }
178 }
179 .with_standard_vector_source_metadata();
180
181 vec![SourceOutput::new_maybe_logs(
182 DataType::Log,
183 schema_definition,
184 )]
185 }
186
187 fn can_acknowledge(&self) -> bool {
188 false
189 }
190}
191
192impl std::fmt::Debug for MemoryConfig {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("MemoryConfig")
195 .field("ttl", &self.ttl)
196 .field("scan_interval", &self.scan_interval)
197 .field("flush_interval", &self.flush_interval)
198 .field("max_byte_size", &self.max_byte_size)
199 .finish()
200 }
201}
202
// Invokes the crate's `impl_generate_config_from_default!` macro for `MemoryConfig`.
// NOTE(review): judging by its name, the macro derives config generation from the
// `Default` impl -- confirm against the macro definition.
impl_generate_config_from_default!(MemoryConfig);