vector/enrichment_tables/memory/
table.rs

1#![allow(unsafe_op_in_unsafe_fn)] // TODO review ShallowCopy usage code and fix properly.
2
3use std::{
4    sync::{Arc, Mutex, MutexGuard},
5    time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use evmap::{
11    shallow_copy::CopyValue,
12    {self},
13};
14use evmap_derive::ShallowCopy;
15use futures::{StreamExt, stream::BoxStream};
16use thread_local::ThreadLocal;
17use tokio::time::interval;
18use tokio_stream::wrappers::IntervalStream;
19use vector_lib::{
20    ByteSizeOf, EstimatedJsonEncodedSizeOf,
21    config::LogNamespace,
22    enrichment::{Case, Condition, IndexHandle, Table},
23    event::{Event, EventStatus, Finalizable},
24    internal_event::{
25        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
26    },
27    shutdown::ShutdownSignal,
28    sink::StreamSink,
29};
30use vrl::value::{KeyString, ObjectMap, Value};
31
32use super::source::MemorySource;
33use crate::{
34    SourceSender,
35    enrichment_tables::memory::{
36        MemoryConfig,
37        internal_events::{
38            MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed,
39            MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
40            MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
41        },
42    },
43};
44
45/// Single memory entry containing the value and TTL
46#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
47pub struct MemoryEntry {
48    value: String,
49    update_time: CopyValue<Instant>,
50    ttl: u64,
51}
52
53impl ByteSizeOf for MemoryEntry {
54    fn allocated_bytes(&self) -> usize {
55        self.value.size_of()
56    }
57}
58
59impl MemoryEntry {
60    pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, String> {
61        let ttl = self
62            .ttl
63            .saturating_sub(now.duration_since(*self.update_time).as_secs());
64        Ok(ObjectMap::from([
65            (
66                KeyString::from("key"),
67                Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
68            ),
69            (
70                KeyString::from("value"),
71                serde_json::from_str::<Value>(&self.value)
72                    .map_err(|_| "Failed to read value from memory!")?,
73            ),
74            (
75                KeyString::from("ttl"),
76                Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
77            ),
78        ]))
79    }
80
81    fn expired(&self, now: Instant) -> bool {
82        now.duration_since(*self.update_time).as_secs() > self.ttl
83    }
84}
85
86#[derive(Default)]
87struct MemoryMetadata {
88    byte_size: u64,
89}
90
91// Used to ensure that these 2 are locked together
92pub(super) struct MemoryWriter {
93    pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
94    metadata: MemoryMetadata,
95}
96
97/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure.
98pub struct Memory {
99    pub(super) read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
100    pub(super) read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
101    pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
102    pub(super) config: MemoryConfig,
103}
104
105impl Memory {
106    /// Creates a new [Memory] based on the provided config.
107    pub fn new(config: MemoryConfig) -> Self {
108        let (read_handle, write_handle) = evmap::new();
109        Self {
110            config,
111            read_handle_factory: read_handle.factory(),
112            read_handle: ThreadLocal::new(),
113            write_handle: Arc::new(Mutex::new(MemoryWriter {
114                write_handle,
115                metadata: MemoryMetadata::default(),
116            })),
117        }
118    }
119
120    pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
121        self.read_handle
122            .get_or(|| self.read_handle_factory.handle())
123    }
124
125    fn handle_value(&self, value: ObjectMap) {
126        let mut writer = self.write_handle.lock().expect("mutex poisoned");
127        let now = Instant::now();
128
129        for (k, value) in value.into_iter() {
130            let new_entry_key = String::from(k);
131            let Ok(v) = serde_json::to_string(&value) else {
132                emit!(MemoryEnrichmentTableInsertFailed {
133                    key: &new_entry_key,
134                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
135                });
136                continue;
137            };
138            let new_entry = MemoryEntry {
139                value: v,
140                update_time: now.into(),
141                ttl: self
142                    .config
143                    .ttl_field
144                    .path
145                    .as_ref()
146                    .and_then(|p| value.get(p))
147                    .and_then(|v| v.as_integer())
148                    .map(|v| v as u64)
149                    .unwrap_or(self.config.ttl),
150            };
151            let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
152            if let Some(max_byte_size) = self.config.max_byte_size
153                && writer
154                    .metadata
155                    .byte_size
156                    .saturating_add(new_entry_size as u64)
157                    > max_byte_size
158            {
159                // Reject new entries
160                emit!(MemoryEnrichmentTableInsertFailed {
161                    key: &new_entry_key,
162                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
163                });
164                continue;
165            }
166            writer.metadata.byte_size = writer
167                .metadata
168                .byte_size
169                .saturating_add(new_entry_size as u64);
170            emit!(MemoryEnrichmentTableInserted {
171                key: &new_entry_key,
172                include_key_metric_tag: self.config.internal_metrics.include_key_tag
173            });
174            writer.write_handle.update(new_entry_key, new_entry);
175        }
176
177        if self.config.flush_interval.is_none() {
178            self.flush(writer);
179        }
180    }
181
182    fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
183        let now = Instant::now();
184
185        let mut needs_flush = false;
186        // Since evmap holds 2 separate maps for the data, we are free to directly remove
187        // elements via the writer, while we are iterating the reader
188        // Refresh will happen only after we manually invoke it after iteration
189        if let Some(reader) = self.get_read_handle().read() {
190            for (k, v) in reader.iter() {
191                if let Some(entry) = v.get_one()
192                    && entry.expired(now)
193                {
194                    // Byte size is not reduced at this point, because the actual deletion
195                    // will only happen at refresh time
196                    writer.write_handle.empty(k.clone());
197                    emit!(MemoryEnrichmentTableTtlExpired {
198                        key: k,
199                        include_key_metric_tag: self.config.internal_metrics.include_key_tag
200                    });
201                    needs_flush = true;
202                }
203            }
204        };
205
206        needs_flush
207    }
208
209    fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
210        let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
211        if needs_flush {
212            self.flush(writer);
213        }
214    }
215
216    fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
217        writer.write_handle.refresh();
218        if let Some(reader) = self.get_read_handle().read() {
219            let mut byte_size = 0;
220            for (k, v) in reader.iter() {
221                byte_size += k.size_of() + v.get_one().size_of();
222            }
223            writer.metadata.byte_size = byte_size as u64;
224            emit!(MemoryEnrichmentTableFlushed {
225                new_objects_count: reader.len(),
226                new_byte_size: byte_size
227            });
228        }
229    }
230
231    pub(crate) fn as_source(
232        &self,
233        shutdown: ShutdownSignal,
234        out: SourceSender,
235        log_namespace: LogNamespace,
236    ) -> MemorySource {
237        MemorySource {
238            memory: self.clone(),
239            shutdown,
240            out,
241            log_namespace,
242        }
243    }
244}
245
246impl Clone for Memory {
247    fn clone(&self) -> Self {
248        Self {
249            read_handle_factory: self.read_handle_factory.clone(),
250            read_handle: ThreadLocal::new(),
251            write_handle: Arc::clone(&self.write_handle),
252            config: self.config.clone(),
253        }
254    }
255}
256
257impl Table for Memory {
258    fn find_table_row<'a>(
259        &self,
260        case: Case,
261        condition: &'a [Condition<'a>],
262        select: Option<&'a [String]>,
263        wildcard: Option<&Value>,
264        index: Option<IndexHandle>,
265    ) -> Result<ObjectMap, String> {
266        let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
267
268        match rows.pop() {
269            Some(row) if rows.is_empty() => Ok(row),
270            Some(_) => Err("More than 1 row found".to_string()),
271            None => Err("Key not found".to_string()),
272        }
273    }
274
275    fn find_table_rows<'a>(
276        &self,
277        _case: Case,
278        condition: &'a [Condition<'a>],
279        _select: Option<&'a [String]>,
280        _wildcard: Option<&Value>,
281        _index: Option<IndexHandle>,
282    ) -> Result<Vec<ObjectMap>, String> {
283        match condition.first() {
284            Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()),
285            Some(Condition::Equals { value, .. }) => {
286                let key = value.to_string_lossy();
287                match self.get_read_handle().get_one(key.as_ref()) {
288                    Some(row) => {
289                        emit!(MemoryEnrichmentTableRead {
290                            key: &key,
291                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
292                        });
293                        row.as_object_map(Instant::now(), &key).map(|r| vec![r])
294                    }
295                    None => {
296                        emit!(MemoryEnrichmentTableReadFailed {
297                            key: &key,
298                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
299                        });
300                        Ok(Default::default())
301                    }
302                }
303            }
304            Some(_) => Err("Only equality condition is allowed".to_string()),
305            None => Err("Key condition must be specified".to_string()),
306        }
307    }
308
309    fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
310        match fields.len() {
311            0 => Err("Key field is required".to_string()),
312            1 => Ok(IndexHandle(0)),
313            _ => Err("Only one field is allowed".to_string()),
314        }
315    }
316
317    /// Returns a list of the field names that are in each index
318    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
319        Vec::new()
320    }
321
322    /// Doesn't need reload, data is written directly
323    fn needs_reload(&self) -> bool {
324        false
325    }
326}
327
328impl std::fmt::Debug for Memory {
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        write!(f, "Memory {} row(s)", self.get_read_handle().len())
331    }
332}
333
334#[async_trait]
335impl StreamSink<Event> for Memory {
336    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
337        let events_sent = register!(EventsSent::from(Output(None)));
338        let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
339        let mut flush_interval = IntervalStream::new(interval(
340            self.config
341                .flush_interval
342                .map(Duration::from_secs)
343                .unwrap_or(Duration::MAX),
344        ));
345        let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
346            self.config.scan_interval.into(),
347        )));
348
349        loop {
350            tokio::select! {
351                event = input.next() => {
352                    let mut event = if let Some(event) = event {
353                        event
354                    } else {
355                        break;
356                    };
357                    let event_byte_size = event.estimated_json_encoded_size_of();
358
359                    let finalizers = event.take_finalizers();
360
361                    // Panic: This sink only accepts Logs, so this should never panic
362                    let log = event.into_log();
363
364                    if let (Value::Object(map), _) = log.into_parts() {
365                        self.handle_value(map)
366                    };
367
368                    finalizers.update_status(EventStatus::Delivered);
369                    events_sent.emit(CountByteSize(1, event_byte_size));
370                    bytes_sent.emit(ByteSize(event_byte_size.get()));
371                }
372
373                Some(_) = flush_interval.next() => {
374                    let writer = self.write_handle.lock().expect("mutex poisoned");
375                    self.flush(writer);
376                }
377
378                Some(_) = scan_interval.next() => {
379                    let writer = self.write_handle.lock().expect("mutex poisoned");
380                    self.scan(writer);
381                }
382            }
383        }
384        Ok(())
385    }
386}
387
388#[cfg(test)]
389mod tests {
390    use std::{num::NonZeroU64, slice::from_ref, time::Duration};
391
392    use futures::{StreamExt, future::ready};
393    use futures_util::stream;
394    use tokio::time;
395
396    use vector_lib::{
397        event::{EventContainer, MetricValue},
398        lookup::lookup_v2::OptionalValuePath,
399        metrics::Controller,
400        sink::VectorSink,
401    };
402
403    use super::*;
404    use crate::{
405        enrichment_tables::memory::{
406            internal_events::InternalMetricsConfig, source::MemorySourceConfig,
407        },
408        event::{Event, LogEvent},
409        test_util::components::{
410            SINK_TAGS, SOURCE_TAGS, run_and_assert_sink_compliance,
411            run_and_assert_source_compliance,
412        },
413    };
414
415    fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
416        let mut config = MemoryConfig::default();
417        modfn(&mut config);
418        config
419    }
420
421    #[test]
422    fn finds_row() {
423        let memory = Memory::new(Default::default());
424        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
425
426        let condition = Condition::Equals {
427            field: "key",
428            value: Value::from("test_key"),
429        };
430
431        assert_eq!(
432            Ok(ObjectMap::from([
433                ("key".into(), Value::from("test_key")),
434                ("ttl".into(), Value::from(memory.config.ttl)),
435                ("value".into(), Value::from(5)),
436            ])),
437            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
438        );
439    }
440
441    #[test]
442    fn calculates_ttl() {
443        let ttl = 100;
444        let secs_to_subtract = 10;
445        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
446        {
447            let mut handle = memory.write_handle.lock().unwrap();
448            handle.write_handle.update(
449                "test_key".to_string(),
450                MemoryEntry {
451                    value: "5".to_string(),
452                    update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
453                    ttl,
454                },
455            );
456            handle.write_handle.refresh();
457        }
458
459        let condition = Condition::Equals {
460            field: "key",
461            value: Value::from("test_key"),
462        };
463
464        assert_eq!(
465            Ok(ObjectMap::from([
466                ("key".into(), Value::from("test_key")),
467                ("ttl".into(), Value::from(ttl - secs_to_subtract)),
468                ("value".into(), Value::from(5)),
469            ])),
470            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
471        );
472    }
473
474    #[test]
475    fn calculates_ttl_override() {
476        let global_ttl = 100;
477        let ttl_override = 10;
478        let memory = Memory::new(build_memory_config(|c| {
479            c.ttl = global_ttl;
480            c.ttl_field = OptionalValuePath::new("ttl");
481        }));
482        memory.handle_value(ObjectMap::from([
483            (
484                "ttl_override".into(),
485                Value::from(ObjectMap::from([
486                    ("val".into(), Value::from(5)),
487                    ("ttl".into(), Value::from(ttl_override)),
488                ])),
489            ),
490            (
491                "default_ttl".into(),
492                Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
493            ),
494        ]));
495
496        let default_condition = Condition::Equals {
497            field: "key",
498            value: Value::from("default_ttl"),
499        };
500        let override_condition = Condition::Equals {
501            field: "key",
502            value: Value::from("ttl_override"),
503        };
504
505        assert_eq!(
506            Ok(ObjectMap::from([
507                ("key".into(), Value::from("default_ttl")),
508                ("ttl".into(), Value::from(global_ttl)),
509                (
510                    "value".into(),
511                    Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
512                ),
513            ])),
514            memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
515        );
516        assert_eq!(
517            Ok(ObjectMap::from([
518                ("key".into(), Value::from("ttl_override")),
519                ("ttl".into(), Value::from(ttl_override)),
520                (
521                    "value".into(),
522                    Value::from(ObjectMap::from([
523                        ("val".into(), Value::from(5)),
524                        ("ttl".into(), Value::from(ttl_override))
525                    ]))
526                ),
527            ])),
528            memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
529        );
530    }
531
532    #[test]
533    fn removes_expired_records_on_scan_interval() {
534        let ttl = 100;
535        let memory = Memory::new(build_memory_config(|c| {
536            c.ttl = ttl;
537        }));
538        {
539            let mut handle = memory.write_handle.lock().unwrap();
540            handle.write_handle.update(
541                "test_key".to_string(),
542                MemoryEntry {
543                    value: "5".to_string(),
544                    update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
545                    ttl,
546                },
547            );
548            handle.write_handle.refresh();
549        }
550
551        // Finds the value before scan
552        let condition = Condition::Equals {
553            field: "key",
554            value: Value::from("test_key"),
555        };
556        assert_eq!(
557            Ok(ObjectMap::from([
558                ("key".into(), Value::from("test_key")),
559                ("ttl".into(), Value::from(0)),
560                ("value".into(), Value::from(5)),
561            ])),
562            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
563        );
564
565        // Force scan
566        let writer = memory.write_handle.lock().unwrap();
567        memory.scan(writer);
568
569        // The value is not present anymore
570        assert!(
571            memory
572                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
573                .unwrap()
574                .pop()
575                .is_none()
576        );
577    }
578
579    #[test]
580    fn does_not_show_values_before_flush_interval() {
581        let ttl = 100;
582        let memory = Memory::new(build_memory_config(|c| {
583            c.ttl = ttl;
584            c.flush_interval = Some(10);
585        }));
586        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
587
588        let condition = Condition::Equals {
589            field: "key",
590            value: Value::from("test_key"),
591        };
592
593        assert!(
594            memory
595                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
596                .unwrap()
597                .pop()
598                .is_none()
599        );
600    }
601
602    #[test]
603    fn updates_ttl_on_value_replacement() {
604        let ttl = 100;
605        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
606        {
607            let mut handle = memory.write_handle.lock().unwrap();
608            handle.write_handle.update(
609                "test_key".to_string(),
610                MemoryEntry {
611                    value: "5".to_string(),
612                    update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
613                    ttl,
614                },
615            );
616            handle.write_handle.refresh();
617        }
618        let condition = Condition::Equals {
619            field: "key",
620            value: Value::from("test_key"),
621        };
622
623        assert_eq!(
624            Ok(ObjectMap::from([
625                ("key".into(), Value::from("test_key")),
626                ("ttl".into(), Value::from(ttl / 2)),
627                ("value".into(), Value::from(5)),
628            ])),
629            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
630        );
631
632        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
633
634        assert_eq!(
635            Ok(ObjectMap::from([
636                ("key".into(), Value::from("test_key")),
637                ("ttl".into(), Value::from(ttl)),
638                ("value".into(), Value::from(5)),
639            ])),
640            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
641        );
642    }
643
644    #[test]
645    fn ignores_all_values_over_byte_size_limit() {
646        let memory = Memory::new(build_memory_config(|c| {
647            c.max_byte_size = Some(1);
648        }));
649        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
650
651        let condition = Condition::Equals {
652            field: "key",
653            value: Value::from("test_key"),
654        };
655
656        assert!(
657            memory
658                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
659                .unwrap()
660                .pop()
661                .is_none()
662        );
663    }
664
665    #[test]
666    fn ignores_values_when_byte_size_limit_is_reached() {
667        let ttl = 100;
668        let memory = Memory::new(build_memory_config(|c| {
669            c.ttl = ttl;
670            c.max_byte_size = Some(150);
671        }));
672        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
673        memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))]));
674
675        assert_eq!(
676            Ok(ObjectMap::from([
677                ("key".into(), Value::from("test_key")),
678                ("ttl".into(), Value::from(ttl)),
679                ("value".into(), Value::from(5)),
680            ])),
681            memory.find_table_row(
682                Case::Sensitive,
683                &[Condition::Equals {
684                    field: "key",
685                    value: Value::from("test_key")
686                }],
687                None,
688                None,
689                None
690            )
691        );
692
693        assert!(
694            memory
695                .find_table_rows(
696                    Case::Sensitive,
697                    &[Condition::Equals {
698                        field: "key",
699                        value: Value::from("rejected_key")
700                    }],
701                    None,
702                    None,
703                    None
704                )
705                .unwrap()
706                .pop()
707                .is_none()
708        );
709    }
710
711    #[test]
712    fn missing_key() {
713        let memory = Memory::new(Default::default());
714
715        let condition = Condition::Equals {
716            field: "key",
717            value: Value::from("test_key"),
718        };
719
720        assert!(
721            memory
722                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
723                .unwrap()
724                .pop()
725                .is_none()
726        );
727    }
728
729    #[tokio::test]
730    async fn sink_spec_compliance() {
731        let event = Event::Log(LogEvent::from(ObjectMap::from([(
732            "test_key".into(),
733            Value::from(5),
734        )])));
735
736        let memory = Memory::new(Default::default());
737
738        run_and_assert_sink_compliance(
739            VectorSink::from_event_streamsink(memory),
740            stream::once(ready(event)),
741            &SINK_TAGS,
742        )
743        .await;
744    }
745
746    #[tokio::test]
747    async fn flush_metrics_without_interval() {
748        let event = Event::Log(LogEvent::from(ObjectMap::from([(
749            "test_key".into(),
750            Value::from(5),
751        )])));
752
753        let memory = Memory::new(Default::default());
754
755        run_and_assert_sink_compliance(
756            VectorSink::from_event_streamsink(memory),
757            stream::once(ready(event)),
758            &SINK_TAGS,
759        )
760        .await;
761
762        let metrics = Controller::get().unwrap().capture_metrics();
763        let insertions_counter = metrics
764            .iter()
765            .find(|m| {
766                matches!(m.value(), MetricValue::Counter { .. })
767                    && m.name() == "memory_enrichment_table_insertions_total"
768            })
769            .expect("Insertions metric is missing!");
770        let MetricValue::Counter {
771            value: insertions_count,
772        } = insertions_counter.value()
773        else {
774            unreachable!();
775        };
776        let flushes_counter = metrics
777            .iter()
778            .find(|m| {
779                matches!(m.value(), MetricValue::Counter { .. })
780                    && m.name() == "memory_enrichment_table_flushes_total"
781            })
782            .expect("Flushes metric is missing!");
783        let MetricValue::Counter {
784            value: flushes_count,
785        } = flushes_counter.value()
786        else {
787            unreachable!();
788        };
789        let object_count_gauge = metrics
790            .iter()
791            .find(|m| {
792                matches!(m.value(), MetricValue::Gauge { .. })
793                    && m.name() == "memory_enrichment_table_objects_count"
794            })
795            .expect("Object count metric is missing!");
796        let MetricValue::Gauge {
797            value: object_count,
798        } = object_count_gauge.value()
799        else {
800            unreachable!();
801        };
802        let byte_size_gauge = metrics
803            .iter()
804            .find(|m| {
805                matches!(m.value(), MetricValue::Gauge { .. })
806                    && m.name() == "memory_enrichment_table_byte_size"
807            })
808            .expect("Byte size metric is missing!");
809        assert_eq!(*insertions_count, 1.0);
810        assert_eq!(*flushes_count, 1.0);
811        assert_eq!(*object_count, 1.0);
812        assert!(!byte_size_gauge.is_empty());
813    }
814
815    #[tokio::test]
816    async fn flush_metrics_with_interval() {
817        let event = Event::Log(LogEvent::from(ObjectMap::from([(
818            "test_key".into(),
819            Value::from(5),
820        )])));
821
822        let memory = Memory::new(build_memory_config(|c| {
823            c.flush_interval = Some(1);
824        }));
825
826        run_and_assert_sink_compliance(
827            VectorSink::from_event_streamsink(memory),
828            stream::iter(vec![event.clone(), event]).flat_map(|e| {
829                stream::once(async move {
830                    tokio::time::sleep(Duration::from_millis(600)).await;
831                    e
832                })
833            }),
834            &SINK_TAGS,
835        )
836        .await;
837
838        let metrics = Controller::get().unwrap().capture_metrics();
839        let insertions_counter = metrics
840            .iter()
841            .find(|m| {
842                matches!(m.value(), MetricValue::Counter { .. })
843                    && m.name() == "memory_enrichment_table_insertions_total"
844            })
845            .expect("Insertions metric is missing!");
846        let MetricValue::Counter {
847            value: insertions_count,
848        } = insertions_counter.value()
849        else {
850            unreachable!();
851        };
852        let flushes_counter = metrics
853            .iter()
854            .find(|m| {
855                matches!(m.value(), MetricValue::Counter { .. })
856                    && m.name() == "memory_enrichment_table_flushes_total"
857            })
858            .expect("Flushes metric is missing!");
859        let MetricValue::Counter {
860            value: flushes_count,
861        } = flushes_counter.value()
862        else {
863            unreachable!();
864        };
865        let object_count_gauge = metrics
866            .iter()
867            .find(|m| {
868                matches!(m.value(), MetricValue::Gauge { .. })
869                    && m.name() == "memory_enrichment_table_objects_count"
870            })
871            .expect("Object count metric is missing!");
872        let MetricValue::Gauge {
873            value: object_count,
874        } = object_count_gauge.value()
875        else {
876            unreachable!();
877        };
878        let byte_size_gauge = metrics
879            .iter()
880            .find(|m| {
881                matches!(m.value(), MetricValue::Gauge { .. })
882                    && m.name() == "memory_enrichment_table_byte_size"
883            })
884            .expect("Byte size metric is missing!");
885
886        assert_eq!(*insertions_count, 2.0);
887        // One is done right away and the next one after the interval
888        assert_eq!(*flushes_count, 2.0);
889        assert_eq!(*object_count, 1.0);
890        assert!(!byte_size_gauge.is_empty());
891    }
892
893    #[tokio::test]
894    async fn flush_metrics_with_key() {
895        let event = Event::Log(LogEvent::from(ObjectMap::from([(
896            "test_key".into(),
897            Value::from(5),
898        )])));
899
900        let memory = Memory::new(build_memory_config(|c| {
901            c.internal_metrics = InternalMetricsConfig {
902                include_key_tag: true,
903            };
904        }));
905
906        run_and_assert_sink_compliance(
907            VectorSink::from_event_streamsink(memory),
908            stream::once(ready(event)),
909            &SINK_TAGS,
910        )
911        .await;
912
913        let metrics = Controller::get().unwrap().capture_metrics();
914        let insertions_counter = metrics
915            .iter()
916            .find(|m| {
917                matches!(m.value(), MetricValue::Counter { .. })
918                    && m.name() == "memory_enrichment_table_insertions_total"
919            })
920            .expect("Insertions metric is missing!");
921
922        assert!(insertions_counter.tag_matches("key", "test_key"));
923    }
924
925    #[tokio::test]
926    async fn flush_metrics_without_key() {
927        let event = Event::Log(LogEvent::from(ObjectMap::from([(
928            "test_key".into(),
929            Value::from(5),
930        )])));
931
932        let memory = Memory::new(Default::default());
933
934        run_and_assert_sink_compliance(
935            VectorSink::from_event_streamsink(memory),
936            stream::once(ready(event)),
937            &SINK_TAGS,
938        )
939        .await;
940
941        let metrics = Controller::get().unwrap().capture_metrics();
942        let insertions_counter = metrics
943            .iter()
944            .find(|m| {
945                matches!(m.value(), MetricValue::Counter { .. })
946                    && m.name() == "memory_enrichment_table_insertions_total"
947            })
948            .expect("Insertions metric is missing!");
949
950        assert!(insertions_counter.tag_value("key").is_none());
951    }
952
953    #[tokio::test]
954    async fn source_spec_compliance() {
955        let mut memory_config = MemoryConfig::default();
956        memory_config.source_config = Some(MemorySourceConfig {
957            export_interval: NonZeroU64::try_from(1).unwrap(),
958            export_batch_size: None,
959            remove_after_export: false,
960            source_key: "test".to_string(),
961        });
962        let memory = memory_config.get_or_build_memory().await;
963        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
964
965        let mut events: Vec<Event> = run_and_assert_source_compliance(
966            memory_config,
967            time::Duration::from_secs(5),
968            &SOURCE_TAGS,
969        )
970        .await;
971
972        assert!(!events.is_empty());
973        let event = events.remove(0);
974        let log = event.as_log();
975
976        assert!(!log.value().is_empty());
977    }
978}