1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{config::ComponentKey, event::EventArray};
11
/// Messages sent over a [`ControlChannel`] to reconfigure a [`Fanout`] while it is running.
pub enum ControlMessage {
    /// Registers a new sink under the given key. Handling this message panics if the key is
    /// already registered.
    Add(ComponentKey, BufferSender<EventArray>),

    /// Removes the sink registered under the given key. Handling this message panics if the key
    /// is not registered.
    Remove(ComponentKey),

    /// Pauses the sink registered under the given key: its entry stays registered, but its sender
    /// is taken out until a matching `Replace` installs a new one.
    Pause(ComponentKey),

    /// Installs a new sender for a key that is registered but currently paused. Handling this
    /// message panics if the key is unknown or still has a sender installed.
    Replace(ComponentKey, BufferSender<EventArray>),
}
28
29impl fmt::Debug for ControlMessage {
30 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
31 write!(f, "ControlMessage::")?;
32 match self {
33 Self::Add(id, _) => write!(f, "Add({id:?})"),
34 Self::Remove(id) => write!(f, "Remove({id:?})"),
35 Self::Pause(id) => write!(f, "Pause({id:?})"),
36 Self::Replace(id, _) => write!(f, "Replace({id:?})"),
37 }
38 }
39}
40
/// Sending half of the unbounded channel over which [`ControlMessage`]s are delivered to a
/// [`Fanout`]; handed out by [`Fanout::new`].
pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
44
/// Sends each event array to every registered sink, and can be reconfigured at runtime through
/// [`ControlMessage`]s.
pub struct Fanout {
    // Registered sinks in insertion order. A `None` value marks a sink that is paused (or whose
    // sender is temporarily checked out by an in-flight send).
    senders: IndexMap<ComponentKey, Option<Sender>>,
    // Receiving half of the control channel created in `Fanout::new`.
    control_channel: mpsc::UnboundedReceiver<ControlMessage>,
}
49
50impl Fanout {
51 pub fn new() -> (Self, ControlChannel) {
52 let (control_tx, control_rx) = mpsc::unbounded_channel();
53
54 let fanout = Self {
55 senders: Default::default(),
56 control_channel: control_rx,
57 };
58
59 (fanout, control_tx)
60 }
61
62 pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
68 assert!(
69 !self.senders.contains_key(&id),
70 "Adding duplicate output id to fanout: {id}"
71 );
72 self.senders.insert(id, Some(Sender::new(sink)));
73 }
74
75 fn remove(&mut self, id: &ComponentKey) {
76 assert!(
77 self.senders.shift_remove(id).is_some(),
78 "Removing nonexistent sink from fanout: {id}"
79 );
80 }
81
82 fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
83 match self.senders.get_mut(id) {
84 Some(sender) => {
85 assert!(
89 sender.replace(Sender::new(sink)).is_none(),
90 "Replacing existing sink is not valid: {id}"
91 );
92 }
93 None => panic!("Replacing unknown sink from fanout: {id}"),
94 }
95 }
96
97 fn pause(&mut self, id: &ComponentKey) {
98 match self.senders.get_mut(id) {
99 Some(sender) => {
100 assert!(
103 sender.take().is_some(),
104 "Pausing nonexistent sink is not valid: {id}"
105 );
106 }
107 None => panic!("Pausing unknown sink from fanout: {id}"),
108 }
109 }
110
111 fn apply_control_message(&mut self, message: ControlMessage) {
115 trace!("Processing control message outside of send: {:?}", message);
116
117 match message {
118 ControlMessage::Add(id, sink) => self.add(id, sink),
119 ControlMessage::Remove(id) => self.remove(&id),
120 ControlMessage::Pause(id) => self.pause(&id),
121 ControlMessage::Replace(id, sink) => self.replace(&id, sink),
122 }
123 }
124
125 async fn wait_for_replacements(&mut self) {
130 while self.senders.values().any(Option::is_none) {
131 if let Some(msg) = self.control_channel.recv().await {
132 self.apply_control_message(msg);
133 } else {
134 }
152 }
153 }
154
155 pub async fn send_stream(
171 &mut self,
172 events: impl Stream<Item = (EventArray, Instant)>,
173 ) -> crate::Result<()> {
174 tokio::pin!(events);
175 while let Some((event_array, send_reference)) = events.next().await {
176 self.send(event_array, Some(send_reference)).await?;
177 }
178 Ok(())
179 }
180
181 pub async fn send(
197 &mut self,
198 events: EventArray,
199 send_reference: Option<Instant>,
200 ) -> crate::Result<()> {
201 while let Ok(message) = self.control_channel.try_recv() {
203 self.apply_control_message(message);
204 }
205
206 self.wait_for_replacements().await;
208
209 if self.senders.is_empty() {
211 trace!("No senders present.");
212 return Ok(());
213 }
214
215 let mut control_channel_open = true;
222
223 let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);
226
227 loop {
228 tokio::select! {
229 biased;
233
234 maybe_msg = self.control_channel.recv(), if control_channel_open => {
235 trace!("Processing control message inside of send: {:?}", maybe_msg);
236
237 match maybe_msg {
240 Some(ControlMessage::Add(id, sink)) => {
241 send_group.add(id, sink);
242 },
243 Some(ControlMessage::Remove(id)) => {
244 send_group.remove(&id);
245 },
246 Some(ControlMessage::Pause(id)) => {
247 send_group.pause(&id);
248 },
249 Some(ControlMessage::Replace(id, sink)) => {
250 send_group.replace(&id, Sender::new(sink));
251 },
252 None => {
253 control_channel_open = false;
255 }
256 }
257 }
258
259 result = send_group.send() => match result {
260 Ok(()) => {
261 trace!("Sent item to fanout.");
262 break;
263 },
264 Err(e) => return Err(e),
265 }
266 }
267 }
268
269 Ok(())
270 }
271}
272
/// State for one in-flight fanout send: the borrowed sender table plus one pending send future
/// per sink that is still being written to.
struct SendGroup<'a> {
    // The owning `Fanout`'s sender table; senders are checked out of it while their send is in
    // flight and put back when the send completes.
    senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
    // In-flight sends keyed by sink; each future yields its `Sender` back on success.
    sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
}
277
278impl<'a> SendGroup<'a> {
279 fn new(
280 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
281 events: EventArray,
282 send_reference: Option<Instant>,
283 ) -> Self {
284 debug_assert!(senders.values().all(Option::is_some));
287
288 let last_sender_idx = senders.len().saturating_sub(1);
289 let mut events = Some(events);
290
291 let mut sends = HashMap::new();
294 for (i, (key, sender)) in senders.iter_mut().enumerate() {
295 let mut sender = sender
296 .take()
297 .expect("sender must be present to initialize SendGroup");
298
299 if i == last_sender_idx {
301 sender.input = events.take();
302 } else {
303 sender.input.clone_from(&events);
304 }
305 sender.send_reference = send_reference;
306
307 let send = async move {
309 sender.flush().await?;
310 Ok(sender)
311 };
312
313 sends.insert(key.clone(), ReusableBoxFuture::new(send));
314 }
315
316 Self { senders, sends }
317 }
318
319 fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
320 if let Some(send) = self.sends.remove(id) {
321 tokio::spawn(async move {
322 if let Err(e) = send.await {
323 warn!(
324 cause = %e,
325 message = "Encountered error writing to component after detaching from topology.",
326 );
327 }
328 });
329 true
330 } else {
331 false
332 }
333 }
334
335 #[allow(clippy::needless_pass_by_value)]
336 fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
337 assert!(
340 self.senders
341 .insert(id.clone(), Some(Sender::new(sink)))
342 .is_none(),
343 "Adding duplicate output id to fanout: {id}"
344 );
345 }
346
347 fn remove(&mut self, id: &ComponentKey) {
348 assert!(
352 self.senders.shift_remove(id).is_some(),
353 "Removing nonexistent sink from fanout: {id}"
354 );
355
356 self.try_detach_send(id);
361 }
362
363 fn replace(&mut self, id: &ComponentKey, sink: Sender) {
364 match self.senders.get_mut(id) {
365 Some(sender) => {
366 assert!(
370 sender.replace(sink).is_none(),
371 "Replacing existing sink is not valid: {id}"
372 );
373 }
374 None => panic!("Replacing unknown sink from fanout: {id}"),
375 }
376 }
377
378 fn pause(&mut self, id: &ComponentKey) {
379 match self.senders.get_mut(id) {
380 Some(sender) => {
381 if sender.take().is_none() {
388 assert!(
389 self.try_detach_send(id),
390 "Pausing already-paused sink is invalid: {id}"
391 );
392 }
393 }
394 None => panic!("Pausing unknown sink from fanout: {id}"),
395 }
396 }
397
398 async fn send(&mut self) -> crate::Result<()> {
399 loop {
403 if self.sends.is_empty() {
404 break;
405 }
406
407 let mut done = Vec::new();
408 for (key, send) in &mut self.sends {
409 if let Poll::Ready(result) = poll!(send.get_pin()) {
410 let sender = result?;
411
412 done.push((key.clone(), sender));
415 }
416 }
417
418 for (key, sender) in done {
419 self.sends.remove(&key);
420 self.replace(&key, sender);
421 }
422
423 if !self.sends.is_empty() {
424 pending!();
428 }
429 }
430
431 Ok(())
432 }
433}
434
/// A single sink's buffer sender together with the payload staged for its next flush.
struct Sender {
    // Underlying buffer channel the events are written to.
    inner: BufferSender<EventArray>,
    // Events staged for the next `flush`; `None` when nothing is staged.
    input: Option<EventArray>,
    // Optional timestamp passed along with the staged events on the next `flush`.
    send_reference: Option<Instant>,
}
440
441impl Sender {
442 fn new(inner: BufferSender<EventArray>) -> Self {
443 Self {
444 inner,
445 input: None,
446 send_reference: None,
447 }
448 }
449
450 async fn flush(&mut self) -> crate::Result<()> {
451 let send_reference = self.send_reference.take();
452 if let Some(input) = self.input.take() {
453 self.inner.send(input, send_reference).await?;
454 self.inner.flush().await?;
455 }
456
457 Ok(())
458 }
459}
460
// Unit tests covering fanout delivery and runtime reconfiguration (add / remove / pause /
// replace), both between sends and while a send is blocked.
#[cfg(test)]
mod tests {
    use std::mem;
    use std::num::NonZeroUsize;

