vector/source_sender/
mod.rs

1#![allow(missing_docs)]
2use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc, time::Instant};
3
4use chrono::Utc;
5use futures::{Stream, StreamExt};
6use metrics::{histogram, Histogram};
7use tracing::Span;
8use vector_lib::buffers::EventCount;
9use vector_lib::buffers::{
10    config::MemoryBufferSize,
11    topology::channel::{self, LimitedReceiver, LimitedSender},
12};
13use vector_lib::event::array::EventArrayIntoIter;
14#[cfg(any(test, feature = "test-utils"))]
15use vector_lib::event::{into_event_stream, EventStatus};
16use vector_lib::finalization::{AddBatchNotifier, BatchNotifier};
17use vector_lib::internal_event::{ComponentEventsDropped, UNINTENTIONAL};
18use vector_lib::json_size::JsonSize;
19use vector_lib::{
20    config::{log_schema, SourceOutput},
21    event::{array, Event, EventArray, EventContainer, EventRef},
22    internal_event::{
23        self, CountByteSize, EventsSent, InternalEventHandle as _, Registered, DEFAULT_OUTPUT,
24    },
25    ByteSizeOf, EstimatedJsonEncodedSizeOf,
26};
27use vrl::value::Value;
28
29mod errors;
30
31use crate::config::{ComponentKey, OutputId};
32use crate::schema::Definition;
33pub use errors::{ClosedError, StreamSendError};
34
/// Maximum number of events grouped into one `EventArray` when a batch or stream is sent, and
/// the default per-output buffer capacity used by the `Builder`.
pub(crate) const CHUNK_SIZE: usize = 1_000;

/// Buffer capacity, in events, of the outputs created by the test constructors.
#[cfg(any(test, feature = "test-utils"))]
const TEST_BUFFER_SIZE: usize = 100;

