vector_core/source_sender/
sender.rs1#[cfg(any(test, feature = "test"))]
2use std::time::Duration;
3use std::{collections::HashMap, time::Instant};
4
5use futures::Stream;
6#[cfg(any(test, feature = "test"))]
7use futures::StreamExt as _;
8#[cfg(any(test, feature = "test"))]
9use metrics::histogram;
10use vector_buffers::EventCount;
11#[cfg(any(test, feature = "test"))]
12use vector_buffers::topology::channel::LimitedReceiver;
13#[cfg(any(test, feature = "test"))]
14use vector_common::internal_event::DEFAULT_OUTPUT;
15#[cfg(doc)]
16use vector_common::internal_event::{ComponentEventsDropped, EventsSent};
17use vector_common::{
18 byte_size_of::ByteSizeOf,
19 finalization::{AddBatchNotifier, BatchNotifier},
20 json_size::JsonSize,
21};
22
23use super::{Builder, Output, SendError};
24#[cfg(any(test, feature = "test"))]
25use super::{LAG_TIME_NAME, TEST_BUFFER_SIZE};
26use crate::{
27 EstimatedJsonEncodedSizeOf,
28 event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
29};
30#[cfg(any(test, feature = "test"))]
31use crate::{
32 config::OutputId,
33 event::{EventStatus, into_event_stream},
34};
35
36#[derive(Debug)]
43pub struct SourceSenderItem {
44 pub events: EventArray,
46 pub send_reference: Instant,
48}
49
50impl AddBatchNotifier for SourceSenderItem {
51 fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
52 self.events.add_batch_notifier(notifier);
53 }
54}
55
56impl ByteSizeOf for SourceSenderItem {
57 fn allocated_bytes(&self) -> usize {
58 self.events.allocated_bytes()
59 }
60}
61
62impl EventCount for SourceSenderItem {
63 fn event_count(&self) -> usize {
64 self.events.event_count()
65 }
66}
67
68impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
69 fn estimated_json_encoded_size_of(&self) -> JsonSize {
70 self.events.estimated_json_encoded_size_of()
71 }
72}
73
74impl EventContainer for SourceSenderItem {
75 type IntoIter = EventArrayIntoIter;
76
77 fn len(&self) -> usize {
78 self.events.len()
79 }
80
81 fn into_events(self) -> Self::IntoIter {
82 self.events.into_events()
83 }
84}
85
86impl From<SourceSenderItem> for EventArray {
87 fn from(val: SourceSenderItem) -> Self {
88 val.events
89 }
90}
91
92#[derive(Debug, Clone)]
93pub struct SourceSender {
94 pub(super) default_output: Option<Output>,
97 pub(super) named_outputs: HashMap<String, Output>,
98}
99
100impl SourceSender {
101 pub fn builder() -> Builder {
102 Builder::default()
103 }
104
105 #[cfg(any(test, feature = "test"))]
106 pub fn new_test_sender_with_options(
107 n: usize,
108 timeout: Option<Duration>,
109 ) -> (Self, LimitedReceiver<SourceSenderItem>) {
110 let lag_time = Some(histogram!(LAG_TIME_NAME));
111 let output_id = OutputId {
112 component: "test".to_string().into(),
113 port: None,
114 };
115 let (default_output, rx) = Output::new_with_buffer(
116 n,
117 DEFAULT_OUTPUT.to_owned(),
118 lag_time,
119 None,
120 output_id,
121 timeout,
122 None,
123 );
124 (
125 Self {
126 default_output: Some(default_output),
127 named_outputs: Default::default(),
128 },
129 rx,
130 )
131 }
132
133 #[cfg(any(test, feature = "test"))]
134 pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
135 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
136 let recv = recv.into_stream().flat_map(into_event_stream);
137 (pipe, recv)
138 }
139
140 #[cfg(any(test, feature = "test"))]
141 pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
142 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
143 let recv = recv.into_stream().flat_map(move |mut item| {
147 item.events.iter_events_mut().for_each(|mut event| {
148 let metadata = event.metadata_mut();
149 metadata.update_status(status);
150 metadata.update_sources();
151 });
152 into_event_stream(item)
153 });
154 (pipe, recv)
155 }
156
157 #[cfg(any(test, feature = "test"))]
158 pub fn new_test_errors(
159 error_at: impl Fn(usize) -> bool,
160 ) -> (Self, impl Stream<Item = Event> + Unpin) {
161 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
162 let mut count: usize = 0;
166 let recv = recv.into_stream().flat_map(move |mut item| {
167 let status = if error_at(count) {
168 EventStatus::Errored
169 } else {
170 EventStatus::Delivered
171 };
172 count += 1;
173 item.events.iter_events_mut().for_each(|mut event| {
174 let metadata = event.metadata_mut();
175 metadata.update_status(status);
176 metadata.update_sources();
177 });
178 into_event_stream(item)
179 });
180 (pipe, recv)
181 }
182
183 #[cfg(any(test, feature = "test"))]
184 pub fn add_outputs(
185 &mut self,
186 status: EventStatus,
187 name: String,
188 ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
189 let output_id = OutputId {
192 component: "test".to_string().into(),
193 port: Some(name.clone()),
194 };
195 let (output, recv) =
196 Output::new_with_buffer(100, name.clone(), None, None, output_id, None, None);
197 let recv = recv.into_stream().map(move |mut item| {
198 item.events.iter_events_mut().for_each(|mut event| {
199 let metadata = event.metadata_mut();
200 metadata.update_status(status);
201 metadata.update_sources();
202 });
203 item
204 });
205 self.named_outputs.insert(name, output);
206 recv
207 }
208
209 const fn default_output_mut(&mut self) -> &mut Output {
211 self.default_output.as_mut().expect("no default output")
212 }
213
214 pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), SendError> {
218 self.default_output_mut().send_event(event).await
219 }
220
221 pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
225 where
226 S: Stream<Item = E> + Unpin,
227 E: Into<Event> + ByteSizeOf,
228 {
229 self.default_output_mut().send_event_stream(events).await
230 }
231
232 pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
236 where
237 E: Into<Event> + ByteSizeOf,
238 I: IntoIterator<Item = E>,
239 <I as IntoIterator>::IntoIter: ExactSizeIterator,
240 {
241 self.default_output_mut().send_batch(events).await
242 }
243
244 pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), SendError>
248 where
249 E: Into<Event> + ByteSizeOf,
250 I: IntoIterator<Item = E>,
251 <I as IntoIterator>::IntoIter: ExactSizeIterator,
252 {
253 self.named_outputs
254 .get_mut(name)
255 .expect("unknown output")
256 .send_batch(events)
257 .await
258 }
259}