vector/sinks/util/adaptive_concurrency/
future.rs

1//! Future types
2//!
3use std::{
4    future::Future,
5    pin::Pin,
6    sync::Arc,
7    task::{Context, Poll, ready},
8    time::Instant,
9};
10
11use pin_project::pin_project;
12use tokio::sync::OwnedSemaphorePermit;
13
14use super::{controller::Controller, instant_now};
15use crate::sinks::util::retries::RetryLogic;
16
17/// Future for the `AdaptiveConcurrencyLimit` service.
18///
19/// This future runs the inner future, which is used to collect the
20/// response from the inner service, and then tells the controller to
21/// adjust its measurements when that future is ready. It also owns the
22/// semaphore permit that is used to control concurrency such that the
23/// semaphore is returned when this future is dropped.
24///
25/// Note that this future must be awaited immediately (such as by
26/// spawning it) to prevent extraneous delays from causing discrepancies
27/// in the measurements.
28#[pin_project]
29pub struct ResponseFuture<F, L> {
30    #[pin]
31    inner: F,
32    // Keep this around so that it is dropped when the future completes
33    _permit: OwnedSemaphorePermit,
34    controller: Arc<Controller<L>>,
35    start: Instant,
36}
37
38impl<F, L> ResponseFuture<F, L> {
39    pub(super) fn new(
40        inner: F,
41        _permit: OwnedSemaphorePermit,
42        controller: Arc<Controller<L>>,
43    ) -> Self {
44        Self {
45            inner,
46            _permit,
47            controller,
48            start: instant_now(),
49        }
50    }
51}
52
53impl<F, L, E> Future for ResponseFuture<F, L>
54where
55    F: Future<Output = Result<L::Response, E>>,
56    L: RetryLogic,
57    E: Into<crate::Error>,
58{
59    type Output = Result<L::Response, crate::Error>;
60
61    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62        let future = self.project();
63        let output = ready!(future.inner.poll(cx)).map_err(Into::into);
64        future.controller.adjust_to_response(*future.start, &output);
65        Poll::Ready(output)
66    }
67}