vector/enrichment_tables/memory/
table.rs

1#![allow(unsafe_op_in_unsafe_fn)] // TODO review ShallowCopy usage code and fix properly.
2
3use std::{
4    sync::{Arc, Mutex, MutexGuard},
5    time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use evmap::{
11    shallow_copy::CopyValue,
12    {self},
13};
14use evmap_derive::ShallowCopy;
15use futures::{StreamExt, stream::BoxStream};
16use thread_local::ThreadLocal;
17use tokio::{
18    sync::broadcast::{Receiver, Sender},
19    time::interval,
20};
21use tokio_stream::wrappers::IntervalStream;
22use vector_lib::{
23    ByteSizeOf, EstimatedJsonEncodedSizeOf,
24    config::LogNamespace,
25    enrichment::{Case, Condition, IndexHandle, Table},
26    event::{Event, EventStatus, Finalizable},
27    internal_event::{
28        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
29    },
30    shutdown::ShutdownSignal,
31    sink::StreamSink,
32};
33use vrl::value::{KeyString, ObjectMap, Value};
34
35use super::source::MemorySource;
36use crate::{
37    SourceSender,
38    enrichment_tables::memory::{
39        MemoryConfig,
40        internal_events::{
41            MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed,
42            MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
43            MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
44        },
45    },
46};
47
48/// Single memory entry containing the value and TTL
49#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
50pub struct MemoryEntry {
51    value: String,
52    update_time: CopyValue<Instant>,
53    ttl: u64,
54}
55
56impl ByteSizeOf for MemoryEntry {
57    fn allocated_bytes(&self) -> usize {
58        self.value.size_of()
59    }
60}
61
62impl MemoryEntry {
63    pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, String> {
64        let ttl = self
65            .ttl
66            .saturating_sub(now.duration_since(*self.update_time).as_secs());
67        Ok(ObjectMap::from([
68            (
69                KeyString::from("key"),
70                Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
71            ),
72            (
73                KeyString::from("value"),
74                serde_json::from_str::<Value>(&self.value)
75                    .map_err(|_| "Failed to read value from memory!")?,
76            ),
77            (
78                KeyString::from("ttl"),
79                Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
80            ),
81        ]))
82    }
83
84    fn expired(&self, now: Instant) -> bool {
85        now.duration_since(*self.update_time).as_secs() > self.ttl
86    }
87}
88
/// Bookkeeping stored next to the write handle.
#[derive(Default)]
struct MemoryMetadata {
    /// Tracked size of the table in bytes: grown as entries are accepted and
    /// recomputed from the published contents on every flush.
    byte_size: u64,
}
93
94/// [`MemoryEntry`] combined with its key
95#[derive(Clone)]
96pub(super) struct MemoryEntryPair {
97    /// Key of this entry
98    pub(super) key: String,
99    /// The value of this entry
100    pub(super) entry: MemoryEntry,
101}
102
103// Used to ensure that these 2 are locked together
104pub(super) struct MemoryWriter {
105    pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
106    metadata: MemoryMetadata,
107}
108
109/// A struct that implements [vector_lib::enrichment::Table] to handle loading enrichment data from a memory structure.
110pub struct Memory {
111    read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
112    read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
113    pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
114    pub(super) config: MemoryConfig,
115    #[allow(dead_code)]
116    expired_items_receiver: Receiver<Vec<MemoryEntryPair>>,
117    expired_items_sender: Sender<Vec<MemoryEntryPair>>,
118}
119
120impl Memory {
121    /// Creates a new [Memory] based on the provided config.
122    pub fn new(config: MemoryConfig) -> Self {
123        let (read_handle, write_handle) = evmap::new();
124        // Buffer could only be used if source is stuck exporting available items, but in that case,
125        // publishing will not happen either, because the lock would be held, so this buffer is not
126        // that important
127        let (expired_tx, expired_rx) = tokio::sync::broadcast::channel(5);
128        Self {
129            config,
130            read_handle_factory: read_handle.factory(),
131            read_handle: ThreadLocal::new(),
132            write_handle: Arc::new(Mutex::new(MemoryWriter {
133                write_handle,
134                metadata: MemoryMetadata::default(),
135            })),
136            expired_items_sender: expired_tx,
137            expired_items_receiver: expired_rx,
138        }
139    }
140
141    pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
142        self.read_handle
143            .get_or(|| self.read_handle_factory.handle())
144    }
145
146    pub(super) fn subscribe_to_expired_items(&self) -> Receiver<Vec<MemoryEntryPair>> {
147        self.expired_items_sender.subscribe()
148    }
149
150    fn handle_value(&self, value: ObjectMap) {
151        let mut writer = self.write_handle.lock().expect("mutex poisoned");
152        let now = Instant::now();
153
154        for (k, value) in value.into_iter() {
155            let new_entry_key = String::from(k);
156            let Ok(v) = serde_json::to_string(&value) else {
157                emit!(MemoryEnrichmentTableInsertFailed {
158                    key: &new_entry_key,
159                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
160                });
161                continue;
162            };
163            let new_entry = MemoryEntry {
164                value: v,
165                update_time: now.into(),
166                ttl: self
167                    .config
168                    .ttl_field
169                    .path
170                    .as_ref()
171                    .and_then(|p| value.get(p))
172                    .and_then(|v| v.as_integer())
173                    .map(|v| v as u64)
174                    .unwrap_or(self.config.ttl),
175            };
176            let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
177            if let Some(max_byte_size) = self.config.max_byte_size
178                && writer
179                    .metadata
180                    .byte_size
181                    .saturating_add(new_entry_size as u64)
182                    > max_byte_size
183            {
184                // Reject new entries
185                emit!(MemoryEnrichmentTableInsertFailed {
186                    key: &new_entry_key,
187                    include_key_metric_tag: self.config.internal_metrics.include_key_tag
188                });
189                continue;
190            }
191            writer.metadata.byte_size = writer
192                .metadata
193                .byte_size
194                .saturating_add(new_entry_size as u64);
195            emit!(MemoryEnrichmentTableInserted {
196                key: &new_entry_key,
197                include_key_metric_tag: self.config.internal_metrics.include_key_tag
198            });
199            writer.write_handle.update(new_entry_key, new_entry);
200        }
201
202        if self.config.flush_interval.is_none() {
203            self.flush(writer);
204        }
205    }
206
207    fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
208        let now = Instant::now();
209
210        let mut needs_flush = false;
211        // Since evmap holds 2 separate maps for the data, we are free to directly remove
212        // elements via the writer, while we are iterating the reader
213        // Refresh will happen only after we manually invoke it after iteration
214        if let Some(reader) = self.get_read_handle().read() {
215            for (k, v) in reader.iter() {
216                if let Some(entry) = v.get_one()
217                    && entry.expired(now)
218                {
219                    // Byte size is not reduced at this point, because the actual deletion
220                    // will only happen at refresh time
221                    writer.write_handle.empty(k.clone());
222                    emit!(MemoryEnrichmentTableTtlExpired {
223                        key: k,
224                        include_key_metric_tag: self.config.internal_metrics.include_key_tag
225                    });
226                    needs_flush = true;
227                }
228            }
229        };
230
231        needs_flush
232    }
233
234    fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
235        let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
236        if needs_flush {
237            self.flush(writer);
238        }
239    }
240
241    fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
242        // First publish items to be removed, if needed
243        if self
244            .config
245            .source_config
246            .as_ref()
247            .map(|c| c.export_expired_items)
248            .unwrap_or_default()
249        {
250            let pending_removal = writer
251                .write_handle
252                .pending()
253                .iter()
254                // We only use empty operation to remove keys
255                .filter_map(|o| match o {
256                    evmap::Operation::Empty(k) => Some(k),
257                    _ => None,
258                })
259                .filter_map(|key| {
260                    writer.write_handle.get_one(key).map(|v| MemoryEntryPair {
261                        key: key.to_string(),
262                        entry: v.clone(),
263                    })
264                })
265                .collect::<Vec<_>>();
266            if let Err(error) = self.expired_items_sender.send(pending_removal) {
267                error!(
268                    message = "Error exporting expired items from memory enrichment table.",
269                    error = %error,
270                );
271            }
272        }
273
274        writer.write_handle.refresh();
275        if let Some(reader) = self.get_read_handle().read() {
276            let mut byte_size = 0;
277            for (k, v) in reader.iter() {
278                byte_size += k.size_of() + v.get_one().size_of();
279            }
280            writer.metadata.byte_size = byte_size as u64;
281            emit!(MemoryEnrichmentTableFlushed {
282                new_objects_count: reader.len(),
283                new_byte_size: byte_size
284            });
285        }
286    }
287
288    pub(crate) fn as_source(
289        &self,
290        shutdown: ShutdownSignal,
291        out: SourceSender,
292        log_namespace: LogNamespace,
293    ) -> MemorySource {
294        MemorySource {
295            memory: self.clone(),
296            shutdown,
297            out,
298            log_namespace,
299        }
300    }
301}
302
303impl Clone for Memory {
304    fn clone(&self) -> Self {
305        Self {
306            read_handle_factory: self.read_handle_factory.clone(),
307            read_handle: ThreadLocal::new(),
308            write_handle: Arc::clone(&self.write_handle),
309            config: self.config.clone(),
310            expired_items_sender: self.expired_items_sender.clone(),
311            expired_items_receiver: self.expired_items_sender.subscribe(),
312        }
313    }
314}
315
316impl Table for Memory {
317    fn find_table_row<'a>(
318        &self,
319        case: Case,
320        condition: &'a [Condition<'a>],
321        select: Option<&'a [String]>,
322        wildcard: Option<&Value>,
323        index: Option<IndexHandle>,
324    ) -> Result<ObjectMap, String> {
325        let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
326
327        match rows.pop() {
328            Some(row) if rows.is_empty() => Ok(row),
329            Some(_) => Err("More than 1 row found".to_string()),
330            None => Err("Key not found".to_string()),
331        }
332    }
333
334    fn find_table_rows<'a>(
335        &self,
336        _case: Case,
337        condition: &'a [Condition<'a>],
338        _select: Option<&'a [String]>,
339        _wildcard: Option<&Value>,
340        _index: Option<IndexHandle>,
341    ) -> Result<Vec<ObjectMap>, String> {
342        match condition.first() {
343            Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()),
344            Some(Condition::Equals { value, .. }) => {
345                let key = value.to_string_lossy();
346                match self.get_read_handle().get_one(key.as_ref()) {
347                    Some(row) => {
348                        emit!(MemoryEnrichmentTableRead {
349                            key: &key,
350                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
351                        });
352                        row.as_object_map(Instant::now(), &key).map(|r| vec![r])
353                    }
354                    None => {
355                        emit!(MemoryEnrichmentTableReadFailed {
356                            key: &key,
357                            include_key_metric_tag: self.config.internal_metrics.include_key_tag
358                        });
359                        Ok(Default::default())
360                    }
361                }
362            }
363            Some(_) => Err("Only equality condition is allowed".to_string()),
364            None => Err("Key condition must be specified".to_string()),
365        }
366    }
367
368    fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
369        match fields.len() {
370            0 => Err("Key field is required".to_string()),
371            1 => Ok(IndexHandle(0)),
372            _ => Err("Only one field is allowed".to_string()),
373        }
374    }
375
376    /// Returns a list of the field names that are in each index
377    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
378        Vec::new()
379    }
380
381    /// Doesn't need reload, data is written directly
382    fn needs_reload(&self) -> bool {
383        false
384    }
385}
386
387impl std::fmt::Debug for Memory {
388    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389        write!(f, "Memory {} row(s)", self.get_read_handle().len())
390    }
391}
392
393#[async_trait]
394impl StreamSink<Event> for Memory {
395    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
396        let events_sent = register!(EventsSent::from(Output(None)));
397        let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
398        let mut flush_interval = IntervalStream::new(interval(
399            self.config
400                .flush_interval
401                .map(Duration::from_secs)
402                .unwrap_or(Duration::MAX),
403        ));
404        let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
405            self.config.scan_interval.into(),
406        )));
407
408        loop {
409            tokio::select! {
410                event = input.next() => {
411                    let mut event = if let Some(event) = event {
412                        event
413                    } else {
414                        break;
415                    };
416                    let event_byte_size = event.estimated_json_encoded_size_of();
417
418                    let finalizers = event.take_finalizers();
419
420                    // Panic: This sink only accepts Logs, so this should never panic
421                    let log = event.into_log();
422
423                    if let (Value::Object(map), _) = log.into_parts() {
424                        self.handle_value(map)
425                    };
426
427                    finalizers.update_status(EventStatus::Delivered);
428                    events_sent.emit(CountByteSize(1, event_byte_size));
429                    bytes_sent.emit(ByteSize(event_byte_size.get()));
430                }
431
432                Some(_) = flush_interval.next() => {
433                    let writer = self.write_handle.lock().expect("mutex poisoned");
434                    self.flush(writer);
435                }
436
437                Some(_) = scan_interval.next() => {
438                    let writer = self.write_handle.lock().expect("mutex poisoned");
439                    self.scan(writer);
440                }
441            }
442        }
443        Ok(())
444    }
445}
446
447#[cfg(test)]
448mod tests {
449    use std::{num::NonZeroU64, slice::from_ref, time::Duration};
450
451    use futures::{StreamExt, future::ready};
452    use futures_util::stream;
453    use tokio::time;
454
455    use vector_lib::{
456        event::{EventContainer, MetricValue},
457        lookup::lookup_v2::OptionalValuePath,
458        metrics::Controller,
459        sink::VectorSink,
460    };
461
462    use super::*;
463    use crate::{
464        enrichment_tables::memory::{
465            config::MemorySourceConfig, internal_events::InternalMetricsConfig,
466        },
467        event::{Event, LogEvent},
468        test_util::components::{
469            SINK_TAGS, SOURCE_TAGS, run_and_assert_sink_compliance,
470            run_and_assert_source_compliance,
471        },
472    };
473
474    fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
475        let mut config = MemoryConfig::default();
476        modfn(&mut config);
477        config
478    }
479
480    #[test]
481    fn finds_row() {
482        let memory = Memory::new(Default::default());
483        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
484
485        let condition = Condition::Equals {
486            field: "key",
487            value: Value::from("test_key"),
488        };
489
490        assert_eq!(
491            Ok(ObjectMap::from([
492                ("key".into(), Value::from("test_key")),
493                ("ttl".into(), Value::from(memory.config.ttl)),
494                ("value".into(), Value::from(5)),
495            ])),
496            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
497        );
498    }
499
500    #[test]
501    fn calculates_ttl() {
502        let ttl = 100;
503        let secs_to_subtract = 10;
504        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
505        {
506            let mut handle = memory.write_handle.lock().unwrap();
507            handle.write_handle.update(
508                "test_key".to_string(),
509                MemoryEntry {
510                    value: "5".to_string(),
511                    update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
512                    ttl,
513                },
514            );
515            handle.write_handle.refresh();
516        }
517
518        let condition = Condition::Equals {
519            field: "key",
520            value: Value::from("test_key"),
521        };
522
523        assert_eq!(
524            Ok(ObjectMap::from([
525                ("key".into(), Value::from("test_key")),
526                ("ttl".into(), Value::from(ttl - secs_to_subtract)),
527                ("value".into(), Value::from(5)),
528            ])),
529            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
530        );
531    }
532
533    #[test]
534    fn calculates_ttl_override() {
535        let global_ttl = 100;
536        let ttl_override = 10;
537        let memory = Memory::new(build_memory_config(|c| {
538            c.ttl = global_ttl;
539            c.ttl_field = OptionalValuePath::new("ttl");
540        }));
541        memory.handle_value(ObjectMap::from([
542            (
543                "ttl_override".into(),
544                Value::from(ObjectMap::from([
545                    ("val".into(), Value::from(5)),
546                    ("ttl".into(), Value::from(ttl_override)),
547                ])),
548            ),
549            (
550                "default_ttl".into(),
551                Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
552            ),
553        ]));
554
555        let default_condition = Condition::Equals {
556            field: "key",
557            value: Value::from("default_ttl"),
558        };
559        let override_condition = Condition::Equals {
560            field: "key",
561            value: Value::from("ttl_override"),
562        };
563
564        assert_eq!(
565            Ok(ObjectMap::from([
566                ("key".into(), Value::from("default_ttl")),
567                ("ttl".into(), Value::from(global_ttl)),
568                (
569                    "value".into(),
570                    Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
571                ),
572            ])),
573            memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
574        );
575        assert_eq!(
576            Ok(ObjectMap::from([
577                ("key".into(), Value::from("ttl_override")),
578                ("ttl".into(), Value::from(ttl_override)),
579                (
580                    "value".into(),
581                    Value::from(ObjectMap::from([
582                        ("val".into(), Value::from(5)),
583                        ("ttl".into(), Value::from(ttl_override))
584                    ]))
585                ),
586            ])),
587            memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
588        );
589    }
590
591    #[test]
592    fn removes_expired_records_on_scan_interval() {
593        let ttl = 100;
594        let memory = Memory::new(build_memory_config(|c| {
595            c.ttl = ttl;
596        }));
597        {
598            let mut handle = memory.write_handle.lock().unwrap();
599            handle.write_handle.update(
600                "test_key".to_string(),
601                MemoryEntry {
602                    value: "5".to_string(),
603                    update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
604                    ttl,
605                },
606            );
607            handle.write_handle.refresh();
608        }
609
610        // Finds the value before scan
611        let condition = Condition::Equals {
612            field: "key",
613            value: Value::from("test_key"),
614        };
615        assert_eq!(
616            Ok(ObjectMap::from([
617                ("key".into(), Value::from("test_key")),
618                ("ttl".into(), Value::from(0)),
619                ("value".into(), Value::from(5)),
620            ])),
621            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
622        );
623
624        // Force scan
625        let writer = memory.write_handle.lock().unwrap();
626        memory.scan(writer);
627
628        // The value is not present anymore
629        assert!(
630            memory
631                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
632                .unwrap()
633                .pop()
634                .is_none()
635        );
636    }
637
638    #[test]
639    fn does_not_show_values_before_flush_interval() {
640        let ttl = 100;
641        let memory = Memory::new(build_memory_config(|c| {
642            c.ttl = ttl;
643            c.flush_interval = Some(10);
644        }));
645        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
646
647        let condition = Condition::Equals {
648            field: "key",
649            value: Value::from("test_key"),
650        };
651
652        assert!(
653            memory
654                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
655                .unwrap()
656                .pop()
657                .is_none()
658        );
659    }
660
661    #[test]
662    fn updates_ttl_on_value_replacement() {
663        let ttl = 100;
664        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
665        {
666            let mut handle = memory.write_handle.lock().unwrap();
667            handle.write_handle.update(
668                "test_key".to_string(),
669                MemoryEntry {
670                    value: "5".to_string(),
671                    update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
672                    ttl,
673                },
674            );
675            handle.write_handle.refresh();
676        }
677        let condition = Condition::Equals {
678            field: "key",
679            value: Value::from("test_key"),
680        };
681
682        assert_eq!(
683            Ok(ObjectMap::from([
684                ("key".into(), Value::from("test_key")),
685                ("ttl".into(), Value::from(ttl / 2)),
686                ("value".into(), Value::from(5)),
687            ])),
688            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
689        );
690
691        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
692
693        assert_eq!(
694            Ok(ObjectMap::from([
695                ("key".into(), Value::from("test_key")),
696                ("ttl".into(), Value::from(ttl)),
697                ("value".into(), Value::from(5)),
698            ])),
699            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
700        );
701    }
702
703    #[test]
704    fn ignores_all_values_over_byte_size_limit() {
705        let memory = Memory::new(build_memory_config(|c| {
706            c.max_byte_size = Some(1);
707        }));
708        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
709
710        let condition = Condition::Equals {
711            field: "key",
712            value: Value::from("test_key"),
713        };
714
715        assert!(
716            memory
717                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
718                .unwrap()
719                .pop()
720                .is_none()
721        );
722    }
723
724    #[test]
725    fn ignores_values_when_byte_size_limit_is_reached() {
726        let ttl = 100;
727        let memory = Memory::new(build_memory_config(|c| {
728            c.ttl = ttl;
729            c.max_byte_size = Some(150);
730        }));
731        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
732        memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))]));
733
734        assert_eq!(
735            Ok(ObjectMap::from([
736                ("key".into(), Value::from("test_key")),
737                ("ttl".into(), Value::from(ttl)),
738                ("value".into(), Value::from(5)),
739            ])),
740            memory.find_table_row(
741                Case::Sensitive,
742                &[Condition::Equals {
743                    field: "key",
744                    value: Value::from("test_key")
745                }],
746                None,
747                None,
748                None
749            )
750        );
751
752        assert!(
753            memory
754                .find_table_rows(
755                    Case::Sensitive,
756                    &[Condition::Equals {
757                        field: "key",
758                        value: Value::from("rejected_key")
759                    }],
760                    None,
761                    None,
762                    None
763                )
764                .unwrap()
765                .pop()
766                .is_none()
767        );
768    }
769
770    #[test]
771    fn missing_key() {
772        let memory = Memory::new(Default::default());
773
774        let condition = Condition::Equals {
775            field: "key",
776            value: Value::from("test_key"),
777        };
778
779        assert!(
780            memory
781                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
782                .unwrap()
783                .pop()
784                .is_none()
785        );
786    }
787
788    #[tokio::test]
789    async fn sink_spec_compliance() {
790        let event = Event::Log(LogEvent::from(ObjectMap::from([(
791            "test_key".into(),
792            Value::from(5),
793        )])));
794
795        let memory = Memory::new(Default::default());
796
797        run_and_assert_sink_compliance(
798            VectorSink::from_event_streamsink(memory),
799            stream::once(ready(event)),
800            &SINK_TAGS,
801        )
802        .await;
803    }
804
805    #[tokio::test]
806    async fn flush_metrics_without_interval() {
807        let event = Event::Log(LogEvent::from(ObjectMap::from([(
808            "test_key".into(),
809            Value::from(5),
810        )])));
811
812        let memory = Memory::new(Default::default());
813
814        run_and_assert_sink_compliance(
815            VectorSink::from_event_streamsink(memory),
816            stream::once(ready(event)),
817            &SINK_TAGS,
818        )
819        .await;
820
821        let metrics = Controller::get().unwrap().capture_metrics();
822        let insertions_counter = metrics
823            .iter()
824            .find(|m| {
825                matches!(m.value(), MetricValue::Counter { .. })
826                    && m.name() == "memory_enrichment_table_insertions_total"
827            })
828            .expect("Insertions metric is missing!");
829        let MetricValue::Counter {
830            value: insertions_count,
831        } = insertions_counter.value()
832        else {
833            unreachable!();
834        };
835        let flushes_counter = metrics
836            .iter()
837            .find(|m| {
838                matches!(m.value(), MetricValue::Counter { .. })
839                    && m.name() == "memory_enrichment_table_flushes_total"
840            })
841            .expect("Flushes metric is missing!");
842        let MetricValue::Counter {
843            value: flushes_count,
844        } = flushes_counter.value()
845        else {
846            unreachable!();
847        };
848        let object_count_gauge = metrics
849            .iter()
850            .find(|m| {
851                matches!(m.value(), MetricValue::Gauge { .. })
852                    && m.name() == "memory_enrichment_table_objects_count"
853            })
854            .expect("Object count metric is missing!");
855        let MetricValue::Gauge {
856            value: object_count,
857        } = object_count_gauge.value()
858        else {
859            unreachable!();
860        };
861        let byte_size_gauge = metrics
862            .iter()
863            .find(|m| {
864                matches!(m.value(), MetricValue::Gauge { .. })
865                    && m.name() == "memory_enrichment_table_byte_size"
866            })
867            .expect("Byte size metric is missing!");
868        assert_eq!(*insertions_count, 1.0);
869        assert_eq!(*flushes_count, 1.0);
870        assert_eq!(*object_count, 1.0);
871        assert!(!byte_size_gauge.is_empty());
872    }
873
874    #[tokio::test]
875    async fn flush_metrics_with_interval() {
876        let event = Event::Log(LogEvent::from(ObjectMap::from([(
877            "test_key".into(),
878            Value::from(5),
879        )])));
880
881        let memory = Memory::new(build_memory_config(|c| {
882            c.flush_interval = Some(1);
883        }));
884
885        run_and_assert_sink_compliance(
886            VectorSink::from_event_streamsink(memory),
887            stream::iter(vec![event.clone(), event]).flat_map(|e| {
888                stream::once(async move {
889                    tokio::time::sleep(Duration::from_millis(600)).await;
890                    e
891                })
892            }),
893            &SINK_TAGS,
894        )
895        .await;
896
897        let metrics = Controller::get().unwrap().capture_metrics();
898        let insertions_counter = metrics
899            .iter()
900            .find(|m| {
901                matches!(m.value(), MetricValue::Counter { .. })
902                    && m.name() == "memory_enrichment_table_insertions_total"
903            })
904            .expect("Insertions metric is missing!");
905        let MetricValue::Counter {
906            value: insertions_count,
907        } = insertions_counter.value()
908        else {
909            unreachable!();
910        };
911        let flushes_counter = metrics
912            .iter()
913            .find(|m| {
914                matches!(m.value(), MetricValue::Counter { .. })
915                    && m.name() == "memory_enrichment_table_flushes_total"
916            })
917            .expect("Flushes metric is missing!");
918        let MetricValue::Counter {
919            value: flushes_count,
920        } = flushes_counter.value()
921        else {
922            unreachable!();
923        };
924        let object_count_gauge = metrics
925            .iter()
926            .find(|m| {
927                matches!(m.value(), MetricValue::Gauge { .. })
928                    && m.name() == "memory_enrichment_table_objects_count"
929            })
930            .expect("Object count metric is missing!");
931        let MetricValue::Gauge {
932            value: object_count,
933        } = object_count_gauge.value()
934        else {
935            unreachable!();
936        };
937        let byte_size_gauge = metrics
938            .iter()
939            .find(|m| {
940                matches!(m.value(), MetricValue::Gauge { .. })
941                    && m.name() == "memory_enrichment_table_byte_size"
942            })
943            .expect("Byte size metric is missing!");
944
945        assert_eq!(*insertions_count, 2.0);
946        // One is done right away and the next one after the interval
947        assert_eq!(*flushes_count, 2.0);
948        assert_eq!(*object_count, 1.0);
949        assert!(!byte_size_gauge.is_empty());
950    }
951
952    #[tokio::test]
953    async fn flush_metrics_with_key() {
954        let event = Event::Log(LogEvent::from(ObjectMap::from([(
955            "test_key".into(),
956            Value::from(5),
957        )])));
958
959        let memory = Memory::new(build_memory_config(|c| {
960            c.internal_metrics = InternalMetricsConfig {
961                include_key_tag: true,
962            };
963        }));
964
965        run_and_assert_sink_compliance(
966            VectorSink::from_event_streamsink(memory),
967            stream::once(ready(event)),
968            &SINK_TAGS,
969        )
970        .await;
971
972        let metrics = Controller::get().unwrap().capture_metrics();
973        let insertions_counter = metrics
974            .iter()
975            .find(|m| {
976                matches!(m.value(), MetricValue::Counter { .. })
977                    && m.name() == "memory_enrichment_table_insertions_total"
978            })
979            .expect("Insertions metric is missing!");
980
981        assert!(insertions_counter.tag_matches("key", "test_key"));
982    }
983
984    #[tokio::test]
985    async fn flush_metrics_without_key() {
986        let event = Event::Log(LogEvent::from(ObjectMap::from([(
987            "test_key".into(),
988            Value::from(5),
989        )])));
990
991        let memory = Memory::new(Default::default());
992
993        run_and_assert_sink_compliance(
994            VectorSink::from_event_streamsink(memory),
995            stream::once(ready(event)),
996            &SINK_TAGS,
997        )
998        .await;
999
1000        let metrics = Controller::get().unwrap().capture_metrics();
1001        let insertions_counter = metrics
1002            .iter()
1003            .find(|m| {
1004                matches!(m.value(), MetricValue::Counter { .. })
1005                    && m.name() == "memory_enrichment_table_insertions_total"
1006            })
1007            .expect("Insertions metric is missing!");
1008
1009        assert!(insertions_counter.tag_value("key").is_none());
1010    }
1011
1012    #[tokio::test]
1013    async fn source_spec_compliance() {
1014        let mut memory_config = MemoryConfig::default();
1015        memory_config.source_config = Some(MemorySourceConfig {
1016            export_interval: Some(NonZeroU64::try_from(1).unwrap()),
1017            export_batch_size: None,
1018            remove_after_export: false,
1019            export_expired_items: false,
1020            source_key: "test".to_string(),
1021        });
1022        let memory = memory_config.get_or_build_memory().await;
1023        memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
1024
1025        let mut events: Vec<Event> = run_and_assert_source_compliance(
1026            memory_config,
1027            time::Duration::from_secs(5),
1028            &SOURCE_TAGS,
1029        )
1030        .await;
1031
1032        assert!(!events.is_empty());
1033        let event = events.remove(0);
1034        let log = event.as_log();
1035
1036        assert!(!log.value().is_empty());
1037    }
1038}