vector/enrichment_tables/memory/
table.rs

1use crate::enrichment_tables::memory::internal_events::{
2    MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed, MemoryEnrichmentTableInserted,
3    MemoryEnrichmentTableRead, MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
4};
5use crate::enrichment_tables::memory::MemoryConfig;
6use crate::SourceSender;
7use std::sync::{Arc, Mutex, MutexGuard};
8use std::time::{Duration, Instant};
9
10use evmap::shallow_copy::CopyValue;
11use evmap::{self};
12use evmap_derive::ShallowCopy;
13use futures::StreamExt;
14use thread_local::ThreadLocal;
15use tokio::time::interval;
16use tokio_stream::wrappers::IntervalStream;
17use vector_lib::config::LogNamespace;
18use vector_lib::shutdown::ShutdownSignal;
19use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use futures::stream::BoxStream;
24use vector_lib::enrichment::{Case, Condition, IndexHandle, Table};
25use vector_lib::event::{Event, EventStatus, Finalizable};
26use vector_lib::internal_event::{
27    ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
28};
29use vector_lib::sink::StreamSink;
30use vrl::value::{KeyString, ObjectMap, Value};
31
32use super::source::MemorySource;
33
34/// Single memory entry containing the value and TTL
35#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
36pub struct MemoryEntry {
37    value: String,
38    update_time: CopyValue<Instant>,
39}
40
41impl ByteSizeOf for MemoryEntry {
42    fn allocated_bytes(&self) -> usize {
43        self.value.size_of()
44    }
45}
46
47impl MemoryEntry {
48    pub(super) fn as_object_map(
49        &self,
50        now: Instant,
51        total_ttl: u64,
52        key: &str,
53    ) -> Result<ObjectMap, String> {
54        let ttl = total_ttl.saturating_sub(now.duration_since(*self.update_time).as_secs());
55        Ok(ObjectMap::from([
56            (
57                KeyString::from("key"),
58                Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
59            ),
60            (
61                KeyString::from("value"),
62                serde_json::from_str::<Value>(&self.value)
63                    .map_err(|_| "Failed to read value from memory!")?,
64            ),
65            (
66                KeyString::from("ttl"),
67                Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
68            ),
69        ]))
70    }
71
72    fn expired(&self, now: Instant, ttl: u64) -> bool {
73        now.duration_since(*self.update_time).as_secs() > ttl
74    }
75}
76
/// Bookkeeping stored next to the write handle inside [`MemoryWriter`].
#[derive(Default)]
struct MemoryMetadata {
    /// Tracked byte size of the table: `Memory::handle_value` adds each accepted
    /// entry's size to it, and `Memory::flush` recomputes it from the read side.
    byte_size: u64,
}
81
/// Write half of the table.
///
/// Bundles the evmap write handle with its size metadata so that these 2 are
/// always locked together (the struct is kept behind a single `Mutex` in `Memory`).
pub(super) struct MemoryWriter {
    /// evmap handle through which entries are queued and refreshed.
    pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
    /// Size accounting for the entries written through `write_handle`.
    metadata: MemoryMetadata,
}
87
/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure.
pub struct Memory {
    /// Factory from which new read handles are created (see `get_read_handle`).
    pub(super) read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
    /// Thread-local read handle, created lazily from `read_handle_factory`.
    pub(super) read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
    /// Mutex-guarded writer; the `Arc` is shared between clones of this struct.
    pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
    /// Configuration this table was built with.
    pub(super) config: MemoryConfig,
}
95
96impl Memory {
97    /// Creates a new [Memory] based on the provided config.
98    pub fn new(config: MemoryConfig) -> Self {
99        let (read_handle, write_handle) = evmap::new();
100        Self {
101            config,
102            read_handle_factory: read_handle.factory(),
103            read_handle: ThreadLocal::new(),
104            write_handle: Arc::new(Mutex::new(MemoryWriter {
105                write_handle,
106                metadata: MemoryMetadata::default(),
107            })),
108        }
109    }
110
111    pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
112        self.read_handle
113            .get_or(|| self.read_handle_factory.handle())
114    }
115
116    fn handle_value(&self, value: ObjectMap) {
117        let mut writer = self.write_handle.lock().expect("mutex poisoned");
118        let now = Instant::now();
119
120        for (k, v) in value.into_iter() {
121            let new_entry_key = String::from(k);
122            let Ok(v) = serde_json::to_string(&v) else {
123                emit!(MemoryEnrichmentTableInsertFailed {
124                    key: &new_entry_key,
125                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
126                });
127                continue;
128            };
129            let new_entry = MemoryEntry {
130                value: v,
131                update_time: now.into(),
132            };
133            let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
134            if let Some(max_byte_size) = self.config.max_byte_size {
135                if writer
136                    .metadata
137                    .byte_size
138                    .saturating_add(new_entry_size as u64)
139                    > max_byte_size
140                {
141                    // Reject new entries
142                    emit!(MemoryEnrichmentTableInsertFailed {
143                        key: &new_entry_key,
144                        include_key_metric_tag: self.config.internal_metrics.include_key_tag
145                    });
146                    continue;
147                }
148            }
149            writer.metadata.byte_size = writer
150                .metadata
151                .byte_size
152                .saturating_add(new_entry_size as u64);
153            emit!(MemoryEnrichmentTableInserted {
154                key: &new_entry_key,
155                include_key_metric_tag: self.config.internal_metrics.include_key_tag
156            });
157            writer.write_handle.update(new_entry_key, new_entry);
158        }
159
160        if self.config.flush_interval.is_none() {
161            self.flush(writer);
162        }
163    }
164
165    fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
166        let now = Instant::now();
167
168        let mut needs_flush = false;
169        // Since evmap holds 2 separate maps for the data, we are free to directly remove
170        // elements via the writer, while we are iterating the reader
171        // Refresh will happen only after we manually invoke it after iteration
172        if let Some(reader) = self.get_read_handle().read() {
173            for (k, v) in reader.iter() {
174                if let Some(entry) = v.get_one() {
175                    if entry.expired(now, self.config.ttl) {
176                        // Byte size is not reduced at this point, because the actual deletion
177                        // will only happen at refresh time
178                        writer.write_handle.empty(k.clone());
179                        emit!(MemoryEnrichmentTableTtlExpired {
180                            key: k,
181                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
182                        });
183                        needs_flush = true;
184                    }
185                }
186            }
187        };
188
189        needs_flush
190    }
191
192    fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
193        let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
194        if needs_flush {
195            self.flush(writer);
196        }
197    }
198
199    fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
200        writer.write_handle.refresh();
201        if let Some(reader) = self.get_read_handle().read() {
202            let mut byte_size = 0;
203            for (k, v) in reader.iter() {
204                byte_size += k.size_of() + v.get_one().size_of();
205            }
206            writer.metadata.byte_size = byte_size as u64;
207            emit!(MemoryEnrichmentTableFlushed {
208                new_objects_count: reader.len(),
209                new_byte_size: byte_size
210            });
211        }
212    }
213
214    pub(crate) fn as_source(
215        &self,
216        shutdown: ShutdownSignal,
217        out: SourceSender,
218        log_namespace: LogNamespace,
219    ) -> MemorySource {
220        MemorySource {
221            memory: self.clone(),
222            shutdown,
223            out,
224            log_namespace,
225        }
226    }
227}
228
229impl Clone for Memory {
230    fn clone(&self) -> Self {
231        Self {
232            read_handle_factory: self.read_handle_factory.clone(),
233            read_handle: ThreadLocal::new(),
234            write_handle: Arc::clone(&self.write_handle),
235            config: self.config.clone(),
236        }
237    }
238}
239
240impl Table for Memory {
241    fn find_table_row<'a>(
242        &self,
243        case: Case,
244        condition: &'a [Condition<'a>],
245        select: Option<&'a [String]>,
246        wildcard: Option<&Value>,
247        index: Option<IndexHandle>,
248    ) -> Result<ObjectMap, String> {
249        let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
250
251        match rows.pop() {
252            Some(row) if rows.is_empty() => Ok(row),
253            Some(_) => Err("More than 1 row found".to_string()),
254            None => Err("Key not found".to_string()),
255        }
256    }
257
258    fn find_table_rows<'a>(
259        &self,
260        _case: Case,
261        condition: &'a [Condition<'a>],
262        _select: Option<&'a [String]>,
263        _wildcard: Option<&Value>,
264        _index: Option<IndexHandle>,
265    ) -> Result<Vec<ObjectMap>, String> {
266        match condition.first() {
267            Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()),
268            Some(Condition::Equals { value, .. }) => {
269                let key = value.to_string_lossy();
270                match self.get_read_handle().get_one(key.as_ref()) {
271                    Some(row) => {
272                        emit!(MemoryEnrichmentTableRead {
273                            key: &key,
274                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
275                        });
276                        row.as_object_map(Instant::now(), self.config.ttl, &key)
277                            .map(|r| vec![r])
278                    }
279                    None => {
280                        emit!(MemoryEnrichmentTableReadFailed {
281                            key: &key,
282                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
283                        });
284                        Ok(Default::default())
285                    }
286                }
287            }
288            Some(_) => Err("Only equality condition is allowed".to_string()),
289            None => Err("Key condition must be specified".to_string()),
290        }
291    }
292
293    fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
294        match fields.len() {
295            0 => Err("Key field is required".to_string()),
296            1 => Ok(IndexHandle(0)),
297            _ => Err("Only one field is allowed".to_string()),
298        }
299    }
300
301    /// Returns a list of the field names that are in each index
302    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
303        Vec::new()
304    }
305
306    /// Doesn't need reload, data is written directly
307    fn needs_reload(&self) -> bool {
308        false
309    }
310}
311
312impl std::fmt::Debug for Memory {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        write!(f, "Memory {} row(s)", self.get_read_handle().len())
315    }
316}
317
318#[async_trait]
319impl StreamSink<Event> for Memory {
320    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
321        let events_sent = register!(EventsSent::from(Output(None)));
322        let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
323        let mut flush_interval = IntervalStream::new(interval(
324            self.config
325                .flush_interval
326                .map(Duration::from_secs)
327                .unwrap_or(Duration::MAX),
328        ));
329        let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
330            self.config.scan_interval.into(),
331        )));
332
333        loop {
334            tokio::select! {
335                event = input.next() => {
336                    let mut event = if let Some(event) = event {
337                        event
338                    } else {
339                        break;
340                    };
341                    let event_byte_size = event.estimated_json_encoded_size_of();
342
343                    let finalizers = event.take_finalizers();
344
345                    // Panic: This sink only accepts Logs, so this should never panic
346                    let log = event.into_log();
347
348                    if let (Value::Object(map), _) = log.into_parts() {
349                        self.handle_value(map)
350                    };
351
352                    finalizers.update_status(EventStatus::Delivered);
353                    events_sent.emit(CountByteSize(1, event_byte_size));
354                    bytes_sent.emit(ByteSize(event_byte_size.get()));
355                }
356
357                Some(_) = flush_interval.next() => {
358                    let writer = self.write_handle.lock().expect("mutex poisoned");
359                    self.flush(writer);
360                }
361
362                Some(_) = scan_interval.next() => {
363                    let writer = self.write_handle.lock().expect("mutex poisoned");
364                    self.scan(writer);
365                }
366            }
367        }
368        Ok(())
369    }
370}
371
#[cfg(test)]
mod tests {
    use futures::{future::ready, StreamExt};
    use futures_util::stream;
    use std::{num::NonZeroU64, time::Duration};
    use tokio::time;

