vector/sinks/aws_cloudwatch_logs/
service.rs1use std::{
2 collections::HashMap,
3 fmt,
4 task::{ready, Context, Poll},
5};
6
7use aws_sdk_cloudwatchlogs::{
8 operation::{
9 create_log_group::CreateLogGroupError, create_log_stream::CreateLogStreamError,
10 describe_log_streams::DescribeLogStreamsError, put_log_events::PutLogEventsError,
11 put_retention_policy::PutRetentionPolicyError,
12 },
13 types::InputLogEvent,
14 Client as CloudwatchLogsClient,
15};
16use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
17use chrono::Duration;
18use futures::{future::BoxFuture, FutureExt};
19use futures_util::TryFutureExt;
20use http::{
21 header::{HeaderName, InvalidHeaderName, InvalidHeaderValue},
22 HeaderValue,
23};
24use indexmap::IndexMap;
25use snafu::{ResultExt, Snafu};
26use tokio::sync::oneshot;
27use tower::{
28 buffer::Buffer,
29 limit::{ConcurrencyLimit, RateLimit},
30 retry::Retry,
31 timeout::Timeout,
32 Service, ServiceBuilder, ServiceExt,
33};
34use vector_lib::stream::DriverResponse;
35use vector_lib::{
36 finalization::EventStatus,
37 request_metadata::{GroupedCountByteSize, MetaDescriptive},
38};
39
40use crate::sinks::{
41 aws_cloudwatch_logs::{
42 config::CloudwatchLogsSinkConfig, config::Retention, request, retry::CloudwatchRetryLogic,
43 sink::BatchCloudwatchRequest, CloudwatchKey,
44 },
45 util::{retries::FibonacciRetryPolicy, EncodedLength, TowerRequestSettings},
46};
47
48type Svc = Buffer<
49 Vec<InputLogEvent>,
50 <ConcurrencyLimit<
51 RateLimit<
52 Retry<
53 FibonacciRetryPolicy<CloudwatchRetryLogic<Vec<InputLogEvent>, ()>>,
54 Buffer<
55 Vec<InputLogEvent>,
56 <Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,
57 >,
58 >,
59 >,
60 > as Service<Vec<InputLogEvent>>>::Future,
61>;
62
63#[derive(Debug)]
64pub enum CloudwatchError {
65 Put(SdkError<PutLogEventsError, HttpResponse>),
66 DescribeLogStreams(SdkError<DescribeLogStreamsError, HttpResponse>),
67 CreateStream(SdkError<CreateLogStreamError, HttpResponse>),
68 CreateGroup(SdkError<CreateLogGroupError, HttpResponse>),
69 PutRetentionPolicy(SdkError<PutRetentionPolicyError, HttpResponse>),
70 NoStreamsFound,
71}
72
73impl fmt::Display for CloudwatchError {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 match self {
76 CloudwatchError::Put(error) => write!(f, "CloudwatchError::Put: {error}"),
77 CloudwatchError::DescribeLogStreams(error) => {
78 write!(f, "CloudwatchError::DescribeLogStreams: {error}")
79 }
80 CloudwatchError::CreateStream(error) => {
81 write!(f, "CloudwatchError::CreateStream: {error}")
82 }
83 CloudwatchError::CreateGroup(error) => {
84 write!(f, "CloudwatchError::CreateGroup: {error}")
85 }
86 CloudwatchError::NoStreamsFound => write!(f, "CloudwatchError: No Streams Found"),
87 CloudwatchError::PutRetentionPolicy(error) => {
88 write!(f, "CloudwatchError::PutRetentionPolicy: {error}")
89 }
90 }
91 }
92}
93
94impl std::error::Error for CloudwatchError {}
95
96impl From<SdkError<PutLogEventsError, HttpResponse>> for CloudwatchError {
97 fn from(error: SdkError<PutLogEventsError, HttpResponse>) -> Self {
98 CloudwatchError::Put(error)
99 }
100}
101
102impl From<SdkError<DescribeLogStreamsError, HttpResponse>> for CloudwatchError {
103 fn from(error: SdkError<DescribeLogStreamsError, HttpResponse>) -> Self {
104 CloudwatchError::DescribeLogStreams(error)
105 }
106}
107
108#[derive(Debug)]
109pub struct CloudwatchResponse {
110 events_byte_size: GroupedCountByteSize,
111}
112
113impl crate::sinks::util::sink::Response for CloudwatchResponse {
114 fn is_successful(&self) -> bool {
115 true
116 }
117
118 fn is_transient(&self) -> bool {
119 false
120 }
121}
122
123impl DriverResponse for CloudwatchResponse {
124 fn event_status(&self) -> EventStatus {
125 EventStatus::Delivered
126 }
127
128 fn events_sent(&self) -> &GroupedCountByteSize {
129 &self.events_byte_size
130 }
131}
132
133#[derive(Snafu, Debug)]
134enum HeaderError {
135 #[snafu(display("invalid header name {source}"))]
136 InvalidName { source: InvalidHeaderName },
137 #[snafu(display("invalid header value {source}"))]
138 InvalidValue { source: InvalidHeaderValue },
139}
140
141impl CloudwatchLogsPartitionSvc {
142 pub fn new(
143 config: CloudwatchLogsSinkConfig,
144 client: CloudwatchLogsClient,
145 ) -> crate::Result<Self> {
146 let request_settings = config.request.tower.into_settings();
147
148 let headers = config
149 .request
150 .headers
151 .iter()
152 .map(|(name, value)| {
153 Ok((
154 HeaderName::from_bytes(name.as_bytes()).context(InvalidNameSnafu {})?,
155 HeaderValue::from_str(value.as_str()).context(InvalidValueSnafu {})?,
156 ))
157 })
158 .collect::<Result<IndexMap<_, _>, HeaderError>>()?;
159
160 Ok(Self {
161 config,
162 clients: HashMap::new(),
163 request_settings,
164 client,
165 headers,
166 })
167 }
168}
169
170impl Service<BatchCloudwatchRequest> for CloudwatchLogsPartitionSvc {
171 type Response = CloudwatchResponse;
172 type Error = crate::Error;
173 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
174
175 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
176 Poll::Ready(Ok(()))
177 }
178
179 fn call(&mut self, mut req: BatchCloudwatchRequest) -> Self::Future {
180 let metadata = std::mem::take(req.metadata_mut());
181 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
182
183 let key = req.key;
184 let events = req
185 .events
186 .into_iter()
187 .map(|req| {
188 InputLogEvent::builder()
189 .message(req.message)
190 .timestamp(req.timestamp)
191 .build()
192 .expect("all builder fields specified")
193 })
194 .collect();
195
196 let svc = if let Some(svc) = &mut self.clients.get_mut(&key) {
197 svc.clone()
198 } else {
199 let svc = ServiceBuilder::new()
201 .buffer(1)
202 .concurrency_limit(1)
203 .rate_limit(
204 self.request_settings.rate_limit_num,
205 self.request_settings.rate_limit_duration,
206 )
207 .retry(
208 self.request_settings
209 .retry_policy(CloudwatchRetryLogic::new()),
210 )
211 .buffer(1)
212 .timeout(self.request_settings.timeout)
213 .service(CloudwatchLogsSvc::new(
214 self.config.clone(),
215 &key,
216 self.client.clone(),
217 self.headers.clone(),
218 ));
219
220 self.clients.insert(key, svc.clone());
221 svc
222 };
223
224 svc.oneshot(events)
225 .map_ok(move |_x| CloudwatchResponse { events_byte_size })
226 .map_err(Into::into)
227 .boxed()
228 }
229}
230
231impl CloudwatchLogsSvc {
232 pub fn new(
233 config: CloudwatchLogsSinkConfig,
234 key: &CloudwatchKey,
235 client: CloudwatchLogsClient,
236 headers: IndexMap<HeaderName, HeaderValue>,
237 ) -> Self {
238 let group_name = key.group.clone();
239 let stream_name = key.stream.clone();
240
241 let create_missing_group = config.create_missing_group;
242 let create_missing_stream = config.create_missing_stream;
243
244 let retention = config.retention.clone();
245
246 let kms_key = config.kms_key.clone();
247 let tags = config.tags.clone();
248
249 CloudwatchLogsSvc {
250 headers,
251 client,
252 stream_name,
253 group_name,
254 create_missing_group,
255 create_missing_stream,
256 retention,
257 kms_key,
258 tags,
259 token: None,
260 token_rx: None,
261 }
262 }
263
264 pub fn process_events(&self, mut events: Vec<InputLogEvent>) -> Vec<Vec<InputLogEvent>> {
265 events.sort_by_key(|e| e.timestamp);
267
268 info!(message = "Sending events.", events = %events.len());
269
270 let mut event_batches = Vec::new();
271
272 while let Some(oldest) = events.first() {
275 let limit = oldest.timestamp + Duration::days(1).num_milliseconds();
276
277 if events.last().expect("Events can't be empty").timestamp <= limit {
278 event_batches.push(events);
282 break;
283 }
284
285 let at = events
292 .binary_search_by_key(&limit, |e| e.timestamp)
293 .unwrap_or_else(|at| at);
294
295 let remainder = events.split_off(at);
297 event_batches.push(events);
298 events = remainder;
299 }
300
301 event_batches
302 }
303}
304
305impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
306 type Response = ();
307 type Error = CloudwatchError;
308 type Future = request::CloudwatchFuture;
309
310 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
311 if let Some(rx) = &mut self.token_rx {
312 self.token = ready!(rx.poll_unpin(cx)).ok().flatten();
313 self.token_rx = None;
314 }
315 Poll::Ready(Ok(()))
316 }
317
318 fn call(&mut self, req: Vec<InputLogEvent>) -> Self::Future {
319 if self.token_rx.is_none() {
320 let event_batches = self.process_events(req);
321
322 let (tx, rx) = oneshot::channel();
323 self.token_rx = Some(rx);
324
325 request::CloudwatchFuture::new(
326 self.client.clone(),
327 self.headers.clone(),
328 self.stream_name.clone(),
329 self.group_name.clone(),
330 self.create_missing_group,
331 self.create_missing_stream,
332 self.retention.clone(),
333 self.kms_key.clone(),
334 self.tags.clone(),
335 event_batches,
336 self.token.take(),
337 tx,
338 )
339 } else {
340 panic!("poll_ready was not called; this is a bug!");
341 }
342 }
343}
344
345pub struct CloudwatchLogsSvc {
346 client: CloudwatchLogsClient,
347 headers: IndexMap<HeaderName, HeaderValue>,
348 stream_name: String,
349 group_name: String,
350 create_missing_group: bool,
351 create_missing_stream: bool,
352 retention: Retention,
353 kms_key: Option<String>,
354 tags: Option<HashMap<String, String>>,
355 token: Option<String>,
356 token_rx: Option<oneshot::Receiver<Option<String>>>,
357}
358
359impl EncodedLength for InputLogEvent {
360 fn encoded_length(&self) -> usize {
361 self.message.len() + 26
362 }
363}
364
365#[derive(Clone)]
366pub struct CloudwatchLogsPartitionSvc {
367 config: CloudwatchLogsSinkConfig,
368 headers: IndexMap<HeaderName, HeaderValue>,
369 clients: HashMap<CloudwatchKey, Svc>,
370 request_settings: TowerRequestSettings,
371 client: CloudwatchLogsClient,
372}