1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    task::{ready, Context, Poll},
};

use futures::FutureExt;
use futures_util::{future::BoxFuture, TryFuture};
use pin_project::pin_project;
use serde_with::serde_as;
use stream_cancel::{Trigger, Tripwire};
use tokio::time::{sleep, Duration};
use tower::Service;
use vector_lib::{configurable::configurable_component, emit};

use crate::{
    internal_events::{EndpointsActive, OpenGauge},
    sinks::util::retries::ExponentialBackoff,
};

/// Default for `HealthConfig::retry_max_duration_secs`, in seconds (one hour).
const RETRY_MAX_DURATION_SECONDS_DEFAULT: u64 = 3_600;
/// Default for `HealthConfig::retry_initial_backoff_secs`, in seconds.
const RETRY_INITIAL_BACKOFF_SECONDS_DEFAULT: u64 = 1;
/// Minimum number of unhealthy responses, recorded since the last healthy snapshot with no
/// healthy response in between, that opens the circuit.
const UNHEALTHY_AMOUNT_OF_ERRORS: usize = 5;

/// Options for determining the health of an endpoint.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(rename_all = "snake_case")]
pub struct HealthConfig {
    /// Initial delay between attempts to reactivate endpoints once they become unhealthy.
    #[serde(default = "default_retry_initial_backoff_secs")]
    #[configurable(metadata(docs::type_unit = "seconds"))]
    // not using Duration type because the value is only used as a u64.
    #[configurable(metadata(docs::human_name = "Retry Initial Backoff"))]
    pub retry_initial_backoff_secs: u64,

    /// Maximum delay between attempts to reactivate endpoints once they become unhealthy.
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(default = "default_retry_max_duration_secs")]
    #[configurable(metadata(docs::human_name = "Max Retry Duration"))]
    pub retry_max_duration_secs: Duration,
}

// A derived `Default` would set both fields to zero, disagreeing with the serde defaults
// applied when the fields are omitted from configuration. A zero
// `retry_max_duration_secs` is passed to `max_delay` in `build`, where it would presumably
// collapse every reopen delay to zero. Reuse the serde default functions so that
// `HealthConfig::default()` and an empty config section agree.
impl Default for HealthConfig {
    fn default() -> Self {
        Self {
            retry_initial_backoff_secs: default_retry_initial_backoff_secs(),
            retry_max_duration_secs: default_retry_max_duration_secs(),
        }
    }
}

/// Serde (and `Default`) value for `retry_initial_backoff_secs`.
const fn default_retry_initial_backoff_secs() -> u64 {
    RETRY_INITIAL_BACKOFF_SECONDS_DEFAULT
}

/// Serde (and `Default`) value for `retry_max_duration_secs`.
const fn default_retry_max_duration_secs() -> Duration {
    Duration::from_secs(RETRY_MAX_DURATION_SECONDS_DEFAULT)
}

impl HealthConfig {
    /// Wraps `inner` in a [`HealthService`] that tracks the health of `endpoint`.
    ///
    /// `logic` classifies each completed request, and `open` is the gauge used to report
    /// active endpoints. The returned service starts with its circuit closed.
    pub fn build<S, L>(
        &self,
        logic: L,
        inner: S,
        open: OpenGauge,
        endpoint: String,
    ) -> HealthService<S, L> {
        let counters = Arc::new(HealthCounters::new());
        let baseline = counters.snapshot();

        // NOTE(review): the value returned by `OpenGauge::open` is discarded immediately;
        // confirm against `OpenGauge` that this is the intended way to mark the endpoint active.
        open.clone().open(emit_active_endpoints);

        // An exponential backoff starting from `retry_initial_backoff_secs` and doubling
        // every time, up to `retry_max_duration_secs`. The base is 2 ms, so the factor is
        // half of the initial backoff expressed in milliseconds (never below 1).
        let initial_backoff_ms = self.retry_initial_backoff_secs.saturating_mul(1000);
        let backoff = ExponentialBackoff::from_millis(2)
            .factor((initial_backoff_ms / 2).max(1))
            .max_delay(self.retry_max_duration_secs);

        HealthService {
            state: CircuitState::Closed,
            snapshot: baseline,
            inner,
            logic,
            counters,
            endpoint,
            open,
            backoff,
        }
    }
}

/// Decides, from a completed request's result, whether the endpoint that served it is
/// healthy. Used by [`HealthService`] to drive its circuit breaker; a clone of the logic
/// is handed to every request's [`HealthFuture`].
pub trait HealthLogic: Clone + Send + Sync + 'static {
    /// Error type of the wrapped service (bound to `S::Error` by `HealthService`).
    type Error: Send + Sync + 'static;
    /// Response type of the wrapped service (bound to `S::Response` by `HealthService`).
    type Response;

    /// Returns health of the endpoint based on the response/error.
    /// None if there is not enough information to determine it.
    fn is_healthy(&self, response: &Result<Self::Response, Self::Error>) -> Option<bool>;
}

