1use std::{collections::HashMap, fmt, task::Poll, time::Instant};
2
3use futures::{Stream, StreamExt};
4use futures_util::{pending, poll};
5use indexmap::IndexMap;
6use tokio::sync::mpsc;
7use tokio_util::sync::ReusableBoxFuture;
8use vector_buffers::topology::channel::BufferSender;
9
10use crate::{config::ComponentKey, event::EventArray};
11
12pub enum ControlMessage {
13 Add(ComponentKey, BufferSender<EventArray>),
15
16 Remove(ComponentKey),
18
19 Pause(ComponentKey),
24
25 Replace(ComponentKey, BufferSender<EventArray>),
27}
28
29impl fmt::Debug for ControlMessage {
30 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
31 write!(f, "ControlMessage::")?;
32 match self {
33 Self::Add(id, _) => write!(f, "Add({id:?})"),
34 Self::Remove(id) => write!(f, "Remove({id:?})"),
35 Self::Pause(id) => write!(f, "Pause({id:?})"),
36 Self::Replace(id, _) => write!(f, "Replace({id:?})"),
37 }
38 }
39}
40
41pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
44
45pub struct Fanout {
46 senders: IndexMap<ComponentKey, Option<Sender>>,
47 control_channel: mpsc::UnboundedReceiver<ControlMessage>,
48}
49
50impl Fanout {
51 pub fn new() -> (Self, ControlChannel) {
52 let (control_tx, control_rx) = mpsc::unbounded_channel();
53
54 let fanout = Self {
55 senders: Default::default(),
56 control_channel: control_rx,
57 };
58
59 (fanout, control_tx)
60 }
61
62 pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
68 assert!(
69 !self.senders.contains_key(&id),
70 "Adding duplicate output id to fanout: {id}"
71 );
72 self.senders.insert(id, Some(Sender::new(sink)));
73 }
74
75 fn remove(&mut self, id: &ComponentKey) {
76 assert!(
77 self.senders.shift_remove(id).is_some(),
78 "Removing nonexistent sink from fanout: {id}"
79 );
80 }
81
82 fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
83 match self.senders.get_mut(id) {
84 Some(sender) => {
85 assert!(
89 sender.replace(Sender::new(sink)).is_none(),
90 "Replacing existing sink is not valid: {id}"
91 );
92 }
93 None => panic!("Replacing unknown sink from fanout: {id}"),
94 }
95 }
96
97 fn pause(&mut self, id: &ComponentKey) {
98 match self.senders.get_mut(id) {
99 Some(sender) => {
100 assert!(
103 sender.take().is_some(),
104 "Pausing nonexistent sink is not valid: {id}"
105 );
106 }
107 None => panic!("Pausing unknown sink from fanout: {id}"),
108 }
109 }
110
111 fn apply_control_message(&mut self, message: ControlMessage) {
115 trace!("Processing control message outside of send: {:?}", message);
116
117 match message {
118 ControlMessage::Add(id, sink) => self.add(id, sink),
119 ControlMessage::Remove(id) => self.remove(&id),
120 ControlMessage::Pause(id) => self.pause(&id),
121 ControlMessage::Replace(id, sink) => self.replace(&id, sink),
122 }
123 }
124
125 async fn wait_for_replacements(&mut self) {
130 while self.senders.values().any(Option::is_none) {
131 if let Some(msg) = self.control_channel.recv().await {
132 self.apply_control_message(msg);
133 } else {
134 }
152 }
153 }
154
155 pub async fn send_stream(
171 &mut self,
172 events: impl Stream<Item = (EventArray, Instant)>,
173 ) -> crate::Result<()> {
174 tokio::pin!(events);
175 while let Some((event_array, send_reference)) = events.next().await {
176 self.send(event_array, Some(send_reference)).await?;
177 }
178 Ok(())
179 }
180
181 pub async fn send(
197 &mut self,
198 events: EventArray,
199 send_reference: Option<Instant>,
200 ) -> crate::Result<()> {
201 while let Ok(message) = self.control_channel.try_recv() {
203 self.apply_control_message(message);
204 }
205
206 self.wait_for_replacements().await;
208
209 if self.senders.is_empty() {
211 trace!("No senders present.");
212 return Ok(());
213 }
214
215 let mut control_channel_open = true;
222
223 let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);
226
227 loop {
228 tokio::select! {
229 biased;
233
234 maybe_msg = self.control_channel.recv(), if control_channel_open => {
235 trace!("Processing control message inside of send: {:?}", maybe_msg);
236
237 match maybe_msg {
240 Some(ControlMessage::Add(id, sink)) => {
241 send_group.add(id, sink);
242 },
243 Some(ControlMessage::Remove(id)) => {
244 send_group.remove(&id);
245 },
246 Some(ControlMessage::Pause(id)) => {
247 send_group.pause(&id);
248 },
249 Some(ControlMessage::Replace(id, sink)) => {
250 send_group.replace(&id, Sender::new(sink));
251 },
252 None => {
253 control_channel_open = false;
255 }
256 }
257 }
258
259 result = send_group.send() => match result {
260 Ok(()) => {
261 trace!("Sent item to fanout.");
262 break;
263 },
264 Err(e) => return Err(e),
265 }
266 }
267 }
268
269 Ok(())
270 }
271}
272
273struct SendGroup<'a> {
274 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
275 sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
276}
277
278impl<'a> SendGroup<'a> {
279 fn new(
280 senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
281 events: EventArray,
282 send_reference: Option<Instant>,
283 ) -> Self {
284 debug_assert!(senders.values().all(Option::is_some));
287
288 let last_sender_idx = senders.len().saturating_sub(1);
289 let mut events = Some(events);
290
291 let mut sends = HashMap::new();
294 for (i, (key, sender)) in senders.iter_mut().enumerate() {
295 let mut sender = sender
296 .take()
297 .expect("sender must be present to initialize SendGroup");
298
299 if i == last_sender_idx {
301 sender.input = events.take();
302 } else {
303 sender.input.clone_from(&events);
304 }
305 sender.send_reference = send_reference;
306
307 let send = async move {
309 sender.flush().await?;
310 Ok(sender)
311 };
312
313 sends.insert(key.clone(), ReusableBoxFuture::new(send));
314 }
315
316 Self { senders, sends }
317 }
318
319 fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
320 if let Some(send) = self.sends.remove(id) {
321 tokio::spawn(async move {
322 if let Err(e) = send.await {
323 warn!(
324 cause = %e,
325 message = "Encountered error writing to component after detaching from topology.",
326 );
327 }
328 });
329 true
330 } else {
331 false
332 }
333 }
334
335 #[allow(clippy::needless_pass_by_value)]
336 fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
337 assert!(
340 self.senders
341 .insert(id.clone(), Some(Sender::new(sink)))
342 .is_none(),
343 "Adding duplicate output id to fanout: {id}"
344 );
345 }
346
347 fn remove(&mut self, id: &ComponentKey) {
348 assert!(
352 self.senders.shift_remove(id).is_some(),
353 "Removing nonexistent sink from fanout: {id}"
354 );
355
356 self.try_detach_send(id);
361 }
362
363 fn replace(&mut self, id: &ComponentKey, sink: Sender) {
364 match self.senders.get_mut(id) {
365 Some(sender) => {
366 assert!(
370 sender.replace(sink).is_none(),
371 "Replacing existing sink is not valid: {id}"
372 );
373 }
374 None => panic!("Replacing unknown sink from fanout: {id}"),
375 }
376 }
377
378 fn pause(&mut self, id: &ComponentKey) {
379 match self.senders.get_mut(id) {
380 Some(sender) => {
381 if sender.take().is_none() {
388 assert!(
389 self.try_detach_send(id),
390 "Pausing already-paused sink is invalid: {id}"
391 );
392 }
393 }
394 None => panic!("Pausing unknown sink from fanout: {id}"),
395 }
396 }
397
398 async fn send(&mut self) -> crate::Result<()> {
399 loop {
403 if self.sends.is_empty() {
404 break;
405 }
406
407 let mut done = Vec::new();
408 for (key, send) in &mut self.sends {
409 if let Poll::Ready(result) = poll!(send.get_pin()) {
410 let sender = result?;
411
412 done.push((key.clone(), sender));
415 }
416 }
417
418 for (key, sender) in done {
419 self.sends.remove(&key);
420 self.replace(&key, sender);
421 }
422
423 if !self.sends.is_empty() {
424 pending!();
428 }
429 }
430
431 Ok(())
432 }
433}
434
435struct Sender {
436 inner: BufferSender<EventArray>,
437 input: Option<EventArray>,
438 send_reference: Option<Instant>,
439}
440
441impl Sender {
442 fn new(inner: BufferSender<EventArray>) -> Self {
443 Self {
444 inner,
445 input: None,
446 send_reference: None,
447 }
448 }
449
450 async fn flush(&mut self) -> crate::Result<()> {
451 let send_reference = self.send_reference.take();
452 if let Some(input) = self.input.take() {
453 self.inner.send(input, send_reference).await?;
454 self.inner.flush().await?;
455 }
456
457 Ok(())
458 }
459}
460
461#[cfg(test)]
462mod tests {
463 use std::{mem, num::NonZeroUsize};
464
465 use futures::poll;
466 use tokio::sync::mpsc::UnboundedSender;
467 use tokio_test::{assert_pending, assert_ready, task::spawn};
468 use tracing::Span;
469 use vector_buffers::{
470 WhenFull,
471 topology::{
472 builder::TopologyBuilder,
473 channel::{BufferReceiver, BufferSender},
474 },
475 };
476 use vrl::value::Value;
477
478 use super::{ControlMessage, Fanout};
479 use crate::{
480 config::ComponentKey,
481 event::{Event, EventArray, EventContainer, LogEvent},
482 test_util::{collect_ready, collect_ready_events},
483 };
484
485 fn build_sender_pair(
486 capacity: usize,
487 ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
488 TopologyBuilder::standalone_memory(
489 NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
490 WhenFull::Block,
491 &Span::current(),
492 None,
493 None,
494 )
495 }
496
497 fn build_sender_pairs(
498 capacities: &[usize],
499 ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
500 let mut pairs = Vec::new();
501 for capacity in capacities {
502 pairs.push(build_sender_pair(*capacity));
503 }
504 pairs
505 }
506
507 fn fanout_from_senders(
508 capacities: &[usize],
509 ) -> (
510 Fanout,
511 UnboundedSender<ControlMessage>,
512 Vec<BufferReceiver<EventArray>>,
513 ) {
514 let (mut fanout, control) = Fanout::new();
515 let pairs = build_sender_pairs(capacities);
516
517 let mut receivers = Vec::new();
518 for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
519 fanout.add(ComponentKey::from(i.to_string()), sender);
520 receivers.push(receiver);
521 }
522
523 (fanout, control, receivers)
524 }
525
526 fn add_sender_to_fanout(
527 fanout: &mut Fanout,
528 receivers: &mut Vec<BufferReceiver<EventArray>>,
529 sender_id: usize,
530 capacity: usize,
531 ) {
532 let (sender, receiver) = build_sender_pair(capacity);
533 receivers.push(receiver);
534
535 fanout.add(ComponentKey::from(sender_id.to_string()), sender);
536 }
537
538 fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
539 control
540 .send(ControlMessage::Remove(ComponentKey::from(
541 sender_id.to_string(),
542 )))
543 .expect("sending control message should not fail");
544 }
545
546 fn replace_sender_in_fanout(
547 control: &UnboundedSender<ControlMessage>,
548 receivers: &mut [BufferReceiver<EventArray>],
549 sender_id: usize,
550 capacity: usize,
551 ) -> BufferReceiver<EventArray> {
552 let (sender, receiver) = build_sender_pair(capacity);
553 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
554
555 control
556 .send(ControlMessage::Pause(ComponentKey::from(
557 sender_id.to_string(),
558 )))
559 .expect("sending control message should not fail");
560
561 control
562 .send(ControlMessage::Replace(
563 ComponentKey::from(sender_id.to_string()),
564 sender,
565 ))
566 .expect("sending control message should not fail");
567
568 old_receiver
569 }
570
571 fn start_sender_replace(
572 control: &UnboundedSender<ControlMessage>,
573 receivers: &mut [BufferReceiver<EventArray>],
574 sender_id: usize,
575 capacity: usize,
576 ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
577 let (sender, receiver) = build_sender_pair(capacity);
578 let old_receiver = mem::replace(&mut receivers[sender_id], receiver);
579
580 control
581 .send(ControlMessage::Pause(ComponentKey::from(
582 sender_id.to_string(),
583 )))
584 .expect("sending control message should not fail");
585
586 (old_receiver, sender)
587 }
588
589 fn finish_sender_resume(
590 control: &UnboundedSender<ControlMessage>,
591 sender_id: usize,
592 sender: BufferSender<EventArray>,
593 ) {
594 control
595 .send(ControlMessage::Replace(
596 ComponentKey::from(sender_id.to_string()),
597 sender,
598 ))
599 .expect("sending control message should not fail");
600 }
601
602 fn unwrap_log_event_message<E>(event: E) -> String
603 where
604 E: EventContainer,
605 {
606 let event = event
607 .into_events()
608 .next()
609 .expect("must have at least one event");
610 let event = event.into_log();
611 event
612 .get("message")
613 .and_then(Value::as_bytes)
614 .and_then(|b| String::from_utf8(b.to_vec()).ok())
615 .expect("must be valid log event with `message` field")
616 }
617
618 #[tokio::test]
619 async fn fanout_writes_to_all() {
620 let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]);
621 let events = make_event_array(2);
622
623 let clones = events.clone();
624 fanout.send(clones, None).await.expect("should not fail");
625
626 for receiver in receivers {
627 assert_eq!(
628 collect_ready(receiver.into_stream()),
629 std::slice::from_ref(&events)
630 );
631 }
632 }
633
634 #[tokio::test]
635 async fn fanout_notready() {
636 let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]);
637 let events = make_events(2);
638
639 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
641 assert_ready!(first_send.poll()).expect("should not fail");
642 drop(first_send);
643
644 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
646 assert_pending!(second_send.poll());
647
648 for receiver in &mut receivers {
650 assert_eq!(Some(events[0].clone().into()), receiver.next().await);
651 }
652
653 assert_ready!(second_send.poll()).expect("should not fail");
655 drop(second_send);
656
657 for receiver in &mut receivers {
659 assert_eq!(Some(events[1].clone().into()), receiver.next().await);
660 }
661 }
662
663 #[tokio::test]
664 async fn fanout_grow() {
665 let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]);
666 let events = make_events(3);
667
668 fanout
670 .send(events[0].clone().into(), None)
671 .await
672 .expect("should not fail");
673 fanout
674 .send(events[1].clone().into(), None)
675 .await
676 .expect("should not fail");
677
678 add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4);
680
681 fanout
683 .send(events[2].clone().into(), None)
684 .await
685 .expect("should not fail");
686
687 let expected_events = [&events, &events, &events[2..]];
690 for (i, receiver) in receivers.into_iter().enumerate() {
691 assert_eq!(
692 collect_ready_events(receiver.into_stream()),
693 expected_events[i]
694 );
695 }
696 }
697
698 #[tokio::test]
699 async fn fanout_shrink() {
700 let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]);
701 let events = make_events(3);
702
703 fanout
705 .send(events[0].clone().into(), None)
706 .await
707 .expect("should not fail");
708 fanout
709 .send(events[1].clone().into(), None)
710 .await
711 .expect("should not fail");
712
713 remove_sender_from_fanout(&control, 1);
715
716 fanout
718 .send(events[2].clone().into(), None)
719 .await
720 .expect("should not fail");
721
722 let expected_events = [&events, &events[..2]];
724 for (i, receiver) in receivers.into_iter().enumerate() {
725 assert_eq!(
726 collect_ready_events(receiver.into_stream()),
727 expected_events[i]
728 );
729 }
730 }
731
732 #[tokio::test]
733 async fn fanout_shrink_when_notready() {
734 let events = make_events(2);
740 let expected_first_event = unwrap_log_event_message(events[0].clone());
741 let expected_second_event = unwrap_log_event_message(events[1].clone());
742
743 let cases = [
744 (
747 0,
748 false,
749 [
750 expected_second_event.clone(),
751 expected_first_event.clone(),
752 expected_second_event.clone(),
753 ],
754 ),
755 (
756 1,
757 true,
758 [
759 expected_second_event.clone(),
760 expected_second_event.clone(),
761 expected_second_event.clone(),
762 ],
763 ),
764 (
765 2,
766 false,
767 [
768 expected_second_event.clone(),
769 expected_first_event.clone(),
770 expected_second_event.clone(),
771 ],
772 ),
773 ];
774
775 for (sender_id, should_complete, expected_last_seen) in cases {
776 let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]);
777
778 let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
780 assert_ready!(first_send.poll()).expect("should not fail");
781 drop(first_send);
782
783 let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
785 assert_pending!(second_send.poll());
786
787 remove_sender_from_fanout(&control, sender_id);
789
790 if should_complete {
791 assert_ready!(second_send.poll()).expect("should not fail");
792 } else {
793 assert_pending!(second_send.poll());
794 }
795 drop(second_send);
796
797 drop(fanout);
799
800 let mut last_seen = Vec::new();
801 for receiver in &mut receivers {
802 let mut events = Vec::new();
803 while let Some(event) = receiver.next().await {
804 events.insert(0, event);
805 }
806
807 last_seen.push(unwrap_log_event_message(events.remove(0)));
808 }
809
810 assert_eq!(&expected_last_seen[..], &last_seen);
811 }
812 }
813
814 #[tokio::test]
815 async fn fanout_no_sinks() {
816 let (mut fanout, _) = Fanout::new();
817 let events = make_events(2);
818
819 fanout
820 .send(events[0].clone().into(), None)
821 .await
822 .expect("should not fail");
823 fanout
824 .send(events[1].clone().into(), None)
825 .await
826 .expect("should not fail");
827 }
828
829 #[tokio::test]
830 async fn fanout_replace() {
831 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]);
832 let events = make_events(3);
833
834 fanout
836 .send(events[0].clone().into(), None)
837 .await
838 .expect("should not fail");
839 fanout
840 .send(events[1].clone().into(), None)
841 .await
842 .expect("should not fail");
843
844 let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4);
846
847 fanout
849 .send(events[2].clone().into(), None)
850 .await
851 .expect("should not fail");
852
853 let expected_events = [&events[2..], &events, &events];
856 for (i, receiver) in receivers.into_iter().enumerate() {
857 assert_eq!(
858 collect_ready_events(receiver.into_stream()),
859 expected_events[i]
860 );
861 }
862
863 assert_eq!(
865 collect_ready_events(old_first_receiver.into_stream()),
866 &events[..2]
867 );
868 }
869
870 #[tokio::test]
871 async fn fanout_wait() {
872 let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]);
873 let events = make_events(3);
874
875 let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
877 assert_ready!(poll!(send1)).expect("should not fail");
878 let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
879 assert_ready!(poll!(send2)).expect("should not fail");
880
881 let (old_first_receiver, new_first_sender) =
885 start_sender_replace(&control, &mut receivers, 0, 4);
886
887 let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
889 assert_pending!(third_send.poll());
890
891 finish_sender_resume(&control, 0, new_first_sender);
894 assert!(third_send.is_woken());
895 assert_ready!(third_send.poll()).expect("should not fail");
896
897 assert_eq!(
900 collect_ready_events(old_first_receiver.into_stream()),
901 &events[0..2]
902 );
903
904 let expected_events = [&events[2..], &events];
905 for (i, receiver) in receivers.into_iter().enumerate() {
906 assert_eq!(
907 collect_ready_events(receiver.into_stream()),
908 expected_events[i]
909 );
910 }
911 }
912
913 fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
914 (0..count).map(|i| LogEvent::from(format!("line {i}")))
915 }
916
917 fn make_events(count: usize) -> Vec<Event> {
918 make_events_inner(count).map(Into::into).collect()
919 }
920
921 fn make_event_array(count: usize) -> EventArray {
922 make_events_inner(count).collect::<Vec<_>>().into()
923 }
924}