vector/sinks/util/service/
health.rs

1use std::{
2    future::Future,
3    pin::Pin,
4    sync::{
5        atomic::{AtomicUsize, Ordering},
6        Arc,
7    },
8    task::{ready, Context, Poll},
9};
10
11use futures::FutureExt;
12use futures_util::{future::BoxFuture, TryFuture};
13use pin_project::pin_project;
14use serde_with::serde_as;
15use stream_cancel::{Trigger, Tripwire};
16use tokio::time::{sleep, Duration};
17use tower::Service;
18use vector_lib::{configurable::configurable_component, emit};
19
20use crate::{
21    common::backoff::ExponentialBackoff,
22    internal_events::{EndpointsActive, OpenGauge},
23};
24
/// Default cap, in seconds, on the delay between attempts to reactivate an unhealthy endpoint
/// (one hour).
const RETRY_MAX_DURATION_SECONDS_DEFAULT: u64 = 60 * 60;
/// Default delay, in seconds, before the first attempt to reactivate an unhealthy endpoint.
const RETRY_INITIAL_BACKOFF_SECONDS_DEFAULT: u64 = 1;
/// Once at least this many unhealthy responses are seen without a healthy one in between, the
/// circuit opens.
const UNHEALTHY_AMOUNT_OF_ERRORS: usize = 5;
28
29/// Options for determining the health of an endpoint.
30#[serde_as]
31#[configurable_component]
32#[derive(Clone, Debug, Default)]
33#[serde(rename_all = "snake_case")]
34pub struct HealthConfig {
35    /// Initial delay between attempts to reactivate endpoints once they become unhealthy.
36    #[serde(default = "default_retry_initial_backoff_secs")]
37    #[configurable(metadata(docs::type_unit = "seconds"))]
38    // not using Duration type because the value is only used as a u64.
39    #[configurable(metadata(docs::human_name = "Retry Initial Backoff"))]
40    pub retry_initial_backoff_secs: u64,
41
42    /// Maximum delay between attempts to reactivate endpoints once they become unhealthy.
43    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
44    #[serde(default = "default_retry_max_duration_secs")]
45    #[configurable(metadata(docs::human_name = "Max Retry Duration"))]
46    pub retry_max_duration_secs: Duration,
47}
48
49const fn default_retry_initial_backoff_secs() -> u64 {
50    RETRY_INITIAL_BACKOFF_SECONDS_DEFAULT
51}
52
53const fn default_retry_max_duration_secs() -> std::time::Duration {
54    Duration::from_secs(RETRY_MAX_DURATION_SECONDS_DEFAULT)
55}
56
57impl HealthConfig {
58    pub fn build<S, L>(
59        &self,
60        logic: L,
61        inner: S,
62        open: OpenGauge,
63        endpoint: String,
64    ) -> HealthService<S, L> {
65        let counters = Arc::new(HealthCounters::new());
66        let snapshot = counters.snapshot();
67
68        open.clone().open(emit_active_endpoints);
69        HealthService {
70            inner,
71            logic,
72            counters,
73            snapshot,
74            endpoint,
75            state: CircuitState::Closed,
76            open,
77            // An exponential backoff starting from retry_initial_backoff_sec and doubling every time
78            // up to retry_max_duration_secs.
79            backoff: ExponentialBackoff::from_millis(2)
80                .factor((self.retry_initial_backoff_secs.saturating_mul(1000) / 2).max(1))
81                .max_delay(self.retry_max_duration_secs),
82        }
83    }
84}
85
86pub trait HealthLogic: Clone + Send + Sync + 'static {
87    type Error: Send + Sync + 'static;
88    type Response;
89
90    /// Returns health of the endpoint based on the response/error.
91    /// None if there is not enough information to determine it.
92    fn is_healthy(&self, response: &Result<Self::Response, Self::Error>) -> Option<bool>;
93}
94
95enum CircuitState {
96    /// Service is unhealthy hence it's not passing requests downstream.
97    /// Contains timeout.
98    Open(BoxFuture<'static, ()>),
99
100    /// Service will pass one request to test its health.
101    HalfOpen {
102        permit: Option<Trigger>,
103        done: Tripwire,
104    },
105
106    /// Service is healthy and passing requests downstream.
107    Closed,
108}
109
110/// A service which monitors the health of a service.
111/// Behaves like a circuit breaker.
112pub struct HealthService<S, L> {
113    inner: S,
114    logic: L,
115    counters: Arc<HealthCounters>,
116    snapshot: HealthSnapshot,
117    backoff: ExponentialBackoff,
118    state: CircuitState,
119    open: OpenGauge,
120    endpoint: String,
121}
122
123impl<S, L, Req> Service<Req> for HealthService<S, L>
124where
125    L: HealthLogic<Response = S::Response, Error = S::Error>,
126    S: Service<Req>,
127{
128    type Response = S::Response;
129    type Error = S::Error;
130    type Future = HealthFuture<S::Future, L>;
131
132    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133        loop {
134            self.state = match self.state {
135                CircuitState::Open(ref mut timer) => {
136                    ready!(timer.as_mut().poll(cx));
137
138                    debug!(message = "Endpoint is on probation.", endpoint = %&self.endpoint);
139
140                    // Using Tripwire will let us be notified when the request is done.
141                    // This can't be done through counters since a request can end without changing them.
142                    let (permit, done) = Tripwire::new();
143
144                    CircuitState::HalfOpen {
145                        permit: Some(permit),
146                        done,
147                    }
148                }
149                CircuitState::HalfOpen {
150                    permit: Some(_), ..
151                } => {
152                    // Pass one request to test health.
153                    return self.inner.poll_ready(cx).map_err(Into::into);
154                }
155                CircuitState::HalfOpen {
156                    permit: None,
157                    ref mut done,
158                } => {
159                    let done = Pin::new(done);
160                    ready!(done.poll(cx));
161
162                    if self.counters.healthy(self.snapshot).is_ok() {
163                        // A healthy response was observed
164                        info!(message = "Endpoint is healthy.", endpoint = %&self.endpoint);
165
166                        self.backoff.reset();
167                        self.open.clone().open(emit_active_endpoints);
168                        CircuitState::Closed
169                    } else {
170                        debug!(message = "Endpoint failed probation.", endpoint = %&self.endpoint);
171
172                        CircuitState::Open(
173                            sleep(self.backoff.next().expect("Should never end")).boxed(),
174                        )
175                    }
176                }
177                CircuitState::Closed => {
178                    // Check for errors
179                    match self.counters.healthy(self.snapshot) {
180                        Ok(snapshot) => {
181                            // Healthy
182                            self.snapshot = snapshot;
183                            return self.inner.poll_ready(cx).map_err(Into::into);
184                        }
185                        Err(errors) if errors >= UNHEALTHY_AMOUNT_OF_ERRORS => {
186                            // Unhealthy
187                            warn!(message = "Endpoint is unhealthy.", endpoint = %&self.endpoint);
188                            CircuitState::Open(
189                                sleep(self.backoff.next().expect("Should never end")).boxed(),
190                            )
191                        }
192                        Err(_) => {
193                            // Not ideal, but not enough errors to trip yet
194                            return self.inner.poll_ready(cx).map_err(Into::into);
195                        }
196                    }
197                }
198            }
199        }
200    }
201
202    fn call(&mut self, req: Req) -> Self::Future {
203        let permit = if let CircuitState::HalfOpen { permit, .. } = &mut self.state {
204            permit.take()
205        } else {
206            None
207        };
208
209        HealthFuture {
210            inner: self.inner.call(req),
211            logic: self.logic.clone(),
212            counters: Arc::clone(&self.counters),
213            permit,
214        }
215    }
216}
217
218/// Future for HealthService.
219#[pin_project]
220pub struct HealthFuture<F, L> {
221    #[pin]
222    inner: F,
223    logic: L,
224    counters: Arc<HealthCounters>,
225    permit: Option<Trigger>,
226}
227
228impl<F: TryFuture, L> Future for HealthFuture<F, L>
229where
230    F: Future<Output = Result<F::Ok, F::Error>>,
231    L: HealthLogic<Response = F::Ok, Error = F::Error>,
232{
233    type Output = Result<F::Ok, F::Error>;
234
235    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236        // Poll inner
237        let this = self.project();
238        let output = ready!(this.inner.poll(cx)).map_err(Into::into);
239
240        match this.logic.is_healthy(&output) {
241            None => (),
242            Some(true) => this.counters.inc_healthy(),
243            Some(false) => this.counters.inc_unhealthy(),
244        }
245
246        // Request is done so we can now drop the permit.
247        this.permit.take();
248
249        Poll::Ready(output)
250    }
251}
252
/// Shared tally of classified responses.
///
/// `HealthFuture` bumps these as responses complete. `HealthService` compares snapshots of
/// them to decide whether the endpoint is healthy.
struct HealthCounters {
    healthy: AtomicUsize,
    unhealthy: AtomicUsize,
}

