1use std::{
35 collections::HashMap,
36 fmt,
37 hash::Hash,
38 marker::PhantomData,
39 pin::Pin,
40 task::{ready, Context, Poll},
41};
42
43use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, Sink, Stream, TryFutureExt};
44use pin_project::pin_project;
45use tokio::{
46 sync::oneshot,
47 time::{sleep, Duration, Sleep},
48};
49use tower::{Service, ServiceBuilder};
50use tracing::Instrument;
51use vector_lib::internal_event::{
52 CallError, CountByteSize, EventsSent, InternalEventHandle as _, Output,
53};
54pub use vector_lib::sink::StreamSink;
56
57use super::{
58 batch::{Batch, EncodedBatch, FinalizersBatch, PushResult, StatefulBatch},
59 buffer::{Partition, PartitionBuffer, PartitionInnerBuffer},
60 service::{Map, ServiceBuilderExt},
61 EncodedEvent,
62};
63use crate::event::EventStatus;
64
65#[pin_project]
84#[derive(Debug)]
85pub struct BatchSink<S, B>
86where
87 S: Service<B::Output>,
88 B: Batch,
89{
90 #[pin]
91 inner: PartitionBatchSink<
92 Map<S, PartitionInnerBuffer<B::Output, ()>, B::Output>,
93 PartitionBuffer<B, ()>,
94 (),
95 >,
96}
97
98impl<S, B> BatchSink<S, B>
99where
100 S: Service<B::Output>,
101 S::Future: Send + 'static,
102 S::Error: Into<crate::Error> + Send + 'static,
103 S::Response: Response + Send + 'static,
104 B: Batch,
105{
106 pub fn new(service: S, batch: B, timeout: Duration) -> Self {
107 let service = ServiceBuilder::new()
108 .map(|req: PartitionInnerBuffer<B::Output, ()>| req.into_parts().0)
109 .service(service);
110 let batch = PartitionBuffer::new(batch);
111 let inner = PartitionBatchSink::new(service, batch, timeout);
112 Self { inner }
113 }
114}
115
#[cfg(test)]
impl<S, B> BatchSink<S, B>
where
    S: Service<B::Output>,
    B: Batch,
{
    /// Test-only accessor for the service that was handed to [`BatchSink::new`].
    pub const fn get_ref(&self) -> &S {
        // Peel the layers one at a time: partition sink -> service sink -> map adapter.
        let service_sink = &self.inner.service;
        let mapped = &service_sink.service;
        &mapped.inner
    }
}
126
127impl<S, B> Sink<EncodedEvent<B::Input>> for BatchSink<S, B>
128where
129 S: Service<B::Output>,
130 S::Future: Send + 'static,
131 S::Error: Into<crate::Error> + Send + 'static,
132 S::Response: Response + Send + 'static,
133 B: Batch,
134{
135 type Error = crate::Error;
136
137 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 self.project().inner.poll_ready(cx)
139 }
140
141 fn start_send(self: Pin<&mut Self>, item: EncodedEvent<B::Input>) -> Result<(), Self::Error> {
142 self.project()
143 .inner
144 .start_send(item.map(|item| PartitionInnerBuffer::new(item, ())))
145 }
146
147 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148 self.project().inner.poll_flush(cx)
149 }
150
151 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152 self.project().inner.poll_close(cx)
153 }
154}
155
156#[pin_project]
184pub struct PartitionBatchSink<S, B, K>
185where
186 B: Batch,
187 S: Service<B::Output>,
188{
189 service: ServiceSink<S, B::Output>,
190 buffer: Option<(K, EncodedEvent<B::Input>)>,
191 batch: StatefulBatch<FinalizersBatch<B>>,
192 partitions: HashMap<K, StatefulBatch<FinalizersBatch<B>>>,
193 timeout: Duration,
194 lingers: HashMap<K, Pin<Box<Sleep>>>,
195 in_flight: Option<HashMap<K, BoxFuture<'static, ()>>>,
196 closing: bool,
197}
198
199impl<S, B, K> PartitionBatchSink<S, B, K>
200where
201 B: Batch,
202 B::Input: Partition<K>,
203 K: Hash + Eq + Clone + Send + 'static,
204 S: Service<B::Output>,
205 S::Future: Send + 'static,
206 S::Error: Into<crate::Error> + Send + 'static,
207 S::Response: Response + Send + 'static,
208{
209 pub fn new(service: S, batch: B, timeout: Duration) -> Self {
210 Self {
211 service: ServiceSink::new(service),
212 buffer: None,
213 batch: StatefulBatch::from(FinalizersBatch::from(batch)),
214 partitions: HashMap::new(),
215 timeout,
216 lingers: HashMap::new(),
217 in_flight: None,
218 closing: false,
219 }
220 }
221
222 pub fn ordered(&mut self) {
224 self.in_flight = Some(HashMap::new());
225 }
226}
227
228impl<S, B, K> Sink<EncodedEvent<B::Input>> for PartitionBatchSink<S, B, K>
229where
230 B: Batch,
231 B::Input: Partition<K>,
232 K: Hash + Eq + Clone + Send + 'static,
233 S: Service<B::Output>,
234 S::Future: Send + 'static,
235 S::Error: Into<crate::Error> + Send + 'static,
236 S::Response: Response + Send + 'static,
237{
238 type Error = crate::Error;
239
240 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
241 if self.buffer.is_some() {
242 match self.as_mut().poll_flush(cx) {
243 Poll::Ready(Ok(())) => {}
244 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
245 Poll::Pending => {
246 if self.buffer.is_some() {
247 return Poll::Pending;
248 }
249 }
250 }
251 }
252
253 Poll::Ready(Ok(()))
254 }
255
256 fn start_send(
257 mut self: Pin<&mut Self>,
258 item: EncodedEvent<B::Input>,
259 ) -> Result<(), Self::Error> {
260 let partition = item.item.partition();
261
262 let batch = loop {
263 if let Some(batch) = self.partitions.get_mut(&partition) {
264 break batch;
265 }
266
267 let batch = self.batch.fresh();
268 self.partitions.insert(partition.clone(), batch);
269
270 let delay = sleep(self.timeout);
271 self.lingers.insert(partition.clone(), Box::pin(delay));
272 };
273
274 if let PushResult::Overflow(item) = batch.push(item) {
275 self.buffer = Some((partition, item));
276 }
277
278 Ok(())
279 }
280
281 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
282 loop {
283 if self.buffer.is_none() && self.partitions.is_empty() {
285 ready!(self.service.poll_complete(cx));
286 return Poll::Ready(Ok(()));
287 }
288
289 let this = self.as_mut().project();
291 let mut partitions_ready = vec![];
292 for (partition, batch) in this.partitions.iter() {
293 if ((*this.closing && !batch.is_empty())
294 || batch.was_full()
295 || matches!(
296 this.lingers
297 .get_mut(partition)
298 .expect("linger should exists for poll_flush")
299 .poll_unpin(cx),
300 Poll::Ready(())
301 ))
302 && this
303 .in_flight
304 .as_mut()
305 .and_then(|map| map.get_mut(partition))
306 .map(|req| matches!(req.poll_unpin(cx), Poll::Ready(())))
307 .unwrap_or(true)
308 {
309 partitions_ready.push(partition.clone());
310 }
311 }
312 let mut batch_consumed = false;
313 for partition in partitions_ready.iter() {
314 let service_ready = match this.service.poll_ready(cx) {
315 Poll::Ready(Ok(())) => true,
316 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
317 Poll::Pending => false,
318 };
319 if service_ready {
320 trace!("Service ready; Sending batch.");
321
322 let batch = this.partitions.remove(partition).unwrap();
323 this.lingers.remove(partition);
324
325 let batch = batch.finish();
326 let future = tokio::spawn(this.service.call(batch));
327
328 if let Some(map) = this.in_flight.as_mut() {
329 map.insert(partition.clone(), future.map(|_| ()).fuse().boxed());
330 }
331
332 batch_consumed = true;
333 } else {
334 break;
335 }
336 }
337 if batch_consumed {
338 continue;
339 }
340
341 if let Some(in_flight) = this.in_flight.as_mut() {
343 if in_flight.len() > this.partitions.len() {
344 let partitions = this.partitions;
347 in_flight.retain(|partition, req| {
348 partitions.contains_key(partition) || req.poll_unpin(cx).is_pending()
349 });
350 }
351 }
352
353 if let Some((partition, item)) = self.buffer.take() {
355 if self.partitions.contains_key(&partition) {
356 self.buffer = Some((partition, item));
357 } else {
358 self.as_mut().start_send(item)?;
359
360 if self.buffer.is_some() {
361 unreachable!("Empty buffer overflowed.");
362 }
363
364 continue;
365 }
366 }
367
368 ready!(self.service.poll_complete(cx));
370 return Poll::Pending;
371 }
372 }
373
374 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375 trace!("Closing partition batch sink.");
376 self.closing = true;
377 self.poll_flush(cx)
378 }
379}
380
381impl<S, B, K> fmt::Debug for PartitionBatchSink<S, B, K>
382where
383 S: Service<B::Output> + fmt::Debug,
384 B: Batch + fmt::Debug,
385{
386 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387 f.debug_struct("PartitionBatchSink")
388 .field("service", &self.service)
389 .field("batch", &self.batch)
390 .field("timeout", &self.timeout)
391 .finish()
392 }
393}
394
395struct ServiceSink<S, Request> {
398 service: S,
399 in_flight: FuturesUnordered<oneshot::Receiver<()>>,
400 next_request_id: usize,
401 _pd: PhantomData<Request>,
402}
403
404impl<S, Request> ServiceSink<S, Request>
405where
406 S: Service<Request>,
407 S::Future: Send + 'static,
408 S::Error: Into<crate::Error> + Send + 'static,
409 S::Response: Response + Send + 'static,
410{
411 fn new(service: S) -> Self {
412 Self {
413 service,
414 in_flight: FuturesUnordered::new(),
415 next_request_id: 0,
416 _pd: PhantomData,
417 }
418 }
419
420 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
421 self.service.poll_ready(cx).map_err(Into::into)
422 }
423
424 fn call(&mut self, batch: EncodedBatch<Request>) -> BoxFuture<'static, ()> {
425 let EncodedBatch {
426 items,
427 finalizers,
428 count,
429 json_byte_size,
430 ..
431 } = batch;
432
433 let (tx, rx) = oneshot::channel();
434
435 self.in_flight.push(rx);
436
437 let request_id = self.next_request_id;
438 self.next_request_id = request_id.wrapping_add(1);
439
440 trace!(
441 message = "Submitting service request.",
442 in_flight_requests = self.in_flight.len()
443 );
444 let events_sent = register!(EventsSent::from(Output(None)));
445 self.service
446 .call(items)
447 .err_into()
448 .map(move |result| {
449 let status = result_status(&result);
450 finalizers.update_status(status);
451 match status {
452 EventStatus::Delivered => {
453 events_sent.emit(CountByteSize(count, json_byte_size));
454 }
456 EventStatus::Rejected => {
457 let error = result.err().unwrap_or_else(|| "Response failed.".into());
460 emit!(CallError {
461 error,
462 request_id,
463 count,
464 });
465 }
466 _ => {} }
468
469 _ = tx.send(());
473 })
474 .instrument(info_span!("request", %request_id).or_current())
475 .boxed()
476 }
477
478 fn poll_complete(&mut self, cx: &mut Context<'_>) -> Poll<()> {
479 while !self.in_flight.is_empty() {
480 match ready!(Pin::new(&mut self.in_flight).poll_next(cx)) {
481 Some(Ok(())) => {}
482 Some(Err(_)) => panic!("ServiceSink service sender dropped."),
483 None => break,
484 }
485 }
486
487 Poll::Ready(())
488 }
489}
490
491impl<S, Request> fmt::Debug for ServiceSink<S, Request>
492where
493 S: fmt::Debug,
494{
495 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496 f.debug_struct("ServiceSink")
497 .field("service", &self.service)
498 .finish()
499 }
500}
501
502pub trait ServiceLogic: Clone {
505 type Response: Response;
506 fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus;
507}
508
509#[derive(Derivative)]
510#[derivative(Clone)]
511pub struct StdServiceLogic<R> {
512 _pd: PhantomData<R>,
513}
514
515impl<R> Default for StdServiceLogic<R> {
516 fn default() -> Self {
517 Self { _pd: PhantomData }
518 }
519}
520
521impl<R> ServiceLogic for StdServiceLogic<R>
522where
523 R: Response + Send,
524{
525 type Response = R;
526
527 fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus {
528 result_status(result)
529 }
530}
531
532fn result_status<R: Response + Send>(result: &crate::Result<R>) -> EventStatus {
533 match result {
534 Ok(response) => {
535 if response.is_successful() {
536 trace!(message = "Response successful.", ?response);
537 EventStatus::Delivered
538 } else if response.is_transient() {
539 error!(message = "Response wasn't successful.", ?response);
540 EventStatus::Errored
541 } else {
542 error!(message = "Response failed.", ?response);
543 EventStatus::Rejected
544 }
545 }
546 Err(error) => {
547 error!(message = "Request failed.", %error);
548 EventStatus::Errored
549 }
550 }
551}
552
/// A service response that can be classified as successful and/or transient.
///
/// Both methods default to `true`, so an implementor overrides only the
/// classification it actually needs.
pub trait Response: fmt::Debug {
    /// Whether the response indicates success. Defaults to `true`.
    fn is_successful(&self) -> bool {
        true
    }

