1#![allow(unsafe_op_in_unsafe_fn)] use std::{
4 sync::{Arc, Mutex, MutexGuard},
5 time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use evmap::{
11 shallow_copy::CopyValue,
12 {self},
13};
14use evmap_derive::ShallowCopy;
15use futures::{StreamExt, stream::BoxStream};
16use thread_local::ThreadLocal;
17use tokio::time::interval;
18use tokio_stream::wrappers::IntervalStream;
19use vector_lib::{
20 ByteSizeOf, EstimatedJsonEncodedSizeOf,
21 config::LogNamespace,
22 enrichment::{Case, Condition, IndexHandle, Table},
23 event::{Event, EventStatus, Finalizable},
24 internal_event::{
25 ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
26 },
27 shutdown::ShutdownSignal,
28 sink::StreamSink,
29};
30use vrl::value::{KeyString, ObjectMap, Value};
31
32use super::source::MemorySource;
33use crate::{
34 SourceSender,
35 enrichment_tables::memory::{
36 MemoryConfig,
37 internal_events::{
38 MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed,
39 MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
40 MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
41 },
42 },
43};
44
/// A single row held by the in-memory enrichment table.
#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
pub struct MemoryEntry {
    /// The stored value, kept as a JSON-encoded string.
    value: String,
    /// The instant at which this entry was written.
    update_time: CopyValue<Instant>,
    /// Lifetime of the entry in whole seconds, measured from `update_time`.
    ttl: u64,
}
52
53impl ByteSizeOf for MemoryEntry {
54 fn allocated_bytes(&self) -> usize {
55 self.value.size_of()
56 }
57}
58
59impl MemoryEntry {
60 pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, String> {
61 let ttl = self
62 .ttl
63 .saturating_sub(now.duration_since(*self.update_time).as_secs());
64 Ok(ObjectMap::from([
65 (
66 KeyString::from("key"),
67 Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
68 ),
69 (
70 KeyString::from("value"),
71 serde_json::from_str::<Value>(&self.value)
72 .map_err(|_| "Failed to read value from memory!")?,
73 ),
74 (
75 KeyString::from("ttl"),
76 Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
77 ),
78 ]))
79 }
80
81 fn expired(&self, now: Instant) -> bool {
82 now.duration_since(*self.update_time).as_secs() > self.ttl
83 }
84}
85
/// Bookkeeping kept alongside the table's write handle.
#[derive(Default)]
struct MemoryMetadata {
    /// Tracked size of the table in bytes: increased as entries are inserted and recomputed
    /// from the readable contents on every flush.
    byte_size: u64,
}
90
/// Write side of the table: the evmap write handle together with its size bookkeeping.
///
/// `Memory` stores this behind a single mutex, so both fields are accessed under the same lock.
pub(super) struct MemoryWriter {
    /// Handle used to insert, remove and publish entries.
    pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
    /// Size tracking for the entries written through `write_handle`.
    metadata: MemoryMetadata,
}
96
/// In-memory enrichment table backed by an evmap keyed by string.
///
/// The same type implements both the enrichment `Table` (reads) and a `StreamSink` (writes).
pub struct Memory {
    /// Factory used to create a read handle for each thread that reads from the table.
    pub(super) read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
    /// Per-thread cache of read handles, filled lazily by `get_read_handle`.
    pub(super) read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
    /// Shared, mutex-guarded write side of the table.
    pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
    /// Configuration this table was built from.
    pub(super) config: MemoryConfig,
}
104
105impl Memory {
106 pub fn new(config: MemoryConfig) -> Self {
108 let (read_handle, write_handle) = evmap::new();
109 Self {
110 config,
111 read_handle_factory: read_handle.factory(),
112 read_handle: ThreadLocal::new(),
113 write_handle: Arc::new(Mutex::new(MemoryWriter {
114 write_handle,
115 metadata: MemoryMetadata::default(),
116 })),
117 }
118 }
119
120 pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
121 self.read_handle
122 .get_or(|| self.read_handle_factory.handle())
123 }
124
125 fn handle_value(&self, value: ObjectMap) {
126 let mut writer = self.write_handle.lock().expect("mutex poisoned");
127 let now = Instant::now();
128
129 for (k, value) in value.into_iter() {
130 let new_entry_key = String::from(k);
131 let Ok(v) = serde_json::to_string(&value) else {
132 emit!(MemoryEnrichmentTableInsertFailed {
133 key: &new_entry_key,
134 include_key_metric_tag: self.config.internal_metrics.include_key_tag
135 });
136 continue;
137 };
138 let new_entry = MemoryEntry {
139 value: v,
140 update_time: now.into(),
141 ttl: self
142 .config
143 .ttl_field
144 .path
145 .as_ref()
146 .and_then(|p| value.get(p))
147 .and_then(|v| v.as_integer())
148 .map(|v| v as u64)
149 .unwrap_or(self.config.ttl),
150 };
151 let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
152 if let Some(max_byte_size) = self.config.max_byte_size
153 && writer
154 .metadata
155 .byte_size
156 .saturating_add(new_entry_size as u64)
157 > max_byte_size
158 {
159 emit!(MemoryEnrichmentTableInsertFailed {
161 key: &new_entry_key,
162 include_key_metric_tag: self.config.internal_metrics.include_key_tag
163 });
164 continue;
165 }
166 writer.metadata.byte_size = writer
167 .metadata
168 .byte_size
169 .saturating_add(new_entry_size as u64);
170 emit!(MemoryEnrichmentTableInserted {
171 key: &new_entry_key,
172 include_key_metric_tag: self.config.internal_metrics.include_key_tag
173 });
174 writer.write_handle.update(new_entry_key, new_entry);
175 }
176
177 if self.config.flush_interval.is_none() {
178 self.flush(writer);
179 }
180 }
181
182 fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
183 let now = Instant::now();
184
185 let mut needs_flush = false;
186 if let Some(reader) = self.get_read_handle().read() {
190 for (k, v) in reader.iter() {
191 if let Some(entry) = v.get_one()
192 && entry.expired(now)
193 {
194 writer.write_handle.empty(k.clone());
197 emit!(MemoryEnrichmentTableTtlExpired {
198 key: k,
199 include_key_metric_tag: self.config.internal_metrics.include_key_tag
200 });
201 needs_flush = true;
202 }
203 }
204 };
205
206 needs_flush
207 }
208
209 fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
210 let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
211 if needs_flush {
212 self.flush(writer);
213 }
214 }
215
216 fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
217 writer.write_handle.refresh();
218 if let Some(reader) = self.get_read_handle().read() {
219 let mut byte_size = 0;
220 for (k, v) in reader.iter() {
221 byte_size += k.size_of() + v.get_one().size_of();
222 }
223 writer.metadata.byte_size = byte_size as u64;
224 emit!(MemoryEnrichmentTableFlushed {
225 new_objects_count: reader.len(),
226 new_byte_size: byte_size
227 });
228 }
229 }
230
231 pub(crate) fn as_source(
232 &self,
233 shutdown: ShutdownSignal,
234 out: SourceSender,
235 log_namespace: LogNamespace,
236 ) -> MemorySource {
237 MemorySource {
238 memory: self.clone(),
239 shutdown,
240 out,
241 log_namespace,
242 }
243 }
244}
245
246impl Clone for Memory {
247 fn clone(&self) -> Self {
248 Self {
249 read_handle_factory: self.read_handle_factory.clone(),
250 read_handle: ThreadLocal::new(),
251 write_handle: Arc::clone(&self.write_handle),
252 config: self.config.clone(),
253 }
254 }
255}
256
257impl Table for Memory {
258 fn find_table_row<'a>(
259 &self,
260 case: Case,
261 condition: &'a [Condition<'a>],
262 select: Option<&'a [String]>,
263 wildcard: Option<&Value>,
264 index: Option<IndexHandle>,
265 ) -> Result<ObjectMap, String> {
266 let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
267
268 match rows.pop() {
269 Some(row) if rows.is_empty() => Ok(row),
270 Some(_) => Err("More than 1 row found".to_string()),
271 None => Err("Key not found".to_string()),
272 }
273 }
274
275 fn find_table_rows<'a>(
276 &self,
277 _case: Case,
278 condition: &'a [Condition<'a>],
279 _select: Option<&'a [String]>,
280 _wildcard: Option<&Value>,
281 _index: Option<IndexHandle>,
282 ) -> Result<Vec<ObjectMap>, String> {
283 match condition.first() {
284 Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()),
285 Some(Condition::Equals { value, .. }) => {
286 let key = value.to_string_lossy();
287 match self.get_read_handle().get_one(key.as_ref()) {
288 Some(row) => {
289 emit!(MemoryEnrichmentTableRead {
290 key: &key,
291 include_key_metric_tag: self.config.internal_metrics.include_key_tag
292 });
293 row.as_object_map(Instant::now(), &key).map(|r| vec![r])
294 }
295 None => {
296 emit!(MemoryEnrichmentTableReadFailed {
297 key: &key,
298 include_key_metric_tag: self.config.internal_metrics.include_key_tag
299 });
300 Ok(Default::default())
301 }
302 }
303 }
304 Some(_) => Err("Only equality condition is allowed".to_string()),
305 None => Err("Key condition must be specified".to_string()),
306 }
307 }
308
309 fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
310 match fields.len() {
311 0 => Err("Key field is required".to_string()),
312 1 => Ok(IndexHandle(0)),
313 _ => Err("Only one field is allowed".to_string()),
314 }
315 }
316
    /// Reports the indexed fields; this table always returns an empty list.
    fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
        Vec::new()
    }