    use futures::poll;
    use tokio::sync::mpsc::UnboundedSender;
    use tokio_test::{assert_pending, assert_ready, task::spawn};
    use tracing::Span;
    use vector_buffers::{
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender},
        },
        WhenFull,
    };
    use vrl::value::Value;

    use super::{ControlMessage, Fanout};
    use crate::event::{Event, EventArray, LogEvent};
    use crate::test_util::{collect_ready, collect_ready_events};
    use crate::{config::ComponentKey, event::EventContainer};

    // Builds one sender/receiver pair backed by a standalone memory buffer of the given
    // capacity, configured with `WhenFull::Block`.
    async fn build_sender_pair(
        capacity: usize,
    ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
        TopologyBuilder::standalone_memory(
            NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
            WhenFull::Block,
            &Span::current(),
        )
        .await
    }

    // Builds one sender/receiver pair per entry in `capacities`, in order.
    async fn build_sender_pairs(
        capacities: &[usize],
    ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
        let mut pairs = Vec::new();
        for capacity in capacities {
            pairs.push(build_sender_pair(*capacity).await);
        }
        pairs
    }

    // Creates a fanout with one sink per capacity. Sinks are keyed by their index rendered as a
    // string ("0", "1", ...); the matching receivers are returned in the same order.
    async fn fanout_from_senders(
        capacities: &[usize],
    ) -> (
        Fanout,
        UnboundedSender<ControlMessage>,
        Vec<BufferReceiver<EventArray>>,
    ) {
        let (mut fanout, control) = Fanout::new();
        let pairs = build_sender_pairs(capacities).await;

        let mut receivers = Vec::new();
        for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
            fanout.add(ComponentKey::from(i.to_string()), sender);
            receivers.push(receiver);
        }

        (fanout, control, receivers)
    }

    // Adds a new sink directly to the fanout (not via the control channel) and appends its
    // receiver to `receivers`.
    async fn add_sender_to_fanout(
        fanout: &mut Fanout,
        receivers: &mut Vec<BufferReceiver<EventArray>>,
        sender_id: usize,
        capacity: usize,
    ) {
        let (sender, receiver) = build_sender_pair(capacity).await;
        receivers.push(receiver);

        fanout.add(ComponentKey::from(sender_id.to_string()), sender);
    }

    // Queues a `Remove` control message for the given sink.
    fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
        control
            .send(ControlMessage::Remove(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");
    }

    // Queues a `Pause` followed by a `Replace` for the given sink, swaps the new receiver into
    // `receivers`, and returns the receiver of the sink that was replaced.
    async fn replace_sender_in_fanout(
        control: &UnboundedSender<ControlMessage>,
        receivers: &mut [BufferReceiver<EventArray>],
        sender_id: usize,
        capacity: usize,
    ) -> BufferReceiver<EventArray> {
        let (sender, receiver) = build_sender_pair(capacity).await;
        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);

        control
            .send(ControlMessage::Pause(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");

        control
            .send(ControlMessage::Replace(
                ComponentKey::from(sender_id.to_string()),
                sender,
            ))
            .expect("sending control message should not fail");

        old_receiver
    }

    // First half of a two-step replace: queues only the `Pause`, swaps the new receiver into
    // `receivers`, and returns the old receiver plus the new sender to be installed later via
    // `finish_sender_resume`.
    async fn start_sender_replace(
        control: &UnboundedSender<ControlMessage>,
        receivers: &mut [BufferReceiver<EventArray>],
        sender_id: usize,
        capacity: usize,
    ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
        let (sender, receiver) = build_sender_pair(capacity).await;
        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);

        control
            .send(ControlMessage::Pause(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");

        (old_receiver, sender)
    }

    // Second half of a two-step replace: queues the `Replace` that installs `sender`.
    fn finish_sender_resume(
        control: &UnboundedSender<ControlMessage>,
        sender_id: usize,
        sender: BufferSender<EventArray>,
    ) {
        control
            .send(ControlMessage::Replace(
                ComponentKey::from(sender_id.to_string()),
                sender,
            ))
            .expect("sending control message should not fail");
    }

    // Extracts the `message` field of the first event in `event` as a UTF-8 string, panicking if
    // there is no event or no valid `message` field.
    fn unwrap_log_event_message<E>(event: E) -> String
    where
        E: EventContainer,
    {
        let event = event
            .into_events()
            .next()
            .expect("must have at least one event");
        let event = event.into_log();
        event
            .get("message")
            .and_then(Value::as_bytes)
            .and_then(|b| String::from_utf8(b.to_vec()).ok())
            .expect("must be valid log event with `message` field")
    }

    // A single send must be delivered to every registered sink.
    #[tokio::test]
    async fn fanout_writes_to_all() {
        let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]).await;
        let events = make_event_array(2);

        let clones = events.clone();
        fanout.send(clones, None).await.expect("should not fail");

        for receiver in receivers {
            assert_eq!(collect_ready(receiver.into_stream()), &[events.clone()]);
        }
    }

    // A send must stay pending while one sink cannot accept more, and complete once the
    // receivers have been drained.
    #[tokio::test]
    async fn fanout_notready() {
        let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]).await;
        let events = make_events(2);

        // The first send is expected to complete immediately.
        let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
        assert_ready!(first_send.poll()).expect("should not fail");
        drop(first_send);

        // The second send is expected to block (presumably on the capacity-1 sink).
        let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
        assert_pending!(second_send.poll());

        // Drain the first event from every receiver.
        for receiver in &mut receivers {
            assert_eq!(Some(events[0].clone().into()), receiver.next().await);
        }

        // With room available again, the second send must now complete.
        assert_ready!(second_send.poll()).expect("should not fail");
        drop(second_send);

        // Every receiver must then see the second event.
        for receiver in &mut receivers {
            assert_eq!(Some(events[1].clone().into()), receiver.next().await);
        }
    }

    // A sink added between sends only receives events sent after it was added.
    #[tokio::test]
    async fn fanout_grow() {
        let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // Send the first two events to the two initial sinks.
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Add a third sink.
        add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4).await;

        // Send the third event, now to all three sinks.
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // The original sinks see everything; the new sink sees only the last event.
        let expected_events = [&events, &events, &events[2..]];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    // A sink removed between sends receives nothing sent after its removal.
    #[tokio::test]
    async fn fanout_shrink() {
        let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // Send the first two events to both sinks.
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Queue removal of the second sink.
        remove_sender_from_fanout(&control, 1);

        // Send the third event; the removal is processed as part of this send.
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // The remaining sink sees everything; the removed sink sees only the first two events.
        let expected_events = [&events, &events[..2]];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    // Removing a sink while a send is blocked: the send only completes if the removed sink is
    // the one it was blocked on.
    #[tokio::test]
    async fn fanout_shrink_when_notready() {
        let events = make_events(2);
        let expected_first_event = unwrap_log_event_message(events[0].clone());
        let expected_second_event = unwrap_log_event_message(events[1].clone());

        // Each case is (sink to remove, whether the blocked send should complete afterwards,
        // the last message each of the three receivers is expected to have seen).
        let cases = [
            (
                0,
                false,
                [
                    expected_second_event.clone(),
                    expected_first_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
            (
                1,
                true,
                [
                    expected_second_event.clone(),
                    expected_second_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
            (
                2,
                false,
                [
                    expected_second_event.clone(),
                    expected_first_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
        ];

        for (sender_id, should_complete, expected_last_seen) in cases {
            let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]).await;

            // The first send is expected to complete immediately.
            let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
            assert_ready!(first_send.poll()).expect("should not fail");
            drop(first_send);

            // The second send is expected to block.
            let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
            assert_pending!(second_send.poll());

            // Queue removal of the sink under test while the send is blocked.
            remove_sender_from_fanout(&control, sender_id);

            if should_complete {
                assert_ready!(second_send.poll()).expect("should not fail");
            } else {
                assert_pending!(second_send.poll());
            }
            drop(second_send);

            // Drop the fanout so the receiver loops below can run to completion
            // (presumably by closing the senders it still holds).
            drop(fanout);

            let mut last_seen = Vec::new();
            for receiver in &mut receivers {
                // Prepend each event so that index 0 always holds the most recent one.
                let mut events = Vec::new();
                while let Some(event) = receiver.next().await {
                    events.insert(0, event);
                }

                last_seen.push(unwrap_log_event_message(events.remove(0)));
            }

            assert_eq!(&expected_last_seen[..], &last_seen);
        }
    }

    // Sending with no sinks registered must succeed.
    #[tokio::test]
    async fn fanout_no_sinks() {
        let (mut fanout, _) = Fanout::new();
        let events = make_events(2);

        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");
    }

    // Replacing a sink between sends: the old receiver keeps what was sent before, the new one
    // only sees what is sent afterwards.
    #[tokio::test]
    async fn fanout_replace() {
        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]).await;
        let events = make_events(3);

        // Send the first two events to all three sinks.
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Queue pause + replace of the first sink.
        let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4).await;

        // Send the third event; the replacement is processed as part of this send.
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // The replacement sink sees only the last event; the untouched sinks see everything.
        let expected_events = [&events[2..], &events, &events];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }

        // The replaced sink's receiver holds the two events sent before the replacement.
        assert_eq!(
            collect_ready_events(old_first_receiver.into_stream()),
            &events[..2]
        );
    }

    // A send issued while a sink is paused must wait until the replacement arrives.
    #[tokio::test]
    async fn fanout_wait() {
        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // The first two sends are expected to complete on their first poll.
        let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
        assert_ready!(poll!(send1)).expect("should not fail");
        let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
        assert_ready!(poll!(send2)).expect("should not fail");

        // Pause the first sink without yet supplying its replacement.
        let (old_first_receiver, new_first_sender) =
            start_sender_replace(&control, &mut receivers, 0, 4).await;

        // With a sink paused, the third send must not complete.
        let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
        assert_pending!(third_send.poll());

        // Supplying the replacement must wake the blocked send and let it finish.
        finish_sender_resume(&control, 0, new_first_sender);
        assert!(third_send.is_woken());
        assert_ready!(third_send.poll()).expect("should not fail");

        // The paused sink's old receiver holds the two events sent before the pause.
        assert_eq!(
            collect_ready_events(old_first_receiver.into_stream()),
            &events[0..2]
        );

        let expected_events = [&events[2..], &events];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    // Yields `count` log events whose messages are "line 0", "line 1", ...
    fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
        (0..count).map(|i| LogEvent::from(format!("line {i}")))
    }

    // Builds `count` test events as a `Vec<Event>`.
    fn make_events(count: usize) -> Vec<Event> {
        make_events_inner(count).map(Into::into).collect()
    }

    // Builds `count` test events as a single `EventArray`.
    fn make_event_array(count: usize) -> EventArray {
        make_events_inner(count).collect::<Vec<_>>().into()
    }
}