vector_core/source_sender/
sender.rs1#[cfg(any(test, feature = "test"))]
2use std::time::Duration;
3use std::{collections::HashMap, time::Instant};
4
5use futures::Stream;
6#[cfg(any(test, feature = "test"))]
7use futures::StreamExt as _;
8#[cfg(any(test, feature = "test"))]
9use metrics::histogram;
10use vector_buffers::EventCount;
11#[cfg(any(test, feature = "test"))]
12use vector_buffers::topology::channel::LimitedReceiver;
13#[cfg(any(test, feature = "test"))]
14use vector_common::internal_event::DEFAULT_OUTPUT;
15#[cfg(doc)]
16use vector_common::internal_event::{ComponentEventsDropped, EventsSent};
17use vector_common::{
18 byte_size_of::ByteSizeOf,
19 finalization::{AddBatchNotifier, BatchNotifier},
20 json_size::JsonSize,
21};
22
23use super::{Builder, Output, SendError};
24#[cfg(any(test, feature = "test"))]
25use super::{
26 LAG_TIME_NAME, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME, TEST_BUFFER_SIZE,
27};
28use crate::{
29 EstimatedJsonEncodedSizeOf,
30 event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
31};
32#[cfg(any(test, feature = "test"))]
33use crate::{
34 config::OutputId,
35 event::{EventStatus, into_event_stream},
36};
37
38#[derive(Debug)]
45pub struct SourceSenderItem {
46 pub events: EventArray,
48 pub send_reference: Instant,
50}
51
52impl AddBatchNotifier for SourceSenderItem {
53 fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
54 self.events.add_batch_notifier(notifier);
55 }
56}
57
58impl ByteSizeOf for SourceSenderItem {
59 fn allocated_bytes(&self) -> usize {
60 self.events.allocated_bytes()
61 }
62}
63
64impl EventCount for SourceSenderItem {
65 fn event_count(&self) -> usize {
66 self.events.event_count()
67 }
68}
69
70impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
71 fn estimated_json_encoded_size_of(&self) -> JsonSize {
72 self.events.estimated_json_encoded_size_of()
73 }
74}
75
76impl EventContainer for SourceSenderItem {
77 type IntoIter = EventArrayIntoIter;
78
79 fn len(&self) -> usize {
80 self.events.len()
81 }
82
83 fn into_events(self) -> Self::IntoIter {
84 self.events.into_events()
85 }
86}
87
88impl From<SourceSenderItem> for EventArray {
89 fn from(val: SourceSenderItem) -> Self {
90 val.events
91 }
92}
93
94#[derive(Debug, Clone)]
95pub struct SourceSender {
96 pub(super) default_output: Option<Output>,
99 pub(super) named_outputs: HashMap<String, Output>,
100}
101
102impl SourceSender {
103 pub fn builder() -> Builder {
104 Builder::default()
105 }
106
107 #[cfg(any(test, feature = "test"))]
108 pub fn new_test_sender_with_options(
109 n: usize,
110 timeout: Option<Duration>,
111 ) -> (Self, LimitedReceiver<SourceSenderItem>) {
112 let lag_time = Some(histogram!(LAG_TIME_NAME));
113 let send_latency = Some(histogram!(SEND_LATENCY_NAME));
114 let send_batch_latency = Some(histogram!(SEND_BATCH_LATENCY_NAME));
115 let output_id = OutputId {
116 component: "test".to_string().into(),
117 port: None,
118 };
119 let (default_output, rx) = Output::new_with_buffer(
120 n,
121 DEFAULT_OUTPUT.to_owned(),
122 OutputMetrics::new(lag_time, send_latency, send_batch_latency),
123 None,
124 output_id,
125 timeout,
126 None,
127 );
128 (
129 Self {
130 default_output: Some(default_output),
131 named_outputs: Default::default(),
132 },
133 rx,
134 )
135 }
136
137 #[cfg(any(test, feature = "test"))]
138 pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
139 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
140 let recv = recv.into_stream().flat_map(into_event_stream);
141 (pipe, recv)
142 }
143
144 #[cfg(any(test, feature = "test"))]
145 pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
146 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
147 let recv = recv.into_stream().flat_map(move |mut item| {
151 item.events.iter_events_mut().for_each(|mut event| {
152 let metadata = event.metadata_mut();
153 metadata.update_status(status);
154 metadata.update_sources();
155 });
156 into_event_stream(item)
157 });
158 (pipe, recv)
159 }
160
161 #[cfg(any(test, feature = "test"))]
162 pub fn new_test_errors(
163 error_at: impl Fn(usize) -> bool,
164 ) -> (Self, impl Stream<Item = Event> + Unpin) {
165 let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
166 let mut count: usize = 0;
170 let recv = recv.into_stream().flat_map(move |mut item| {
171 let status = if error_at(count) {
172 EventStatus::Errored
173 } else {
174 EventStatus::Delivered
175 };
176 count += 1;
177 item.events.iter_events_mut().for_each(|mut event| {
178 let metadata = event.metadata_mut();
179 metadata.update_status(status);
180 metadata.update_sources();
181 });
182 into_event_stream(item)
183 });
184 (pipe, recv)
185 }
186
187 #[cfg(any(test, feature = "test"))]
188 pub fn add_outputs(
189 &mut self,
190 status: EventStatus,
191 name: String,
192 ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
193 let output_id = OutputId {
196 component: "test".to_string().into(),
197 port: Some(name.clone()),
198 };
199 let (output, recv) = Output::new_with_buffer(
200 100,
201 name.clone(),
202 OutputMetrics::default(),
203 None,
204 output_id,
205 None,
206 None,
207 );
208 let recv = recv.into_stream().map(move |mut item| {
209 item.events.iter_events_mut().for_each(|mut event| {
210 let metadata = event.metadata_mut();
211 metadata.update_status(status);
212 metadata.update_sources();
213 });
214 item
215 });
216 self.named_outputs.insert(name, output);
217 recv
218 }
219
220 const fn default_output_mut(&mut self) -> &mut Output {
222 self.default_output.as_mut().expect("no default output")
223 }
224
225 pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), SendError> {
229 self.default_output_mut().send_event(event).await
230 }
231
232 pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
236 where
237 S: Stream<Item = E> + Unpin,
238 E: Into<Event> + ByteSizeOf,
239 {
240 self.default_output_mut().send_event_stream(events).await
241 }
242
243 pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
247 where
248 E: Into<Event> + ByteSizeOf,
249 I: IntoIterator<Item = E>,
250 <I as IntoIterator>::IntoIter: ExactSizeIterator,
251 {
252 self.default_output_mut().send_batch(events).await
253 }
254
255 pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), SendError>
259 where
260 E: Into<Event> + ByteSizeOf,
261 I: IntoIterator<Item = E>,
262 <I as IntoIterator>::IntoIter: ExactSizeIterator,
263 {
264 self.named_outputs
265 .get_mut(name)
266 .expect("unknown output")
267 .send_batch(events)
268 .await
269 }
270}