vector/sinks/aws_cloudwatch_logs/
retry.rs

1use std::marker::PhantomData;
2
3use aws_sdk_cloudwatchlogs::operation::create_log_stream::CreateLogStreamError;
4use aws_sdk_cloudwatchlogs::operation::describe_log_streams::DescribeLogStreamsError;
5use aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError;
6use aws_smithy_runtime_api::client::result::SdkError;
7
8use crate::aws::is_retriable_error;
9use crate::sinks::{aws_cloudwatch_logs::service::CloudwatchError, util::retries::RetryLogic};
10
11#[derive(Debug)]
12pub struct CloudwatchRetryLogic<Request, Response> {
13    request: PhantomData<Request>,
14    response: PhantomData<Response>,
15}
16impl<Request, Response> CloudwatchRetryLogic<Request, Response> {
17    pub const fn new() -> CloudwatchRetryLogic<Request, Response> {
18        CloudwatchRetryLogic {
19            request: PhantomData,
20            response: PhantomData,
21        }
22    }
23}
24
25impl<Request, Response> Clone for CloudwatchRetryLogic<Request, Response> {
26    fn clone(&self) -> Self {
27        CloudwatchRetryLogic {
28            request: PhantomData,
29            response: PhantomData,
30        }
31    }
32}
33
34impl<Request: Send + Sync + 'static, Response: Send + Sync + 'static> RetryLogic
35    for CloudwatchRetryLogic<Request, Response>
36{
37    type Error = CloudwatchError;
38    type Request = Request;
39    type Response = Response;
40
41    // TODO this match may not be necessary given the logic in `is_retriable_error()`
42    #[allow(clippy::cognitive_complexity)] // long, but just a hair over our limit
43    fn is_retriable_error(&self, error: &Self::Error) -> bool {
44        match error {
45            CloudwatchError::Put(err) => {
46                if let SdkError::ServiceError(inner) = err {
47                    let err = inner.err();
48                    if matches!(err, PutLogEventsError::ServiceUnavailableException(_)) {
49                        return true;
50                    }
51                }
52                is_retriable_error(err)
53            }
54            CloudwatchError::DescribeLogStreams(err) => {
55                if let SdkError::ServiceError(inner) = err {
56                    let err = inner.err();
57                    if matches!(err, DescribeLogStreamsError::ServiceUnavailableException(_)) {
58                        return true;
59                    }
60                }
61                is_retriable_error(err)
62            }
63            CloudwatchError::CreateStream(err) => {
64                if let SdkError::ServiceError(inner) = err {
65                    let err = inner.err();
66                    if matches!(err, CreateLogStreamError::ServiceUnavailableException(_)) {
67                        return true;
68                    }
69                }
70                is_retriable_error(err)
71            }
72            _ => false,
73        }
74    }
75}
76
77#[cfg(test)]
78mod test {
79    use aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError;
80    use aws_smithy_runtime_api::{
81        client::{orchestrator::HttpResponse, result::SdkError},
82        http::StatusCode,
83    };
84    use aws_smithy_types::body::SdkBody;
85
86    use crate::sinks::aws_cloudwatch_logs::{
87        retry::CloudwatchRetryLogic, service::CloudwatchError,
88    };
89    use crate::sinks::util::retries::RetryLogic;
90
91    #[test]
92    fn test_throttle_retry() {
93        let retry_logic: CloudwatchRetryLogic<(), ()> = CloudwatchRetryLogic::new();
94
95        let meta_err = aws_smithy_types::error::ErrorMetadata::builder()
96            .code("ThrottlingException")
97            .message("Rate exceeded for logStreamName log-test-1.us-east-1.compute.internal")
98            .build();
99
100        let body = SdkBody::from("{\"__type\":\"ThrottlingException\",\"message\":\"Rate exceeded for logStreamName log-test-1.us-east-1.compute.internal\"}");
101
102        let raw = HttpResponse::new(StatusCode::try_from(400_u16).unwrap(), body);
103
104        let err = CloudwatchError::Put(SdkError::service_error(
105            PutLogEventsError::generic(meta_err),
106            raw,
107        ));
108        assert!(retry_logic.is_retriable_error(&err));
109    }
110}