vector/enrichment_tables/memory/
config.rs

1use std::{num::NonZeroU64, sync::Arc};
2
3use async_trait::async_trait;
4use futures::{FutureExt, future};
5use tokio::sync::Mutex;
6use vector_lib::{
7    config::{AcknowledgementsConfig, DataType, Input, LogNamespace},
8    configurable::configurable_component,
9    enrichment::Table,
10    id::ComponentKey,
11    lookup::lookup_v2::OptionalValuePath,
12    schema::{self},
13    sink::VectorSink,
14};
15use vrl::{path::OwnedTargetPath, value::Kind};
16
17use super::{Memory, internal_events::InternalMetricsConfig, source::EXPIRED_ROUTE};
18use crate::{
19    config::{
20        EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
21    },
22    sinks::Healthcheck,
23    sources::Source,
24};
25
/// Configuration for the `memory` enrichment table.
// The derived `Clone` copies the `Arc` stored in `memory`, so every clone of a
// config points at the same cache slot. Equality and `Debug` for this type are
// implemented by hand elsewhere in this file and deliberately look at only a
// subset of the fields.
#[configurable_component(enrichment_table("memory"))]
#[derive(Clone)]
pub struct MemoryConfig {
    /// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache.
    /// When TTL expires, data behind a specific key in the cache is removed.
    /// TTL is reset when the key is replaced.
    // Falls back to `default_ttl()` when the key is absent from the input.
    #[serde(default = "default_ttl")]
    pub ttl: u64,
    /// The scan interval used to look for expired records. This is provided
    /// as an optimization to ensure that TTL is updated, but without doing
    /// too many cache scans.
    // `NonZeroU64` makes a zero interval unrepresentable; falls back to
    // `default_scan_interval()` when the key is absent from the input.
    #[serde(default = "default_scan_interval")]
    pub scan_interval: NonZeroU64,
    /// The interval used for making writes visible in the table.
    /// Longer intervals might get better performance,
    /// but there is a longer delay before the data is visible in the table.
    /// Since every TTL scan makes its changes visible, only use this value
    /// if it is shorter than the `scan_interval`.
    ///
    /// By default, all writes are made visible immediately.
    // NOTE(review): presumably `is_default` omits the key from serialized
    // output when the value is `None` -- confirm against `vector_lib::serde`.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub flush_interval: Option<u64>,
    /// Maximum size of the table in bytes. All insertions that make
    /// this table bigger than the maximum size are rejected.
    ///
    /// By default, there is no size limit.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub max_byte_size: Option<u64>,
    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
    /// Configuration of internal metrics
    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: InternalMetricsConfig,
    /// Configuration for source functionality.
    // When this is `None`, `EnrichmentTableConfig::source_config` below
    // returns `None`, i.e. no source component is derived from the table.
    #[configurable(derived)]
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub source_config: Option<MemorySourceConfig>,
    /// Field in the incoming value used as the TTL override.
    #[configurable(derived)]
    #[serde(default)]
    pub ttl_field: OptionalValuePath,