/// Name of the histogram recording how far behind the send time (in seconds) event timestamps are.
const LAG_TIME_NAME: &str = "source_lag_time_seconds";
41
/// SourceSenderItem is a thin wrapper around [EventArray] used to track the send duration of a batch.
///
/// This is needed because the send duration is calculated as the difference between when the batch
/// is sent from the origin component and when the batch is enqueued on the receiving component's input buffer.
/// For sources in particular, this requires the batch to be enqueued on two channels: the origin component's pump
/// channel and then the receiving component's input buffer.
#[derive(Debug)]
pub struct SourceSenderItem {
    /// The batch of events to send.
    pub events: EventArray,
    /// Reference instant used to calculate send duration.
    ///
    /// Within this module it is captured with `Instant::now()` at the start of an output's
    /// `send`, before lag-time recording and metadata stamping.
    pub send_reference: Instant,
}
55
56impl AddBatchNotifier for SourceSenderItem {
57    fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
58        self.events.add_batch_notifier(notifier)
59    }
60}
61
62impl ByteSizeOf for SourceSenderItem {
63    fn allocated_bytes(&self) -> usize {
64        self.events.allocated_bytes()
65    }
66}
67
68impl EventCount for SourceSenderItem {
69    fn event_count(&self) -> usize {
70        self.events.event_count()
71    }
72}
73
74impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
75    fn estimated_json_encoded_size_of(&self) -> JsonSize {
76        self.events.estimated_json_encoded_size_of()
77    }
78}
79
80impl EventContainer for SourceSenderItem {
81    type IntoIter = EventArrayIntoIter;
82
83    fn len(&self) -> usize {
84        self.events.len()
85    }
86
87    fn into_events(self) -> Self::IntoIter {
88        self.events.into_events()
89    }
90}
91
92impl From<SourceSenderItem> for EventArray {
93    fn from(val: SourceSenderItem) -> Self {
94        val.events
95    }
96}
97
/// Collects the outputs of a source before producing a [`SourceSender`].
pub struct Builder {
    /// Capacity, in events, of the buffer created for each output registered afterwards.
    buf_size: usize,
    /// Output registered without a port name, if any.
    default_output: Option<Output>,
    /// Outputs registered with an explicit port name, keyed by that name.
    named_outputs: HashMap<String, Output>,
    /// Lag-time histogram cloned into every output registered by this builder; `None` disables it.
    lag_time: Option<Histogram>,
}
104
105impl Default for Builder {
106    fn default() -> Self {
107        Self {
108            buf_size: CHUNK_SIZE,
109            default_output: None,
110            named_outputs: Default::default(),
111            lag_time: Some(histogram!(LAG_TIME_NAME)),
112        }
113    }
114}
115
116impl Builder {
117    pub const fn with_buffer(mut self, n: usize) -> Self {
118        self.buf_size = n;
119        self
120    }
121
122    pub fn add_source_output(
123        &mut self,
124        output: SourceOutput,
125        component_key: ComponentKey,
126    ) -> LimitedReceiver<SourceSenderItem> {
127        let lag_time = self.lag_time.clone();
128        let log_definition = output.schema_definition.clone();
129        let output_id = OutputId {
130            component: component_key,
131            port: output.port.clone(),
132        };
133        match output.port {
134            None => {
135                let (output, rx) = Output::new_with_buffer(
136                    self.buf_size,
137                    DEFAULT_OUTPUT.to_owned(),
138                    lag_time,
139                    log_definition,
140                    output_id,
141                );
142                self.default_output = Some(output);
143                rx
144            }
145            Some(name) => {
146                let (output, rx) = Output::new_with_buffer(
147                    self.buf_size,
148                    name.clone(),
149                    lag_time,
150                    log_definition,
151                    output_id,
152                );
153                self.named_outputs.insert(name, output);
154                rx
155            }
156        }
157    }
158
159    pub fn build(self) -> SourceSender {
160        SourceSender {
161            default_output: self.default_output,
162            named_outputs: self.named_outputs,
163        }
164    }
165}
166
/// Handle a source uses to send events to its default output and/or its named outputs.
#[derive(Debug, Clone)]
pub struct SourceSender {
    // The default output is optional because some sources, e.g. `datadog_agent`
    // and `opentelemetry`, can be configured to only output to named outputs.
    default_output: Option<Output>,
    // Outputs registered with an explicit port name, keyed by that name.
    named_outputs: HashMap<String, Output>,
}
174
175impl SourceSender {
176    pub fn builder() -> Builder {
177        Builder::default()
178    }
179
180    #[cfg(any(test, feature = "test-utils"))]
181    pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver<SourceSenderItem>) {
182        let lag_time = Some(histogram!(LAG_TIME_NAME));
183        let output_id = OutputId {
184            component: "test".to_string().into(),
185            port: None,
186        };
187        let (default_output, rx) =
188            Output::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id);
189        (
190            Self {
191                default_output: Some(default_output),
192                named_outputs: Default::default(),
193            },
194            rx,
195        )
196    }
197
198    #[cfg(any(test, feature = "test-utils"))]
199    pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
200        let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
201        let recv = recv.into_stream().flat_map(into_event_stream);
202        (pipe, recv)
203    }
204
205    #[cfg(any(test, feature = "test-utils"))]
206    pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
207        let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
208        // In a source test pipeline, there is no sink to acknowledge
209        // events, so we have to add a map to the receiver to handle the
210        // finalization.
211        let recv = recv.into_stream().flat_map(move |mut item| {
212            item.events.iter_events_mut().for_each(|mut event| {
213                let metadata = event.metadata_mut();
214                metadata.update_status(status);
215                metadata.update_sources();
216            });
217            into_event_stream(item)
218        });
219        (pipe, recv)
220    }
221
222    #[cfg(any(test, feature = "test-utils"))]
223    pub fn new_test_errors(
224        error_at: impl Fn(usize) -> bool,
225    ) -> (Self, impl Stream<Item = Event> + Unpin) {
226        let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
227        // In a source test pipeline, there is no sink to acknowledge
228        // events, so we have to add a map to the receiver to handle the
229        // finalization.
230        let mut count: usize = 0;
231        let recv = recv.into_stream().flat_map(move |mut item| {
232            let status = if error_at(count) {
233                EventStatus::Errored
234            } else {
235                EventStatus::Delivered
236            };
237            count += 1;
238            item.events.iter_events_mut().for_each(|mut event| {
239                let metadata = event.metadata_mut();
240                metadata.update_status(status);
241                metadata.update_sources();
242            });
243            into_event_stream(item)
244        });
245        (pipe, recv)
246    }
247
248    #[cfg(any(test, feature = "test-utils"))]
249    pub fn add_outputs(
250        &mut self,
251        status: EventStatus,
252        name: String,
253    ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
254        // The lag_time parameter here will need to be filled in if this function is ever used for
255        // non-test situations.
256        let output_id = OutputId {
257            component: "test".to_string().into(),
258            port: Some(name.clone()),
259        };
260        let (output, recv) = Output::new_with_buffer(100, name.clone(), None, None, output_id);
261        let recv = recv.into_stream().map(move |mut item| {
262            item.events.iter_events_mut().for_each(|mut event| {
263                let metadata = event.metadata_mut();
264                metadata.update_status(status);
265                metadata.update_sources();
266            });
267            item
268        });
269        self.named_outputs.insert(name, output);
270        recv
271    }
272
273    /// Get a mutable reference to the default output, panicking if none exists.
274    const fn default_output_mut(&mut self) -> &mut Output {
275        self.default_output.as_mut().expect("no default output")
276    }
277
278    /// Send an event to the default output.
279    ///
280    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
281    pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
282        self.default_output_mut().send_event(event).await
283    }
284
285    /// Send a stream of events to the default output.
286    ///
287    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
288    pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
289    where
290        S: Stream<Item = E> + Unpin,
291        E: Into<Event> + ByteSizeOf,
292    {
293        self.default_output_mut().send_event_stream(events).await
294    }
295
296    /// Send a batch of events to the default output.
297    ///
298    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
299    pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
300    where
301        E: Into<Event> + ByteSizeOf,
302        I: IntoIterator<Item = E>,
303        <I as IntoIterator>::IntoIter: ExactSizeIterator,
304    {
305        self.default_output_mut().send_batch(events).await
306    }
307
308    /// Send a batch of events event to a named output.
309    ///
310    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
311    pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), ClosedError>
312    where
313        E: Into<Event> + ByteSizeOf,
314        I: IntoIterator<Item = E>,
315        <I as IntoIterator>::IntoIter: ExactSizeIterator,
316    {
317        self.named_outputs
318            .get_mut(name)
319            .expect("unknown output")
320            .send_batch(events)
321            .await
322    }
323}
324
/// `UnsentEventCount` tracks the number of events yet to be sent to the buffer. This is used to
/// increment the appropriate counters when a future is not polled to completion. Particularly,
/// this is known to happen in a Warp server when a client sends a new HTTP request on a TCP
/// connection that already has a pending request.
///
/// If its internal count is greater than 0 when dropped, the appropriate [ComponentEventsDropped]
/// event is emitted.
struct UnsentEventCount {
    /// Events not yet confirmed as sent; never goes below zero.
    count: usize,
    /// Span that was current at construction; it is entered while emitting the drop event.
    span: Span,
}
336
337impl UnsentEventCount {
338    fn new(count: usize) -> Self {
339        Self {
340            count,
341            span: Span::current(),
342        }
343    }
344
345    const fn decr(&mut self, count: usize) {
346        self.count = self.count.saturating_sub(count);
347    }
348
349    const fn discard(&mut self) {
350        self.count = 0;
351    }
352}
353
354impl Drop for UnsentEventCount {
355    fn drop(&mut self) {
356        if self.count > 0 {
357            let _enter = self.span.enter();
358            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
359                count: self.count,
360                reason: "Source send cancelled."
361            });
362        }
363    }
364}
365
/// One output port of a source: the sending half of a bounded channel plus the per-output
/// state applied to every batch sent through it.
#[derive(Clone)]
struct Output {
    /// Sending half of the bounded channel whose receiver is handed out at construction.
    sender: LimitedSender<SourceSenderItem>,
    /// Histogram receiving lag-time samples, in seconds; `None` disables lag tracking.
    lag_time: Option<Histogram>,
    /// Registered `EventsSent` handle emitted once per successfully sent batch.
    events_sent: Registered<EventsSent>,
    /// The schema definition that will be attached to Log events sent through here.
    /// NOTE: `send` sets it on every event's metadata, whatever the event type.
    log_definition: Option<Arc<Definition>>,
    /// The OutputId related to this source sender. This is set as the `upstream_id` in
    /// `EventMetadata` for all events sent through here.
    output_id: Arc<OutputId>,
}
377
378impl fmt::Debug for Output {
379    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
380        fmt.debug_struct("Output")
381            .field("sender", &self.sender)
382            .field("output_id", &self.output_id)
383            // `metrics::Histogram` is missing `impl Debug`
384            .finish()
385    }
386}
387
388impl Output {
389    fn new_with_buffer(
390        n: usize,
391        output: String,
392        lag_time: Option<Histogram>,
393        log_definition: Option<Arc<Definition>>,
394        output_id: OutputId,
395    ) -> (Self, LimitedReceiver<SourceSenderItem>) {
396        let (tx, rx) = channel::limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap()));
397        (
398            Self {
399                sender: tx,
400                lag_time,
401                events_sent: register!(EventsSent::from(internal_event::Output(Some(
402                    output.into()
403                )))),
404                log_definition,
405                output_id: Arc::new(output_id),
406            },
407            rx,
408        )
409    }
410
411    async fn send(
412        &mut self,
413        mut events: EventArray,
414        unsent_event_count: &mut UnsentEventCount,
415    ) -> Result<(), ClosedError> {
416        let send_reference = Instant::now();
417        let reference = Utc::now().timestamp_millis();
418        events
419            .iter_events()
420            .for_each(|event| self.emit_lag_time(event, reference));
421
422        events.iter_events_mut().for_each(|mut event| {
423            // attach runtime schema definitions from the source
424            if let Some(log_definition) = &self.log_definition {
425                event.metadata_mut().set_schema_definition(log_definition);
426            }
427            event
428                .metadata_mut()
429                .set_upstream_id(Arc::clone(&self.output_id));
430        });
431
432        let byte_size = events.estimated_json_encoded_size_of();
433        let count = events.len();
434        self.sender
435            .send(SourceSenderItem {
436                events,
437                send_reference,
438            })
439            .await
440            .map_err(|_| ClosedError)?;
441        self.events_sent.emit(CountByteSize(count, byte_size));
442        unsent_event_count.decr(count);
443        Ok(())
444    }
445
446    async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
447        let event: EventArray = event.into();
448        // It's possible that the caller stops polling this future while it is blocked waiting
449        // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
450        // `ComponentEventsDropped` events.
451        let mut unsent_event_count = UnsentEventCount::new(event.len());
452        self.send(event, &mut unsent_event_count).await
453    }
454
455    async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
456    where
457        S: Stream<Item = E> + Unpin,
458        E: Into<Event> + ByteSizeOf,
459    {
460        let mut stream = events.ready_chunks(CHUNK_SIZE);
461        while let Some(events) = stream.next().await {
462            self.send_batch(events.into_iter()).await?;
463        }
464        Ok(())
465    }
466
467    async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
468    where
469        E: Into<Event> + ByteSizeOf,
470        I: IntoIterator<Item = E>,
471        <I as IntoIterator>::IntoIter: ExactSizeIterator,
472    {
473        // It's possible that the caller stops polling this future while it is blocked waiting
474        // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
475        // `ComponentEventsDropped` events.
476        let events = events.into_iter().map(Into::into);
477        let mut unsent_event_count = UnsentEventCount::new(events.len());
478        for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
479            self.send(events, &mut unsent_event_count)
480                .await
481                .inspect_err(|_| {
482                    // The unsent event count is discarded here because the callee emits the
483                    // `StreamClosedError`.
484                    unsent_event_count.discard();
485                })?;
486        }
487        Ok(())
488    }
489
490    /// Calculate the difference between the reference time and the
491    /// timestamp stored in the given event reference, and emit the
492    /// different, as expressed in milliseconds, as a histogram.
493    fn emit_lag_time(&self, event: EventRef<'_>, reference: i64) {
494        if let Some(lag_time_metric) = &self.lag_time {
495            let timestamp = match event {
496                EventRef::Log(log) => {
497                    log_schema()
498                        .timestamp_key_target_path()
499                        .and_then(|timestamp_key| {
500                            log.get(timestamp_key).and_then(get_timestamp_millis)
501                        })
502                }
503                EventRef::Metric(metric) => metric
504                    .timestamp()
505                    .map(|timestamp| timestamp.timestamp_millis()),
506                EventRef::Trace(trace) => {
507                    log_schema()
508                        .timestamp_key_target_path()
509                        .and_then(|timestamp_key| {
510                            trace.get(timestamp_key).and_then(get_timestamp_millis)
511                        })
512                }
513            };
514            if let Some(timestamp) = timestamp {
515                // This will truncate precision for values larger than 2**52, but at that point the user
516                // probably has much larger problems than precision.
517                let lag_time = (reference - timestamp) as f64 / 1000.0;
518                lag_time_metric.record(lag_time);
519            }
520        }
521    }
522}
523
524const fn get_timestamp_millis(value: &Value) -> Option<i64> {
525    match value {
526        Value::Timestamp(timestamp) => Some(timestamp.timestamp_millis()),
527        _ => None,
528    }
529}
530
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Duration};
    use rand::{rng, Rng};
    use tokio::time::timeout;
    use vector_lib::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent};
    use vrl::event_path;

    use super::*;
    use crate::metrics::{self, Controller};

    // Logs: the timestamp is inserted under `timestamp`.
    // NOTE(review): assumes the default log schema timestamp key is `timestamp`.
    #[tokio::test]
    async fn emits_lag_time_for_log() {
        emit_and_test(|timestamp| {
            let mut log = LogEvent::from("Log message");
            log.insert("timestamp", timestamp);
            Event::Log(log)
        })
        .await;
    }

    // Metrics: the timestamp comes from the metric's own timestamp field.
    #[tokio::test]
    async fn emits_lag_time_for_metric() {
        emit_and_test(|timestamp| {
            Event::Metric(
                Metric::new(
                    "name",
                    MetricKind::Absolute,
                    MetricValue::Gauge { value: 123.4 },
                )
                .with_timestamp(Some(timestamp)),
            )
        })
        .await;
    }

    // Traces: looked up through the log schema's timestamp key, like logs.
    #[tokio::test]
    async fn emits_lag_time_for_trace() {
        emit_and_test(|timestamp| {
            let mut trace = TraceEvent::default();
            trace.insert(event_path!("timestamp"), timestamp);
            Event::Trace(trace)
        })
        .await;
    }

    /// Send one event stamped a random 10..10000 ms in the past and check that exactly one
    /// `source_lag_time_seconds` sample close to that lag (in seconds) was recorded.
    async fn emit_and_test(make_event: impl FnOnce(DateTime<Utc>) -> Event) {
        metrics::init_test();
        let (mut sender, _stream) = SourceSender::new_test();
        let millis = rng().random_range(10..10000);
        let timestamp = Utc::now() - Duration::milliseconds(millis);
        let expected = millis as f64 / 1000.0;

        let event = make_event(timestamp);
        sender
            .send_event(event)
            .await
            .expect("Send should not fail");

        let lag_times = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == "source_lag_time_seconds")
            .collect::<Vec<_>>();
        assert_eq!(lag_times.len(), 1);

        let lag_time = &lag_times[0];
        match lag_time.value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                // The single sample must sit in the first bucket whose upper limit covers the
                // expected lag; every other bucket must be empty.
                let mut done = false;
                for bucket in buckets {
                    if !done && bucket.upper_limit >= expected {
                        assert_eq!(bucket.count, 1);
                        done = true;
                    } else {
                        assert_eq!(bucket.count, 0);
                    }
                }
                assert_eq!(*count, 1);
                // Allow 2 ms of slack for the time elapsed between building and sending the event.
                assert!(
                    (*sum - expected).abs() <= 0.002,
                    "Histogram sum does not match expected sum: {} vs {}",
                    *sum,
                    expected,
                );
            }
            _ => panic!("source_lag_time_seconds has invalid type"),
        }
    }

    // A future abandoned while blocked on a full buffer must report its event as discarded.
    #[tokio::test]
    async fn emits_component_discarded_events_total_for_send_event() {
        metrics::init_test();
        let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

        let event = Event::Metric(Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 123.4 },
        ));

        // First send will succeed.
        sender
            .send_event(event.clone())
            .await
            .expect("First send should not fail");

        // Second send will timeout, so the future will not be polled to completion.
        let res = timeout(
            std::time::Duration::from_millis(100),
            sender.send_event(event.clone()),
        )
        .await;
        assert!(res.is_err(), "Send should have timed out.");

        let component_discarded_events_total = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == "component_discarded_events_total")
            .collect::<Vec<_>>();
        assert_eq!(component_discarded_events_total.len(), 1);

        let component_discarded_events_total = &component_discarded_events_total[0];
        let MetricValue::Counter { value } = component_discarded_events_total.value() else {
            panic!("component_discarded_events_total has invalid type")
        };
        assert_eq!(*value, 1.0);
    }

    // An abandoned multi-chunk batch must report only the events that never reached the buffer.
    #[tokio::test]
    async fn emits_component_discarded_events_total_for_send_batch() {
        metrics::init_test();
        let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

        let expected_drop = 100;
        let events: Vec<Event> = (0..(CHUNK_SIZE + expected_drop))
            .map(|_| {
                Event::Metric(Metric::new(
                    "name",
                    MetricKind::Absolute,
                    MetricValue::Gauge { value: 123.4 },
                ))
            })
            .collect();

        // `CHUNK_SIZE` events will be sent into buffer but then the future will not be polled to completion.
        let res = timeout(
            std::time::Duration::from_millis(100),
            sender.send_batch(events),
        )
        .await;
        assert!(res.is_err(), "Send should have timed out.");

        let component_discarded_events_total = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == "component_discarded_events_total")
            .collect::<Vec<_>>();
        assert_eq!(component_discarded_events_total.len(), 1);

        let component_discarded_events_total = &component_discarded_events_total[0];
        let MetricValue::Counter { value } = component_discarded_events_total.value() else {
            panic!("component_discarded_events_total has invalid type")
        };
        assert_eq!(*value, expected_drop as f64);
    }
}