vector/sinks/aws_cloudwatch_logs/
retry.rs1use std::marker::PhantomData;
2
3use aws_sdk_cloudwatchlogs::operation::create_log_stream::CreateLogStreamError;
4use aws_sdk_cloudwatchlogs::operation::describe_log_streams::DescribeLogStreamsError;
5use aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError;
6use aws_smithy_runtime_api::client::result::SdkError;
7
8use crate::aws::is_retriable_error;
9use crate::sinks::{aws_cloudwatch_logs::service::CloudwatchError, util::retries::RetryLogic};
10
/// Retry policy marker for CloudWatch Logs requests.
///
/// The type holds no runtime data. `Request` and `Response` are recorded only
/// through [`PhantomData`], so the type parameters can be named by trait
/// implementations without any value of either type being stored.
#[derive(Debug)]
pub struct CloudwatchRetryLogic<Request, Response> {
    /// Zero-sized marker tying this value to the `Request` type parameter.
    request: PhantomData<Request>,
    /// Zero-sized marker tying this value to the `Response` type parameter.
    response: PhantomData<Response>,
}

impl<Request, Response> CloudwatchRetryLogic<Request, Response> {
    /// Builds the retry logic marker.
    ///
    /// This is a `const fn`, so it can also be used to initialise constants.
    pub const fn new() -> Self {
        Self {
            request: PhantomData,
            response: PhantomData,
        }
    }
}
24
25impl<Request, Response> Clone for CloudwatchRetryLogic<Request, Response> {
26 fn clone(&self) -> Self {
27 CloudwatchRetryLogic {
28 request: PhantomData,
29 response: PhantomData,
30 }
31 }
32}
33
34impl<Request: Send + Sync + 'static, Response: Send + Sync + 'static> RetryLogic
35 for CloudwatchRetryLogic<Request, Response>
36{
37 type Error = CloudwatchError;
38 type Request = Request;
39 type Response = Response;
40
41 #[allow(clippy::cognitive_complexity)] fn is_retriable_error(&self, error: &Self::Error) -> bool {
44 match error {
45 CloudwatchError::Put(err) => {
46 if let SdkError::ServiceError(inner) = err {
47 let err = inner.err();
48 if matches!(err, PutLogEventsError::ServiceUnavailableException(_)) {
49 return true;
50 }
51 }
52 is_retriable_error(err)
53 }
54 CloudwatchError::DescribeLogStreams(err) => {
55 if let SdkError::ServiceError(inner) = err {
56 let err = inner.err();
57 if matches!(err, DescribeLogStreamsError::ServiceUnavailableException(_)) {
58 return true;
59 }
60 }
61 is_retriable_error(err)
62 }
63 CloudwatchError::CreateStream(err) => {
64 if let SdkError::ServiceError(inner) = err {
65 let err = inner.err();
66 if matches!(err, CreateLogStreamError::ServiceUnavailableException(_)) {
67 return true;
68 }
69 }
70 is_retriable_error(err)
71 }
72 _ => false,
73 }
74 }
75}
76
#[cfg(test)]
mod test {
    use aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError;
    use aws_smithy_runtime_api::{
        client::{orchestrator::HttpResponse, result::SdkError},
        http::StatusCode,
    };
    use aws_smithy_types::body::SdkBody;

    use crate::sinks::{
        aws_cloudwatch_logs::{retry::CloudwatchRetryLogic, service::CloudwatchError},
        util::retries::RetryLogic,
    };

    /// A `ThrottlingException` service error returned for a put request must
    /// be classified as retriable.
    #[test]
    fn test_throttle_retry() {
        let logic = CloudwatchRetryLogic::<(), ()>::new();

        // Error metadata as the SDK would surface it for a throttled call.
        let metadata = aws_smithy_types::error::ErrorMetadata::builder()
            .code("ThrottlingException")
            .message("Rate exceeded for logStreamName log-test-1.us-east-1.compute.internal")
            .build();

        // Raw HTTP 400 response carrying the matching JSON payload.
        let payload = SdkBody::from(
            r#"{"__type":"ThrottlingException","message":"Rate exceeded for logStreamName log-test-1.us-east-1.compute.internal"}"#,
        );
        let status = StatusCode::try_from(400_u16).unwrap();
        let response = HttpResponse::new(status, payload);

        let throttled = CloudwatchError::Put(SdkError::service_error(
            PutLogEventsError::generic(metadata),
            response,
        ));

        assert!(logic.is_retriable_error(&throttled));
    }
}