1#![allow(missing_docs)]
2use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc, time::Instant};
3
4use chrono::Utc;
5use futures::{Stream, StreamExt};
6use metrics::{Histogram, histogram};
7use tracing::Span;
8#[cfg(any(test, feature = "test-utils"))]
9use vector_lib::event::{EventStatus, into_event_stream};
10use vector_lib::{
11 ByteSizeOf, EstimatedJsonEncodedSizeOf,
12 buffers::{
13 EventCount,
14 config::MemoryBufferSize,
15 topology::channel::{self, LimitedReceiver, LimitedSender},
16 },
17 config::{SourceOutput, log_schema},
18 event::{Event, EventArray, EventContainer, EventRef, array, array::EventArrayIntoIter},
19 finalization::{AddBatchNotifier, BatchNotifier},
20 internal_event::{
21 self, ComponentEventsDropped, CountByteSize, DEFAULT_OUTPUT, EventsSent,
22 InternalEventHandle as _, Registered, UNINTENTIONAL,
23 },
24 json_size::JsonSize,
25};
26use vrl::value::Value;
27
28mod errors;
29
30pub use errors::{ClosedError, StreamSendError};
31
32use crate::{
33 config::{ComponentKey, OutputId},
34 schema::Definition,
35};
36
/// Maximum number of events grouped into a single array when batching, and the default
/// per-output buffer size used by `Builder`.
pub(crate) const CHUNK_SIZE: usize = 1000;

/// Buffer size, in events, used by the test-only `SourceSender` constructors.
#[cfg(any(test, feature = "test-utils"))]
const TEST_BUFFER_SIZE: usize = 100;

