vector/enrichment_tables/memory/
config.rs

1use std::num::NonZeroU64;
2use std::sync::Arc;
3
4use crate::sinks::Healthcheck;
5use crate::sources::Source;
6use crate::{config::SinkContext, enrichment_tables::memory::Memory};
7use async_trait::async_trait;
8use futures::{future, FutureExt};
9use tokio::sync::Mutex;
10use vector_lib::config::{AcknowledgementsConfig, DataType, Input, LogNamespace};
11use vector_lib::enrichment::Table;
12use vector_lib::id::ComponentKey;
13use vector_lib::schema::{self};
14use vector_lib::{configurable::configurable_component, sink::VectorSink};
15use vrl::path::OwnedTargetPath;
16use vrl::value::Kind;
17
18use crate::config::{EnrichmentTableConfig, SinkConfig, SourceConfig, SourceContext, SourceOutput};
19
20use super::internal_events::InternalMetricsConfig;
21use super::source::MemorySourceConfig;
22
/// Configuration for the `memory` enrichment table.
// NOTE: `PartialEq`, `Default` and `Debug` are implemented by hand for this type
// further down in this file; revisit those impls whenever a field is added here.
#[configurable_component(enrichment_table("memory"))]
#[derive(Clone)]
pub struct MemoryConfig {
    /// TTL (time-to-live in seconds) is used to limit the lifetime of data stored in the cache.
    /// When TTL expires, data behind a specific key in the cache is removed.
    /// TTL is reset when the key is replaced.
    // Falls back to `default_ttl()` (600) when the key is absent.
    #[serde(default = "default_ttl")]
    pub ttl: u64,
    /// The scan interval used to look for expired records. This is provided
    /// as an optimization to ensure that TTL is updated, but without doing
    /// too many cache scans.
    // `NonZeroU64` makes a zero interval unrepresentable; falls back to
    // `default_scan_interval()` (30) when the key is absent.
    #[serde(default = "default_scan_interval")]
    pub scan_interval: NonZeroU64,
    /// The interval used for making writes visible in the table.
    /// Longer intervals might get better performance,
    /// but there is a longer delay before the data is visible in the table.
    /// Since every TTL scan makes its changes visible, only use this value
    /// if it is shorter than the `scan_interval`.
    ///
    /// By default, all writes are made visible immediately.
    // `None` is the "flush immediately" default and is omitted when serializing.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub flush_interval: Option<u64>,
    /// Maximum size of the table in bytes. All insertions that make
    /// this table bigger than the maximum size are rejected.
    ///
    /// By default, there is no size limit.
    // `None` means "unlimited" and is omitted when serializing.
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub max_byte_size: Option<u64>,
    /// The namespace to use for logs. This overrides the global setting.
    // Only consulted by the `SourceConfig` impl (source build and `outputs`).
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
    /// Configuration of internal metrics
    #[configurable(derived)]
    #[serde(default)]
    pub internal_metrics: InternalMetricsConfig,
    /// Configuration for source functionality.
    // When `None`, `EnrichmentTableConfig::source_config` returns `None`, i.e. the
    // table is not exposed as a source.
    #[configurable(derived)]
    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
    pub source_config: Option<MemorySourceConfig>,

