vector/enrichment_tables/memory/
config.rs1use std::{num::NonZeroU64, sync::Arc};
2
3use async_trait::async_trait;
4use futures::{FutureExt, future};
5use tokio::sync::Mutex;
6use vector_lib::{
7 config::{AcknowledgementsConfig, DataType, Input, LogNamespace},
8 configurable::configurable_component,
9 enrichment::Table,
10 id::ComponentKey,
11 lookup::lookup_v2::OptionalValuePath,
12 schema::{self},
13 sink::VectorSink,
14};
15use vrl::{path::OwnedTargetPath, value::Kind};
16
17use super::{internal_events::InternalMetricsConfig, source::MemorySourceConfig};
18use crate::{
19 config::{
20 EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
21 },
22 enrichment_tables::memory::Memory,
23 sinks::Healthcheck,
24 sources::Source,
25};
26
/// Configuration for the `memory` enrichment table.
///
/// The same configuration value is also handed out as the table's sink configuration and,
/// when `source_config` is set, as its source configuration.
#[configurable_component(enrichment_table("memory"))]
#[derive(Clone)]
pub struct MemoryConfig {
    /// TTL (time-to-live) setting of the table. Defaults to `600`.
    // NOTE(review): the unit (presumably seconds) and the exact expiry rules live in
    // `Memory`, which is not visible from this file — confirm before relying on them.
    #[serde(default = "default_ttl")]
    pub ttl: u64,
    /// Scan interval of the table; must be non-zero. Defaults to `30`.
    // NOTE(review): presumably the period between scans for expired entries — confirm
    // against the `Memory` implementation.
    #[serde(default = "default_scan_interval")]
    pub scan_interval: NonZeroU64,
    /// Optional flush interval of the table.
    // The serde attribute is expected to omit the field from serialized output while it
    // still holds its default value (`None`) — TODO confirm `is_default` semantics.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub flush_interval: Option<u64>,
    /// Optional maximum size of the table, in bytes.
    // NOTE(review): what happens once the limit is reached is decided by `Memory` — confirm.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub max_byte_size: Option<u64>,
    /// Log namespace override used by the source side of the table.
    ///
    /// It is combined with the global log namespace setting when the source is built and
    /// when its outputs are described.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
    /// Internal metrics configuration.
    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: InternalMetricsConfig,
    /// Source configuration.
    ///
    /// When present, the table is additionally registered as a source under the
    /// `source_key` of this configuration.
    #[configurable(derived)]
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub source_config: Option<MemorySourceConfig>,
    /// Optional path of the TTL field.
    // NOTE(review): presumably a per-entry override of `ttl` read from incoming values —
    // confirm against the `Memory` implementation.
    #[configurable(derived)]
    #[serde(default)]
    pub ttl_field: OptionalValuePath,