/// Name of the histogram used to record source lag time.
const LAG_TIME_NAME: &str = "source_lag_time_seconds";
43
/// A batch of events paired with the instant at which it was handed to the output channel.
#[derive(Debug)]
pub struct SourceSenderItem {
    /// The batch of events being sent.
    pub events: EventArray,
    /// Instant captured (via `Instant::now()`) when the batch was passed to `Output::send`.
    pub send_reference: Instant,
}
57
58impl AddBatchNotifier for SourceSenderItem {
59 fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
60 self.events.add_batch_notifier(notifier)
61 }
62}
63
64impl ByteSizeOf for SourceSenderItem {
65 fn allocated_bytes(&self) -> usize {
66 self.events.allocated_bytes()
67 }
68}
69
70impl EventCount for SourceSenderItem {
71 fn event_count(&self) -> usize {
72 self.events.event_count()
73 }
74}
75
76impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
77 fn estimated_json_encoded_size_of(&self) -> JsonSize {
78 self.events.estimated_json_encoded_size_of()
79 }
80}
81
82impl EventContainer for SourceSenderItem {
83 type IntoIter = EventArrayIntoIter;
84
85 fn len(&self) -> usize {
86 self.events.len()
87 }
88
89 fn into_events(self) -> Self::IntoIter {
90 self.events.into_events()
91 }
92}
93
94impl From<SourceSenderItem> for EventArray {
95 fn from(val: SourceSenderItem) -> Self {
96 val.events
97 }
98}
99
/// Accumulates outputs before producing a [`SourceSender`].
pub struct Builder {
    /// Buffer size, in events, passed to every output channel created by this builder.
    buf_size: usize,
    /// Output registered for the default (unnamed) port, if any.
    default_output: Option<Output>,
    /// Outputs registered for named ports, keyed by port name.
    named_outputs: HashMap<String, Output>,
    /// Histogram cloned into each output for lag-time recording.
    lag_time: Option<Histogram>,
}
106
107impl Default for Builder {
108 fn default() -> Self {
109 Self {
110 buf_size: CHUNK_SIZE,
111 default_output: None,
112 named_outputs: Default::default(),
113 lag_time: Some(histogram!(LAG_TIME_NAME)),
114 }
115 }
116}
117
118impl Builder {
119 pub const fn with_buffer(mut self, n: usize) -> Self {
120 self.buf_size = n;
121 self
122 }
123
124 pub fn add_source_output(
125 &mut self,
126 output: SourceOutput,
127 component_key: ComponentKey,
128 ) -> LimitedReceiver<SourceSenderItem> {
129 let lag_time = self.lag_time.clone();
130 let log_definition = output.schema_definition.clone();
131 let output_id = OutputId {
132 component: component_key,
133 port: output.port.clone(),
134 };
135 match output.port {
136 None => {
137 let (output, rx) = Output::new_with_buffer(
138 self.buf_size,
139 DEFAULT_OUTPUT.to_owned(),
140 lag_time,
141 log_definition,
142 output_id,
143 );
144 self.default_output = Some(output);
145 rx
146 }
147 Some(name) => {
148 let (output, rx) = Output::new_with_buffer(
149 self.buf_size,
150 name.clone(),
151 lag_time,
152 log_definition,
153 output_id,
154 );
155 self.named_outputs.insert(name, output);
156 rx
157 }
158 }
159 }
160
161 pub fn build(self) -> SourceSender {
162 SourceSender {
163 default_output: self.default_output,
164 named_outputs: self.named_outputs,
165 }
166 }
167}
168
/// Handle used to push events into a source's default output and its named outputs.
#[derive(Debug, Clone)]
pub struct SourceSender {
    /// The default output; `None` when no default output was registered, in which case the
    /// default-output send methods panic.
    default_output: Option<Output>,
    /// Named outputs, keyed by output name.
    named_outputs: HashMap<String, Output>,
}
176
177impl SourceSender {
178 pub fn builder() -> Builder {
179 Builder::default()
180 }
181
182 #[cfg(any(test, feature = "test-utils"))]
183 pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver<SourceSenderItem>) {
184 let lag_time = Some(histogram!(LAG_TIME_NAME));
185 let output_id = OutputId {
186 component: "test".to_string().into(),
187 port: None,
188 };
189 let (default_output, rx) =
190 Output::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id);
191 (
192 Self {
193 default_output: Some(default_output),
194 named_outputs: Default::default(),
195 },
196 rx,
197 )
198 }
199
200 #[cfg(any(test, feature = "test-utils"))]
201 pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
202 let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
203 let recv = recv.into_stream().flat_map(into_event_stream);
204 (pipe, recv)
205 }
206
207 #[cfg(any(test, feature = "test-utils"))]
208 pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
209 let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
210 let recv = recv.into_stream().flat_map(move |mut item| {
214 item.events.iter_events_mut().for_each(|mut event| {
215 let metadata = event.metadata_mut();
216 metadata.update_status(status);
217 metadata.update_sources();
218 });
219 into_event_stream(item)
220 });
221 (pipe, recv)
222 }
223
224 #[cfg(any(test, feature = "test-utils"))]
225 pub fn new_test_errors(
226 error_at: impl Fn(usize) -> bool,
227 ) -> (Self, impl Stream<Item = Event> + Unpin) {
228 let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
229 let mut count: usize = 0;
233 let recv = recv.into_stream().flat_map(move |mut item| {
234 let status = if error_at(count) {
235 EventStatus::Errored
236 } else {
237 EventStatus::Delivered
238 };
239 count += 1;
240 item.events.iter_events_mut().for_each(|mut event| {
241 let metadata = event.metadata_mut();
242 metadata.update_status(status);
243 metadata.update_sources();
244 });
245 into_event_stream(item)
246 });
247 (pipe, recv)
248 }
249
250 #[cfg(any(test, feature = "test-utils"))]
251 pub fn add_outputs(
252 &mut self,
253 status: EventStatus,
254 name: String,
255 ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
256 let output_id = OutputId {
259 component: "test".to_string().into(),
260 port: Some(name.clone()),
261 };
262 let (output, recv) = Output::new_with_buffer(100, name.clone(), None, None, output_id);
263 let recv = recv.into_stream().map(move |mut item| {
264 item.events.iter_events_mut().for_each(|mut event| {
265 let metadata = event.metadata_mut();
266 metadata.update_status(status);
267 metadata.update_sources();
268 });
269 item
270 });
271 self.named_outputs.insert(name, output);
272 recv
273 }
274
275 const fn default_output_mut(&mut self) -> &mut Output {
277 self.default_output.as_mut().expect("no default output")
278 }
279
280 pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
284 self.default_output_mut().send_event(event).await
285 }
286
287 pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
291 where
292 S: Stream<Item = E> + Unpin,
293 E: Into<Event> + ByteSizeOf,
294 {
295 self.default_output_mut().send_event_stream(events).await
296 }
297
298 pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
302 where
303 E: Into<Event> + ByteSizeOf,
304 I: IntoIterator<Item = E>,
305 <I as IntoIterator>::IntoIter: ExactSizeIterator,
306 {
307 self.default_output_mut().send_batch(events).await
308 }
309
310 pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), ClosedError>
314 where
315 E: Into<Event> + ByteSizeOf,
316 I: IntoIterator<Item = E>,
317 <I as IntoIterator>::IntoIter: ExactSizeIterator,
318 {
319 self.named_outputs
320 .get_mut(name)
321 .expect("unknown output")
322 .send_batch(events)
323 .await
324 }
325}
326
/// Tracks how many events of an in-progress send have not been sent yet.
///
/// When a value is dropped while its count is still non-zero, a `ComponentEventsDropped` event
/// is emitted inside the span that was current when the value was created.
struct UnsentEventCount {
    /// Number of events not yet accounted for as sent.
    count: usize,
    /// Span captured at construction; entered while emitting the dropped-events event.
    span: Span,
}
338
339impl UnsentEventCount {
340 fn new(count: usize) -> Self {
341 Self {
342 count,
343 span: Span::current(),
344 }
345 }
346
347 const fn decr(&mut self, count: usize) {
348 self.count = self.count.saturating_sub(count);
349 }
350
351 const fn discard(&mut self) {
352 self.count = 0;
353 }
354}
355
356impl Drop for UnsentEventCount {
357 fn drop(&mut self) {
358 if self.count > 0 {
359 let _enter = self.span.enter();
360 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
361 count: self.count,
362 reason: "Source send cancelled."
363 });
364 }
365 }
366}
367
/// A single output of a source: the sending half of its channel plus the per-output state
/// applied to every batch that passes through it.
#[derive(Clone)]
struct Output {
    /// Sending half of the limited channel for this output.
    sender: LimitedSender<SourceSenderItem>,
    /// Histogram used to record event lag time; nothing is recorded when `None`.
    lag_time: Option<Histogram>,
    /// Registered handle used to emit `EventsSent` after each successful send.
    events_sent: Registered<EventsSent>,
    /// Schema definition attached to the metadata of every event, when present.
    log_definition: Option<Arc<Definition>>,
    /// Identifier set as the upstream id on the metadata of every event.
    output_id: Arc<OutputId>,
}
379
380impl fmt::Debug for Output {
381 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
382 fmt.debug_struct("Output")
383 .field("sender", &self.sender)
384 .field("output_id", &self.output_id)
385 .finish()
387 }
388}
389
390impl Output {
391 fn new_with_buffer(
392 n: usize,
393 output: String,
394 lag_time: Option<Histogram>,
395 log_definition: Option<Arc<Definition>>,
396 output_id: OutputId,
397 ) -> (Self, LimitedReceiver<SourceSenderItem>) {
398 let (tx, rx) = channel::limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap()));
399 (
400 Self {
401 sender: tx,
402 lag_time,
403 events_sent: register!(EventsSent::from(internal_event::Output(Some(
404 output.into()
405 )))),
406 log_definition,
407 output_id: Arc::new(output_id),
408 },
409 rx,
410 )
411 }
412
413 async fn send(
414 &mut self,
415 mut events: EventArray,
416 unsent_event_count: &mut UnsentEventCount,
417 ) -> Result<(), ClosedError> {
418 let send_reference = Instant::now();
419 let reference = Utc::now().timestamp_millis();
420 events
421 .iter_events()
422 .for_each(|event| self.emit_lag_time(event, reference));
423
424 events.iter_events_mut().for_each(|mut event| {
425 if let Some(log_definition) = &self.log_definition {
427 event.metadata_mut().set_schema_definition(log_definition);
428 }
429 event
430 .metadata_mut()
431 .set_upstream_id(Arc::clone(&self.output_id));
432 });
433
434 let byte_size = events.estimated_json_encoded_size_of();
435 let count = events.len();
436 self.sender
437 .send(SourceSenderItem {
438 events,
439 send_reference,
440 })
441 .await
442 .map_err(|_| ClosedError)?;
443 self.events_sent.emit(CountByteSize(count, byte_size));
444 unsent_event_count.decr(count);
445 Ok(())
446 }
447
448 async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
449 let event: EventArray = event.into();
450 let mut unsent_event_count = UnsentEventCount::new(event.len());
454 self.send(event, &mut unsent_event_count).await
455 }
456
457 async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
458 where
459 S: Stream<Item = E> + Unpin,
460 E: Into<Event> + ByteSizeOf,
461 {
462 let mut stream = events.ready_chunks(CHUNK_SIZE);
463 while let Some(events) = stream.next().await {
464 self.send_batch(events.into_iter()).await?;
465 }
466 Ok(())
467 }
468
469 async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
470 where
471 E: Into<Event> + ByteSizeOf,
472 I: IntoIterator<Item = E>,
473 <I as IntoIterator>::IntoIter: ExactSizeIterator,
474 {
475 let events = events.into_iter().map(Into::into);
479 let mut unsent_event_count = UnsentEventCount::new(events.len());
480 for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
481 self.send(events, &mut unsent_event_count)
482 .await
483 .inspect_err(|_| {
484 unsent_event_count.discard();
487 })?;
488 }
489 Ok(())
490 }
491
492 fn emit_lag_time(&self, event: EventRef<'_>, reference: i64) {
496 if let Some(lag_time_metric) = &self.lag_time {
497 let timestamp = match event {
498 EventRef::Log(log) => {
499 log_schema()
500 .timestamp_key_target_path()
501 .and_then(|timestamp_key| {
502 log.get(timestamp_key).and_then(get_timestamp_millis)
503 })
504 }
505 EventRef::Metric(metric) => metric
506 .timestamp()
507 .map(|timestamp| timestamp.timestamp_millis()),
508 EventRef::Trace(trace) => {
509 log_schema()
510 .timestamp_key_target_path()
511 .and_then(|timestamp_key| {
512 trace.get(timestamp_key).and_then(get_timestamp_millis)
513 })
514 }
515 };
516 if let Some(timestamp) = timestamp {
517 let lag_time = (reference - timestamp) as f64 / 1000.0;
520 lag_time_metric.record(lag_time);
521 }
522 }
523 }
524}
525
526const fn get_timestamp_millis(value: &Value) -> Option<i64> {
527 match value {
528 Value::Timestamp(timestamp) => Some(timestamp.timestamp_millis()),
529 _ => None,
530 }
531}
532
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Duration};
    use rand::{Rng, rng};
    use tokio::time::timeout;
    use vector_lib::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent};
    use vrl::event_path;

    use super::*;
    use crate::metrics::{self, Controller};

    /// Name of the counter that tracks discarded events.
    const DISCARDED_EVENTS_NAME: &str = "component_discarded_events_total";

    /// Builds the gauge metric event used by the discarded-events tests.
    fn gauge_event() -> Event {
        Event::Metric(Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 123.4 },
        ))
    }

    /// Captures the current metrics and returns the value of the single
    /// `component_discarded_events_total` counter, asserting that exactly one exists.
    fn discarded_events_total() -> f64 {
        let captured = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == DISCARDED_EVENTS_NAME)
            .collect::<Vec<_>>();
        assert_eq!(captured.len(), 1);

        let MetricValue::Counter { value } = captured[0].value() else {
            panic!("component_discarded_events_total has invalid type")
        };
        *value
    }

    #[tokio::test]
    async fn emits_lag_time_for_log() {
        emit_and_test(|timestamp| {
            let mut log = LogEvent::from("Log message");
            log.insert("timestamp", timestamp);
            Event::Log(log)
        })
        .await;
    }

    #[tokio::test]
    async fn emits_lag_time_for_metric() {
        emit_and_test(|timestamp| {
            Event::Metric(
                Metric::new(
                    "name",
                    MetricKind::Absolute,
                    MetricValue::Gauge { value: 123.4 },
                )
                .with_timestamp(Some(timestamp)),
            )
        })
        .await;
    }

    #[tokio::test]
    async fn emits_lag_time_for_trace() {
        emit_and_test(|timestamp| {
            let mut trace = TraceEvent::default();
            trace.insert(event_path!("timestamp"), timestamp);
            Event::Trace(trace)
        })
        .await;
    }

    /// Sends one event timestamped a random 10..10000 ms in the past and checks that exactly
    /// one lag-time sample of roughly that size was recorded.
    async fn emit_and_test(make_event: impl FnOnce(DateTime<Utc>) -> Event) {
        metrics::init_test();
        let (mut sender, _stream) = SourceSender::new_test();
        let millis = rng().random_range(10..10000);
        let timestamp = Utc::now() - Duration::milliseconds(millis);
        let expected = millis as f64 / 1000.0;

        let event = make_event(timestamp);
        sender
            .send_event(event)
            .await
            .expect("Send should not fail");

        let lag_times = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == LAG_TIME_NAME)
            .collect::<Vec<_>>();
        assert_eq!(lag_times.len(), 1);

        let lag_time = &lag_times[0];
        match lag_time.value() {
            MetricValue::AggregatedHistogram {
                buckets,
                count,
                sum,
            } => {
                // Only the first bucket whose upper limit covers the expected lag may hold the
                // single sample; every other bucket must be empty.
                let mut done = false;
                for bucket in buckets {
                    if !done && bucket.upper_limit >= expected {
                        assert_eq!(bucket.count, 1);
                        done = true;
                    } else {
                        assert_eq!(bucket.count, 0);
                    }
                }
                assert_eq!(*count, 1);
                assert!(
                    (*sum - expected).abs() <= 0.002,
                    "Histogram sum does not match expected sum: {} vs {}",
                    *sum,
                    expected,
                );
            }
            _ => panic!("source_lag_time_seconds has invalid type"),
        }
    }

    #[tokio::test]
    async fn emits_component_discarded_events_total_for_send_event() {
        metrics::init_test();
        let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

        let event = gauge_event();

        // Fills the single-slot buffer.
        sender
            .send_event(event.clone())
            .await
            .expect("First send should not fail");

        // The buffer is full and never drained, so this send blocks until the timeout drops it.
        let res = timeout(
            std::time::Duration::from_millis(100),
            sender.send_event(event.clone()),
        )
        .await;
        assert!(res.is_err(), "Send should have timed out.");

        assert_eq!(discarded_events_total(), 1.0);
    }

    #[tokio::test]
    async fn emits_component_discarded_events_total_for_send_batch() {
        metrics::init_test();
        let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

        let expected_drop = 100;
        let events: Vec<Event> = (0..(CHUNK_SIZE + expected_drop))
            .map(|_| gauge_event())
            .collect();

        // The first chunk is accepted; the remaining events block until the timeout drops the
        // future.
        let res = timeout(
            std::time::Duration::from_millis(100),
            sender.send_batch(events),
        )
        .await;
        assert!(res.is_err(), "Send should have timed out.");

        assert_eq!(discarded_events_total(), expected_drop as f64);
    }
}