321
    /// Reports whether the table needs to be reloaded; this table always answers `false`.
    fn needs_reload(&self) -> bool {
        false
    }
326}
327
328impl std::fmt::Debug for Memory {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 write!(f, "Memory {} row(s)", self.get_read_handle().len())
331 }
332}
333
334#[async_trait]
335impl StreamSink<Event> for Memory {
336 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
337 let events_sent = register!(EventsSent::from(Output(None)));
338 let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
339 let mut flush_interval = IntervalStream::new(interval(
340 self.config
341 .flush_interval
342 .map(Duration::from_secs)
343 .unwrap_or(Duration::MAX),
344 ));
345 let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
346 self.config.scan_interval.into(),
347 )));
348
349 loop {
350 tokio::select! {
351 event = input.next() => {
352 let mut event = if let Some(event) = event {
353 event
354 } else {
355 break;
356 };
357 let event_byte_size = event.estimated_json_encoded_size_of();
358
359 let finalizers = event.take_finalizers();
360
361 let log = event.into_log();
363
364 if let (Value::Object(map), _) = log.into_parts() {
365 self.handle_value(map)
366 };
367
368 finalizers.update_status(EventStatus::Delivered);
369 events_sent.emit(CountByteSize(1, event_byte_size));
370 bytes_sent.emit(ByteSize(event_byte_size.get()));
371 }
372
373 Some(_) = flush_interval.next() => {
374 let writer = self.write_handle.lock().expect("mutex poisoned");
375 self.flush(writer);
376 }
377
378 Some(_) = scan_interval.next() => {
379 let writer = self.write_handle.lock().expect("mutex poisoned");
380 self.scan(writer);
381 }
382 }
383 }
384 Ok(())
385 }
386}
387
388#[cfg(test)]
389mod tests {
390 use std::{num::NonZeroU64, slice::from_ref, time::Duration};
391
392 use futures::{StreamExt, future::ready};
393 use futures_util::stream;
394 use tokio::time;
395
396 use vector_lib::{
397 event::{EventContainer, MetricValue},
398 lookup::lookup_v2::OptionalValuePath,
399 metrics::Controller,
400 sink::VectorSink,
401 };
402
403 use super::*;
404 use crate::{
405 enrichment_tables::memory::{
406 internal_events::InternalMetricsConfig, source::MemorySourceConfig,
407 },
408 event::{Event, LogEvent},
409 test_util::components::{
410 SINK_TAGS, SOURCE_TAGS, run_and_assert_sink_compliance,
411 run_and_assert_source_compliance,
412 },
413 };
414
415 fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
416 let mut config = MemoryConfig::default();
417 modfn(&mut config);
418 config
419 }
420
421 #[test]
422 fn finds_row() {
423 let memory = Memory::new(Default::default());
424 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
425
426 let condition = Condition::Equals {
427 field: "key",
428 value: Value::from("test_key"),
429 };
430
431 assert_eq!(
432 Ok(ObjectMap::from([
433 ("key".into(), Value::from("test_key")),
434 ("ttl".into(), Value::from(memory.config.ttl)),
435 ("value".into(), Value::from(5)),
436 ])),
437 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
438 );
439 }
440
441 #[test]
442 fn calculates_ttl() {
443 let ttl = 100;
444 let secs_to_subtract = 10;
445 let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
446 {
447 let mut handle = memory.write_handle.lock().unwrap();
448 handle.write_handle.update(
449 "test_key".to_string(),
450 MemoryEntry {
451 value: "5".to_string(),
452 update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
453 ttl,
454 },
455 );
456 handle.write_handle.refresh();
457 }
458
459 let condition = Condition::Equals {
460 field: "key",
461 value: Value::from("test_key"),
462 };
463
464 assert_eq!(
465 Ok(ObjectMap::from([
466 ("key".into(), Value::from("test_key")),
467 ("ttl".into(), Value::from(ttl - secs_to_subtract)),
468 ("value".into(), Value::from(5)),
469 ])),
470 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
471 );
472 }
473
474 #[test]
475 fn calculates_ttl_override() {
476 let global_ttl = 100;
477 let ttl_override = 10;
478 let memory = Memory::new(build_memory_config(|c| {
479 c.ttl = global_ttl;
480 c.ttl_field = OptionalValuePath::new("ttl");
481 }));
482 memory.handle_value(ObjectMap::from([
483 (
484 "ttl_override".into(),
485 Value::from(ObjectMap::from([
486 ("val".into(), Value::from(5)),
487 ("ttl".into(), Value::from(ttl_override)),
488 ])),
489 ),
490 (
491 "default_ttl".into(),
492 Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
493 ),
494 ]));
495
496 let default_condition = Condition::Equals {
497 field: "key",
498 value: Value::from("default_ttl"),
499 };
500 let override_condition = Condition::Equals {
501 field: "key",
502 value: Value::from("ttl_override"),
503 };
504
505 assert_eq!(
506 Ok(ObjectMap::from([
507 ("key".into(), Value::from("default_ttl")),
508 ("ttl".into(), Value::from(global_ttl)),
509 (
510 "value".into(),
511 Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
512 ),
513 ])),
514 memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
515 );
516 assert_eq!(
517 Ok(ObjectMap::from([
518 ("key".into(), Value::from("ttl_override")),
519 ("ttl".into(), Value::from(ttl_override)),
520 (
521 "value".into(),
522 Value::from(ObjectMap::from([
523 ("val".into(), Value::from(5)),
524 ("ttl".into(), Value::from(ttl_override))
525 ]))
526 ),
527 ])),
528 memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
529 );
530 }
531
532 #[test]
533 fn removes_expired_records_on_scan_interval() {
534 let ttl = 100;
535 let memory = Memory::new(build_memory_config(|c| {
536 c.ttl = ttl;
537 }));
538 {
539 let mut handle = memory.write_handle.lock().unwrap();
540 handle.write_handle.update(
541 "test_key".to_string(),
542 MemoryEntry {
543 value: "5".to_string(),
544 update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
545 ttl,
546 },
547 );
548 handle.write_handle.refresh();
549 }
550
551 let condition = Condition::Equals {
553 field: "key",
554 value: Value::from("test_key"),
555 };
556 assert_eq!(
557 Ok(ObjectMap::from([
558 ("key".into(), Value::from("test_key")),
559 ("ttl".into(), Value::from(0)),
560 ("value".into(), Value::from(5)),
561 ])),
562 memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
563 );
564
565 let writer = memory.write_handle.lock().unwrap();
567 memory.scan(writer);
568
569 assert!(
571 memory
572 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
573 .unwrap()
574 .pop()
575 .is_none()
576 );
577 }
578
579 #[test]
580 fn does_not_show_values_before_flush_interval() {
581 let ttl = 100;
582 let memory = Memory::new(build_memory_config(|c| {
583 c.ttl = ttl;
584 c.flush_interval = Some(10);
585 }));
586 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
587
588 let condition = Condition::Equals {
589 field: "key",
590 value: Value::from("test_key"),
591 };
592
593 assert!(
594 memory
595 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
596 .unwrap()
597 .pop()
598 .is_none()
599 );
600 }
601
602 #[test]
603 fn updates_ttl_on_value_replacement() {
604 let ttl = 100;
605 let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
606 {
607 let mut handle = memory.write_handle.lock().unwrap();
608 handle.write_handle.update(
609 "test_key".to_string(),
610 MemoryEntry {
611 value: "5".to_string(),
612 update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
613 ttl,
614 },
615 );
616 handle.write_handle.refresh();
617 }
618 let condition = Condition::Equals {
619 field: "key",
620 value: Value::from("test_key"),
621 };
622
623 assert_eq!(
624 Ok(ObjectMap::from([
625 ("key".into(), Value::from("test_key")),
626 ("ttl".into(), Value::from(ttl / 2)),
627 ("value".into(), Value::from(5)),
628 ])),
629 memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
630 );
631
632 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
633
634 assert_eq!(
635 Ok(ObjectMap::from([
636 ("key".into(), Value::from("test_key")),
637 ("ttl".into(), Value::from(ttl)),
638 ("value".into(), Value::from(5)),
639 ])),
640 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
641 );
642 }
643
644 #[test]
645 fn ignores_all_values_over_byte_size_limit() {
646 let memory = Memory::new(build_memory_config(|c| {
647 c.max_byte_size = Some(1);
648 }));
649 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
650
651 let condition = Condition::Equals {
652 field: "key",
653 value: Value::from("test_key"),
654 };
655
656 assert!(
657 memory
658 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
659 .unwrap()
660 .pop()
661 .is_none()
662 );
663 }
664
665 #[test]
666 fn ignores_values_when_byte_size_limit_is_reached() {
667 let ttl = 100;
668 let memory = Memory::new(build_memory_config(|c| {
669 c.ttl = ttl;
670 c.max_byte_size = Some(150);
671 }));
672 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
673 memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))]));
674
675 assert_eq!(
676 Ok(ObjectMap::from([
677 ("key".into(), Value::from("test_key")),
678 ("ttl".into(), Value::from(ttl)),
679 ("value".into(), Value::from(5)),
680 ])),
681 memory.find_table_row(
682 Case::Sensitive,
683 &[Condition::Equals {
684 field: "key",
685 value: Value::from("test_key")
686 }],
687 None,
688 None,
689 None
690 )
691 );
692
693 assert!(
694 memory
695 .find_table_rows(
696 Case::Sensitive,
697 &[Condition::Equals {
698 field: "key",
699 value: Value::from("rejected_key")
700 }],
701 None,
702 None,
703 None
704 )
705 .unwrap()
706 .pop()
707 .is_none()
708 );
709 }
710
711 #[test]
712 fn missing_key() {
713 let memory = Memory::new(Default::default());
714
715 let condition = Condition::Equals {
716 field: "key",
717 value: Value::from("test_key"),
718 };
719
720 assert!(
721 memory
722 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
723 .unwrap()
724 .pop()
725 .is_none()
726 );
727 }
728
729 #[tokio::test]
730 async fn sink_spec_compliance() {
731 let event = Event::Log(LogEvent::from(ObjectMap::from([(
732 "test_key".into(),
733 Value::from(5),
734 )])));
735
736 let memory = Memory::new(Default::default());
737
738 run_and_assert_sink_compliance(
739 VectorSink::from_event_streamsink(memory),
740 stream::once(ready(event)),
741 &SINK_TAGS,
742 )
743 .await;
744 }
745
    /// Runs the sink over one event with no flush interval configured and checks the captured
    /// insertion, flush, object-count and byte-size metrics.
    #[tokio::test]
    async fn flush_metrics_without_interval() {
        let event = Event::Log(LogEvent::from(ObjectMap::from([(
            "test_key".into(),
            Value::from(5),
        )])));

        let memory = Memory::new(Default::default());

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(event)),
            &SINK_TAGS,
        )
        .await;

        // Pick each metric of interest out of the captured set by kind and name.
        let metrics = Controller::get().unwrap().capture_metrics();
        let insertions_counter = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Counter { .. })
                    && m.name() == "memory_enrichment_table_insertions_total"
            })
            .expect("Insertions metric is missing!");
        let MetricValue::Counter {
            value: insertions_count,
        } = insertions_counter.value()
        else {
            unreachable!();
        };
        let flushes_counter = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Counter { .. })
                    && m.name() == "memory_enrichment_table_flushes_total"
            })
            .expect("Flushes metric is missing!");
        let MetricValue::Counter {
            value: flushes_count,
        } = flushes_counter.value()
        else {
            unreachable!();
        };
        let object_count_gauge = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Gauge { .. })
                    && m.name() == "memory_enrichment_table_objects_count"
            })
            .expect("Object count metric is missing!");
        let MetricValue::Gauge {
            value: object_count,
        } = object_count_gauge.value()
        else {
            unreachable!();
        };
        let byte_size_gauge = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Gauge { .. })
                    && m.name() == "memory_enrichment_table_byte_size"
            })
            .expect("Byte size metric is missing!");
        // One event in: one insertion, one flush, one stored object.
        assert_eq!(*insertions_count, 1.0);
        assert_eq!(*flushes_count, 1.0);
        assert_eq!(*object_count, 1.0);
        assert!(!byte_size_gauge.is_empty());
    }
