1#![allow(missing_docs)]
2use std::{collections::HashMap, fmt, num::NonZeroUsize, sync::Arc, time::Instant};
3
4use chrono::Utc;
5use futures::{Stream, StreamExt};
6use metrics::{histogram, Histogram};
7use tracing::Span;
8use vector_lib::buffers::EventCount;
9use vector_lib::buffers::{
10 config::MemoryBufferSize,
11 topology::channel::{self, LimitedReceiver, LimitedSender},
12};
13use vector_lib::event::array::EventArrayIntoIter;
14#[cfg(any(test, feature = "test-utils"))]
15use vector_lib::event::{into_event_stream, EventStatus};
16use vector_lib::finalization::{AddBatchNotifier, BatchNotifier};
17use vector_lib::internal_event::{ComponentEventsDropped, UNINTENTIONAL};
18use vector_lib::json_size::JsonSize;
19use vector_lib::{
20 config::{log_schema, SourceOutput},
21 event::{array, Event, EventArray, EventContainer, EventRef},
22 internal_event::{
23 self, CountByteSize, EventsSent, InternalEventHandle as _, Registered, DEFAULT_OUTPUT,
24 },
25 ByteSizeOf, EstimatedJsonEncodedSizeOf,
26};
27use vrl::value::Value;
28
29mod errors;
30
31use crate::config::{ComponentKey, OutputId};
32use crate::schema::Definition;
33pub use errors::{ClosedError, StreamSendError};
34
/// Maximum number of events packed into one `EventArray` by the batch and
/// stream send paths; also the default per-output channel capacity used by
/// `Builder`.
pub(crate) const CHUNK_SIZE: usize = 1000;

/// Channel capacity, in events, used by the test-only sender constructors.
#[cfg(any(test, feature = "test-utils"))]
const TEST_BUFFER_SIZE: usize = 100;

