1#![allow(unsafe_op_in_unsafe_fn)] use std::{
4 sync::{Arc, Mutex, MutexGuard},
5 time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use evmap::{
11 shallow_copy::CopyValue,
12 {self},
13};
14use evmap_derive::ShallowCopy;
15use futures::{StreamExt, stream::BoxStream};
16use thread_local::ThreadLocal;
17use tokio::{
18 sync::broadcast::{Receiver, Sender},
19 time::interval,
20};
21use tokio_stream::wrappers::IntervalStream;
22use vector_lib::{
23 ByteSizeOf, EstimatedJsonEncodedSizeOf,
24 config::LogNamespace,
25 enrichment::{Case, Condition, IndexHandle, Table},
26 event::{Event, EventStatus, Finalizable},
27 internal_event::{
28 ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
29 },
30 shutdown::ShutdownSignal,
31 sink::StreamSink,
32};
33use vrl::value::{KeyString, ObjectMap, Value};
34
35use super::source::MemorySource;
36use crate::{
37 SourceSender,
38 enrichment_tables::memory::{
39 MemoryConfig,
40 internal_events::{
41 MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed,
42 MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
43 MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
44 },
45 },
46};
47
48#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
50pub struct MemoryEntry {
51 value: String,
52 update_time: CopyValue<Instant>,
53 ttl: u64,
54}
55
56impl ByteSizeOf for MemoryEntry {
57 fn allocated_bytes(&self) -> usize {
58 self.value.size_of()
59 }
60}
61
62impl MemoryEntry {
63 pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, String> {
64 let ttl = self
65 .ttl
66 .saturating_sub(now.duration_since(*self.update_time).as_secs());
67 Ok(ObjectMap::from([
68 (
69 KeyString::from("key"),
70 Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
71 ),
72 (
73 KeyString::from("value"),
74 serde_json::from_str::<Value>(&self.value)
75 .map_err(|_| "Failed to read value from memory!")?,
76 ),
77 (
78 KeyString::from("ttl"),
79 Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
80 ),
81 ]))
82 }
83
84 fn expired(&self, now: Instant) -> bool {
85 now.duration_since(*self.update_time).as_secs() > self.ttl
86 }
87}
88
89#[derive(Default)]
90struct MemoryMetadata {
91 byte_size: u64,
92}
93
94#[derive(Clone)]
96pub(super) struct MemoryEntryPair {
97 pub(super) key: String,
99 pub(super) entry: MemoryEntry,
101}
102
103pub(super) struct MemoryWriter {
105 pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
106 metadata: MemoryMetadata,
107}
108
109pub struct Memory {
111 read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
112 read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
113 pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
114 pub(super) config: MemoryConfig,
115 #[allow(dead_code)]
116 expired_items_receiver: Receiver<Vec<MemoryEntryPair>>,
117 expired_items_sender: Sender<Vec<MemoryEntryPair>>,
118}
119
120impl Memory {
121 pub fn new(config: MemoryConfig) -> Self {
123 let (read_handle, write_handle) = evmap::new();
124 let (expired_tx, expired_rx) = tokio::sync::broadcast::channel(5);
128 Self {
129 config,
130 read_handle_factory: read_handle.factory(),
131 read_handle: ThreadLocal::new(),
132 write_handle: Arc::new(Mutex::new(MemoryWriter {
133 write_handle,
134 metadata: MemoryMetadata::default(),
135 })),
136 expired_items_sender: expired_tx,
137 expired_items_receiver: expired_rx,
138 }
139 }
140
141 pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
142 self.read_handle
143 .get_or(|| self.read_handle_factory.handle())
144 }
145
146 pub(super) fn subscribe_to_expired_items(&self) -> Receiver<Vec<MemoryEntryPair>> {
147 self.expired_items_sender.subscribe()
148 }
149
150 fn handle_value(&self, value: ObjectMap) {
151 let mut writer = self.write_handle.lock().expect("mutex poisoned");
152 let now = Instant::now();
153
154 for (k, value) in value.into_iter() {
155 let new_entry_key = String::from(k);
156 let Ok(v) = serde_json::to_string(&value) else {
157 emit!(MemoryEnrichmentTableInsertFailed {
158 key: &new_entry_key,
159 include_key_metric_tag: self.config.internal_metrics.include_key_tag
160 });
161 continue;
162 };
163 let new_entry = MemoryEntry {
164 value: v,
165 update_time: now.into(),
166 ttl: self
167 .config
168 .ttl_field
169 .path
170 .as_ref()
171 .and_then(|p| value.get(p))
172 .and_then(|v| v.as_integer())
173 .map(|v| v as u64)
174 .unwrap_or(self.config.ttl),
175 };
176 let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
177 if let Some(max_byte_size) = self.config.max_byte_size
178 && writer
179 .metadata
180 .byte_size
181 .saturating_add(new_entry_size as u64)
182 > max_byte_size
183 {
184 emit!(MemoryEnrichmentTableInsertFailed {
186 key: &new_entry_key,
187 include_key_metric_tag: self.config.internal_metrics.include_key_tag
188 });
189 continue;
190 }
191 writer.metadata.byte_size = writer
192 .metadata
193 .byte_size
194 .saturating_add(new_entry_size as u64);
195 emit!(MemoryEnrichmentTableInserted {
196 key: &new_entry_key,
197 include_key_metric_tag: self.config.internal_metrics.include_key_tag
198 });
199 writer.write_handle.update(new_entry_key, new_entry);
200 }
201
202 if self.config.flush_interval.is_none() {
203 self.flush(writer);
204 }
205 }
206
207 fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
208 let now = Instant::now();
209
210 let mut needs_flush = false;
211 if let Some(reader) = self.get_read_handle().read() {
215 for (k, v) in reader.iter() {
216 if let Some(entry) = v.get_one()
217 && entry.expired(now)
218 {
219 writer.write_handle.empty(k.clone());
222 emit!(MemoryEnrichmentTableTtlExpired {
223 key: k,
224 include_key_metric_tag: self.config.internal_metrics.include_key_tag
225 });
226 needs_flush = true;
227 }
228 }
229 };
230
231 needs_flush
232 }
233
234 fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
235 let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
236 if needs_flush {
237 self.flush(writer);
238 }
239 }
240
241 fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
242 if self
244 .config
245 .source_config
246 .as_ref()
247 .map(|c| c.export_expired_items)
248 .unwrap_or_default()
249 {
250 let pending_removal = writer
251 .write_handle
252 .pending()
253 .iter()
254 .filter_map(|o| match o {
256 evmap::Operation::Empty(k) => Some(k),
257 _ => None,
258 })
259 .filter_map(|key| {
260 writer.write_handle.get_one(key).map(|v| MemoryEntryPair {
261 key: key.to_string(),
262 entry: v.clone(),
263 })
264 })
265 .collect::<Vec<_>>();
266 if let Err(error) = self.expired_items_sender.send(pending_removal) {
267 error!(
268 message = "Error exporting expired items from memory enrichment table.",
269 error = %error,
270 );
271 }
272 }
273
274 writer.write_handle.refresh();
275 if let Some(reader) = self.get_read_handle().read() {
276 let mut byte_size = 0;
277 for (k, v) in reader.iter() {
278 byte_size += k.size_of() + v.get_one().size_of();
279 }
280 writer.metadata.byte_size = byte_size as u64;
281 emit!(MemoryEnrichmentTableFlushed {
282 new_objects_count: reader.len(),
283 new_byte_size: byte_size
284 });
285 }
286 }
287
288 pub(crate) fn as_source(
289 &self,
290 shutdown: ShutdownSignal,
291 out: SourceSender,
292 log_namespace: LogNamespace,
293 ) -> MemorySource {
294 MemorySource {
295 memory: self.clone(),
296 shutdown,
297 out,
298 log_namespace,
299 }
300 }
301}
302
303impl Clone for Memory {
304 fn clone(&self) -> Self {
305 Self {
306 read_handle_factory: self.read_handle_factory.clone(),
307 read_handle: ThreadLocal::new(),
308 write_handle: Arc::clone(&self.write_handle),
309 config: self.config.clone(),
310 expired_items_sender: self.expired_items_sender.clone(),
311 expired_items_receiver: self.expired_items_sender.subscribe(),
312 }
313 }
314}
315
316impl Table for Memory {
317 fn find_table_row<'a>(
318 &self,
319 case: Case,
320 condition: &'a [Condition<'a>],
321 select: Option<&'a [String]>,
322 wildcard: Option<&Value>,
323 index: Option<IndexHandle>,
324 ) -> Result<ObjectMap, String> {
325 let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
326
327 match rows.pop() {
328 Some(row) if rows.is_empty() => Ok(row),
329 Some(_) => Err("More than 1 row found".to_string()),
330 None => Err("Key not found".to_string()),
331 }
332 }
333
334 fn find_table_rows<'a>(
335 &self,
336 _case: Case,
337 condition: &'a [Condition<'a>],
338 _select: Option<&'a [String]>,
339 _wildcard: Option<&Value>,
340 _index: Option<IndexHandle>,
341 ) -> Result<Vec<ObjectMap>, String> {
342 match condition.first() {
343 Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()),
344 Some(Condition::Equals { value, .. }) => {
345 let key = value.to_string_lossy();
346 match self.get_read_handle().get_one(key.as_ref()) {
347 Some(row) => {
348 emit!(MemoryEnrichmentTableRead {
349 key: &key,
350 include_key_metric_tag: self.config.internal_metrics.include_key_tag
351 });
352 row.as_object_map(Instant::now(), &key).map(|r| vec![r])
353 }
354 None => {
355 emit!(MemoryEnrichmentTableReadFailed {
356 key: &key,
357 include_key_metric_tag: self.config.internal_metrics.include_key_tag
358 });
359 Ok(Default::default())
360 }
361 }
362 }
363 Some(_) => Err("Only equality condition is allowed".to_string()),
364 None => Err("Key condition must be specified".to_string()),
365 }
366 }
367
368 fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, String> {
369 match fields.len() {
370 0 => Err("Key field is required".to_string()),
371 1 => Ok(IndexHandle(0)),
372 _ => Err("Only one field is allowed".to_string()),
373 }
374 }
375
376 fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
378 Vec::new()
379 }
380
381 fn needs_reload(&self) -> bool {
383 false
384 }
385}
386
387impl std::fmt::Debug for Memory {
388 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
389 write!(f, "Memory {} row(s)", self.get_read_handle().len())
390 }
391}
392
393#[async_trait]
394impl StreamSink<Event> for Memory {
395 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
396 let events_sent = register!(EventsSent::from(Output(None)));
397 let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
398 let mut flush_interval = IntervalStream::new(interval(
399 self.config
400 .flush_interval
401 .map(Duration::from_secs)
402 .unwrap_or(Duration::MAX),
403 ));
404 let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
405 self.config.scan_interval.into(),
406 )));
407
408 loop {
409 tokio::select! {
410 event = input.next() => {
411 let mut event = if let Some(event) = event {
412 event
413 } else {
414 break;
415 };
416 let event_byte_size = event.estimated_json_encoded_size_of();
417
418 let finalizers = event.take_finalizers();
419
420 let log = event.into_log();
422
423 if let (Value::Object(map), _) = log.into_parts() {
424 self.handle_value(map)
425 };
426
427 finalizers.update_status(EventStatus::Delivered);
428 events_sent.emit(CountByteSize(1, event_byte_size));
429 bytes_sent.emit(ByteSize(event_byte_size.get()));
430 }
431
432 Some(_) = flush_interval.next() => {
433 let writer = self.write_handle.lock().expect("mutex poisoned");
434 self.flush(writer);
435 }
436
437 Some(_) = scan_interval.next() => {
438 let writer = self.write_handle.lock().expect("mutex poisoned");
439 self.scan(writer);
440 }
441 }
442 }
443 Ok(())
444 }
445}
446
447#[cfg(test)]
448mod tests {
449 use std::{num::NonZeroU64, slice::from_ref, time::Duration};
450
451 use futures::{StreamExt, future::ready};
452 use futures_util::stream;
453 use tokio::time;
454
455 use vector_lib::{
456 event::{EventContainer, MetricValue},
457 lookup::lookup_v2::OptionalValuePath,
458 metrics::Controller,
459 sink::VectorSink,
460 };
461
462 use super::*;
463 use crate::{
464 enrichment_tables::memory::{
465 config::MemorySourceConfig, internal_events::InternalMetricsConfig,
466 },
467 event::{Event, LogEvent},
468 test_util::components::{
469 SINK_TAGS, SOURCE_TAGS, run_and_assert_sink_compliance,
470 run_and_assert_source_compliance,
471 },
472 };
473
474 fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
475 let mut config = MemoryConfig::default();
476 modfn(&mut config);
477 config
478 }
479
480 #[test]
481 fn finds_row() {
482 let memory = Memory::new(Default::default());
483 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
484
485 let condition = Condition::Equals {
486 field: "key",
487 value: Value::from("test_key"),
488 };
489
490 assert_eq!(
491 Ok(ObjectMap::from([
492 ("key".into(), Value::from("test_key")),
493 ("ttl".into(), Value::from(memory.config.ttl)),
494 ("value".into(), Value::from(5)),
495 ])),
496 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
497 );
498 }
499
500 #[test]
501 fn calculates_ttl() {
502 let ttl = 100;
503 let secs_to_subtract = 10;
504 let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
505 {
506 let mut handle = memory.write_handle.lock().unwrap();
507 handle.write_handle.update(
508 "test_key".to_string(),
509 MemoryEntry {
510 value: "5".to_string(),
511 update_time: (Instant::now() - Duration::from_secs(secs_to_subtract)).into(),
512 ttl,
513 },
514 );
515 handle.write_handle.refresh();
516 }
517
518 let condition = Condition::Equals {
519 field: "key",
520 value: Value::from("test_key"),
521 };
522
523 assert_eq!(
524 Ok(ObjectMap::from([
525 ("key".into(), Value::from("test_key")),
526 ("ttl".into(), Value::from(ttl - secs_to_subtract)),
527 ("value".into(), Value::from(5)),
528 ])),
529 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
530 );
531 }
532
533 #[test]
534 fn calculates_ttl_override() {
535 let global_ttl = 100;
536 let ttl_override = 10;
537 let memory = Memory::new(build_memory_config(|c| {
538 c.ttl = global_ttl;
539 c.ttl_field = OptionalValuePath::new("ttl");
540 }));
541 memory.handle_value(ObjectMap::from([
542 (
543 "ttl_override".into(),
544 Value::from(ObjectMap::from([
545 ("val".into(), Value::from(5)),
546 ("ttl".into(), Value::from(ttl_override)),
547 ])),
548 ),
549 (
550 "default_ttl".into(),
551 Value::from(ObjectMap::from([("val".into(), Value::from(5))])),
552 ),
553 ]));
554
555 let default_condition = Condition::Equals {
556 field: "key",
557 value: Value::from("default_ttl"),
558 };
559 let override_condition = Condition::Equals {
560 field: "key",
561 value: Value::from("ttl_override"),
562 };
563
564 assert_eq!(
565 Ok(ObjectMap::from([
566 ("key".into(), Value::from("default_ttl")),
567 ("ttl".into(), Value::from(global_ttl)),
568 (
569 "value".into(),
570 Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
571 ),
572 ])),
573 memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
574 );
575 assert_eq!(
576 Ok(ObjectMap::from([
577 ("key".into(), Value::from("ttl_override")),
578 ("ttl".into(), Value::from(ttl_override)),
579 (
580 "value".into(),
581 Value::from(ObjectMap::from([
582 ("val".into(), Value::from(5)),
583 ("ttl".into(), Value::from(ttl_override))
584 ]))
585 ),
586 ])),
587 memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
588 );
589 }
590
591 #[test]
592 fn removes_expired_records_on_scan_interval() {
593 let ttl = 100;
594 let memory = Memory::new(build_memory_config(|c| {
595 c.ttl = ttl;
596 }));
597 {
598 let mut handle = memory.write_handle.lock().unwrap();
599 handle.write_handle.update(
600 "test_key".to_string(),
601 MemoryEntry {
602 value: "5".to_string(),
603 update_time: (Instant::now() - Duration::from_secs(ttl + 10)).into(),
604 ttl,
605 },
606 );
607 handle.write_handle.refresh();
608 }
609
610 let condition = Condition::Equals {
612 field: "key",
613 value: Value::from("test_key"),
614 };
615 assert_eq!(
616 Ok(ObjectMap::from([
617 ("key".into(), Value::from("test_key")),
618 ("ttl".into(), Value::from(0)),
619 ("value".into(), Value::from(5)),
620 ])),
621 memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
622 );
623
624 let writer = memory.write_handle.lock().unwrap();
626 memory.scan(writer);
627
628 assert!(
630 memory
631 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
632 .unwrap()
633 .pop()
634 .is_none()
635 );
636 }
637
638 #[test]
639 fn does_not_show_values_before_flush_interval() {
640 let ttl = 100;
641 let memory = Memory::new(build_memory_config(|c| {
642 c.ttl = ttl;
643 c.flush_interval = Some(10);
644 }));
645 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
646
647 let condition = Condition::Equals {
648 field: "key",
649 value: Value::from("test_key"),
650 };
651
652 assert!(
653 memory
654 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
655 .unwrap()
656 .pop()
657 .is_none()
658 );
659 }
660
661 #[test]
662 fn updates_ttl_on_value_replacement() {
663 let ttl = 100;
664 let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
665 {
666 let mut handle = memory.write_handle.lock().unwrap();
667 handle.write_handle.update(
668 "test_key".to_string(),
669 MemoryEntry {
670 value: "5".to_string(),
671 update_time: (Instant::now() - Duration::from_secs(ttl / 2)).into(),
672 ttl,
673 },
674 );
675 handle.write_handle.refresh();
676 }
677 let condition = Condition::Equals {
678 field: "key",
679 value: Value::from("test_key"),
680 };
681
682 assert_eq!(
683 Ok(ObjectMap::from([
684 ("key".into(), Value::from("test_key")),
685 ("ttl".into(), Value::from(ttl / 2)),
686 ("value".into(), Value::from(5)),
687 ])),
688 memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
689 );
690
691 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
692
693 assert_eq!(
694 Ok(ObjectMap::from([
695 ("key".into(), Value::from("test_key")),
696 ("ttl".into(), Value::from(ttl)),
697 ("value".into(), Value::from(5)),
698 ])),
699 memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
700 );
701 }
702
703 #[test]
704 fn ignores_all_values_over_byte_size_limit() {
705 let memory = Memory::new(build_memory_config(|c| {
706 c.max_byte_size = Some(1);
707 }));
708 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
709
710 let condition = Condition::Equals {
711 field: "key",
712 value: Value::from("test_key"),
713 };
714
715 assert!(
716 memory
717 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
718 .unwrap()
719 .pop()
720 .is_none()
721 );
722 }
723
724 #[test]
725 fn ignores_values_when_byte_size_limit_is_reached() {
726 let ttl = 100;
727 let memory = Memory::new(build_memory_config(|c| {
728 c.ttl = ttl;
729 c.max_byte_size = Some(150);
730 }));
731 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
732 memory.handle_value(ObjectMap::from([("rejected_key".into(), Value::from(5))]));
733
734 assert_eq!(
735 Ok(ObjectMap::from([
736 ("key".into(), Value::from("test_key")),
737 ("ttl".into(), Value::from(ttl)),
738 ("value".into(), Value::from(5)),
739 ])),
740 memory.find_table_row(
741 Case::Sensitive,
742 &[Condition::Equals {
743 field: "key",
744 value: Value::from("test_key")
745 }],
746 None,
747 None,
748 None
749 )
750 );
751
752 assert!(
753 memory
754 .find_table_rows(
755 Case::Sensitive,
756 &[Condition::Equals {
757 field: "key",
758 value: Value::from("rejected_key")
759 }],
760 None,
761 None,
762 None
763 )
764 .unwrap()
765 .pop()
766 .is_none()
767 );
768 }
769
770 #[test]
771 fn missing_key() {
772 let memory = Memory::new(Default::default());
773
774 let condition = Condition::Equals {
775 field: "key",
776 value: Value::from("test_key"),
777 };
778
779 assert!(
780 memory
781 .find_table_rows(Case::Sensitive, &[condition], None, None, None)
782 .unwrap()
783 .pop()
784 .is_none()
785 );
786 }
787
788 #[tokio::test]
789 async fn sink_spec_compliance() {
790 let event = Event::Log(LogEvent::from(ObjectMap::from([(
791 "test_key".into(),
792 Value::from(5),
793 )])));
794
795 let memory = Memory::new(Default::default());
796
797 run_and_assert_sink_compliance(
798 VectorSink::from_event_streamsink(memory),
799 stream::once(ready(event)),
800 &SINK_TAGS,
801 )
802 .await;
803 }
804
805 #[tokio::test]
806 async fn flush_metrics_without_interval() {
807 let event = Event::Log(LogEvent::from(ObjectMap::from([(
808 "test_key".into(),
809 Value::from(5),
810 )])));
811
812 let memory = Memory::new(Default::default());
813
814 run_and_assert_sink_compliance(
815 VectorSink::from_event_streamsink(memory),
816 stream::once(ready(event)),
817 &SINK_TAGS,
818 )
819 .await;
820
821 let metrics = Controller::get().unwrap().capture_metrics();
822 let insertions_counter = metrics
823 .iter()
824 .find(|m| {
825 matches!(m.value(), MetricValue::Counter { .. })
826 && m.name() == "memory_enrichment_table_insertions_total"
827 })
828 .expect("Insertions metric is missing!");
829 let MetricValue::Counter {
830 value: insertions_count,
831 } = insertions_counter.value()
832 else {
833 unreachable!();
834 };
835 let flushes_counter = metrics
836 .iter()
837 .find(|m| {
838 matches!(m.value(), MetricValue::Counter { .. })
839 && m.name() == "memory_enrichment_table_flushes_total"
840 })
841 .expect("Flushes metric is missing!");
842 let MetricValue::Counter {
843 value: flushes_count,
844 } = flushes_counter.value()
845 else {
846 unreachable!();
847 };
848 let object_count_gauge = metrics
849 .iter()
850 .find(|m| {
851 matches!(m.value(), MetricValue::Gauge { .. })
852 && m.name() == "memory_enrichment_table_objects_count"
853 })
854 .expect("Object count metric is missing!");
855 let MetricValue::Gauge {
856 value: object_count,
857 } = object_count_gauge.value()
858 else {
859 unreachable!();
860 };
861 let byte_size_gauge = metrics
862 .iter()
863 .find(|m| {
864 matches!(m.value(), MetricValue::Gauge { .. })
865 && m.name() == "memory_enrichment_table_byte_size"
866 })
867 .expect("Byte size metric is missing!");
868 assert_eq!(*insertions_count, 1.0);
869 assert_eq!(*flushes_count, 1.0);
870 assert_eq!(*object_count, 1.0);
871 assert!(!byte_size_gauge.is_empty());
872 }
873
874 #[tokio::test]
875 async fn flush_metrics_with_interval() {
876 let event = Event::Log(LogEvent::from(ObjectMap::from([(
877 "test_key".into(),
878 Value::from(5),
879 )])));
880
881 let memory = Memory::new(build_memory_config(|c| {
882 c.flush_interval = Some(1);
883 }));
884
885 run_and_assert_sink_compliance(
886 VectorSink::from_event_streamsink(memory),
887 stream::iter(vec![event.clone(), event]).flat_map(|e| {
888 stream::once(async move {
889 tokio::time::sleep(Duration::from_millis(600)).await;
890 e
891 })
892 }),
893 &SINK_TAGS,
894 )
895 .await;
896
897 let metrics = Controller::get().unwrap().capture_metrics();
898 let insertions_counter = metrics
899 .iter()
900 .find(|m| {
901 matches!(m.value(), MetricValue::Counter { .. })
902 && m.name() == "memory_enrichment_table_insertions_total"
903 })
904 .expect("Insertions metric is missing!");
905 let MetricValue::Counter {
906 value: insertions_count,
907 } = insertions_counter.value()
908 else {
909 unreachable!();
910 };
911 let flushes_counter = metrics
912 .iter()
913 .find(|m| {
914 matches!(m.value(), MetricValue::Counter { .. })
915 && m.name() == "memory_enrichment_table_flushes_total"
916 })
917 .expect("Flushes metric is missing!");
918 let MetricValue::Counter {
919 value: flushes_count,
920 } = flushes_counter.value()
921 else {
922 unreachable!();
923 };
924 let object_count_gauge = metrics
925 .iter()
926 .find(|m| {
927 matches!(m.value(), MetricValue::Gauge { .. })
928 && m.name() == "memory_enrichment_table_objects_count"
929 })
930 .expect("Object count metric is missing!");
931 let MetricValue::Gauge {
932 value: object_count,
933 } = object_count_gauge.value()
934 else {
935 unreachable!();
936 };
937 let byte_size_gauge = metrics
938 .iter()
939 .find(|m| {
940 matches!(m.value(), MetricValue::Gauge { .. })
941 && m.name() == "memory_enrichment_table_byte_size"
942 })
943 .expect("Byte size metric is missing!");
944
945 assert_eq!(*insertions_count, 2.0);
946 assert_eq!(*flushes_count, 2.0);
948 assert_eq!(*object_count, 1.0);
949 assert!(!byte_size_gauge.is_empty());
950 }
951
952 #[tokio::test]
953 async fn flush_metrics_with_key() {
954 let event = Event::Log(LogEvent::from(ObjectMap::from([(
955 "test_key".into(),
956 Value::from(5),
957 )])));
958
959 let memory = Memory::new(build_memory_config(|c| {
960 c.internal_metrics = InternalMetricsConfig {
961 include_key_tag: true,
962 };
963 }));
964
965 run_and_assert_sink_compliance(
966 VectorSink::from_event_streamsink(memory),
967 stream::once(ready(event)),
968 &SINK_TAGS,
969 )
970 .await;
971
972 let metrics = Controller::get().unwrap().capture_metrics();
973 let insertions_counter = metrics
974 .iter()
975 .find(|m| {
976 matches!(m.value(), MetricValue::Counter { .. })
977 && m.name() == "memory_enrichment_table_insertions_total"
978 })
979 .expect("Insertions metric is missing!");
980
981 assert!(insertions_counter.tag_matches("key", "test_key"));
982 }
983
984 #[tokio::test]
985 async fn flush_metrics_without_key() {
986 let event = Event::Log(LogEvent::from(ObjectMap::from([(
987 "test_key".into(),
988 Value::from(5),
989 )])));
990
991 let memory = Memory::new(Default::default());
992
993 run_and_assert_sink_compliance(
994 VectorSink::from_event_streamsink(memory),
995 stream::once(ready(event)),
996 &SINK_TAGS,
997 )
998 .await;
999
1000 let metrics = Controller::get().unwrap().capture_metrics();
1001 let insertions_counter = metrics
1002 .iter()
1003 .find(|m| {
1004 matches!(m.value(), MetricValue::Counter { .. })
1005 && m.name() == "memory_enrichment_table_insertions_total"
1006 })
1007 .expect("Insertions metric is missing!");
1008
1009 assert!(insertions_counter.tag_value("key").is_none());
1010 }
1011
1012 #[tokio::test]
1013 async fn source_spec_compliance() {
1014 let mut memory_config = MemoryConfig::default();
1015 memory_config.source_config = Some(MemorySourceConfig {
1016 export_interval: Some(NonZeroU64::try_from(1).unwrap()),
1017 export_batch_size: None,
1018 remove_after_export: false,
1019 export_expired_items: false,
1020 source_key: "test".to_string(),
1021 });
1022 let memory = memory_config.get_or_build_memory().await;
1023 memory.handle_value(ObjectMap::from([("test_key".into(), Value::from(5))]));
1024
1025 let mut events: Vec<Event> = run_and_assert_source_compliance(
1026 memory_config,
1027 time::Duration::from_secs(5),
1028 &SOURCE_TAGS,
1029 )
1030 .await;
1031
1032 assert!(!events.is_empty());
1033 let event = events.remove(0);
1034 let log = event.as_log();
1035
1036 assert!(!log.value().is_empty());
1037 }
1038}