// vector/enrichment_tables/memory/source.rs
use chrono::Utc;
use futures::StreamExt;
use std::{
    num::NonZeroU64,
    time::{Duration, Instant},
};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use vector_lib::{
    config::LogNamespace,
    configurable::configurable_component,
    event::{Event, EventMetadata, LogEvent},
    internal_event::{
        ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
    },
    shutdown::ShutdownSignal,
    ByteSizeOf, EstimatedJsonEncodedSizeOf,
};

use crate::{internal_events::StreamClosedError, SourceSender};

use super::{Memory, MemoryConfig};
23
24#[configurable_component]
26#[derive(Clone, Debug, PartialEq, Eq)]
27#[serde(deny_unknown_fields)]
28pub struct MemorySourceConfig {
29 pub export_interval: NonZeroU64,
31 #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
36 pub export_batch_size: Option<u64>,
37 #[serde(default = "crate::serde::default_false")]
42 pub remove_after_export: bool,
43 pub source_key: String,
46}
47
48pub(crate) struct MemorySource {
50 pub(super) memory: Memory,
51 pub(super) shutdown: ShutdownSignal,
52 pub(super) out: SourceSender,
53 pub(super) log_namespace: LogNamespace,
54}
55
56impl MemorySource {
57 pub(crate) async fn run(mut self) -> Result<(), ()> {
58 let events_received = register!(EventsReceived);
59 let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
60 let source_config = self
61 .memory
62 .config
63 .source_config
64 .as_ref()
65 .expect("Unexpected missing source config in memory table used as a source.");
66 let mut interval = IntervalStream::new(interval(Duration::from_secs(
67 source_config.export_interval.into(),
68 )))
69 .take_until(self.shutdown);
70
71 while interval.next().await.is_some() {
72 let mut sent = 0_usize;
73 loop {
74 let mut events = Vec::new();
75 {
76 let mut writer = self.memory.write_handle.lock().unwrap();
77 if let Some(reader) = self.memory.get_read_handle().read() {
78 let now = Instant::now();
79 let utc_now = Utc::now();
80 events = reader
81 .iter()
82 .skip(if source_config.remove_after_export {
83 0
84 } else {
85 sent
86 })
87 .take(if let Some(batch_size) = source_config.export_batch_size {
88 batch_size as usize
89 } else {
90 usize::MAX
91 })
92 .filter_map(|(k, v)| {
93 if source_config.remove_after_export {
94 writer.write_handle.empty(k.clone());
95 }
96 v.get_one().map(|v| (k, v))
97 })
98 .filter_map(|(k, v)| {
99 let mut event = Event::Log(LogEvent::from_map(
100 v.as_object_map(now, self.memory.config.ttl, k).ok()?,
101 EventMetadata::default(),
102 ));
103 let log = event.as_mut_log();
104 self.log_namespace.insert_standard_vector_source_metadata(
105 log,
106 MemoryConfig::NAME,
107 utc_now,
108 );
109
110 Some(event)
111 })
112 .collect::<Vec<_>>();
113 if source_config.remove_after_export {
114 writer.write_handle.refresh();
115 }
116 }
117 }
118 let count = events.len();
119 let byte_size = events.size_of();
120 let json_size = events.estimated_json_encoded_size_of();
121 bytes_received.emit(ByteSize(byte_size));
122 events_received.emit(CountByteSize(count, json_size));
123 if self.out.send_batch(events).await.is_err() {
124 emit!(StreamClosedError { count });
125 }
126
127 sent += count;
128 match source_config.export_batch_size {
129 None => break,
130 Some(export_batch_size) if count < export_batch_size as usize => break,
131 _ => {}
132 }
133 }
134 }
135
136 Ok(())
137 }
138}