1#![allow(unsafe_op_in_unsafe_fn)] use std::{
4 sync::{Arc, Mutex, MutexGuard},
5 time::{Duration, Instant},
6};
7
8use async_trait::async_trait;
9use bytes::Bytes;
10use evmap::{
11 shallow_copy::CopyValue,
12 {self},
13};
14use evmap_derive::ShallowCopy;
15use futures::{StreamExt, stream::BoxStream};
16use thread_local::ThreadLocal;
17use tokio::{
18 sync::broadcast::{Receiver, Sender},
19 time::interval,
20};
21use tokio_stream::wrappers::IntervalStream;
22use vector_lib::{
23 ByteSizeOf, EstimatedJsonEncodedSizeOf,
24 config::LogNamespace,
25 enrichment::{Case, Condition, Error, IndexHandle, InternalError, Table},
26 event::{Event, EventStatus, Finalizable},
27 internal_event::{
28 ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
29 },
30 shutdown::ShutdownSignal,
31 sink::StreamSink,
32};
33use vrl::value::{KeyString, ObjectMap, Value};
34
35use super::source::MemorySource;
36use crate::{
37 SourceSender,
38 enrichment_tables::memory::{
39 MemoryConfig,
40 internal_events::{
41 MemoryEnrichmentTableFlushed, MemoryEnrichmentTableInsertFailed,
42 MemoryEnrichmentTableInserted, MemoryEnrichmentTableRead,
43 MemoryEnrichmentTableReadFailed, MemoryEnrichmentTableTtlExpired,
44 },
45 },
46};
47
/// A single value stored in the memory enrichment table, together with the
/// bookkeeping needed to work out how long it remains valid.
#[derive(Clone, Eq, PartialEq, Hash, ShallowCopy)]
pub struct MemoryEntry {
    /// The stored value, kept as a JSON-encoded string.
    value: String,
    /// The instant at which this entry was last written.
    update_time: CopyValue<Instant>,
    /// Time-to-live of this entry in seconds, counted from `update_time`.
    ttl: u64,
}
55
56impl ByteSizeOf for MemoryEntry {
57 fn allocated_bytes(&self) -> usize {
58 self.value.size_of()
59 }
60}
61
62impl MemoryEntry {
63 pub(super) fn as_object_map(&self, now: Instant, key: &str) -> Result<ObjectMap, Error> {
64 let ttl = self
65 .ttl
66 .saturating_sub(now.duration_since(*self.update_time).as_secs());
67 Ok(ObjectMap::from([
68 (
69 KeyString::from("key"),
70 Value::Bytes(Bytes::copy_from_slice(key.as_bytes())),
71 ),
72 (
73 KeyString::from("value"),
74 serde_json::from_str::<Value>(&self.value).map_err(|source| Error::Internal {
76 source: InternalError::FailedToDecode {
77 details: source.to_string(),
78 },
79 })?,
80 ),
81 (
82 KeyString::from("ttl"),
83 Value::Integer(ttl.try_into().unwrap_or(i64::MAX)),
84 ),
85 ]))
86 }
87
88 fn expired(&self, now: Instant) -> bool {
89 now.duration_since(*self.update_time).as_secs() > self.ttl
90 }
91}
92
/// Bookkeeping kept next to the table's write handle.
#[derive(Default)]
struct MemoryMetadata {
    /// Byte-size total of the keys and entries, as tracked by the writer.
    byte_size: u64,
}
97
/// A key together with its entry, as published on the expired-items channel.
#[derive(Clone)]
pub(super) struct MemoryEntryPair {
    /// Key under which the entry was stored.
    pub(super) key: String,
    /// The stored entry itself.
    pub(super) entry: MemoryEntry,
}
106
/// Write side of the table: the `evmap` write handle plus its metadata.
pub(super) struct MemoryWriter {
    /// Handle through which all inserts and removals are made.
    pub(super) write_handle: evmap::WriteHandle<String, MemoryEntry>,
    /// Size bookkeeping updated on insert and recomputed on flush.
    metadata: MemoryMetadata,
}
112
/// An in-memory enrichment table backed by an `evmap` map.
///
/// It acts as an enrichment `Table` for lookups and as a `StreamSink` that
/// stores incoming events.
pub struct Memory {
    /// Factory used to create the read handle for each thread.
    read_handle_factory: evmap::ReadHandleFactory<String, MemoryEntry>,
    /// Per-thread read handle, created lazily by `get_read_handle`.
    read_handle: ThreadLocal<evmap::ReadHandle<String, MemoryEntry>>,
    /// Writer shared (behind a mutex) between all clones of this table.
    pub(super) write_handle: Arc<Mutex<MemoryWriter>>,
    /// Configuration this table was built from.
    pub(super) config: MemoryConfig,
    // Never read in this file; presumably held so the broadcast channel
    // always has at least one receiver — TODO confirm.
    #[allow(dead_code)]
    expired_items_receiver: Receiver<Vec<MemoryEntryPair>>,
    /// Sender on which batches of expired items are published during flush.
    expired_items_sender: Sender<Vec<MemoryEntryPair>>,
}
123
124impl Memory {
125 pub fn new(config: MemoryConfig) -> Self {
127 let (read_handle, write_handle) = evmap::new();
128 let (expired_tx, expired_rx) = tokio::sync::broadcast::channel(5);
132 Self {
133 config,
134 read_handle_factory: read_handle.factory(),
135 read_handle: ThreadLocal::new(),
136 write_handle: Arc::new(Mutex::new(MemoryWriter {
137 write_handle,
138 metadata: MemoryMetadata::default(),
139 })),
140 expired_items_sender: expired_tx,
141 expired_items_receiver: expired_rx,
142 }
143 }
144
145 pub(super) fn get_read_handle(&self) -> &evmap::ReadHandle<String, MemoryEntry> {
146 self.read_handle
147 .get_or(|| self.read_handle_factory.handle())
148 }
149
150 pub(super) fn subscribe_to_expired_items(&self) -> Receiver<Vec<MemoryEntryPair>> {
151 self.expired_items_sender.subscribe()
152 }
153
154 fn handle_value(&self, value: ObjectMap) {
155 let mut writer = self.write_handle.lock().expect("mutex poisoned");
156 let now = Instant::now();
157
158 for (k, value) in value.into_iter() {
159 let new_entry_key = String::from(k);
160 let Ok(v) = serde_json::to_string(&value) else {
161 emit!(MemoryEnrichmentTableInsertFailed {
162 key: &new_entry_key,
163 include_key_metric_tag: self.config.internal_metrics.include_key_tag
164 });
165 continue;
166 };
167 let new_entry = MemoryEntry {
168 value: v,
169 update_time: now.into(),
170 ttl: self
171 .config
172 .ttl_field
173 .path
174 .as_ref()
175 .and_then(|p| value.get(p))
176 .and_then(|v| v.as_integer())
177 .map(|v| v as u64)
178 .unwrap_or(self.config.ttl),
179 };
180 let new_entry_size = new_entry_key.size_of() + new_entry.size_of();
181 if let Some(max_byte_size) = self.config.max_byte_size
182 && writer
183 .metadata
184 .byte_size
185 .saturating_add(new_entry_size as u64)
186 > max_byte_size
187 {
188 emit!(MemoryEnrichmentTableInsertFailed {
190 key: &new_entry_key,
191 include_key_metric_tag: self.config.internal_metrics.include_key_tag
192 });
193 continue;
194 }
195 writer.metadata.byte_size = writer
196 .metadata
197 .byte_size
198 .saturating_add(new_entry_size as u64);
199 emit!(MemoryEnrichmentTableInserted {
200 key: &new_entry_key,
201 include_key_metric_tag: self.config.internal_metrics.include_key_tag
202 });
203 writer.write_handle.update(new_entry_key, new_entry);
204 }
205
206 if self.config.flush_interval.is_none() {
207 self.flush(writer);
208 }
209 }
210
211 fn scan_and_mark_for_deletion(&self, writer: &mut MutexGuard<'_, MemoryWriter>) -> bool {
212 let now = Instant::now();
213
214 let mut needs_flush = false;
215 if let Some(reader) = self.get_read_handle().read() {
219 for (k, v) in reader.iter() {
220 if let Some(entry) = v.get_one()
221 && entry.expired(now)
222 {
223 writer.write_handle.empty(k.clone());
226 emit!(MemoryEnrichmentTableTtlExpired {
227 key: k,
228 include_key_metric_tag: self.config.internal_metrics.include_key_tag
229 });
230 needs_flush = true;
231 }
232 }
233 };
234
235 needs_flush
236 }
237
238 fn scan(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
239 let needs_flush = self.scan_and_mark_for_deletion(&mut writer);
240 if needs_flush {
241 self.flush(writer);
242 }
243 }
244
245 fn flush(&self, mut writer: MutexGuard<'_, MemoryWriter>) {
246 if self
248 .config
249 .source_config
250 .as_ref()
251 .map(|c| c.export_expired_items)
252 .unwrap_or_default()
253 {
254 let pending_removal = writer
255 .write_handle
256 .pending()
257 .iter()
258 .filter_map(|o| match o {
260 evmap::Operation::Empty(k) => Some(k),
261 _ => None,
262 })
263 .filter_map(|key| {
264 writer.write_handle.get_one(key).map(|v| MemoryEntryPair {
265 key: key.to_string(),
266 entry: v.clone(),
267 })
268 })
269 .collect::<Vec<_>>();
270 if let Err(error) = self.expired_items_sender.send(pending_removal) {
271 error!(
272 message = "Error exporting expired items from memory enrichment table.",
273 error = %error,
274 );
275 }
276 }
277
278 writer.write_handle.refresh();
279 if let Some(reader) = self.get_read_handle().read() {
280 let mut byte_size = 0;
281 for (k, v) in reader.iter() {
282 byte_size += k.size_of() + v.get_one().size_of();
283 }
284 writer.metadata.byte_size = byte_size as u64;
285 emit!(MemoryEnrichmentTableFlushed {
286 new_objects_count: reader.len(),
287 new_byte_size: byte_size
288 });
289 }
290 }
291
292 pub(crate) fn as_source(
293 &self,
294 shutdown: ShutdownSignal,
295 out: SourceSender,
296 log_namespace: LogNamespace,
297 ) -> MemorySource {
298 MemorySource {
299 memory: self.clone(),
300 shutdown,
301 out,
302 log_namespace,
303 }
304 }
305}
306
307impl Clone for Memory {
308 fn clone(&self) -> Self {
309 Self {
310 read_handle_factory: self.read_handle_factory.clone(),
311 read_handle: ThreadLocal::new(),
312 write_handle: Arc::clone(&self.write_handle),
313 config: self.config.clone(),
314 expired_items_sender: self.expired_items_sender.clone(),
315 expired_items_receiver: self.expired_items_sender.subscribe(),
316 }
317 }
318}
319
320impl Table for Memory {
321 fn find_table_row<'a>(
322 &self,
323 case: Case,
324 condition: &'a [Condition<'a>],
325 select: Option<&'a [String]>,
326 wildcard: Option<&Value>,
327 index: Option<IndexHandle>,
328 ) -> Result<ObjectMap, Error> {
329 let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
330
331 match rows.pop() {
332 Some(row) if rows.is_empty() => Ok(row),
333 Some(_) => Err(Error::MoreThanOneRowFound),
334 None => Err(Error::NoRowsFound),
335 }
336 }
337
338 fn find_table_rows<'a>(
339 &self,
340 _case: Case,
341 condition: &'a [Condition<'a>],
342 _select: Option<&'a [String]>,
343 _wildcard: Option<&Value>,
344 _index: Option<IndexHandle>,
345 ) -> Result<Vec<ObjectMap>, Error> {
346 match condition.first() {
347 Some(_) if condition.len() > 1 => Err(Error::OnlyOneConditionAllowed),
348 Some(Condition::Equals { value, .. }) => {
349 let key = value.to_string_lossy();
350 match self.get_read_handle().get_one(key.as_ref()) {
351 Some(row) => {
352 emit!(MemoryEnrichmentTableRead {
353 key: &key,
354 include_key_metric_tag: self.config.internal_metrics.include_key_tag
355 });
356 row.as_object_map(Instant::now(), &key).map(|r| vec![r])
357 }
358 None => {
359 emit!(MemoryEnrichmentTableReadFailed {
360 key: &key,
361 include_key_metric_tag: self.config.internal_metrics.include_key_tag
362 });
363 Ok(Default::default())
364 }
365 }
366 }
367 Some(_) => Err(Error::OnlyEqualityConditionAllowed),
368 None => Err(Error::MissingCondition { kind: "Key" }),
369 }
370 }
371
372 fn add_index(&mut self, _case: Case, fields: &[&str]) -> Result<IndexHandle, Error> {
373 match fields.len() {
374 0 => Err(Error::MissingRequiredField { field: "Key" }),
375 1 => Ok(IndexHandle(0)),
376 _ => Err(Error::OnlyOneFieldAllowed),
377 }
378 }
379
380 fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
382 Vec::new()
383 }
384
385 fn needs_reload(&self) -> bool {
387 false
388 }
389}
390
391impl std::fmt::Debug for Memory {
392 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
393 write!(f, "Memory {} row(s)", self.get_read_handle().len())
394 }
395}
396
397#[async_trait]
398impl StreamSink<Event> for Memory {
399 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
400 let events_sent = register!(EventsSent::from(Output(None)));
401 let bytes_sent = register!(BytesSent::from(Protocol("memory_enrichment_table".into(),)));
402 let mut flush_interval = IntervalStream::new(interval(
403 self.config
404 .flush_interval
405 .map(Duration::from_secs)
406 .unwrap_or(Duration::MAX),
407 ));
408 let mut scan_interval = IntervalStream::new(interval(Duration::from_secs(
409 self.config.scan_interval.into(),
410 )));
411
412 loop {
413 tokio::select! {
414 event = input.next() => {
415 let mut event = if let Some(event) = event {
416 event
417 } else {
418 break;
419 };
420 let event_byte_size = event.estimated_json_encoded_size_of();
421
422 let finalizers = event.take_finalizers();
423
424 let log = event.into_log();
426
427 if let (Value::Object(map), _) = log.into_parts() {
428 self.handle_value(map)
429 };
430
431 finalizers.update_status(EventStatus::Delivered);
432 events_sent.emit(CountByteSize(1, event_byte_size));
433 bytes_sent.emit(ByteSize(event_byte_size.get()));
434 }
435
436 Some(_) = flush_interval.next() => {
437 let writer = self.write_handle.lock().expect("mutex poisoned");
438 self.flush(writer);
439 }
440
441 Some(_) = scan_interval.next() => {
442 let writer = self.write_handle.lock().expect("mutex poisoned");
443 self.scan(writer);
444 }
445 }
446 }
447 Ok(())
448 }
449}
450
#[cfg(test)]
mod tests {
    use std::{num::NonZeroU64, slice::from_ref, time::Duration};

