vector_core/source_sender/
sender.rs1#[cfg(any(test, feature = "test"))]
2use std::time::Duration;
3use std::{collections::HashMap, time::Instant};
4
5use futures::Stream;
6#[cfg(any(test, feature = "test"))]
7use futures::StreamExt as _;
8#[cfg(any(test, feature = "test"))]
9use metrics::histogram;
10use vector_buffers::EventCount;
11#[cfg(any(test, feature = "test"))]
12use vector_buffers::topology::channel::LimitedReceiver;
13#[cfg(any(test, feature = "test"))]
14use vector_common::internal_event::DEFAULT_OUTPUT;
15#[cfg(doc)]
16use vector_common::internal_event::{ComponentEventsDropped, EventsSent};
17use vector_common::{
18 byte_size_of::ByteSizeOf,
19 finalization::{AddBatchNotifier, BatchNotifier},
20 json_size::JsonSize,
21};
22
23use super::{Builder, Output, SendError};
24#[cfg(any(test, feature = "test"))]
25use super::{LAG_TIME_NAME, TEST_BUFFER_SIZE};
26use crate::{
27 EstimatedJsonEncodedSizeOf,
28 event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
29};
30#[cfg(any(test, feature = "test"))]
31use crate::{
32 config::OutputId,
33 event::{EventStatus, into_event_stream},
34};
35
36#[derive(Debug)]
43pub struct SourceSenderItem {
44 pub events: EventArray,
46 pub send_reference: Instant,
48}
49
50impl AddBatchNotifier for SourceSenderItem {
51 fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
52 self.events.add_batch_notifier(notifier);
53 }
54}
55
56impl ByteSizeOf for SourceSenderItem {
57 fn allocated_bytes(&self) -> usize {
58 self.events.allocated_bytes()
59 }
60}
61
62impl EventCount for SourceSenderItem {
63 fn event_count(&self) -> usize {
64 self.events.event_count()
65 }
66}
67
68impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
69 fn estimated_json_encoded_size_of(&self) -> JsonSize {
70 self.events.estimated_json_encoded_size_of()
71 }
72}
73
74impl EventContainer for SourceSenderItem {
75 type IntoIter = EventArrayIntoIter;
76
77 fn len(&self) -> usize {
78 self.events.len()
79 }
80
81 fn into_events(self) -> Self::IntoIter {
82 self.events.into_events()
83 }
84}
85
86impl From<SourceSenderItem> for EventArray {
87 fn from(val: SourceSenderItem) -> Self {
88 val.events
89 }
90}
91
92#[derive(Debug, Clone)]
93pub struct SourceSender {
94 pub(super) default_output: Option<Output>,
97 pub(super) named_outputs: HashMap<String, Output>,
98}
99
100impl SourceSender {
101 pub fn builder() -> Builder {
102 Builder::default()
103 }
104
105 #[cfg(any(test, feature = "test"))]
106 pub fn new_test_sender_with_options(
107 n: usize,
108 timeout: Option<Duration>,
109 ) -> (Self, LimitedReceiver<SourceSenderItem>) {
110 let lag_time = Some(histogram!(LAG_TIME_NAME));
111 let output_id = OutputId {
112 component: "test".to_string().into(),
113 port: None,
114 };
115 let (default_output, rx) = Output::new_with_buffer(
116 n,
117 DEFAULT_OUTPUT.to_owned(),
118 lag_time,
119 None,
120 output_id,
121 timeout,
122 );
123 (
124 Self {
125 default_output: Some(default_output),
126 named_outputs: Default::default(),
127 },
128 rx,
129 )
130 }
131
132 #[cfg(any(test, feature = "test"))]
133 pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
134 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
135 let recv = recv.into_stream().flat_map(into_event_stream);
136 (pipe, recv)
137 }
138
139 #[cfg(any(test, feature = "test"))]
140 pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
141 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
142 let recv = recv.into_stream().flat_map(move |mut item| {
146 item.events.iter_events_mut().for_each(|mut event| {
147 let metadata = event.metadata_mut();
148 metadata.update_status(status);
149 metadata.update_sources();
150 });
151 into_event_stream(item)
152 });
153 (pipe, recv)
154 }
155
156 #[cfg(any(test, feature = "test"))]
157 pub fn new_test_errors(
158 error_at: impl Fn(usize) -> bool,
159 ) -> (Self, impl Stream<Item = Event> + Unpin) {
160 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
161 let mut count: usize = 0;
165 let recv = recv.into_stream().flat_map(move |mut item| {
166 let status = if error_at(count) {
167 EventStatus::Errored
168 } else {
169 EventStatus::Delivered
170 };
171 count += 1;
172 item.events.iter_events_mut().for_each(|mut event| {
173 let metadata = event.metadata_mut();
174 metadata.update_status(status);
175 metadata.update_sources();
176 });
177 into_event_stream(item)
178 });
179 (pipe, recv)
180 }
181
182 #[cfg(any(test, feature = "test"))]
183 pub fn add_outputs(
184 &mut self,
185 status: EventStatus,
186 name: String,
187 ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
188 let output_id = OutputId {
191 component: "test".to_string().into(),
192 port: Some(name.clone()),
193 };
194 let (output, recv) =
195 Output::new_with_buffer(100, name.clone(), None, None, output_id, None);
196 let recv = recv.into_stream().map(move |mut item| {
197 item.events.iter_events_mut().for_each(|mut event| {
198 let metadata = event.metadata_mut();
199 metadata.update_status(status);
200 metadata.update_sources();
201 });
202 item
203 });
204 self.named_outputs.insert(name, output);
205 recv
206 }
207
208 const fn default_output_mut(&mut self) -> &mut Output {
210 self.default_output.as_mut().expect("no default output")
211 }
212
213 pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), SendError> {
217 self.default_output_mut().send_event(event).await
218 }
219
220 pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
224 where
225 S: Stream<Item = E> + Unpin,
226 E: Into<Event> + ByteSizeOf,
227 {
228 self.default_output_mut().send_event_stream(events).await
229 }
230
231 pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
235 where
236 E: Into<Event> + ByteSizeOf,
237 I: IntoIterator<Item = E>,
238 <I as IntoIterator>::IntoIter: ExactSizeIterator,
239 {
240 self.default_output_mut().send_batch(events).await
241 }
242
243 pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), SendError>
247 where
248 E: Into<Event> + ByteSizeOf,
249 I: IntoIterator<Item = E>,
250 <I as IntoIterator>::IntoIter: ExactSizeIterator,
251 {
252 self.named_outputs
253 .get_mut(name)
254 .expect("unknown output")
255 .send_batch(events)
256 .await
257 }
258}