vector/source_sender/
mod.rs

1#![allow(missing_docs)]
2use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc, time::Instant};
3
4use chrono::Utc;
5use futures::{Stream, StreamExt};
6use metrics::{Histogram, histogram};
7use tracing::Span;
8#[cfg(any(test, feature = "test-utils"))]
9use vector_lib::event::{EventStatus, into_event_stream};
10use vector_lib::{
11    ByteSizeOf, EstimatedJsonEncodedSizeOf,
12    buffers::{
13        EventCount,
14        config::MemoryBufferSize,
15        topology::channel::{self, LimitedReceiver, LimitedSender},
16    },
17    config::{SourceOutput, log_schema},
18    event::{Event, EventArray, EventContainer, EventRef, array, array::EventArrayIntoIter},
19    finalization::{AddBatchNotifier, BatchNotifier},
20    internal_event::{
21        self, ComponentEventsDropped, CountByteSize, DEFAULT_OUTPUT, EventsSent,
22        InternalEventHandle as _, Registered, UNINTENTIONAL,
23    },
24    json_size::JsonSize,
25};
26use vrl::value::Value;
27
28mod errors;
29
30pub use errors::{ClosedError, StreamSendError};
31
32use crate::{
33    config::{ComponentKey, OutputId},
34    schema::Definition,
35};
36
/// Maximum number of events grouped into one `EventArray` by `send_batch`/`send_event_stream`;
/// also the default per-output channel capacity (in events) used by [`Builder`].
pub(crate) const CHUNK_SIZE: usize = 1000;

/// Channel capacity, in events, used by the test-only sender constructors.
#[cfg(any(test, feature = "test-utils"))]
const TEST_BUFFER_SIZE: usize = 100;