/// State of the circuit breaker inside [`HealthService`].
enum CircuitState {
    /// Service is unhealthy hence it's not passing requests downstream.
    /// Contains timeout: a backoff timer after which the state moves to `HalfOpen`.
    Open(BoxFuture<'static, ()>),

    /// Service will pass one request to test its health.
    HalfOpen {
        /// Handed to the first request issued in this state (taken in `call`); `None`
        /// once that probe request has been dispatched.
        permit: Option<Trigger>,
        /// Resolves once the probe's `permit` has been dropped by its `HealthFuture`.
        done: Tripwire,
    },

    /// Service is healthy and passing requests downstream.
    Closed,
}

/// A service which monitors the health of a service.
/// Behaves like a circuit breaker.
pub struct HealthService<S, L> {
    /// Wrapped service that requests are forwarded to.
    inner: S,
    /// Classifies each completed request as healthy, unhealthy, or unknown.
    logic: L,
    /// Health counters shared with every `HealthFuture` this service creates.
    counters: Arc<HealthCounters>,
    /// Counter values at the last healthy check; health is judged relative to this.
    snapshot: HealthSnapshot,
    /// Source of successive delays while the circuit is open; reset when the endpoint
    /// passes probation.
    backoff: ExponentialBackoff,
    /// Current circuit-breaker state.
    state: CircuitState,
    /// Active-endpoint gauge, reported through `emit_active_endpoints`.
    open: OpenGauge,
    /// Endpoint identifier, included in log messages.
    endpoint: String,
}

impl<S, L, Req> Service<Req> for HealthService<S, L>
where
    L: HealthLogic<Response = S::Response, Error = S::Error>,
    S: Service<Req>,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = HealthFuture<S::Future, L>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        loop {
            self.state = match self.state {
                CircuitState::Open(ref mut timer) => {
                    ready!(timer.as_mut().poll(cx));

                    debug!(message = "Endpoint is on probation.", endpoint = %&self.endpoint);

                    // Using Tripwire will let us be notified when the request is done.
                    // This can't be done through counters since a request can end without changing them.
                    let (permit, done) = Tripwire::new();

                    CircuitState::HalfOpen {
                        permit: Some(permit),
                        done,
                    }
                }
                CircuitState::HalfOpen {
                    permit: Some(_), ..
                } => {
                    // Pass one request to test health.
                    return self.inner.poll_ready(cx).map_err(Into::into);
                }
                CircuitState::HalfOpen {
                    permit: None,
                    ref mut done,
                } => {
                    let done = Pin::new(done);
                    ready!(done.poll(cx));

                    if self.counters.healthy(self.snapshot).is_ok() {
                        // A healthy response was observed
                        info!(message = "Endpoint is healthy.", endpoint = %&self.endpoint);

                        self.backoff.reset();
                        self.open.clone().open(emit_active_endpoints);
                        CircuitState::Closed
                    } else {
                        debug!(message = "Endpoint failed probation.", endpoint = %&self.endpoint);

                        CircuitState::Open(
                            sleep(self.backoff.next().expect("Should never end")).boxed(),
                        )
                    }
                }
                CircuitState::Closed => {
                    // Check for errors
                    match self.counters.healthy(self.snapshot) {
                        Ok(snapshot) => {
                            // Healthy
                            self.snapshot = snapshot;
                            return self.inner.poll_ready(cx).map_err(Into::into);
                        }
                        Err(errors) if errors >= UNHEALTHY_AMOUNT_OF_ERRORS => {
                            // Unhealthy
                            warn!(message = "Endpoint is unhealthy.", endpoint = %&self.endpoint);
                            CircuitState::Open(
                                sleep(self.backoff.next().expect("Should never end")).boxed(),
                            )
                        }
                        Err(_) => {
                            // Not ideal, but not enough errors to trip yet
                            return self.inner.poll_ready(cx).map_err(Into::into);
                        }
                    }
                }
            }
        }
    }

    fn call(&mut self, req: Req) -> Self::Future {
        let permit = if let CircuitState::HalfOpen { permit, .. } = &mut self.state {
            permit.take()
        } else {
            None
        };

        HealthFuture {
            inner: self.inner.call(req),
            logic: self.logic.clone(),
            counters: Arc::clone(&self.counters),
            permit,
        }
    }
}

