vector/enrichment_tables/memory/
table.rs

1#![allow(unsafe_op_in_unsafe_fn)] // TODO review ShallowCopy usage code and fix properly.
2
3use std::{
4    sync::{Arc, Mutex, MutexGuard},
5    time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use evmap::{
11    shallow_copy::CopyValue,
12    {self},
13};
14use evmap_derive::ShallowCopy;
15use futures::{StreamExt, stream::BoxStream};
16use thread_local::ThreadLocal;
17use tokio::{
18    sync::broadcast::{Receiver, Sender},
19    time::interval,
20};
21use tokio_stream::wrappers::IntervalStream;
22use vector_lib::{
23    ByteSizeOf, EstimatedJsonEncodedSizeOf,
24    config::LogNamespace,
25    enrichment::{Case, Condition, Error, IndexHandle, InternalError, Table},
26    event::{Event, EventStatus, Finalizable},
27    internal_event::{
28        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
29    },
30    shutdown::ShutdownSignal,
31    sink::StreamSink,
32};
33use vrl::value::{KeyString, ObjectMap, Value};
34
35use super::source::MemorySource;
36use crate::{
37    SourceSender,
38    enrichment_tables::memory::{
39        MemoryConfig,
40        internal_events::{
41            MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed,
42            MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
43            MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
44        },
45    },
46};
47
48/// Single memory entry containing the value and TTL
49#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
50pub struct MemoryEntry {
51    value: String,
52    update_time: CopyValue<Instant>,
53    ttl: u64,
54}
55
56impl ByteSizeOf for MemoryEntry {
57    fn allocated_bytes(&self) -> usize {
58        self.value.size_of()
59    }
60}
61
62impl MemoryEntry {
63    pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, Error> {
64        let ttl = self
65            .ttl
66            .saturating_sub(now.duration_since(*self.update_time).as_secs());
67        Ok(ObjectMap::from([
68            (
69                KeyString::from("key"),
70                Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
71            ),
72            (
73                KeyString::from("value"),
74                // Unreachable in normal operation: `value` was serialized by `handle_value`.
75                serde_json::from_str::<Value>(&self.value).map_err(|source| Error::Internal {
76                    source: InternalError::FailedToDecode {
77                        details: source.to_string(),
78                    },
79                })?,
80            ),
81            (
82                KeyString::from("ttl"),
83                Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
84            ),
85        ]))
86    }
87
88    fn expired(&self, now: Instant) -> bool {
89        now.duration_since(*self.update_time).as_secs() > self.ttl
90    }
91}
92
93#[derive(Default)]
94struct MemoryMetadata {
95    byte_size: u64,
96}
97
98/// [`MemoryEntry`] combined with its key
99#[derive(Clone)]
100pub(super) struct MemoryEntryPair {
101    /// Key of this entry
102    pub(super) key: String,
103    /// The value of this entry
104    pub(super) entry: MemoryEntry,
105}
106
107// Used to ensure that these 2 are locked together
108pub(super) struct MemoryWriter {
109    pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
110    metadata: MemoryMetadata,
111}
112
113/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure.
114pub struct Memory {
115    read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
116    read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
117    pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
118    pub(super) config: MemoryConfig,
119    #[allow(dead_code)]
120    expired_items_receiver: Receiver<Vec<MemoryEntryPair>>,
121    expired_items_sender: Sender<Vec<MemoryEntryPair>>,
122}
123
124impl Memory {
125    /// Creates a new [Memory] based on the provided config.
126    pub fn new(config: MemoryConfig) -> Self {
127        let (read_handle, write_handle) = evmap::new();
128        // Buffer could only be used if source is stuck exporting available items, but in that case,
129        // publishing will not happen either, because the lock would be held, so this buffer is not
130        // that important
131        let (expired_tx, expired_rx) = tokio::sync::broadcast::channel(5);
132        Self {
133            config,
134            read_handle_factory: read_handle.factory(),
135            read_handle: ThreadLocal::new(),
136            write_handle: Arc::new(Mutex::new(MemoryWriter {
137                write_handle,
138                metadata: MemoryMetadata::default(),
139            })),
140            expired_items_sender: expired_tx,
141            expired_items_receiver: expired_rx,
142        }
143    }
144
145    pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
146        self.read_handle
147            .get_or(|| self.read_handle_factory.handle())
148    }
149
150    pub(super) fn subscribe_to_expired_items(&self) -> Receiver<Vec<MemoryEntryPair>> {
151        self.expired_items_sender.subscribe()
152    }
153
154    fn handle_value(&self, value: ObjectMap) {
155        let mut writer = self.write_handle.lock().expect("mutex poisoned");
156        let now = Instant::now();
157
158        for (k, value) in value.into_iter() {
159            let new_entry_key = String::from(k);
160            let Ok(v) = serde_json::to_string(&value) else {
161                emit!(MemoryEnrichmentTableInsertFailed {
162                    key: &new_entry_key,
163                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
164                });
165                continue;
166            };
167            let new_entry = MemoryEntry {
168                value: v,
169                update_time: now.into(),
170                ttl: self
171                    .config
172                    .ttl_field
173                    .path
174                    .as_ref()
175                    .and_then(|p| value.get(p))
176                    .and_then(|v| v.as_integer())
177                    .map(|v| v as u64)
178                    .unwrap_or(self.config.ttl),
179            };
180            let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
181            if let Some(max_byte_size) = self.config.max_byte_size
182                && writer
183                    .metadata
184                    .byte_size
185                    .saturating_add(new_entry_size as u64)
186                    > max_byte_size
187            {
188                // Reject new entries
189                emit!(MemoryEnrichmentTableInsertFailed {
190                    key: &new_entry_key,
191                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
192                });
193                continue;
194            }
195            writer.metadata.byte_size = writer
196                .metadata
197                .byte_size
198                .saturating_add(new_entry_size as u64);
199            emit!(MemoryEnrichmentTableInserted {
200                key: &new_entry_key,
201                include_key_metric_tag: self.config.internal_metrics.include_key_tag
202            });
203            writer.write_handle.update(new_entry_key, new_entry);
204        }
205
206        if self.config.flush_interval.is_none() {
207            self.flush(writer);
208        }
209    }
210
211    fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
212        let now = Instant::now();
213
214        let mut needs_flush = false;
215        // Since evmap holds 2 separate maps for the data, we are free to directly remove
216        // elements via the writer, while we are iterating the reader
217        // Refresh will happen only after we manually invoke it after iteration
218        if let Some(reader) = self.get_read_handle().read() {
219            for (k, v) in reader.iter() {
220                if let Some(entry) = v.get_one()
221                    && entry.expired(now)
222                {
223                    // Byte size is not reduced at this point, because the actual deletion
224                    // will only happen at refresh time
225                    writer.write_handle.empty(k.clone());
226                    emit!(MemoryEnrichmentTableTtlExpired {
227                        key: k,
228                        include_key_metric_tag: self.config.internal_metrics.include_key_tag
229                    });
230                    needs_flush = true;
231                }
232            }
233        };
234
235        needs_flush
236    }
237
238    fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
239        let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
240        if needs_flush {
241            self.flush(writer);
242        }
243    }
244
245    fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
246        // First publish items to be removed, if needed
247        if self
248            .config
249            .source_config
250            .as_ref()
251            .map(|c| c.export_expired_items)
252            .unwrap_or_default()
253        {
254            let pending_removal = writer
255                .write_handle
256                .pending()
257                .iter()
258                // We only use empty operation to remove keys
259                .filter_map(|o| match o {
260                    evmap::Operation::Empty(k) => Some(k),
261                    _ => None,
262                })
263                .filter_map(|key| {
264                    writer.write_handle.get_one(key).map(|v| MemoryEntryPair {
265                        key: key.to_string(),
266                        entry: v.clone(),
267                    })
268                })
269                .collect::<Vec<_>>();
270            if let Err(error) = self.expired_items_sender.send(pending_removal) {
271                error!(
272                    message = "Error exporting expired items from memory enrichment table.",
273                    error = %error,
274                );
275            }
276        }
277
278        writer.write_handle.refresh();
279        if let Some(reader) = self.get_read_handle().read() {
280            let mut byte_size = 0;
281            for (k, v) in reader.iter() {
282                byte_size += k.size_of() + v.get_one().size_of();
283            }
284            writer.metadata.byte_size = byte_size as u64;
285            emit!(MemoryEnrichmentTableFlushed {
286                new_objects_count: reader.len(),
287                new_byte_size: byte_size
288            });
289        }
290    }
291
292    pub(crate) fn as_source(
293        &self,
294        shutdown: ShutdownSignal,
295        out: SourceSender,
296        log_namespace: LogNamespace,
297    ) -> MemorySource {
298        MemorySource {
299            memory: self.clone(),
300            shutdown,
301            out,
302            log_namespace,
303        }
304    }
305}
306
307impl Clone for Memory {
308    fn clone(&self) -> Self {
309        Self {
310            read_handle_factory: self.read_handle_factory.clone(),
311            read_handle: ThreadLocal::new(),
312            write_handle: Arc::clone(&self.write_handle),
313            config: self.config.clone(),
314            expired_items_sender: self.expired_items_sender.clone(),
315            expired_items_receiver: self.expired_items_sender.subscribe(),
316        }
317    }
318}
319
320impl Table for Memory {
321    fn find_table_row<'a>(
322        &self,
323        case: Case,
324        condition: &'a [Condition<'a>],
325        select: Option<&'a [String]>,
326        wildcard: Option<&Value>,
327        index: Option<IndexHandle>,
328    ) -> Result<ObjectMap, Error> {
329        let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
330
331        match rows.pop() {
332            Some(row) if rows.is_empty() => Ok(row),
333            Some(_) => Err(Error::MoreThanOneRowFound),
334            None => Err(Error::NoRowsFound),
335        }
336    }
337
338    fn find_table_rows<'a>(
339        &self,
340        _case: Case,
341        condition: &'a [Condition<'a>],
342        _select: Option<&'a [String]>,
343        _wildcard: Option<&Value>,
344        _index: Option<IndexHandle>,
345    ) -> Result<Vec<ObjectMap>, Error> {
346        match condition.first() {
347            Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed),
348            Some(Condition::Equals { value, .. }) => {
349                let key = value.to_string_lossy();
350                match self.get_read_handle().get_one(key.as_ref()) {
351                    Some(row) => {
352                        emit!(MemoryEnrichmentTableRead {
353                            key: &key,
354                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
355                        });
356                        row.as_object_map(Instant::now(), &key).map(|r| vec![r])
357                    }
358                    None => {
359                        emit!(MemoryEnrichmentTableReadFailed {
360                            key: &key,
361                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
362                        });
363                        Ok(Default::default())
364                    }
365                }
366            }
367            Some(_) => Err(Error::OnlyEqualityConditionAllowed),
368            None => Err(Error::MissingCondition { kind: "Key" }),
369        }
370    }
371
372    fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
373        match fields.len() {
374            0 => Err(Error::MissingRequiredField { field: "Key" }),
375            1 => Ok(IndexHandle(0)),
376            _ => Err(Error::OnlyOneFieldAllowed),
377        }
378    }
379
380    /// Returns a list of the field names that are in each index
381    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
382        Vec::new()
383    }
384
385    /// Doesn't need reload, data is written directly
386    fn needs_reload(&self) -> bool {
387        false
388    }
389}
390
391impl std::fmt::Debug for Memory {
392    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393        write!(f, "Memory {} row(s)", self.get_read_handle().len())
394    }
395}
396
397#[async_trait]
398impl StreamSink<Event> for Memory {
399    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
400        let events_sent = register!(EventsSent::from(Output(None)));
401        let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
402        let mut flush_interval = IntervalStream::new(interval(
403            self.config
404                .flush_interval
405                .map(Duration::from_secs)
406                .unwrap_or(Duration::MAX),
407        ));
408        let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
409            self.config.scan_interval.into(),
410        )));
411
412        loop {
413            tokio::select! {
414                event = input.next() => {
415                    let mut event = if let Some(event) = event {
416                        event
417                    } else {
418                        break;
419                    };
420                    let event_byte_size = event.estimated_json_encoded_size_of();
421
422                    let finalizers = event.take_finalizers();
423
424                    // Panic: This sink only accepts Logs, so this should never panic
425                    let log = event.into_log();
426
427                    if let (Value::Object(map), _) = log.into_parts() {
428                        self.handle_value(map)
429                    };
430
431                    finalizers.update_status(EventStatus::Delivered);
432                    events_sent.emit(CountByteSize(1, event_byte_size));
433                    bytes_sent.emit(ByteSize(event_byte_size.get()));
434                }
435
436                Some(_) = flush_interval.next() => {
437                    let writer = self.write_handle.lock().expect("mutex poisoned");
438                    self.flush(writer);
439                }
440
441                Some(_) = scan_interval.next() => {
442                    let writer = self.write_handle.lock().expect("mutex poisoned");
443                    self.scan(writer);
444                }
445            }
446        }
447        Ok(())
448    }
449}
450
451#[cfg(test)]
452mod tests {
453    use std::{num::NonZeroU64, slice::from_ref, time::Duration};
454
455    use futures::{StreamExt, future::ready};
456    use futures_util::stream;
457    use tokio::time;
458
459    use vector_lib::{
460        event::{EventContainer, MetricValue},
461        lookup::lookup_v2::OptionalValuePath,
462        metrics::Controller,
463        sink::VectorSink,
464    };
465
466    use super::*;
467    use crate::{
468        enrichment_tables::memory::{
469            config::MemorySourceConfig, internal_events::InternalMetricsConfig,
470        },
471        event::{Event, LogEvent},
472        test_util::components::{
473            SINK_TAGS, SOURCE_TAGS, run_and_assert_sink_compliance,
474            run_and_assert_source_compliance,
475        },
476    };
477
478    fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
479        let mut config = MemoryConfig::default();
480        modfn(&mut config);
481        config
482    }
483
484    #[test]
485    fn finds_row() {
486        let memory = Memory::new(Default::default());
487        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
488
489        let condition = Condition::Equals {
490            field: "key",
491            value: Value::from("test_key"),
492        };
493
494        assert_eq!(
495            Ok(ObjectMap::from([
496                ("key".into(), Value::from("test_key")),
497                ("ttl".into(), Value::from(memory.config.ttl)),
498                ("value".into(), Value::from(5)),
499            ])),
500            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
501        );
502    }
503
504    #[test]
505    fn calculates_ttl() {
506        let ttl = 100;
507        let secs_to_subtract = 10;
508        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
509        {
510            let mut handle = memory.write_handle.lock().unwrap();
511            handle.write_handle.update(
512                "test_key".to_string(),
513                MemoryEntry {
514                    value: "5".to_string(),
515                    update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
516                    ttl,
517                },
518            );
519            handle.write_handle.refresh();
520        }
521
522        let condition = Condition::Equals {
523            field: "key",
524            value: Value::from("test_key"),
525        };
526
527        assert_eq!(
528            Ok(ObjectMap::from([
529                ("key".into(), Value::from("test_key")),
530                ("ttl".into(), Value::from(ttl - secs_to_subtract)),
531                ("value".into(), Value::from(5)),
532            ])),
533            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
534        );
535    }
536
537    #[test]
538    fn calculates_ttl_override() {
539        let global_ttl = 100;
540        let ttl_override = 10;
541        let memory = Memory::new(build_memory_config(|c| {
542            c.ttl = global_ttl;
543            c.ttl_field = OptionalValuePath::new("ttl");
544        }));
545        memory.handle_value(ObjectMap::from([
546            (
547                "ttl_override".into(),
548                Value::from(ObjectMap::from([
549                    ("val".into(), Value::from(5)),
550                    ("ttl".into(), Value::from(ttl_override)),
551                ])),
552            ),
553            (
554                "default_ttl".into(),
555                Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
556            ),
557        ]));
558
559        let default_condition = Condition::Equals {
560            field: "key",
561            value: Value::from("default_ttl"),
562        };
563        let override_condition = Condition::Equals {
564            field: "key",
565            value: Value::from("ttl_override"),
566        };
567
568        assert_eq!(
569            Ok(ObjectMap::from([
570                ("key".into(), Value::from("default_ttl")),
571                ("ttl".into(), Value::from(global_ttl)),
572                (
573                    "value".into(),
574                    Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
575                ),
576            ])),
577            memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
578        );
579        assert_eq!(
580            Ok(ObjectMap::from([
581                ("key".into(), Value::from("ttl_override")),
582                ("ttl".into(), Value::from(ttl_override)),
583                (
584                    "value".into(),
585                    Value::from(ObjectMap::from([
586                        ("val".into(), Value::from(5)),
587                        ("ttl".into(), Value::from(ttl_override))
588                    ]))
589                ),
590            ])),
591            memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
592        );
593    }
594
595    #[test]
596    fn removes_expired_records_on_scan_interval() {
597        let ttl = 100;
598        let memory = Memory::new(build_memory_config(|c| {
599            c.ttl = ttl;
600        }));
601        {
602            let mut handle = memory.write_handle.lock().unwrap();
603            handle.write_handle.update(
604                "test_key".to_string(),
605                MemoryEntry {
606                    value: "5".to_string(),
607                    update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
608                    ttl,
609                },
610            );
611            handle.write_handle.refresh();
612        }
613
614        // Finds the value before scan
615        let condition = Condition::Equals {
616            field: "key",
617            value: Value::from("test_key"),
618        };
619        assert_eq!(
620            Ok(ObjectMap::from([
621                ("key".into(), Value::from("test_key")),
622                ("ttl".into(), Value::from(0)),
623                ("value".into(), Value::from(5)),
624            ])),
625            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
626        );
627
628        // Force scan
629        let writer = memory.write_handle.lock().unwrap();
630        memory.scan(writer);
631
632        // The value is not present anymore
633        assert!(
634            memory
635                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
636                .unwrap()
637                .pop()
638                .is_none()
639        );
640    }
641
642    #[test]
643    fn does_not_show_values_before_flush_interval() {
644        let ttl = 100;
645        let memory = Memory::new(build_memory_config(|c| {
646            c.ttl = ttl;
647            c.flush_interval = Some(10);
648        }));
649        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
650
651        let condition = Condition::Equals {
652            field: "key",
653            value: Value::from("test_key"),
654        };
655
656        assert!(
657            memory
658                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
659                .unwrap()
660                .pop()
661                .is_none()
662        );
663    }
664
665    #[test]
666    fn updates_ttl_on_value_replacement() {
667        let ttl = 100;
668        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
669        {
670            let mut handle = memory.write_handle.lock().unwrap();
671            handle.write_handle.update(
672                "test_key".to_string(),
673                MemoryEntry {
674                    value: "5".to_string(),
675                    update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
676                    ttl,
677                },
678            );
679            handle.write_handle.refresh();
680        }
681        let condition = Condition::Equals {
682            field: "key",
683            value: Value::from("test_key"),
684        };
685
686        assert_eq!(
687            Ok(ObjectMap::from([
688                ("key".into(), Value::from("test_key")),
689                ("ttl".into(), Value::from(ttl / 2)),
690                ("value".into(), Value::from(5)),
691            ])),
692            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
693        );
694
695        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
696
697        assert_eq!(
698            Ok(ObjectMap::from([
699                ("key".into(), Value::from("test_key")),
700                ("ttl".into(), Value::from(ttl)),
701                ("value".into(), Value::from(5)),
702            ])),
703            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
704        );
705    }
706
707    #[test]
708    fn ignores_all_values_over_byte_size_limit() {
709        let memory = Memory::new(build_memory_config(|c| {
710            c.max_byte_size = Some(1);
711        }));
712        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
713
714        let condition = Condition::Equals {
715            field: "key",
716            value: Value::from("test_key"),
717        };
718
719        assert!(
720            memory
721                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
722                .unwrap()
723                .pop()
724                .is_none()
725        );
726    }
727
728    #[test]
729    fn ignores_values_when_byte_size_limit_is_reached() {
730        let ttl = 100;
731        let memory = Memory::new(build_memory_config(|c| {
732            c.ttl = ttl;
733            c.max_byte_size = Some(150);
734        }));
735        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
736        memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))]));
737
738        assert_eq!(
739            Ok(ObjectMap::from([
740                ("key".into(), Value::from("test_key")),
741                ("ttl".into(), Value::from(ttl)),
742                ("value".into(), Value::from(5)),
743            ])),
744            memory.find_table_row(
745                Case::Sensitive,
746                &[Condition::Equals {
747                    field: "key",
748                    value: Value::from("test_key")
749                }],
750                None,
751                None,
752                None
753            )
754        );
755
756        assert!(
757            memory
758                .find_table_rows(
759                    Case::Sensitive,
760                    &[Condition::Equals {
761                        field: "key",
762                        value: Value::from("rejected_key")
763                    }],
764                    None,
765                    None,
766                    None
767                )
768                .unwrap()
769                .pop()
770                .is_none()
771        );
772    }
773
774    #[test]
775    fn missing_key() {
776        let memory = Memory::new(Default::default());
777
778        let condition = Condition::Equals {
779            field: "key",
780            value: Value::from("test_key"),
781        };
782
783        assert!(
784            memory
785                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
786                .unwrap()
787                .pop()
788                .is_none()
789        );
790    }
791
792    #[tokio::test]
793    async fn sink_spec_compliance() {
794        let event = Event::Log(LogEvent::from(ObjectMap::from([(
795            "test_key".into(),
796            Value::from(5),
797        )])));
798
799        let memory = Memory::new(Default::default());
800
801        run_and_assert_sink_compliance(
802            VectorSink::from_event_streamsink(memory),
803            stream::once(ready(event)),
804            &SINK_TAGS,
805        )
806        .await;
807    }
808
809    #[tokio::test]
810    async fn flush_metrics_without_interval() {
811        let event = Event::Log(LogEvent::from(ObjectMap::from([(
812            "test_key".into(),
813            Value::from(5),
814        )])));
815
816        let memory = Memory::new(Default::default());
817
818        run_and_assert_sink_compliance(
819            VectorSink::from_event_streamsink(memory),
820            stream::once(ready(event)),
821            &SINK_TAGS,
822        )
823        .await;
824
825        let metrics = Controller::get().unwrap().capture_metrics();
826        let insertions_counter = metrics
827            .iter()
828            .find(|m| {
829                matches!(m.value(), MetricValue::Counter { .. })
830                    && m.name() == "memory_enrichment_table_insertions_total"
831            })
832            .expect("Insertions metric is missing!");
833        let MetricValue::Counter {
834            value: insertions_count,
835        } = insertions_counter.value()
836        else {
837            unreachable!();
838        };
839        let flushes_counter = metrics
840            .iter()
841            .find(|m| {
842                matches!(m.value(), MetricValue::Counter { .. })
843                    && m.name() == "memory_enrichment_table_flushes_total"
844            })
845            .expect("Flushes metric is missing!");
846        let MetricValue::Counter {
847            value: flushes_count,
848        } = flushes_counter.value()
849        else {
850            unreachable!();
851        };
852        let object_count_gauge = metrics
853            .iter()
854            .find(|m| {
855                matches!(m.value(), MetricValue::Gauge { .. })
856                    && m.name() == "memory_enrichment_table_objects_count"
857            })
858            .expect("Object count metric is missing!");
859        let MetricValue::Gauge {
860            value: object_count,
861        } = object_count_gauge.value()
862        else {
863            unreachable!();
864        };
865        let byte_size_gauge = metrics
866            .iter()
867            .find(|m| {
868                matches!(m.value(), MetricValue::Gauge { .. })
869                    && m.name() == "memory_enrichment_table_byte_size"
870            })
871            .expect("Byte size metric is missing!");
872        assert_eq!(*insertions_count, 1.0);
873        assert_eq!(*flushes_count, 1.0);
874        assert_eq!(*object_count, 1.0);
875        assert!(!byte_size_gauge.is_empty());
876    }
877
878    #[tokio::test]
879    async fn flush_metrics_with_interval() {
880        let event = Event::Log(LogEvent::from(ObjectMap::from([(
881            "test_key".into(),
882            Value::from(5),
883        )])));
884
885        let memory = Memory::new(build_memory_config(|c| {
886            c.flush_interval = Some(1);
887        }));
888
889        run_and_assert_sink_compliance(
890            VectorSink::from_event_streamsink(memory),
891            stream::iter(vec![event.clone(), event]).flat_map(|e| {
892                stream::once(async move {
893                    tokio::time::sleep(Duration::from_millis(600)).await;
894                    e
895                })
896            }),
897            &SINK_TAGS,
898        )
899        .await;
900
901        let metrics = Controller::get().unwrap().capture_metrics();
902        let insertions_counter = metrics
903            .iter()
904            .find(|m| {
905                matches!(m.value(), MetricValue::Counter { .. })
906                    && m.name() == "memory_enrichment_table_insertions_total"
907            })
908            .expect("Insertions metric is missing!");
909        let MetricValue::Counter {
910            value: insertions_count,
911        } = insertions_counter.value()
912        else {
913            unreachable!();
914        };
915        let flushes_counter = metrics
916            .iter()
917            .find(|m| {
918                matches!(m.value(), MetricValue::Counter { .. })
919                    && m.name() == "memory_enrichment_table_flushes_total"
920            })
921            .expect("Flushes metric is missing!");
922        let MetricValue::Counter {
923            value: flushes_count,
924        } = flushes_counter.value()
925        else {
926            unreachable!();
927        };
928        let object_count_gauge = metrics
929            .iter()
930            .find(|m| {
931                matches!(m.value(), MetricValue::Gauge { .. })
932                    && m.name() == "memory_enrichment_table_objects_count"
933            })
934            .expect("Object count metric is missing!");
935        let MetricValue::Gauge {
936            value: object_count,
937        } = object_count_gauge.value()
938        else {
939            unreachable!();
940        };
941        let byte_size_gauge = metrics
942            .iter()
943            .find(|m| {
944                matches!(m.value(), MetricValue::Gauge { .. })
945                    && m.name() == "memory_enrichment_table_byte_size"
946            })
947            .expect("Byte size metric is missing!");
948
949        assert_eq!(*insertions_count, 2.0);
950        // One is done right away and the next one after the interval
951        assert_eq!(*flushes_count, 2.0);
952        assert_eq!(*object_count, 1.0);
953        assert!(!byte_size_gauge.is_empty());
954    }
955
956    #[tokio::test]
957    async fn flush_metrics_with_key() {
958        let event = Event::Log(LogEvent::from(ObjectMap::from([(
959            "test_key".into(),
960            Value::from(5),
961        )])));
962
963        let memory = Memory::new(build_memory_config(|c| {
964            c.internal_metrics = InternalMetricsConfig {
965                include_key_tag: true,
966            };
967        }));
968
969        run_and_assert_sink_compliance(
970            VectorSink::from_event_streamsink(memory),
971            stream::once(ready(event)),
972            &SINK_TAGS,
973        )
974        .await;
975
976        let metrics = Controller::get().unwrap().capture_metrics();
977        let insertions_counter = metrics
978            .iter()
979            .find(|m| {
980                matches!(m.value(), MetricValue::Counter { .. })
981                    && m.name() == "memory_enrichment_table_insertions_total"
982            })
983            .expect("Insertions metric is missing!");
984
985        assert!(insertions_counter.tag_matches("key", "test_key"));
986    }
987
988    #[tokio::test]
989    async fn flush_metrics_without_key() {
990        let event = Event::Log(LogEvent::from(ObjectMap::from([(
991            "test_key".into(),
992            Value::from(5),
993        )])));
994
995        let memory = Memory::new(Default::default());
996
997        run_and_assert_sink_compliance(
998            VectorSink::from_event_streamsink(memory),
999            stream::once(ready(event)),
1000            &SINK_TAGS,
1001        )
1002        .await;
1003
1004        let metrics = Controller::get().unwrap().capture_metrics();
1005        let insertions_counter = metrics
1006            .iter()
1007            .find(|m| {
1008                matches!(m.value(), MetricValue::Counter { .. })
1009                    && m.name() == "memory_enrichment_table_insertions_total"
1010            })
1011            .expect("Insertions metric is missing!");
1012
1013        assert!(insertions_counter.tag_value("key").is_none());
1014    }
1015
1016    #[tokio::test]
1017    async fn source_spec_compliance() {
1018        let mut memory_config = MemoryConfig::default();
1019        memory_config.source_config = Some(MemorySourceConfig {
1020            export_interval: Some(NonZeroU64::try_from(1).unwrap()),
1021            export_batch_size: None,
1022            remove_after_export: false,
1023            export_expired_items: false,
1024            source_key: "test".to_string(),
1025        });
1026        let memory = memory_config.get_or_build_memory().await;
1027        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
1028
1029        let mut events: Vec<Event> = run_and_assert_source_compliance(
1030            memory_config,
1031            time::Duration::from_secs(5),
1032            &SOURCE_TAGS,
1033        )
1034        .await;
1035
1036        assert!(!events.is_empty());
1037        let event = events.remove(0);
1038        let log = event.as_log();
1039
1040        assert!(!log.value().is_empty());
1041    }
1042}