impl HealthCounters {
    const fn new() -> Self {
        Self {
            healthy: AtomicUsize::new(0),
            unhealthy: AtomicUsize::new(0),
        }
    }

    /// Records one response classified as healthy.
    fn inc_healthy(&self) {
        Self::bump(&self.healthy);
    }

    /// Records one response classified as unhealthy.
    fn inc_unhealthy(&self) {
        Self::bump(&self.unhealthy);
    }

    fn bump(counter: &AtomicUsize) {
        counter.fetch_add(1, Ordering::Release);
    }

    /// Compares the current counters against `snapshot`.
    ///
    /// Returns `Ok(current)` in two cases: a healthy response was recorded since `snapshot`, or
    /// no new unhealthy response was recorded. Otherwise returns `Err(n)`, where `n` is the
    /// number of unhealthy responses recorded since `snapshot`. In that case `snapshot` remains
    /// the caller's baseline.
    fn healthy(&self, snapshot: HealthSnapshot) -> Result<HealthSnapshot, usize> {
        let current = self.snapshot();
        let saw_healthy = current.healthy > snapshot.healthy;
        let new_errors = current.unhealthy.saturating_sub(snapshot.unhealthy);

        if saw_healthy || new_errors == 0 {
            Ok(current)
        } else {
            Err(new_errors)
        }
    }

    /// Reads both counters into a copyable snapshot.
    fn snapshot(&self) -> HealthSnapshot {
        let healthy = self.healthy.load(Ordering::Acquire);
        let unhealthy = self.unhealthy.load(Ordering::Acquire);
        HealthSnapshot { healthy, unhealthy }
    }
}

/// Point-in-time copy of the values held by `HealthCounters`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct HealthSnapshot {
    healthy: usize,
    unhealthy: usize,
}
308
309fn emit_active_endpoints(count: usize) {
310    emit!(EndpointsActive { count });
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_health_counters() {
        let counters = HealthCounters::new();
        let initial = counters.snapshot();

        // A healthy response moves the baseline forward.
        counters.inc_healthy();
        let baseline = counters
            .healthy(initial)
            .expect("a healthy response was recorded");

        // Unhealthy responses alone are reported as an error count relative to the baseline.
        (0..2).for_each(|_| counters.inc_unhealthy());
        assert_eq!(counters.healthy(baseline), Err(2));

        // One more healthy response outweighs the accumulated errors.
        counters.inc_healthy();
        assert!(counters.healthy(baseline).is_ok());
    }
}