vector_core/fanout.rs

use std::{collections::HashMap, fmt, task::Poll, time::Instant};

use futures::{Stream, StreamExt};
use futures_util::{pending, poll};
use indexmap::IndexMap;
use tokio::sync::mpsc;
use tokio_util::sync::ReusableBoxFuture;
use vector_buffers::topology::channel::BufferSender;

use crate::{config::ComponentKey, event::EventArray};

pub enum ControlMessage {
    /// Adds a new sink to the fanout.
    Add(ComponentKey, BufferSender<EventArray>),

    /// Removes a sink from the fanout.
    Remove(ComponentKey),

    /// Pauses a sink in the fanout.
    ///
    /// If a fanout has any paused sinks, subsequent sends cannot proceed until all paused sinks
    /// have been replaced.
    Pause(ComponentKey),

    /// Replaces a paused sink with its new sender.
    Replace(ComponentKey, BufferSender<EventArray>),
}
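
// A minimal sketch (not part of the original file) of the pause/replace sequence a
// config reload would drive over the control channel; `id` and `new_sink` are
// hypothetical names:
//
//     control.send(ControlMessage::Pause(id.clone()))?;
//     // ...rebuild the sink...
//     control.send(ControlMessage::Replace(id, new_sink))?;
//
// Until the `Replace` arrives, `Fanout::send` waits in `wait_for_replacements`
// instead of sending to a partial set of sinks.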

impl fmt::Debug for ControlMessage {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ControlMessage::")?;
        match self {
            Self::Add(id, _) => write!(f, "Add({id:?})"),
            Self::Remove(id) => write!(f, "Remove({id:?})"),
            Self::Pause(id) => write!(f, "Pause({id:?})"),
            Self::Replace(id, _) => write!(f, "Replace({id:?})"),
        }
    }
}

// TODO: We should really wrap this in a custom type that has dedicated methods for each operation
// so that high-level components don't need to do the raw channel sends, etc.
pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
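
// A sketch of the wrapper the TODO above imagines; `FanoutControl` and its methods
// are hypothetical, not part of this crate:
//
//     pub struct FanoutControl(ControlChannel);
//
//     impl FanoutControl {
//         pub fn add(&self, id: ComponentKey, sink: BufferSender<EventArray>) {
//             self.0
//                 .send(ControlMessage::Add(id, sink))
//                 .expect("fanout control channel closed");
//         }
//
//         pub fn remove(&self, id: ComponentKey) {
//             self.0
//                 .send(ControlMessage::Remove(id))
//                 .expect("fanout control channel closed");
//         }
//     }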

pub struct Fanout {
    senders: IndexMap<ComponentKey, Option<Sender>>,
    control_channel: mpsc::UnboundedReceiver<ControlMessage>,
}

impl Fanout {
    pub fn new() -> (Self, ControlChannel) {
        let (control_tx, control_rx) = mpsc::unbounded_channel();

        let fanout = Self {
            senders: Default::default(),
            control_channel: control_rx,
        };

        (fanout, control_tx)
    }
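
    // Construction sketch (assumes `sink` is any `BufferSender<EventArray>` in scope):
    //
    //     let (mut fanout, control) = Fanout::new();
    //     fanout.add(ComponentKey::from("my-sink"), sink);
    //     // `control` is handed to whatever drives topology reloads.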

    /// Add a new sink as an output.
    ///
    /// # Panics
    ///
    /// Function will panic if a sink with the same ID is already present.
    pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
        assert!(
            !self.senders.contains_key(&id),
            "Adding duplicate output id to fanout: {id}"
        );
        self.senders.insert(id, Some(Sender::new(sink)));
    }

    fn remove(&mut self, id: &ComponentKey) {
        assert!(
            self.senders.shift_remove(id).is_some(),
            "Removing nonexistent sink from fanout: {id}"
        );
    }

    fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
        match self.senders.get_mut(id) {
            Some(sender) => {
                // While a sink must be _known_ to be replaced, it must also be empty (previously
                // paused or consumed when the `SendGroup` was created), otherwise an invalid
                // sequence of control operations has been applied.
                assert!(
                    sender.replace(Sender::new(sink)).is_none(),
                    "Replacing existing sink is not valid: {id}"
                );
            }
            None => panic!("Replacing unknown sink from fanout: {id}"),
        }
    }

    fn pause(&mut self, id: &ComponentKey) {
        match self.senders.get_mut(id) {
            Some(sender) => {
                // A sink must be known and present to be paused, otherwise an invalid sequence of
                // control operations has been applied.
                assert!(
                    sender.take().is_some(),
                    "Pausing nonexistent sink is not valid: {id}"
                );
            }
            None => panic!("Pausing unknown sink from fanout: {id}"),
        }
    }

    /// Apply a control message directly against this instance.
    ///
    /// This method should not be used if there is an active `SendGroup` being processed.
    fn apply_control_message(&mut self, message: ControlMessage) {
        trace!("Processing control message outside of send: {:?}", message);

        match message {
            ControlMessage::Add(id, sink) => self.add(id, sink),
            ControlMessage::Remove(id) => self.remove(&id),
            ControlMessage::Pause(id) => self.pause(&id),
            ControlMessage::Replace(id, sink) => self.replace(&id, sink),
        }
    }

    /// Waits for all paused sinks to be replaced.
    ///
    /// Control messages are processed until all senders have been replaced, so it is guaranteed
    /// that when this method returns, all senders are ready for the next send to be triggered.
    async fn wait_for_replacements(&mut self) {
        while self.senders.values().any(Option::is_none) {
            if let Some(msg) = self.control_channel.recv().await {
                self.apply_control_message(msg);
            } else {
                // If the control channel is closed, there's nothing else we can do.

                // TODO: It _seems_ like we should probably panic here, or at least return.
                //
                // Essentially, we should only land here if the control channel is closed but we
                // haven't yet replaced all of the paused sinks... and we shouldn't have any paused
                // sinks if Vector is stopping normally/gracefully, so we'd only get here during a
                // configuration reload where another thread panicked due to an error of some
                // sort, the control channel got dropped and closed itself, and we're never going
                // to be able to recover.
                //
                // The flipside is that by leaving it as-is, in the above hypothesized scenario,
                // we'd avoid emitting additional panics/error logging when the root cause error
                // was already doing so: there's little value in knowing the fanout also hit an
                // unrecoverable state if the whole process is about to come crashing down
                // anyways... but it still does feel weird to have that encoded here by virtue of
                // only a comment, and not an actual terminating expression. *shrug*
            }
        }
    }

    /// Send a stream of events to all connected sinks.
    ///
    /// This function will send events until the provided stream finishes. It will also block on the
    /// resolution of any pending reload before proceeding with a send operation, similar to `send`.
    ///
    /// # Panics
    ///
    /// This method can panic if the fanout receives a control message that violates some invariant
    /// about its current state (e.g. remove a nonexistent sink, etc.). This would imply a bug in
    /// Vector's config reloading logic.
    ///
    /// # Errors
    ///
    /// If an error occurs while sending events to any of the connected sinks, an error variant will be
    /// returned detailing the cause.
    pub async fn send_stream(
        &mut self,
        events: impl Stream<Item = (EventArray, Instant)>,
    ) -> crate::Result<()> {
        tokio::pin!(events);
        while let Some((event_array, send_reference)) = events.next().await {
            self.send(event_array, Some(send_reference)).await?;
        }
        Ok(())
    }
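
    // Usage sketch (assumes `arrays` is a `Vec<EventArray>` already in scope):
    //
    //     let input = futures::stream::iter(
    //         arrays.into_iter().map(|a| (a, Instant::now())),
    //     );
    //     fanout.send_stream(input).await?;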

    /// Send a batch of events to all connected sinks.
    ///
    /// This will block on the resolution of any pending reload before proceeding with the send
    /// operation.
    ///
    /// # Panics
    ///
    /// This method can panic if the fanout receives a control message that violates some invariant
    /// about its current state (e.g. remove a nonexistent sink, etc.). This would imply a bug in
    /// Vector's config reloading logic.
    ///
    /// # Errors
    ///
    /// If an error occurs while sending events to any of the connected sinks, an error variant will be
    /// returned detailing the cause.
    pub async fn send(
        &mut self,
        events: EventArray,
        send_reference: Option<Instant>,
    ) -> crate::Result<()> {
        // First, process any available control messages in a non-blocking fashion.
        while let Ok(message) = self.control_channel.try_recv() {
            self.apply_control_message(message);
        }

        // Wait for any senders that are paused to be replaced first before continuing with the send.
        self.wait_for_replacements().await;

        // Nothing to send if we have no senders.
        if self.senders.is_empty() {
            trace!("No senders present.");
            return Ok(());
        }

        // Keep track of whether the control channel has returned `Ready(None)`, and stop polling
        // it once it has. If we don't do this check, it will continue to return `Ready(None)` any
        // time it is polled, which can lead to a busy loop below.
        //
        // In real life this is likely a non-issue, but it can lead to strange behavior in tests if
        // left unhandled.
        let mut control_channel_open = true;

        // Create our send group which arms all senders to send the given events, and handles
        // adding/removing/replacing senders while the send is in-flight.
        let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);

        loop {
            tokio::select! {
                // Semantically, it's not hugely important that this select is biased. It does,
                // however, make testing simpler when you can count on control messages being
                // processed first.
                biased;

                maybe_msg = self.control_channel.recv(), if control_channel_open => {
                    trace!("Processing control message inside of send: {:?}", maybe_msg);

                    // During a send operation, control messages must be applied via the
                    // `SendGroup`, since it has exclusive access to the senders.
                    match maybe_msg {
                        Some(ControlMessage::Add(id, sink)) => {
                            send_group.add(id, sink);
                        },
                        Some(ControlMessage::Remove(id)) => {
                            send_group.remove(&id);
                        },
                        Some(ControlMessage::Pause(id)) => {
                            send_group.pause(&id);
                        },
                        Some(ControlMessage::Replace(id, sink)) => {
                            send_group.replace(&id, Sender::new(sink));
                        },
                        None => {
                            // Control channel is closed, which means Vector is shutting down.
                            control_channel_open = false;
                        }
                    }
                }

                result = send_group.send() => match result {
                    Ok(()) => {
                        trace!("Sent item to fanout.");
                        break;
                    },
                    Err(e) => return Err(e),
                }
            }
        }

        Ok(())
    }
}

struct SendGroup<'a> {
    senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
    sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
}

impl<'a> SendGroup<'a> {
    fn new(
        senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
        events: EventArray,
        send_reference: Option<Instant>,
    ) -> Self {
        // If we don't have a valid `Sender` for all sinks, then something went wrong in our logic
        // to ensure we were starting with all valid/idle senders prior to initiating the send.
        debug_assert!(senders.values().all(Option::is_some));

        let last_sender_idx = senders.len().saturating_sub(1);
        let mut events = Some(events);

        // We generate a send future for each sender we have, which arms them with the events to
        // send but also takes ownership of the sender itself, which we give back when the send
        // completes.
        let mut sends = HashMap::new();
        for (i, (key, sender)) in senders.iter_mut().enumerate() {
            let mut sender = sender
                .take()
                .expect("sender must be present to initialize SendGroup");

            // First, arm each sender with the item to actually send.
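            //
            // The last sender takes ownership of the events; every earlier sender gets a clone,
            // which avoids one extra copy of the payload.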
            if i == last_sender_idx {
                sender.input = events.take();
            } else {
                sender.input.clone_from(&events);
            }
            sender.send_reference = send_reference;

            // Now generate a send for that sender which we'll drive to completion.
            let send = async move {
                sender.flush().await?;
                Ok(sender)
            };

            sends.insert(key.clone(), ReusableBoxFuture::new(send));
        }

        Self { senders, sends }
    }

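    /// Detaches any in-flight send for the given component by spawning it onto the runtime, where
    /// it can run to completion (logging a warning on error) without blocking this `SendGroup`.
    /// Returns `true` if such a send existed.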
    fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
        if let Some(send) = self.sends.remove(id) {
            tokio::spawn(async move {
                if let Err(e) = send.await {
                    warn!(
                        cause = %e,
                        message = "Encountered error writing to component after detaching from topology.",
                    );
                }
            });
            true
        } else {
            false
        }
    }

    #[allow(clippy::needless_pass_by_value)]
    fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
        // When we're in the middle of a send, we can only keep track of the new sink, but can't
        // actually send to it, as we don't have the item to send... so only add it to `senders`.
        assert!(
            self.senders
                .insert(id.clone(), Some(Sender::new(sink)))
                .is_none(),
            "Adding duplicate output id to fanout: {id}"
        );
    }

    fn remove(&mut self, id: &ComponentKey) {
        // We may or may not be removing a sender that we're trying to drive a send against, so we
        // have to also detach the send future for the sender if it exists, otherwise we'd be
        // hanging around still trying to send to it.
        assert!(
            self.senders.shift_remove(id).is_some(),
            "Removing nonexistent sink from fanout: {id}"
        );

        // Now try and detach the in-flight send, if it exists.
        //
        // We don't ensure that a send was or wasn't detached because this could be called either
        // during an in-flight send _or_ after the send has completed.
        self.try_detach_send(id);
    }

    fn replace(&mut self, id: &ComponentKey, sink: Sender) {
        match self.senders.get_mut(id) {
            Some(sender) => {
                // While a sink must be _known_ to be replaced, it must also be empty (previously
                // paused or consumed when the `SendGroup` was created), otherwise an invalid
                // sequence of control operations has been applied.
                assert!(
                    sender.replace(sink).is_none(),
                    "Replacing existing sink is not valid: {id}"
                );
            }
            None => panic!("Replacing unknown sink from fanout: {id}"),
        }
    }

    fn pause(&mut self, id: &ComponentKey) {
        match self.senders.get_mut(id) {
            Some(sender) => {
                // If we don't currently own the `Sender` for the given component, that implies
                // there is an in-flight send: a `SendGroup` cannot be created without all
                // participating components having a send operation triggered.
                //
                // As such, `try_detach_send` should always succeed here, as pausing only occurs
                // when a component is being _replaced_, and should not be called multiple times.
                if sender.take().is_none() {
                    assert!(
                        self.try_detach_send(id),
                        "Pausing already-paused sink is invalid: {id}"
                    );
                }
            }
            None => panic!("Pausing unknown sink from fanout: {id}"),
        }
    }

    async fn send(&mut self) -> crate::Result<()> {
        // Right now, we do a linear scan of all sends, polling each send once so that we never
        // block indefinitely on a single send, and so that our control messages can get picked up
        // while sends are waiting.
        loop {
            if self.sends.is_empty() {
                break;
            }

            let mut done = Vec::new();
            for (key, send) in &mut self.sends {
                if let Poll::Ready(result) = poll!(send.get_pin()) {
                    let sender = result?;

                    // The send completed, so we restore the sender and mark this send as done so
                    // that its future gets dropped.
                    done.push((key.clone(), sender));
                }
            }

            for (key, sender) in done {
                self.sends.remove(&key);
                self.replace(&key, sender);
            }

            if !self.sends.is_empty() {
                // We manually yield ourselves because we've polled all of the sends at this point,
                // so if any are left, then we're scheduled for a wake-up... this is a really poor
                // approximation of what `FuturesUnordered` is doing.
                pending!();
            }
        }

        Ok(())
    }
}

struct Sender {
    inner: BufferSender<EventArray>,
    input: Option<EventArray>,
    send_reference: Option<Instant>,
}

impl Sender {
    fn new(inner: BufferSender<EventArray>) -> Self {
        Self {
            inner,
            input: None,
            send_reference: None,
        }
    }

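    /// Writes the armed input (if any) to the underlying buffer and flushes it, consuming both
    /// the input and the send reference so the sender is idle again afterwards.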
    async fn flush(&mut self) -> crate::Result<()> {
        let send_reference = self.send_reference.take();
        if let Some(input) = self.input.take() {
            self.inner.send(input, send_reference).await?;
            self.inner.flush().await?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::mem;
    use std::num::NonZeroUsize;

    use futures::poll;
    use tokio::sync::mpsc::UnboundedSender;
    use tokio_test::{assert_pending, assert_ready, task::spawn};
    use tracing::Span;
    use vector_buffers::{
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender},
        },
        WhenFull,
    };
    use vrl::value::Value;

    use super::{ControlMessage, Fanout};
    use crate::event::{Event, EventArray, LogEvent};
    use crate::test_util::{collect_ready, collect_ready_events};
    use crate::{config::ComponentKey, event::EventContainer};

    async fn build_sender_pair(
        capacity: usize,
    ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
        TopologyBuilder::standalone_memory(
            NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
            WhenFull::Block,
            &Span::current(),
        )
        .await
    }

    async fn build_sender_pairs(
        capacities: &[usize],
    ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
        let mut pairs = Vec::new();
        for capacity in capacities {
            pairs.push(build_sender_pair(*capacity).await);
        }
        pairs
    }

    async fn fanout_from_senders(
        capacities: &[usize],
    ) -> (
        Fanout,
        UnboundedSender<ControlMessage>,
        Vec<BufferReceiver<EventArray>>,
    ) {
        let (mut fanout, control) = Fanout::new();
        let pairs = build_sender_pairs(capacities).await;

        let mut receivers = Vec::new();
        for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
            fanout.add(ComponentKey::from(i.to_string()), sender);
            receivers.push(receiver);
        }

        (fanout, control, receivers)
    }

    async fn add_sender_to_fanout(
        fanout: &mut Fanout,
        receivers: &mut Vec<BufferReceiver<EventArray>>,
        sender_id: usize,
        capacity: usize,
    ) {
        let (sender, receiver) = build_sender_pair(capacity).await;
        receivers.push(receiver);

        fanout.add(ComponentKey::from(sender_id.to_string()), sender);
    }

    fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
        control
            .send(ControlMessage::Remove(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");
    }

    async fn replace_sender_in_fanout(
        control: &UnboundedSender<ControlMessage>,
        receivers: &mut [BufferReceiver<EventArray>],
        sender_id: usize,
        capacity: usize,
    ) -> BufferReceiver<EventArray> {
        let (sender, receiver) = build_sender_pair(capacity).await;
        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);

        control
            .send(ControlMessage::Pause(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");

        control
            .send(ControlMessage::Replace(
                ComponentKey::from(sender_id.to_string()),
                sender,
            ))
            .expect("sending control message should not fail");

        old_receiver
    }

    async fn start_sender_replace(
        control: &UnboundedSender<ControlMessage>,
        receivers: &mut [BufferReceiver<EventArray>],
        sender_id: usize,
        capacity: usize,
    ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
        let (sender, receiver) = build_sender_pair(capacity).await;
        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);

        control
            .send(ControlMessage::Pause(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");

        (old_receiver, sender)
    }

    fn finish_sender_resume(
        control: &UnboundedSender<ControlMessage>,
        sender_id: usize,
        sender: BufferSender<EventArray>,
    ) {
        control
            .send(ControlMessage::Replace(
                ComponentKey::from(sender_id.to_string()),
                sender,
            ))
            .expect("sending control message should not fail");
    }

    fn unwrap_log_event_message<E>(event: E) -> String
    where
        E: EventContainer,
    {
        let event = event
            .into_events()
            .next()
            .expect("must have at least one event");
        let event = event.into_log();
        event
            .get("message")
            .and_then(Value::as_bytes)
            .and_then(|b| String::from_utf8(b.to_vec()).ok())
            .expect("must be valid log event with `message` field")
    }

    #[tokio::test]
    async fn fanout_writes_to_all() {
        let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]).await;
        let events = make_event_array(2);

        let clones = events.clone();
        fanout.send(clones, None).await.expect("should not fail");

        for receiver in receivers {
            assert_eq!(collect_ready(receiver.into_stream()), &[events.clone()]);
        }
    }

    #[tokio::test]
    async fn fanout_notready() {
        let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]).await;
        let events = make_events(2);

        // First send should immediately complete because all senders have capacity:
        let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
        assert_ready!(first_send.poll()).expect("should not fail");
        drop(first_send);

        // Second send should return pending because sender B is now full:
        let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
        assert_pending!(second_send.poll());

        // Now read an item from each receiver to free up capacity for the second sender:
        for receiver in &mut receivers {
            assert_eq!(Some(events[0].clone().into()), receiver.next().await);
        }

        // Now our second send should actually be able to complete:
        assert_ready!(second_send.poll()).expect("should not fail");
        drop(second_send);

        // And make sure the second item comes through:
        for receiver in &mut receivers {
            assert_eq!(Some(events[1].clone().into()), receiver.next().await);
        }
    }

    #[tokio::test]
    async fn fanout_grow() {
        let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // Send in the first two events to our initial two senders:
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Now add a third sender:
        add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4).await;

        // Send in the last event which all three senders will now get:
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // Make sure the first two senders got all three events, but the third sender only got the
        // last event:
        let expected_events = [&events, &events, &events[2..]];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    #[tokio::test]
    async fn fanout_shrink() {
        let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // Send in the first two events to our initial two senders:
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Now remove the second sender:
        remove_sender_from_fanout(&control, 1);

        // Send in the last event which only the first sender will get:
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // Make sure the first sender got all three events, but the second sender only got the first two:
        let expected_events = [&events, &events[..2]];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    #[tokio::test]
    async fn fanout_shrink_when_notready() {
        // This test exercises that when we're waiting for a send to complete, we can correctly
        // remove a sink whether or not it is the one that the send operation is still waiting on.
        //
        // This means that if we remove a sink that a current send is blocked on, we should be able
        // to immediately proceed.
        let events = make_events(2);
        let expected_first_event = unwrap_log_event_message(events[0].clone());
        let expected_second_event = unwrap_log_event_message(events[1].clone());

        let cases = [
            // Sender ID to drop, whether the second send should succeed after dropping, and the
            // final "last event" a receiver should see after the second send:
            (
                0,
                false,
                [
                    expected_second_event.clone(),
                    expected_first_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
            (
                1,
                true,
                [
                    expected_second_event.clone(),
                    expected_second_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
            (
                2,
                false,
                [
                    expected_second_event.clone(),
                    expected_first_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
        ];

        for (sender_id, should_complete, expected_last_seen) in cases {
            let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]).await;

            // First send should immediately complete because all senders have capacity:
            let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
            assert_ready!(first_send.poll()).expect("should not fail");
            drop(first_send);

            // Second send should return pending because sender B is now full:
            let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
            assert_pending!(second_send.poll());

            // Now drop our chosen sender and assert that polling the second send behaves as expected:
            remove_sender_from_fanout(&control, sender_id);

            if should_complete {
                assert_ready!(second_send.poll()).expect("should not fail");
            } else {
                assert_pending!(second_send.poll());
            }
            drop(second_send);

            // Now grab the last value available to each receiver and assert it's the second event.
            drop(fanout);

            let mut last_seen = Vec::new();
            for receiver in &mut receivers {
                let mut events = Vec::new();
                while let Some(event) = receiver.next().await {
                    events.insert(0, event);
                }

                last_seen.push(unwrap_log_event_message(events.remove(0)));
            }

            assert_eq!(&expected_last_seen[..], &last_seen);
        }
    }

    #[tokio::test]
    async fn fanout_no_sinks() {
        let (mut fanout, _) = Fanout::new();
        let events = make_events(2);

        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");
    }

    #[tokio::test]
    async fn fanout_replace() {
        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]).await;
        let events = make_events(3);

        // First two sends should immediately complete because all senders have capacity:
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Replace the first sender with a brand new one before polling again:
        let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4).await;

        // And do the third send which should also complete since all senders still have capacity:
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // Now make sure that the new "first" sender only got the third event, but that the second
        // and third senders got all three events:
        let expected_events = [&events[2..], &events, &events];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }

        // And make sure our original "first" sender got the first two events:
        assert_eq!(
            collect_ready_events(old_first_receiver.into_stream()),
            &events[..2]
        );
    }

    #[tokio::test]
    async fn fanout_wait() {
        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // First two sends should immediately complete because all senders have capacity:
        let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
        assert_ready!(poll!(send1)).expect("should not fail");
        let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
        assert_ready!(poll!(send2)).expect("should not fail");

        // Now do an empty replace on the first sender, which we'll test to make sure that `Fanout`
        // doesn't let any writes through until we replace it properly.  We get back the receiver
        // we've replaced, but also the sender that we want to eventually install:
        let (old_first_receiver, new_first_sender) =
            start_sender_replace(&control, &mut receivers, 0, 4).await;

        // Third send should return pending because now we have an in-flight replacement:
        let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
        assert_pending!(third_send.poll());

        // Finish our sender replacement, which should wake up the third send and allow it to
        // actually complete:
        finish_sender_resume(&control, 0, new_first_sender);
        assert!(third_send.is_woken());
        assert_ready!(third_send.poll()).expect("should not fail");

        // Make sure the original first sender got the first two events, the new first sender got
        // the last event, and the second sender got all three:
        assert_eq!(
            collect_ready_events(old_first_receiver.into_stream()),
            &events[0..2]
        );

        let expected_events = [&events[2..], &events];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
        (0..count).map(|i| LogEvent::from(format!("line {i}")))
    }

    fn make_events(count: usize) -> Vec<Event> {
        make_events_inner(count).map(Into::into).collect()
    }

    fn make_event_array(count: usize) -> EventArray {
        make_events_inner(count).collect::<Vec<_>>().into()
    }
}