/// Name under which the source lag-time histogram is registered.
const LAG_TIME_NAME: &str = "source_lag_time_seconds";
41
42#[derive(Debug)]
49pub struct SourceSenderItem {
50 pub events: EventArray,
52 pub send_reference: Instant,
54}
55
56impl AddBatchNotifier for SourceSenderItem {
57 fn add_batch_notifier(&mut self, notifier: BatchNotifier) {
58 self.events.add_batch_notifier(notifier)
59 }
60}
61
62impl ByteSizeOf for SourceSenderItem {
63 fn allocated_bytes(&self) -> usize {
64 self.events.allocated_bytes()
65 }
66}
67
68impl EventCount for SourceSenderItem {
69 fn event_count(&self) -> usize {
70 self.events.event_count()
71 }
72}
73
74impl EstimatedJsonEncodedSizeOf for SourceSenderItem {
75 fn estimated_json_encoded_size_of(&self) -> JsonSize {
76 self.events.estimated_json_encoded_size_of()
77 }
78}
79
80impl EventContainer for SourceSenderItem {
81 type IntoIter = EventArrayIntoIter;
82
83 fn len(&self) -> usize {
84 self.events.len()
85 }
86
87 fn into_events(self) -> Self::IntoIter {
88 self.events.into_events()
89 }
90}
91
92impl From<SourceSenderItem> for EventArray {
93 fn from(val: SourceSenderItem) -> Self {
94 val.events
95 }
96}
97
98pub struct Builder {
99 buf_size: usize,
100 default_output: Option<Output>,
101 named_outputs: HashMap<String, Output>,
102 lag_time: Option<Histogram>,
103}
104
105impl Default for Builder {
106 fn default() -> Self {
107 Self {
108 buf_size: CHUNK_SIZE,
109 default_output: None,
110 named_outputs: Default::default(),
111 lag_time: Some(histogram!(LAG_TIME_NAME)),
112 }
113 }
114}
115
116impl Builder {
117 pub const fn with_buffer(mut self, n: usize) -> Self {
118 self.buf_size = n;
119 self
120 }
121
122 pub fn add_source_output(
123 &mut self,
124 output: SourceOutput,
125 component_key: ComponentKey,
126 ) -> LimitedReceiver<SourceSenderItem> {
127 let lag_time = self.lag_time.clone();
128 let log_definition = output.schema_definition.clone();
129 let output_id = OutputId {
130 component: component_key,
131 port: output.port.clone(),
132 };
133 match output.port {
134 None => {
135 let (output, rx) = Output::new_with_buffer(
136 self.buf_size,
137 DEFAULT_OUTPUT.to_owned(),
138 lag_time,
139 log_definition,
140 output_id,
141 );
142 self.default_output = Some(output);
143 rx
144 }
145 Some(name) => {
146 let (output, rx) = Output::new_with_buffer(
147 self.buf_size,
148 name.clone(),
149 lag_time,
150 log_definition,
151 output_id,
152 );
153 self.named_outputs.insert(name, output);
154 rx
155 }
156 }
157 }
158
159 pub fn build(self) -> SourceSender {
160 SourceSender {
161 default_output: self.default_output,
162 named_outputs: self.named_outputs,
163 }
164 }
165}
166
167#[derive(Debug, Clone)]
168pub struct SourceSender {
169 default_output: Option<Output>,
172 named_outputs: HashMap<String, Output>,
173}
174
175impl SourceSender {
176 pub fn builder() -> Builder {
177 Builder::default()
178 }
179
180 #[cfg(any(test, feature = "test-utils"))]
181 pub fn new_test_sender_with_buffer(n: usize) -> (Self, LimitedReceiver<SourceSenderItem>) {
182 let lag_time = Some(histogram!(LAG_TIME_NAME));
183 let output_id = OutputId {
184 component: "test".to_string().into(),
185 port: None,
186 };
187 let (default_output, rx) =
188 Output::new_with_buffer(n, DEFAULT_OUTPUT.to_owned(), lag_time, None, output_id);
189 (
190 Self {
191 default_output: Some(default_output),
192 named_outputs: Default::default(),
193 },
194 rx,
195 )
196 }
197
198 #[cfg(any(test, feature = "test-utils"))]
199 pub fn new_test() -> (Self, impl Stream<Item = Event> + Unpin) {
200 let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
201 let recv = recv.into_stream().flat_map(into_event_stream);
202 (pipe, recv)
203 }
204
205 #[cfg(any(test, feature = "test-utils"))]
206 pub fn new_test_finalize(status: EventStatus) -> (Self, impl Stream<Item = Event> + Unpin) {
207 let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
208 let recv = recv.into_stream().flat_map(move |mut item| {
212 item.events.iter_events_mut().for_each(|mut event| {
213 let metadata = event.metadata_mut();
214 metadata.update_status(status);
215 metadata.update_sources();
216 });
217 into_event_stream(item)
218 });
219 (pipe, recv)
220 }
221
222 #[cfg(any(test, feature = "test-utils"))]
223 pub fn new_test_errors(
224 error_at: impl Fn(usize) -> bool,
225 ) -> (Self, impl Stream<Item = Event> + Unpin) {
226 let (pipe, recv) = Self::new_test_sender_with_buffer(TEST_BUFFER_SIZE);
227 let mut count: usize = 0;
231 let recv = recv.into_stream().flat_map(move |mut item| {
232 let status = if error_at(count) {
233 EventStatus::Errored
234 } else {
235 EventStatus::Delivered
236 };
237 count += 1;
238 item.events.iter_events_mut().for_each(|mut event| {
239 let metadata = event.metadata_mut();
240 metadata.update_status(status);
241 metadata.update_sources();
242 });
243 into_event_stream(item)
244 });
245 (pipe, recv)
246 }
247
248 #[cfg(any(test, feature = "test-utils"))]
249 pub fn add_outputs(
250 &mut self,
251 status: EventStatus,
252 name: String,
253 ) -> impl Stream<Item = SourceSenderItem> + Unpin + use<> {
254 let output_id = OutputId {
257 component: "test".to_string().into(),
258 port: Some(name.clone()),
259 };
260 let (output, recv) = Output::new_with_buffer(100, name.clone(), None, None, output_id);
261 let recv = recv.into_stream().map(move |mut item| {
262 item.events.iter_events_mut().for_each(|mut event| {
263 let metadata = event.metadata_mut();
264 metadata.update_status(status);
265 metadata.update_sources();
266 });
267 item
268 });
269 self.named_outputs.insert(name, output);
270 recv
271 }
272
273 const fn default_output_mut(&mut self) -> &mut Output {
275 self.default_output.as_mut().expect("no default output")
276 }
277
278 pub async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
282 self.default_output_mut().send_event(event).await
283 }
284
285 pub async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
289 where
290 S: Stream<Item = E> + Unpin,
291 E: Into<Event> + ByteSizeOf,
292 {
293 self.default_output_mut().send_event_stream(events).await
294 }
295
296 pub async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
300 where
301 E: Into<Event> + ByteSizeOf,
302 I: IntoIterator<Item = E>,
303 <I as IntoIterator>::IntoIter: ExactSizeIterator,
304 {
305 self.default_output_mut().send_batch(events).await
306 }
307
308 pub async fn send_batch_named<I, E>(&mut self, name: &str, events: I) -> Result<(), ClosedError>
312 where
313 E: Into<Event> + ByteSizeOf,
314 I: IntoIterator<Item = E>,
315 <I as IntoIterator>::IntoIter: ExactSizeIterator,
316 {
317 self.named_outputs
318 .get_mut(name)
319 .expect("unknown output")
320 .send_batch(events)
321 .await
322 }
323}
324
325struct UnsentEventCount {
333 count: usize,
334 span: Span,
335}
336
337impl UnsentEventCount {
338 fn new(count: usize) -> Self {
339 Self {
340 count,
341 span: Span::current(),
342 }
343 }
344
345 const fn decr(&mut self, count: usize) {
346 self.count = self.count.saturating_sub(count);
347 }
348
349 const fn discard(&mut self) {
350 self.count = 0;
351 }
352}
353
354impl Drop for UnsentEventCount {
355 fn drop(&mut self) {
356 if self.count > 0 {
357 let _enter = self.span.enter();
358 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
359 count: self.count,
360 reason: "Source send cancelled."
361 });
362 }
363 }
364}
365
366#[derive(Clone)]
367struct Output {
368 sender: LimitedSender<SourceSenderItem>,
369 lag_time: Option<Histogram>,
370 events_sent: Registered<EventsSent>,
371 log_definition: Option<Arc<Definition>>,
373 output_id: Arc<OutputId>,
376}
377
378impl fmt::Debug for Output {
379 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
380 fmt.debug_struct("Output")
381 .field("sender", &self.sender)
382 .field("output_id", &self.output_id)
383 .finish()
385 }
386}
387
388impl Output {
389 fn new_with_buffer(
390 n: usize,
391 output: String,
392 lag_time: Option<Histogram>,
393 log_definition: Option<Arc<Definition>>,
394 output_id: OutputId,
395 ) -> (Self, LimitedReceiver<SourceSenderItem>) {
396 let (tx, rx) = channel::limited(MemoryBufferSize::MaxEvents(NonZeroUsize::new(n).unwrap()));
397 (
398 Self {
399 sender: tx,
400 lag_time,
401 events_sent: register!(EventsSent::from(internal_event::Output(Some(
402 output.into()
403 )))),
404 log_definition,
405 output_id: Arc::new(output_id),
406 },
407 rx,
408 )
409 }
410
411 async fn send(
412 &mut self,
413 mut events: EventArray,
414 unsent_event_count: &mut UnsentEventCount,
415 ) -> Result<(), ClosedError> {
416 let send_reference = Instant::now();
417 let reference = Utc::now().timestamp_millis();
418 events
419 .iter_events()
420 .for_each(|event| self.emit_lag_time(event, reference));
421
422 events.iter_events_mut().for_each(|mut event| {
423 if let Some(log_definition) = &self.log_definition {
425 event.metadata_mut().set_schema_definition(log_definition);
426 }
427 event
428 .metadata_mut()
429 .set_upstream_id(Arc::clone(&self.output_id));
430 });
431
432 let byte_size = events.estimated_json_encoded_size_of();
433 let count = events.len();
434 self.sender
435 .send(SourceSenderItem {
436 events,
437 send_reference,
438 })
439 .await
440 .map_err(|_| ClosedError)?;
441 self.events_sent.emit(CountByteSize(count, byte_size));
442 unsent_event_count.decr(count);
443 Ok(())
444 }
445
446 async fn send_event(&mut self, event: impl Into<EventArray>) -> Result<(), ClosedError> {
447 let event: EventArray = event.into();
448 let mut unsent_event_count = UnsentEventCount::new(event.len());
452 self.send(event, &mut unsent_event_count).await
453 }
454
455 async fn send_event_stream<S, E>(&mut self, events: S) -> Result<(), ClosedError>
456 where
457 S: Stream<Item = E> + Unpin,
458 E: Into<Event> + ByteSizeOf,
459 {
460 let mut stream = events.ready_chunks(CHUNK_SIZE);
461 while let Some(events) = stream.next().await {
462 self.send_batch(events.into_iter()).await?;
463 }
464 Ok(())
465 }
466
467 async fn send_batch<I, E>(&mut self, events: I) -> Result<(), ClosedError>
468 where
469 E: Into<Event> + ByteSizeOf,
470 I: IntoIterator<Item = E>,
471 <I as IntoIterator>::IntoIter: ExactSizeIterator,
472 {
473 let events = events.into_iter().map(Into::into);
477 let mut unsent_event_count = UnsentEventCount::new(events.len());
478 for events in array::events_into_arrays(events, Some(CHUNK_SIZE)) {
479 self.send(events, &mut unsent_event_count)
480 .await
481 .inspect_err(|_| {
482 unsent_event_count.discard();
485 })?;
486 }
487 Ok(())
488 }
489
490 fn emit_lag_time(&self, event: EventRef<'_>, reference: i64) {
494 if let Some(lag_time_metric) = &self.lag_time {
495 let timestamp = match event {
496 EventRef::Log(log) => {
497 log_schema()
498 .timestamp_key_target_path()
499 .and_then(|timestamp_key| {
500 log.get(timestamp_key).and_then(get_timestamp_millis)
501 })
502 }
503 EventRef::Metric(metric) => metric
504 .timestamp()
505 .map(|timestamp| timestamp.timestamp_millis()),
506 EventRef::Trace(trace) => {
507 log_schema()
508 .timestamp_key_target_path()
509 .and_then(|timestamp_key| {
510 trace.get(timestamp_key).and_then(get_timestamp_millis)
511 })
512 }
513 };
514 if let Some(timestamp) = timestamp {
515 let lag_time = (reference - timestamp) as f64 / 1000.0;
518 lag_time_metric.record(lag_time);
519 }
520 }
521 }
522}
523
524const fn get_timestamp_millis(value: &Value) -> Option<i64> {
525 match value {
526 Value::Timestamp(timestamp) => Some(timestamp.timestamp_millis()),
527 _ => None,
528 }
529}
530
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Duration};
    use rand::{rng, Rng};
    use tokio::time::timeout;
    use vector_lib::event::{LogEvent, Metric, MetricKind, MetricValue, TraceEvent};
    use vrl::event_path;

    use super::*;
    use crate::metrics::{self, Controller};

    /// How long a blocked send is awaited before the test gives up on it.
    const SEND_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(100);

    /// Builds the gauge metric event used by the dropped-event tests.
    fn gauge_event() -> Event {
        Event::Metric(Metric::new(
            "name",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 123.4 },
        ))
    }

    /// Captures the current metrics and returns the value of the single
    /// `component_discarded_events_total` counter, asserting that exactly one
    /// such metric exists.
    fn discarded_events_total() -> f64 {
        let matching = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == "component_discarded_events_total")
            .collect::<Vec<_>>();
        assert_eq!(matching.len(), 1);

        let MetricValue::Counter { value } = matching[0].value() else {
            panic!("component_discarded_events_total has invalid type")
        };
        *value
    }

    /// Lag time is recorded for a log carrying a `timestamp` field.
    #[tokio::test]
    async fn emits_lag_time_for_log() {
        let make_log = |timestamp: DateTime<Utc>| {
            let mut log = LogEvent::from("Log message");
            log.insert("timestamp", timestamp);
            Event::Log(log)
        };
        emit_and_test(make_log).await;
    }

    /// Lag time is recorded for a metric carrying a timestamp.
    #[tokio::test]
    async fn emits_lag_time_for_metric() {
        let make_metric = |timestamp: DateTime<Utc>| {
            let metric = Metric::new(
                "name",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 123.4 },
            )
            .with_timestamp(Some(timestamp));
            Event::Metric(metric)
        };
        emit_and_test(make_metric).await;
    }

    /// Lag time is recorded for a trace carrying a `timestamp` field.
    #[tokio::test]
    async fn emits_lag_time_for_trace() {
        let make_trace = |timestamp: DateTime<Utc>| {
            let mut trace = TraceEvent::default();
            trace.insert(event_path!("timestamp"), timestamp);
            Event::Trace(trace)
        };
        emit_and_test(make_trace).await;
    }

    /// Sends one event whose timestamp lies a random 10..10000 ms in the past
    /// and checks that exactly one matching lag-time sample was recorded.
    async fn emit_and_test(make_event: impl FnOnce(DateTime<Utc>) -> Event) {
        metrics::init_test();
        let (mut sender, _stream) = SourceSender::new_test();
        let millis = rng().random_range(10..10000);
        let timestamp = Utc::now() - Duration::milliseconds(millis);
        let expected = millis as f64 / 1000.0;

        sender
            .send_event(make_event(timestamp))
            .await
            .expect("Send should not fail");

        let lag_times = Controller::get()
            .expect("There must be a controller")
            .capture_metrics()
            .into_iter()
            .filter(|metric| metric.name() == "source_lag_time_seconds")
            .collect::<Vec<_>>();
        assert_eq!(lag_times.len(), 1);

        let MetricValue::AggregatedHistogram {
            buckets,
            count,
            sum,
        } = lag_times[0].value()
        else {
            panic!("source_lag_time_seconds has invalid type")
        };

        // The single sample must sit in the first bucket whose upper limit
        // reaches the expected lag; every other bucket stays empty.
        let target = buckets
            .iter()
            .position(|bucket| bucket.upper_limit >= expected);
        for (index, bucket) in buckets.iter().enumerate() {
            if Some(index) == target {
                assert_eq!(bucket.count, 1);
            } else {
                assert_eq!(bucket.count, 0);
            }
        }

        assert_eq!(*count, 1);
        assert!(
            (*sum - expected).abs() <= 0.002,
            "Histogram sum does not match expected sum: {} vs {}",
            *sum,
            expected,
        );
    }

    /// A `send_event` abandoned while blocked on a full channel reports its
    /// event as discarded.
    #[tokio::test]
    async fn emits_component_discarded_events_total_for_send_event() {
        metrics::init_test();
        let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

        let event = gauge_event();

        // Fills the one-event channel.
        sender
            .send_event(event.clone())
            .await
            .expect("First send should not fail");

        // The channel is full and nothing reads from it, so this send blocks
        // until the timeout drops its future.
        let res = timeout(SEND_TIMEOUT, sender.send_event(event.clone())).await;
        assert!(res.is_err(), "Send should have timed out.");

        assert_eq!(discarded_events_total(), 1.0);
    }

    /// A `send_batch` abandoned after its first chunk reports only the
    /// remaining events as discarded.
    #[tokio::test]
    async fn emits_component_discarded_events_total_for_send_batch() {
        metrics::init_test();
        let (mut sender, _recv) = SourceSender::new_test_sender_with_buffer(1);

        let expected_drop = 100;
        let events: Vec<Event> = (0..(CHUNK_SIZE + expected_drop))
            .map(|_| gauge_event())
            .collect();

        // The batch is one full chunk plus `expected_drop` events; the send
        // is abandoned at the timeout while the remainder is still pending.
        let res = timeout(SEND_TIMEOUT, sender.send_batch(events)).await;
        assert!(res.is_err(), "Send should have timed out.");

        assert_eq!(discarded_events_total(), expected_drop as f64);
    }
}