814
    /// Runs the sink with a one-second flush interval over two copies of the same event, each
    /// delivered after a 600 ms sleep, and checks the captured metrics.
    #[tokio::test]
    async fn flush_metrics_with_interval() {
        let event = Event::Log(LogEvent::from(ObjectMap::from([(
            "test_key".into(),
            Value::from(5),
        )])));

        let memory = Memory::new(build_memory_config(|c| {
            c.flush_interval = Some(1);
        }));

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            // Each event is yielded only after its own 600 ms sleep.
            stream::iter(vec![event.clone(), event]).flat_map(|e| {
                stream::once(async move {
                    tokio::time::sleep(Duration::from_millis(600)).await;
                    e
                })
            }),
            &SINK_TAGS,
        )
        .await;

        // Pick each metric of interest out of the captured set by kind and name.
        let metrics = Controller::get().unwrap().capture_metrics();
        let insertions_counter = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Counter { .. })
                    && m.name() == "memory_enrichment_table_insertions_total"
            })
            .expect("Insertions metric is missing!");
        let MetricValue::Counter {
            value: insertions_count,
        } = insertions_counter.value()
        else {
            unreachable!();
        };
        let flushes_counter = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Counter { .. })
                    && m.name() == "memory_enrichment_table_flushes_total"
            })
            .expect("Flushes metric is missing!");
        let MetricValue::Counter {
            value: flushes_count,
        } = flushes_counter.value()
        else {
            unreachable!();
        };
        let object_count_gauge = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Gauge { .. })
                    && m.name() == "memory_enrichment_table_objects_count"
            })
            .expect("Object count metric is missing!");
        let MetricValue::Gauge {
            value: object_count,
        } = object_count_gauge.value()
        else {
            unreachable!();
        };
        let byte_size_gauge = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Gauge { .. })
                    && m.name() == "memory_enrichment_table_byte_size"
            })
            .expect("Byte size metric is missing!");

        assert_eq!(*insertions_count, 2.0);
        // NOTE(review): presumably one flush comes from the interval's immediate first tick and
        // one from the tick at the one-second mark — confirm against the timer's behavior.
        assert_eq!(*flushes_count, 2.0);
        // Both events wrote the same key, so only one object is stored.
        assert_eq!(*object_count, 1.0);
        assert!(!byte_size_gauge.is_empty());
    }
