vector/sinks/util/
sink.rs

1//! This module contains all our internal sink utilities
2//!
3//! All vector "sinks" are built around the `Sink` type which
4//! we use to "push" events into. Within the different types of
5//! vector "sinks" we need to support three main use cases:
6//!
7//! - Streaming sinks
8//! - Single partition batching
9//! - Multiple partition batching
10//!
//! For each of these types this module provides one external type
//! that can be used within sinks. The simplest, `StreamSink`, should be
//! used when you do not want to batch events but instead want to _stream_
//! them to the downstream service. `BatchSink` and `PartitionBatchSink`
//! are similar in that they both take a `tower::Service` and a `Batch`,
//! and provide full batching and request dispatching based on the
//! settings passed.
18//!
19//! For more advanced use cases like HTTP based sinks, one should use the
20//! `BatchedHttpSink` type, which is a wrapper for `BatchSink` and `HttpSink`.
21//!
22//! # Driving to completion
23//!
24//! Each sink utility provided here strictly follows the patterns described in
25//! the `futures::Sink` docs. Each sink utility must be polled from a valid
26//! tokio context.
27//!
//! Service-based sinks like `BatchSink` and `PartitionBatchSink` must also be
//! polled within a valid tokio executor context, because they spawn their
//! service requests so that those can be driven independently of the sink.
//! A oneshot channel ties each request back to the sink, which lets it notify
//! the consumer once the request has completed (whether or not it succeeded).
33
34use std::{
35    collections::HashMap,
36    fmt,
37    hash::Hash,
38    marker::PhantomData,
39    pin::Pin,
40    task::{ready, Context, Poll},
41};
42
43use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, Sink, Stream, TryFutureExt};
44use pin_project::pin_project;
45use tokio::{
46    sync::oneshot,
47    time::{sleep, Duration, Sleep},
48};
49use tower::{Service, ServiceBuilder};
50use tracing::Instrument;
51use vector_lib::internal_event::{
52    CallError, CountByteSize, EventsSent, InternalEventHandle as _, Output,
53};
54// === StreamSink<Event> ===
55pub use vector_lib::sink::StreamSink;
56
57use super::{
58    batch::{Batch, EncodedBatch, FinalizersBatch, PushResult, StatefulBatch},
59    buffer::{Partition, PartitionBuffer, PartitionInnerBuffer},
60    service::{Map, ServiceBuilderExt},
61    EncodedEvent,
62};
63use crate::event::EventStatus;
64
65// === BatchSink ===
66
67/// A `Sink` interface that wraps a `Service` and a
68/// `Batch`.
69///
70/// Provided a batching scheme, a service and batch settings
71/// this type will handle buffering events via the batching scheme
72/// and dispatching requests via the service based on either the size
73/// of the batch or a batch linger timeout.
74///
75/// # Acking
76///
77/// Service based acking will only ack events when all prior request
78/// batches have been acked. This means if sequential requests r1, r2,
79/// and r3 are dispatched and r2 and r3 complete, all events contained
80/// in all requests will not be acked until r1 has completed.
81///
82/// Note: This has been deprecated, please do not use when creating new Sinks.
83#[pin_project]
84#[derive(Debug)]
85pub struct BatchSink<S, B>
86where
87    S: Service<B::Output>,
88    B: Batch,
89{
90    #[pin]
91    inner: PartitionBatchSink<
92        Map<S, PartitionInnerBuffer<B::Output, ()>, B::Output>,
93        PartitionBuffer<B, ()>,
94        (),
95    >,
96}
97
98impl<S, B> BatchSink<S, B>
99where
100    S: Service<B::Output>,
101    S::Future: Send + 'static,
102    S::Error: Into<crate::Error> + Send + 'static,
103    S::Response: Response + Send + 'static,
104    B: Batch,
105{
106    pub fn new(service: S, batch: B, timeout: Duration) -> Self {
107        let service = ServiceBuilder::new()
108            .map(|req: PartitionInnerBuffer<B::Output, ()>| req.into_parts().0)
109            .service(service);
110        let batch = PartitionBuffer::new(batch);
111        let inner = PartitionBatchSink::new(service, batch, timeout);
112        Self { inner }
113    }
114}
115
#[cfg(test)]
impl<S, B> BatchSink<S, B>
where
    S: Service<B::Output>,
    B: Batch,
{
    /// Borrows the user-supplied service, looking through the inner partition sink, its
    /// `ServiceSink` wrapper and the `Map` adapter installed by `BatchSink::new`.
    pub const fn get_ref(&self) -> &S {
        let service_sink = &self.inner.service;
        &service_sink.service.inner
    }
}
126
127impl<S, B> Sink<EncodedEvent<B::Input>> for BatchSink<S, B>
128where
129    S: Service<B::Output>,
130    S::Future: Send + 'static,
131    S::Error: Into<crate::Error> + Send + 'static,
132    S::Response: Response + Send + 'static,
133    B: Batch,
134{
135    type Error = crate::Error;
136
137    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138        self.project().inner.poll_ready(cx)
139    }
140
141    fn start_send(self: Pin<&mut Self>, item: EncodedEvent<B::Input>) -> Result<(), Self::Error> {
142        self.project()
143            .inner
144            .start_send(item.map(|item| PartitionInnerBuffer::new(item, ())))
145    }
146
147    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148        self.project().inner.poll_flush(cx)
149    }
150
151    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152        self.project().inner.poll_close(cx)
153    }
154}
155
156// === PartitionBatchSink ===
157
158/// A partition based batcher, given some `Service` and `Batch` where the
159/// input is partitionable via the `Partition` trait, it will hold many
160/// in flight batches.
161///
162/// This type is similar to `BatchSink` with the added benefit that it has
163/// more fine-grained partitioning ability. It will hold many different batches
164/// of events and contain linger timeouts for each.
165///
166/// Note that, unlike `BatchSink`, the `batch` given to this sink is
167/// *only* used to create new batches (via `Batch::fresh`) for each new
168/// partition.
169///
170/// # Acking
171///
172/// Service based acking will only ack events when all prior request
173/// batches have been acked. This means if sequential requests r1, r2,
174/// and r3 are dispatched and r2 and r3 complete, all events contained
175/// in all requests will not be acked until r1 has completed.
176///
177/// # Ordering
178/// Per partition ordering can be achieved by holding onto future of a request
179/// until it finishes. Until then all further requests in that partition are
180/// delayed.
181///
182/// Note: This has been deprecated, please do not use when creating new Sinks.
183#[pin_project]
184pub struct PartitionBatchSink<S, B, K>
185where
186    B: Batch,
187    S: Service<B::Output>,
188{
189    service: ServiceSink<S, B::Output>,
190    buffer: Option<(K, EncodedEvent<B::Input>)>,
191    batch: StatefulBatch<FinalizersBatch<B>>,
192    partitions: HashMap<K, StatefulBatch<FinalizersBatch<B>>>,
193    timeout: Duration,
194    lingers: HashMap<K, Pin<Box<Sleep>>>,
195    in_flight: Option<HashMap<K, BoxFuture<'static, ()>>>,
196    closing: bool,
197}
198
199impl<S, B, K> PartitionBatchSink<S, B, K>
200where
201    B: Batch,
202    B::Input: Partition<K>,
203    K: Hash + Eq + Clone + Send + 'static,
204    S: Service<B::Output>,
205    S::Future: Send + 'static,
206    S::Error: Into<crate::Error> + Send + 'static,
207    S::Response: Response + Send + 'static,
208{
209    pub fn new(service: S, batch: B, timeout: Duration) -> Self {
210        Self {
211            service: ServiceSink::new(service),
212            buffer: None,
213            batch: StatefulBatch::from(FinalizersBatch::from(batch)),
214            partitions: HashMap::new(),
215            timeout,
216            lingers: HashMap::new(),
217            in_flight: None,
218            closing: false,
219        }
220    }
221
222    /// Enforces per partition ordering of request.
223    pub fn ordered(&mut self) {
224        self.in_flight = Some(HashMap::new());
225    }
226}
227
228impl<S, B, K> Sink<EncodedEvent<B::Input>> for PartitionBatchSink<S, B, K>
229where
230    B: Batch,
231    B::Input: Partition<K>,
232    K: Hash + Eq + Clone + Send + 'static,
233    S: Service<B::Output>,
234    S::Future: Send + 'static,
235    S::Error: Into<crate::Error> + Send + 'static,
236    S::Response: Response + Send + 'static,
237{
238    type Error = crate::Error;
239
240    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
241        if self.buffer.is_some() {
242            match self.as_mut().poll_flush(cx) {
243                Poll::Ready(Ok(())) => {}
244                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
245                Poll::Pending => {
246                    if self.buffer.is_some() {
247                        return Poll::Pending;
248                    }
249                }
250            }
251        }
252
253        Poll::Ready(Ok(()))
254    }
255
256    fn start_send(
257        mut self: Pin<&mut Self>,
258        item: EncodedEvent<B::Input>,
259    ) -> Result<(), Self::Error> {
260        let partition = item.item.partition();
261
262        let batch = loop {
263            if let Some(batch) = self.partitions.get_mut(&partition) {
264                break batch;
265            }
266
267            let batch = self.batch.fresh();
268            self.partitions.insert(partition.clone(), batch);
269
270            let delay = sleep(self.timeout);
271            self.lingers.insert(partition.clone(), Box::pin(delay));
272        };
273
274        if let PushResult::Overflow(item) = batch.push(item) {
275            self.buffer = Some((partition, item));
276        }
277
278        Ok(())
279    }
280
281    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
282        loop {
283            // Poll inner service while not ready, if we don't have buffer or any batch.
284            if self.buffer.is_none() && self.partitions.is_empty() {
285                ready!(self.service.poll_complete(cx));
286                return Poll::Ready(Ok(()));
287            }
288
289            // Try send batches.
290            let this = self.as_mut().project();
291            let mut partitions_ready = vec![];
292            for (partition, batch) in this.partitions.iter() {
293                if ((*this.closing && !batch.is_empty())
294                    || batch.was_full()
295                    || matches!(
296                        this.lingers
297                            .get_mut(partition)
298                            .expect("linger should exists for poll_flush")
299                            .poll_unpin(cx),
300                        Poll::Ready(())
301                    ))
302                    && this
303                        .in_flight
304                        .as_mut()
305                        .and_then(|map| map.get_mut(partition))
306                        .map(|req| matches!(req.poll_unpin(cx), Poll::Ready(())))
307                        .unwrap_or(true)
308                {
309                    partitions_ready.push(partition.clone());
310                }
311            }
312            let mut batch_consumed = false;
313            for partition in partitions_ready.iter() {
314                let service_ready = match this.service.poll_ready(cx) {
315                    Poll::Ready(Ok(())) => true,
316                    Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
317                    Poll::Pending => false,
318                };
319                if service_ready {
320                    trace!("Service ready; Sending batch.");
321
322                    let batch = this.partitions.remove(partition).unwrap();
323                    this.lingers.remove(partition);
324
325                    let batch = batch.finish();
326                    let future = tokio::spawn(this.service.call(batch));
327
328                    if let Some(map) = this.in_flight.as_mut() {
329                        map.insert(partition.clone(), future.map(|_| ()).fuse().boxed());
330                    }
331
332                    batch_consumed = true;
333                } else {
334                    break;
335                }
336            }
337            if batch_consumed {
338                continue;
339            }
340
341            // Cleanup of in flight futures
342            if let Some(in_flight) = this.in_flight.as_mut() {
343                if in_flight.len() > this.partitions.len() {
344                    // There is at least one in flight future without a partition to check it
345                    // so we will do it here.
346                    let partitions = this.partitions;
347                    in_flight.retain(|partition, req| {
348                        partitions.contains_key(partition) || req.poll_unpin(cx).is_pending()
349                    });
350                }
351            }
352
353            // Try move item from buffer to batch.
354            if let Some((partition, item)) = self.buffer.take() {
355                if self.partitions.contains_key(&partition) {
356                    self.buffer = Some((partition, item));
357                } else {
358                    self.as_mut().start_send(item)?;
359
360                    if self.buffer.is_some() {
361                        unreachable!("Empty buffer overflowed.");
362                    }
363
364                    continue;
365                }
366            }
367
368            // Only poll inner service and return `Poll::Pending` anyway.
369            ready!(self.service.poll_complete(cx));
370            return Poll::Pending;
371        }
372    }
373
374    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375        trace!("Closing partition batch sink.");
376        self.closing = true;
377        self.poll_flush(cx)
378    }
379}
380
381impl<S, B, K> fmt::Debug for PartitionBatchSink<S, B, K>
382where
383    S: Service<B::Output> + fmt::Debug,
384    B: Batch + fmt::Debug,
385{
386    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387        f.debug_struct("PartitionBatchSink")
388            .field("service", &self.service)
389            .field("batch", &self.batch)
390            .field("timeout", &self.timeout)
391            .finish()
392    }
393}
394
395// === ServiceSink ===
396
/// Wraps a `tower::Service` and keeps track of every request dispatched through it.
///
/// Each request started via `call` registers a oneshot receiver in `in_flight`. The
/// matching sender fires when that request's future has finished, whatever the outcome,
/// which is what `poll_complete` waits on.
struct ServiceSink<S, Request> {
    // The wrapped service that performs the requests.
    service: S,
    // One receiver per dispatched request; resolved once that request has finished.
    in_flight: FuturesUnordered<oneshot::Receiver<()>>,
    // Id given to the next request (wrapping on overflow); used in the request span and
    // in `CallError` events.
    next_request_id: usize,
    // Ties the `Request` type parameter to the struct without storing a value of it.
    _pd: PhantomData<Request>,
}
403
404impl<S, Request> ServiceSink<S, Request>
405where
406    S: Service<Request>,
407    S::Future: Send + 'static,
408    S::Error: Into<crate::Error> + Send + 'static,
409    S::Response: Response + Send + 'static,
410{
411    fn new(service: S) -> Self {
412        Self {
413            service,
414            in_flight: FuturesUnordered::new(),
415            next_request_id: 0,
416            _pd: PhantomData,
417        }
418    }
419
420    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
421        self.service.poll_ready(cx).map_err(Into::into)
422    }
423
424    fn call(&mut self, batch: EncodedBatch<Request>) -> BoxFuture<'static, ()> {
425        let EncodedBatch {
426            items,
427            finalizers,
428            count,
429            json_byte_size,
430            ..
431        } = batch;
432
433        let (tx, rx) = oneshot::channel();
434
435        self.in_flight.push(rx);
436
437        let request_id = self.next_request_id;
438        self.next_request_id = request_id.wrapping_add(1);
439
440        trace!(
441            message = "Submitting service request.",
442            in_flight_requests = self.in_flight.len()
443        );
444        let events_sent = register!(EventsSent::from(Output(None)));
445        self.service
446            .call(items)
447            .err_into()
448            .map(move |result| {
449                let status = result_status(&result);
450                finalizers.update_status(status);
451                match status {
452                    EventStatus::Delivered => {
453                        events_sent.emit(CountByteSize(count, json_byte_size));
454                        // TODO: Emit a BytesSent event here too
455                    }
456                    EventStatus::Rejected => {
457                        // Emit the `Error` and `EventsDropped` internal events.
458                        // This scenario occurs after retries have been attempted.
459                        let error = result.err().unwrap_or_else(|| "Response failed.".into());
460                        emit!(CallError {
461                            error,
462                            request_id,
463                            count,
464                        });
465                    }
466                    _ => {} // do nothing
467                }
468
469                // If the rx end is dropped we still completed
470                // the request so this is a weird case that we can
471                // ignore for now.
472                _ = tx.send(());
473            })
474            .instrument(info_span!("request", %request_id).or_current())
475            .boxed()
476    }
477
478    fn poll_complete(&mut self, cx: &mut Context<'_>) -> Poll<()> {
479        while !self.in_flight.is_empty() {
480            match ready!(Pin::new(&mut self.in_flight).poll_next(cx)) {
481                Some(Ok(())) => {}
482                Some(Err(_)) => panic!("ServiceSink service sender dropped."),
483                None => break,
484            }
485        }
486
487        Poll::Ready(())
488    }
489}
490
491impl<S, Request> fmt::Debug for ServiceSink<S, Request>
492where
493    S: fmt::Debug,
494{
495    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496        f.debug_struct("ServiceSink")
497            .field("service", &self.service)
498            .finish()
499    }
500}
501
// === ServiceLogic ===
503
/// Decides which `EventStatus` a service call's outcome maps to.
pub trait ServiceLogic: Clone {
    /// The service response type this logic inspects.
    type Response: Response;
    /// Classifies `result` (a service response or request error) into an `EventStatus`.
    fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus;
}
508
/// The standard `ServiceLogic`: classifies results of any `Response` type `R` with this
/// module's `result_status` rules.
// NOTE(review): `derivative` is presumably used so `Clone` does not demand `R: Clone`
// (a plain `#[derive(Clone)]` would); confirm before changing.
#[derive(Derivative)]
#[derivative(Clone)]
pub struct StdServiceLogic<R> {
    // Marker tying `R` to the type; no `R` value is stored.
    _pd: PhantomData<R>,
}
514
515impl<R> Default for StdServiceLogic<R> {
516    fn default() -> Self {
517        Self { _pd: PhantomData }
518    }
519}
520
521impl<R> ServiceLogic for StdServiceLogic<R>
522where
523    R: Response + Send,
524{
525    type Response = R;
526
527    fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus {
528        result_status(result)
529    }
530}
531
532fn result_status<R: Response + Send>(result: &crate::Result<R>) -> EventStatus {
533    match result {
534        Ok(response) => {
535            if response.is_successful() {
536                trace!(message = "Response successful.", ?response);
537                EventStatus::Delivered
538            } else if response.is_transient() {
539                error!(message = "Response wasn't successful.", ?response);
540                EventStatus::Errored
541            } else {
542                error!(message = "Response failed.", ?response);
543                EventStatus::Rejected
544            }
545        }
546        Err(error) => {
547            error!(message = "Request failed.", %error);
548            EventStatus::Errored
549        }
550    }
551}
552
553// === Response ===
554
/// A service response that can report whether its request succeeded.
///
/// Both methods default to `true`, so a type that overrides nothing is always treated as
/// successful.
pub trait Response: fmt::Debug {
    /// Whether the request succeeded.
    fn is_successful(&self) -> bool {
        true
    }