    /// Whether an unsuccessful response is transient. Defaults to `true`.
    fn is_transient(&self) -> bool {
        true
    }
}
564
565impl Response for () {}
566
567impl Response for &str {}
568
#[cfg(test)]
mod tests {
    use std::{
        convert::Infallible,
        sync::{atomic::AtomicUsize, atomic::Ordering::Relaxed, Arc, Mutex},
    };

    use bytes::Bytes;
    use futures::{future, stream, task::noop_waker_ref, SinkExt, StreamExt};
    use tokio::{task::yield_now, time::Instant};
    use vector_lib::{
        finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventFinalizers},
        json_size::JsonSize,
    };

    use super::*;
    use crate::{
        sinks::util::{BatchSettings, EncodedLength, VecBuffer},
        test_util::trace_init,
    };

    /// Linger timeout used by every sink in these tests.
    const TIMEOUT: Duration = Duration::from_secs(10);

    impl EncodedLength for usize {
        fn encoded_length(&self) -> usize {
            22
        }
    }

    /// Moves the tokio clock forward by `duration`, pausing it only for the jump.
    async fn advance_time(duration: Duration) {
        tokio::time::pause();
        tokio::time::advance(duration).await;
        tokio::time::resume();
    }

    /// Running sum of the values of all requests acknowledged as delivered.
    type Counter = Arc<AtomicUsize>;

    /// A test request: its value plus the finalizers created for it.
    struct Request(usize, EventFinalizers);

    impl Request {
        /// Creates a request whose value is added to `counter` once its batch
        /// notifier reports `Delivered`.
        fn new(value: usize, counter: &Counter) -> Self {
            let (batch, receiver) = BatchNotifier::new_with_receiver();
            let counter = Arc::clone(counter);
            tokio::spawn(async move {
                if receiver.await == BatchStatus::Delivered {
                    counter.fetch_add(value, Relaxed);
                }
            });
            Self(value, EventFinalizers::new(EventFinalizer::new(batch)))
        }

        /// Like [`Request::new`], but moves the finalizers out of the request and
        /// into the surrounding `EncodedEvent`.
        fn encoded(value: usize, counter: &Counter) -> EncodedEvent<Self> {
            let mut item = Self::new(value, counter);
            let finalizers = std::mem::take(&mut item.1);
            EncodedEvent {
                item,
                finalizers,
                json_byte_size: JsonSize::zero(),
                byte_size: 0,
            }
        }
    }

    impl EncodedLength for Request {
        fn encoded_length(&self) -> usize {
            22
        }
    }

    /// All 22 requests sent through an immediately-succeeding service are acked.
    #[tokio::test]
    async fn batch_sink_acking_sequential() {
        let ack_counter = Counter::default();

        let svc = tower::service_fn(|_| future::ok::<_, std::io::Error>(()));
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;

        let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        buffered
            .sink_map_err(drop)
            .send_all(
                &mut stream::iter(1..=22).map(|item| Ok(Request::encoded(item, &ack_counter))),
            )
            .await
            .unwrap();

        // Sum of 1..=22.
        assert_eq!(ack_counter.load(Relaxed), 22 * 23 / 2);
    }

    /// Acks arrive as individual requests complete, independent of send order.
    #[tokio::test]
    async fn batch_sink_acking_unordered() {
        let ack_counter = Counter::default();

        trace_init();

        // Services future will be spawned and work between `yield_now` calls.
        let svc = tower::service_fn(|req: Vec<Request>| async move {
            let duration = match req[0].0 {
                1..=3 => Duration::from_secs(1),

                // Request 4 takes much longer than the requests sent after it,
                // so 5 and 6 complete while 4 is still outstanding.
                4 => Duration::from_secs(5),
                5 | 6 => Duration::from_secs(1),
                _ => unreachable!(),
            };

            sleep(duration).await;
            Ok::<(), Infallible>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 1;

        let mut sink = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        let mut cx = Context::from_waker(noop_waker_ref());
        for item in 1..=3 {
            assert!(matches!(
                sink.poll_ready_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
            assert!(matches!(
                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
                Ok(())
            ));
        }

        // Submit the batches; nothing has completed yet.
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
        assert_eq!(ack_counter.load(Relaxed), 0);

        yield_now().await;
        advance_time(Duration::from_secs(3)).await;
        yield_now().await;

        for _ in 1..=3 {
            assert!(matches!(
                sink.poll_flush_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
        }

        // Requests 1, 2 and 3 have been acked: 1 + 2 + 3.
        assert_eq!(ack_counter.load(Relaxed), 6);

        for item in 4..=6 {
            assert!(matches!(
                sink.poll_ready_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
            assert!(matches!(
                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
                Ok(())
            ));
        }

        // Submit the batches; nothing new has completed yet.
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
        assert_eq!(ack_counter.load(Relaxed), 6);

        yield_now().await;
        advance_time(Duration::from_secs(2)).await;
        yield_now().await;

        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));

        // Requests 5 and 6 are acked while 4 is still running: 6 + 5 + 6.
        assert_eq!(ack_counter.load(Relaxed), 17);

        yield_now().await;
        advance_time(Duration::from_secs(5)).await;
        yield_now().await;

        for _ in 4..=6 {
            assert!(matches!(
                sink.poll_flush_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
        }

        // Everything is acked: sum of 1..=6.
        assert_eq!(ack_counter.load(Relaxed), 21);
    }

    /// 22 events with a 10-event limit are sent as batches of 10, 10 and 2.
    #[tokio::test]
    async fn batch_sink_buffers_messages_until_limit() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);

            sent_requests.lock().unwrap().push(req);

            future::ok::<_, std::io::Error>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        buffered
            .sink_map_err(drop)
            .send_all(
                &mut stream::iter(0..22)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();

        let output = sent_requests.lock().unwrap();
        assert_eq!(
            &*output,
            &vec![
                vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
                vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
                vec![20, 21]
            ]
        );
    }

    /// Closing the sink sends a batch that is still below the size limit.
    #[tokio::test]
    async fn batch_sink_flushes_below_min_on_close() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(
            buffered.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
            Ok(())
        ));
        assert!(matches!(
            buffered.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
            Ok(())
        ));

        buffered.close().await.unwrap();

        let output = sent_requests.lock().unwrap();
        assert_eq!(&*output, &vec![vec![0, 1]]);
    }

    /// Once the linger timeout has passed, a flush sends the partial batch
    /// without further waiting.
    #[tokio::test]
    async fn batch_sink_expired_linger() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;
        let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(
            buffered.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
            Ok(())
        ));
        assert!(matches!(
            buffered.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
            Ok(())
        ));

        // Move the clock past the linger deadline.
        advance_time(TIMEOUT + Duration::from_secs(1)).await;

        // The flush must complete promptly. The duration is measured up to
        // "now"; comparing `start` with itself would always yield zero and
        // make the assertion below meaningless.
        let start = Instant::now();
        buffered.flush().await.unwrap();
        let elapsed = start.elapsed();
        assert!(elapsed < Duration::from_millis(200));

        let output = sent_requests.lock().unwrap();
        assert_eq!(&*output, &vec![vec![0, 1]]);
    }

    /// Same as `batch_sink_buffers_messages_until_limit`, but for a
    /// `PartitionBatchSink` where every item maps to the same partition.
    #[tokio::test]
    async fn partition_batch_sink_buffers_messages_until_limit() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;

        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(0..22)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();

        let output = sent_requests.lock().unwrap();
        assert_eq!(
            &*output,
            &vec![
                vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
                vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
                vec![20, 21]
            ]
        );
    }

    /// With a one-event limit, each partition's item is sent in its own batch.
    #[tokio::test]
    async fn partition_batch_sink_buffers_by_partition_buffer_size_one() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 1;

        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        let input = vec![Partitions::A, Partitions::B];
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();

        // The order between partitions is not asserted, so sort before comparing.
        let mut output = sent_requests.lock().unwrap();
        output[..].sort();
        assert_eq!(&*output, &vec![vec![Partitions::A], vec![Partitions::B]]);
    }

    /// Interleaved items are grouped into one two-event batch per partition.
    #[tokio::test]
    async fn partition_batch_sink_buffers_by_partition_buffer_size_two() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 2;

        let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        let input = vec![Partitions::A, Partitions::B, Partitions::A, Partitions::B];
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();

        // The order between partitions is not asserted, so sort before comparing.
        let mut output = sent_requests.lock().unwrap();
        output[..].sort();
        assert_eq!(
            &*output,
            &vec![
                vec![Partitions::A, Partitions::A],
                vec![Partitions::B, Partitions::B]
            ]
        );
    }

    /// A partial batch is held back until the linger timeout and then sent
    /// promptly by the next flush.
    #[tokio::test]
    async fn partition_batch_sink_submits_after_linger() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            sent_requests.lock().unwrap().push(req);
            future::ok::<_, std::io::Error>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;

        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(
            sink.poll_ready_unpin(&mut cx),
            Poll::Ready(Ok(()))
        ));
        assert!(matches!(
            sink.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
            Ok(())
        ));
        // The batch is neither full nor past its linger deadline.
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));

        advance_time(TIMEOUT + Duration::from_secs(1)).await;

        // Measure up to "now"; `start.duration_since(start)` would always be zero.
        let start = Instant::now();
        sink.flush().await.unwrap();
        let elapsed = start.elapsed();
        assert!(elapsed < Duration::from_millis(200));

        let output = sent_requests.lock().unwrap();
        assert_eq!(&*output, &vec![vec![1]]);
    }

    /// A failing request still completes its future and `poll_complete`; only
    /// the acknowledgement of the failed request is missing.
    #[tokio::test]
    async fn service_sink_doesnt_propagate_error() {
        let ack_counter = Counter::default();

        // Request 3 fails; every other request succeeds.
        let svc = tower::service_fn(|req: Request| {
            if req.0 == 3 {
                future::err("bad")
            } else {
                future::ok("good")
            }
        });
        let mut sink = ServiceSink::new(svc);
        let req = |items: usize| {
            let mut req = Request::new(items, &ack_counter);
            let finalizers = std::mem::take(&mut req.1);
            EncodedBatch {
                items: req,
                finalizers,
                count: items,
                byte_size: 1,
                json_byte_size: JsonSize::new(1),
            }
        };

        // Two requests that succeed.
        let mut fut1 = sink.call(req(1));
        let mut fut2 = sink.call(req(2));

        assert_eq!(ack_counter.load(Relaxed), 0);

        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(fut1.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(fut2.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));

        yield_now().await;
        assert_eq!(ack_counter.load(Relaxed), 3);

        // One request that fails (3) followed by one that succeeds (4).
        let mut fut3 = sink.call(req(3));
        let mut fut4 = sink.call(req(4));

        // Both futures complete regardless of the outcome.
        assert!(matches!(fut3.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(fut4.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));

        // Only request 4 was added to the counter: 3 + 4.
        yield_now().await;
        assert_eq!(ack_counter.load(Relaxed), 7);
    }

    /// In ordered mode a partition's second batch waits for its first request.
    #[tokio::test]
    async fn partition_batch_sink_ordering_per_partition() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        let mut delay = true;
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            if delay {
                // Only the very first request is delayed; it is recorded
                // after one second instead of immediately.
                delay = false;
                sleep(Duration::from_secs(1))
                    .map(move |_| {
                        sent_requests.lock().unwrap().push(req);
                        Result::<_, std::io::Error>::Ok(())
                    })
                    .boxed()
            } else {
                sent_requests.lock().unwrap().push(req);
                future::ok::<_, std::io::Error>(()).boxed()
            }
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;

        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        sink.ordered();

        let input = (0..20).map(|i| (0, i)).chain((0..20).map(|i| (1, i)));
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();

        let output = sent_requests.lock().unwrap();
        // Partition 0 is sent first, so its first batch is the delayed request.
        // Its second batch has to wait for that request, which lets both
        // batches of partition 1 be recorded before either batch of partition 0.
        assert_eq!(
            &*output,
            &vec![
                (0..10).map(|i| (1, i)).collect::<Vec<_>>(),
                (10..20).map(|i| (1, i)).collect(),
                (0..10).map(|i| (0, i)).collect(),
                (10..20).map(|i| (0, i)).collect(),
            ]
        );
    }

    /// Test items that partition by their own variant name.
    #[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
    enum Partitions {
        A,
        B,
    }

    impl EncodedLength for Partitions {
        fn encoded_length(&self) -> usize {
            10
        }
    }

    impl Partition<Bytes> for Partitions {
        /// The key is the variant's `Debug` representation.
        fn partition(&self) -> Bytes {
            format!("{self:?}").into()
        }
    }

    impl Partition<Bytes> for usize {
        /// Every `usize` item maps to the same partition.
        fn partition(&self) -> Bytes {
            "key".into()
        }
    }

    impl Partition<Bytes> for (usize, usize) {
        /// The first tuple element selects the partition.
        fn partition(&self) -> Bytes {
            self.0.to_string().into()
        }
    }

    impl EncodedLength for (usize, usize) {
        fn encoded_length(&self) -> usize {
            16
        }
    }
}