vector/sinks/util/service/
health.rs1use std::{
2 future::Future,
3 pin::Pin,
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc,
7 },
8 task::{ready, Context, Poll},
9};
10
11use futures::FutureExt;
12use futures_util::{future::BoxFuture, TryFuture};
13use pin_project::pin_project;
14use serde_with::serde_as;
15use stream_cancel::{Trigger, Tripwire};
16use tokio::time::{sleep, Duration};
17use tower::Service;
18use vector_lib::{configurable::configurable_component, emit};
19
20use crate::{
21 common::backoff::ExponentialBackoff,
22 internal_events::{EndpointsActive, OpenGauge},
23};
24
/// Default upper bound, in seconds, for the delay between reactivation attempts (one hour).
const RETRY_MAX_DURATION_SECONDS_DEFAULT: u64 = 60 * 60;
/// Default initial backoff, in seconds, configured for reactivation attempts.
const RETRY_INITIAL_BACKOFF_SECONDS_DEFAULT: u64 = 1;
/// Number of errors since the last healthy snapshot at which an endpoint is
/// declared unhealthy.
const UNHEALTHY_AMOUNT_OF_ERRORS: usize = 5;
28
29#[serde_as]
31#[configurable_component]
32#[derive(Clone, Debug, Default)]
33#[serde(rename_all = "snake_case")]
34pub struct HealthConfig {
35 #[serde(default = "default_retry_initial_backoff_secs")]
37 #[configurable(metadata(docs::type_unit = "seconds"))]
38 #[configurable(metadata(docs::human_name = "Retry Initial Backoff"))]
40 pub retry_initial_backoff_secs: u64,
41
42 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
44 #[serde(default = "default_retry_max_duration_secs")]
45 #[configurable(metadata(docs::human_name = "Max Retry Duration"))]
46 pub retry_max_duration_secs: Duration,
47}
48
49const fn default_retry_initial_backoff_secs() -> u64 {
50 RETRY_INITIAL_BACKOFF_SECONDS_DEFAULT
51}
52
53const fn default_retry_max_duration_secs() -> std::time::Duration {
54 Duration::from_secs(RETRY_MAX_DURATION_SECONDS_DEFAULT)
55}
56
57impl HealthConfig {
58 pub fn build<S, L>(
59 &self,
60 logic: L,
61 inner: S,
62 open: OpenGauge,
63 endpoint: String,
64 ) -> HealthService<S, L> {
65 let counters = Arc::new(HealthCounters::new());
66 let snapshot = counters.snapshot();
67
68 open.clone().open(emit_active_endpoints);
69 HealthService {
70 inner,
71 logic,
72 counters,
73 snapshot,
74 endpoint,
75 state: CircuitState::Closed,
76 open,
77 backoff: ExponentialBackoff::from_millis(2)
80 .factor((self.retry_initial_backoff_secs.saturating_mul(1000) / 2).max(1))
81 .max_delay(self.retry_max_duration_secs),
82 }
83 }
84}
85
/// Classifies the outcome of a request for health-tracking purposes.
pub trait HealthLogic: Clone + Send + Sync + 'static {
    /// Successful response type of the observed service.
    type Response;
    /// Error type of the observed service.
    type Error: Send + Sync + 'static;

    /// Judges a finished request.
    ///
    /// Returns `Some(true)` when the outcome counts as healthy, `Some(false)`
    /// when it counts as unhealthy, and `None` when it should not be counted
    /// either way.
    fn is_healthy(&self, response: &Result<Self::Response, Self::Error>) -> Option<bool>;
}
94
95enum CircuitState {
96 Open(BoxFuture<'static, ()>),
99
100 HalfOpen {
102 permit: Option<Trigger>,
103 done: Tripwire,
104 },
105
106 Closed,
108}
109
/// A [`Service`] wrapper that tracks the health of an endpoint and acts as a
/// circuit breaker in front of `inner`.
///
/// After enough unhealthy responses the service reports itself as not ready
/// for a backoff period, then lets a request through on probation to decide
/// whether the endpoint has recovered.
pub struct HealthService<S, L> {
    /// The wrapped service that requests are forwarded to.
    inner: S,
    /// Decides whether a response counts as healthy, unhealthy or neither.
    logic: L,
    /// Counters shared with the futures returned by `call`.
    counters: Arc<HealthCounters>,
    /// Counter values that later readings are compared against.
    snapshot: HealthSnapshot,
    /// Source of the delays used while the circuit is open.
    backoff: ExponentialBackoff,
    /// Current circuit-breaker state.
    state: CircuitState,
    /// Gauge on which `open(emit_active_endpoints)` is invoked.
    open: OpenGauge,
    /// Endpoint label used in log messages.
    endpoint: String,
}
122
123impl<S, L, Req> Service<Req> for HealthService<S, L>
124where
125 L: HealthLogic<Response = S::Response, Error = S::Error>,
126 S: Service<Req>,
127{
128 type Response = S::Response;
129 type Error = S::Error;
130 type Future = HealthFuture<S::Future, L>;
131
132 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
133 loop {
134 self.state = match self.state {
135 CircuitState::Open(ref mut timer) => {
136 ready!(timer.as_mut().poll(cx));
137
138 debug!(message = "Endpoint is on probation.", endpoint = %&self.endpoint);
139
140 let (permit, done) = Tripwire::new();
143
144 CircuitState::HalfOpen {
145 permit: Some(permit),
146 done,
147 }
148 }
149 CircuitState::HalfOpen {
150 permit: Some(_), ..
151 } => {
152 return self.inner.poll_ready(cx).map_err(Into::into);
154 }
155 CircuitState::HalfOpen {
156 permit: None,
157 ref mut done,
158 } => {
159 let done = Pin::new(done);
160 ready!(done.poll(cx));
161
162 if self.counters.healthy(self.snapshot).is_ok() {
163 info!(message = "Endpoint is healthy.", endpoint = %&self.endpoint);
165
166 self.backoff.reset();
167 self.open.clone().open(emit_active_endpoints);
168 CircuitState::Closed
169 } else {
170 debug!(message = "Endpoint failed probation.", endpoint = %&self.endpoint);
171
172 CircuitState::Open(
173 sleep(self.backoff.next().expect("Should never end")).boxed(),
174 )
175 }
176 }
177 CircuitState::Closed => {
178 match self.counters.healthy(self.snapshot) {
180 Ok(snapshot) => {
181 self.snapshot = snapshot;
183 return self.inner.poll_ready(cx).map_err(Into::into);
184 }
185 Err(errors) if errors >= UNHEALTHY_AMOUNT_OF_ERRORS => {
186 warn!(message = "Endpoint is unhealthy.", endpoint = %&self.endpoint);
188 CircuitState::Open(
189 sleep(self.backoff.next().expect("Should never end")).boxed(),
190 )
191 }
192 Err(_) => {
193 return self.inner.poll_ready(cx).map_err(Into::into);
195 }
196 }
197 }
198 }
199 }
200 }
201
202 fn call(&mut self, req: Req) -> Self::Future {
203 let permit = if let CircuitState::HalfOpen { permit, .. } = &mut self.state {
204 permit.take()
205 } else {
206 None
207 };
208
209 HealthFuture {
210 inner: self.inner.call(req),
211 logic: self.logic.clone(),
212 counters: Arc::clone(&self.counters),
213 permit,
214 }
215 }
216}
217
218#[pin_project]
220pub struct HealthFuture<F, L> {
221 #[pin]
222 inner: F,
223 logic: L,
224 counters: Arc<HealthCounters>,
225 permit: Option<Trigger>,
226}
227
228impl<F: TryFuture, L> Future for HealthFuture<F, L>
229where
230 F: Future<Output = Result<F::Ok, F::Error>>,
231 L: HealthLogic<Response = F::Ok, Error = F::Error>,
232{
233 type Output = Result<F::Ok, F::Error>;
234
235 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
236 let this = self.project();
238 let output = ready!(this.inner.poll(cx)).map_err(Into::into);
239
240 match this.logic.is_healthy(&output) {
241 None => (),
242 Some(true) => this.counters.inc_healthy(),
243 Some(false) => this.counters.inc_unhealthy(),
244 }
245
246 this.permit.take();
248
249 Poll::Ready(output)
250 }
251}
252
/// Running totals of healthy and unhealthy responses, updated atomically.
struct HealthCounters {
    /// Number of responses classified as healthy.
    healthy: AtomicUsize,
    /// Number of responses classified as unhealthy.
    unhealthy: AtomicUsize,
}