    use vector_lib::{
        event::{EventContainer, Metric, MetricValue},
        metrics::Controller,
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        enrichment_tables::memory::{
            internal_events::InternalMetricsConfig, source::MemorySourceConfig,
        },
        event::{Event, LogEvent},
        test_util::components::{
            run_and_assert_sink_compliance, run_and_assert_source_compliance, SINK_TAGS,
            SOURCE_TAGS,
        },
    };

    /// Key used by most tests.
    const TEST_KEY: &str = "test_key";

    /// Builds a default config and lets `modfn` adjust it.
    fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
        let mut config = MemoryConfig::default();
        modfn(&mut config);
        config
    }

    /// Object with a single `key -> 5` field, as written by the tests.
    fn single_entry(key: &str) -> ObjectMap {
        ObjectMap::from([(key.into(), Value::from(5))])
    }

    /// Log event carrying `{"test_key": 5}`.
    fn test_event() -> Event {
        Event::Log(LogEvent::from(single_entry(TEST_KEY)))
    }

    /// Equality condition on the table key.
    fn key_condition(key: &str) -> Condition<'static> {
        Condition::Equals {
            field: "key",
            value: Value::from(key),
        }
    }

    /// Row expected for `TEST_KEY` holding the value 5 with `ttl` seconds left.
    fn expected_row(ttl: u64) -> ObjectMap {
        ObjectMap::from([
            ("key".into(), Value::from(TEST_KEY)),
            ("ttl".into(), Value::from(ttl)),
            ("value".into(), Value::from(5)),
        ])
    }

    /// Single-row lookup of `key`.
    fn lookup_row(memory: &Memory, key: &str) -> Result<ObjectMap, String> {
        memory.find_table_row(Case::Sensitive, &[key_condition(key)], None, None, None)
    }

    /// Asserts that a multi-row lookup of `key` succeeds but yields nothing.
    #[track_caller]
    fn assert_key_absent(memory: &Memory, key: &str) {
        assert!(memory
            .find_table_rows(Case::Sensitive, &[key_condition(key)], None, None, None)
            .unwrap()
            .pop()
            .is_none());
    }

    /// Writes `TEST_KEY -> "5"` directly through the write handle, back-dated by
    /// `age_secs` seconds, and refreshes so readers see it.
    fn insert_aged_entry(memory: &Memory, age_secs: u64) {
        let mut handle = memory.write_handle.lock().unwrap();
        handle.write_handle.update(
            TEST_KEY.to_string(),
            MemoryEntry {
                value: "5".to_string(),
                update_time: (Instant::now() - Duration::from_secs(age_secs)).into(),
            },
        );
        handle.write_handle.refresh();
    }

    /// Finds the counter metric called `name`, panicking with `missing` otherwise.
    #[track_caller]
    fn expect_counter<'a>(metrics: &'a [Metric], name: &str, missing: &str) -> &'a Metric {
        metrics
            .iter()
            .find(|m| matches!(m.value(), MetricValue::Counter { .. }) && m.name() == name)
            .expect(missing)
    }

    /// Finds the gauge metric called `name`, panicking with `missing` otherwise.
    #[track_caller]
    fn expect_gauge<'a>(metrics: &'a [Metric], name: &str, missing: &str) -> &'a Metric {
        metrics
            .iter()
            .find(|m| matches!(m.value(), MetricValue::Gauge { .. }) && m.name() == name)
            .expect(missing)
    }

    /// Value of the counter metric called `name`.
    #[track_caller]
    fn counter_value(metrics: &[Metric], name: &str, missing: &str) -> f64 {
        match expect_counter(metrics, name, missing).value() {
            MetricValue::Counter { value } => *value,
            _ => unreachable!(),
        }
    }

    /// Value of the gauge metric called `name`.
    #[track_caller]
    fn gauge_value(metrics: &[Metric], name: &str, missing: &str) -> f64 {
        match expect_gauge(metrics, name, missing).value() {
            MetricValue::Gauge { value } => *value,
            _ => unreachable!(),
        }
    }

    /// The insertions counter from a metrics capture.
    #[track_caller]
    fn insertions_counter(metrics: &[Metric]) -> &Metric {
        expect_counter(
            metrics,
            "memory_enrichment_table_insertions_total",
            "Insertions metric is missing!",
        )
    }

    /// Checks the insertion/flush counters and the object-count gauge against
    /// the expected values, and that the byte-size gauge is present and non-empty.
    #[track_caller]
    fn assert_flush_metrics(metrics: &[Metric], insertions: f64, flushes: f64, objects: f64) {
        let insertions_count = counter_value(
            metrics,
            "memory_enrichment_table_insertions_total",
            "Insertions metric is missing!",
        );
        let flushes_count = counter_value(
            metrics,
            "memory_enrichment_table_flushes_total",
            "Flushes metric is missing!",
        );
        let object_count = gauge_value(
            metrics,
            "memory_enrichment_table_objects_count",
            "Object count metric is missing!",
        );
        let byte_size_gauge = expect_gauge(
            metrics,
            "memory_enrichment_table_byte_size",
            "Byte size metric is missing!",
        );

        assert_eq!(insertions_count, insertions);
        assert_eq!(flushes_count, flushes);
        assert_eq!(object_count, objects);
        assert!(!byte_size_gauge.is_empty());
    }

    #[test]
    fn finds_row() {
        let memory = Memory::new(Default::default());
        memory.handle_value(single_entry(TEST_KEY));

        assert_eq!(
            Ok(expected_row(memory.config.ttl)),
            lookup_row(&memory, TEST_KEY)
        );
    }

    #[test]
    fn calculates_ttl() {
        let ttl = 100;
        let secs_to_subtract = 10;
        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
        insert_aged_entry(&memory, secs_to_subtract);

        assert_eq!(
            Ok(expected_row(ttl - secs_to_subtract)),
            lookup_row(&memory, TEST_KEY)
        );
    }

    #[test]
    fn removes_expired_records_on_scan_interval() {
        let ttl = 100;
        let memory = Memory::new(build_memory_config(|c| {
            c.ttl = ttl;
        }));
        insert_aged_entry(&memory, ttl + 10);

        // Finds the value before scan
        assert_eq!(Ok(expected_row(0)), lookup_row(&memory, TEST_KEY));

        // Force scan
        let writer = memory.write_handle.lock().unwrap();
        memory.scan(writer);

        // The value is not present anymore
        assert_key_absent(&memory, TEST_KEY);
    }

    #[test]
    fn does_not_show_values_before_flush_interval() {
        let ttl = 100;
        let memory = Memory::new(build_memory_config(|c| {
            c.ttl = ttl;
            c.flush_interval = Some(10);
        }));
        memory.handle_value(single_entry(TEST_KEY));

        assert_key_absent(&memory, TEST_KEY);
    }

    #[test]
    fn updates_ttl_on_value_replacement() {
        let ttl = 100;
        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
        insert_aged_entry(&memory, ttl / 2);

        assert_eq!(Ok(expected_row(ttl / 2)), lookup_row(&memory, TEST_KEY));

        memory.handle_value(single_entry(TEST_KEY));

        assert_eq!(Ok(expected_row(ttl)), lookup_row(&memory, TEST_KEY));
    }

    #[test]
    fn ignores_all_values_over_byte_size_limit() {
        let memory = Memory::new(build_memory_config(|c| {
            c.max_byte_size = Some(1);
        }));
        memory.handle_value(single_entry(TEST_KEY));

        assert_key_absent(&memory, TEST_KEY);
    }

    #[test]
    fn ignores_values_when_byte_size_limit_is_reached() {
        let ttl = 100;
        let memory = Memory::new(build_memory_config(|c| {
            c.ttl = ttl;
            c.max_byte_size = Some(150);
        }));
        memory.handle_value(single_entry(TEST_KEY));
        memory.handle_value(single_entry("rejected_key"));

        assert_eq!(Ok(expected_row(ttl)), lookup_row(&memory, TEST_KEY));
        assert_key_absent(&memory, "rejected_key");
    }

    #[test]
    fn missing_key() {
        let memory = Memory::new(Default::default());

        assert_key_absent(&memory, TEST_KEY);
    }

    #[tokio::test]
    async fn sink_spec_compliance() {
        let memory = Memory::new(Default::default());

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(test_event())),
            &SINK_TAGS,
        )
        .await;
    }

    #[tokio::test]
    async fn flush_metrics_without_interval() {
        let memory = Memory::new(Default::default());

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(test_event())),
            &SINK_TAGS,
        )
        .await;

        let metrics = Controller::get().unwrap().capture_metrics();
        assert_flush_metrics(&metrics, 1.0, 1.0, 1.0);
    }

    #[tokio::test]
    async fn flush_metrics_with_interval() {
        let event = test_event();

        let memory = Memory::new(build_memory_config(|c| {
            c.flush_interval = Some(1);
        }));

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::iter(vec![event.clone(), event]).flat_map(|e| {
                stream::once(async move {
                    tokio::time::sleep(Duration::from_millis(600)).await;
                    e
                })
            }),
            &SINK_TAGS,
        )
        .await;

        let metrics = Controller::get().unwrap().capture_metrics();
        // Two insertions; one flush is done right away and the next one after
        // the interval.
        assert_flush_metrics(&metrics, 2.0, 2.0, 1.0);
    }

    #[tokio::test]
    async fn flush_metrics_with_key() {
        let memory = Memory::new(build_memory_config(|c| {
            c.internal_metrics = InternalMetricsConfig {
                include_key_tag: true,
            };
        }));

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(test_event())),
            &SINK_TAGS,
        )
        .await;

        let metrics = Controller::get().unwrap().capture_metrics();
        assert!(insertions_counter(&metrics).tag_matches("key", "test_key"));
    }

    #[tokio::test]
    async fn flush_metrics_without_key() {
        let memory = Memory::new(Default::default());

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(test_event())),
            &SINK_TAGS,
        )
        .await;

        let metrics = Controller::get().unwrap().capture_metrics();
        assert!(insertions_counter(&metrics).tag_value("key").is_none());
    }

    #[tokio::test]
    async fn source_spec_compliance() {
        let mut memory_config = MemoryConfig::default();
        memory_config.source_config = Some(MemorySourceConfig {
            export_interval: NonZeroU64::try_from(1).unwrap(),
            export_batch_size: None,
            remove_after_export: false,
            source_key: "test".to_string(),
        });
        let memory = memory_config.get_or_build_memory().await;
        memory.handle_value(single_entry(TEST_KEY));

        let mut events: Vec<Event> = run_and_assert_source_compliance(
            memory_config,
            time::Duration::from_secs(5),
            &SOURCE_TAGS,
        )
        .await;

        assert!(!events.is_empty());
        let event = events.remove(0);
        let log = event.as_log();

        assert!(!log.value().is_empty());
    }
}