1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{config::ComponentKey, event::EventArray};
11
12pub enum ControlMessage {
13 Add(ComponentKey, BufferSender<EventArray>),
15
16 Remove(ComponentKey),
18
19 Pause(ComponentKey),
24
25 Replace(ComponentKey, BufferSender<EventArray>),
27}
28
29impl fmt::Debug for ControlMessage {
30 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
31 write!(f, "ControlMessage::")?;
32 match self {
33 Self::Add(id, _) => write!(f, "Add({id:?})"),
34 Self::Remove(id) => write!(f, "Remove({id:?})"),
35 Self::Pause(id) => write!(f, "Pause({id:?})"),
36 Self::Replace(id, _) => write!(f, "Replace({id:?})"),
37 }
38 }
39}
40
41pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
44
45pub struct Fanout {
46 senders: IndexMap<ComponentKey, Option<Sender>>,
47 control_channel: mpsc::UnboundedReceiver<ControlMessage>,
48}
49
50impl Fanout {
51 pub fn new() -> (Self, ControlChannel) {
52 let (control_tx, control_rx) = mpsc::unbounded_channel();
53
54 let fanout = Self {
55 senders: Default::default(),
56 control_channel: control_rx,
57 };
58
59 (fanout, control_tx)
60 }
61
62 pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
68 assert!(
69 !self.senders.contains_key(&id),
70 "Adding duplicate output id to fanout: {id}"
71 );
72 self.senders.insert(id, Some(Sender::new(sink)));
73 }
74
75 fn remove(&mut self, id: &ComponentKey) {
76 assert!(
77 self.senders.shift_remove(id).is_some(),
78 "Removing nonexistent sink from fanout: {id}"
79 );
80 }
81
82 fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
83 match self.senders.get_mut(id) {
84 Some(sender) => {
85 assert!(
89 sender.replace(Sender::new(sink)).is_none(),
90 "Replacing existing sink is not valid: {id}"
91 );
92 }
93 None => panic!("Replacing unknown sink from fanout: {id}"),
94 }
95 }
96
97 fn pause(&mut self, id: &ComponentKey) {
98 match self.senders.get_mut(id) {
99 Some(sender) => {
100 assert!(
103 sender.take().is_some(),
104 "Pausing nonexistent sink is not valid: {id}"
105 );
106 }
107 None => panic!("Pausing unknown sink from fanout: {id}"),
108 }
109 }
110
111 fn apply_control_message(&mut self, message: ControlMessage) {
115 trace!("Processing control message outside of send: {:?}", message);
116
117 match message {
118 ControlMessage::Add(id, sink) => self.add(id, sink),
119 ControlMessage::Remove(id) => self.remove(&id),
120 ControlMessage::Pause(id) => self.pause(&id),
121 ControlMessage::Replace(id, sink) => self.replace(&id, sink),
122 }
123 }
124
125 async fn wait_for_replacements(&mut self) {
130 while self.senders.values().any(Option::is_none) {
131 if let Some(msg) = self.control_channel.recv().await {
132 self.apply_control_message(msg);
133 } else {
134 }
152 }
153 }
154
155 pub async fn send_stream(
171 &mut self,
172 events: impl Stream<Item = (EventArray, Instant)>,
173 ) -> crate::Result<()> {
174 tokio::pin!(events);
175 while let Some((event_array, send_reference)) = events.next().await {
176 self.send(event_array, Some(send_reference)).await?;
177 }
178 Ok(())
179 }
180
181 pub async fn send(
197 &mut self,
198 events: EventArray,
199 send_reference: Option<Instant>,
200 ) -> crate::Result<()> {
201 while let Ok(message) = self.control_channel.try_recv() {
203 self.apply_control_message(message);
204 }
205
206 self.wait_for_replacements().await;
208
209 if self.senders.is_empty() {
211 trace!("No senders present.");
212 return Ok(());
213 }
214
215 let mut control_channel_open = true;
222
223 let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);
226
227 loop {
228 tokio::select! {
229 biased;
233
234 maybe_msg = self.control_channel.recv(), if control_channel_open => {
235 trace!("Processing control message inside of send: {:?}", maybe_msg);
236
237 match maybe_msg {
240 Some(ControlMessage::Add(id, sink)) => {
241 send_group.add(id, sink);
242 },
243 Some(ControlMessage::Remove(id)) => {
244 send_group.remove(&id);
245 },
246 Some(ControlMessage::Pause(id)) => {
247 send_group.pause(&id);
248 },
249 Some(ControlMessage::Replace(id, sink)) => {
250 send_group.replace(&id, Sender::new(sink));
251 },
252 None => {
253 control_channel_open = false;
255 }
256 }
257 }
258
259 result = send_group.send() => match result {
260 Ok(()) => {
261 trace!("Sent item to fanout.");
262 break;
263 },
264 Err(e) => return Err(e),
265 }
266 }
267 }
268
269 Ok(())
270 }
271}
272
273struct SendGroup<'a> {
274 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
275 sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
276}
277
278impl<'a> SendGroup<'a> {
279 fn new(
280 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
281 events: EventArray,
282 send_reference: Option<Instant>,
283 ) -> Self {
284 debug_assert!(senders.values().all(Option::is_some));
287
288 let last_sender_idx = senders.len().saturating_sub(1);
289 let mut events = Some(events);
290
291 let mut sends = HashMap::new();
294 for (i, (key, sender)) in senders.iter_mut().enumerate() {
295 let mut sender = sender
296 .take()
297 .expect("sender must be present to initialize SendGroup");
298
299 if i == last_sender_idx {
301 sender.input = events.take();
302 } else {
303 sender.input.clone_from(&events);
304 }
305 sender.send_reference = send_reference;
306
307 let send = async move {
309 sender.flush().await?;
310 Ok(sender)
311 };
312
313 sends.insert(key.clone(), ReusableBoxFuture::new(send));
314 }
315
316 Self { senders, sends }
317 }
318
319 fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
320 if let Some(send) = self.sends.remove(id) {
321 tokio::spawn(async move {
322 if let Err(e) = send.await {
323 warn!(
324 cause = %e,
325 message = "Encountered error writing to component after detaching from topology.",
326 );
327 }
328 });
329 true
330 } else {
331 false
332 }
333 }
334
335 #[allow(clippy::needless_pass_by_value)]
336 fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
337 assert!(
340 self.senders
341 .insert(id.clone(), Some(Sender::new(sink)))
342 .is_none(),
343 "Adding duplicate output id to fanout: {id}"
344 );
345 }
346
347 fn remove(&mut self, id: &ComponentKey) {
348 assert!(
352 self.senders.shift_remove(id).is_some(),
353 "Removing nonexistent sink from fanout: {id}"
354 );
355
356 self.try_detach_send(id);
361 }
362
363 fn replace(&mut self, id: &ComponentKey, sink: Sender) {
364 match self.senders.get_mut(id) {
365 Some(sender) => {
366 assert!(
370 sender.replace(sink).is_none(),
371 "Replacing existing sink is not valid: {id}"
372 );
373 }
374 None => panic!("Replacing unknown sink from fanout: {id}"),
375 }
376 }
377
378 fn pause(&mut self, id: &ComponentKey) {
379 match self.senders.get_mut(id) {
380 Some(sender) => {
381 if sender.take().is_none() {
388 assert!(
389 self.try_detach_send(id),
390 "Pausing already-paused sink is invalid: {id}"
391 );
392 }
393 }
394 None => panic!("Pausing unknown sink from fanout: {id}"),
395 }
396 }
397
398 async fn send(&mut self) -> crate::Result<()> {
399 loop {
403 if self.sends.is_empty() {
404 break;
405 }
406
407 let mut done = Vec::new();
408 for (key, send) in &mut self.sends {
409 if let Poll::Ready(result) = poll!(send.get_pin()) {
410 let sender = result?;
411
412 done.push((key.clone(), sender));
415 }
416 }
417
418 for (key, sender) in done {
419 self.sends.remove(&key);
420 self.replace(&key, sender);
421 }
422
423 if !self.sends.is_empty() {
424 pending!();
428 }
429 }
430
431 Ok(())
432 }
433}
434
435struct Sender {
436 inner: BufferSender<EventArray>,
437 input: Option<EventArray>,
438 send_reference: Option<Instant>,
439}
440
441impl Sender {
442 fn new(inner: BufferSender<EventArray>) -> Self {
443 Self {
444 inner,
445 input: None,
446 send_reference: None,
447 }
448 }
449
450 async fn flush(&mut self) -> crate::Result<()> {
451 let send_reference = self.send_reference.take();
452 if let Some(input) = self.input.take() {
453 self.inner.send(input, send_reference).await?;
454 self.inner.flush().await?;
455 }
456
457 Ok(())
458 }
459}
460
461#[cfg(test)]
462mod tests {
463 use std::{mem, num::NonZeroUsize};
464
465 use futures::poll;
466 use tokio::sync::mpsc::UnboundedSender;
467 use tokio_test::{assert_pending, assert_ready, task::spawn};
468 use tracing::Span;
469 use vector_buffers::{
470 WhenFull,
471 topology::{
472 builder::TopologyBuilder,
473 channel::{BufferReceiver, BufferSender},
474 },
475 };
476 use vrl::value::Value;
477
478 use super::{ControlMessage, Fanout};
479 use crate::{
480 config::ComponentKey,
481 event::{Event, EventArray, EventContainer, LogEvent},
482 test_util::{collect_ready, collect_ready_events},
483 };
484
485 async fn build_sender_pair(
486 capacity: usize,
487 ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
488 TopologyBuilder::standalone_memory(
489 NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
490 WhenFull::Block,
491 &Span::current(),
492 )
493 .await
494 }
495
496 async fn build_sender_pairs(
497 capacities: &[usize],
498 ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
499 let mut pairs = Vec::new();
500 for capacity in capacities {
501 pairs.push(build_sender_pair(*capacity).await);
502 }
503 pairs
504 }
505
506 async fn fanout_from_senders(
507 capacities: &[usize],
508 ) -> (
509 Fanout,
510 UnboundedSender<ControlMessage>,
511 Vec<BufferReceiver<EventArray>>,
512 ) {
513 let (mut fanout, control) = Fanout::new();
514 let pairs = build_sender_pairs(capacities).await;
515
516 let mut receivers = Vec::new();
517 for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
518 fanout.add(ComponentKey::from(i.to_string()), sender);
519 receivers.push(receiver);
520 }
521
522 (fanout, control, receivers)
523 }
524
525 async fn add_sender_to_fanout(
526 fanout: &mut Fanout,
527 receivers: &mut Vec<BufferReceiver<EventArray>>,
528 sender_id: usize,
529 capacity: usize,
530 ) {
531 let (sender, receiver) = build_sender_pair(capacity).await;
532 receivers.push(receiver);
533
534 fanout.add(ComponentKey::from(sender_id.to_string()), sender);
535 }
536
537 fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
538 control
539 .send(ControlMessage::Remove(ComponentKey::from(
540 sender_id.to_string(),
541 )))
542 .expect("sending control message should not fail");
543 }
544
545 async fn replace_sender_in_fanout(
546 control: &UnboundedSender<ControlMessage>,
547 receivers: &mut [BufferReceiver<EventArray>],
548 sender_id: usize,
549 capacity: usize,
550 ) -> BufferReceiver<EventArray> {
551 let (sender, receiver) = build_sender_pair(capacity).await;
552 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
553
554 control
555 .send(ControlMessage::Pause(ComponentKey::from(
556 sender_id.to_string(),
557 )))
558 .expect("sending control message should not fail");
559
560 control
561 .send(ControlMessage::Replace(
562 ComponentKey::from(sender_id.to_string()),
563 sender,
564 ))
565 .expect("sending control message should not fail");
566
567 old_receiver
568 }
569
570 async fn start_sender_replace(
571 control: &UnboundedSender<ControlMessage>,
572 receivers: &mut [BufferReceiver<EventArray>],
573 sender_id: usize,
574 capacity: usize,
575 ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
576 let (sender, receiver) = build_sender_pair(capacity).await;
577 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
578
579 control
580 .send(ControlMessage::Pause(ComponentKey::from(
581 sender_id.to_string(),
582 )))
583 .expect("sending control message should not fail");
584
585 (old_receiver, sender)
586 }
587
588 fn finish_sender_resume(
589 control: &UnboundedSender<ControlMessage>,
590 sender_id: usize,
591 sender: BufferSender<EventArray>,
592 ) {
593 control
594 .send(ControlMessage::Replace(
595 ComponentKey::from(sender_id.to_string()),
596 sender,
597 ))
598 .expect("sending control message should not fail");
599 }
600
601 fn unwrap_log_event_message<E>(event: E) -> String
602 where
603 E: EventContainer,
604 {
605 let event = event
606 .into_events()
607 .next()
608 .expect("must have at least one event");
609 let event = event.into_log();
610 event
611 .get("message")
612 .and_then(Value::as_bytes)
613 .and_then(|b| String::from_utf8(b.to_vec()).ok())
614 .expect("must be valid log event with `message` field")
615 }
616
617 #[tokio::test]
618 async fn fanout_writes_to_all() {
619 let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]).await;
620 let events = make_event_array(2);
621
622 let clones = events.clone();
623 fanout.send(clones, None).await.expect("should not fail");
624
625 for receiver in receivers {
626 assert_eq!(
627 collect_ready(receiver.into_stream()),
628 std::slice::from_ref(&events)
629 );
630 }
631 }
632
633 #[tokio::test]
634 async fn fanout_notready() {
635 let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]).await;
636 let events = make_events(2);
637
638 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
640 assert_ready!(first_send.poll()).expect("should not fail");
641 drop(first_send);
642
643 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
645 assert_pending!(second_send.poll());
646
647 for receiver in &mut receivers {
649 assert_eq!(Some(events[0].clone().into()), receiver.next().await);
650 }
651
652 assert_ready!(second_send.poll()).expect("should not fail");
654 drop(second_send);
655
656 for receiver in &mut receivers {
658 assert_eq!(Some(events[1].clone().into()), receiver.next().await);
659 }
660 }
661
662 #[tokio::test]
663 async fn fanout_grow() {
664 let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]).await;
665 let events = make_events(3);
666
667 fanout
669 .send(events[0].clone().into(), None)
670 .await
671 .expect("should not fail");
672 fanout
673 .send(events[1].clone().into(), None)
674 .await
675 .expect("should not fail");
676
677 add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4).await;
679
680 fanout
682 .send(events[2].clone().into(), None)
683 .await
684 .expect("should not fail");
685
686 let expected_events = [&events, &events, &events[2..]];
689 for (i, receiver) in receivers.into_iter().enumerate() {
690 assert_eq!(
691 collect_ready_events(receiver.into_stream()),
692 expected_events[i]
693 );
694 }
695 }
696
697 #[tokio::test]
698 async fn fanout_shrink() {
699 let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]).await;
700 let events = make_events(3);
701
702 fanout
704 .send(events[0].clone().into(), None)
705 .await
706 .expect("should not fail");
707 fanout
708 .send(events[1].clone().into(), None)
709 .await
710 .expect("should not fail");
711
712 remove_sender_from_fanout(&control, 1);
714
715 fanout
717 .send(events[2].clone().into(), None)
718 .await
719 .expect("should not fail");
720
721 let expected_events = [&events, &events[..2]];
723 for (i, receiver) in receivers.into_iter().enumerate() {
724 assert_eq!(
725 collect_ready_events(receiver.into_stream()),
726 expected_events[i]
727 );
728 }
729 }
730
731 #[tokio::test]
732 async fn fanout_shrink_when_notready() {
733 let events = make_events(2);
739 let expected_first_event = unwrap_log_event_message(events[0].clone());
740 let expected_second_event = unwrap_log_event_message(events[1].clone());
741
742 let cases = [
743 (
746 0,
747 false,
748 [
749 expected_second_event.clone(),
750 expected_first_event.clone(),
751 expected_second_event.clone(),
752 ],
753 ),
754 (
755 1,
756 true,
757 [
758 expected_second_event.clone(),
759 expected_second_event.clone(),
760 expected_second_event.clone(),
761 ],
762 ),
763 (
764 2,
765 false,
766 [
767 expected_second_event.clone(),
768 expected_first_event.clone(),
769 expected_second_event.clone(),
770 ],
771 ),
772 ];
773
774 for (sender_id, should_complete, expected_last_seen) in cases {
775 let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]).await;
776
777 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
779 assert_ready!(first_send.poll()).expect("should not fail");
780 drop(first_send);
781
782 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
784 assert_pending!(second_send.poll());
785
786 remove_sender_from_fanout(&control, sender_id);
788
789 if should_complete {
790 assert_ready!(second_send.poll()).expect("should not fail");
791 } else {
792 assert_pending!(second_send.poll());
793 }
794 drop(second_send);
795
796 drop(fanout);
798
799 let mut last_seen = Vec::new();
800 for receiver in &mut receivers {
801 let mut events = Vec::new();
802 while let Some(event) = receiver.next().await {
803 events.insert(0, event);
804 }
805
806 last_seen.push(unwrap_log_event_message(events.remove(0)));
807 }
808
809 assert_eq!(&expected_last_seen[..], &last_seen);
810 }
811 }
812
813 #[tokio::test]
814 async fn fanout_no_sinks() {
815 let (mut fanout, _) = Fanout::new();
816 let events = make_events(2);
817
818 fanout
819 .send(events[0].clone().into(), None)
820 .await
821 .expect("should not fail");
822 fanout
823 .send(events[1].clone().into(), None)
824 .await
825 .expect("should not fail");
826 }
827
828 #[tokio::test]
829 async fn fanout_replace() {
830 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]).await;
831 let events = make_events(3);
832
833 fanout
835 .send(events[0].clone().into(), None)
836 .await
837 .expect("should not fail");
838 fanout
839 .send(events[1].clone().into(), None)
840 .await
841 .expect("should not fail");
842
843 let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4).await;
845
846 fanout
848 .send(events[2].clone().into(), None)
849 .await
850 .expect("should not fail");
851
852 let expected_events = [&events[2..], &events, &events];
855 for (i, receiver) in receivers.into_iter().enumerate() {
856 assert_eq!(
857 collect_ready_events(receiver.into_stream()),
858 expected_events[i]
859 );
860 }
861
862 assert_eq!(
864 collect_ready_events(old_first_receiver.into_stream()),
865 &events[..2]
866 );
867 }
868
869 #[tokio::test]
870 async fn fanout_wait() {
871 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]).await;
872 let events = make_events(3);
873
874 let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
876 assert_ready!(poll!(send1)).expect("should not fail");
877 let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
878 assert_ready!(poll!(send2)).expect("should not fail");
879
880 let (old_first_receiver, new_first_sender) =
884 start_sender_replace(&control, &mut receivers, 0, 4).await;
885
886 let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
888 assert_pending!(third_send.poll());
889
890 finish_sender_resume(&control, 0, new_first_sender);
893 assert!(third_send.is_woken());
894 assert_ready!(third_send.poll()).expect("should not fail");
895
896 assert_eq!(
899 collect_ready_events(old_first_receiver.into_stream()),
900 &events[0..2]
901 );
902
903 let expected_events = [&events[2..], &events];
904 for (i, receiver) in receivers.into_iter().enumerate() {
905 assert_eq!(
906 collect_ready_events(receiver.into_stream()),
907 expected_events[i]
908 );
909 }
910 }
911
912 fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
913 (0..count).map(|i| LogEvent::from(format!("line {i}")))
914 }
915
916 fn make_events(count: usize) -> Vec<Event> {
917 make_events_inner(count).map(Into::into).collect()
918 }
919
920 fn make_event_array(count: usize) -> EventArray {
921 make_events_inner(count).collect::<Vec<_>>().into()
922 }
923}