vector/sinks/aws_cloudwatch_logs/
service.rs1use std::{
2 collections::HashMap,
3 fmt,
4 task::{Context, Poll, ready},
5};
6
7use aws_sdk_cloudwatchlogs::{
8 Client as CloudwatchLogsClient,
9 operation::{
10 create_log_group::CreateLogGroupError, create_log_stream::CreateLogStreamError,
11 describe_log_streams::DescribeLogStreamsError, put_log_events::PutLogEventsError,
12 put_retention_policy::PutRetentionPolicyError,
13 },
14 types::InputLogEvent,
15};
16use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
17use chrono::Duration;
18use futures::{FutureExt, future::BoxFuture};
19use futures_util::TryFutureExt;
20use http::{
21 HeaderValue,
22 header::{HeaderName, InvalidHeaderName, InvalidHeaderValue},
23};
24use indexmap::IndexMap;
25use snafu::{ResultExt, Snafu};
26use tokio::sync::oneshot;
27use tower::{
28 Service, ServiceBuilder, ServiceExt,
29 buffer::Buffer,
30 limit::{ConcurrencyLimit, RateLimit},
31 retry::Retry,
32 timeout::Timeout,
33};
34use vector_lib::{
35 finalization::EventStatus,
36 request_metadata::{GroupedCountByteSize, MetaDescriptive},
37 stream::DriverResponse,
38};
39
40use crate::sinks::{
41 aws_cloudwatch_logs::{
42 CloudwatchKey,
43 config::{CloudwatchLogsSinkConfig, Retention},
44 request,
45 retry::CloudwatchRetryLogic,
46 sink::BatchCloudwatchRequest,
47 },
48 util::{EncodedLength, TowerRequestSettings, retries::FibonacciRetryPolicy},
49};
50
51type Svc = Buffer<
52 Vec<InputLogEvent>,
53 <ConcurrencyLimit<
54 RateLimit<
55 Retry<
56 FibonacciRetryPolicy<CloudwatchRetryLogic<Vec<InputLogEvent>, ()>>,
57 Buffer<
58 Vec<InputLogEvent>,
59 <Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,
60 >,
61 >,
62 >,
63 > as Service<Vec<InputLogEvent>>>::Future,
64>;
65
66#[derive(Debug)]
67pub enum CloudwatchError {
68 Put(SdkError<PutLogEventsError, HttpResponse>),
69 DescribeLogStreams(SdkError<DescribeLogStreamsError, HttpResponse>),
70 CreateStream(SdkError<CreateLogStreamError, HttpResponse>),
71 CreateGroup(SdkError<CreateLogGroupError, HttpResponse>),
72 PutRetentionPolicy(SdkError<PutRetentionPolicyError, HttpResponse>),
73 NoStreamsFound,
74}
75
76impl fmt::Display for CloudwatchError {
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 match self {
79 CloudwatchError::Put(error) => write!(f, "CloudwatchError::Put: {error}"),
80 CloudwatchError::DescribeLogStreams(error) => {
81 write!(f, "CloudwatchError::DescribeLogStreams: {error}")
82 }
83 CloudwatchError::CreateStream(error) => {
84 write!(f, "CloudwatchError::CreateStream: {error}")
85 }
86 CloudwatchError::CreateGroup(error) => {
87 write!(f, "CloudwatchError::CreateGroup: {error}")
88 }
89 CloudwatchError::NoStreamsFound => write!(f, "CloudwatchError: No Streams Found"),
90 CloudwatchError::PutRetentionPolicy(error) => {
91 write!(f, "CloudwatchError::PutRetentionPolicy: {error}")
92 }
93 }
94 }
95}
96
97impl std::error::Error for CloudwatchError {}
98
99impl From<SdkError<PutLogEventsError, HttpResponse>> for CloudwatchError {
100 fn from(error: SdkError<PutLogEventsError, HttpResponse>) -> Self {
101 CloudwatchError::Put(error)
102 }
103}
104
105impl From<SdkError<DescribeLogStreamsError, HttpResponse>> for CloudwatchError {
106 fn from(error: SdkError<DescribeLogStreamsError, HttpResponse>) -> Self {
107 CloudwatchError::DescribeLogStreams(error)
108 }
109}
110
111#[derive(Debug)]
112pub struct CloudwatchResponse {
113 events_byte_size: GroupedCountByteSize,
114}
115
116impl crate::sinks::util::sink::Response for CloudwatchResponse {
117 fn is_successful(&self) -> bool {
118 true
119 }
120
121 fn is_transient(&self) -> bool {
122 false
123 }
124}
125
126impl DriverResponse for CloudwatchResponse {
127 fn event_status(&self) -> EventStatus {
128 EventStatus::Delivered
129 }
130
131 fn events_sent(&self) -> &GroupedCountByteSize {
132 &self.events_byte_size
133 }
134}
135
136#[derive(Snafu, Debug)]
137enum HeaderError {
138 #[snafu(display("invalid header name {source}"))]
139 InvalidName { source: InvalidHeaderName },
140 #[snafu(display("invalid header value {source}"))]
141 InvalidValue { source: InvalidHeaderValue },
142}
143
144impl CloudwatchLogsPartitionSvc {
145 pub fn new(
146 config: CloudwatchLogsSinkConfig,
147 client: CloudwatchLogsClient,
148 ) -> crate::Result<Self> {
149 let request_settings = config.request.tower.into_settings();
150
151 let headers = config
152 .request
153 .headers
154 .iter()
155 .map(|(name, value)| {
156 Ok((
157 HeaderName::from_bytes(name.as_bytes()).context(InvalidNameSnafu {})?,
158 HeaderValue::from_str(value.as_str()).context(InvalidValueSnafu {})?,
159 ))
160 })
161 .collect::<Result<IndexMap<_, _>, HeaderError>>()?;
162
163 Ok(Self {
164 config,
165 clients: HashMap::new(),
166 request_settings,
167 client,
168 headers,
169 })
170 }
171}
172
173impl Service<BatchCloudwatchRequest> for CloudwatchLogsPartitionSvc {
174 type Response = CloudwatchResponse;
175 type Error = crate::Error;
176 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
177
178 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
179 Poll::Ready(Ok(()))
180 }
181
182 fn call(&mut self, mut req: BatchCloudwatchRequest) -> Self::Future {
183 let metadata = std::mem::take(req.metadata_mut());
184 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
185
186 let key = req.key;
187 let events = req
188 .events
189 .into_iter()
190 .map(|req| {
191 InputLogEvent::builder()
192 .message(req.message)
193 .timestamp(req.timestamp)
194 .build()
195 .expect("all builder fields specified")
196 })
197 .collect();
198
199 let svc = if let Some(svc) = &mut self.clients.get_mut(&key) {
200 svc.clone()
201 } else {
202 let svc = ServiceBuilder::new()
204 .buffer(1)
205 .concurrency_limit(1)
206 .rate_limit(
207 self.request_settings.rate_limit_num,
208 self.request_settings.rate_limit_duration,
209 )
210 .retry(
211 self.request_settings
212 .retry_policy(CloudwatchRetryLogic::new()),
213 )
214 .buffer(1)
215 .timeout(self.request_settings.timeout)
216 .service(CloudwatchLogsSvc::new(
217 self.config.clone(),
218 &key,
219 self.client.clone(),
220 self.headers.clone(),
221 ));
222
223 self.clients.insert(key, svc.clone());
224 svc
225 };
226
227 svc.oneshot(events)
228 .map_ok(move |_x| CloudwatchResponse { events_byte_size })
229 .map_err(Into::into)
230 .boxed()
231 }
232}
233
234impl CloudwatchLogsSvc {
235 pub fn new(
236 config: CloudwatchLogsSinkConfig,
237 key: &CloudwatchKey,
238 client: CloudwatchLogsClient,
239 headers: IndexMap<HeaderName, HeaderValue>,
240 ) -> Self {
241 let group_name = key.group.clone();
242 let stream_name = key.stream.clone();
243
244 let create_missing_group = config.create_missing_group;
245 let create_missing_stream = config.create_missing_stream;
246
247 let retention = config.retention.clone();
248
249 let kms_key = config.kms_key.clone();
250 let tags = config.tags.clone();
251
252 CloudwatchLogsSvc {
253 headers,
254 client,
255 stream_name,
256 group_name,
257 create_missing_group,
258 create_missing_stream,
259 retention,
260 kms_key,
261 tags,
262 token: None,
263 token_rx: None,
264 }
265 }
266
267 pub fn process_events(&self, mut events: Vec<InputLogEvent>) -> Vec<Vec<InputLogEvent>> {
268 events.sort_by_key(|e| e.timestamp);
270
271 info!(message = "Sending events.", events = %events.len());
272
273 let mut event_batches = Vec::new();
274
275 while let Some(oldest) = events.first() {
278 let limit = oldest.timestamp + Duration::days(1).num_milliseconds();
279
280 if events.last().expect("Events can't be empty").timestamp <= limit {
281 event_batches.push(events);
285 break;
286 }
287
288 let at = events
295 .binary_search_by_key(&limit, |e| e.timestamp)
296 .unwrap_or_else(|at| at);
297
298 let remainder = events.split_off(at);
300 event_batches.push(events);
301 events = remainder;
302 }
303
304 event_batches
305 }
306}
307
308impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
309 type Response = ();
310 type Error = CloudwatchError;
311 type Future = request::CloudwatchFuture;
312
313 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
314 if let Some(rx) = &mut self.token_rx {
315 self.token = ready!(rx.poll_unpin(cx)).ok().flatten();
316 self.token_rx = None;
317 }
318 Poll::Ready(Ok(()))
319 }
320
321 fn call(&mut self, req: Vec<InputLogEvent>) -> Self::Future {
322 if self.token_rx.is_none() {
323 let event_batches = self.process_events(req);
324
325 let (tx, rx) = oneshot::channel();
326 self.token_rx = Some(rx);
327
328 request::CloudwatchFuture::new(
329 self.client.clone(),
330 self.headers.clone(),
331 self.stream_name.clone(),
332 self.group_name.clone(),
333 self.create_missing_group,
334 self.create_missing_stream,
335 self.retention.clone(),
336 self.kms_key.clone(),
337 self.tags.clone(),
338 event_batches,
339 self.token.take(),
340 tx,
341 )
342 } else {
343 panic!("poll_ready was not called; this is a bug!");
344 }
345 }
346}
347
348pub struct CloudwatchLogsSvc {
349 client: CloudwatchLogsClient,
350 headers: IndexMap<HeaderName, HeaderValue>,
351 stream_name: String,
352 group_name: String,
353 create_missing_group: bool,
354 create_missing_stream: bool,
355 retention: Retention,
356 kms_key: Option<String>,
357 tags: Option<HashMap<String, String>>,
358 token: Option<String>,
359 token_rx: Option<oneshot::Receiver<Option<String>>>,
360}
361
362impl EncodedLength for InputLogEvent {
363 fn encoded_length(&self) -> usize {
364 self.message.len() + 26
365 }
366}
367
368#[derive(Clone)]
369pub struct CloudwatchLogsPartitionSvc {
370 config: CloudwatchLogsSinkConfig,
371 headers: IndexMap<HeaderName, HeaderValue>,
372 clients: HashMap<CloudwatchKey, Svc>,
373 request_settings: TowerRequestSettings,
374 client: CloudwatchLogsClient,
375}