1use std::{
35 collections::HashMap,
36 fmt,
37 hash::Hash,
38 marker::PhantomData,
39 pin::Pin,
40 task::{Context, Poll, ready},
41};
42
43use futures::{FutureExt, Sink, Stream, TryFutureExt, future::BoxFuture, stream::FuturesUnordered};
44use pin_project::pin_project;
45use tokio::{
46 sync::oneshot,
47 time::{Duration, Sleep, sleep},
48};
49use tower::{Service, ServiceBuilder};
50use tracing::Instrument;
51use vector_lib::internal_event::{
52 CallError, CountByteSize, EventsSent, InternalEventHandle as _, Output,
53};
54pub use vector_lib::sink::StreamSink;
56
57use super::{
58 EncodedEvent,
59 batch::{Batch, EncodedBatch, FinalizersBatch, PushResult, StatefulBatch},
60 buffer::{Partition, PartitionBuffer, PartitionInnerBuffer},
61 service::{Map, ServiceBuilderExt},
62};
63use crate::event::EventStatus;
64
/// A sink that collects encoded events into batches of type `B` and submits every finished
/// batch to the service `S`.
///
/// All of the work is delegated to a [`PartitionBatchSink`] whose partition key is the unit
/// type `()`; the service is wrapped in a `Map` so that it receives the plain `B::Output`
/// rather than the `PartitionInnerBuffer` wrapper used internally.
#[pin_project]
#[derive(Debug)]
pub struct BatchSink<S, B>
where
    S: Service<B::Output>,
    B: Batch,
{
    // The partitioned sink this type delegates every `Sink` method to.
    #[pin]
    inner: PartitionBatchSink<
        Map<S, PartitionInnerBuffer<B::Output, ()>, B::Output>,
        PartitionBuffer<B, ()>,
        (),
    >,
}
97
98impl<S, B> BatchSink<S, B>
99where
100 S: Service<B::Output>,
101 S::Future: Send + 'static,
102 S::Error: Into<crate::Error> + Send + 'static,
103 S::Response: Response + Send + 'static,
104 B: Batch,
105{
106 pub fn new(service: S, batch: B, timeout: Duration) -> Self {
107 let service = ServiceBuilder::new()
108 .map(|req: PartitionInnerBuffer<B::Output, ()>| req.into_parts().0)
109 .service(service);
110 let batch = PartitionBuffer::new(batch);
111 let inner = PartitionBatchSink::new(service, batch, timeout);
112 Self { inner }
113 }
114}
115
#[cfg(test)]
impl<S, B> BatchSink<S, B>
where
    S: Service<B::Output>,
    B: Batch,
{
    /// Test-only accessor for the user supplied service, reached by walking through the
    /// partitioned sink, its `ServiceSink` and the `Map` wrapper.
    pub const fn get_ref(&self) -> &S {
        let service_sink = &self.inner.service;
        let mapped = &service_sink.service;
        &mapped.inner
    }
}
126
127impl<S, B> Sink<EncodedEvent<B::Input>> for BatchSink<S, B>
128where
129 S: Service<B::Output>,
130 S::Future: Send + 'static,
131 S::Error: Into<crate::Error> + Send + 'static,
132 S::Response: Response + Send + 'static,
133 B: Batch,
134{
135 type Error = crate::Error;
136
137 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 self.project().inner.poll_ready(cx)
139 }
140
141 fn start_send(self: Pin<&mut Self>, item: EncodedEvent<B::Input>) -> Result<(), Self::Error> {
142 self.project()
143 .inner
144 .start_send(item.map(|item| PartitionInnerBuffer::new(item, ())))
145 }
146
147 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
148 self.project().inner.poll_flush(cx)
149 }
150
151 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
152 self.project().inner.poll_close(cx)
153 }
154}
155
/// A sink that keeps one open batch per partition key `K` and submits finished batches to the
/// service `S`.
///
/// The partition of an event is obtained from `Partition::partition` on the event payload
/// (see `start_send`).
#[pin_project]
pub struct PartitionBatchSink<S, B, K>
where
    B: Batch,
    S: Service<B::Output>,
{
    // Wrapper around the service that also tracks requests which have not completed yet.
    service: ServiceSink<S, B::Output>,
    // An event that did not fit into its partition's batch, kept with its partition key
    // until the full batch has been sent.
    buffer: Option<(K, EncodedEvent<B::Input>)>,
    // Template from which a fresh batch is created for a partition that has none.
    batch: StatefulBatch<FinalizersBatch<B>>,
    // The currently open batch of each partition.
    partitions: HashMap<K, StatefulBatch<FinalizersBatch<B>>>,
    // Duration of the linger timer started whenever a batch is opened.
    timeout: Duration,
    // Linger timer of each open batch; inserted and removed together with `partitions`.
    lingers: HashMap<K, Pin<Box<Sleep>>>,
    // `Some` once `ordered()` was called: the most recent request of each partition, used
    // to hold back the next batch of that partition until the request has finished.
    in_flight: Option<HashMap<K, BoxFuture<'static, ()>>>,
    // Set by `poll_close`; makes every non-empty batch eligible for sending.
    closing: bool,
}
198
199impl<S, B, K> PartitionBatchSink<S, B, K>
200where
201 B: Batch,
202 B::Input: Partition<K>,
203 K: Hash + Eq + Clone + Send + 'static,
204 S: Service<B::Output>,
205 S::Future: Send + 'static,
206 S::Error: Into<crate::Error> + Send + 'static,
207 S::Response: Response + Send + 'static,
208{
209 pub fn new(service: S, batch: B, timeout: Duration) -> Self {
210 Self {
211 service: ServiceSink::new(service),
212 buffer: None,
213 batch: StatefulBatch::from(FinalizersBatch::from(batch)),
214 partitions: HashMap::new(),
215 timeout,
216 lingers: HashMap::new(),
217 in_flight: None,
218 closing: false,
219 }
220 }
221
222 pub fn ordered(&mut self) {
224 self.in_flight = Some(HashMap::new());
225 }
226}
227
228impl<S, B, K> Sink<EncodedEvent<B::Input>> for PartitionBatchSink<S, B, K>
229where
230 B: Batch,
231 B::Input: Partition<K>,
232 K: Hash + Eq + Clone + Send + 'static,
233 S: Service<B::Output>,
234 S::Future: Send + 'static,
235 S::Error: Into<crate::Error> + Send + 'static,
236 S::Response: Response + Send + 'static,
237{
238 type Error = crate::Error;
239
240 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
241 if self.buffer.is_some() {
242 match self.as_mut().poll_flush(cx) {
243 Poll::Ready(Ok(())) => {}
244 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
245 Poll::Pending => {
246 if self.buffer.is_some() {
247 return Poll::Pending;
248 }
249 }
250 }
251 }
252
253 Poll::Ready(Ok(()))
254 }
255
256 fn start_send(
257 mut self: Pin<&mut Self>,
258 item: EncodedEvent<B::Input>,
259 ) -> Result<(), Self::Error> {
260 let partition = item.item.partition();
261
262 let batch = loop {
263 if let Some(batch) = self.partitions.get_mut(&partition) {
264 break batch;
265 }
266
267 let batch = self.batch.fresh();
268 self.partitions.insert(partition.clone(), batch);
269
270 let delay = sleep(self.timeout);
271 self.lingers.insert(partition.clone(), Box::pin(delay));
272 };
273
274 if let PushResult::Overflow(item) = batch.push(item) {
275 self.buffer = Some((partition, item));
276 }
277
278 Ok(())
279 }
280
281 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
282 loop {
283 if self.buffer.is_none() && self.partitions.is_empty() {
285 ready!(self.service.poll_complete(cx));
286 return Poll::Ready(Ok(()));
287 }
288
289 let this = self.as_mut().project();
291 let mut partitions_ready = vec![];
292 for (partition, batch) in this.partitions.iter() {
293 if ((*this.closing && !batch.is_empty())
294 || batch.was_full()
295 || matches!(
296 this.lingers
297 .get_mut(partition)
298 .expect("linger should exists for poll_flush")
299 .poll_unpin(cx),
300 Poll::Ready(())
301 ))
302 && this
303 .in_flight
304 .as_mut()
305 .and_then(|map| map.get_mut(partition))
306 .map(|req| matches!(req.poll_unpin(cx), Poll::Ready(())))
307 .unwrap_or(true)
308 {
309 partitions_ready.push(partition.clone());
310 }
311 }
312 let mut batch_consumed = false;
313 for partition in partitions_ready.iter() {
314 let service_ready = match this.service.poll_ready(cx) {
315 Poll::Ready(Ok(())) => true,
316 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
317 Poll::Pending => false,
318 };
319 if service_ready {
320 trace!("Service ready; Sending batch.");
321
322 let batch = this.partitions.remove(partition).unwrap();
323 this.lingers.remove(partition);
324
325 let batch = batch.finish();
326 let future = tokio::spawn(this.service.call(batch));
327
328 if let Some(map) = this.in_flight.as_mut() {
329 map.insert(partition.clone(), future.map(|_| ()).fuse().boxed());
330 }
331
332 batch_consumed = true;
333 } else {
334 break;
335 }
336 }
337 if batch_consumed {
338 continue;
339 }
340
341 if let Some(in_flight) = this.in_flight.as_mut()
343 && in_flight.len() > this.partitions.len()
344 {
345 let partitions = this.partitions;
348 in_flight.retain(|partition, req| {
349 partitions.contains_key(partition) || req.poll_unpin(cx).is_pending()
350 });
351 }
352
353 if let Some((partition, item)) = self.buffer.take() {
355 if self.partitions.contains_key(&partition) {
356 self.buffer = Some((partition, item));
357 } else {
358 self.as_mut().start_send(item)?;
359
360 if self.buffer.is_some() {
361 unreachable!("Empty buffer overflowed.");
362 }
363
364 continue;
365 }
366 }
367
368 ready!(self.service.poll_complete(cx));
370 return Poll::Pending;
371 }
372 }
373
374 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375 trace!("Closing partition batch sink.");
376 self.closing = true;
377 self.poll_flush(cx)
378 }
379}
380
381impl<S, B, K> fmt::Debug for PartitionBatchSink<S, B, K>
382where
383 S: Service<B::Output> + fmt::Debug,
384 B: Batch + fmt::Debug,
385{
386 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
387 f.debug_struct("PartitionBatchSink")
388 .field("service", &self.service)
389 .field("batch", &self.batch)
390 .field("timeout", &self.timeout)
391 .finish()
392 }
393}
394
/// Wraps a service and keeps track of the requests submitted through it.
struct ServiceSink<S, Request> {
    // The wrapped service.
    service: S,
    // One receiver per submitted request; its sender fires when the request has finished.
    in_flight: FuturesUnordered<oneshot::Receiver<()>>,
    // Identifier given to the next request; incremented with wrap-around on every call.
    next_request_id: usize,
    // Ties the request type to the struct without storing a value of it.
    _pd: PhantomData<Request>,
}
403
404impl<S, Request> ServiceSink<S, Request>
405where
406 S: Service<Request>,
407 S::Future: Send + 'static,
408 S::Error: Into<crate::Error> + Send + 'static,
409 S::Response: Response + Send + 'static,
410{
411 fn new(service: S) -> Self {
412 Self {
413 service,
414 in_flight: FuturesUnordered::new(),
415 next_request_id: 0,
416 _pd: PhantomData,
417 }
418 }
419
420 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
421 self.service.poll_ready(cx).map_err(Into::into)
422 }
423
424 fn call(&mut self, batch: EncodedBatch<Request>) -> BoxFuture<'static, ()> {
425 let EncodedBatch {
426 items,
427 finalizers,
428 count,
429 json_byte_size,
430 ..
431 } = batch;
432
433 let (tx, rx) = oneshot::channel();
434
435 self.in_flight.push(rx);
436
437 let request_id = self.next_request_id;
438 self.next_request_id = request_id.wrapping_add(1);
439
440 trace!(
441 message = "Submitting service request.",
442 in_flight_requests = self.in_flight.len()
443 );
444 let events_sent = register!(EventsSent::from(Output(None)));
445 self.service
446 .call(items)
447 .err_into()
448 .map(move |result| {
449 let status = result_status(&result);
450 finalizers.update_status(status);
451 match status {
452 EventStatus::Delivered => {
453 events_sent.emit(CountByteSize(count, json_byte_size));
454 }
456 EventStatus::Rejected => {
457 let error = result.err().unwrap_or_else(|| "Response failed.".into());
460 emit!(CallError {
461 error,
462 request_id,
463 count,
464 });
465 }
466 _ => {} }
468
469 _ = tx.send(());
473 })
474 .instrument(info_span!("request", %request_id).or_current())
475 .boxed()
476 }
477
478 fn poll_complete(&mut self, cx: &mut Context<'_>) -> Poll<()> {
479 while !self.in_flight.is_empty() {
480 match ready!(Pin::new(&mut self.in_flight).poll_next(cx)) {
481 Some(Ok(())) => {}
482 Some(Err(_)) => panic!("ServiceSink service sender dropped."),
483 None => break,
484 }
485 }
486
487 Poll::Ready(())
488 }
489}
490
491impl<S, Request> fmt::Debug for ServiceSink<S, Request>
492where
493 S: fmt::Debug,
494{
495 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
496 f.debug_struct("ServiceSink")
497 .field("service", &self.service)
498 .finish()
499 }
500}
501
/// Strategy for turning the result of a service call into an [`EventStatus`].
pub trait ServiceLogic: Clone {
    /// The response type this logic knows how to classify.
    type Response: Response;
    /// Returns the status that corresponds to `result`.
    fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus;
}
508
/// [`ServiceLogic`] implementation that classifies results with the module level
/// `result_status` function.
#[derive(Derivative)]
#[derivative(Clone)]
pub struct StdServiceLogic<R> {
    // Marker for the response type; no value of `R` is stored.
    _pd: PhantomData<R>,
}
514
515impl<R> Default for StdServiceLogic<R> {
516 fn default() -> Self {
517 Self { _pd: PhantomData }
518 }
519}
520
521impl<R> ServiceLogic for StdServiceLogic<R>
522where
523 R: Response + Send,
524{
525 type Response = R;
526
527 fn result_status(&self, result: &crate::Result<Self::Response>) -> EventStatus {
528 result_status(result)
529 }
530}
531
532fn result_status<R: Response + Send>(result: &crate::Result<R>) -> EventStatus {
533 match result {
534 Ok(response) => {
535 if response.is_successful() {
536 trace!(message = "Response successful.", ?response);
537 EventStatus::Delivered
538 } else if response.is_transient() {
539 error!(message = "Response wasn't successful.", ?response);
540 EventStatus::Errored
541 } else {
542 error!(message = "Response failed.", ?response);
543 EventStatus::Rejected
544 }
545 }
546 Err(error) => {
547 error!(message = "Request failed.", %error);
548 EventStatus::Errored
549 }
550 }
551}
552
/// Describes how a service response should be interpreted by `result_status`.
pub trait Response: fmt::Debug {
    /// Whether the response indicates success. Defaults to `true`.
    fn is_successful(&self) -> bool {
        true
    }

