vector/enrichment_tables/memory/
config.rs1use std::{num::NonZeroU64, sync::Arc};
2
3use async_trait::async_trait;
4use futures::{FutureExt, future};
5use tokio::sync::Mutex;
6use vector_lib::{
7 config::{AcknowledgementsConfig, DataType, Input, LogNamespace},
8 configurable::configurable_component,
9 enrichment::Table,
10 id::ComponentKey,
11 lookup::lookup_v2::OptionalValuePath,
12 schema::{self},
13 sink::VectorSink,
14};
15use vrl::{path::OwnedTargetPath, value::Kind};
16
17use super::{Memory, internal_events::InternalMetricsConfig, source::EXPIRED_ROUTE};
18use crate::{
19 config::{
20 EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
21 },
22 sinks::Healthcheck,
23 sources::Source,
24};
25
26#[configurable_component(enrichment_table("memory"))]
28#[derive(Clone)]
29pub struct MemoryConfig {
30 #[serde(default = "default_ttl")]
34 pub ttl: u64,
35 #[serde(default = "default_scan_interval")]
39 pub scan_interval: NonZeroU64,
40 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
48 pub flush_interval: Option<u64>,
49 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
54 pub max_byte_size: Option<u64>,
55 #[configurable(metadata(docs::hidden))]
57 #[serde(default)]
58 pub log_namespace: Option<bool>,
59 #[configurable(derived)]
61 #[serde(default)]
62 pub internal_metrics: InternalMetricsConfig,
63 #[configurable(derived)]
65 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
66 pub source_config: Option<MemorySourceConfig>,
67 #[configurable(derived)]
69 #[serde(default)]
70 pub ttl_field: OptionalValuePath,
71
72 #[serde(skip)]
73 memory: Arc<Mutex<Option<Box<Memory>>>>,
74}
75
76#[configurable_component]
78#[derive(Clone, Debug, PartialEq, Eq)]
79#[serde(deny_unknown_fields)]
80pub struct MemorySourceConfig {
81 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
83 pub export_interval: Option<NonZeroU64>,
84 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
89 pub export_batch_size: Option<u64>,
90 #[serde(default = "crate::serde::default_false")]
95 pub remove_after_export: bool,
96 #[serde(default = "crate::serde::default_false")]
99 pub export_expired_items: bool,
100 pub source_key: String,
103}
104
105impl PartialEq for MemoryConfig {
106 fn eq(&self, other: &Self) -> bool {
107 self.ttl == other.ttl
108 && self.scan_interval == other.scan_interval
109 && self.flush_interval == other.flush_interval
110 }
111}
112impl Eq for MemoryConfig {}
113
114impl Default for MemoryConfig {
115 fn default() -> Self {
116 Self {
117 ttl: default_ttl(),
118 scan_interval: default_scan_interval(),
119 flush_interval: None,
120 memory: Arc::new(Mutex::new(None)),
121 max_byte_size: None,
122 log_namespace: None,
123 source_config: None,
124 internal_metrics: InternalMetricsConfig::default(),
125 ttl_field: OptionalValuePath::none(),
126 }
127 }
128}
129
/// Serde default for `MemoryConfig::ttl`.
const fn default_ttl() -> u64 {
    const DEFAULT_TTL: u64 = 600;
    DEFAULT_TTL
}
133
/// Serde default for `MemoryConfig::scan_interval`: `30`.
///
/// Uses the checked constructor rather than `unsafe { new_unchecked }`, so no
/// safety invariant has to be upheld by hand. The `None` arm can never be taken
/// for the literal `30`; it exists only to keep the function safe and `const`.
const fn default_scan_interval() -> NonZeroU64 {
    match NonZeroU64::new(30) {
        Some(interval) => interval,
        None => panic!("default scan interval must be non-zero"),
    }
}
137
138impl MemoryConfig {
139 pub(super) async fn get_or_build_memory(&self) -> Memory {
140 let mut boxed_memory = self.memory.lock().await;
141 *boxed_memory
142 .get_or_insert_with(|| Box::new(Memory::new(self.clone())))
143 .clone()
144 }
145}
146
147impl EnrichmentTableConfig for MemoryConfig {
148 async fn build(
149 &self,
150 _globals: &crate::config::GlobalOptions,
151 ) -> crate::Result<Box<dyn Table + Send + Sync>> {
152 Ok(Box::new(self.get_or_build_memory().await))
153 }
154
155 fn sink_config(
156 &self,
157 default_key: &ComponentKey,
158 ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
159 Some((default_key.clone(), Box::new(self.clone())))
160 }
161
162 fn source_config(
163 &self,
164 _default_key: &ComponentKey,
165 ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
166 let Some(source_config) = &self.source_config else {
167 return None;
168 };
169 Some((
170 source_config.source_key.clone().into(),
171 Box::new(self.clone()),
172 ))
173 }
174}
175
176#[async_trait]
177#[typetag::serde(name = "memory_enrichment_table")]
178impl SinkConfig for MemoryConfig {
179 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
180 let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await);
181
182 Ok((sink, future::ok(()).boxed()))
183 }
184
185 fn input(&self) -> Input {
186 Input::log()
187 }
188
189 fn acknowledgements(&self) -> &AcknowledgementsConfig {
190 &AcknowledgementsConfig::DEFAULT
191 }
192}
193
194#[async_trait]
195#[typetag::serde(name = "memory_enrichment_table")]
196impl SourceConfig for MemoryConfig {
197 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
198 let memory = self.get_or_build_memory().await;
199
200 let log_namespace = cx.log_namespace(self.log_namespace);
201
202 Ok(Box::pin(
203 memory.as_source(cx.shutdown, cx.out, log_namespace).run(),
204 ))
205 }
206
207 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
208 let log_namespace = global_log_namespace.merge(self.log_namespace);
209 let schema_definition = match log_namespace {
210 LogNamespace::Legacy => schema::Definition::default_legacy_namespace(),
211 LogNamespace::Vector => {
212 schema::Definition::new_with_default_metadata(Kind::any_object(), [log_namespace])
213 .with_meaning(OwnedTargetPath::event_root(), "message")
214 }
215 }
216 .with_standard_vector_source_metadata();
217
218 if self
219 .source_config
220 .as_ref()
221 .map(|c| c.export_expired_items)
222 .unwrap_or_default()
223 {
224 vec![
225 SourceOutput::new_maybe_logs(DataType::Log, schema_definition.clone()),
226 SourceOutput::new_maybe_logs(DataType::Log, schema_definition)
227 .with_port(EXPIRED_ROUTE),
228 ]
229 } else {
230 vec![SourceOutput::new_maybe_logs(
231 DataType::Log,
232 schema_definition,
233 )]
234 }
235 }
236
237 fn can_acknowledge(&self) -> bool {
238 false
239 }
240}
241
242impl std::fmt::Debug for MemoryConfig {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 f.debug_struct("MemoryConfig")
245 .field("ttl", &self.ttl)
246 .field("scan_interval", &self.scan_interval)
247 .field("flush_interval", &self.flush_interval)
248 .field("max_byte_size", &self.max_byte_size)
249 .finish()
250 }
251}
252
253impl_generate_config_from_default!(MemoryConfig);