vector_core/source_sender/
sender.rs

1#[cfg(any(test, feature = "test"))]
2use std::time::Duration;
3use std::{collections::HashMap, time::Instant};
4
5use futures::Stream;
6#[cfg(any(test, feature = "test"))]
7use futures::StreamExt as _;
8#[cfg(any(test, feature = "test"))]
9use metrics::histogram;
10use vector_buffers::EventCount;
11#[cfg(any(test, feature = "test"))]
12use vector_buffers::topology::channel::LimitedReceiver;
13#[cfg(any(test, feature = "test"))]
14use vector_common::internal_event::DEFAULT_OUTPUT;
15#[cfg(doc)]
16use vector_common::internal_event::{ComponentEventsDropped, EventsSent};
17use vector_common::{
18    byte_size_of::ByteSizeOf,
19    finalization::{AddBatchNotifier, BatchNotifier},
20    json_size::JsonSize,
21};
22
23use super::{Builder, Output, SendError};
24#[cfg(any(test, feature = "test"))]
25use super::{LAG_TIME_NAME, TEST_BUFFER_SIZE};
26use crate::{
27    EstimatedJsonEncodedSizeOf,
28    event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
29};
30#[cfg(any(test, feature = "test"))]
31use crate::{
32    config::OutputId,
33    event::{EventStatus, into_event_stream},
34};
35
/// `SourceSenderItem` is a thin wrapper around [`EventArray`] used to track the send duration of a batch.
///
/// This is needed because the send duration is calculated as the difference between when the batch
/// is sent from the origin component and when the batch is enqueued on the receiving component's
/// input buffer. For sources in particular, this requires the batch to be enqueued on two channels:
/// the origin component's pump channel and then the receiving component's input buffer.
///
/// The wrapper carries the batch itself plus the [`Instant`] that serves as the starting point of
/// that measurement.
#[derive(Debug)]
pub struct SourceSenderItem {
    /// The batch of events to send.
    pub events: EventArray,
    /// Reference instant used to calculate send duration.
    pub send_reference: Instant,
}
49
50impl AddBatchNotifier for SourceSenderItem {
51    fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
52        self.events.add_batch_notifier(notifier);
53    }
54}
55
56impl ByteSizeOf for SourceSenderItem {
57    fn allocated_bytes(&self) -> usize {
58        self.events.allocated_bytes()
59    }
60}
61
62impl EventCount for SourceSenderItem {
63    fn event_count(&self) -> usize {
64        self.events.event_count()
65    }
66}
67
68impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
69    fn estimated_json_encoded_size_of(&self) -> JsonSize {
70        self.events.estimated_json_encoded_size_of()
71    }
72}
73
74impl EventContainer for SourceSenderItem {
75    type IntoIter = EventArrayIntoIter;
76
77    fn len(&self) -> usize {
78        self.events.len()
79    }
80
81    fn into_events(self) -> Self::IntoIter {
82        self.events.into_events()
83    }
84}
85
86impl From<SourceSenderItem> for EventArray {
87    fn from(val: SourceSenderItem) -> Self {
88        val.events
89    }
90}
91
/// Handle used to send events to a default output and/or a set of named outputs.
///
/// The methods that target the default output panic when no default output is configured, and
/// `send_batch_named` panics when asked for a name that is not present in `named_outputs`.
#[derive(Debug, Clone)]
pub struct SourceSender {
    // The default output is optional because some sources, e.g. `datadog_agent`
    // and `opentelemetry`, can be configured to only output to named outputs.
    pub(super) default_output: Option<Output>,
    // Additional outputs, keyed by output name.
    pub(super) named_outputs: HashMap<String, Output>,
}
99
100impl SourceSender {
    /// Returns a [`Builder`] in its default state.
    pub fn builder() -> Builder {
        Builder::default()
    }
104
105    #[cfg(any(test, feature = "test"))]
106    pub fn new_test_sender_with_options(
107        n: usize,
108        timeout: Option<Duration>,
109    ) -> (Self, LimitedReceiver<SourceSenderItem>) {
110        let lag_time = Some(histogram!(LAG_TIME_NAME));
111        let output_id = OutputId {
112            component: "test".to_string().into(),
113            port: None,
114        };
115        let (default_output, rx) = Output::new_with_buffer(
116            n,
117            DEFAULT_OUTPUT.to_owned(),
118            lag_time,
119            None,
120            output_id,
121            timeout,
122        );
123        (
124            Self {
125                default_output: Some(default_output),
126                named_outputs: Default::default(),
127            },
128            rx,
129        )
130    }
131
132    #[cfg(any(test, feature = "test"))]
133    pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
134        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
135        let recv = recv.into_stream().flat_map(into_event_stream);
136        (pipe, recv)
137    }
138
139    #[cfg(any(test, feature = "test"))]
140    pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
141        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
142        // In a source test pipeline, there is no sink to acknowledge
143        // events, so we have to add a map to the receiver to handle the
144        // finalization.
145        let recv = recv.into_stream().flat_map(move |mut item| {
146            item.events.iter_events_mut().for_each(|mut event| {
147                let metadata = event.metadata_mut();
148                metadata.update_status(status);
149                metadata.update_sources();
150            });
151            into_event_stream(item)
152        });
153        (pipe, recv)
154    }
155
156    #[cfg(any(test, feature = "test"))]
157    pub fn new_test_errors(
158        error_at: impl Fn(usize) -> bool,
159    ) -> (Self, impl Stream<Item = Event> + Unpin) {
160        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
161        // In a source test pipeline, there is no sink to acknowledge
162        // events, so we have to add a map to the receiver to handle the
163        // finalization.
164        let mut count: usize = 0;
165        let recv = recv.into_stream().flat_map(move |mut item| {
166            let status = if error_at(count) {
167                EventStatus::Errored
168            } else {
169                EventStatus::Delivered
170            };
171            count += 1;
172            item.events.iter_events_mut().for_each(|mut event| {
173                let metadata = event.metadata_mut();
174                metadata.update_status(status);
175                metadata.update_sources();
176            });
177            into_event_stream(item)
178        });
179        (pipe, recv)
180    }
181
182    #[cfg(any(test, feature = "test"))]
183    pub fn add_outputs(
184        &mut self,
185        status: EventStatus,
186        name: String,
187    ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
188        // The lag_time parameter here will need to be filled in if this function is ever used for
189        // non-test situations.
190        let output_id = OutputId {
191            component: "test".to_string().into(),
192            port: Some(name.clone()),
193        };
194        let (output, recv) =
195            Output::new_with_buffer(100, name.clone(), None, None, output_id, None);
196        let recv = recv.into_stream().map(move |mut item| {
197            item.events.iter_events_mut().for_each(|mut event| {
198                let metadata = event.metadata_mut();
199                metadata.update_status(status);
200                metadata.update_sources();
201            });
202            item
203        });
204        self.named_outputs.insert(name, output);
205        recv
206    }
207
208    /// Get a mutable reference to the default output, panicking if none exists.
209    const fn default_output_mut(&mut self) -> &mut Output {
210        self.default_output.as_mut().expect("no default output")
211    }
212
213    /// Send an event to the default output.
214    ///
215    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
216    pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), SendError> {
217        self.default_output_mut().send_event(event).await
218    }
219
220    /// Send a stream of events to the default output.
221    ///
222    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
223    pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
224    where
225        S: Stream<Item = E> + Unpin,
226        E: Into<Event> + ByteSizeOf,
227    {
228        self.default_output_mut().send_event_stream(events).await
229    }
230
231    /// Send a batch of events to the default output.
232    ///
233    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
234    pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
235    where
236        E: Into<Event> + ByteSizeOf,
237        I: IntoIterator<Item = E>,
238        <I as IntoIterator>::IntoIter: ExactSizeIterator,
239    {
240        self.default_output_mut().send_batch(events).await
241    }
242
243    /// Send a batch of events event to a named output.
244    ///
245    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
246    pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), SendError>
247    where
248        E: Into<Event> + ByteSizeOf,
249        I: IntoIterator<Item = E>,
250        <I as IntoIterator>::IntoIter: ExactSizeIterator,
251    {
252        self.named_outputs
253            .get_mut(name)
254            .expect("unknown output")
255            .send_batch(events)
256            .await
257    }
258}