    /// Consulted only for unsuccessful responses: `true` marks the failure as transient
    /// (classified as `EventStatus::Errored`), `false` as permanent (`EventStatus::Rejected`).
    fn is_transient(&self) -> bool {
        true
    }
}

// Responses that carry no status information: they keep the defaults and therefore
// always count as successful.
impl Response for () {}

impl Response for &'_ str {}
568
569#[cfg(test)]
570mod tests {
571    use std::{
572        convert::Infallible,
573        sync::{atomic::AtomicUsize, atomic::Ordering::Relaxed, Arc, Mutex},
574    };
575
576    use bytes::Bytes;
577    use futures::{future, stream, task::noop_waker_ref, SinkExt, StreamExt};
578    use tokio::{task::yield_now, time::Instant};
579    use vector_lib::{
580        finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventFinalizers},
581        json_size::JsonSize,
582    };
583
584    use super::*;
585    use crate::{
586        sinks::util::{BatchSettings, EncodedLength, VecBuffer},
587        test_util::trace_init,
588    };
589
590    const TIMEOUT: Duration = Duration::from_secs(10);
591
592    impl EncodedLength for usize {
593        fn encoded_length(&self) -> usize {
594            22
595        }
596    }
597
598    async fn advance_time(duration: Duration) {
599        tokio::time::pause();
600        tokio::time::advance(duration).await;
601        tokio::time::resume();
602    }
603
604    type Counter = Arc<AtomicUsize>;
605
606    struct Request(usize, EventFinalizers);
607
608    impl Request {
609        fn new(value: usize, counter: &Counter) -> Self {
610            let (batch, receiver) = BatchNotifier::new_with_receiver();
611            let counter = Arc::clone(counter);
612            tokio::spawn(async move {
613                if receiver.await == BatchStatus::Delivered {
614                    counter.fetch_add(value, Relaxed);
615                }
616            });
617            Self(value, EventFinalizers::new(EventFinalizer::new(batch)))
618        }
619
620        fn encoded(value: usize, counter: &Counter) -> EncodedEvent<Self> {
621            let mut item = Self::new(value, counter);
622            let finalizers = std::mem::take(&mut item.1);
623            EncodedEvent {
624                item,
625                finalizers,
626                json_byte_size: JsonSize::zero(),
627                byte_size: 0,
628            }
629        }
630    }
631
632    impl EncodedLength for Request {
633        fn encoded_length(&self) -> usize {
634            22
635        }
636    }
637
638    #[tokio::test]
639    async fn batch_sink_acking_sequential() {
640        let ack_counter = Counter::default();
641
642        let svc = tower::service_fn(|_| future::ok::<_, std::io::Error>(()));
643        let mut batch_settings = BatchSettings::default();
644        batch_settings.size.bytes = 9999;
645        batch_settings.size.events = 10;
646
647        let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
648
649        buffered
650            .sink_map_err(drop)
651            .send_all(
652                &mut stream::iter(1..=22).map(|item| Ok(Request::encoded(item, &ack_counter))),
653            )
654            .await
655            .unwrap();
656
657        assert_eq!(ack_counter.load(Relaxed), 22 * 23 / 2);
658    }
659
660    #[tokio::test]
661    async fn batch_sink_acking_unordered() {
662        let ack_counter = Counter::default();
663
664        trace_init();
665
666        // Services future will be spawned and work between `yield_now` calls.
667        let svc = tower::service_fn(|req: Vec<Request>| async move {
668            let duration = match req[0].0 {
669                1..=3 => Duration::from_secs(1),
670
671                // The 4th request will introduce some sort of
672                // latency spike to ensure later events don't
673                // get acked.
674                4 => Duration::from_secs(5),
675                5 | 6 => Duration::from_secs(1),
676                _ => unreachable!(),
677            };
678
679            sleep(duration).await;
680            Ok::<(), Infallible>(())
681        });
682
683        let mut batch_settings = BatchSettings::default();
684        batch_settings.size.bytes = 9999;
685        batch_settings.size.events = 1;
686
687        let mut sink = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
688
689        let mut cx = Context::from_waker(noop_waker_ref());
690        for item in 1..=3 {
691            assert!(matches!(
692                sink.poll_ready_unpin(&mut cx),
693                Poll::Ready(Ok(()))
694            ));
695            assert!(matches!(
696                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
697                Ok(())
698            ));
699        }
700
701        // Clear internal buffer
702        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
703        assert_eq!(ack_counter.load(Relaxed), 0);
704
705        yield_now().await;
706        advance_time(Duration::from_secs(3)).await;
707        yield_now().await;
708
709        for _ in 1..=3 {
710            assert!(matches!(
711                sink.poll_flush_unpin(&mut cx),
712                Poll::Ready(Ok(()))
713            ));
714        }
715
716        // Events 1,2,3 should have been acked at this point.
717        assert_eq!(ack_counter.load(Relaxed), 6);
718
719        for item in 4..=6 {
720            assert!(matches!(
721                sink.poll_ready_unpin(&mut cx),
722                Poll::Ready(Ok(()))
723            ));
724            assert!(matches!(
725                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
726                Ok(())
727            ));
728        }
729
730        // Clear internal buffer
731        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
732        assert_eq!(ack_counter.load(Relaxed), 6);
733
734        yield_now().await;
735        advance_time(Duration::from_secs(2)).await;
736        yield_now().await;
737
738        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
739
740        // Check that events 1-3,5,6 have been acked
741        assert_eq!(ack_counter.load(Relaxed), 17);
742
743        yield_now().await;
744        advance_time(Duration::from_secs(5)).await;
745        yield_now().await;
746
747        for _ in 4..=6 {
748            assert!(matches!(
749                sink.poll_flush_unpin(&mut cx),
750                Poll::Ready(Ok(()))
751            ));
752        }
753
754        assert_eq!(ack_counter.load(Relaxed), 21);
755    }
756
757    #[tokio::test]
758    async fn batch_sink_buffers_messages_until_limit() {
759        let sent_requests = Arc::new(Mutex::new(Vec::new()));
760
761        let svc = tower::service_fn(|req| {
762            let sent_requests = Arc::clone(&sent_requests);
763
764            sent_requests.lock().unwrap().push(req);
765
766            future::ok::<_, std::io::Error>(())
767        });
768
769        let mut batch_settings = BatchSettings::default();
770        batch_settings.size.bytes = 9999;
771        batch_settings.size.events = 10;
772        let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
773
774        buffered
775            .sink_map_err(drop)
776            .send_all(
777                &mut stream::iter(0..22)
778                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
779            )
780            .await
781            .unwrap();
782
783        let output = sent_requests.lock().unwrap();
784        assert_eq!(
785            &*output,
786            &vec![
787                vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
788                vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
789                vec![20, 21]
790            ]
791        );
792    }
793
794    #[tokio::test]
795    async fn batch_sink_flushes_below_min_on_close() {
796        let sent_requests = Arc::new(Mutex::new(Vec::new()));
797
798        let svc = tower::service_fn(|req| {
799            let sent_requests = Arc::clone(&sent_requests);
800            sent_requests.lock().unwrap().push(req);
801            future::ok::<_, std::io::Error>(())
802        });
803
804        let mut batch_settings = BatchSettings::default();
805        batch_settings.size.bytes = 9999;
806        batch_settings.size.events = 10;
807        let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
808
809        let mut cx = Context::from_waker(noop_waker_ref());
810        assert!(matches!(
811            buffered.poll_ready_unpin(&mut cx),
812            Poll::Ready(Ok(()))
813        ));
814        assert!(matches!(
815            buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
816            Ok(())
817        ));
818        assert!(matches!(
819            buffered.poll_ready_unpin(&mut cx),
820            Poll::Ready(Ok(()))
821        ));
822        assert!(matches!(
823            buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
824            Ok(())
825        ));
826
827        buffered.close().await.unwrap();
828
829        let output = sent_requests.lock().unwrap();
830        assert_eq!(&*output, &vec![vec![0, 1]]);
831    }
832
833    #[tokio::test]
834    async fn batch_sink_expired_linger() {
835        let sent_requests = Arc::new(Mutex::new(Vec::new()));
836
837        let svc = tower::service_fn(|req| {
838            let sent_requests = Arc::clone(&sent_requests);
839            sent_requests.lock().unwrap().push(req);
840            future::ok::<_, std::io::Error>(())
841        });
842
843        let mut batch_settings = BatchSettings::default();
844        batch_settings.size.bytes = 9999;
845        batch_settings.size.events = 10;
846        let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
847
848        let mut cx = Context::from_waker(noop_waker_ref());
849        assert!(matches!(
850            buffered.poll_ready_unpin(&mut cx),
851            Poll::Ready(Ok(()))
852        ));
853        assert!(matches!(
854            buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
855            Ok(())
856        ));
857        assert!(matches!(
858            buffered.poll_ready_unpin(&mut cx),
859            Poll::Ready(Ok(()))
860        ));
861        assert!(matches!(
862            buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
863            Ok(())
864        ));
865
866        // Move clock forward by linger timeout + 1 sec
867        advance_time(TIMEOUT + Duration::from_secs(1)).await;
868
869        // Flush buffer and make sure that this didn't take long time (because linger elapsed).
870        let start = Instant::now();
871        buffered.flush().await.unwrap();
872        let elapsed = start.duration_since(start);
873        assert!(elapsed < Duration::from_millis(200));
874
875        let output = sent_requests.lock().unwrap();
876        assert_eq!(&*output, &vec![vec![0, 1]]);
877    }
878
879    #[tokio::test]
880    async fn partition_batch_sink_buffers_messages_until_limit() {
881        let sent_requests = Arc::new(Mutex::new(Vec::new()));
882
883        let svc = tower::service_fn(|req| {
884            let sent_requests = Arc::clone(&sent_requests);
885            sent_requests.lock().unwrap().push(req);
886            future::ok::<_, std::io::Error>(())
887        });
888
889        let mut batch_settings = BatchSettings::default();
890        batch_settings.size.bytes = 9999;
891        batch_settings.size.events = 10;
892
893        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
894
895        sink.sink_map_err(drop)
896            .send_all(
897                &mut stream::iter(0..22)
898                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
899            )
900            .await
901            .unwrap();
902
903        let output = sent_requests.lock().unwrap();
904        assert_eq!(
905            &*output,
906            &vec![
907                vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
908                vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
909                vec![20, 21]
910            ]
911        );
912    }
913
914    #[tokio::test]
915    async fn partition_batch_sink_buffers_by_partition_buffer_size_one() {
916        let sent_requests = Arc::new(Mutex::new(Vec::new()));
917
918        let svc = tower::service_fn(|req| {
919            let sent_requests = Arc::clone(&sent_requests);
920            sent_requests.lock().unwrap().push(req);
921            future::ok::<_, std::io::Error>(())
922        });
923
924        let mut batch_settings = BatchSettings::default();
925        batch_settings.size.bytes = 9999;
926        batch_settings.size.events = 1;
927
928        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
929
930        let input = vec![Partitions::A, Partitions::B];
931        sink.sink_map_err(drop)
932            .send_all(
933                &mut stream::iter(input)
934                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
935            )
936            .await
937            .unwrap();
938
939        let mut output = sent_requests.lock().unwrap();
940        output[..].sort();
941        assert_eq!(&*output, &vec![vec![Partitions::A], vec![Partitions::B]]);
942    }
943
944    #[tokio::test]
945    async fn partition_batch_sink_buffers_by_partition_buffer_size_two() {
946        let sent_requests = Arc::new(Mutex::new(Vec::new()));
947
948        let svc = tower::service_fn(|req| {
949            let sent_requests = Arc::clone(&sent_requests);
950            sent_requests.lock().unwrap().push(req);
951            future::ok::<_, std::io::Error>(())
952        });
953
954        let mut batch_settings = BatchSettings::default();
955        batch_settings.size.bytes = 9999;
956        batch_settings.size.events = 2;
957
958        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
959
960        let input = vec![Partitions::A, Partitions::B, Partitions::A, Partitions::B];
961        sink.sink_map_err(drop)
962            .send_all(
963                &mut stream::iter(input)
964                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
965            )
966            .await
967            .unwrap();
968
969        let mut output = sent_requests.lock().unwrap();
970        output[..].sort();
971        assert_eq!(
972            &*output,
973            &vec![
974                vec![Partitions::A, Partitions::A],
975                vec![Partitions::B, Partitions::B]
976            ]
977        );
978    }
979
980    #[tokio::test]
981    async fn partition_batch_sink_submits_after_linger() {
982        let sent_requests = Arc::new(Mutex::new(Vec::new()));
983
984        let svc = tower::service_fn(|req| {
985            let sent_requests = Arc::clone(&sent_requests);
986            sent_requests.lock().unwrap().push(req);
987            future::ok::<_, std::io::Error>(())
988        });
989
990        let mut batch_settings = BatchSettings::default();
991        batch_settings.size.bytes = 9999;
992        batch_settings.size.events = 10;
993
994        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
995
996        let mut cx = Context::from_waker(noop_waker_ref());
997        assert!(matches!(
998            sink.poll_ready_unpin(&mut cx),
999            Poll::Ready(Ok(()))
1000        ));
1001        assert!(matches!(
1002            sink.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
1003            Ok(())
1004        ));
1005        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
1006
1007        advance_time(TIMEOUT + Duration::from_secs(1)).await;
1008
1009        let start = Instant::now();
1010        sink.flush().await.unwrap();
1011        let elapsed = start.duration_since(start);
1012        assert!(elapsed < Duration::from_millis(200));
1013
1014        let output = sent_requests.lock().unwrap();
1015        assert_eq!(&*output, &vec![vec![1]]);
1016    }
1017
    /// A failing service request must not surface as an error from `ServiceSink`:
    /// the futures returned by `call` resolve to `()` for both the successful
    /// and the failing requests.
    #[tokio::test]
    async fn service_sink_doesnt_propagate_error() {
        let ack_counter = Counter::default();

        // Test service: a request whose first field equals 3 fails with "bad";
        // every other request succeeds with "good". Both branches return
        // immediately ready futures, so no clock manipulation is needed.
        let svc = tower::service_fn(|req: Request| {
            if req.0 == 3 {
                future::err("bad")
            } else {
                future::ok("good")
            }
        });
        let mut sink = ServiceSink::new(svc);
        // Build an `EncodedBatch` for a request of `items` events. The request's
        // finalizers are moved out of it into the batch's `finalizers` field.
        // NOTE(review): presumably `Request::new` stores `items` in `req.0`,
        // which is why `req(3)` is the failing request — confirm.
        let req = |items: usize| {
            let mut req = Request::new(items, &ack_counter);
            let finalizers = std::mem::take(&mut req.1);
            EncodedBatch {
                items: req,
                finalizers,
                count: items,
                byte_size: 1,
                json_byte_size: JsonSize::new(1),
            }
        };

        // send some initial requests
        let mut fut1 = sink.call(req(1));
        let mut fut2 = sink.call(req(2));

        // Nothing has been polled yet, so nothing has been counted.
        assert_eq!(ack_counter.load(Relaxed), 0);

        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(fut1.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(fut2.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));

        // Yield once so any work triggered by the completions can run before
        // the counter is read. Expected total: 1 + 2 = 3.
        yield_now().await;
        assert_eq!(ack_counter.load(Relaxed), 3);

        // send one request that will error and one normal
        let mut fut3 = sink.call(req(3)); // I will error
        let mut fut4 = sink.call(req(4));

        // Both futures resolve to `()`: the service error is not propagated.
        assert!(matches!(fut3.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(fut4.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));

        // Only the successful request's 4 items are added (3 + 4 = 7). The failed
        // request's 3 items are not counted.
        // NOTE(review): presumably the counter only tracks finalizers resolved as
        // delivered — confirm against `Request`.
        yield_now().await;
        assert_eq!(ack_counter.load(Relaxed), 7);
    }
1072
1073    #[tokio::test]
1074    async fn partition_batch_sink_ordering_per_partition() {
1075        let sent_requests = Arc::new(Mutex::new(Vec::new()));
1076
1077        let mut delay = true;
1078        let svc = tower::service_fn(|req| {
1079            let sent_requests = Arc::clone(&sent_requests);
1080            if delay {
1081                // Delay and then error
1082                delay = false;
1083                sleep(Duration::from_secs(1))
1084                    .map(move |_| {
1085                        sent_requests.lock().unwrap().push(req);
1086                        Result::<_, std::io::Error>::Ok(())
1087                    })
1088                    .boxed()
1089            } else {
1090                sent_requests.lock().unwrap().push(req);
1091                future::ok::<_, std::io::Error>(()).boxed()
1092            }
1093        });
1094
1095        let mut batch_settings = BatchSettings::default();
1096        batch_settings.size.bytes = 9999;
1097        batch_settings.size.events = 10;
1098
1099        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
1100        sink.ordered();
1101
1102        let input = (0..20).map(|i| (0, i)).chain((0..20).map(|i| (1, i)));
1103        sink.sink_map_err(drop)
1104            .send_all(
1105                &mut stream::iter(input)
1106                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
1107            )
1108            .await
1109            .unwrap();
1110
1111        let output = sent_requests.lock().unwrap();
1112        // We sended '0' partition first and delayed sending only first request, first 10 events,
1113        // which should delay sending the second batch of events in the same partition until
1114        // the first one succeeds.
1115        assert_eq!(
1116            &*output,
1117            &vec![
1118                (0..10).map(|i| (1, i)).collect::<Vec<_>>(),
1119                (10..20).map(|i| (1, i)).collect(),
1120                (0..10).map(|i| (0, i)).collect(),
1121                (10..20).map(|i| (0, i)).collect(),
1122            ]
1123        );
1124    }
1125
1126    #[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
1127    enum Partitions {
1128        A,
1129        B,
1130    }
1131
1132    impl EncodedLength for Partitions {
1133        fn encoded_length(&self) -> usize {
1134            10 // Dummy value
1135        }
1136    }
1137
1138    impl Partition<Bytes> for Partitions {
1139        fn partition(&self) -> Bytes {
1140            format!("{self:?}").into()
1141        }
1142    }
1143
1144    impl Partition<Bytes> for usize {
1145        fn partition(&self) -> Bytes {
1146            "key".into()
1147        }
1148    }
1149
1150    impl Partition<Bytes> for (usize, usize) {
1151        fn partition(&self) -> Bytes {
1152            self.0.to_string().into()
1153        }
1154    }
1155
1156    impl EncodedLength for (usize, usize) {
1157        fn encoded_length(&self) -> usize {
1158            16
1159        }
1160    }
1161}