1use crate::enrichment_tables::memory::internal_events::{
2 MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed, MemoryEnrichmentTableInserted,
3 MemoryEnrichmentTableRead, MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
4};
5use crate::enrichment_tables::memory::MemoryConfig;
6use crate::SourceSender;
7use std::sync::{Arc, Mutex, MutexGuard};
8use std::time::{Duration, Instant};
9
10use evmap::shallow_copy::CopyValue;
11use evmap::{self};
12use evmap_derive::ShallowCopy;
13use futures::StreamExt;
14use thread_local::ThreadLocal;
15use tokio::time::interval;
16use tokio_stream::wrappers::IntervalStream;
17use vector_lib::config::LogNamespace;
18use vector_lib::shutdown::ShutdownSignal;
19use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
20
21use async_trait::async_trait;
22use bytes::Bytes;
23use futures::stream::BoxStream;
24use vector_lib::enrichment::{Case, Condition, IndexHandle, Table};
25use vector_lib::event::{Event, EventStatus, Finalizable};
26use vector_lib::internal_event::{
27 ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
28};
29use vector_lib::sink::StreamSink;
30use vrl::value::{KeyString, ObjectMap, Value};
31
32use super::source::MemorySource;
33
34#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
36pub struct MemoryEntry {
37 value: String,
38 update_time: CopyValue<Instant>,
39}
40
41impl ByteSizeOf for MemoryEntry {
42 fn allocated_bytes(&self) -> usize {
43 self.value.size_of()
44 }
45}
46
47impl MemoryEntry {
48 pub(super) fn as_object_map(
49 &self,
50 now: Instant,
51 total_ttl: u64,
52 key: &str,
53 ) -> Result<ObjectMap, String> {
54 let ttl = total_ttl.saturating_sub(now.duration_since(*self.update_time).as_secs());
55 Ok(ObjectMap::from([
56 (
57 KeyString::from("key"),
58 Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
59 ),
60 (
61 KeyString::from("value"),
62 serde_json::from_str::<Value>(&self.value)
63 .map_err(|_| "Failed to read value from memory!")?,
64 ),
65 (
66 KeyString::from("ttl"),
67 Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
68 ),
69 ]))
70 }
71
72 fn expired(&self, now: Instant, ttl: u64) -> bool {
73 now.duration_since(*self.update_time).as_secs() > ttl
74 }
75}
76
77#[derive(Default)]
78struct MemoryMetadata {
79 byte_size: u64,
80}
81
82pub(super) struct MemoryWriter {
84 pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
85 metadata: MemoryMetadata,
86}
87
88pub struct Memory {
90 pub(super) read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
91 pub(super) read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
92 pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
93 pub(super) config: MemoryConfig,
94}
95
96impl Memory {
97 pub fn new(config: MemoryConfig) -> Self {
99 let (read_handle, write_handle) = evmap::new();
100 Self {
101 config,
102 read_handle_factory: read_handle.factory(),
103 read_handle: ThreadLocal::new(),
104 write_handle: Arc::new(Mutex::new(MemoryWriter {
105 write_handle,
106 metadata: MemoryMetadata::default(),
107 })),
108 }
109 }
110
111 pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
112 self.read_handle
113 .get_or(|| self.read_handle_factory.handle())
114 }
115
116 fn handle_value(&self, value: ObjectMap) {
117 let mut writer = self.write_handle.lock().expect("mutex poisoned");
118 let now = Instant::now();
119
120 for (k, v) in value.into_iter() {
121 let new_entry_key = String::from(k);
122 let Ok(v) = serde_json::to_string(&v) else {
123 emit!(MemoryEnrichmentTableInsertFailed {
124 key: &new_entry_key,
125 include_key_metric_tag: self.config.internal_metrics.include_key_tag
126 });
127 continue;
128 };
129 let new_entry = MemoryEntry {
130 value: v,
131 update_time: now.into(),
132 };
133 let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
134 if let Some(max_byte_size) = self.config.max_byte_size {
135 if writer
136 .metadata
137 .byte_size
138 .saturating_add(new_entry_size as u64)
139 > max_byte_size
140 {
141 emit!(MemoryEnrichmentTableInsertFailed {
143 key: &new_entry_key,
144 include_key_metric_tag: self.config.internal_metrics.include_key_tag
145 });
146 continue;
147 }
148 }
149 writer.metadata.byte_size = writer
150 .metadata
151 .byte_size
152 .saturating_add(new_entry_size as u64);
153 emit!(MemoryEnrichmentTableInserted {
154 key: &new_entry_key,
155 include_key_metric_tag: self.config.internal_metrics.include_key_tag
156 });
157 writer.write_handle.update(new_entry_key, new_entry);
158 }
159
160 if self.config.flush_interval.is_none() {
161 self.flush(writer);
162 }
163 }
164
165 fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
166 let now = Instant::now();
167
168 let mut needs_flush = false;
169 if let Some(reader) = self.get_read_handle().read() {
173 for (k, v) in reader.iter() {
174 if let Some(entry) = v.get_one() {
175 if entry.expired(now, self.config.ttl) {
176 writer.write_handle.empty(k.clone());
179 emit!(MemoryEnrichmentTableTtlExpired {
180 key: k,
181 include_key_metric_tag: self.config.internal_metrics.include_key_tag
182 });
183 needs_flush = true;
184 }
185 }
186 }
187 };
188
189 needs_flush
190 }
191
192 fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
193 let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
194 if needs_flush {
195 self.flush(writer);
196 }
197 }
198
199 fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
200 writer.write_handle.refresh();
201 if let Some(reader) = self.get_read_handle().read() {
202 let mut byte_size = 0;
203 for (k, v) in reader.iter() {
204 byte_size += k.size_of() + v.get_one().size_of();
205 }
206 writer.metadata.byte_size = byte_size as u64;
207 emit!(MemoryEnrichmentTableFlushed {
208 new_objects_count: reader.len(),
209 new_byte_size: byte_size
210 });
211 }
212 }
213
214 pub(crate) fn as_source(
215 &self,
216 shutdown: ShutdownSignal,
217 out: SourceSender,
218 log_namespace: LogNamespace,
219 ) -> MemorySource {
220 MemorySource {
221 memory: self.clone(),
222 shutdown,
223 out,
224 log_namespace,
225 }
226 }
227}
228
229impl Clone for Memory {
230 fn clone(&self) -> Self {
231 Self {
232 read_handle_factory: self.read_handle_factory.clone(),
233 read_handle: ThreadLocal::new(),
234 write_handle: Arc::clone(&self.write_handle),
235 config: self.config.clone(),
236 }
237 }
238}
239
240impl Table for Memory {
241 fn find_table_row<'a>(
242 &self,
243 case: Case,
244 condition: &'a [Condition<'a>],
245 select: Option<&'a [String]>,
246 wildcard: Option<&Value>,
247 index: Option<IndexHandle>,
248 ) -> Result<ObjectMap, String> {
249 let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
250
251 match rows.pop() {
252 Some(row) if rows.is_empty() => Ok(row),
253 Some(_) => Err("More than 1 row found".to_string()),
254 None => Err("Key not found".to_string()),
255 }
256 }
257
258 fn find_table_rows<'a>(
259 &self,
260 _case: Case,
261 condition: &'a [Condition<'a>],
262 _select: Option<&'a [String]>,
263 _wildcard: Option<&Value>,
264 _index: Option<IndexHandle>,
265 ) -> Result<Vec<ObjectMap>, String> {
266 match condition.first() {
267 Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()),
268 Some(Condition::Equals { value, .. }) => {
269 let key = value.to_string_lossy();
270 match self.get_read_handle().get_one(key.as_ref()) {
271 Some(row) => {
272 emit!(MemoryEnrichmentTableRead {
273 key: &key,
274 include_key_metric_tag: self.config.internal_metrics.include_key_tag
275 });
276 row.as_object_map(Instant::now(), self.config.ttl, &key)
277 .map(|r| vec![r])
278 }
279 None => {
280 emit!(MemoryEnrichmentTableReadFailed {
281 key: &key,
282 include_key_metric_tag: self.config.internal_metrics.include_key_tag
283 });
284 Ok(Default::default())
285 }
286 }
287 }
288 Some(_) => Err("Only equality condition is allowed".to_string()),
289 None => Err("Key condition must be specified".to_string()),
290 }
291 }
292
293 fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
294 match fields.len() {
295 0 => Err("Key field is required".to_string()),
296 1 => Ok(IndexHandle(0)),
297 _ => Err("Only one field is allowed".to_string()),
298 }
299 }
300
301 fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
303 Vec::new()
304 }
305
306 fn needs_reload(&self) -> bool {
308 false
309 }
310}
311
312impl std::fmt::Debug for Memory {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 write!(f, "Memory {} row(s)", self.get_read_handle().len())
315 }
316}
317
318#[async_trait]
319impl StreamSink<Event> for Memory {
320 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
321 let events_sent = register!(EventsSent::from(Output(None)));
322 let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
323 let mut flush_interval = IntervalStream::new(interval(
324 self.config
325 .flush_interval
326 .map(Duration::from_secs)
327 .unwrap_or(Duration::MAX),
328 ));
329 let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
330 self.config.scan_interval.into(),
331 )));
332
333 loop {
334 tokio::select! {
335 event = input.next() => {
336 let mut event = if let Some(event) = event {
337 event
338 } else {
339 break;
340 };
341 let event_byte_size = event.estimated_json_encoded_size_of();
342
343 let finalizers = event.take_finalizers();
344
345 let log = event.into_log();
347
348 if let (Value::Object(map), _) = log.into_parts() {
349 self.handle_value(map)
350 };
351
352 finalizers.update_status(EventStatus::Delivered);
353 events_sent.emit(CountByteSize(1, event_byte_size));
354 bytes_sent.emit(ByteSize(event_byte_size.get()));
355 }
356
357 Some(_) = flush_interval.next() => {
358 let writer = self.write_handle.lock().expect("mutex poisoned");
359 self.flush(writer);
360 }
361
362 Some(_) = scan_interval.next() => {
363 let writer = self.write_handle.lock().expect("mutex poisoned");
364 self.scan(writer);
365 }
366 }
367 }
368 Ok(())
369 }
370}
371
372#[cfg(test)]
373mod tests {
374 use futures::{future::ready, StreamExt};
375 use futures_util::stream;
376 use std::{num::NonZeroU64, time::Duration};
377 use tokio::time;
378
379 use vector_lib::{
380 event::{EventContainer, MetricValue},
381 metrics::Controller,
382 sink::VectorSink,
383 };
384
385 use super::*;
386 use crate::{
387 enrichment_tables::memory::{
388 internal_events::InternalMetricsConfig, source::MemorySourceConfig,
389 },
390 event::{Event, LogEvent},
391 test_util::components::{
392 run_and_assert_sink_compliance, run_and_assert_source_compliance, SINK_TAGS,
393 SOURCE_TAGS,
394 },
395 };
396
397 fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
398 let mut config = MemoryConfig::default();
399 modfn(&mut config);
400 config
401 }
402
403 #[test]
404 fn finds_row() {
405 let memory = Memory::new(Default::default());
406 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
407
408 let condition = Condition::Equals {
409 field: "key",
410 value: Value::from("test_key"),
411 };
412
413 assert_eq!(
414 Ok(ObjectMap::from([
415 ("key".into(), Value::from("test_key")),
416 ("ttl".into(), Value::from(memory.config.ttl)),
417 ("value".into(), Value::from(5)),
418 ])),
419 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
420 );
421 }
422
423 #[test]
424 fn calculates_ttl() {
425 let ttl = 100;
426 let secs_to_subtract = 10;
427 let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
428 {
429 let mut handle = memory.write_handle.lock().unwrap();
430 handle.write_handle.update(
431 "test_key".to_string(),
432 MemoryEntry {
433 value: "5".to_string(),
434 update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
435 },
436 );
437 handle.write_handle.refresh();
438 }
439
440 let condition = Condition::Equals {
441 field: "key",
442 value: Value::from("test_key"),
443 };
444
445 assert_eq!(
446 Ok(ObjectMap::from([
447 ("key".into(), Value::from("test_key")),
448 ("ttl".into(), Value::from(ttl - secs_to_subtract)),
449 ("value".into(), Value::from(5)),
450 ])),
451 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
452 );
453 }
454
455 #[test]
456 fn removes_expired_records_on_scan_interval() {
457 let ttl = 100;
458 let memory = Memory::new(build_memory_config(|c| {
459 c.ttl = ttl;
460 }));
461 {
462 let mut handle = memory.write_handle.lock().unwrap();
463 handle.write_handle.update(
464 "test_key".to_string(),
465 MemoryEntry {
466 value: "5".to_string(),
467 update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
468 },
469 );
470 handle.write_handle.refresh();
471 }
472
473 let condition = Condition::Equals {
475 field: "key",
476 value: Value::from("test_key"),
477 };
478 assert_eq!(
479 Ok(ObjectMap::from([
480 ("key".into(), Value::from("test_key")),
481 ("ttl".into(), Value::from(0)),
482 ("value".into(), Value::from(5)),
483 ])),
484 memory.find_table_row(Case::Sensitive, &[condition.clone()], None, None, None)
485 );
486
487 let writer = memory.write_handle.lock().unwrap();
489 memory.scan(writer);
490
491 assert!(memory
493 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
494 .unwrap()
495 .pop()
496 .is_none());
497 }
498
499 #[test]
500 fn does_not_show_values_before_flush_interval() {
501 let ttl = 100;
502 let memory = Memory::new(build_memory_config(|c| {
503 c.ttl = ttl;
504 c.flush_interval = Some(10);
505 }));
506 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
507
508 let condition = Condition::Equals {
509 field: "key",
510 value: Value::from("test_key"),
511 };
512
513 assert!(memory
514 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
515 .unwrap()
516 .pop()
517 .is_none());
518 }
519
520 #[test]
521 fn updates_ttl_on_value_replacement() {
522 let ttl = 100;
523 let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
524 {
525 let mut handle = memory.write_handle.lock().unwrap();
526 handle.write_handle.update(
527 "test_key".to_string(),
528 MemoryEntry {
529 value: "5".to_string(),
530 update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
531 },
532 );
533 handle.write_handle.refresh();
534 }
535 let condition = Condition::Equals {
536 field: "key",
537 value: Value::from("test_key"),
538 };
539
540 assert_eq!(
541 Ok(ObjectMap::from([
542 ("key".into(), Value::from("test_key")),
543 ("ttl".into(), Value::from(ttl / 2)),
544 ("value".into(), Value::from(5)),
545 ])),
546 memory.find_table_row(Case::Sensitive, &[condition.clone()], None, None, None)
547 );
548
549 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
550
551 assert_eq!(
552 Ok(ObjectMap::from([
553 ("key".into(), Value::from("test_key")),
554 ("ttl".into(), Value::from(ttl)),
555 ("value".into(), Value::from(5)),
556 ])),
557 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
558 );
559 }
560
561 #[test]
562 fn ignores_all_values_over_byte_size_limit() {
563 let memory = Memory::new(build_memory_config(|c| {
564 c.max_byte_size = Some(1);
565 }));
566 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
567
568 let condition = Condition::Equals {
569 field: "key",
570 value: Value::from("test_key"),
571 };
572
573 assert!(memory
574 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
575 .unwrap()
576 .pop()
577 .is_none());
578 }
579
580 #[test]
581 fn ignores_values_when_byte_size_limit_is_reached() {
582 let ttl = 100;
583 let memory = Memory::new(build_memory_config(|c| {
584 c.ttl = ttl;
585 c.max_byte_size = Some(150);
586 }));
587 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
588 memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))]));
589
590 assert_eq!(
591 Ok(ObjectMap::from([
592 ("key".into(), Value::from("test_key")),
593 ("ttl".into(), Value::from(ttl)),
594 ("value".into(), Value::from(5)),
595 ])),
596 memory.find_table_row(
597 Case::Sensitive,
598 &[Condition::Equals {
599 field: "key",
600 value: Value::from("test_key")
601 }],
602 None,
603 None,
604 None
605 )
606 );
607
608 assert!(memory
609 .find_table_rows(
610 Case::Sensitive,
611 &[Condition::Equals {
612 field: "key",
613 value: Value::from("rejected_key")
614 }],
615 None,
616 None,
617 None
618 )
619 .unwrap()
620 .pop()
621 .is_none());
622 }
623
624 #[test]
625 fn missing_key() {
626 let memory = Memory::new(Default::default());
627
628 let condition = Condition::Equals {
629 field: "key",
630 value: Value::from("test_key"),
631 };
632
633 assert!(memory
634 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
635 .unwrap()
636 .pop()
637 .is_none());
638 }
639
640 #[tokio::test]
641 async fn sink_spec_compliance() {
642 let event = Event::Log(LogEvent::from(ObjectMap::from([(
643 "test_key".into(),
644 Value::from(5),
645 )])));
646
647 let memory = Memory::new(Default::default());
648
649 run_and_assert_sink_compliance(
650 VectorSink::from_event_streamsink(memory),
651 stream::once(ready(event)),
652 &SINK_TAGS,
653 )
654 .await;
655 }
656
657 #[tokio::test]
658 async fn flush_metrics_without_interval() {
659 let event = Event::Log(LogEvent::from(ObjectMap::from([(
660 "test_key".into(),
661 Value::from(5),
662 )])));
663
664 let memory = Memory::new(Default::default());
665
666 run_and_assert_sink_compliance(
667 VectorSink::from_event_streamsink(memory),
668 stream::once(ready(event)),
669 &SINK_TAGS,
670 )
671 .await;
672
673 let metrics = Controller::get().unwrap().capture_metrics();
674 let insertions_counter = metrics
675 .iter()
676 .find(|m| {
677 matches!(m.value(), MetricValue::Counter { .. })
678 && m.name() == "memory_enrichment_table_insertions_total"
679 })
680 .expect("Insertions metric is missing!");
681 let MetricValue::Counter {
682 value: insertions_count,
683 } = insertions_counter.value()
684 else {
685 unreachable!();
686 };
687 let flushes_counter = metrics
688 .iter()
689 .find(|m| {
690 matches!(m.value(), MetricValue::Counter { .. })
691 && m.name() == "memory_enrichment_table_flushes_total"
692 })
693 .expect("Flushes metric is missing!");
694 let MetricValue::Counter {
695 value: flushes_count,
696 } = flushes_counter.value()
697 else {
698 unreachable!();
699 };
700 let object_count_gauge = metrics
701 .iter()
702 .find(|m| {
703 matches!(m.value(), MetricValue::Gauge { .. })
704 && m.name() == "memory_enrichment_table_objects_count"
705 })
706 .expect("Object count metric is missing!");
707 let MetricValue::Gauge {
708 value: object_count,
709 } = object_count_gauge.value()
710 else {
711 unreachable!();
712 };
713 let byte_size_gauge = metrics
714 .iter()
715 .find(|m| {
716 matches!(m.value(), MetricValue::Gauge { .. })
717 && m.name() == "memory_enrichment_table_byte_size"
718 })
719 .expect("Byte size metric is missing!");
720 assert_eq!(*insertions_count, 1.0);
721 assert_eq!(*flushes_count, 1.0);
722 assert_eq!(*object_count, 1.0);
723 assert!(!byte_size_gauge.is_empty());
724 }
725
726 #[tokio::test]
727 async fn flush_metrics_with_interval() {
728 let event = Event::Log(LogEvent::from(ObjectMap::from([(
729 "test_key".into(),
730 Value::from(5),
731 )])));
732
733 let memory = Memory::new(build_memory_config(|c| {
734 c.flush_interval = Some(1);
735 }));
736
737 run_and_assert_sink_compliance(
738 VectorSink::from_event_streamsink(memory),
739 stream::iter(vec![event.clone(), event]).flat_map(|e| {
740 stream::once(async move {
741 tokio::time::sleep(Duration::from_millis(600)).await;
742 e
743 })
744 }),
745 &SINK_TAGS,
746 )
747 .await;
748
749 let metrics = Controller::get().unwrap().capture_metrics();
750 let insertions_counter = metrics
751 .iter()
752 .find(|m| {
753 matches!(m.value(), MetricValue::Counter { .. })
754 && m.name() == "memory_enrichment_table_insertions_total"
755 })
756 .expect("Insertions metric is missing!");
757 let MetricValue::Counter {
758 value: insertions_count,
759 } = insertions_counter.value()
760 else {
761 unreachable!();
762 };
763 let flushes_counter = metrics
764 .iter()
765 .find(|m| {
766 matches!(m.value(), MetricValue::Counter { .. })
767 && m.name() == "memory_enrichment_table_flushes_total"
768 })
769 .expect("Flushes metric is missing!");
770 let MetricValue::Counter {
771 value: flushes_count,
772 } = flushes_counter.value()
773 else {
774 unreachable!();
775 };
776 let object_count_gauge = metrics
777 .iter()
778 .find(|m| {
779 matches!(m.value(), MetricValue::Gauge { .. })
780 && m.name() == "memory_enrichment_table_objects_count"
781 })
782 .expect("Object count metric is missing!");
783 let MetricValue::Gauge {
784 value: object_count,
785 } = object_count_gauge.value()
786 else {
787 unreachable!();
788 };
789 let byte_size_gauge = metrics
790 .iter()
791 .find(|m| {
792 matches!(m.value(), MetricValue::Gauge { .. })
793 && m.name() == "memory_enrichment_table_byte_size"
794 })
795 .expect("Byte size metric is missing!");
796
797 assert_eq!(*insertions_count, 2.0);
798 assert_eq!(*flushes_count, 2.0);
800 assert_eq!(*object_count, 1.0);
801 assert!(!byte_size_gauge.is_empty());
802 }
803
804 #[tokio::test]
805 async fn flush_metrics_with_key() {
806 let event = Event::Log(LogEvent::from(ObjectMap::from([(
807 "test_key".into(),
808 Value::from(5),
809 )])));
810
811 let memory = Memory::new(build_memory_config(|c| {
812 c.internal_metrics = InternalMetricsConfig {
813 include_key_tag: true,
814 };
815 }));
816
817 run_and_assert_sink_compliance(
818 VectorSink::from_event_streamsink(memory),
819 stream::once(ready(event)),
820 &SINK_TAGS,
821 )
822 .await;
823
824 let metrics = Controller::get().unwrap().capture_metrics();
825 let insertions_counter = metrics
826 .iter()
827 .find(|m| {
828 matches!(m.value(), MetricValue::Counter { .. })
829 && m.name() == "memory_enrichment_table_insertions_total"
830 })
831 .expect("Insertions metric is missing!");
832
833 assert!(insertions_counter.tag_matches("key", "test_key"));
834 }
835
836 #[tokio::test]
837 async fn flush_metrics_without_key() {
838 let event = Event::Log(LogEvent::from(ObjectMap::from([(
839 "test_key".into(),
840 Value::from(5),
841 )])));
842
843 let memory = Memory::new(Default::default());
844
845 run_and_assert_sink_compliance(
846 VectorSink::from_event_streamsink(memory),
847 stream::once(ready(event)),
848 &SINK_TAGS,
849 )
850 .await;
851
852 let metrics = Controller::get().unwrap().capture_metrics();
853 let insertions_counter = metrics
854 .iter()
855 .find(|m| {
856 matches!(m.value(), MetricValue::Counter { .. })
857 && m.name() == "memory_enrichment_table_insertions_total"
858 })
859 .expect("Insertions metric is missing!");
860
861 assert!(insertions_counter.tag_value("key").is_none());
862 }
863
864 #[tokio::test]
865 async fn source_spec_compliance() {
866 let mut memory_config = MemoryConfig::default();
867 memory_config.source_config = Some(MemorySourceConfig {
868 export_interval: NonZeroU64::try_from(1).unwrap(),
869 export_batch_size: None,
870 remove_after_export: false,
871 source_key: "test".to_string(),
872 });
873 let memory = memory_config.get_or_build_memory().await;
874 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
875
876 let mut events: Vec<Event> = run_and_assert_source_compliance(
877 memory_config,
878 time::Duration::from_secs(5),
879 &SOURCE_TAGS,
880 )
881 .await;
882
883 assert!(!events.is_empty());
884 let event = events.remove(0);
885 let log = event.as_log();
886
887 assert!(!log.value().is_empty());
888 }
889}