vector/enrichment_tables/memory/
source.rs

1use chrono::Utc;
2use futures::StreamExt;
3use std::{
4    num::NonZeroU64,
5    time::{Duration, Instant},
6};
7use tokio::time::interval;
8use tokio_stream::wrappers::IntervalStream;
9use vector_lib::{
10    config::LogNamespace,
11    configurable::configurable_component,
12    event::{Event, EventMetadata, LogEvent},
13    internal_event::{
14        ByteSize, BytesReceived, CountByteSize, EventsReceived, InternalEventHandle, Protocol,
15    },
16    shutdown::ShutdownSignal,
17    ByteSizeOf, EstimatedJsonEncodedSizeOf,
18};
19
20use crate::{internal_events::StreamClosedError, SourceSender};
21
22use super::{Memory, MemoryConfig};
23
24/// Configuration for memory enrichment table source functionality.
25#[configurable_component]
26#[derive(Clone, Debug, PartialEq, Eq)]
27#[serde(deny_unknown_fields)]
28pub struct MemorySourceConfig {
29    /// Interval for exporting all data from the table when used as a source.
30    pub export_interval: NonZeroU64,
31    /// Batch size for data exporting. Used to prevent exporting entire table at
32    /// once and blocking the system.
33    ///
34    /// By default, batches are not used and entire table is exported.
35    #[serde(skip_serializing_if = "vector_lib::serde::is_default")]
36    pub export_batch_size: Option<u64>,
37    /// If set to true, all data will be removed from cache after exporting.
38    /// Only valid if used as a source and export_interval > 0
39    ///
40    /// By default, export will not remove data from cache
41    #[serde(default = "crate::serde::default_false")]
42    pub remove_after_export: bool,
43    /// Key to use for this component when used as a source. This must be different from the
44    /// component key.
45    pub source_key: String,
46}
47
48/// A struct that represents Memory when used as a source.
49pub(crate) struct MemorySource {
50    pub(super) memory: Memory,
51    pub(super) shutdown: ShutdownSignal,
52    pub(super) out: SourceSender,
53    pub(super) log_namespace: LogNamespace,
54}
55
56impl MemorySource {
57    pub(crate) async fn run(mut self) -> Result<(), ()> {
58        let events_received = register!(EventsReceived);
59        let bytes_received = register!(BytesReceived::from(Protocol::INTERNAL));
60        let source_config = self
61            .memory
62            .config
63            .source_config
64            .as_ref()
65            .expect("Unexpected missing source config in memory table used as a source.");
66        let mut interval = IntervalStream::new(interval(Duration::from_secs(
67            source_config.export_interval.into(),
68        )))
69        .take_until(self.shutdown);
70
71        while interval.next().await.is_some() {
72            let mut sent = 0_usize;
73            loop {
74                let mut events = Vec::new();
75                {
76                    let mut writer = self.memory.write_handle.lock().unwrap();
77                    if let Some(reader) = self.memory.get_read_handle().read() {
78                        let now = Instant::now();
79                        let utc_now = Utc::now();
80                        events = reader
81                            .iter()
82                            .skip(if source_config.remove_after_export {
83                                0
84                            } else {
85                                sent
86                            })
87                            .take(if let Some(batch_size) = source_config.export_batch_size {
88                                batch_size as usize
89                            } else {
90                                usize::MAX
91                            })
92                            .filter_map(|(k, v)| {
93                                if source_config.remove_after_export {
94                                    writer.write_handle.empty(k.clone());
95                                }
96                                v.get_one().map(|v| (k, v))
97                            })
98                            .filter_map(|(k, v)| {
99                                let mut event = Event::Log(LogEvent::from_map(
100                                    v.as_object_map(now, self.memory.config.ttl, k).ok()?,
101                                    EventMetadata::default(),
102                                ));
103                                let log = event.as_mut_log();
104                                self.log_namespace.insert_standard_vector_source_metadata(
105                                    log,
106                                    MemoryConfig::NAME,
107                                    utc_now,
108                                );
109
110                                Some(event)
111                            })
112                            .collect::<Vec<_>>();
113                        if source_config.remove_after_export {
114                            writer.write_handle.refresh();
115                        }
116                    }
117                }
118                let count = events.len();
119                let byte_size = events.size_of();
120                let json_size = events.estimated_json_encoded_size_of();
121                bytes_received.emit(ByteSize(byte_size));
122                events_received.emit(CountByteSize(count, json_size));
123                if self.out.send_batch(events).await.is_err() {
124                    emit!(StreamClosedError { count });
125                }
126
127                sent += count;
128                match source_config.export_batch_size {
129                    None => break,
130                    Some(export_batch_size) if count < export_batch_size as usize => break,
131                    _ => {}
132                }
133            }
134        }
135
136        Ok(())
137    }
138}