vector/sinks/aws_cloudwatch_logs/
retry.rs

1use std::marker::PhantomData;
2
3use aws_sdk_cloudwatchlogs::operation::{
4    create_log_stream::CreateLogStreamError, describe_log_streams::DescribeLogStreamsError,
5    put_log_events::PutLogEventsError,
6};
7use aws_smithy_runtime_api::client::result::SdkError;
8
9use crate::{
10    aws::is_retriable_error,
11    sinks::{aws_cloudwatch_logs::service::CloudwatchError, util::retries::RetryLogic},
12};
13
/// Stateless retry-logic type for the CloudWatch Logs sink.
///
/// `Request` and `Response` are only recorded through `PhantomData`, so a
/// value of this type holds no runtime data; the parameters exist to fix the
/// associated types of the `RetryLogic` implementation.
#[derive(Debug)]
pub struct CloudwatchRetryLogic<Request, Response> {
    /// Marker tying the value to its `Request` type parameter.
    request: PhantomData<Request>,
    /// Marker tying the value to its `Response` type parameter.
    response: PhantomData<Response>,
}

impl<Request, Response> CloudwatchRetryLogic<Request, Response> {
    /// Builds a new retry-logic value.
    ///
    /// This is a `const fn`, so it can also initialise `const`/`static` items.
    pub const fn new() -> Self {
        Self {
            request: PhantomData,
            response: PhantomData,
        }
    }
}
27
28impl<Request, Response> Clone for CloudwatchRetryLogic<Request, Response> {
29    fn clone(&self) -> Self {
30        CloudwatchRetryLogic {
31            request: PhantomData,
32            response: PhantomData,
33        }
34    }
35}
36
37impl<Request: Send + Sync + 'static, Response: Send + Sync + 'static> RetryLogic
38    for CloudwatchRetryLogic<Request, Response>
39{
40    type Error = CloudwatchError;
41    type Request = Request;
42    type Response = Response;
43
44    // TODO this match may not be necessary given the logic in `is_retriable_error()`
45    #[allow(clippy::cognitive_complexity)] // long, but just a hair over our limit
46    fn is_retriable_error(&self, error: &Self::Error) -> bool {
47        match error {
48            CloudwatchError::Put(err) => {
49                if let SdkError::ServiceError(inner) = err {
50                    let err = inner.err();
51                    if matches!(err, PutLogEventsError::ServiceUnavailableException(_)) {
52                        return true;
53                    }
54                }
55                is_retriable_error(err)
56            }
57            CloudwatchError::DescribeLogStreams(err) => {
58                if let SdkError::ServiceError(inner) = err {
59                    let err = inner.err();
60                    if matches!(err, DescribeLogStreamsError::ServiceUnavailableException(_)) {
61                        return true;
62                    }
63                }
64                is_retriable_error(err)
65            }
66            CloudwatchError::CreateStream(err) => {
67                if let SdkError::ServiceError(inner) = err {
68                    let err = inner.err();
69                    if matches!(err, CreateLogStreamError::ServiceUnavailableException(_)) {
70                        return true;
71                    }
72                }
73                is_retriable_error(err)
74            }
75            _ => false,
76        }
77    }
78}
79
#[cfg(test)]
mod test {
    use aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError;
    use aws_smithy_runtime_api::{
        client::{orchestrator::HttpResponse, result::SdkError},
        http::StatusCode,
    };
    use aws_smithy_types::body::SdkBody;

    use crate::sinks::{
        aws_cloudwatch_logs::{retry::CloudwatchRetryLogic, service::CloudwatchError},
        util::retries::RetryLogic,
    };

    /// A `PutLogEvents` failure carrying a `ThrottlingException` error code
    /// (delivered with HTTP status 400) must be classified as retriable.
    #[test]
    fn test_throttle_retry() {
        // Error metadata as it would be parsed out of the service response.
        let metadata = aws_smithy_types::error::ErrorMetadata::builder()
            .code("ThrottlingException")
            .message("Rate exceeded for logStreamName log-test-1.us-east-1.compute.internal")
            .build();

        // Raw HTTP response accompanying the service error.
        let response = HttpResponse::new(
            StatusCode::try_from(400_u16).unwrap(),
            SdkBody::from(
                "{\"__type\":\"ThrottlingException\",\"message\":\"Rate exceeded for logStreamName log-test-1.us-east-1.compute.internal\"}",
            ),
        );

        let throttled = CloudwatchError::Put(SdkError::service_error(
            PutLogEventsError::generic(metadata),
            response,
        ));

        let logic = CloudwatchRetryLogic::<(), ()>::new();
        assert!(logic.is_retriable_error(&throttled));
    }
}