vector/sinks/util/
sink.rs

1//! This module contains all our internal sink utilities
2//!
3//! All vector "sinks" are built around the `Sink` type which
4//! we use to "push" events into. Within the different types of
5//! vector "sinks" we need to support three main use cases:
6//!
7//! - Streaming sinks
8//! - Single partition batching
9//! - Multiple partition batching
10//!
11//! For each of these types this module provides one external type
12//! that can be used within sinks. The simplest type being the `StreamSink`
13//! type should be used when you do not want to batch events but you want
14//! to _stream_ them to the downstream service. `BatchSink` and `PartitionBatchSink`
15//! are similar in the sense that they both take some `tower::Service`, and `Batch`
16//! and will provide full batching, and request dispatching based on
17//! the settings passed.
18//!
19//! For more advanced use cases like HTTP based sinks, one should use the
20//! `BatchedHttpSink` type, which is a wrapper for `BatchSink` and `HttpSink`.
21//!
22//! # Driving to completion
23//!
24//! Each sink utility provided here strictly follows the patterns described in
25//! the `futures::Sink` docs. Each sink utility must be polled from a valid
26//! tokio context.
27//!
28//! For service based sinks like `BatchSink` and `PartitionBatchSink` they also
29//! must be polled within a valid tokio executor context. This is due to the fact
30//! that they will spawn service requests to allow them to be driven independently
31//! from the sink. A oneshot channel is used to tie them back into the sink to allow
32//! it to notify the consumer that the request has succeeded.
33
34use std::{
35    collections::HashMap,
36    fmt,
37    hash::Hash,
38    marker::PhantomData,
39    pin::Pin,
40    task::{Context, Poll, ready},
41};
42
43use futures::{FutureExt, Sink, Stream, TryFutureExt, future::BoxFuture, stream::FuturesUnordered};
44use pin_project::pin_project;
45use tokio::{
46    sync::oneshot,
47    time::{Duration, Sleep, sleep},
48};
49use tower::{Service, ServiceBuilder};
50use tracing::Instrument;
51use vector_lib::internal_event::{
52    CallError, CountByteSize, EventsSent, InternalEventHandle as _, Output,
53};
54// === StreamSink<Event> ===
55pub use vector_lib::sink::StreamSink;
56
57use super::{
58    EncodedEvent,
59    batch::{Batch, EncodedBatch, FinalizersBatch, PushResult, StatefulBatch},
60    buffer::{Partition, PartitionBuffer, PartitionInnerBuffer},
61    service::{Map, ServiceBuilderExt},
62};
63use crate::event::EventStatus;
64
65// === BatchSink ===
66
67/// A `Sink` interface that wraps a `Service` and a
68/// `Batch`.
69///
70/// Provided a batching scheme, a service and batch settings
71/// this type will handle buffering events via the batching scheme
72/// and dispatching requests via the service based on either the size
73/// of the batch or a batch linger timeout.
74///
75/// # Acking
76///
77/// Service based acking will only ack events when all prior request
78/// batches have been acked. This means if sequential requests r1, r2,
79/// and r3 are dispatched and r2 and r3 complete, all events contained
80/// in all requests will not be acked until r1 has completed.
81///
82/// Note: This has been deprecated, please do not use when creating new Sinks.
83#[pin_project]
84#[derive(Debug)]
85pub struct BatchSink<S, B>
86where
87    S: Service<B::Output>,
88    B: Batch,
89{
90    #[pin]
91    inner: PartitionBatchSink<
92        Map<S, PartitionInnerBuffer<B::Output, ()>, B::Output>,
93        PartitionBuffer<B, ()>,
94        (),
95    >,
96}
97
98impl<S, B> BatchSink<S, B>
99where
100    S: Service<B::Output>,
101    S::Future: Send + 'static,
102    S::Error: Into<crate::Error> + Send + 'static,
103    S::Response: Response + Send + 'static,
104    B: Batch,
105{
106    pub fn new(service: S, batch: B, timeout: Duration) -> Self {
107        let service = ServiceBuilder::new()
108            .map(|req: PartitionInnerBuffer<B::Output, ()>| req.into_parts().0)
109            .service(service);
110        let batch = PartitionBuffer::new(batch);
111        let inner = PartitionBatchSink::new(service, batch, timeout);
112        Self { inner }
113    }
114}
115
116#[cfg(test)]
117impl<S, B> BatchSink<S, B>
118where
119    S: Service<B::Output>,
120    B: Batch,
121{
122    pub const fn get_ref(&self) -> &S {
123        &self.inner.service.service.inner
124    }
125}
126
127impl<S, B> Sink<EncodedEvent<B::Input>> for BatchSink<S, B>
128where
129    S: Service<B::Output>,
130    S::Future: Send + 'static,
131    S::Error: Into<crate::Error> + Send + 'static,
132    S::Response: Response + Send + 'static,
133    B: Batch,
134{
135    type Error = crate::Error;
136
137    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138        self.project().inner.poll_ready(cx)
139    }
140
141    fn start_send(self: Pin<&mut Self>, item: EncodedEvent<B::Input>) -> Result<(), Self::Error> {
142        self.project()
143            .inner
144            .start_send(item.map(|item| PartitionInnerBuffer::new(item, ())))
145    }
146
147    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148        self.project().inner.poll_flush(cx)
149    }
150
151    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152        self.project().inner.poll_close(cx)
153    }
154}
155
156// === PartitionBatchSink ===
157
158/// A partition based batcher, given some `Service` and `Batch` where the
159/// input is partitionable via the `Partition` trait, it will hold many
160/// in flight batches.
161///
162/// This type is similar to `BatchSink` with the added benefit that it has
163/// more fine-grained partitioning ability. It will hold many different batches
164/// of events and contain linger timeouts for each.
165///
166/// Note that, unlike `BatchSink`, the `batch` given to this sink is
167/// *only* used to create new batches (via `Batch::fresh`) for each new
168/// partition.
169///
170/// # Acking
171///
172/// Service based acking will only ack events when all prior request
173/// batches have been acked. This means if sequential requests r1, r2,
174/// and r3 are dispatched and r2 and r3 complete, all events contained
175/// in all requests will not be acked until r1 has completed.
176///
177/// # Ordering
178/// Per partition ordering can be achieved by holding onto future of a request
179/// until it finishes. Until then all further requests in that partition are
180/// delayed.
181///
182/// Note: This has been deprecated, please do not use when creating new Sinks.
183#[pin_project]
184pub struct PartitionBatchSink<S, B, K>
185where
186    B: Batch,
187    S: Service<B::Output>,
188{
189    service: ServiceSink<S, B::Output>,
190    buffer: Option<(K, EncodedEvent<B::Input>)>,
191    batch: StatefulBatch<FinalizersBatch<B>>,
192    partitions: HashMap<K, StatefulBatch<FinalizersBatch<B>>>,
193    timeout: Duration,
194    lingers: HashMap<K, Pin<Box<Sleep>>>,
195    in_flight: Option<HashMap<K, BoxFuture<'static, ()>>>,
196    closing: bool,
197}
198
199impl<S, B, K> PartitionBatchSink<S, B, K>
200where
201    B: Batch,
202    B::Input: Partition<K>,
203    K: Hash + Eq + Clone + Send + 'static,
204    S: Service<B::Output>,
205    S::Future: Send + 'static,
206    S::Error: Into<crate::Error> + Send + 'static,
207    S::Response: Response + Send + 'static,
208{
209    pub fn new(service: S, batch: B, timeout: Duration) -> Self {
210        Self {
211            service: ServiceSink::new(service),
212            buffer: None,
213            batch: StatefulBatch::from(FinalizersBatch::from(batch)),
214            partitions: HashMap::new(),
215            timeout,
216            lingers: HashMap::new(),
217            in_flight: None,
218            closing: false,
219        }
220    }
221
222    /// Enforces per partition ordering of request.
223    pub fn ordered(&mut self) {
224        self.in_flight = Some(HashMap::new());
225    }
226}
227
228impl<S, B, K> Sink<EncodedEvent<B::Input>> for PartitionBatchSink<S, B, K>
229where
230    B: Batch,
231    B::Input: Partition<K>,
232    K: Hash + Eq + Clone + Send + 'static,
233    S: Service<B::Output>,
234    S::Future: Send + 'static,
235    S::Error: Into<crate::Error> + Send + 'static,
236    S::Response: Response + Send + 'static,
237{
238    type Error = crate::Error;
239
240    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
241        if self.buffer.is_some() {
242            match self.as_mut().poll_flush(cx) {
243                Poll::Ready(Ok(())) => {}
244                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
245                Poll::Pending => {
246                    if self.buffer.is_some() {
247                        return Poll::Pending;
248                    }
249                }
250            }
251        }
252
253        Poll::Ready(Ok(()))
254    }
255
256    fn start_send(
257        mut self: Pin<&mut Self>,
258        item: EncodedEvent<B::Input>,
259    ) -> Result<(), Self::Error> {
260        let partition = item.item.partition();
261
262        let batch = loop {
263            if let Some(batch) = self.partitions.get_mut(&partition) {
264                break batch;
265            }
266
267            let batch = self.batch.fresh();
268            self.partitions.insert(partition.clone(), batch);
269
270            let delay = sleep(self.timeout);
271            self.lingers.insert(partition.clone(), Box::pin(delay));
272        };
273
274        if let PushResult::Overflow(item) = batch.push(item) {
275            self.buffer = Some((partition, item));
276        }
277
278        Ok(())
279    }
280
281    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
282        loop {
283            // Poll inner service while not ready, if we don't have buffer or any batch.
284            if self.buffer.is_none() && self.partitions.is_empty() {
285                ready!(self.service.poll_complete(cx));
286                return Poll::Ready(Ok(()));
287            }
288
289            // Try send batches.
290            let this = self.as_mut().project();
291            let mut partitions_ready = vec![];
292            for (partition, batch) in this.partitions.iter() {
293                if ((*this.closing && !batch.is_empty())
294                    || batch.was_full()
295                    || matches!(
296                        this.lingers
297                            .get_mut(partition)
298                            .expect("linger should exists for poll_flush")
299                            .poll_unpin(cx),
300                        Poll::Ready(())
301                    ))
302                    && this
303                        .in_flight
304                        .as_mut()
305                        .and_then(|map| map.get_mut(partition))
306                        .map(|req| matches!(req.poll_unpin(cx), Poll::Ready(())))
307                        .unwrap_or(true)
308                {
309                    partitions_ready.push(partition.clone());
310                }
311            }
312            let mut batch_consumed = false;
313            for partition in partitions_ready.iter() {
314                let service_ready = match this.service.poll_ready(cx) {
315                    Poll::Ready(Ok(())) => true,
316                    Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
317                    Poll::Pending => false,
318                };
319                if service_ready {
320                    trace!("Service ready; Sending batch.");
321
322                    let batch = this.partitions.remove(partition).unwrap();
323                    this.lingers.remove(partition);
324
325                    let batch = batch.finish();
326                    let future = tokio::spawn(this.service.call(batch));
327
328                    if let Some(map) = this.in_flight.as_mut() {
329                        map.insert(partition.clone(), future.map(|_| ()).fuse().boxed());
330                    }
331
332                    batch_consumed = true;
333                } else {
334                    break;
335                }
336            }
337            if batch_consumed {
338                continue;
339            }
340
341            // Cleanup of in flight futures
342            if let Some(in_flight) = this.in_flight.as_mut()
343                && in_flight.len() > this.partitions.len()
344            {
345                // There is at least one in flight future without a partition to check it
346                // so we will do it here.
347                let partitions = this.partitions;
348                in_flight.retain(|partition, req| {
349                    partitions.contains_key(partition) || req.poll_unpin(cx).is_pending()
350                });
351            }
352
353            // Try move item from buffer to batch.
354            if let Some((partition, item)) = self.buffer.take() {
355                if self.partitions.contains_key(&partition) {
356                    self.buffer = Some((partition, item));
357                } else {
358                    self.as_mut().start_send(item)?;
359
360                    if self.buffer.is_some() {
361                        unreachable!("Empty buffer overflowed.");
362                    }
363
364                    continue;
365                }
366            }
367
368            // Only poll inner service and return `Poll::Pending` anyway.
369            ready!(self.service.poll_complete(cx));
370            return Poll::Pending;
371        }
372    }
373
374    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375        trace!("Closing partition batch sink.");
376        self.closing = true;
377        self.poll_flush(cx)
378    }
379}
380
381impl<S, B, K> fmt::Debug for PartitionBatchSink<S, B, K>
382where
383    S: Service<B::Output> + fmt::Debug,
384    B: Batch + fmt::Debug,
385{
386    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387        f.debug_struct("PartitionBatchSink")
388            .field("service", &self.service)
389            .field("batch", &self.batch)
390            .field("timeout", &self.timeout)
391            .finish()
392    }
393}
394
395// === ServiceSink ===
396
397struct ServiceSink<S, Request> {
398    service: S,
399    in_flight: FuturesUnordered<oneshot::Receiver<()>>,
400    next_request_id: usize,
401    _pd: PhantomData<Request>,
402}
403
404impl<S, Request> ServiceSink<S, Request>
405where
406    S: Service<Request>,
407    S::Future: Send + 'static,
408    S::Error: Into<crate::Error> + Send + 'static,
409    S::Response: Response + Send + 'static,
410{
411    fn new(service: S) -> Self {
412        Self {
413            service,
414            in_flight: FuturesUnordered::new(),
415            next_request_id: 0,
416            _pd: PhantomData,
417        }
418    }
419
420    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
421        self.service.poll_ready(cx).map_err(Into::into)
422    }
423
424    fn call(&mut self, batch: EncodedBatch<Request>) -> BoxFuture<'static, ()> {
425        let EncodedBatch {
426            items,
427            finalizers,
428            count,
429            json_byte_size,
430            ..
431        } = batch;
432
433        let (tx, rx) = oneshot::channel();
434
435        self.in_flight.push(rx);
436
437        let request_id = self.next_request_id;
438        self.next_request_id = request_id.wrapping_add(1);
439
440        trace!(
441            message = "Submitting service request.",
442            in_flight_requests = self.in_flight.len()
443        );
444        let events_sent = register!(EventsSent::from(Output(None)));
445        self.service
446            .call(items)
447            .err_into()
448            .map(move |result| {
449                let status = result_status(&result);
450                finalizers.update_status(status);
451                match status {
452                    EventStatus::Delivered => {
453                        events_sent.emit(CountByteSize(count, json_byte_size));
454                        // TODO: Emit a BytesSent event here too
455                    }
456                    EventStatus::Rejected => {
457                        // Emit the `Error` and `EventsDropped` internal events.
458                        // This scenario occurs after retries have been attempted.
459                        let error = result.err().unwrap_or_else(|| "Response failed.".into());
460                        emit!(CallError {
461                            error,
462                            request_id,
463                            count,
464                        });
465                    }
466                    _ => {} // do nothing
467                }
468
469                // If the rx end is dropped we still completed
470                // the request so this is a weird case that we can
471                // ignore for now.
472                _ = tx.send(());
473            })
474            .instrument(info_span!("request", %request_id).or_current())
475            .boxed()
476    }
477
478    fn poll_complete(&mut self, cx: &mut Context<'_>) -> Poll<()> {
479        while !self.in_flight.is_empty() {
480            match ready!(Pin::new(&mut self.in_flight).poll_next(cx)) {
481                Some(Ok(())) => {}
482                Some(Err(_)) => panic!("ServiceSink service sender dropped."),
483                None => break,
484            }
485        }
486
487        Poll::Ready(())
488    }
489}
490
491impl<S, Request> fmt::Debug for ServiceSink<S, Request>
492where
493    S: fmt::Debug,
494{
495    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496        f.debug_struct("ServiceSink")
497            .field("service", &self.service)
498            .finish()
499    }
500}
501
502// === Response ===
503
504pub trait ServiceLogic: Clone {
505    type Response: Response;
506    fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus;
507}
508
509#[derive(Derivative)]
510#[derivative(Clone)]
511pub struct StdServiceLogic<R> {
512    _pd: PhantomData<R>,
513}
514
515impl<R> Default for StdServiceLogic<R> {
516    fn default() -> Self {
517        Self { _pd: PhantomData }
518    }
519}
520
521impl<R> ServiceLogic for StdServiceLogic<R>
522where
523    R: Response + Send,
524{
525    type Response = R;
526
527    fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus {
528        result_status(result)
529    }
530}
531
532fn result_status<R: Response + Send>(result: &crate::Result<R>) -> EventStatus {
533    match result {
534        Ok(response) => {
535            if response.is_successful() {
536                trace!(message = "Response successful.", ?response);
537                EventStatus::Delivered
538            } else if response.is_transient() {
539                error!(message = "Response wasn't successful.", ?response);
540                EventStatus::Errored
541            } else {
542                error!(message = "Response failed.", ?response);
543                EventStatus::Rejected
544            }
545        }
546        Err(error) => {
547            error!(message = "Request failed.", %error);
548            EventStatus::Errored
549        }
550    }
551}
552
553// === Response ===
554
555pub trait Response: fmt::Debug {
556    fn is_successful(&self) -> bool {
557        true
558    }
559
560    fn is_transient(&self) -> bool {
561        true
562    }
563}
564
565impl Response for () {}
566
567impl Response for &str {}
568
569#[cfg(test)]
570mod tests {
571    use std::{
572        convert::Infallible,
573        sync::{
574            Arc, Mutex,
575            atomic::{AtomicUsize, Ordering::Relaxed},
576        },
577    };
578
579    use bytes::Bytes;
580    use futures::{SinkExt, StreamExt, future, stream, task::noop_waker_ref};
581    use tokio::{task::yield_now, time::Instant};
582    use vector_lib::{
583        finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventFinalizers},
584        json_size::JsonSize,
585    };
586
587    use super::*;
588    use crate::{
589        sinks::util::{BatchSettings, EncodedLength, VecBuffer},
590        test_util::trace_init,
591    };
592
593    const TIMEOUT: Duration = Duration::from_secs(10);
594
595    impl EncodedLength for usize {
596        fn encoded_length(&self) -> usize {
597            22
598        }
599    }
600
601    async fn advance_time(duration: Duration) {
602        tokio::time::pause();
603        tokio::time::advance(duration).await;
604        tokio::time::resume();
605    }
606
607    type Counter = Arc<AtomicUsize>;
608
609    struct Request(usize, EventFinalizers);
610
611    impl Request {
612        fn new(value: usize, counter: &Counter) -> Self {
613            let (batch, receiver) = BatchNotifier::new_with_receiver();
614            let counter = Arc::clone(counter);
615            tokio::spawn(async move {
616                if receiver.await == BatchStatus::Delivered {
617                    counter.fetch_add(value, Relaxed);
618                }
619            });
620            Self(value, EventFinalizers::new(EventFinalizer::new(batch)))
621        }
622
623        fn encoded(value: usize, counter: &Counter) -> EncodedEvent<Self> {
624            let mut item = Self::new(value, counter);
625            let finalizers = std::mem::take(&mut item.1);
626            EncodedEvent {
627                item,
628                finalizers,
629                json_byte_size: JsonSize::zero(),
630                byte_size: 0,
631            }
632        }
633    }
634
635    impl EncodedLength for Request {
636        fn encoded_length(&self) -> usize {
637            22
638        }
639    }
640
641    #[tokio::test]
642    async fn batch_sink_acking_sequential() {
643        let ack_counter = Counter::default();
644
645        let svc = tower::service_fn(|_| future::ok::<_, std::io::Error>(()));
646        let mut batch_settings = BatchSettings::default();
647        batch_settings.size.bytes = 9999;
648        batch_settings.size.events = 10;
649
650        let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
651
652        buffered
653            .sink_map_err(drop)
654            .send_all(
655                &mut stream::iter(1..=22).map(|item| Ok(Request::encoded(item, &ack_counter))),
656            )
657            .await
658            .unwrap();
659
660        assert_eq!(ack_counter.load(Relaxed), 22 * 23 / 2);
661    }
662
663    #[tokio::test]
664    async fn batch_sink_acking_unordered() {
665        let ack_counter = Counter::default();
666
667        trace_init();
668
669        // Services future will be spawned and work between `yield_now` calls.
670        let svc = tower::service_fn(|req: Vec<Request>| async move {
671            let duration = match req[0].0 {
672                1..=3 => Duration::from_secs(1),
673
674                // The 4th request will introduce some sort of
675                // latency spike to ensure later events don't
676                // get acked.
677                4 => Duration::from_secs(5),
678                5 | 6 => Duration::from_secs(1),
679                _ => unreachable!(),
680            };
681
682            sleep(duration).await;
683            Ok::<(), Infallible>(())
684        });
685
686        let mut batch_settings = BatchSettings::default();
687        batch_settings.size.bytes = 9999;
688        batch_settings.size.events = 1;
689
690        let mut sink = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
691
692        let mut cx = Context::from_waker(noop_waker_ref());
693        for item in 1..=3 {
694            assert!(matches!(
695                sink.poll_ready_unpin(&mut cx),
696                Poll::Ready(Ok(()))
697            ));
698            assert!(matches!(
699                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
700                Ok(())
701            ));
702        }
703
704        // Clear internal buffer
705        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
706        assert_eq!(ack_counter.load(Relaxed), 0);
707
708        yield_now().await;
709        advance_time(Duration::from_secs(3)).await;
710        yield_now().await;
711
712        for _ in 1..=3 {
713            assert!(matches!(
714                sink.poll_flush_unpin(&mut cx),
715                Poll::Ready(Ok(()))
716            ));
717        }
718
719        // Events 1,2,3 should have been acked at this point.
720        assert_eq!(ack_counter.load(Relaxed), 6);
721
722        for item in 4..=6 {
723            assert!(matches!(
724                sink.poll_ready_unpin(&mut cx),
725                Poll::Ready(Ok(()))
726            ));
727            assert!(matches!(
728                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
729                Ok(())
730            ));
731        }
732
733        // Clear internal buffer
734        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
735        assert_eq!(ack_counter.load(Relaxed), 6);
736
737        yield_now().await;
738        advance_time(Duration::from_secs(2)).await;
739        yield_now().await;
740
741        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
742
743        // Check that events 1-3,5,6 have been acked
744        assert_eq!(ack_counter.load(Relaxed), 17);
745
746        yield_now().await;
747        advance_time(Duration::from_secs(5)).await;
748        yield_now().await;
749
750        for _ in 4..=6 {
751            assert!(matches!(
752                sink.poll_flush_unpin(&mut cx),
753                Poll::Ready(Ok(()))
754            ));
755        }
756
757        assert_eq!(ack_counter.load(Relaxed), 21);
758    }
759
760    #[tokio::test]
761    async fn batch_sink_buffers_messages_until_limit() {
762        let sent_requests = Arc::new(Mutex::new(Vec::new()));
763
764        let svc = tower::service_fn(|req| {
765            let sent_requests = Arc::clone(&sent_requests);
766
767            sent_requests.lock().unwrap().push(req);
768
769            future::ok::<_, std::io::Error>(())
770        });
771
772        let mut batch_settings = BatchSettings::default();
773        batch_settings.size.bytes = 9999;
774        batch_settings.size.events = 10;
775        let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
776
777        buffered
778            .sink_map_err(drop)
779            .send_all(
780                &mut stream::iter(0..22)
781                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
782            )
783            .await
784            .unwrap();
785
786        let output = sent_requests.lock().unwrap();
787        assert_eq!(
788            &*output,
789            &vec![
790                vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
791                vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
792                vec![20, 21]
793            ]
794        );
795    }
796
797    #[tokio::test]
798    async fn batch_sink_flushes_below_min_on_close() {
799        let sent_requests = Arc::new(Mutex::new(Vec::new()));
800
801        let svc = tower::service_fn(|req| {
802            let sent_requests = Arc::clone(&sent_requests);
803            sent_requests.lock().unwrap().push(req);
804            future::ok::<_, std::io::Error>(())
805        });
806
807        let mut batch_settings = BatchSettings::default();
808        batch_settings.size.bytes = 9999;
809        batch_settings.size.events = 10;
810        let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
811
812        let mut cx = Context::from_waker(noop_waker_ref());
813        assert!(matches!(
814            buffered.poll_ready_unpin(&mut cx),
815            Poll::Ready(Ok(()))
816        ));
817        assert!(matches!(
818            buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
819            Ok(())
820        ));
821        assert!(matches!(
822            buffered.poll_ready_unpin(&mut cx),
823            Poll::Ready(Ok(()))
824        ));
825        assert!(matches!(
826            buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
827            Ok(())
828        ));
829
830        buffered.close().await.unwrap();
831
832        let output = sent_requests.lock().unwrap();
833        assert_eq!(&*output, &vec![vec![0, 1]]);
834    }
835
836    #[tokio::test]
837    async fn batch_sink_expired_linger() {
838        let sent_requests = Arc::new(Mutex::new(Vec::new()));
839
840        let svc = tower::service_fn(|req| {
841            let sent_requests = Arc::clone(&sent_requests);
842            sent_requests.lock().unwrap().push(req);
843            future::ok::<_, std::io::Error>(())
844        });
845
846        let mut batch_settings = BatchSettings::default();
847        batch_settings.size.bytes = 9999;
848        batch_settings.size.events = 10;
849        let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
850
851        let mut cx = Context::from_waker(noop_waker_ref());
852        assert!(matches!(
853            buffered.poll_ready_unpin(&mut cx),
854            Poll::Ready(Ok(()))
855        ));
856        assert!(matches!(
857            buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
858            Ok(())
859        ));
860        assert!(matches!(
861            buffered.poll_ready_unpin(&mut cx),
862            Poll::Ready(Ok(()))
863        ));
864        assert!(matches!(
865            buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
866            Ok(())
867        ));
868
869        // Move clock forward by linger timeout + 1 sec
870        advance_time(TIMEOUT + Duration::from_secs(1)).await;
871
872        // Flush buffer and make sure that this didn't take long time (because linger elapsed).
873        let start = Instant::now();
874        buffered.flush().await.unwrap();
875        let elapsed = start.duration_since(start);
876        assert!(elapsed < Duration::from_millis(200));
877
878        let output = sent_requests.lock().unwrap();
879        assert_eq!(&*output, &vec![vec![0, 1]]);
880    }
881
882    #[tokio::test]
883    async fn partition_batch_sink_buffers_messages_until_limit() {
884        let sent_requests = Arc::new(Mutex::new(Vec::new()));
885
886        let svc = tower::service_fn(|req| {
887            let sent_requests = Arc::clone(&sent_requests);
888            sent_requests.lock().unwrap().push(req);
889            future::ok::<_, std::io::Error>(())
890        });
891
892        let mut batch_settings = BatchSettings::default();
893        batch_settings.size.bytes = 9999;
894        batch_settings.size.events = 10;
895
896        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
897
898        sink.sink_map_err(drop)
899            .send_all(
900                &mut stream::iter(0..22)
901                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
902            )
903            .await
904            .unwrap();
905
906        let output = sent_requests.lock().unwrap();
907        assert_eq!(
908            &*output,
909            &vec![
910                vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
911                vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
912                vec![20, 21]
913            ]
914        );
915    }
916
917    #[tokio::test]
918    async fn partition_batch_sink_buffers_by_partition_buffer_size_one() {
919        let sent_requests = Arc::new(Mutex::new(Vec::new()));
920
921        let svc = tower::service_fn(|req| {
922            let sent_requests = Arc::clone(&sent_requests);
923            sent_requests.lock().unwrap().push(req);
924            future::ok::<_, std::io::Error>(())
925        });
926
927        let mut batch_settings = BatchSettings::default();
928        batch_settings.size.bytes = 9999;
929        batch_settings.size.events = 1;
930
931        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
932
933        let input = vec![Partitions::A, Partitions::B];
934        sink.sink_map_err(drop)
935            .send_all(
936                &mut stream::iter(input)
937                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
938            )
939            .await
940            .unwrap();
941
942        let mut output = sent_requests.lock().unwrap();
943        output[..].sort();
944        assert_eq!(&*output, &vec![vec![Partitions::A], vec![Partitions::B]]);
945    }
946
947    #[tokio::test]
948    async fn partition_batch_sink_buffers_by_partition_buffer_size_two() {
949        let sent_requests = Arc::new(Mutex::new(Vec::new()));
950
951        let svc = tower::service_fn(|req| {
952            let sent_requests = Arc::clone(&sent_requests);
953            sent_requests.lock().unwrap().push(req);
954            future::ok::<_, std::io::Error>(())
955        });
956
957        let mut batch_settings = BatchSettings::default();
958        batch_settings.size.bytes = 9999;
959        batch_settings.size.events = 2;
960
961        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
962
963        let input = vec![Partitions::A, Partitions::B, Partitions::A, Partitions::B];
964        sink.sink_map_err(drop)
965            .send_all(
966                &mut stream::iter(input)
967                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
968            )
969            .await
970            .unwrap();
971
972        let mut output = sent_requests.lock().unwrap();
973        output[..].sort();
974        assert_eq!(
975            &*output,
976            &vec![
977                vec![Partitions::A, Partitions::A],
978                vec![Partitions::B, Partitions::B]
979            ]
980        );
981    }
982
983    #[tokio::test]
984    async fn partition_batch_sink_submits_after_linger() {
985        let sent_requests = Arc::new(Mutex::new(Vec::new()));
986
987        let svc = tower::service_fn(|req| {
988            let sent_requests = Arc::clone(&sent_requests);
989            sent_requests.lock().unwrap().push(req);
990            future::ok::<_, std::io::Error>(())
991        });
992
993        let mut batch_settings = BatchSettings::default();
994        batch_settings.size.bytes = 9999;
995        batch_settings.size.events = 10;
996
997        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
998
999        let mut cx = Context::from_waker(noop_waker_ref());
1000        assert!(matches!(
1001            sink.poll_ready_unpin(&mut cx),
1002            Poll::Ready(Ok(()))
1003        ));
1004        assert!(matches!(
1005            sink.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
1006            Ok(())
1007        ));
1008        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
1009
1010        advance_time(TIMEOUT + Duration::from_secs(1)).await;
1011
1012        let start = Instant::now();
1013        sink.flush().await.unwrap();
1014        let elapsed = start.duration_since(start);
1015        assert!(elapsed < Duration::from_millis(200));
1016
1017        let output = sent_requests.lock().unwrap();
1018        assert_eq!(&*output, &vec![vec![1]]);
1019    }
1020
1021    #[tokio::test]
1022    async fn service_sink_doesnt_propagate_error() {
1023        let ack_counter = Counter::default();
1024
1025        // We need a mock executor here because we need to ensure
1026        // that we poll the service futures within the mock clock
1027        // context. This allows us to manually advance the time on the
1028        // "spawned" futures.
1029        let svc = tower::service_fn(|req: Request| {
1030            if req.0 == 3 {
1031                future::err("bad")
1032            } else {
1033                future::ok("good")
1034            }
1035        });
1036        let mut sink = ServiceSink::new(svc);
1037        let req = |items: usize| {
1038            let mut req = Request::new(items, &ack_counter);
1039            let finalizers = std::mem::take(&mut req.1);
1040            EncodedBatch {
1041                items: req,
1042                finalizers,
1043                count: items,
1044                byte_size: 1,
1045                json_byte_size: JsonSize::new(1),
1046            }
1047        };
1048
1049        // send some initial requests
1050        let mut fut1 = sink.call(req(1));
1051        let mut fut2 = sink.call(req(2));
1052
1053        assert_eq!(ack_counter.load(Relaxed), 0);
1054
1055        let mut cx = Context::from_waker(noop_waker_ref());
1056        assert!(matches!(fut1.poll_unpin(&mut cx), Poll::Ready(())));
1057        assert!(matches!(fut2.poll_unpin(&mut cx), Poll::Ready(())));
1058        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));
1059
1060        yield_now().await;
1061        assert_eq!(ack_counter.load(Relaxed), 3);
1062
1063        // send one request that will error and one normal
1064        let mut fut3 = sink.call(req(3)); // I will error
1065        let mut fut4 = sink.call(req(4));
1066
1067        // make sure they all "worked"
1068        assert!(matches!(fut3.poll_unpin(&mut cx), Poll::Ready(())));
1069        assert!(matches!(fut4.poll_unpin(&mut cx), Poll::Ready(())));
1070        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));
1071
1072        yield_now().await;
1073        assert_eq!(ack_counter.load(Relaxed), 7);
1074    }
1075
1076    #[tokio::test]
1077    async fn partition_batch_sink_ordering_per_partition() {
1078        let sent_requests = Arc::new(Mutex::new(Vec::new()));
1079
1080        let mut delay = true;
1081        let svc = tower::service_fn(|req| {
1082            let sent_requests = Arc::clone(&sent_requests);
1083            if delay {
1084                // Delay and then error
1085                delay = false;
1086                sleep(Duration::from_secs(1))
1087                    .map(move |_| {
1088                        sent_requests.lock().unwrap().push(req);
1089                        Result::<_, std::io::Error>::Ok(())
1090                    })
1091                    .boxed()
1092            } else {
1093                sent_requests.lock().unwrap().push(req);
1094                future::ok::<_, std::io::Error>(()).boxed()
1095            }
1096        });
1097
1098        let mut batch_settings = BatchSettings::default();
1099        batch_settings.size.bytes = 9999;
1100        batch_settings.size.events = 10;
1101
1102        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
1103        sink.ordered();
1104
1105        let input = (0..20).map(|i| (0, i)).chain((0..20).map(|i| (1, i)));
1106        sink.sink_map_err(drop)
1107            .send_all(
1108                &mut stream::iter(input)
1109                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
1110            )
1111            .await
1112            .unwrap();
1113
1114        let output = sent_requests.lock().unwrap();
1115        // We sended '0' partition first and delayed sending only first request, first 10 events,
1116        // which should delay sending the second batch of events in the same partition until
1117        // the first one succeeds.
1118        assert_eq!(
1119            &*output,
1120            &vec![
1121                (0..10).map(|i| (1, i)).collect::<Vec<_>>(),
1122                (10..20).map(|i| (1, i)).collect(),
1123                (0..10).map(|i| (0, i)).collect(),
1124                (10..20).map(|i| (0, i)).collect(),
1125            ]
1126        );
1127    }
1128
1129    #[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
1130    enum Partitions {
1131        A,
1132        B,
1133    }
1134
1135    impl EncodedLength for Partitions {
1136        fn encoded_length(&self) -> usize {
1137            10 // Dummy value
1138        }
1139    }
1140
1141    impl Partition<Bytes> for Partitions {
1142        fn partition(&self) -> Bytes {
1143            format!("{self:?}").into()
1144        }
1145    }
1146
1147    impl Partition<Bytes> for usize {
1148        fn partition(&self) -> Bytes {
1149            "key".into()
1150        }
1151    }
1152
1153    impl Partition<Bytes> for (usize, usize) {
1154        fn partition(&self) -> Bytes {
1155            self.0.to_string().into()
1156        }
1157    }
1158
1159    impl EncodedLength for (usize, usize) {
1160        fn encoded_length(&self) -> usize {
1161            16
1162        }
1163    }
1164}