vector/sinks/util/adaptive_concurrency/
future.rs1use std::{
4 future::Future,
5 pin::Pin,
6 sync::Arc,
7 task::{ready, Context, Poll},
8 time::Instant,
9};
10
11use pin_project::pin_project;
12use tokio::sync::OwnedSemaphorePermit;
13
14use super::{controller::Controller, instant_now};
15use crate::sinks::util::retries::RetryLogic;
16
17#[pin_project]
29pub struct ResponseFuture<F, L> {
30 #[pin]
31 inner: F,
32 _permit: OwnedSemaphorePermit,
34 controller: Arc<Controller<L>>,
35 start: Instant,
36}
37
38impl<F, L> ResponseFuture<F, L> {
39 pub(super) fn new(
40 inner: F,
41 _permit: OwnedSemaphorePermit,
42 controller: Arc<Controller<L>>,
43 ) -> Self {
44 Self {
45 inner,
46 _permit,
47 controller,
48 start: instant_now(),
49 }
50 }
51}
52
53impl<F, L, E> Future for ResponseFuture<F, L>
54where
55 F: Future<Output = Result<L::Response, E>>,
56 L: RetryLogic,
57 E: Into<crate::Error>,
58{
59 type Output = Result<L::Response, crate::Error>;
60
61 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62 let future = self.project();
63 let output = ready!(future.inner.poll(cx)).map_err(Into::into);
64 future.controller.adjust_to_response(*future.start, &output);
65 Poll::Ready(output)
66 }
67}