    use futures::{StreamExt, future::ready};
    use futures_util::stream;
    use tokio::time;

    use vector_lib::{
        event::{EventContainer, MetricValue},
        lookup::lookup_v2::OptionalValuePath,
        metrics::Controller,
        sink::VectorSink,
    };

    use super::*;
    use crate::{
        enrichment_tables::memory::{
            config::MemorySourceConfig, internal_events::InternalMetricsConfig,
        },
        event::{Event, LogEvent},
        test_util::components::{
            SINK_TAGS, SOURCE_TAGS, run_and_assert_sink_compliance,
            run_and_assert_source_compliance,
        },
    };

    /// Returns the default configuration after applying `modfn` to it.
    fn build_memory_config(modfn: impl Fn(&mut MemoryConfig)) -> MemoryConfig {
        let mut config = MemoryConfig::default();
        modfn(&mut config);
        config
    }

    /// Equality condition on the `key` field for the given key.
    fn key_equals(key: &str) -> Condition<'static> {
        Condition::Equals {
            field: "key",
            value: Value::from(key),
        }
    }

    /// The row a lookup is expected to return for `key`.
    fn expected_row(key: &str, ttl: Value, value: Value) -> ObjectMap {
        ObjectMap::from([
            ("key".into(), Value::from(key)),
            ("ttl".into(), ttl),
            ("value".into(), value),
        ])
    }

    /// An object holding the single pair `key => 5`.
    fn single_pair(key: &str) -> ObjectMap {
        ObjectMap::from([(key.into(), Value::from(5))])
    }

    /// A log event holding the single pair `test_key => 5`.
    fn test_event() -> Event {
        Event::Log(LogEvent::from(ObjectMap::from([(
            "test_key".into(),
            Value::from(5),
        )])))
    }

    /// Writes `test_key => "5"` directly through the write handle, dated
    /// `age` in the past, and publishes it to readers.
    fn insert_backdated(memory: &Memory, age: Duration, ttl: u64) {
        let mut handle = memory.write_handle.lock().unwrap();
        handle.write_handle.update(
            "test_key".to_string(),
            MemoryEntry {
                value: "5".to_string(),
                update_time: (Instant::now() - age).into(),
                ttl,
            },
        );
        handle.write_handle.refresh();
    }

    /// Whether looking up `key` yields no rows.
    fn lookup_is_empty(memory: &Memory, key: &str) -> bool {
        let condition = key_equals(key);
        memory
            .find_table_rows(Case::Sensitive, &[condition], None, None, None)
            .unwrap()
            .pop()
            .is_none()
    }

    #[test]
    fn finds_row() {
        let memory = Memory::new(Default::default());
        memory.handle_value(single_pair("test_key"));

        let condition = key_equals("test_key");

        assert_eq!(
            Ok(expected_row(
                "test_key",
                Value::from(memory.config.ttl),
                Value::from(5)
            )),
            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
        );
    }

    #[test]
    fn calculates_ttl() {
        let ttl = 100;
        let secs_to_subtract = 10;
        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
        insert_backdated(&memory, Duration::from_secs(secs_to_subtract), ttl);

        let condition = key_equals("test_key");

        assert_eq!(
            Ok(expected_row(
                "test_key",
                Value::from(ttl - secs_to_subtract),
                Value::from(5)
            )),
            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
        );
    }

    #[test]
    fn calculates_ttl_override() {
        let global_ttl = 100;
        let ttl_override = 10;
        let memory = Memory::new(build_memory_config(|c| {
            c.ttl = global_ttl;
            c.ttl_field = OptionalValuePath::new("ttl");
        }));

        // One entry carries its own TTL, the other relies on the table default.
        let with_override = Value::from(ObjectMap::from([
            ("val".into(), Value::from(5)),
            ("ttl".into(), Value::from(ttl_override)),
        ]));
        let without_override = Value::from(ObjectMap::from([("val".into(), Value::from(5))]));
        memory.handle_value(ObjectMap::from([
            ("ttl_override".into(), with_override),
            ("default_ttl".into(), without_override),
        ]));

        let default_condition = key_equals("default_ttl");
        let override_condition = key_equals("ttl_override");

        assert_eq!(
            Ok(expected_row(
                "default_ttl",
                Value::from(global_ttl),
                Value::from(ObjectMap::from([("val".into(), Value::from(5))]))
            )),
            memory.find_table_row(Case::Sensitive, &[default_condition], None, None, None)
        );
        assert_eq!(
            Ok(expected_row(
                "ttl_override",
                Value::from(ttl_override),
                Value::from(ObjectMap::from([
                    ("val".into(), Value::from(5)),
                    ("ttl".into(), Value::from(ttl_override))
                ]))
            )),
            memory.find_table_row(Case::Sensitive, &[override_condition], None, None, None)
        );
    }

    #[test]
    fn removes_expired_records_on_scan_interval() {
        let ttl = 100;
        let memory = Memory::new(build_memory_config(|c| {
            c.ttl = ttl;
        }));
        insert_backdated(&memory, Duration::from_secs(ttl + 10), ttl);

        // Before the scan the expired entry is still visible, with a TTL of 0.
        let condition = key_equals("test_key");
        assert_eq!(
            Ok(expected_row("test_key", Value::from(0), Value::from(5))),
            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
        );

        let writer = memory.write_handle.lock().unwrap();
        memory.scan(writer);

        // After the scan the entry is gone.
        assert!(
            memory
                .find_table_rows(Case::Sensitive, &[condition], None, None, None)
                .unwrap()
                .pop()
                .is_none()
        );
    }

    #[test]
    fn does_not_show_values_before_flush_interval() {
        let ttl = 100;
        let memory = Memory::new(build_memory_config(|c| {
            c.ttl = ttl;
            c.flush_interval = Some(10);
        }));
        memory.handle_value(single_pair("test_key"));

        assert!(lookup_is_empty(&memory, "test_key"));
    }

    #[test]
    fn updates_ttl_on_value_replacement() {
        let ttl = 100;
        let memory = Memory::new(build_memory_config(|c| c.ttl = ttl));
        insert_backdated(&memory, Duration::from_secs(ttl / 2), ttl);

        let condition = key_equals("test_key");

        assert_eq!(
            Ok(expected_row(
                "test_key",
                Value::from(ttl / 2),
                Value::from(5)
            )),
            memory.find_table_row(Case::Sensitive, from_ref(&condition), None, None, None)
        );

        // Writing the key again resets its TTL to the full value.
        memory.handle_value(single_pair("test_key"));

        assert_eq!(
            Ok(expected_row("test_key", Value::from(ttl), Value::from(5))),
            memory.find_table_row(Case::Sensitive, &[condition], None, None, None)
        );
    }

    #[test]
    fn ignores_all_values_over_byte_size_limit() {
        let memory = Memory::new(build_memory_config(|c| {
            c.max_byte_size = Some(1);
        }));
        memory.handle_value(single_pair("test_key"));

        assert!(lookup_is_empty(&memory, "test_key"));
    }

    #[test]
    fn ignores_values_when_byte_size_limit_is_reached() {
        let ttl = 100;
        let memory = Memory::new(build_memory_config(|c| {
            c.ttl = ttl;
            c.max_byte_size = Some(150);
        }));
        memory.handle_value(single_pair("test_key"));
        memory.handle_value(single_pair("rejected_key"));

        let accepted = key_equals("test_key");
        assert_eq!(
            Ok(expected_row("test_key", Value::from(ttl), Value::from(5))),
            memory.find_table_row(Case::Sensitive, &[accepted], None, None, None)
        );

        assert!(lookup_is_empty(&memory, "rejected_key"));
    }

    #[test]
    fn missing_key() {
        let memory = Memory::new(Default::default());

        assert!(lookup_is_empty(&memory, "test_key"));
    }

    #[tokio::test]
    async fn sink_spec_compliance() {
        let event = test_event();

        let memory = Memory::new(Default::default());

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(event)),
            &SINK_TAGS,
        )
        .await;
    }

    #[tokio::test]
    async fn flush_metrics_without_interval() {
        let event = test_event();

        let memory = Memory::new(Default::default());

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(event)),
            &SINK_TAGS,
        )
        .await;

        let metrics = Controller::get().unwrap().capture_metrics();
        // Value of the named counter; panics with `missing` when absent.
        let counter = |name: &str, missing: &str| -> f64 {
            let metric = metrics
                .iter()
                .find(|m| matches!(m.value(), MetricValue::Counter { .. }) && m.name() == name)
                .expect(missing);
            match metric.value() {
                MetricValue::Counter { value } => *value,
                _ => unreachable!(),
            }
        };
        // Value of the named gauge; panics with `missing` when absent.
        let gauge = |name: &str, missing: &str| -> f64 {
            let metric = metrics
                .iter()
                .find(|m| matches!(m.value(), MetricValue::Gauge { .. }) && m.name() == name)
                .expect(missing);
            match metric.value() {
                MetricValue::Gauge { value } => *value,
                _ => unreachable!(),
            }
        };

        let insertions_count = counter(
            "memory_enrichment_table_insertions_total",
            "Insertions metric is missing!",
        );
        let flushes_count = counter(
            "memory_enrichment_table_flushes_total",
            "Flushes metric is missing!",
        );
        let object_count = gauge(
            "memory_enrichment_table_objects_count",
            "Object count metric is missing!",
        );
        let byte_size_gauge = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Gauge { .. })
                    && m.name() == "memory_enrichment_table_byte_size"
            })
            .expect("Byte size metric is missing!");

        assert_eq!(insertions_count, 1.0);
        assert_eq!(flushes_count, 1.0);
        assert_eq!(object_count, 1.0);
        assert!(!byte_size_gauge.is_empty());
    }

    #[tokio::test]
    async fn flush_metrics_with_interval() {
        let event = test_event();

        let memory = Memory::new(build_memory_config(|c| {
            c.flush_interval = Some(1);
        }));

        // Deliver the same event twice, each after a 600 ms pause.
        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::iter(vec![event.clone(), event]).flat_map(|e| {
                stream::once(async move {
                    tokio::time::sleep(Duration::from_millis(600)).await;
                    e
                })
            }),
            &SINK_TAGS,
        )
        .await;

        let metrics = Controller::get().unwrap().capture_metrics();
        // Value of the named counter; panics with `missing` when absent.
        let counter = |name: &str, missing: &str| -> f64 {
            let metric = metrics
                .iter()
                .find(|m| matches!(m.value(), MetricValue::Counter { .. }) && m.name() == name)
                .expect(missing);
            match metric.value() {
                MetricValue::Counter { value } => *value,
                _ => unreachable!(),
            }
        };
        // Value of the named gauge; panics with `missing` when absent.
        let gauge = |name: &str, missing: &str| -> f64 {
            let metric = metrics
                .iter()
                .find(|m| matches!(m.value(), MetricValue::Gauge { .. }) && m.name() == name)
                .expect(missing);
            match metric.value() {
                MetricValue::Gauge { value } => *value,
                _ => unreachable!(),
            }
        };

        let insertions_count = counter(
            "memory_enrichment_table_insertions_total",
            "Insertions metric is missing!",
        );
        let flushes_count = counter(
            "memory_enrichment_table_flushes_total",
            "Flushes metric is missing!",
        );
        let object_count = gauge(
            "memory_enrichment_table_objects_count",
            "Object count metric is missing!",
        );
        let byte_size_gauge = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Gauge { .. })
                    && m.name() == "memory_enrichment_table_byte_size"
            })
            .expect("Byte size metric is missing!");

        assert_eq!(insertions_count, 2.0);
        assert_eq!(flushes_count, 2.0);
        assert_eq!(object_count, 1.0);
        assert!(!byte_size_gauge.is_empty());
    }

    #[tokio::test]
    async fn flush_metrics_with_key() {
        let event = test_event();

        let memory = Memory::new(build_memory_config(|c| {
            c.internal_metrics = InternalMetricsConfig {
                include_key_tag: true,
            };
        }));

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(event)),
            &SINK_TAGS,
        )
        .await;

        let metrics = Controller::get().unwrap().capture_metrics();
        let insertions_counter = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Counter { .. })
                    && m.name() == "memory_enrichment_table_insertions_total"
            })
            .expect("Insertions metric is missing!");

        assert!(insertions_counter.tag_matches("key", "test_key"));
    }

    #[tokio::test]
    async fn flush_metrics_without_key() {
        let event = test_event();

        let memory = Memory::new(Default::default());

        run_and_assert_sink_compliance(
            VectorSink::from_event_streamsink(memory),
            stream::once(ready(event)),
            &SINK_TAGS,
        )
        .await;

        let metrics = Controller::get().unwrap().capture_metrics();
        let insertions_counter = metrics
            .iter()
            .find(|m| {
                matches!(m.value(), MetricValue::Counter { .. })
                    && m.name() == "memory_enrichment_table_insertions_total"
            })
            .expect("Insertions metric is missing!");

        assert!(insertions_counter.tag_value("key").is_none());
    }

    #[tokio::test]
    async fn source_spec_compliance() {
        let memory_config = build_memory_config(|c| {
            c.source_config = Some(MemorySourceConfig {
                export_interval: Some(NonZeroU64::try_from(1).unwrap()),
                export_batch_size: None,
                remove_after_export: false,
                export_expired_items: false,
                source_key: "test".to_string(),
            });
        });
        let memory = memory_config.get_or_build_memory().await;
        memory.handle_value(single_pair("test_key"));

        let mut events: Vec<Event> = run_and_assert_source_compliance(
            memory_config,
            time::Duration::from_secs(5),
            &SOURCE_TAGS,
        )
        .await;

        assert!(!events.is_empty());
        let first = events.remove(0);
        let log = first.as_log();

        assert!(!log.value().is_empty());
    }
}