    // Lazily-built table instance, filled in by `get_or_build_memory`. It is never
    // (de)serialized. Because it sits behind an `Arc`, every clone of this config
    // shares the same slot, which is how the table, sink and source builders end up
    // working from the same cached `Memory` value.
    #[serde(skip)]
    memory: Arc<Mutex<Option<Box<Memory>>>>,
}
68
69impl PartialEq for MemoryConfig {
70    fn eq(&self, other: &Self) -> bool {
71        self.ttl == other.ttl
72            && self.scan_interval == other.scan_interval
73            && self.flush_interval == other.flush_interval
74    }
75}
76impl Eq for MemoryConfig {}
77
78impl Default for MemoryConfig {
79    fn default() -> Self {
80        Self {
81            ttl: default_ttl(),
82            scan_interval: default_scan_interval(),
83            flush_interval: None,
84            memory: Arc::new(Mutex::new(None)),
85            max_byte_size: None,
86            log_namespace: None,
87            source_config: None,
88            internal_metrics: InternalMetricsConfig::default(),
89        }
90    }
91}
92
/// Default value for `ttl` (in seconds), used when the key is absent from the configuration.
const fn default_ttl() -> u64 {
    const DEFAULT_TTL_SECS: u64 = 600;
    DEFAULT_TTL_SECS
}
96
/// Default value for `scan_interval`, used when the key is absent from the configuration.
///
/// Built with the checked `NonZeroU64::new` constructor so that no `unsafe` is needed;
/// the `None` arm can never be taken for the literal `30`.
const fn default_scan_interval() -> NonZeroU64 {
    match NonZeroU64::new(30) {
        Some(interval) => interval,
        None => panic!("default scan interval must be non-zero"),
    }
}
100
101impl MemoryConfig {
102    pub(super) async fn get_or_build_memory(&self) -> Memory {
103        let mut boxed_memory = self.memory.lock().await;
104        *boxed_memory
105            .get_or_insert_with(|| Box::new(Memory::new(self.clone())))
106            .clone()
107    }
108}
109
110impl EnrichmentTableConfig for MemoryConfig {
111    async fn build(
112        &self,
113        _globals: &crate::config::GlobalOptions,
114    ) -> crate::Result<Box<dyn Table + Send + Sync>> {
115        Ok(Box::new(self.get_or_build_memory().await))
116    }
117
118    fn sink_config(
119        &self,
120        default_key: &ComponentKey,
121    ) -> Option<(ComponentKey, Box<dyn SinkConfig>)> {
122        Some((default_key.clone(), Box::new(self.clone())))
123    }
124
125    fn source_config(
126        &self,
127        _default_key: &ComponentKey,
128    ) -> Option<(ComponentKey, Box<dyn SourceConfig>)> {
129        let Some(source_config) = &self.source_config else {
130            return None;
131        };
132        Some((
133            source_config.source_key.clone().into(),
134            Box::new(self.clone()),
135        ))
136    }
137}
138
139#[async_trait]
140#[typetag::serde(name = "memory_enrichment_table")]
141impl SinkConfig for MemoryConfig {
142    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
143        let sink = VectorSink::from_event_streamsink(self.get_or_build_memory().await);
144
145        Ok((sink, future::ok(()).boxed()))
146    }
147
148    fn input(&self) -> Input {
149        Input::log()
150    }
151
152    fn acknowledgements(&self) -> &AcknowledgementsConfig {
153        &AcknowledgementsConfig::DEFAULT
154    }
155}
156
157#[async_trait]
158#[typetag::serde(name = "memory_enrichment_table")]
159impl SourceConfig for MemoryConfig {
160    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
161        let memory = self.get_or_build_memory().await;
162
163        let log_namespace = cx.log_namespace(self.log_namespace);
164
165        Ok(Box::pin(
166            memory.as_source(cx.shutdown, cx.out, log_namespace).run(),
167        ))
168    }
169
170    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
171        let log_namespace = global_log_namespace.merge(self.log_namespace);
172        let schema_definition = match log_namespace {
173            LogNamespace::Legacy => schema::Definition::default_legacy_namespace(),
174            LogNamespace::Vector => {
175                schema::Definition::new_with_default_metadata(Kind::any_object(), [log_namespace])
176                    .with_meaning(OwnedTargetPath::event_root(), "message")
177            }
178        }
179        .with_standard_vector_source_metadata();
180
181        vec![SourceOutput::new_maybe_logs(
182            DataType::Log,
183            schema_definition,
184        )]
185    }
186
187    fn can_acknowledge(&self) -> bool {
188        false
189    }
190}
191
192impl std::fmt::Debug for MemoryConfig {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("MemoryConfig")
195            .field("ttl", &self.ttl)
196            .field("scan_interval", &self.scan_interval)
197            .field("flush_interval", &self.flush_interval)
198            .field("max_byte_size", &self.max_byte_size)
199            .finish()
200    }
201}
202
203impl_generate_config_from_default!(MemoryConfig);