/// Name of the histogram that records source lag time (the recorded values are in seconds).
const LAG_TIME_NAME: &str = "source_lag_time_seconds";
43
/// A thin wrapper around [`EventArray`] that carries a reference instant, used to track the send
/// duration of a batch.
///
/// The send duration is measured from when the batch is sent by the origin component until it is
/// enqueued on the receiving component's input buffer. For sources, this spans two channels: the
/// origin component's pump channel, and then the receiving component's input buffer. The
/// reference instant therefore has to travel with the batch.
#[derive(Debug)]
pub struct SourceSenderItem {
    /// The batch of events to send.
    pub events: EventArray,
    /// Reference instant used to calculate send duration. `Output::send` in this module sets it
    /// to `Instant::now()` before stamping and enqueueing the batch.
    pub send_reference: Instant,
}
57
58impl AddBatchNotifier for SourceSenderItem {
59    fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
60        self.events.add_batch_notifier(notifier)
61    }
62}
63
64impl ByteSizeOf for SourceSenderItem {
65    fn allocated_bytes(&self) -> usize {
66        self.events.allocated_bytes()
67    }
68}
69
70impl EventCount for SourceSenderItem {
71    fn event_count(&self) -> usize {
72        self.events.event_count()
73    }
74}
75
76impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
77    fn estimated_json_encoded_size_of(&self) -> JsonSize {
78        self.events.estimated_json_encoded_size_of()
79    }
80}
81
82impl EventContainer for SourceSenderItem {
83    type IntoIter = EventArrayIntoIter;
84
85    fn len(&self) -> usize {
86        self.events.len()
87    }
88
89    fn into_events(self) -> Self::IntoIter {
90        self.events.into_events()
91    }
92}
93
94impl From<SourceSenderItem> for EventArray {
95    fn from(val: SourceSenderItem) -> Self {
96        val.events
97    }
98}
99
100pub struct Builder {
101    buf_size: usize,
102    default_output: Option<Output>,
103    named_outputs: HashMap<String, Output>,
104    lag_time: Option<Histogram>,
105}
106
107impl Default for Builder {
108    fn default() -> Self {
109        Self {
110            buf_size: CHUNK_SIZE,
111            default_output: None,
112            named_outputs: Default::default(),
113            lag_time: Some(histogram!(LAG_TIME_NAME)),
114        }
115    }
116}
117
118impl Builder {
119    pub const fn with_buffer(mut self, n: usize) -> Self {
120        self.buf_size = n;
121        self
122    }
123
124    pub fn add_source_output(
125        &mut self,
126        output: SourceOutput,
127        component_key: ComponentKey,
128    ) -> LimitedReceiver<SourceSenderItem> {
129        let lag_time = self.lag_time.clone();
130        let log_definition = output.schema_definition.clone();
131        let output_id = OutputId {
132            component: component_key,
133            port: output.port.clone(),
134        };
135        match output.port {
136            None => {
137                let (output, rx) = Output::new_with_buffer(
138                    self.buf_size,
139                    DEFAULT_OUTPUT.to_owned(),
140                    lag_time,
141                    log_definition,
142                    output_id,
143                );
144                self.default_output = Some(output);
145                rx
146            }
147            Some(name) => {
148                let (output, rx) = Output::new_with_buffer(
149                    self.buf_size,
150                    name.clone(),
151                    lag_time,
152                    log_definition,
153                    output_id,
154                );
155                self.named_outputs.insert(name, output);
156                rx
157            }
158        }
159    }
160
161    pub fn build(self) -> SourceSender {
162        SourceSender {
163            default_output: self.default_output,
164            named_outputs: self.named_outputs,
165        }
166    }
167}
168
169#[derive(Debug, Clone)]
170pub struct SourceSender {
171    // The default output is optional because some sources, e.g. `datadog_agent`
172    // and `opentelemetry`, can be configured to only output to named outputs.
173    default_output: Option<Output>,
174    named_outputs: HashMap<String, Output>,
175}
176
177impl SourceSender {
178    pub fn builder() -> Builder {
179        Builder::default()
180    }
181
182    #[cfg(any(test, feature = "test-utils"))]
183    pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver<SourceSenderItem>) {
184        let lag_time = Some(histogram!(LAG_TIME_NAME));
185        let output_id = OutputId {
186            component: "test".to_string().into(),
187            port: None,
188        };
189        let (default_output, rx) =
190            Output::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id);
191        (
192            Self {
193                default_output: Some(default_output),
194                named_outputs: Default::default(),
195            },
196            rx,
197        )
198    }
199
200    #[cfg(any(test, feature = "test-utils"))]
201    pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
202        let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
203        let recv = recv.into_stream().flat_map(into_event_stream);
204        (pipe, recv)
205    }
206
207    #[cfg(any(test, feature = "test-utils"))]
208    pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
209        let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
210        // In a source test pipeline, there is no sink to acknowledge
211        // events, so we have to add a map to the receiver to handle the
212        // finalization.
213        let recv = recv.into_stream().flat_map(move |mut item| {
214            item.events.iter_events_mut().for_each(|mut event| {
215                let metadata = event.metadata_mut();
216                metadata.update_status(status);
217                metadata.update_sources();
218            });
219            into_event_stream(item)
220        });
221        (pipe, recv)
222    }
223
224    #[cfg(any(test, feature = "test-utils"))]
225    pub fn new_test_errors(
226        error_at: impl Fn(usize) -> bool,
227    ) -> (Self, impl Stream<Item = Event> + Unpin) {
228        let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
229        // In a source test pipeline, there is no sink to acknowledge
230        // events, so we have to add a map to the receiver to handle the
231        // finalization.
232        let mut count: usize = 0;
233        let recv = recv.into_stream().flat_map(move |mut item| {
234            let status = if error_at(count) {
235                EventStatus::Errored
236            } else {
237                EventStatus::Delivered
238            };
239            count += 1;
240            item.events.iter_events_mut().for_each(|mut event| {
241                let metadata = event.metadata_mut();
242                metadata.update_status(status);
243                metadata.update_sources();
244            });
245            into_event_stream(item)
246        });
247        (pipe, recv)
248    }
249
250    #[cfg(any(test, feature = "test-utils"))]
251    pub fn add_outputs(
252        &mut self,
253        status: EventStatus,
254        name: String,
255    ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
256        // The lag_time parameter here will need to be filled in if this function is ever used for
257        // non-test situations.
258        let output_id = OutputId {
259            component: "test".to_string().into(),
260            port: Some(name.clone()),
261        };
262        let (output, recv) = Output::new_with_buffer(100, name.clone(), None, None, output_id);
263        let recv = recv.into_stream().map(move |mut item| {
264            item.events.iter_events_mut().for_each(|mut event| {
265                let metadata = event.metadata_mut();
266                metadata.update_status(status);
267                metadata.update_sources();
268            });
269            item
270        });
271        self.named_outputs.insert(name, output);
272        recv
273    }
274
275    /// Get a mutable reference to the default output, panicking if none exists.
276    const fn default_output_mut(&mut self) -> &mut Output {
277        self.default_output.as_mut().expect("no default output")
278    }
279
280    /// Send an event to the default output.
281    ///
282    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
283    pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
284        self.default_output_mut().send_event(event).await
285    }
286
287    /// Send a stream of events to the default output.
288    ///
289    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
290    pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
291    where
292        S: Stream<Item = E> + Unpin,
293        E: Into<Event> + ByteSizeOf,
294    {
295        self.default_output_mut().send_event_stream(events).await
296    }
297
298    /// Send a batch of events to the default output.
299    ///
300    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
301    pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
302    where
303        E: Into<Event> + ByteSizeOf,
304        I: IntoIterator<Item = E>,
305        <I as IntoIterator>::IntoIter: ExactSizeIterator,
306    {
307        self.default_output_mut().send_batch(events).await
308    }
309
310    /// Send a batch of events event to a named output.
311    ///
312    /// This internally handles emitting [EventsSent] and [ComponentEventsDropped] events.
313    pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), ClosedError>
314    where
315        E: Into<Event> + ByteSizeOf,
316        I: IntoIterator<Item = E>,
317        <I as IntoIterator>::IntoIter: ExactSizeIterator,
318    {
319        self.named_outputs
320            .get_mut(name)
321            .expect("unknown output")
322            .send_batch(events)
323            .await
324    }
325}
326
327/// UnsentEvents tracks the number of events yet to be sent in the buffer. This is used to
328/// increment the appropriate counters when a future is not polled to completion. Particularly,
329/// this is known to happen in a Warp server when a client sends a new HTTP request on a TCP
330/// connection that already has a pending request.
331///
332/// If its internal count is greater than 0 when dropped, the appropriate [ComponentEventsDropped]
333/// event is emitted.
334struct UnsentEventCount {
335    count: usize,
336    span: Span,
337}
338
339impl UnsentEventCount {
340    fn new(count: usize) -> Self {
341        Self {
342            count,
343            span: Span::current(),
344        }
345    }
346
347    const fn decr(&mut self, count: usize) {
348        self.count = self.count.saturating_sub(count);
349    }
350
351    const fn discard(&mut self) {
352        self.count = 0;
353    }
354}
355
356impl Drop for UnsentEventCount {
357    fn drop(&mut self) {
358        if self.count > 0 {
359            let _enter = self.span.enter();
360            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
361                count: self.count,
362                reason: "Source send cancelled."
363            });
364        }
365    }
366}
367
368#[derive(Clone)]
369struct Output {
370    sender: LimitedSender<SourceSenderItem>,
371    lag_time: Option<Histogram>,
372    events_sent: Registered<EventsSent>,
373    /// The schema definition that will be attached to Log events sent through here
374    log_definition: Option<Arc<Definition>>,
375    /// The OutputId related to this source sender. This is set as the `upstream_id` in
376    /// `EventMetadata` for all event sent through here.
377    output_id: Arc<OutputId>,
378}
379
380impl fmt::Debug for Output {
381    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
382        fmt.debug_struct("Output")
383            .field("sender", &self.sender)
384            .field("output_id", &self.output_id)
385            // `metrics::Histogram` is missing `impl Debug`
386            .finish()
387    }
388}
389
390impl Output {
391    fn new_with_buffer(
392        n: usize,
393        output: String,
394        lag_time: Option<Histogram>,
395        log_definition: Option<Arc<Definition>>,
396        output_id: OutputId,
397    ) -> (Self, LimitedReceiver<SourceSenderItem>) {
398        let (tx, rx) = channel::limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap()));
399        (
400            Self {
401                sender: tx,
402                lag_time,
403                events_sent: register!(EventsSent::from(internal_event::Output(Some(
404                    output.into()
405                )))),
406                log_definition,
407                output_id: Arc::new(output_id),
408            },
409            rx,
410        )
411    }
412
413    async fn send(
414        &mut self,
415        mut events: EventArray,
416        unsent_event_count: &mut UnsentEventCount,
417    ) -> Result<(), ClosedError> {
418        let send_reference = Instant::now();
419        let reference = Utc::now().timestamp_millis();
420        events
421            .iter_events()
422            .for_each(|event| self.emit_lag_time(event, reference));
423
424        events.iter_events_mut().for_each(|mut event| {
425            // attach runtime schema definitions from the source
426            if let Some(log_definition) = &self.log_definition {
427                event.metadata_mut().set_schema_definition(log_definition);
428            }
429            event
430                .metadata_mut()
431                .set_upstream_id(Arc::clone(&self.output_id));
432        });
433
434        let byte_size = events.estimated_json_encoded_size_of();
435        let count = events.len();
436        self.sender
437            .send(SourceSenderItem {
438                events,
439                send_reference,
440            })
441            .await
442            .map_err(|_| ClosedError)?;
443        self.events_sent.emit(CountByteSize(count, byte_size));
444        unsent_event_count.decr(count);
445        Ok(())
446    }
447
448    async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
449        let event: EventArray = event.into();
450        // It's possible that the caller stops polling this future while it is blocked waiting
451        // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
452        // `ComponentEventsDropped` events.
453        let mut unsent_event_count = UnsentEventCount::new(event.len());
454        self.send(event, &mut unsent_event_count).await
455    }
456
457    async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
458    where
459        S: Stream<Item = E> + Unpin,
460        E: Into<Event> + ByteSizeOf,
461    {
462        let mut stream = events.ready_chunks(CHUNK_SIZE);
463        while let Some(events) = stream.next().await {
464            self.send_batch(events.into_iter()).await?;
465        }
466        Ok(())
467    }
468
469    async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
470    where
471        E: Into<Event> + ByteSizeOf,
472        I: IntoIterator<Item = E>,
473        <I as IntoIterator>::IntoIter: ExactSizeIterator,
474    {
475        // It's possible that the caller stops polling this future while it is blocked waiting
476        // on `self.send()`. When that happens, we use `UnsentEventCount` to correctly emit
477        // `ComponentEventsDropped` events.
478        let events = events.into_iter().map(Into::into);
479        let mut unsent_event_count = UnsentEventCount::new(events.len());
480        for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
481            self.send(events, &mut unsent_event_count)
482                .await
483                .inspect_err(|_| {
484                    // The unsent event count is discarded here because the callee emits the
485                    // `StreamClosedError`.
486                    unsent_event_count.discard();
487                })?;
488        }
489        Ok(())
490    }
491
492    /// Calculate the difference between the reference time and the
493    /// timestamp stored in the given event reference, and emit the
494    /// different, as expressed in milliseconds, as a histogram.
495    fn emit_lag_time(&self, event: EventRef<'_>, reference: i64) {
496        if let Some(lag_time_metric) = &self.lag_time {
497            let timestamp = match event {
498                EventRef::Log(log) => {
499                    log_schema()
500                        .timestamp_key_target_path()
501                        .and_then(|timestamp_key| {
502                            log.get(timestamp_key).and_then(get_timestamp_millis)
503                        })
504                }
505                EventRef::Metric(metric) => metric
506                    .timestamp()
507                    .map(|timestamp| timestamp.timestamp_millis()),
508                EventRef::Trace(trace) => {
509                    log_schema()
510                        .timestamp_key_target_path()
511                        .and_then(|timestamp_key| {
512                            trace.get(timestamp_key).and_then(get_timestamp_millis)
513                        })
514                }
515            };
516            if let Some(timestamp) = timestamp {
517                // This will truncate precision for values larger than 2**52, but at that point the user
518                // probably has much larger problems than precision.
519                let lag_time = (reference - timestamp) as f64 / 1000.0;
520                lag_time_metric.record(lag_time);
521            }
522        }
523    }
524}
525
526const fn get_timestamp_millis(value: &Value) -> Option<i64> {
527    match value {
528        Value::Timestamp(timestamp) => Some(timestamp.timestamp_millis()),
529        _ => None,
530    }
531}
532
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Duration};
    use rand::{Rng, rng};
    use tokio::time::timeout;
    use vector_lib::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent};
    use vrl::event_path;

    use super::*;
    use crate::metrics::{self, Controller};

    #[tokio::test]
    async fn emits_lag_time_for_log() {
        emit_and_test(|timestamp| {
            let mut log = LogEvent::from("Log message");
            log.insert("timestamp", timestamp);
            Event::Log(log)
        })
        .await;
    }

    #[tokio::test]
    async fn emits_lag_time_for_metric() {
        emit_and_test(|timestamp| {
            Event::Metric(
                Metric::new(
                    "name",
                    MetricKind::Absolute,
                    MetricValue::Gauge { value: 123.4 },
                )
                .with_timestamp(Some(timestamp)),
            )
        })
        .await;
    }

    #[tokio::test]
    async fn emits_lag_time_for_trace() {
        emit_and_test(|timestamp| {
            let mut trace = TraceEvent::default();
            trace.insert(event_path!("timestamp"), timestamp);
            Event::Trace(trace)
        })
        .await;
    }

    /// Sends one event built by `make_event`, with a timestamp 10ms to 10s in the past.
    ///
    /// Asserts that the lag-time histogram holds exactly one observation, in the first bucket
    /// that covers the expected lag, with a sum within 2ms of that lag.
    async fn emit_and_test(make_event: impl FnOnce(DateTime<Utc>) -> Event) {
        metrics::init_test();
        let (mut sender, _stream) = SourceSender::new_test();
        let millis = rng().random_range(10..10000);
        let timestamp = Utc::now() - Duration::milliseconds(millis);
        let expected = millis as f64 / 1000.0;

        let event = make_event(timestamp);
        sender
            .send_event(event)
            .await
            .expect("Send should not fail");

        let lag_times = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == LAG_TIME_NAME)
            .collect::<Vec<_>>();
        assert_eq!(lag_times.len(), 1);

        let lag_time = &lag_times[0];
        match lag_time.value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                // Only the first bucket whose upper limit covers the expected lag holds the sample.
                let mut done = false;
                for bucket in buckets {
                    if !done && bucket.upper_limit >= expected {
                        assert_eq!(bucket.count, 1);
                        done = true;
                    } else {
                        assert_eq!(bucket.count, 0);
                    }
                }
                assert_eq!(*count, 1);
                assert!(
                    (*sum - expected).abs() <= 0.002,
                    "Histogram sum does not match expected sum: {} vs {}",
                    *sum,
                    expected,
                );
            }
            _ => panic!("{LAG_TIME_NAME} has invalid type"),
        }
    }

    #[tokio::test]
    async fn emits_component_discarded_events_total_for_send_event() {
        metrics::init_test();
        let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

        let event = Event::Metric(Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 123.4 },
        ));

        // First send will succeed.
        sender
            .send_event(event.clone())
            .await
            .expect("First send should not fail");

        // Second send will timeout, so the future will not be polled to completion.
        let res = timeout(
            std::time::Duration::from_millis(100),
            sender.send_event(event.clone()),
        )
        .await;
        assert!(res.is_err(), "Send should have timed out.");

        let component_discarded_events_total = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == "component_discarded_events_total")
            .collect::<Vec<_>>();
        assert_eq!(component_discarded_events_total.len(), 1);

        let component_discarded_events_total = &component_discarded_events_total[0];
        let MetricValue::Counter { value } = component_discarded_events_total.value() else {
            panic!("component_discarded_events_total has invalid type")
        };
        assert_eq!(*value, 1.0);
    }

    #[tokio::test]
    async fn emits_component_discarded_events_total_for_send_batch() {
        metrics::init_test();
        let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

        let expected_drop = 100;
        let events: Vec<Event> = (0..(CHUNK_SIZE + expected_drop))
            .map(|_| {
                Event::Metric(Metric::new(
                    "name",
                    MetricKind::Absolute,
                    MetricValue::Gauge { value: 123.4 },
                ))
            })
            .collect();

        // `CHUNK_SIZE` events will be sent into buffer but then the future will not be polled to completion.
        let res = timeout(
            std::time::Duration::from_millis(100),
            sender.send_batch(events),
        )
        .await;
        assert!(res.is_err(), "Send should have timed out.");

        let component_discarded_events_total = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == "component_discarded_events_total")
            .collect::<Vec<_>>();
        assert_eq!(component_discarded_events_total.len(), 1);

        let component_discarded_events_total = &component_discarded_events_total[0];
        let MetricValue::Counter { value } = component_discarded_events_total.value() else {
            panic!("component_discarded_events_total has invalid type")
        };
        assert_eq!(*value, expected_drop as f64);
    }
}