vector/enrichment_tables/memory/
source.rs

1use std::time::{Duration, Instant};
2
3use chrono::Utc;
4use futures::StreamExt;
5use tokio::time::interval;
6use tokio_stream::wrappers::IntervalStream;
7use vector_lib::{
8    ByteSizeOf, EstimatedJsonEncodedSizeOf,
9    config::LogNamespace,
10    event::{Event, EventMetadata, LogEvent},
11    internal_event::{
12        ByteSize, BytesReceived, BytesReceivedHandle, CountByteSize, EventsReceived,
13        EventsReceivedHandle, InternalEventHandle, Protocol,
14    },
15    shutdown::ShutdownSignal,
16};
17
18use super::{Memory, MemoryConfig};
19use crate::{
20    SourceSender,
21    enrichment_tables::memory::{MemoryEntryPair, MemorySourceConfig},
22    internal_events::StreamClosedError,
23};
24
/// Name of the named output that receives entries which expired from the table.
pub(crate) const EXPIRED_ROUTE: &str = "expired";
26
27/// A struct that represents Memory when used as a source.
28pub(crate) struct MemorySource {
29    pub(super) memory: Memory,
30    pub(super) shutdown: ShutdownSignal,
31    pub(super) out: SourceSender,
32    pub(super) log_namespace: LogNamespace,
33}
34
35impl MemorySource {
36    pub(crate) async fn run(mut self) -> Result<(), ()> {
37        let events_received = register!(EventsReceived);
38        let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
39        let source_config = self
40            .memory
41            .config
42            .source_config
43            .clone()
44            .expect("Unexpected missing source config in memory table used as a source.");
45        let mut interval = IntervalStream::new(interval(Duration::from_secs(
46            source_config
47                .export_interval
48                .map(Into::into)
49                .unwrap_or(u64::MAX),
50        )))
51        .take_until(self.shutdown.clone());
52        let mut expired_receiver = self.memory.subscribe_to_expired_items();
53
54        loop {
55            tokio::select! {
56                interval_time = interval.next() => {
57                    if interval_time.is_none() {
58                        break;
59                    }
60                    self.export_table_items(&source_config, &events_received, &bytes_received).await;
61                },
62
63                Ok(expired) = expired_receiver.recv() => {
64                    self.export_expired_entries(expired, &events_received, &bytes_received).await;
65                }
66            }
67        }
68
69        Ok(())
70    }
71
72    async fn export_table_items(
73        &mut self,
74        source_config: &MemorySourceConfig,
75        events_received: &EventsReceivedHandle,
76        bytes_received: &BytesReceivedHandle,
77    ) {
78        let mut sent = 0_usize;
79        loop {
80            let mut events = Vec::new();
81            {
82                let mut writer = self.memory.write_handle.lock().unwrap();
83                if let Some(reader) = self.memory.get_read_handle().read() {
84                    let now = Instant::now();
85                    let utc_now = Utc::now();
86                    events = reader
87                        .iter()
88                        .skip(if source_config.remove_after_export {
89                            0
90                        } else {
91                            sent
92                        })
93                        .take(if let Some(batch_size) = source_config.export_batch_size {
94                            batch_size as usize
95                        } else {
96                            usize::MAX
97                        })
98                        .filter_map(|(k, v)| {
99                            if source_config.remove_after_export {
100                                writer.write_handle.empty(k.clone());
101                            }
102                            v.get_one().map(|v| (k, v))
103                        })
104                        .filter_map(|(k, v)| {
105                            let mut event = Event::Log(LogEvent::from_map(
106                                v.as_object_map(now, k).ok()?,
107                                EventMetadata::default(),
108                            ));
109                            let log = event.as_mut_log();
110                            self.log_namespace.insert_standard_vector_source_metadata(
111                                log,
112                                MemoryConfig::NAME,
113                                utc_now,
114                            );
115
116                            Some(event)
117                        })
118                        .collect::<Vec<_>>();
119                    if source_config.remove_after_export {
120                        writer.write_handle.refresh();
121                    }
122                }
123            }
124            let count = events.len();
125            let byte_size = events.size_of();
126            let json_size = events.estimated_json_encoded_size_of();
127            bytes_received.emit(ByteSize(byte_size));
128            events_received.emit(CountByteSize(count, json_size));
129            if self.out.send_batch(events).await.is_err() {
130                emit!(StreamClosedError { count });
131            }
132
133            sent += count;
134            match source_config.export_batch_size {
135                None => break,
136                Some(export_batch_size) if count < export_batch_size as usize => break,
137                _ => {}
138            }
139        }
140    }
141
142    async fn export_expired_entries(
143        &mut self,
144        entries: Vec<MemoryEntryPair>,
145        events_received: &EventsReceivedHandle,
146        bytes_received: &BytesReceivedHandle,
147    ) {
148        let now = Instant::now();
149        let events = entries
150            .into_iter()
151            .filter_map(
152                |MemoryEntryPair {
153                     key,
154                     entry: expired_event,
155                 }| {
156                    let mut event = Event::Log(LogEvent::from_map(
157                        expired_event.as_object_map(now, &key).ok()?,
158                        EventMetadata::default(),
159                    ));
160                    let log = event.as_mut_log();
161                    self.log_namespace.insert_standard_vector_source_metadata(
162                        log,
163                        MemoryConfig::NAME,
164                        Utc::now(),
165                    );
166                    Some(event)
167                },
168            )
169            .collect::<Vec<_>>();
170        let count = events.len();
171        let byte_size = events.size_of();
172        let json_size = events.estimated_json_encoded_size_of();
173        bytes_received.emit(ByteSize(byte_size));
174        events_received.emit(CountByteSize(count, json_size));
175        if self
176            .out
177            .send_batch_named(EXPIRED_ROUTE, events)
178            .await
179            .is_err()
180        {
181            emit!(StreamClosedError { count });
182        }
183    }
184}