892
893 #[tokio::test]
894 async fn flush_metrics_with_key() {
895 let event = Event::Log(LogEvent::from(ObjectMap::from([(
896 "test_key".into(),
897 Value::from(5),
898 )])));
899
900 let memory = Memory::new(build_memory_config(|c| {
901 c.internal_metrics = InternalMetricsConfig {
902 include_key_tag: true,
903 };
904 }));
905
906 run_and_assert_sink_compliance(
907 VectorSink::from_event_streamsink(memory),
908 stream::once(ready(event)),
909 &SINK_TAGS,
910 )
911 .await;
912
913 let metrics = Controller::get().unwrap().capture_metrics();
914 let insertions_counter = metrics
915 .iter()
916 .find(|m| {
917 matches!(m.value(), MetricValue::Counter { .. })
918 && m.name() == "memory_enrichment_table_insertions_total"
919 })
920 .expect("Insertions metric is missing!");
921
922 assert!(insertions_counter.tag_matches("key", "test_key"));
923 }
924
925 #[tokio::test]
926 async fn flush_metrics_without_key() {
927 let event = Event::Log(LogEvent::from(ObjectMap::from([(
928 "test_key".into(),
929 Value::from(5),
930 )])));
931
932 let memory = Memory::new(Default::default());
933
934 run_and_assert_sink_compliance(
935 VectorSink::from_event_streamsink(memory),
936 stream::once(ready(event)),
937 &SINK_TAGS,
938 )
939 .await;
940
941 let metrics = Controller::get().unwrap().capture_metrics();
942 let insertions_counter = metrics
943 .iter()
944 .find(|m| {
945 matches!(m.value(), MetricValue::Counter { .. })
946 && m.name() == "memory_enrichment_table_insertions_total"
947 })
948 .expect("Insertions metric is missing!");
949
950 assert!(insertions_counter.tag_value("key").is_none());
951 }
952
953 #[tokio::test]
954 async fn source_spec_compliance() {
955 let mut memory_config = MemoryConfig::default();
956 memory_config.source_config = Some(MemorySourceConfig {
957 export_interval: NonZeroU64::try_from(1).unwrap(),
958 export_batch_size: None,
959 remove_after_export: false,
960 source_key: "test".to_string(),
961 });
962 let memory = memory_config.get_or_build_memory().await;
963 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
964
965 let mut events: Vec<Event> = run_and_assert_source_compliance(
966 memory_config,
967 time::Duration::from_secs(5),
968 &SOURCE_TAGS,
969 )
970 .await;
971
972 assert!(!events.is_empty());
973 let event = events.remove(0);
974 let log = event.as_log();
975
976 assert!(!log.value().is_empty());
977 }
978}