vector/enrichment_tables/memory/
source.rs1use std::time::{Duration, Instant};
2
3use chrono::Utc;
4use futures::StreamExt;
5use tokio::time::interval;
6use tokio_stream::wrappers::IntervalStream;
7use vector_lib::{
8 ByteSizeOf, EstimatedJsonEncodedSizeOf,
9 config::LogNamespace,
10 event::{Event, EventMetadata, LogEvent},
11 internal_event::{
12 ByteSize, BytesReceived, BytesReceivedHandle, CountByteSize, EventsReceived,
13 EventsReceivedHandle, InternalEventHandle, Protocol,
14 },
15 shutdown::ShutdownSignal,
16};
17
18use super::{Memory, MemoryConfig};
19use crate::{
20 SourceSender,
21 enrichment_tables::memory::{MemoryEntryPair, MemorySourceConfig},
22 internal_events::StreamClosedError,
23};
24
/// Name of the named output that expired table entries are sent to
/// (see `MemorySource::export_expired_entries`).
pub(crate) const EXPIRED_ROUTE: &str = "expired";
26
/// Source half of the memory enrichment table: emits the table's contents
/// (and entries reported as expired) as log events.
pub(crate) struct MemorySource {
    /// The memory table whose entries are read, and optionally removed, on export.
    pub(super) memory: Memory,
    /// Shutdown signal; once it fires the export interval stream ends and `run` returns.
    pub(super) shutdown: ShutdownSignal,
    /// Sender that exported events are pushed into.
    pub(super) out: SourceSender,
    /// Log namespace used to insert the standard Vector source metadata into each event.
    pub(super) log_namespace: LogNamespace,
}
34
35impl MemorySource {
36 pub(crate) async fn run(mut self) -> Result<(), ()> {
37 let events_received = register!(EventsReceived);
38 let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
39 let source_config = self
40 .memory
41 .config
42 .source_config
43 .clone()
44 .expect("Unexpected missing source config in memory table used as a source.");
45 let mut interval = IntervalStream::new(interval(Duration::from_secs(
46 source_config
47 .export_interval
48 .map(Into::into)
49 .unwrap_or(u64::MAX),
50 )))
51 .take_until(self.shutdown.clone());
52 let mut expired_receiver = self.memory.subscribe_to_expired_items();
53
54 loop {
55 tokio::select! {
56 interval_time = interval.next() => {
57 if interval_time.is_none() {
58 break;
59 }
60 self.export_table_items(&source_config, &events_received, &bytes_received).await;
61 },
62
63 Ok(expired) = expired_receiver.recv() => {
64 self.export_expired_entries(expired, &events_received, &bytes_received).await;
65 }
66 }
67 }
68
69 Ok(())
70 }
71
72 async fn export_table_items(
73 &mut self,
74 source_config: &MemorySourceConfig,
75 events_received: &EventsReceivedHandle,
76 bytes_received: &BytesReceivedHandle,
77 ) {
78 let mut sent = 0_usize;
79 loop {
80 let mut events = Vec::new();
81 {
82 let mut writer = self.memory.write_handle.lock().unwrap();
83 if let Some(reader) = self.memory.get_read_handle().read() {
84 let now = Instant::now();
85 let utc_now = Utc::now();
86 events = reader
87 .iter()
88 .skip(if source_config.remove_after_export {
89 0
90 } else {
91 sent
92 })
93 .take(if let Some(batch_size) = source_config.export_batch_size {
94 batch_size as usize
95 } else {
96 usize::MAX
97 })
98 .filter_map(|(k, v)| {
99 if source_config.remove_after_export {
100 writer.write_handle.empty(k.clone());
101 }
102 v.get_one().map(|v| (k, v))
103 })
104 .filter_map(|(k, v)| {
105 let mut event = Event::Log(LogEvent::from_map(
106 v.as_object_map(now, k).ok()?,
107 EventMetadata::default(),
108 ));
109 let log = event.as_mut_log();
110 self.log_namespace.insert_standard_vector_source_metadata(
111 log,
112 MemoryConfig::NAME,
113 utc_now,
114 );
115
116 Some(event)
117 })
118 .collect::<Vec<_>>();
119 if source_config.remove_after_export {
120 writer.write_handle.refresh();
121 }
122 }
123 }
124 let count = events.len();
125 let byte_size = events.size_of();
126 let json_size = events.estimated_json_encoded_size_of();
127 bytes_received.emit(ByteSize(byte_size));
128 events_received.emit(CountByteSize(count, json_size));
129 if self.out.send_batch(events).await.is_err() {
130 emit!(StreamClosedError { count });
131 }
132
133 sent += count;
134 match source_config.export_batch_size {
135 None => break,
136 Some(export_batch_size) if count < export_batch_size as usize => break,
137 _ => {}
138 }
139 }
140 }
141
142 async fn export_expired_entries(
143 &mut self,
144 entries: Vec<MemoryEntryPair>,
145 events_received: &EventsReceivedHandle,
146 bytes_received: &BytesReceivedHandle,
147 ) {
148 let now = Instant::now();
149 let events = entries
150 .into_iter()
151 .filter_map(
152 |MemoryEntryPair {
153 key,
154 entry: expired_event,
155 }| {
156 let mut event = Event::Log(LogEvent::from_map(
157 expired_event.as_object_map(now, &key).ok()?,
158 EventMetadata::default(),
159 ));
160 let log = event.as_mut_log();
161 self.log_namespace.insert_standard_vector_source_metadata(
162 log,
163 MemoryConfig::NAME,
164 Utc::now(),
165 );
166 Some(event)
167 },
168 )
169 .collect::<Vec<_>>();
170 let count = events.len();
171 let byte_size = events.size_of();
172 let json_size = events.estimated_json_encoded_size_of();
173 bytes_received.emit(ByteSize(byte_size));
174 events_received.emit(CountByteSize(count, json_size));
175 if self
176 .out
177 .send_batch_named(EXPIRED_ROUTE, events)
178 .await
179 .is_err()
180 {
181 emit!(StreamClosedError { count });
182 }
183 }
184}