vector/enrichment_tables/memory/config.rs

1use std::{num::NonZeroU64, sync::Arc};
2
3use async_trait::async_trait;
4use futures::{FutureExt, future};
5use tokio::sync::Mutex;
6use vector_lib::{
7    config::{AcknowledgementsConfig, DataType, Input, LogNamespace},
8    configurable::configurable_component,
9    enrichment::Table,
10    id::ComponentKey,
11    lookup::lookup_v2::OptionalValuePath,
12    schema::{self},
13    sink::VectorSink,
14};
15use vrl::{path::OwnedTargetPath, value::Kind};
16
17use super::{internal_events::InternalMetricsConfig, source::MemorySourceConfig};
18use crate::{
19    config::{
20        EnrichmentTableConfig, SinkConfig, SinkContext, SourceConfig, SourceContext, SourceOutput,
21    },
22    enrichment_tables::memory::Memory,
23    sinks::Healthcheck,
24    sources::Source,
25};
26
27/// Configuration for the `memory` enrichment table.
28#[configurable_component(enrichment_table("memory"))]
29#[derive(Clone)]
30pub struct MemoryConfig {
31    /// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache.
32    /// When TTL expires, data behind a specific key in the cache is removed.
33    /// TTL is reset when the key is replaced.
34    #[serde(default = "default_ttl")]
35    pub ttl: u64,
36    /// The scan interval used to look for expired records. This is provided
37    /// as an optimization to ensure that TTL is updated, but without doing
38    /// too many cache scans.
39    #[serde(default = "default_scan_interval")]
40    pub scan_interval: NonZeroU64,
41    /// The interval used for making writes visible in the table.
42    /// Longer intervals might get better performance,
43    /// but there is a longer delay before the data is visible in the table.
44    /// Since every TTL scan makes its changes visible, only use this value
45    /// if it is shorter than the `scan_interval`.
46    ///
47    /// By default, all writes are made visible immediately.
48    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
49    pub flush_interval: Option<u64>,
50    /// Maximum size of the table in bytes. All insertions that make
51    /// this table bigger than the maximum size are rejected.
52    ///
53    /// By default, there is no size limit.
54    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
55    pub max_byte_size: Option<u64>,
56    /// The namespace to use for logs. This overrides the global setting.
57    #[configurable(metadata(docs::hidden))]
58    #[serde(default)]
59    pub log_namespace: Option<bool>,
60    /// Configuration of internal metrics
61    #[configurable(derived)]
62    #[serde(default)]
63    pub internal_metrics: InternalMetricsConfig,
64    /// Configuration for source functionality.
65    #[configurable(derived)]
66    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
67    pub source_config: Option<MemorySourceConfig>,
68    /// Field in the incoming value used as the TTL override.
69    #[configurable(derived)]
70    #[serde(default)]
71    pub ttl_field: OptionalValuePath,
72
73    #[serde(skip)]
74    memory: Arc<Mutex<Option<Box<Memory>>>>,
75}
76
77impl PartialEq for MemoryConfig {
78    fn eq(&self, other: &Self) -> bool {
79        self.ttl == other.ttl
80            && self.scan_interval == other.scan_interval
81            && self.flush_interval == other.flush_interval
82    }
83}
84impl Eq for MemoryConfig {}
85
86impl Default for MemoryConfig {
87    fn default() -> Self {
88        Self {
89            ttl: default_ttl(),
90            scan_interval: default_scan_interval(),
91            flush_interval: None,
92            memory: Arc::new(Mutex::new(None)),
93            max_byte_size: None,
94            log_namespace: None,
95            source_config: None,
96            internal_metrics: InternalMetricsConfig::default(),
97            ttl_field: OptionalValuePath::none(),
98        }
99    }
100}
101
/// Default time-to-live for cached entries, in seconds (ten minutes).
const fn default_ttl() -> u64 {
    10 * 60
}
105
/// Default interval, in seconds, between scans for expired records.
///
/// Built with the safe `const fn NonZeroU64::new` rather than `unsafe { new_unchecked }`.
/// If this literal were ever edited to zero, the result would be an explicit panic
/// instead of undefined behavior.
const fn default_scan_interval() -> NonZeroU64 {
    match NonZeroU64::new(30) {
        Some(interval) => interval,
        None => panic!("default scan interval must be non-zero"),
    }
}
109
110impl MemoryConfig {
111    pub(super) async fn get_or_build_memory(&self) -> Memory {
112        let mut boxed_memory = self.memory.lock().await;
113        *boxed_memory
114            .get_or_insert_with(|| Box::new(Memory::new(self.clone())))
115            .clone()
116    }
117}
118
119impl EnrichmentTableConfig for MemoryConfig {
120    async fn build(
121        &self,
122        _globals: &crate::config::GlobalOptions,
123    ) -> crate::Result<Box<dyn Table + Send + Sync>> {
124        Ok(Box::new(self.get_or_build_memory().await))
125    }
126
127    fn sink_config(
128        &self,
129        default_key: &ComponentKey,
130    ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
131        Some((default_key.clone(), Box::new(self.clone())))
132    }
133
134    fn source_config(
135        &self,
136        _default_key: &ComponentKey,
137    ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
138        let Some(source_config) = &self.source_config else {
139            return None;
140        };
141        Some((
142            source_config.source_key.clone().into(),
143            Box::new(self.clone()),
144        ))
145    }
146}
147
148#[async_trait]
149#[typetag::serde(name = "memory_enrichment_table")]
150impl SinkConfig for MemoryConfig {
151    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
152        let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await);
153
154        Ok((sink, future::ok(()).boxed()))
155    }
156
157    fn input(&self) -> Input {
158        Input::log()
159    }
160
161    fn acknowledgements(&self) -> &AcknowledgementsConfig {
162        &AcknowledgementsConfig::DEFAULT
163    }
164}
165
166#[async_trait]
167#[typetag::serde(name = "memory_enrichment_table")]
168impl SourceConfig for MemoryConfig {
169    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
170        let memory = self.get_or_build_memory().await;
171
172        let log_namespace = cx.log_namespace(self.log_namespace);
173
174        Ok(Box::pin(
175            memory.as_source(cx.shutdown, cx.out, log_namespace).run(),
176        ))
177    }
178
179    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
180        let log_namespace = global_log_namespace.merge(self.log_namespace);
181        let schema_definition = match log_namespace {
182            LogNamespace::Legacy => schema::Definition::default_legacy_namespace(),
183            LogNamespace::Vector => {
184                schema::Definition::new_with_default_metadata(Kind::any_object(), [log_namespace])
185                    .with_meaning(OwnedTargetPath::event_root(), "message")
186            }
187        }
188        .with_standard_vector_source_metadata();
189
190        vec![SourceOutput::new_maybe_logs(
191            DataType::Log,
192            schema_definition,
193        )]
194    }
195
196    fn can_acknowledge(&self) -> bool {
197        false
198    }
199}
200
201impl std::fmt::Debug for MemoryConfig {
202    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
203        f.debug_struct("MemoryConfig")
204            .field("ttl", &self.ttl)
205            .field("scan_interval", &self.scan_interval)
206            .field("flush_interval", &self.flush_interval)
207            .field("max_byte_size", &self.max_byte_size)
208            .finish()
209    }
210}
211
// NOTE(review): presumably implements `GenerateConfig` for `MemoryConfig` in terms of its
// `Default` impl (e.g. for generated example configs) — confirm against the macro definition.
impl_generate_config_from_default!(MemoryConfig);