vector_core/source_sender/
sender.rs

1#[cfg(any(test, feature = "test"))]
2use std::time::Duration;
3use std::{collections::HashMap, time::Instant};
4
5use futures::Stream;
6#[cfg(any(test, feature = "test"))]
7use futures::StreamExt as _;
8#[cfg(any(test, feature = "test"))]
9use metrics::histogram;
10use vector_buffers::EventCount;
11#[cfg(any(test, feature = "test"))]
12use vector_buffers::topology::channel::LimitedReceiver;
13#[cfg(any(test, feature = "test"))]
14use vector_common::internal_event::DEFAULT_OUTPUT;
15#[cfg(doc)]
16use vector_common::internal_event::{ComponentEventsDropped, EventsSent};
17use vector_common::{
18    byte_size_of::ByteSizeOf,
19    finalization::{AddBatchNotifier, BatchNotifier},
20    json_size::JsonSize,
21};
22
23use super::{Builder, Output, SendError};
24#[cfg(any(test, feature = "test"))]
25use super::{
26    LAG_TIME_NAME, OutputMetrics, SEND_BATCH_LATENCY_NAME, SEND_LATENCY_NAME, TEST_BUFFER_SIZE,
27};
28use crate::{
29    EstimatedJsonEncodedSizeOf,
30    event::{Event, EventArray, EventContainer, array::EventArrayIntoIter},
31};
32#[cfg(any(test, feature = "test"))]
33use crate::{
34    config::OutputId,
35    event::{EventStatus, into_event_stream},
36};
37
38/// SourceSenderItem is a thin wrapper around [EventArray] used to track the send duration of a batch.
39///
40/// This is needed because the send duration is calculated as the difference between when the batch
41/// is sent from the origin component to when the batch is enqueued on the receiving component's input buffer.
42/// For sources in particular, this requires the batch to be enqueued on two channels: the origin component's pump
43/// channel and then the receiving component's input buffer.
44#[derive(Debug)]
45pub struct SourceSenderItem {
46    /// The batch of events to send.
47    pub events: EventArray,
48    /// Reference instant used to calculate send duration.
49    pub send_reference: Instant,
50}
51
52impl AddBatchNotifier for SourceSenderItem {
53    fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
54        self.events.add_batch_notifier(notifier);
55    }
56}
57
58impl ByteSizeOf for SourceSenderItem {
59    fn allocated_bytes(&self) -> usize {
60        self.events.allocated_bytes()
61    }
62}
63
64impl EventCount for SourceSenderItem {
65    fn event_count(&self) -> usize {
66        self.events.event_count()
67    }
68}
69
70impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
71    fn estimated_json_encoded_size_of(&self) -> JsonSize {
72        self.events.estimated_json_encoded_size_of()
73    }
74}
75
76impl EventContainer for SourceSenderItem {
77    type IntoIter = EventArrayIntoIter;
78
79    fn len(&self) -> usize {
80        self.events.len()
81    }
82
83    fn into_events(self) -> Self::IntoIter {
84        self.events.into_events()
85    }
86}
87
88impl From<SourceSenderItem> for EventArray {
89    fn from(val: SourceSenderItem) -> Self {
90        val.events
91    }
92}
93
94#[derive(Debug, Clone)]
95pub struct SourceSender {
96    // The default output is optional because some sources, e.g. `datadog_agent`
97    // and `opentelemetry`, can be configured to only output to named outputs.
98    pub(super) default_output: Option<Output>,
99    pub(super) named_outputs: HashMap<String, Output>,
100}
101
102impl SourceSender {
103    pub fn builder() -> Builder {
104        Builder::default()
105    }
106
107    #[cfg(any(test, feature = "test"))]
108    pub fn new_test_sender_with_options(
109        n: usize,
110        timeout: Option<Duration>,
111    ) -> (Self, LimitedReceiver<SourceSenderItem>) {
112        let lag_time = Some(histogram!(LAG_TIME_NAME));
113        let send_latency = Some(histogram!(SEND_LATENCY_NAME));
114        let send_batch_latency = Some(histogram!(SEND_BATCH_LATENCY_NAME));
115        let output_id = OutputId {
116            component: "test".to_string().into(),
117            port: None,
118        };
119        let (default_output, rx) = Output::new_with_buffer(
120            n,
121            DEFAULT_OUTPUT.to_owned(),
122            OutputMetrics::new(lag_time, send_latency, send_batch_latency),
123            None,
124            output_id,
125            timeout,
126            None,
127        );
128        (
129            Self {
130                default_output: Some(default_output),
131                named_outputs: Default::default(),
132            },
133            rx,
134        )
135    }
136
137    #[cfg(any(test, feature = "test"))]
138    pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
139        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
140        let recv = recv.into_stream().flat_map(into_event_stream);
141        (pipe, recv)
142    }
143
144    #[cfg(any(test, feature = "test"))]
145    pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
146        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
147        // In a source test pipeline, there is no sink to acknowledge
148        // events, so we have to add a map to the receiver to handle the
149        // finalization.
150        let recv = recv.into_stream().flat_map(move |mut item| {
151            item.events.iter_events_mut().for_each(|mut event| {
152                let metadata = event.metadata_mut();
153                metadata.update_status(status);
154                metadata.update_sources();
155            });
156            into_event_stream(item)
157        });
158        (pipe, recv)
159    }
160
161    #[cfg(any(test, feature = "test"))]
162    pub fn new_test_errors(
163        error_at: impl Fn(usize) -> bool,
164    ) -> (Self, impl Stream<Item = Event> + Unpin) {
165        let (pipe, recv) = Self::new_test_sender_with_options(TEST_BUFFER_SIZE, None);
166        // In a source test pipeline, there is no sink to acknowledge
167        // events, so we have to add a map to the receiver to handle the
168        // finalization.
169        let mut count: usize = 0;
170        let recv = recv.into_stream().flat_map(move |mut item| {
171            let status = if error_at(count) {
172                EventStatus::Errored
173            } else {
174                EventStatus::Delivered
175            };
176            count += 1;
177            item.events.iter_events_mut().for_each(|mut event| {
178                let metadata = event.metadata_mut();
179                metadata.update_status(status);
180                metadata.update_sources();
181            });
182            into_event_stream(item)
183        });
184        (pipe, recv)
185    }
186
187    #[cfg(any(test, feature = "test"))]
188    pub fn add_outputs(
189        &mut self,
190        status: EventStatus,
191        name: String,
192    ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
193        // The lag_time parameter here will need to be filled in if this function is ever used for
194        // non-test situations.
195        let output_id = OutputId {
196            component: "test".to_string().into(),
197            port: Some(name.clone()),
198        };
199        let (output, recv) = Output::new_with_buffer(
200            100,
201            name.clone(),
202            OutputMetrics::default(),
203            None,
204            output_id,
205            None,
206            None,
207        );
208        let recv = recv.into_stream().map(move |mut item| {
209            item.events.iter_events_mut().for_each(|mut event| {
210                let metadata = event.metadata_mut();
211                metadata.update_status(status);
212                metadata.update_sources();
213            });
214            item
215        });
216        self.named_outputs.insert(name, output);
217        recv
218    }
219
220    /// Get a mutable reference to the default output, panicking if none exists.
221    const fn default_output_mut(&mut self) -> &mut Output {
222        self.default_output.as_mut().expect("no default output")
223    }
224
225    /// Send an event to the default output.
226    ///
227    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
228    pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), SendError> {
229        self.default_output_mut().send_event(event).await
230    }
231
232    /// Send a stream of events to the default output.
233    ///
234    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
235    pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), SendError>
236    where
237        S: Stream<Item = E> + Unpin,
238        E: Into<Event> + ByteSizeOf,
239    {
240        self.default_output_mut().send_event_stream(events).await
241    }
242
243    /// Send a batch of events to the default output.
244    ///
245    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
246    pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), SendError>
247    where
248        E: Into<Event> + ByteSizeOf,
249        I: IntoIterator<Item = E>,
250        <I as IntoIterator>::IntoIter: ExactSizeIterator,
251    {
252        self.default_output_mut().send_batch(events).await
253    }
254
255    /// Send a batch of events event to a named output.
256    ///
257    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
258    pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), SendError>
259    where
260        E: Into<Event> + ByteSizeOf,
261        I: IntoIterator<Item = E>,
262        <I as IntoIterator>::IntoIter: ExactSizeIterator,
263    {
264        self.named_outputs
265            .get_mut(name)
266            .expect("unknown output")
267            .send_batch(events)
268            .await
269    }
270}