/// Future for HealthService.
///
/// Resolves to the inner request's result unchanged, recording its health in the shared
/// counters when it completes.
#[pin_project]
pub struct HealthFuture<F, L> {
    /// The in-flight request to the inner service.
    #[pin]
    inner: F,
    /// Classifies the request's result.
    logic: L,
    /// Counters shared with the `HealthService` that created this future.
    counters: Arc<HealthCounters>,
    /// Probation permit when this is the half-open probe request; released when the
    /// request completes, or when this future is dropped.
    permit: Option<Trigger>,
}

impl<F: TryFuture, L> Future for HealthFuture<F, L>
where
    F: Future<Output = Result<F::Ok, F::Error>>,
    L: HealthLogic<Response = F::Ok, Error = F::Error>,
{
    type Output = Result<F::Ok, F::Error>;

    /// Polls the inner request. Once it resolves, it classifies the result with the
    /// `HealthLogic`, records the outcome in the shared counters, and releases the
    /// probation permit (if any). The result is returned unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let output = ready!(this.inner.poll(cx));

        match this.logic.is_healthy(&output) {
            None => (),
            Some(true) => this.counters.inc_healthy(),
            Some(false) => this.counters.inc_unhealthy(),
        }

        // Request is done so we can now drop the permit. This happens after the counters are
        // updated, because the half-open service waits on this permit before reading them.
        this.permit.take();

        Poll::Ready(output)
    }
}

/// Tracker of response health, incremented by HealthFuture and used by HealthService.
///
/// Both counters only ever increase (wrapping on overflow). Comparing an earlier
/// [`HealthSnapshot`] with the current values therefore tells how many healthy and
/// unhealthy responses were recorded in between.
struct HealthCounters {
    /// Number of responses classified as healthy.
    healthy: AtomicUsize,
    /// Number of responses classified as unhealthy.
    unhealthy: AtomicUsize,
}

impl HealthCounters {
    /// Creates counters with nothing recorded yet.
    const fn new() -> Self {
        HealthCounters {
            healthy: AtomicUsize::new(0),
            unhealthy: AtomicUsize::new(0),
        }
    }

    /// Records one healthy response.
    fn inc_healthy(&self) {
        self.healthy.fetch_add(1, Ordering::Release);
    }

    /// Records one unhealthy response.
    fn inc_unhealthy(&self) {
        self.unhealthy.fetch_add(1, Ordering::Release);
    }

    /// Checks if healthy relative to `snapshot`.
    ///
    /// Returns the current snapshot if a healthy response was recorded since `snapshot`,
    /// or if nothing was recorded at all. Otherwise, it returns the number of unhealthy
    /// responses recorded since `snapshot`; the old snapshot remains the valid baseline
    /// in that case.
    fn healthy(&self, snapshot: HealthSnapshot) -> Result<HealthSnapshot, usize> {
        let now = self.snapshot();

        // `fetch_add` wraps on overflow, which is possible on 32-bit targets in
        // long-running processes. Comparing wrapping deltas rather than raw values keeps
        // new observations visible across a wrap and avoids an overflow panic in the
        // subtraction in debug builds. Without a wrap, the deltas equal plain subtraction.
        let healthy_delta = now.healthy.wrapping_sub(snapshot.healthy);
        let unhealthy_delta = now.unhealthy.wrapping_sub(snapshot.unhealthy);

        if healthy_delta > 0 {
            // Healthy response was observed
            Ok(now)
        } else if unhealthy_delta > 0 {
            // Unhealthy response was observed
            Err(unhealthy_delta)
        } else {
            // No relative observations
            Ok(now)
        }
    }

    /// Reads both counters. The two loads are separate, so a concurrent increment may be
    /// reflected in one field and not yet in the other.
    fn snapshot(&self) -> HealthSnapshot {
        HealthSnapshot {
            healthy: self.healthy.load(Ordering::Acquire),
            unhealthy: self.unhealthy.load(Ordering::Acquire),
        }
    }
}

/// Point-in-time copy of the [`HealthCounters`] values.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
struct HealthSnapshot {
    healthy: usize,
    unhealthy: usize,
}

/// Emitter callback passed to `OpenGauge::open`: emits an `EndpointsActive` internal event
/// carrying `count`.
fn emit_active_endpoints(count: usize) {
    emit!(EndpointsActive { count });
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises the snapshot comparison: healthy responses produce a new baseline,
    /// unhealthy ones are counted, and a later healthy response takes precedence.
    #[test]
    fn test_health_counters() {
        let counters = HealthCounters::new();
        let initial = counters.snapshot();

        counters.inc_healthy();
        let baseline = counters.healthy(initial).unwrap();

        for _ in 0..2 {
            counters.inc_unhealthy();
        }
        assert_eq!(counters.healthy(baseline), Err(2));

        counters.inc_healthy();
        assert!(counters.healthy(baseline).is_ok());
    }
}