vector_core/source_sender/
sender.rs

1#[cfg(any(test, feature = "test"))]
2use std::time::Duration;
3use std::{collections::HashMap, time::Instant};
4
5use futures::Stream;
6#[cfg(any(test, feature = "test"))]
7use futures::StreamExt as _;
8#[cfg(any(test, feature = "test"))]
9use metrics::histogram;
10use vector_buffers::EventCount;
11#[cfg(any(test, feature = "test"))]
12use vector_buffers::topology::channel::LimitedReceiver;
13#[cfg(any(test, feature = "test"))]
14use vector_common::internal_event::DEFAULT_OUTPUT;
15#[cfg(doc)]
16use vector_common::internal_event::{ComponentEventsDropped, EventsSent};
17use vector_common::{
18    byte_size_of::ByteSizeOf,
19    finalization::{AddBatchNotifier, BatchNotifier},
20    json_size::JsonSize,
21};
22
23use super::{Builder, Output, SendError};
24#[cfg(any(test, feature = "test"))]
25use super::{LAG_TIME_NAME, TEST_BUFFER_SIZE};
26use crate::{
27    EstimatedJsonEncodedSizeOf,
28    event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
29};
30#[cfg(any(test, feature = "test"))]
31use crate::{
32    config::OutputId,
33    event::{EventStatus, into_event_stream},
34};
35
36/// SourceSenderItem is a thin wrapper around [EventArray] used to track the send duration of a batch.
37///
38/// This is needed because the send duration is calculated as the difference between when the batch
39/// is sent from the origin component to when the batch is enqueued on the receiving component's input buffer.
40/// For sources in particular, this requires the batch to be enqueued on two channels: the origin component's pump
41/// channel and then the receiving component's input buffer.
42#[derive(Debug)]
43pub struct SourceSenderItem {
44    /// The batch of events to send.
45    pub events: EventArray,
46    /// Reference instant used to calculate send duration.
47    pub send_reference: Instant,
48}
49
50impl AddBatchNotifier for SourceSenderItem {
51    fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
52        self.events.add_batch_notifier(notifier);
53    }
54}
55
56impl ByteSizeOf for SourceSenderItem {
57    fn allocated_bytes(&self) -> usize {
58        self.events.allocated_bytes()
59    }
60}
61
62impl EventCount for SourceSenderItem {
63    fn event_count(&self) -> usize {
64        self.events.event_count()
65    }
66}
67
68impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
69    fn estimated_json_encoded_size_of(&self) -> JsonSize {
70        self.events.estimated_json_encoded_size_of()
71    }
72}
73
74impl EventContainer for SourceSenderItem {
75    type IntoIter = EventArrayIntoIter;
76
77    fn len(&self) -> usize {
78        self.events.len()
79    }
80
81    fn into_events(self) -> Self::IntoIter {
82        self.events.into_events()
83    }
84}
85
86impl From<SourceSenderItem> for EventArray {
87    fn from(val: SourceSenderItem) -> Self {
88        val.events
89    }
90}
91
92#[derive(Debug, Clone)]
93pub struct SourceSender {
94    // The default output is optional because some sources, e.g. `datadog_agent`
95    // and `opentelemetry`, can be configured to only output to named outputs.
96    pub(super) default_output: Option<Output>,
97    pub(super) named_outputs: HashMap<String, Output>,
98}
99
100impl SourceSender {
101    pub fn builder() -> Builder {
102        Builder::default()
103    }
104
105    #[cfg(any(test, feature = "test"))]
106    pub fn new_test_sender_with_options(
107        n: usize,
108        timeout: Option<Duration>,
109    ) -> (Self, LimitedReceiver<SourceSenderItem>) {
110        let lag_time = Some(histogram!(LAG_TIME_NAME));
111        let output_id = OutputId {
112            component: "test".to_string().into(),
113            port: None,
114        };
115        let (default_output, rx) = Output::new_with_buffer(
116            n,
117            DEFAULT_OUTPUT.to_owned(),
118            lag_time,
119            None,
120            output_id,
121            timeout,
122            None,
123        );
124        (
125            Self {
126                default_output: Some(default_output),
127                named_outputs: Default::default(),
128            },
129            rx,
130        )
131    }
132
133    #[cfg(any(test, feature = "test"))]
134    pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
135        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
136        let recv = recv.into_stream().flat_map(into_event_stream);
137        (pipe, recv)
138    }
139
140    #[cfg(any(test, feature = "test"))]
141    pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
142        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
143        // In a source test pipeline, there is no sink to acknowledge
144        // events, so we have to add a map to the receiver to handle the
145        // finalization.
146        let recv = recv.into_stream().flat_map(move |mut item| {
147            item.events.iter_events_mut().for_each(|mut event| {
148                let metadata = event.metadata_mut();
149                metadata.update_status(status);
150                metadata.update_sources();
151            });
152            into_event_stream(item)
153        });
154        (pipe, recv)
155    }
156
157    #[cfg(any(test, feature = "test"))]
158    pub fn new_test_errors(
159        error_at: impl Fn(usize) -> bool,
160    ) -> (Self, impl Stream<Item = Event> + Unpin) {
161        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
162        // In a source test pipeline, there is no sink to acknowledge
163        // events, so we have to add a map to the receiver to handle the
164        // finalization.
165        let mut count: usize = 0;
166        let recv = recv.into_stream().flat_map(move |mut item| {
167            let status = if error_at(count) {
168                EventStatus::Errored
169            } else {
170                EventStatus::Delivered
171            };
172            count += 1;
173            item.events.iter_events_mut().for_each(|mut event| {
174                let metadata = event.metadata_mut();
175                metadata.update_status(status);
176                metadata.update_sources();
177            });
178            into_event_stream(item)
179        });
180        (pipe, recv)
181    }
182
183    #[cfg(any(test, feature = "test"))]
184    pub fn add_outputs(
185        &mut self,
186        status: EventStatus,
187        name: String,
188    ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
189        // The lag_time parameter here will need to be filled in if this function is ever used for
190        // non-test situations.
191        let output_id = OutputId {
192            component: "test".to_string().into(),
193            port: Some(name.clone()),
194        };
195        let (output, recv) =
196            Output::new_with_buffer(100, name.clone(), None, None, output_id, None, None);
197        let recv = recv.into_stream().map(move |mut item| {
198            item.events.iter_events_mut().for_each(|mut event| {
199                let metadata = event.metadata_mut();
200                metadata.update_status(status);
201                metadata.update_sources();
202            });
203            item
204        });
205        self.named_outputs.insert(name, output);
206        recv
207    }
208
209    /// Get a mutable reference to the default output, panicking if none exists.
210    const fn default_output_mut(&mut self) -> &mut Output {
211        self.default_output.as_mut().expect("no default output")
212    }
213
214    /// Send an event to the default output.
215    ///
216    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
217    pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), SendError> {
218        self.default_output_mut().send_event(event).await
219    }
220
221    /// Send a stream of events to the default output.
222    ///
223    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
224    pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
225    where
226        S: Stream<Item = E> + Unpin,
227        E: Into<Event> + ByteSizeOf,
228    {
229        self.default_output_mut().send_event_stream(events).await
230    }
231
232    /// Send a batch of events to the default output.
233    ///
234    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
235    pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
236    where
237        E: Into<Event> + ByteSizeOf,
238        I: IntoIterator<Item = E>,
239        <I as IntoIterator>::IntoIter: ExactSizeIterator,
240    {
241        self.default_output_mut().send_batch(events).await
242    }
243
244    /// Send a batch of events event to a named output.
245    ///
246    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
247    pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), SendError>
248    where
249        E: Into<Event> + ByteSizeOf,
250        I: IntoIterator<Item = E>,
251        <I as IntoIterator>::IntoIter: ExactSizeIterator,
252    {
253        self.named_outputs
254            .get_mut(name)
255            .expect("unknown output")
256            .send_batch(events)
257            .await
258    }
259}