impl HealthCounters {
    /// Creates counters with both totals at zero.
    const fn new() -> Self {
        Self {
            healthy: AtomicUsize::new(0),
            unhealthy: AtomicUsize::new(0),
        }
    }

    /// Records one healthy response.
    fn inc_healthy(&self) {
        self.healthy.fetch_add(1, Ordering::Release);
    }

    /// Records one unhealthy response.
    fn inc_unhealthy(&self) {
        self.unhealthy.fetch_add(1, Ordering::Release);
    }

    /// Compares the current totals with `snapshot`.
    ///
    /// Returns `Ok` with the current totals when at least one healthy response
    /// has been recorded since `snapshot`, or when no new unhealthy response
    /// has been. Otherwise returns `Err` with the number of unhealthy
    /// responses recorded since `snapshot`.
    fn healthy(&self, snapshot: HealthSnapshot) -> Result<HealthSnapshot, usize> {
        let current = self.snapshot();

        let saw_success = current.healthy > snapshot.healthy;
        let saw_failure = current.unhealthy > snapshot.unhealthy;

        // A success since the snapshot outweighs any failures.
        if saw_failure && !saw_success {
            Err(current.unhealthy - snapshot.unhealthy)
        } else {
            Ok(current)
        }
    }

    /// Reads both totals; the healthy total is loaded first.
    fn snapshot(&self) -> HealthSnapshot {
        let healthy = self.healthy.load(Ordering::Acquire);
        let unhealthy = self.unhealthy.load(Ordering::Acquire);

        HealthSnapshot { healthy, unhealthy }
    }
}

/// Copy of the [`HealthCounters`] totals taken at one moment.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
struct HealthSnapshot {
    /// Healthy total at the time of the snapshot.
    healthy: usize,
    /// Unhealthy total at the time of the snapshot.
    unhealthy: usize,
}
308
/// Emits an `EndpointsActive` internal event carrying `count`.
///
/// Passed as the callback to `OpenGauge::open`.
fn emit_active_endpoints(count: usize) {
    emit!(EndpointsActive { count });
}
312
#[cfg(test)]
mod tests {
    use super::*;

    /// Walks the counters through success, repeated failure and recovery.
    #[test]
    fn test_health_counters() {
        let counters = HealthCounters::new();
        let initial = counters.snapshot();

        // A healthy response yields a fresh baseline.
        counters.inc_healthy();
        let baseline = counters.healthy(initial).unwrap();

        // Failures after the baseline are reported as an error count.
        for _ in 0..2 {
            counters.inc_unhealthy();
        }
        assert_eq!(counters.healthy(baseline), Err(2));

        // One healthy response is enough to be considered healthy again.
        counters.inc_healthy();
        assert!(counters.healthy(baseline).is_ok());
    }
}
332}