vector/sinks/aws_cloudwatch_logs/
retry.rs1use std::marker::PhantomData;
2
3use aws_sdk_cloudwatchlogs::operation::{
4 create_log_stream::CreateLogStreamError, describe_log_streams::DescribeLogStreamsError,
5 put_log_events::PutLogEventsError,
6};
7use aws_smithy_runtime_api::client::result::SdkError;
8
9use crate::{
10 aws::is_retriable_error,
11 sinks::{aws_cloudwatch_logs::service::CloudwatchError, util::retries::RetryLogic},
12};
13
14#[derive(Debug)]
15pub struct CloudwatchRetryLogic<Request, Response> {
16 request: PhantomData<Request>,
17 response: PhantomData<Response>,
18}
19impl<Request, Response> CloudwatchRetryLogic<Request, Response> {
20 pub const fn new() -> CloudwatchRetryLogic<Request, Response> {
21 CloudwatchRetryLogic {
22 request: PhantomData,
23 response: PhantomData,
24 }
25 }
26}
27
28impl<Request, Response> Clone for CloudwatchRetryLogic<Request, Response> {
29 fn clone(&self) -> Self {
30 CloudwatchRetryLogic {
31 request: PhantomData,
32 response: PhantomData,
33 }
34 }
35}
36
37impl<Request: Send + Sync + 'static, Response: Send + Sync + 'static> RetryLogic
38 for CloudwatchRetryLogic<Request, Response>
39{
40 type Error = CloudwatchError;
41 type Request = Request;
42 type Response = Response;
43
44 #[allow(clippy::cognitive_complexity)] fn is_retriable_error(&self, error: &Self::Error) -> bool {
47 match error {
48 CloudwatchError::Put(err) => {
49 if let SdkError::ServiceError(inner) = err {
50 let err = inner.err();
51 if matches!(err, PutLogEventsError::ServiceUnavailableException(_)) {
52 return true;
53 }
54 }
55 is_retriable_error(err)
56 }
57 CloudwatchError::DescribeLogStreams(err) => {
58 if let SdkError::ServiceError(inner) = err {
59 let err = inner.err();
60 if matches!(err, DescribeLogStreamsError::ServiceUnavailableException(_)) {
61 return true;
62 }
63 }
64 is_retriable_error(err)
65 }
66 CloudwatchError::CreateStream(err) => {
67 if let SdkError::ServiceError(inner) = err {
68 let err = inner.err();
69 if matches!(err, CreateLogStreamError::ServiceUnavailableException(_)) {
70 return true;
71 }
72 }
73 is_retriable_error(err)
74 }
75 _ => false,
76 }
77 }
78}
79
80#[cfg(test)]
81mod test {
82 use aws_sdk_cloudwatchlogs::operation::put_log_events::PutLogEventsError;
83 use aws_smithy_runtime_api::{
84 client::{orchestrator::HttpResponse, result::SdkError},
85 http::StatusCode,
86 };
87 use aws_smithy_types::body::SdkBody;
88
89 use crate::sinks::{
90 aws_cloudwatch_logs::{retry::CloudwatchRetryLogic, service::CloudwatchError},
91 util::retries::RetryLogic,
92 };
93
94 #[test]
95 fn test_throttle_retry() {
96 let retry_logic: CloudwatchRetryLogic<(), ()> = CloudwatchRetryLogic::new();
97
98 let meta_err = aws_smithy_types::error::ErrorMetadata::builder()
99 .code("ThrottlingException")
100 .message("Rate exceeded for logStreamName log-test-1.us-east-1.compute.internal")
101 .build();
102
103 let body = SdkBody::from(
104 "{\"__type\":\"ThrottlingException\",\"message\":\"Rate exceeded for logStreamName log-test-1.us-east-1.compute.internal\"}",
105 );
106
107 let raw = HttpResponse::new(StatusCode::try_from(400_u16).unwrap(), body);
108
109 let err = CloudwatchError::Put(SdkError::service_error(
110 PutLogEventsError::generic(meta_err),
111 raw,
112 ));
113 assert!(retry_logic.is_retriable_error(&err));
114 }
115}