    /// Whether an unsuccessful response is transient. Defaults to `true`.
    ///
    /// Only consulted by `result_status` when `is_successful` returns `false`.
    fn is_transient(&self) -> bool {
        true
    }
}
564
// The unit response relies on the trait defaults.
impl Response for () {}

// String slice responses rely on the trait defaults as well.
impl Response for &str {}
568
569#[cfg(test)]
570mod tests {
571 use std::{
572 convert::Infallible,
573 sync::{
574 Arc, Mutex,
575 atomic::{AtomicUsize, Ordering::Relaxed},
576 },
577 };
578
579 use bytes::Bytes;
580 use futures::{SinkExt, StreamExt, future, stream, task::noop_waker_ref};
581 use tokio::{task::yield_now, time::Instant};
582 use vector_lib::{
583 finalization::{BatchNotifier, BatchStatus, EventFinalizer, EventFinalizers},
584 json_size::JsonSize,
585 };
586
587 use super::*;
588 use crate::{
589 sinks::util::{BatchSettings, EncodedLength, VecBuffer},
590 test_util::trace_init,
591 };
592
    // Linger timeout passed to every sink constructed in these tests.
    const TIMEOUT: Duration = Duration::from_secs(10);
594
    // Test fixture: every `usize` event reports the same fixed encoded length.
    impl EncodedLength for usize {
        fn encoded_length(&self) -> usize {
            22
        }
    }
600
    // Moves the tokio clock forward by `duration`: the clock is paused, advanced and then
    // resumed again so the rest of the test runs with time flowing.
    async fn advance_time(duration: Duration) {
        tokio::time::pause();
        tokio::time::advance(duration).await;
        tokio::time::resume();
    }
606
    // Shared counter that accumulates the values of acknowledged requests.
    type Counter = Arc<AtomicUsize>;

    // Test request: a numeric value plus the finalizers that report its delivery status.
    struct Request(usize, EventFinalizers);
610
611 impl Request {
612 fn new(value: usize, counter: &Counter) -> Self {
613 let (batch, receiver) = BatchNotifier::new_with_receiver();
614 let counter = Arc::clone(counter);
615 tokio::spawn(async move {
616 if receiver.await == BatchStatus::Delivered {
617 counter.fetch_add(value, Relaxed);
618 }
619 });
620 Self(value, EventFinalizers::new(EventFinalizer::new(batch)))
621 }
622
623 fn encoded(value: usize, counter: &Counter) -> EncodedEvent<Self> {
624 let mut item = Self::new(value, counter);
625 let finalizers = std::mem::take(&mut item.1);
626 EncodedEvent {
627 item,
628 finalizers,
629 json_byte_size: JsonSize::zero(),
630 byte_size: 0,
631 }
632 }
633 }
634
    // Test fixture: every request reports the same fixed encoded length.
    impl EncodedLength for Request {
        fn encoded_length(&self) -> usize {
            22
        }
    }
640
641 #[tokio::test]
642 async fn batch_sink_acking_sequential() {
643 let ack_counter = Counter::default();
644
645 let svc = tower::service_fn(|_| future::ok::<_, std::io::Error>(()));
646 let mut batch_settings = BatchSettings::default();
647 batch_settings.size.bytes = 9999;
648 batch_settings.size.events = 10;
649
650 let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
651
652 buffered
653 .sink_map_err(drop)
654 .send_all(
655 &mut stream::iter(1..=22).map(|item| Ok(Request::encoded(item, &ack_counter))),
656 )
657 .await
658 .unwrap();
659
660 assert_eq!(ack_counter.load(Relaxed), 22 * 23 / 2);
661 }
662
    // Checks that acknowledgements arrive as the individual requests finish, independently
    // of the order in which they were submitted. The batch size is one event, and the
    // service sleeps for a duration chosen from the first value of the request.
    #[tokio::test]
    async fn batch_sink_acking_unordered() {
        let ack_counter = Counter::default();

        trace_init();

        let svc = tower::service_fn(|req: Vec<Request>| async move {
            let duration = match req[0].0 {
                1..=3 => Duration::from_secs(1),

                // Request 4 is much slower than the requests submitted after it.
                4 => Duration::from_secs(5),
                5 | 6 => Duration::from_secs(1),
                _ => unreachable!(),
            };

            sleep(duration).await;
            Ok::<(), Infallible>(())
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 1;

        let mut sink = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);

        let mut cx = Context::from_waker(noop_waker_ref());
        for item in 1..=3 {
            assert!(matches!(
                sink.poll_ready_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
            assert!(matches!(
                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
                Ok(())
            ));
        }

        // The requests have only just been submitted, so nothing is acknowledged yet.
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
        assert_eq!(ack_counter.load(Relaxed), 0);

        yield_now().await;
        advance_time(Duration::from_secs(3)).await;
        yield_now().await;

        for _ in 1..=3 {
            assert!(matches!(
                sink.poll_flush_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
        }

        // 1 + 2 + 3
        assert_eq!(ack_counter.load(Relaxed), 6);

        for item in 4..=6 {
            assert!(matches!(
                sink.poll_ready_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
            assert!(matches!(
                sink.start_send_unpin(Request::encoded(item, &ack_counter)),
                Ok(())
            ));
        }

        // Submitting the second group must not change the acknowledged total.
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
        assert_eq!(ack_counter.load(Relaxed), 6);

        yield_now().await;
        advance_time(Duration::from_secs(2)).await;
        yield_now().await;

        // Request 4 is still sleeping, so the flush cannot complete.
        assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));

        // 6 + 5 + 6: requests 5 and 6 are acknowledged ahead of request 4.
        assert_eq!(ack_counter.load(Relaxed), 17);

        yield_now().await;
        advance_time(Duration::from_secs(5)).await;
        yield_now().await;

        for _ in 4..=6 {
            assert!(matches!(
                sink.poll_flush_unpin(&mut cx),
                Poll::Ready(Ok(()))
            ));
        }

        // 17 + 4: request 4 has finally been acknowledged as well.
        assert_eq!(ack_counter.load(Relaxed), 21);
    }
759
760 #[tokio::test]
761 async fn batch_sink_buffers_messages_until_limit() {
762 let sent_requests = Arc::new(Mutex::new(Vec::new()));
763
764 let svc = tower::service_fn(|req| {
765 let sent_requests = Arc::clone(&sent_requests);
766
767 sent_requests.lock().unwrap().push(req);
768
769 future::ok::<_, std::io::Error>(())
770 });
771
772 let mut batch_settings = BatchSettings::default();
773 batch_settings.size.bytes = 9999;
774 batch_settings.size.events = 10;
775 let buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
776
777 buffered
778 .sink_map_err(drop)
779 .send_all(
780 &mut stream::iter(0..22)
781 .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
782 )
783 .await
784 .unwrap();
785
786 let output = sent_requests.lock().unwrap();
787 assert_eq!(
788 &*output,
789 &vec![
790 vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
791 vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
792 vec![20, 21]
793 ]
794 );
795 }
796
797 #[tokio::test]
798 async fn batch_sink_flushes_below_min_on_close() {
799 let sent_requests = Arc::new(Mutex::new(Vec::new()));
800
801 let svc = tower::service_fn(|req| {
802 let sent_requests = Arc::clone(&sent_requests);
803 sent_requests.lock().unwrap().push(req);
804 future::ok::<_, std::io::Error>(())
805 });
806
807 let mut batch_settings = BatchSettings::default();
808 batch_settings.size.bytes = 9999;
809 batch_settings.size.events = 10;
810 let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
811
812 let mut cx = Context::from_waker(noop_waker_ref());
813 assert!(matches!(
814 buffered.poll_ready_unpin(&mut cx),
815 Poll::Ready(Ok(()))
816 ));
817 assert!(matches!(
818 buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
819 Ok(())
820 ));
821 assert!(matches!(
822 buffered.poll_ready_unpin(&mut cx),
823 Poll::Ready(Ok(()))
824 ));
825 assert!(matches!(
826 buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
827 Ok(())
828 ));
829
830 buffered.close().await.unwrap();
831
832 let output = sent_requests.lock().unwrap();
833 assert_eq!(&*output, &vec![vec![0, 1]]);
834 }
835
836 #[tokio::test]
837 async fn batch_sink_expired_linger() {
838 let sent_requests = Arc::new(Mutex::new(Vec::new()));
839
840 let svc = tower::service_fn(|req| {
841 let sent_requests = Arc::clone(&sent_requests);
842 sent_requests.lock().unwrap().push(req);
843 future::ok::<_, std::io::Error>(())
844 });
845
846 let mut batch_settings = BatchSettings::default();
847 batch_settings.size.bytes = 9999;
848 batch_settings.size.events = 10;
849 let mut buffered = BatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
850
851 let mut cx = Context::from_waker(noop_waker_ref());
852 assert!(matches!(
853 buffered.poll_ready_unpin(&mut cx),
854 Poll::Ready(Ok(()))
855 ));
856 assert!(matches!(
857 buffered.start_send_unpin(EncodedEvent::new(0, 0, JsonSize::zero())),
858 Ok(())
859 ));
860 assert!(matches!(
861 buffered.poll_ready_unpin(&mut cx),
862 Poll::Ready(Ok(()))
863 ));
864 assert!(matches!(
865 buffered.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
866 Ok(())
867 ));
868
869 advance_time(TIMEOUT + Duration::from_secs(1)).await;
871
872 let start = Instant::now();
874 buffered.flush().await.unwrap();
875 let elapsed = start.duration_since(start);
876 assert!(elapsed < Duration::from_millis(200));
877
878 let output = sent_requests.lock().unwrap();
879 assert_eq!(&*output, &vec![vec![0, 1]]);
880 }
881
882 #[tokio::test]
883 async fn partition_batch_sink_buffers_messages_until_limit() {
884 let sent_requests = Arc::new(Mutex::new(Vec::new()));
885
886 let svc = tower::service_fn(|req| {
887 let sent_requests = Arc::clone(&sent_requests);
888 sent_requests.lock().unwrap().push(req);
889 future::ok::<_, std::io::Error>(())
890 });
891
892 let mut batch_settings = BatchSettings::default();
893 batch_settings.size.bytes = 9999;
894 batch_settings.size.events = 10;
895
896 let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
897
898 sink.sink_map_err(drop)
899 .send_all(
900 &mut stream::iter(0..22)
901 .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
902 )
903 .await
904 .unwrap();
905
906 let output = sent_requests.lock().unwrap();
907 assert_eq!(
908 &*output,
909 &vec![
910 vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
911 vec![10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
912 vec![20, 21]
913 ]
914 );
915 }
916
917 #[tokio::test]
918 async fn partition_batch_sink_buffers_by_partition_buffer_size_one() {
919 let sent_requests = Arc::new(Mutex::new(Vec::new()));
920
921 let svc = tower::service_fn(|req| {
922 let sent_requests = Arc::clone(&sent_requests);
923 sent_requests.lock().unwrap().push(req);
924 future::ok::<_, std::io::Error>(())
925 });
926
927 let mut batch_settings = BatchSettings::default();
928 batch_settings.size.bytes = 9999;
929 batch_settings.size.events = 1;
930
931 let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
932
933 let input = vec![Partitions::A, Partitions::B];
934 sink.sink_map_err(drop)
935 .send_all(
936 &mut stream::iter(input)
937 .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
938 )
939 .await
940 .unwrap();
941
942 let mut output = sent_requests.lock().unwrap();
943 output[..].sort();
944 assert_eq!(&*output, &vec![vec![Partitions::A], vec![Partitions::B]]);
945 }
946
947 #[tokio::test]
948 async fn partition_batch_sink_buffers_by_partition_buffer_size_two() {
949 let sent_requests = Arc::new(Mutex::new(Vec::new()));
950
951 let svc = tower::service_fn(|req| {
952 let sent_requests = Arc::clone(&sent_requests);
953 sent_requests.lock().unwrap().push(req);
954 future::ok::<_, std::io::Error>(())
955 });
956
957 let mut batch_settings = BatchSettings::default();
958 batch_settings.size.bytes = 9999;
959 batch_settings.size.events = 2;
960
961 let sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
962
963 let input = vec![Partitions::A, Partitions::B, Partitions::A, Partitions::B];
964 sink.sink_map_err(drop)
965 .send_all(
966 &mut stream::iter(input)
967 .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
968 )
969 .await
970 .unwrap();
971
972 let mut output = sent_requests.lock().unwrap();
973 output[..].sort();
974 assert_eq!(
975 &*output,
976 &vec![
977 vec![Partitions::A, Partitions::A],
978 vec![Partitions::B, Partitions::B]
979 ]
980 );
981 }
982
983 #[tokio::test]
984 async fn partition_batch_sink_submits_after_linger() {
985 let sent_requests = Arc::new(Mutex::new(Vec::new()));
986
987 let svc = tower::service_fn(|req| {
988 let sent_requests = Arc::clone(&sent_requests);
989 sent_requests.lock().unwrap().push(req);
990 future::ok::<_, std::io::Error>(())
991 });
992
993 let mut batch_settings = BatchSettings::default();
994 batch_settings.size.bytes = 9999;
995 batch_settings.size.events = 10;
996
997 let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
998
999 let mut cx = Context::from_waker(noop_waker_ref());
1000 assert!(matches!(
1001 sink.poll_ready_unpin(&mut cx),
1002 Poll::Ready(Ok(()))
1003 ));
1004 assert!(matches!(
1005 sink.start_send_unpin(EncodedEvent::new(1, 0, JsonSize::zero())),
1006 Ok(())
1007 ));
1008 assert!(matches!(sink.poll_flush_unpin(&mut cx), Poll::Pending));
1009
1010 advance_time(TIMEOUT + Duration::from_secs(1)).await;
1011
1012 let start = Instant::now();
1013 sink.flush().await.unwrap();
1014 let elapsed = start.duration_since(start);
1015 assert!(elapsed < Duration::from_millis(200));
1016
1017 let output = sent_requests.lock().unwrap();
1018 assert_eq!(&*output, &vec![vec![1]]);
1019 }
1020
    // A failing request must neither be acknowledged nor stop later requests from being
    // processed: the futures returned by `ServiceSink::call` always resolve to `()`.
    #[tokio::test]
    async fn service_sink_doesnt_propagate_error() {
        let ack_counter = Counter::default();

        // Fails exactly the request whose value is 3.
        let svc = tower::service_fn(|req: Request| {
            if req.0 == 3 {
                future::err("bad")
            } else {
                future::ok("good")
            }
        });
        let mut sink = ServiceSink::new(svc);
        // Builds a batch that carries a single request with its finalizers moved onto the
        // batch.
        let req = |items: usize| {
            let mut req = Request::new(items, &ack_counter);
            let finalizers = std::mem::take(&mut req.1);
            EncodedBatch {
                items: req,
                finalizers,
                count: items,
                byte_size: 1,
                json_byte_size: JsonSize::new(1),
            }
        };

        // Two requests that succeed.
        let mut fut1 = sink.call(req(1));
        let mut fut2 = sink.call(req(2));

        // Nothing has been polled yet, so nothing is acknowledged.
        assert_eq!(ack_counter.load(Relaxed), 0);

        let mut cx = Context::from_waker(noop_waker_ref());
        assert!(matches!(fut1.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(fut2.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));

        // Give the spawned acknowledgement tasks a chance to run.
        yield_now().await;
        // 1 + 2
        assert_eq!(ack_counter.load(Relaxed), 3);

        // One request that fails and one that succeeds.
        let mut fut3 = sink.call(req(3));
        let mut fut4 = sink.call(req(4));

        assert!(matches!(fut3.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(fut4.poll_unpin(&mut cx), Poll::Ready(())));
        assert!(matches!(sink.poll_complete(&mut cx), Poll::Ready(())));

        yield_now().await;
        // 3 + 4: the failed request 3 contributes nothing.
        assert_eq!(ack_counter.load(Relaxed), 7);
    }
1075
    // With `ordered()` enabled, a delayed request must hold back the next batch of its own
    // partition while batches of another partition keep flowing.
    #[tokio::test]
    async fn partition_batch_sink_ordering_per_partition() {
        let sent_requests = Arc::new(Mutex::new(Vec::new()));

        // Only the very first request is delayed; it is recorded when its delay is over.
        let mut delay = true;
        let svc = tower::service_fn(|req| {
            let sent_requests = Arc::clone(&sent_requests);
            if delay {
                delay = false;
                sleep(Duration::from_secs(1))
                    .map(move |_| {
                        sent_requests.lock().unwrap().push(req);
                        Result::<_, std::io::Error>::Ok(())
                    })
                    .boxed()
            } else {
                sent_requests.lock().unwrap().push(req);
                future::ok::<_, std::io::Error>(()).boxed()
            }
        });

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;

        let mut sink = PartitionBatchSink::new(svc, VecBuffer::new(batch_settings.size), TIMEOUT);
        sink.ordered();

        // Twenty events for partition 0 followed by twenty events for partition 1; the
        // first tuple element selects the partition.
        let input = (0..20).map(|i| (0, i)).chain((0..20).map(|i| (1, i)));
        sink.sink_map_err(drop)
            .send_all(
                &mut stream::iter(input)
                    .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero()))),
            )
            .await
            .unwrap();

        let output = sent_requests.lock().unwrap();
        // Partition 0 was submitted first, but its first batch was delayed, so both of its
        // batches are recorded after the two batches of partition 1.
        assert_eq!(
            &*output,
            &vec![
                (0..10).map(|i| (1, i)).collect::<Vec<_>>(),
                (10..20).map(|i| (1, i)).collect(),
                (0..10).map(|i| (0, i)).collect(),
                (10..20).map(|i| (0, i)).collect(),
            ]
        );
    }
1128
    // Test event type with two values that map to two different partitions.
    #[derive(Debug, PartialEq, Eq, Ord, PartialOrd)]
    enum Partitions {
        A,
        B,
    }
1134
    // Test fixture: every `Partitions` event reports the same fixed encoded length.
    impl EncodedLength for Partitions {
        fn encoded_length(&self) -> usize {
            10
        }
    }
1140
    // The partition key is the `Debug` rendering of the variant.
    impl Partition<Bytes> for Partitions {
        fn partition(&self) -> Bytes {
            format!("{self:?}").into()
        }
    }
1146
    // Every `usize` event maps to the same partition key.
    impl Partition<Bytes> for usize {
        fn partition(&self) -> Bytes {
            "key".into()
        }
    }
1152
    // The partition key is the decimal rendering of the first tuple element.
    impl Partition<Bytes> for (usize, usize) {
        fn partition(&self) -> Bytes {
            self.0.to_string().into()
        }
    }
1158
    // Test fixture: every tuple event reports the same fixed encoded length.
    impl EncodedLength for (usize, usize) {
        fn encoded_length(&self) -> usize {
            16
        }
    }
1164}