    // Lazily built table instance (see `get_or_build_memory`). It sits behind an `Arc`, so
    // every clone of this configuration points at the same slot; it is never (de)serialized.
    #[serde(skip)]
    memory: Arc<Mutex<Option<Box<Memory>>>>,
}
76
77impl PartialEq for MemoryConfig {
78 fn eq(&self, other: &Self) -> bool {
79 self.ttl == other.ttl
80 && self.scan_interval == other.scan_interval
81 && self.flush_interval == other.flush_interval
82 }
83}
84impl Eq for MemoryConfig {}
85
86impl Default for MemoryConfig {
87 fn default() -> Self {
88 Self {
89 ttl: default_ttl(),
90 scan_interval: default_scan_interval(),
91 flush_interval: None,
92 memory: Arc::new(Mutex::new(None)),
93 max_byte_size: None,
94 log_namespace: None,
95 source_config: None,
96 internal_metrics: InternalMetricsConfig::default(),
97 ttl_field: OptionalValuePath::none(),
98 }
99 }
100}
101
/// Default value for `MemoryConfig::ttl`, used by serde when the field is absent.
// NOTE(review): the unit is not visible in this file (presumably seconds) — confirm.
const fn default_ttl() -> u64 {
    const DEFAULT_TTL: u64 = 600;
    DEFAULT_TTL
}
105
/// Default value for `MemoryConfig::scan_interval` (`30`), used by serde when the field
/// is absent.
///
/// The value is built through the checked `NonZeroU64::new` constructor, so no `unsafe`
/// is required and a zero literal could never slip through unnoticed.
const fn default_scan_interval() -> NonZeroU64 {
    match NonZeroU64::new(30) {
        Some(interval) => interval,
        // Unreachable: the literal above is non-zero.
        None => panic!("default scan interval must be non-zero"),
    }
}
109
110impl MemoryConfig {
111 pub(super) async fn get_or_build_memory(&self) -> Memory {
112 let mut boxed_memory = self.memory.lock().await;
113 *boxed_memory
114 .get_or_insert_with(|| Box::new(Memory::new(self.clone())))
115 .clone()
116 }
117}
118
119impl EnrichmentTableConfig for MemoryConfig {
120 async fn build(
121 &self,
122 _globals: &crate::config::GlobalOptions,
123 ) -> crate::Result<Box<dyn Table + Send + Sync>> {
124 Ok(Box::new(self.get_or_build_memory().await))
125 }
126
127 fn sink_config(
128 &self,
129 default_key: &ComponentKey,
130 ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
131 Some((default_key.clone(), Box::new(self.clone())))
132 }
133
134 fn source_config(
135 &self,
136 _default_key: &ComponentKey,
137 ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
138 let Some(source_config) = &self.source_config else {
139 return None;
140 };
141 Some((
142 source_config.source_key.clone().into(),
143 Box::new(self.clone()),
144 ))
145 }
146}
147
148#[async_trait]
149#[typetag::serde(name = "memory_enrichment_table")]
150impl SinkConfig for MemoryConfig {
151 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
152 let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await);
153
154 Ok((sink, future::ok(()).boxed()))
155 }
156
157 fn input(&self) -> Input {
158 Input::log()
159 }
160
161 fn acknowledgements(&self) -> &AcknowledgementsConfig {
162 &AcknowledgementsConfig::DEFAULT
163 }
164}
165
166#[async_trait]
167#[typetag::serde(name = "memory_enrichment_table")]
168impl SourceConfig for MemoryConfig {
169 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
170 let memory = self.get_or_build_memory().await;
171
172 let log_namespace = cx.log_namespace(self.log_namespace);
173
174 Ok(Box::pin(
175 memory.as_source(cx.shutdown, cx.out, log_namespace).run(),
176 ))
177 }
178
179 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
180 let log_namespace = global_log_namespace.merge(self.log_namespace);
181 let schema_definition = match log_namespace {
182 LogNamespace::Legacy => schema::Definition::default_legacy_namespace(),
183 LogNamespace::Vector => {
184 schema::Definition::new_with_default_metadata(Kind::any_object(), [log_namespace])
185 .with_meaning(OwnedTargetPath::event_root(), "message")
186 }
187 }
188 .with_standard_vector_source_metadata();
189
190 vec![SourceOutput::new_maybe_logs(
191 DataType::Log,
192 schema_definition,
193 )]
194 }
195
196 fn can_acknowledge(&self) -> bool {
197 false
198 }
199}
200
201impl std::fmt::Debug for MemoryConfig {
202 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203 f.debug_struct("MemoryConfig")
204 .field("ttl", &self.ttl)
205 .field("scan_interval", &self.scan_interval)
206 .field("flush_interval", &self.flush_interval)
207 .field("max_byte_size", &self.max_byte_size)
208 .finish()
209 }
210}
211
// Invokes the crate-level `impl_generate_config_from_default!` macro for `MemoryConfig`.
// NOTE(review): judging by its name, the macro implements config generation on top of
// `MemoryConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(MemoryConfig);