vector/sinks/util/
service.rs

1use std::{hash::Hash, marker::PhantomData, num::NonZeroU64, pin::Pin, sync::Arc, time::Duration};
2
3use futures_util::stream::{self, BoxStream};
4use serde_with::serde_as;
5use tower::{
6    balance::p2c::Balance,
7    buffer::{Buffer, BufferLayer},
8    discover::Change,
9    layer::{util::Stack, Layer},
10    limit::RateLimit,
11    retry::Retry,
12    timeout::Timeout,
13    Service, ServiceBuilder,
14};
15use vector_lib::configurable::configurable_component;
16
17pub use crate::sinks::util::service::{
18    concurrency::Concurrency,
19    health::{HealthConfig, HealthLogic, HealthService},
20    map::Map,
21};
22use crate::{
23    internal_events::OpenGauge,
24    sinks::util::{
25        adaptive_concurrency::{
26            AdaptiveConcurrencyLimit, AdaptiveConcurrencyLimitLayer, AdaptiveConcurrencySettings,
27        },
28        retries::{FibonacciRetryPolicy, JitterMode, RetryLogic},
29        service::map::MapLayer,
30        sink::Response,
31        Batch, BatchSink, Partition, PartitionBatchSink,
32    },
33};
34
35mod concurrency;
36mod health;
37mod map;
38pub mod net;
39
40pub type Svc<S, L> =
41    RateLimit<AdaptiveConcurrencyLimit<Retry<FibonacciRetryPolicy<L>, Timeout<S>>, L>>;
42pub type TowerBatchedSink<S, B, RL> = BatchSink<Svc<S, RL>, B>;
43pub type TowerPartitionSink<S, B, RL, K> = PartitionBatchSink<Svc<S, RL>, B, K>;
44
45// Distributed service types
46pub type DistributedService<S, RL, HL, K, Req> = RateLimit<
47    Retry<
48        FibonacciRetryPolicy<RL>,
49        Buffer<Req, <Balance<DiscoveryService<S, RL, HL, K>, Req> as Service<Req>>::Future>,
50    >,
51>;
52pub type DiscoveryService<S, RL, HL, K> =
53    BoxStream<'static, Result<Change<K, SingleDistributedService<S, RL, HL>>, crate::Error>>;
54pub type SingleDistributedService<S, RL, HL> =
55    AdaptiveConcurrencyLimit<HealthService<Timeout<S>, HL>, RL>;
56
57pub trait ServiceBuilderExt<L> {
58    fn map<R1, R2, F>(self, f: F) -> ServiceBuilder<Stack<MapLayer<R1, R2>, L>>
59    where
60        F: Fn(R1) -> R2 + Send + Sync + 'static;
61
62    fn settings<RL, Request>(
63        self,
64        settings: TowerRequestSettings,
65        retry_logic: RL,
66    ) -> ServiceBuilder<Stack<TowerRequestLayer<RL, Request>, L>>;
67}
68
69impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
70    fn map<R1, R2, F>(self, f: F) -> ServiceBuilder<Stack<MapLayer<R1, R2>, L>>
71    where
72        F: Fn(R1) -> R2 + Send + Sync + 'static,
73    {
74        self.layer(MapLayer::new(Arc::new(f)))
75    }
76
77    fn settings<RL, Request>(
78        self,
79        settings: TowerRequestSettings,
80        retry_logic: RL,
81    ) -> ServiceBuilder<Stack<TowerRequestLayer<RL, Request>, L>> {
82        self.layer(TowerRequestLayer {
83            settings,
84            retry_logic,
85            _pd: std::marker::PhantomData,
86        })
87    }
88}
89
90pub trait TowerRequestConfigDefaults {
91    const CONCURRENCY: Concurrency = Concurrency::Adaptive;
92    const TIMEOUT_SECS: u64 = 60;
93    const RATE_LIMIT_DURATION_SECS: u64 = 1;
94    const RATE_LIMIT_NUM: u64 = i64::MAX as u64; // i64 avoids TOML deserialize issue
95    const RETRY_ATTEMPTS: usize = isize::MAX as usize; // isize avoids TOML deserialize issue
96    const RETRY_MAX_DURATION_SECS: NonZeroU64 = NonZeroU64::new(30).unwrap();
97    const RETRY_INITIAL_BACKOFF_SECS: NonZeroU64 = NonZeroU64::new(1).unwrap();
98}
99
100#[derive(Clone, Copy, Debug)]
101pub struct GlobalTowerRequestConfigDefaults;
102
103impl TowerRequestConfigDefaults for GlobalTowerRequestConfigDefaults {}
104
105/// Middleware settings for outbound requests.
106///
107/// Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior.
108///
109/// Note that the retry backoff policy follows the Fibonacci sequence.
110#[serde_as]
111#[configurable_component]
112#[configurable(metadata(docs::advanced))]
113#[derive(Clone, Copy, Debug)]
114pub struct TowerRequestConfig<D: TowerRequestConfigDefaults = GlobalTowerRequestConfigDefaults> {
115    #[configurable(derived)]
116    #[serde(default = "default_concurrency::<D>")]
117    #[serde(skip_serializing_if = "concurrency_is_default::<D>")]
118    pub concurrency: Concurrency,
119
120    /// The time a request can take before being aborted.
121    ///
122    /// Datadog highly recommends that you do not lower this value below the service's internal timeout, as this could
123    /// create orphaned requests, pile on retries, and result in duplicate data downstream.
124    #[configurable(metadata(docs::type_unit = "seconds"))]
125    #[configurable(metadata(docs::human_name = "Timeout"))]
126    #[serde(default = "default_timeout_secs::<D>")]
127    pub timeout_secs: u64,
128
129    /// The time window used for the `rate_limit_num` option.
130    #[configurable(metadata(docs::type_unit = "seconds"))]
131    #[configurable(metadata(docs::human_name = "Rate Limit Duration"))]
132    #[serde(default = "default_rate_limit_duration_secs::<D>")]
133    pub rate_limit_duration_secs: u64,
134
135    /// The maximum number of requests allowed within the `rate_limit_duration_secs` time window.
136    #[configurable(metadata(docs::type_unit = "requests"))]
137    #[configurable(metadata(docs::human_name = "Rate Limit Number"))]
138    #[serde(default = "default_rate_limit_num::<D>")]
139    pub rate_limit_num: u64,
140
141    /// The maximum number of retries to make for failed requests.
142    #[configurable(metadata(docs::type_unit = "retries"))]
143    #[serde(default = "default_retry_attempts::<D>")]
144    pub retry_attempts: usize,
145
146    /// The maximum amount of time to wait between retries.
147    #[configurable(metadata(docs::type_unit = "seconds"))]
148    #[configurable(metadata(docs::human_name = "Max Retry Duration"))]
149    #[serde(default = "default_retry_max_duration_secs::<D>")]
150    pub retry_max_duration_secs: NonZeroU64,
151
152    /// The amount of time to wait before attempting the first retry for a failed request.
153    ///
154    /// After the first retry has failed, the fibonacci sequence is used to select future backoffs.
155    #[configurable(metadata(docs::type_unit = "seconds"))]
156    #[configurable(metadata(docs::human_name = "Retry Initial Backoff"))]
157    #[serde(default = "default_retry_initial_backoff_secs::<D>")]
158    pub retry_initial_backoff_secs: NonZeroU64,
159
160    #[configurable(derived)]
161    #[serde(default)]
162    pub retry_jitter_mode: JitterMode,
163
164    #[configurable(derived)]
165    #[serde(default)]
166    pub adaptive_concurrency: AdaptiveConcurrencySettings,
167
168    #[serde(skip)]
169    pub _d: PhantomData<D>,
170}
171
172const fn default_concurrency<D: TowerRequestConfigDefaults>() -> Concurrency {
173    D::CONCURRENCY
174}
175
176fn concurrency_is_default<D: TowerRequestConfigDefaults>(concurrency: &Concurrency) -> bool {
177    *concurrency == D::CONCURRENCY
178}
179
180const fn default_timeout_secs<D: TowerRequestConfigDefaults>() -> u64 {
181    D::TIMEOUT_SECS
182}
183
184const fn default_rate_limit_duration_secs<D: TowerRequestConfigDefaults>() -> u64 {
185    D::RATE_LIMIT_DURATION_SECS
186}
187
188const fn default_rate_limit_num<D: TowerRequestConfigDefaults>() -> u64 {
189    D::RATE_LIMIT_NUM
190}
191
192const fn default_retry_attempts<D: TowerRequestConfigDefaults>() -> usize {
193    D::RETRY_ATTEMPTS
194}
195
196const fn default_retry_max_duration_secs<D: TowerRequestConfigDefaults>() -> NonZeroU64 {
197    D::RETRY_MAX_DURATION_SECS
198}
199
200const fn default_retry_initial_backoff_secs<D: TowerRequestConfigDefaults>() -> NonZeroU64 {
201    D::RETRY_INITIAL_BACKOFF_SECS
202}
203
204impl<D: TowerRequestConfigDefaults> Default for TowerRequestConfig<D> {
205    fn default() -> Self {
206        Self {
207            concurrency: default_concurrency::<D>(),
208            timeout_secs: default_timeout_secs::<D>(),
209            rate_limit_duration_secs: default_rate_limit_duration_secs::<D>(),
210            rate_limit_num: default_rate_limit_num::<D>(),
211            retry_attempts: default_retry_attempts::<D>(),
212            retry_max_duration_secs: default_retry_max_duration_secs::<D>(),
213            retry_initial_backoff_secs: default_retry_initial_backoff_secs::<D>(),
214            adaptive_concurrency: AdaptiveConcurrencySettings::default(),
215            retry_jitter_mode: JitterMode::default(),
216
217            _d: PhantomData,
218        }
219    }
220}
221
222impl<D: TowerRequestConfigDefaults> TowerRequestConfig<D> {
223    pub const fn into_settings(&self) -> TowerRequestSettings {
224        // the unwrap() calls below are safe because the final defaults are always Some<>
225        TowerRequestSettings {
226            concurrency: self.concurrency.parse_concurrency(),
227            timeout: Duration::from_secs(self.timeout_secs),
228            rate_limit_duration: Duration::from_secs(self.rate_limit_duration_secs),
229            rate_limit_num: self.rate_limit_num,
230            retry_attempts: self.retry_attempts,
231            retry_max_duration: Duration::from_secs(self.retry_max_duration_secs.get()),
232            retry_initial_backoff: Duration::from_secs(self.retry_initial_backoff_secs.get()),
233            adaptive_concurrency: self.adaptive_concurrency,
234            retry_jitter_mode: self.retry_jitter_mode,
235        }
236    }
237}
238
239#[derive(Debug, Clone)]
240pub struct TowerRequestSettings {
241    pub concurrency: Option<usize>,
242    pub timeout: Duration,
243    pub rate_limit_duration: Duration,
244    pub rate_limit_num: u64,
245    pub retry_attempts: usize,
246    pub retry_max_duration: Duration,
247    pub retry_initial_backoff: Duration,
248    pub adaptive_concurrency: AdaptiveConcurrencySettings,
249    pub retry_jitter_mode: JitterMode,
250}
251
252impl TowerRequestSettings {
253    pub fn retry_policy<L: RetryLogic>(&self, logic: L) -> FibonacciRetryPolicy<L> {
254        FibonacciRetryPolicy::new(
255            self.retry_attempts,
256            self.retry_initial_backoff,
257            self.retry_max_duration,
258            logic,
259            self.retry_jitter_mode,
260        )
261    }
262
263    /// Note: This has been deprecated, please do not use when creating new Sinks.
264    pub fn partition_sink<B, RL, S, K>(
265        &self,
266        retry_logic: RL,
267        service: S,
268        batch: B,
269        batch_timeout: Duration,
270    ) -> TowerPartitionSink<S, B, RL, K>
271    where
272        RL: RetryLogic<Request = <B as Batch>::Output, Response = S::Response>,
273        S: Service<B::Output> + Clone + Send + 'static,
274        S::Error: Into<crate::Error> + Send + Sync + 'static,
275        S::Response: Send + Response,
276        S::Future: Send + 'static,
277        B: Batch,
278        B::Input: Partition<K>,
279        B::Output: Send + Clone + 'static,
280        K: Hash + Eq + Clone + Send + 'static,
281    {
282        let service = ServiceBuilder::new()
283            .settings(self.clone(), retry_logic)
284            .service(service);
285        PartitionBatchSink::new(service, batch, batch_timeout)
286    }
287
288    /// Note: This has been deprecated, please do not use when creating new Sinks.
289    pub fn batch_sink<B, RL, S>(
290        &self,
291        retry_logic: RL,
292        service: S,
293        batch: B,
294        batch_timeout: Duration,
295    ) -> TowerBatchedSink<S, B, RL>
296    where
297        RL: RetryLogic<Request = <B as Batch>::Output, Response = S::Response>,
298        S: Service<B::Output> + Clone + Send + 'static,
299        S::Error: Into<crate::Error> + Send + Sync + 'static,
300        S::Response: Send + Response,
301        S::Future: Send + 'static,
302        B: Batch,
303        B::Output: Send + Clone + 'static,
304    {
305        let service = ServiceBuilder::new()
306            .settings(self.clone(), retry_logic)
307            .service(service);
308        BatchSink::new(service, batch, batch_timeout)
309    }
310
311    /// Distributes requests to services [(Endpoint, service, healthcheck)]
312    ///
313    /// [BufferLayer] suggests that the `buffer_bound` should be at least equal to
314    /// the number of the callers of the service. For sinks, this should typically be 1.
315    pub fn distributed_service<Req, RL, HL, S>(
316        self,
317        retry_logic: RL,
318        services: Vec<(String, S)>,
319        health_config: HealthConfig,
320        health_logic: HL,
321        buffer_bound: usize,
322    ) -> DistributedService<S, RL, HL, usize, Req>
323    where
324        Req: Clone + Send + 'static,
325        RL: RetryLogic<Response = S::Response>,
326        HL: HealthLogic<Response = S::Response, Error = crate::Error>,
327        S: Service<Req> + Clone + Send + 'static,
328        S::Error: Into<crate::Error> + Send + Sync + 'static,
329        S::Response: Send,
330        S::Future: Send + 'static,
331    {
332        let policy = self.retry_policy(retry_logic.clone());
333
334        // Build services
335        let open = OpenGauge::new();
336        let services = services
337            .into_iter()
338            .map(|(endpoint, inner)| {
339                // Build individual service
340                ServiceBuilder::new()
341                    .layer(AdaptiveConcurrencyLimitLayer::new(
342                        self.concurrency,
343                        self.adaptive_concurrency,
344                        retry_logic.clone(),
345                    ))
346                    .service(
347                        health_config.build(
348                            health_logic.clone(),
349                            ServiceBuilder::new().timeout(self.timeout).service(inner),
350                            open.clone(),
351                            endpoint,
352                        ), // NOTE: there is a version conflict for crate `tracing` between `tracing_tower` crate
353                           // and Vector. Once that is resolved, this can be used instead of passing endpoint everywhere.
354                           // .trace_service(|_| info_span!("endpoint", %endpoint)),
355                    )
356            })
357            .enumerate()
358            .map(|(i, service)| Ok::<_, S::Error>(Change::Insert(i, service)))
359            .collect::<Vec<_>>();
360
361        // Build sink service
362        ServiceBuilder::new()
363            .rate_limit(self.rate_limit_num, self.rate_limit_duration)
364            .retry(policy)
365            // [Balance] must be wrapped with a [BufferLayer] so that the overall service implements Clone.
366            .layer(BufferLayer::new(buffer_bound))
367            .service(Balance::new(Box::pin(stream::iter(services)) as Pin<Box<_>>))
368    }
369}
370
371#[derive(Debug, Clone)]
372pub struct TowerRequestLayer<L, Request> {
373    settings: TowerRequestSettings,
374    retry_logic: L,
375    _pd: PhantomData<Request>,
376}
377
378impl<S, RL, Request> Layer<S> for TowerRequestLayer<RL, Request>
379where
380    S: Service<Request> + Send + 'static,
381    S::Response: Send + 'static,
382    S::Error: Into<crate::Error> + Send + Sync + 'static,
383    S::Future: Send + 'static,
384    RL: RetryLogic<Response = S::Response> + Send + 'static,
385    Request: Clone + Send + 'static,
386{
387    type Service = Svc<S, RL>;
388
389    fn layer(&self, inner: S) -> Self::Service {
390        let policy = self.settings.retry_policy(self.retry_logic.clone());
391        ServiceBuilder::new()
392            .rate_limit(
393                self.settings.rate_limit_num,
394                self.settings.rate_limit_duration,
395            )
396            .layer(AdaptiveConcurrencyLimitLayer::new(
397                self.settings.concurrency,
398                self.settings.adaptive_concurrency,
399                self.retry_logic.clone(),
400            ))
401            .retry(policy)
402            .timeout(self.settings.timeout)
403            .service(inner)
404    }
405}
406
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicBool, Ordering::AcqRel},
        Arc, Mutex,
    };

    use futures::{future, stream, FutureExt, SinkExt, StreamExt};
    use tokio::time::Duration;
    use vector_lib::json_size::JsonSize;

    use super::*;
    use crate::sinks::util::{
        retries::{RetryAction, RetryLogic},
        BatchSettings, EncodedEvent, PartitionBuffer, PartitionInnerBuffer, VecBuffer,
    };

    /// Batch timeout handed to the sink under test.
    const TIMEOUT: Duration = Duration::from_secs(10);

    /// The `concurrency` option round-trips through TOML, accepts a positive
    /// integer / "adaptive" / "none", and rejects everything else.
    #[test]
    fn concurrency_param_works() {
        let parse = |input: &str| toml::from_str::<TowerRequestConfig>(input);

        // A serialized default config must parse back.
        let defaults = TowerRequestConfig::<GlobalTowerRequestConfigDefaults>::default();
        let serialized = toml::to_string(&defaults).unwrap();
        parse(&serialized).expect("Default config failed");

        let accepted = [
            ("", Concurrency::Adaptive, "Empty config failed"),
            (
                "concurrency = 10",
                Concurrency::Fixed(10),
                "Fixed concurrency failed",
            ),
            (
                r#"concurrency = "adaptive""#,
                Concurrency::Adaptive,
                "Adaptive concurrency setting failed",
            ),
            (
                r#"concurrency = "none""#,
                Concurrency::None,
                "None concurrency setting failed",
            ),
        ];
        for (input, expected, failure) in accepted {
            let cfg = parse(input).expect(failure);
            assert_eq!(cfg.concurrency, expected);
        }

        let rejected = [
            (
                r#"concurrency = "broken""#,
                "Invalid concurrency setting didn't fail",
            ),
            (
                "concurrency = 0",
                "Invalid concurrency setting didn't fail on zero",
            ),
            (
                "concurrency = -9",
                "Invalid concurrency setting didn't fail on negative number",
            ),
        ];
        for (input, failure) in rejected {
            parse(input).expect_err(failure);
        }
    }

    /// With the global defaults, the settings mirror the trait's provided constants.
    #[test]
    fn into_settings_with_global_defaults() {
        let settings =
            TowerRequestConfig::<GlobalTowerRequestConfigDefaults>::default().into_settings();

        assert_eq!(settings.concurrency, None);
        assert_eq!(settings.timeout, Duration::from_secs(60));
        assert_eq!(settings.rate_limit_duration, Duration::from_secs(1));
        assert_eq!(settings.rate_limit_num, i64::MAX as u64);
        assert_eq!(settings.retry_attempts, isize::MAX as usize);
        assert_eq!(settings.retry_max_duration, Duration::from_secs(30));
        assert_eq!(settings.retry_initial_backoff, Duration::from_secs(1));
    }

    /// Defaults provider that overrides every constant with a distinct value.
    #[derive(Debug, Clone, Copy)]
    pub struct TestTowerRequestConfigDefaults;

    impl TowerRequestConfigDefaults for TestTowerRequestConfigDefaults {
        const CONCURRENCY: Concurrency = Concurrency::None;
        const TIMEOUT_SECS: u64 = 1;
        const RATE_LIMIT_DURATION_SECS: u64 = 2;
        const RATE_LIMIT_NUM: u64 = 3;
        const RETRY_ATTEMPTS: usize = 4;
        const RETRY_MAX_DURATION_SECS: NonZeroU64 = NonZeroU64::new(5).unwrap();
        const RETRY_INITIAL_BACKOFF_SECS: NonZeroU64 = NonZeroU64::new(6).unwrap();
    }

    /// With an overriding provider, the settings mirror the overridden constants.
    #[test]
    fn into_settings_with_overridden_defaults() {
        let settings =
            TowerRequestConfig::<TestTowerRequestConfigDefaults>::default().into_settings();

        assert_eq!(settings.concurrency, Some(1));
        assert_eq!(settings.timeout, Duration::from_secs(1));
        assert_eq!(settings.rate_limit_duration, Duration::from_secs(2));
        assert_eq!(settings.rate_limit_num, 3);
        assert_eq!(settings.retry_attempts, 4);
        assert_eq!(settings.retry_max_duration, Duration::from_secs(5));
        assert_eq!(settings.retry_initial_backoff, Duration::from_secs(6));
    }

    /// Explicitly configured values win over the global defaults.
    #[test]
    fn into_settings_with_populated_config() {
        // Every value below differs from the corresponding global default.
        let parsed = toml::from_str::<TowerRequestConfig>(
            r" concurrency = 16
            timeout_secs = 1
            rate_limit_duration_secs = 2
            rate_limit_num = 3
            retry_attempts = 4
            retry_max_duration_secs = 5
            retry_initial_backoff_secs = 6
        ",
        )
        .expect("Config failed to parse");

        let settings = parsed.into_settings();
        let expected_concurrency = Concurrency::Fixed(16).parse_concurrency();

        assert_eq!(settings.concurrency, expected_concurrency);
        assert_eq!(settings.timeout, Duration::from_secs(1));
        assert_eq!(settings.rate_limit_duration, Duration::from_secs(2));
        assert_eq!(settings.rate_limit_num, 3);
        assert_eq!(settings.retry_attempts, 4);
        assert_eq!(settings.retry_max_duration, Duration::from_secs(5));
        assert_eq!(settings.retry_initial_backoff, Duration::from_secs(6));
    }

    /// With a fixed concurrency of 1 and an ordered sink, a request that fails
    /// once is retried and batches still reach the service in order.
    #[tokio::test]
    async fn partition_sink_retry_concurrency() {
        let settings = TowerRequestConfig::<GlobalTowerRequestConfigDefaults> {
            concurrency: Concurrency::Fixed(1),
            ..Default::default()
        }
        .into_settings();

        let recorded = Arc::new(Mutex::new(Vec::new()));
        let fail_next = Arc::new(AtomicBool::new(true));

        let svc = {
            let recorded = Arc::clone(&recorded);
            tower::service_fn(move |req: PartitionInnerBuffer<Vec<usize>, Vec<usize>>| {
                let (items, _key) = req.into_parts();
                // Error on first request
                if fail_next.swap(false, AcqRel) {
                    return future::err::<(), _>(std::io::Error::other("")).boxed();
                }
                recorded.lock().unwrap().push(items);
                future::ok::<_, std::io::Error>(()).boxed()
            })
        };

        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 10;

        let buffer = PartitionBuffer::new(VecBuffer::new(batch_settings.size));
        let mut sink = settings.partition_sink(RetryAlways, svc, buffer, TIMEOUT);
        sink.ordered();

        let mut events = stream::iter(0..20)
            .map(|i| PartitionInnerBuffer::new(i, vec![0]))
            .map(|item| Ok(EncodedEvent::new(item, 0, JsonSize::zero())));
        sink.sink_map_err(drop)
            .send_all(&mut events)
            .await
            .unwrap();

        let expected: Vec<Vec<usize>> = vec![(0..10).collect(), (10..20).collect()];
        assert_eq!(*recorded.lock().unwrap(), expected);
    }

    /// Retry logic that treats every error as retriable and every response as a success.
    #[derive(Debug, Clone, Copy)]
    struct RetryAlways;

    impl RetryLogic for RetryAlways {
        type Error = std::io::Error;
        type Request = PartitionInnerBuffer<Vec<usize>, Vec<usize>>;
        type Response = ();

        fn is_retriable_error(&self, _: &Self::Error) -> bool {
            true
        }

        fn should_retry_response(&self, _response: &Self::Response) -> RetryAction<Self::Request> {
            // Any response counts as a successful request.
            RetryAction::Successful
        }
    }
}