vector_core/fanout.rs

use std::{collections::HashMap, fmt, task::Poll, time::Instant};

use futures::{Stream, StreamExt};
use futures_util::{pending, poll};
use indexmap::IndexMap;
use tokio::sync::mpsc;
use tokio_util::sync::ReusableBoxFuture;
use vector_buffers::topology::channel::BufferSender;

use crate::{config::ComponentKey, event::EventArray};

/// Control messages used to reconfigure a `Fanout`'s set of downstream sinks at runtime.
pub enum ControlMessage {
    /// Adds a new sink to the fanout.
    Add(ComponentKey, BufferSender<EventArray>),

    /// Removes a sink from the fanout.
    Remove(ComponentKey),

    /// Pauses a sink in the fanout.
    ///
    /// If a fanout has any paused sinks, subsequent sends cannot proceed until all paused sinks
    /// have been replaced.
    Pause(ComponentKey),

    /// Replaces a paused sink with its new sender.
    Replace(ComponentKey, BufferSender<EventArray>),
}
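
// A minimal sketch (illustrative only, not part of the original module) of the two-step
// reload sequence implied by `Pause`/`Replace`: once any sink is paused, sends stall
// until every paused sink has been replaced. The helper name here is hypothetical.
#[allow(dead_code)]
fn pause_then_replace_sketch(
    control: &ControlChannel,
    id: ComponentKey,
    new_sink: BufferSender<EventArray>,
) {
    // Pause the sink first; the fanout will hold back subsequent sends...
    let _ = control.send(ControlMessage::Pause(id.clone()));
    // ...until the replacement sender is installed under the same component key.
    let _ = control.send(ControlMessage::Replace(id, new_sink));
}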

impl fmt::Debug for ControlMessage {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ControlMessage::")?;
        match self {
            Self::Add(id, _) => write!(f, "Add({id:?})"),
            Self::Remove(id) => write!(f, "Remove({id:?})"),
            Self::Pause(id) => write!(f, "Pause({id:?})"),
            Self::Replace(id, _) => write!(f, "Replace({id:?})"),
        }
    }
}

// TODO: We should really wrap this in a custom type that has dedicated methods for each operation
// so that high-level components don't need to do the raw channel sends, etc. A sketch of such a
// wrapper follows.
pub type ControlChannel = mpsc::UnboundedSender<ControlMessage>;
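
// A minimal sketch, per the TODO above, of what that wrapper could look like. The name
// `FanoutControl` is hypothetical and nothing in the crate uses it; closed-channel send
// errors are ignored here, just as raw callers typically ignore them.
#[allow(dead_code)]
pub struct FanoutControl(ControlChannel);

#[allow(dead_code)]
impl FanoutControl {
    /// Adds a new sink to the fanout.
    pub fn add(&self, id: ComponentKey, sink: BufferSender<EventArray>) {
        let _ = self.0.send(ControlMessage::Add(id, sink));
    }

    /// Removes a sink from the fanout.
    pub fn remove(&self, id: ComponentKey) {
        let _ = self.0.send(ControlMessage::Remove(id));
    }

    /// Pauses a sink ahead of replacing it.
    pub fn pause(&self, id: ComponentKey) {
        let _ = self.0.send(ControlMessage::Pause(id));
    }

    /// Replaces a previously paused sink with its new sender.
    pub fn replace(&self, id: ComponentKey, sink: BufferSender<EventArray>) {
        let _ = self.0.send(ControlMessage::Replace(id, sink));
    }
}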

/// Fans out events to a dynamic set of downstream sinks, which can be added, removed,
/// paused, and replaced at runtime via messages on the fanout's `ControlChannel`.
pub struct Fanout {
    senders: IndexMap<ComponentKey, Option<Sender>>,
    control_channel: mpsc::UnboundedReceiver<ControlMessage>,
}

impl Fanout {
    pub fn new() -> (Self, ControlChannel) {
        let (control_tx, control_rx) = mpsc::unbounded_channel();

        let fanout = Self {
            senders: Default::default(),
            control_channel: control_rx,
        };

        (fanout, control_tx)
    }

    /// Adds a new sink as an output.
    ///
    /// # Panics
    ///
    /// This function will panic if a sink with the same ID is already present.
    pub fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
        assert!(
            !self.senders.contains_key(&id),
            "Adding duplicate output id to fanout: {id}"
        );
        self.senders.insert(id, Some(Sender::new(sink)));
    }

    fn remove(&mut self, id: &ComponentKey) {
        assert!(
            self.senders.shift_remove(id).is_some(),
            "Removing nonexistent sink from fanout: {id}"
        );
    }

    fn replace(&mut self, id: &ComponentKey, sink: BufferSender<EventArray>) {
        match self.senders.get_mut(id) {
            Some(sender) => {
                // While a sink must be _known_ to be replaced, it must also be empty (previously
                // paused or consumed when the `SendGroup` was created), otherwise an invalid
                // sequence of control operations has been applied.
                assert!(
                    sender.replace(Sender::new(sink)).is_none(),
                    "Replacing existing sink is not valid: {id}"
                );
            }
            None => panic!("Replacing unknown sink from fanout: {id}"),
        }
    }

    fn pause(&mut self, id: &ComponentKey) {
        match self.senders.get_mut(id) {
            Some(sender) => {
                // A sink must be known and present to be paused, otherwise an invalid sequence of
                // control operations has been applied.
                assert!(
                    sender.take().is_some(),
                    "Pausing nonexistent sink is not valid: {id}"
                );
            }
            None => panic!("Pausing unknown sink from fanout: {id}"),
        }
    }

    /// Applies a control message directly against this instance.
    ///
    /// This method should not be used if there is an active `SendGroup` being processed.
    fn apply_control_message(&mut self, message: ControlMessage) {
        trace!("Processing control message outside of send: {:?}", message);

        match message {
            ControlMessage::Add(id, sink) => self.add(id, sink),
            ControlMessage::Remove(id) => self.remove(&id),
            ControlMessage::Pause(id) => self.pause(&id),
            ControlMessage::Replace(id, sink) => self.replace(&id, sink),
        }
    }

    /// Waits for all paused sinks to be replaced.
    ///
    /// Control messages are processed until all senders have been replaced, so it is guaranteed
    /// that when this method returns, all senders are ready for the next send to be triggered.
    async fn wait_for_replacements(&mut self) {
        while self.senders.values().any(Option::is_none) {
            if let Some(msg) = self.control_channel.recv().await {
                self.apply_control_message(msg);
            } else {
                // If the control channel is closed, there's nothing else we can do.

                // TODO: It _seems_ like we should probably panic here, or at least return.
                //
                // Essentially, we should only land here if the control channel is closed but we
                // haven't yet replaced all of the paused sinks... and we shouldn't have any paused
                // sinks if Vector is stopping normally/gracefully, so we'd only get here during a
                // configuration reload where another thread panicked due to an error of some sort,
                // the control channel got dropped and closed itself, and we're never going to be
                // able to recover.
                //
                // The flipside is that by leaving it as-is, in the above hypothesized scenario, we
                // avoid emitting additional panics/error logging when the root cause error is
                // already doing so: there's little value in knowing the fanout also hit an
                // unrecoverable state if the whole process is about to come crashing down anyway.
                // Still, it feels weird to have that behavior encoded only in a comment rather
                // than an actual terminating expression. *shrug*
            }
        }
    }

    /// Sends a stream of events to all connected sinks.
    ///
    /// This method will send events until the provided stream finishes. It will also block on the
    /// resolution of any pending reload before proceeding with a send operation, just as `send`
    /// does.
    ///
    /// # Panics
    ///
    /// This method can panic if the fanout receives a control message that violates some invariant
    /// of its current state (e.g., removing a nonexistent sink). This would imply a bug in
    /// Vector's config reloading logic.
    ///
    /// # Errors
    ///
    /// If an error occurs while sending events to any of the connected sinks, an error variant
    /// will be returned detailing the cause.
    pub async fn send_stream(
        &mut self,
        events: impl Stream<Item = (EventArray, Instant)>,
    ) -> crate::Result<()> {
        tokio::pin!(events);
        while let Some((event_array, send_reference)) = events.next().await {
            self.send(event_array, Some(send_reference)).await?;
        }
        Ok(())
    }
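
    // A minimal usage sketch (illustrative only, not part of the original API): stamp each
    // batch with a send-reference `Instant` as it is produced and drive the whole stream
    // through `send_stream`. The helper name is hypothetical.
    #[allow(dead_code)]
    async fn send_stream_sketch(&mut self, batches: Vec<EventArray>) -> crate::Result<()> {
        let stamped =
            futures::stream::iter(batches.into_iter().map(|batch| (batch, Instant::now())));
        self.send_stream(stamped).await
    }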

    /// Sends a batch of events to all connected sinks.
    ///
    /// This will block on the resolution of any pending reload before proceeding with the send
    /// operation.
    ///
    /// # Panics
    ///
    /// This method can panic if the fanout receives a control message that violates some invariant
    /// of its current state (e.g., removing a nonexistent sink). This would imply a bug in
    /// Vector's config reloading logic.
    ///
    /// # Errors
    ///
    /// If an error occurs while sending events to any of the connected sinks, an error variant
    /// will be returned detailing the cause.
    pub async fn send(
        &mut self,
        events: EventArray,
        send_reference: Option<Instant>,
    ) -> crate::Result<()> {
        // First, process any available control messages in a non-blocking fashion.
        while let Ok(message) = self.control_channel.try_recv() {
            self.apply_control_message(message);
        }

        // Wait for any paused sinks to be replaced before continuing with the send.
        self.wait_for_replacements().await;

        // Nothing to send if we have no senders.
        if self.senders.is_empty() {
            trace!("No senders present.");
            return Ok(());
        }

        // Keep track of whether the control channel has returned `Ready(None)`, and stop polling
        // it once it has. If we don't do this check, it will continue to return `Ready(None)` any
        // time it is polled, which can lead to a busy loop below.
        //
        // In real life this is likely a non-issue, but it can lead to strange behavior in tests if
        // left unhandled.
        let mut control_channel_open = true;

        // Create our send group, which arms all senders to send the given events and handles
        // adding/removing/replacing senders while the send is in flight.
        let mut send_group = SendGroup::new(&mut self.senders, events, send_reference);

        loop {
            tokio::select! {
                // Semantically, it's not hugely important that this select is biased. It does,
                // however, make testing simpler when you can count on control messages being
                // processed first.
                biased;

                maybe_msg = self.control_channel.recv(), if control_channel_open => {
                    trace!("Processing control message inside of send: {:?}", maybe_msg);

                    // During a send operation, control messages must be applied via the
                    // `SendGroup`, since it has exclusive access to the senders.
                    match maybe_msg {
                        Some(ControlMessage::Add(id, sink)) => {
                            send_group.add(id, sink);
                        },
                        Some(ControlMessage::Remove(id)) => {
                            send_group.remove(&id);
                        },
                        Some(ControlMessage::Pause(id)) => {
                            send_group.pause(&id);
                        },
                        Some(ControlMessage::Replace(id, sink)) => {
                            send_group.replace(&id, Sender::new(sink));
                        },
                        None => {
                            // The control channel is closed, which means Vector is shutting down.
                            control_channel_open = false;
                        }
                    }
                }

                result = send_group.send() => match result {
                    Ok(()) => {
                        trace!("Sent item to fanout.");
                        break;
                    },
                    Err(e) => return Err(e),
                }
            }
        }

        Ok(())
    }
}
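
// A minimal end-to-end sketch (illustrative only, mirroring the test helpers below): build
// a fanout, register one in-memory sink, and push a batch through it. The helper name and
// the chosen capacity are hypothetical.
#[cfg(test)]
#[allow(dead_code)]
async fn fanout_lifecycle_sketch(events: EventArray) -> crate::Result<()> {
    use std::num::NonZeroUsize;

    use vector_buffers::{topology::builder::TopologyBuilder, WhenFull};

    let (mut fanout, _control) = Fanout::new();

    // One in-memory channel pair, built the same way the tests below build theirs.
    let (sender, _receiver) = TopologyBuilder::standalone_memory(
        NonZeroUsize::new(4).expect("capacity must be nonzero"),
        WhenFull::Block,
        &tracing::Span::current(),
    )
    .await;

    fanout.add(ComponentKey::from("out".to_string()), sender);
    fanout.send(events, None).await
}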

/// Tracks the in-flight sends for a single batch of events, while still allowing senders
/// to be added, removed, paused, and replaced mid-send.
struct SendGroup<'a> {
    senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
    sends: HashMap<ComponentKey, ReusableBoxFuture<'static, crate::Result<Sender>>>,
}

impl<'a> SendGroup<'a> {
    fn new(
        senders: &'a mut IndexMap<ComponentKey, Option<Sender>>,
        events: EventArray,
        send_reference: Option<Instant>,
    ) -> Self {
        // If we don't have a valid `Sender` for all sinks, then something went wrong in our logic
        // to ensure we were starting with all valid/idle senders prior to initiating the send.
        debug_assert!(senders.values().all(Option::is_some));

        let last_sender_idx = senders.len().saturating_sub(1);
        let mut events = Some(events);

        // We generate a send future for each sender we have, which arms them with the events to
        // send but also takes ownership of the sender itself, which we give back when the send
        // completes.
        let mut sends = HashMap::new();
        for (i, (key, sender)) in senders.iter_mut().enumerate() {
            let mut sender = sender
                .take()
                .expect("sender must be present to initialize SendGroup");

            // First, arm each sender with the item to actually send. The last sender takes
            // ownership of the original events, which saves one clone.
            if i == last_sender_idx {
                sender.input = events.take();
            } else {
                sender.input.clone_from(&events);
            }
            sender.send_reference = send_reference;

            // Now generate a send for that sender which we'll drive to completion.
            let send = async move {
                sender.flush().await?;
                Ok(sender)
            };

            sends.insert(key.clone(), ReusableBoxFuture::new(send));
        }

        Self { senders, sends }
    }

    fn try_detach_send(&mut self, id: &ComponentKey) -> bool {
        if let Some(send) = self.sends.remove(id) {
            tokio::spawn(async move {
                if let Err(e) = send.await {
                    warn!(
                        cause = %e,
                        message = "Encountered error writing to component after detaching from topology.",
                    );
                }
            });
            true
        } else {
            false
        }
    }

    #[allow(clippy::needless_pass_by_value)]
    fn add(&mut self, id: ComponentKey, sink: BufferSender<EventArray>) {
        // When we're in the middle of a send, we can only keep track of the new sink, but can't
        // actually send to it, as we don't have the item to send... so only add it to `senders`.
        assert!(
            self.senders
                .insert(id.clone(), Some(Sender::new(sink)))
                .is_none(),
            "Adding duplicate output id to fanout: {id}"
        );
    }

    fn remove(&mut self, id: &ComponentKey) {
        // We may or may not be removing a sender that we're trying to drive a send against, so we
        // also have to detach the send future for the sender if it exists, otherwise we'd hang
        // around still trying to send to it.
        assert!(
            self.senders.shift_remove(id).is_some(),
            "Removing nonexistent sink from fanout: {id}"
        );

        // Now try to detach the in-flight send, if it exists.
        //
        // We don't ensure that a send was or wasn't detached because this could be called either
        // during an in-flight send _or_ after the send has completed.
        self.try_detach_send(id);
    }

    fn replace(&mut self, id: &ComponentKey, sink: Sender) {
        match self.senders.get_mut(id) {
            Some(sender) => {
                // While a sink must be _known_ to be replaced, it must also be empty (previously
                // paused or consumed when the `SendGroup` was created), otherwise an invalid
                // sequence of control operations has been applied.
                assert!(
                    sender.replace(sink).is_none(),
                    "Replacing existing sink is not valid: {id}"
                );
            }
            None => panic!("Replacing unknown sink from fanout: {id}"),
        }
    }

    fn pause(&mut self, id: &ComponentKey) {
        match self.senders.get_mut(id) {
            Some(sender) => {
                // If we don't currently own the `Sender` for the given component, that implies
                // there is an in-flight send: a `SendGroup` cannot be created without all
                // participating components having a send operation triggered.
                //
                // As such, `try_detach_send` should always succeed here, as pausing only occurs
                // when a component is being _replaced_, and should not be called multiple times.
                if sender.take().is_none() {
                    assert!(
                        self.try_detach_send(id),
                        "Pausing already-paused sink is invalid: {id}"
                    );
                }
            }
            None => panic!("Pausing unknown sink from fanout: {id}"),
        }
    }

    async fn send(&mut self) -> crate::Result<()> {
        // Right now, we do a linear scan of all sends, polling each send once in order to avoid
        // waiting forever, such that we can let our control messages get picked up while sends are
        // waiting.
        loop {
            if self.sends.is_empty() {
                break;
            }

            let mut done = Vec::new();
            for (key, send) in &mut self.sends {
                if let Poll::Ready(result) = poll!(send.get_pin()) {
                    let sender = result?;

                    // The send completed, so we restore the sender and mark ourselves so that this
                    // future gets dropped.
                    done.push((key.clone(), sender));
                }
            }

            for (key, sender) in done {
                self.sends.remove(&key);
                self.replace(&key, sender);
            }

            if !self.sends.is_empty() {
                // We manually yield ourselves because we've polled all of the sends at this point,
                // so if any are left, then we're scheduled for a wake-up... this is a really poor
                // approximation of what `FuturesUnordered` is doing. (See the sketch after this
                // impl block.)
                pending!();
            }
        }

        Ok(())
    }
}
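
// A minimal sketch (illustrative only) of the `FuturesUnordered` approach that the comment
// in `SendGroup::send` alludes to, assuming per-key access to in-flight sends were not
// required. It *is* required above, so that `remove`/`pause` can detach a specific send
// mid-flight, which is why the hand-rolled scan exists. The helper name is hypothetical.
#[allow(dead_code)]
async fn drain_sends_sketch(
    sends: impl IntoIterator<Item = ReusableBoxFuture<'static, crate::Result<Sender>>>,
) -> crate::Result<Vec<Sender>> {
    use futures::stream::FuturesUnordered;

    let mut in_flight: FuturesUnordered<_> = sends.into_iter().collect();
    let mut senders = Vec::new();
    // Unlike the linear scan above, `FuturesUnordered` only re-polls futures that were
    // actually woken, so there's no need to manually yield between passes.
    while let Some(result) = in_flight.next().await {
        senders.push(result?);
    }
    Ok(senders)
}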

/// A sink's `BufferSender`, plus the input events (and send-reference timestamp) it has
/// been armed with for the current send operation.
struct Sender {
    inner: BufferSender<EventArray>,
    input: Option<EventArray>,
    send_reference: Option<Instant>,
}

impl Sender {
    fn new(inner: BufferSender<EventArray>) -> Self {
        Self {
            inner,
            input: None,
            send_reference: None,
        }
    }

    async fn flush(&mut self) -> crate::Result<()> {
        let send_reference = self.send_reference.take();
        if let Some(input) = self.input.take() {
            self.inner.send(input, send_reference).await?;
            self.inner.flush().await?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::{mem, num::NonZeroUsize};

    use futures::poll;
    use tokio::sync::mpsc::UnboundedSender;
    use tokio_test::{assert_pending, assert_ready, task::spawn};
    use tracing::Span;
    use vector_buffers::{
        WhenFull,
        topology::{
            builder::TopologyBuilder,
            channel::{BufferReceiver, BufferSender},
        },
    };
    use vrl::value::Value;

    use super::{ControlMessage, Fanout};
    use crate::{
        config::ComponentKey,
        event::{Event, EventArray, EventContainer, LogEvent},
        test_util::{collect_ready, collect_ready_events},
    };

    async fn build_sender_pair(
        capacity: usize,
    ) -> (BufferSender<EventArray>, BufferReceiver<EventArray>) {
        TopologyBuilder::standalone_memory(
            NonZeroUsize::new(capacity).expect("capacity must be nonzero"),
            WhenFull::Block,
            &Span::current(),
        )
        .await
    }

    async fn build_sender_pairs(
        capacities: &[usize],
    ) -> Vec<(BufferSender<EventArray>, BufferReceiver<EventArray>)> {
        let mut pairs = Vec::new();
        for capacity in capacities {
            pairs.push(build_sender_pair(*capacity).await);
        }
        pairs
    }

    async fn fanout_from_senders(
        capacities: &[usize],
    ) -> (
        Fanout,
        UnboundedSender<ControlMessage>,
        Vec<BufferReceiver<EventArray>>,
    ) {
        let (mut fanout, control) = Fanout::new();
        let pairs = build_sender_pairs(capacities).await;

        let mut receivers = Vec::new();
        for (i, (sender, receiver)) in pairs.into_iter().enumerate() {
            fanout.add(ComponentKey::from(i.to_string()), sender);
            receivers.push(receiver);
        }

        (fanout, control, receivers)
    }

    async fn add_sender_to_fanout(
        fanout: &mut Fanout,
        receivers: &mut Vec<BufferReceiver<EventArray>>,
        sender_id: usize,
        capacity: usize,
    ) {
        let (sender, receiver) = build_sender_pair(capacity).await;
        receivers.push(receiver);

        fanout.add(ComponentKey::from(sender_id.to_string()), sender);
    }

    fn remove_sender_from_fanout(control: &UnboundedSender<ControlMessage>, sender_id: usize) {
        control
            .send(ControlMessage::Remove(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");
    }

    async fn replace_sender_in_fanout(
        control: &UnboundedSender<ControlMessage>,
        receivers: &mut [BufferReceiver<EventArray>],
        sender_id: usize,
        capacity: usize,
    ) -> BufferReceiver<EventArray> {
        let (sender, receiver) = build_sender_pair(capacity).await;
        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);

        control
            .send(ControlMessage::Pause(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");

        control
            .send(ControlMessage::Replace(
                ComponentKey::from(sender_id.to_string()),
                sender,
            ))
            .expect("sending control message should not fail");

        old_receiver
    }

    async fn start_sender_replace(
        control: &UnboundedSender<ControlMessage>,
        receivers: &mut [BufferReceiver<EventArray>],
        sender_id: usize,
        capacity: usize,
    ) -> (BufferReceiver<EventArray>, BufferSender<EventArray>) {
        let (sender, receiver) = build_sender_pair(capacity).await;
        let old_receiver = mem::replace(&mut receivers[sender_id], receiver);

        control
            .send(ControlMessage::Pause(ComponentKey::from(
                sender_id.to_string(),
            )))
            .expect("sending control message should not fail");

        (old_receiver, sender)
    }

    fn finish_sender_resume(
        control: &UnboundedSender<ControlMessage>,
        sender_id: usize,
        sender: BufferSender<EventArray>,
    ) {
        control
            .send(ControlMessage::Replace(
                ComponentKey::from(sender_id.to_string()),
                sender,
            ))
            .expect("sending control message should not fail");
    }

    fn unwrap_log_event_message<E>(event: E) -> String
    where
        E: EventContainer,
    {
        let event = event
            .into_events()
            .next()
            .expect("must have at least one event");
        let event = event.into_log();
        event
            .get("message")
            .and_then(Value::as_bytes)
            .and_then(|b| String::from_utf8(b.to_vec()).ok())
            .expect("must be valid log event with `message` field")
    }

    #[tokio::test]
    async fn fanout_writes_to_all() {
        let (mut fanout, _, receivers) = fanout_from_senders(&[2, 2]).await;
        let events = make_event_array(2);

        let clones = events.clone();
        fanout.send(clones, None).await.expect("should not fail");

        for receiver in receivers {
            assert_eq!(
                collect_ready(receiver.into_stream()),
                std::slice::from_ref(&events)
            );
        }
    }

    #[tokio::test]
    async fn fanout_notready() {
        let (mut fanout, _, mut receivers) = fanout_from_senders(&[2, 1, 2]).await;
        let events = make_events(2);

        // First send should immediately complete because all senders have capacity:
        let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
        assert_ready!(first_send.poll()).expect("should not fail");
        drop(first_send);

        // Second send should return pending because the second sender (capacity 1) is now full:
        let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
        assert_pending!(second_send.poll());

        // Now read an item from each receiver to free up capacity for the second send:
        for receiver in &mut receivers {
            assert_eq!(Some(events[0].clone().into()), receiver.next().await);
        }

        // Now our second send should actually be able to complete:
        assert_ready!(second_send.poll()).expect("should not fail");
        drop(second_send);

        // And make sure the second item comes through:
        for receiver in &mut receivers {
            assert_eq!(Some(events[1].clone().into()), receiver.next().await);
        }
    }

    #[tokio::test]
    async fn fanout_grow() {
        let (mut fanout, _, mut receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // Send in the first two events to our initial two senders:
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Now add a third sender:
        add_sender_to_fanout(&mut fanout, &mut receivers, 2, 4).await;

        // Send in the last event, which all three senders will now get:
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // Make sure the first two senders got all three events, but the third sender only got the
        // last event:
        let expected_events = [&events, &events, &events[2..]];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    #[tokio::test]
    async fn fanout_shrink() {
        let (mut fanout, control, receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // Send in the first two events to our initial two senders:
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Now remove the second sender:
        remove_sender_from_fanout(&control, 1);

        // Send in the last event, which only the first sender will get:
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // Make sure the first sender got all three events, but the second sender only got the
        // first two:
        let expected_events = [&events, &events[..2]];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    #[tokio::test]
    async fn fanout_shrink_when_notready() {
        // This test exercises the fact that when we're waiting for a send to complete, we can
        // correctly remove a sink, whether or not it is the one the send operation is still
        // waiting on.
        //
        // This means that if we remove a sink that the current send is blocked on, we should be
        // able to immediately proceed.
        let events = make_events(2);
        let expected_first_event = unwrap_log_event_message(events[0].clone());
        let expected_second_event = unwrap_log_event_message(events[1].clone());

        let cases = [
            // Sender ID to remove, whether the second send should complete after the removal, and
            // the final "last event" each receiver should see after the second send:
            (
                0,
                false,
                [
                    expected_second_event.clone(),
                    expected_first_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
            (
                1,
                true,
                [
                    expected_second_event.clone(),
                    expected_second_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
            (
                2,
                false,
                [
                    expected_second_event.clone(),
                    expected_first_event.clone(),
                    expected_second_event.clone(),
                ],
            ),
        ];

        for (sender_id, should_complete, expected_last_seen) in cases {
            let (mut fanout, control, mut receivers) = fanout_from_senders(&[2, 1, 2]).await;

            // First send should immediately complete because all senders have capacity:
            let mut first_send = spawn(fanout.send(events[0].clone().into(), None));
            assert_ready!(first_send.poll()).expect("should not fail");
            drop(first_send);

            // Second send should return pending because the second sender (capacity 1) is now
            // full:
            let mut second_send = spawn(fanout.send(events[1].clone().into(), None));
            assert_pending!(second_send.poll());

            // Now remove our chosen sender and assert that polling the second send behaves as
            // expected:
            remove_sender_from_fanout(&control, sender_id);

            if should_complete {
                assert_ready!(second_send.poll()).expect("should not fail");
            } else {
                assert_pending!(second_send.poll());
            }
            drop(second_send);

            // Now grab the last value available to each receiver and assert it matches the
            // expected "last event" for this case:
            drop(fanout);

            let mut last_seen = Vec::new();
            for receiver in &mut receivers {
                let mut events = Vec::new();
                while let Some(event) = receiver.next().await {
                    events.insert(0, event);
                }

                last_seen.push(unwrap_log_event_message(events.remove(0)));
            }

            assert_eq!(&expected_last_seen[..], &last_seen);
        }
    }

    #[tokio::test]
    async fn fanout_no_sinks() {
        let (mut fanout, _) = Fanout::new();
        let events = make_events(2);

        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");
    }

    #[tokio::test]
    async fn fanout_replace() {
        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4, 4]).await;
        let events = make_events(3);

        // First two sends should immediately complete because all senders have capacity:
        fanout
            .send(events[0].clone().into(), None)
            .await
            .expect("should not fail");
        fanout
            .send(events[1].clone().into(), None)
            .await
            .expect("should not fail");

        // Replace the first sender with a brand new one before polling again:
        let old_first_receiver = replace_sender_in_fanout(&control, &mut receivers, 0, 4).await;

        // And do the third send, which should also complete since all senders still have capacity:
        fanout
            .send(events[2].clone().into(), None)
            .await
            .expect("should not fail");

        // Now make sure that the new "first" sender only got the third event, but that the second
        // and third senders got all three events:
        let expected_events = [&events[2..], &events, &events];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }

        // And make sure our original "first" sender got the first two events:
        assert_eq!(
            collect_ready_events(old_first_receiver.into_stream()),
            &events[..2]
        );
    }

    #[tokio::test]
    async fn fanout_wait() {
        let (mut fanout, control, mut receivers) = fanout_from_senders(&[4, 4]).await;
        let events = make_events(3);

        // First two sends should immediately complete because all senders have capacity:
        let send1 = Box::pin(fanout.send(events[0].clone().into(), None));
        assert_ready!(poll!(send1)).expect("should not fail");
        let send2 = Box::pin(fanout.send(events[1].clone().into(), None));
        assert_ready!(poll!(send2)).expect("should not fail");

        // Now start a replacement of the first sender, which we'll use to make sure that `Fanout`
        // doesn't let any writes through until we finish the replacement. We get back the
        // receiver we've replaced, but also the sender that we want to eventually install:
        let (old_first_receiver, new_first_sender) =
            start_sender_replace(&control, &mut receivers, 0, 4).await;

        // Third send should return pending because now we have an in-flight replacement:
        let mut third_send = spawn(fanout.send(events[2].clone().into(), None));
        assert_pending!(third_send.poll());

        // Finish our sender replacement, which should wake up the third send and allow it to
        // actually complete:
        finish_sender_resume(&control, 0, new_first_sender);
        assert!(third_send.is_woken());
        assert_ready!(third_send.poll()).expect("should not fail");

        // Make sure the original first sender got the first two events, the new first sender got
        // the last event, and the second sender got all three:
        assert_eq!(
            collect_ready_events(old_first_receiver.into_stream()),
            &events[0..2]
        );

        let expected_events = [&events[2..], &events];
        for (i, receiver) in receivers.into_iter().enumerate() {
            assert_eq!(
                collect_ready_events(receiver.into_stream()),
                expected_events[i]
            );
        }
    }

    fn make_events_inner(count: usize) -> impl Iterator<Item = LogEvent> {
        (0..count).map(|i| LogEvent::from(format!("line {i}")))
    }

    fn make_events(count: usize) -> Vec<Event> {
        make_events_inner(count).map(Into::into).collect()
    }

    fn make_event_array(count: usize) -> EventArray {
        make_events_inner(count).collect::<Vec<_>>().into()
    }
}