    // Cache slot for the `Memory` built from this config. It starts out empty
    // (`None` in `Default`), is filled on first use by `get_or_build_memory`,
    // and is never read from or written to the serialized configuration.
    #[serde(skip)]
    memory: Arc<Mutex<Option<Box<Memory>>>>,
}
75
/// Configuration for memory enrichment table source functionality.
// `deny_unknown_fields` makes deserialization fail on any key that is not one
// of the fields declared below.
#[configurable_component]
#[derive(Clone, Debug, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct MemorySourceConfig {
    /// Interval for exporting all data from the table when used as a source.
    // `NonZeroU64` cannot hold zero, so a configured interval is always > 0;
    // `None` means no interval was configured.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub export_interval: Option<NonZeroU64>,
    /// Batch size for data exporting. Used to prevent exporting entire table at
    /// once and blocking the system.
    ///
    /// By default, batches are not used and entire table is exported.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub export_batch_size: Option<u64>,
    /// If set to true, all data will be removed from cache after exporting.
    /// Only valid if used as a source and export_interval > 0
    ///
    /// By default, export will not remove data from cache
    #[serde(default = "crate::serde::default_false")]
    pub remove_after_export: bool,
    /// Set to true to export expired items via the `expired` output port.
    /// Expired items ignore other settings and are exported as they are flushed from the table.
    // Read by `SourceConfig::outputs` in this file to decide whether the
    // additional `EXPIRED_ROUTE` output is declared.
    #[serde(default = "crate::serde::default_false")]
    pub export_expired_items: bool,
    /// Key to use for this component when used as a source. This must be different from the
    /// component key.
    // No serde default is declared, so this key has to be present whenever a
    // `source_config` section is given.
    pub source_key: String,
}
104
105impl PartialEq for MemoryConfig {
106    fn eq(&self, other: &Self) -> bool {
107        self.ttl == other.ttl
108            && self.scan_interval == other.scan_interval
109            && self.flush_interval == other.flush_interval
110    }
111}
112impl Eq for MemoryConfig {}
113
114impl Default for MemoryConfig {
115    fn default() -> Self {
116        Self {
117            ttl: default_ttl(),
118            scan_interval: default_scan_interval(),
119            flush_interval: None,
120            memory: Arc::new(Mutex::new(None)),
121            max_byte_size: None,
122            log_namespace: None,
123            source_config: None,
124            internal_metrics: InternalMetricsConfig::default(),
125            ttl_field: OptionalValuePath::none(),
126        }
127    }
128}
129
/// Default value for `ttl` when the key is not configured: 600, i.e. ten
/// minutes given that `ttl` is expressed in seconds.
const fn default_ttl() -> u64 {
    const SECONDS_PER_MINUTE: u64 = 60;
    10 * SECONDS_PER_MINUTE
}
133
/// Default value for `scan_interval` when the key is not configured: 30.
///
/// Written without `unsafe`: `NonZeroU64::new` is usable in const context, and
/// the `None` arm can only be reached if the literal below is edited to zero,
/// in which case evaluation fails loudly instead of producing an invalid
/// `NonZeroU64`.
const fn default_scan_interval() -> NonZeroU64 {
    match NonZeroU64::new(30) {
        Some(interval) => interval,
        None => panic!("default scan interval must be non-zero"),
    }
}
137
138impl MemoryConfig {
139    pub(super) async fn get_or_build_memory(&self) -> Memory {
140        let mut boxed_memory = self.memory.lock().await;
141        *boxed_memory
142            .get_or_insert_with(|| Box::new(Memory::new(self.clone())))
143            .clone()
144    }
145}
146
147impl EnrichmentTableConfig for MemoryConfig {
148    async fn build(
149        &self,
150        _globals: &crate::config::GlobalOptions,
151    ) -> crate::Result<Box<dyn Table + Send + Sync>> {
152        Ok(Box::new(self.get_or_build_memory().await))
153    }
154
155    fn sink_config(
156        &self,
157        default_key: &ComponentKey,
158    ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
159        Some((default_key.clone(), Box::new(self.clone())))
160    }
161
162    fn source_config(
163        &self,
164        _default_key: &ComponentKey,
165    ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
166        let Some(source_config) = &self.source_config else {
167            return None;
168        };
169        Some((
170            source_config.source_key.clone().into(),
171            Box::new(self.clone()),
172        ))
173    }
174}
175
176#[async_trait]
177#[typetag::serde(name = "memory_enrichment_table")]
178impl SinkConfig for MemoryConfig {
179    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
180        let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await);
181
182        Ok((sink, future::ok(()).boxed()))
183    }
184
185    fn input(&self) -> Input {
186        Input::log()
187    }
188
189    fn acknowledgements(&self) -> &AcknowledgementsConfig {
190        &AcknowledgementsConfig::DEFAULT
191    }
192}
193
194#[async_trait]
195#[typetag::serde(name = "memory_enrichment_table")]
196impl SourceConfig for MemoryConfig {
197    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
198        let memory = self.get_or_build_memory().await;
199
200        let log_namespace = cx.log_namespace(self.log_namespace);
201
202        Ok(Box::pin(
203            memory.as_source(cx.shutdown, cx.out, log_namespace).run(),
204        ))
205    }
206
207    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
208        let log_namespace = global_log_namespace.merge(self.log_namespace);
209        let schema_definition = match log_namespace {
210            LogNamespace::Legacy => schema::Definition::default_legacy_namespace(),
211            LogNamespace::Vector => {
212                schema::Definition::new_with_default_metadata(Kind::any_object(), [log_namespace])
213                    .with_meaning(OwnedTargetPath::event_root(), "message")
214            }
215        }
216        .with_standard_vector_source_metadata();
217
218        if self
219            .source_config
220            .as_ref()
221            .map(|c| c.export_expired_items)
222            .unwrap_or_default()
223        {
224            vec![
225                SourceOutput::new_maybe_logs(DataType::Log, schema_definition.clone()),
226                SourceOutput::new_maybe_logs(DataType::Log, schema_definition)
227                    .with_port(EXPIRED_ROUTE),
228            ]
229        } else {
230            vec![SourceOutput::new_maybe_logs(
231                DataType::Log,
232                schema_definition,
233            )]
234        }
235    }
236
237    fn can_acknowledge(&self) -> bool {
238        false
239    }
240}
241
242impl std::fmt::Debug for MemoryConfig {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        f.debug_struct("MemoryConfig")
245            .field("ttl", &self.ttl)
246            .field("scan_interval", &self.scan_interval)
247            .field("flush_interval", &self.flush_interval)
248            .field("max_byte_size", &self.max_byte_size)
249            .finish()
250    }
251}
252
// NOTE(review): judging by the macro name, this derives the config-generation
// impl for `MemoryConfig` from its `Default` implementation -- confirm against
// the macro definition, which is not visible in this file.
impl_generate_config_from_default!(MemoryConfig);