vector/sinks/aws_cloudwatch_logs/
service.rs

1use std::{
2    collections::HashMap,
3    fmt,
4    task::{ready, Context, Poll},
5};
6
7use aws_sdk_cloudwatchlogs::{
8    operation::{
9        create_log_group::CreateLogGroupError, create_log_stream::CreateLogStreamError,
10        describe_log_streams::DescribeLogStreamsError, put_log_events::PutLogEventsError,
11        put_retention_policy::PutRetentionPolicyError,
12    },
13    types::InputLogEvent,
14    Client as CloudwatchLogsClient,
15};
16use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
17use chrono::Duration;
18use futures::{future::BoxFuture, FutureExt};
19use futures_util::TryFutureExt;
20use http::{
21    header::{HeaderName, InvalidHeaderName, InvalidHeaderValue},
22    HeaderValue,
23};
24use indexmap::IndexMap;
25use snafu::{ResultExt, Snafu};
26use tokio::sync::oneshot;
27use tower::{
28    buffer::Buffer,
29    limit::{ConcurrencyLimit, RateLimit},
30    retry::Retry,
31    timeout::Timeout,
32    Service, ServiceBuilder, ServiceExt,
33};
34use vector_lib::stream::DriverResponse;
35use vector_lib::{
36    finalization::EventStatus,
37    request_metadata::{GroupedCountByteSize, MetaDescriptive},
38};
39
40use crate::sinks::{
41    aws_cloudwatch_logs::{
42        config::CloudwatchLogsSinkConfig, config::Retention, request, retry::CloudwatchRetryLogic,
43        sink::BatchCloudwatchRequest, CloudwatchKey,
44    },
45    util::{retries::FibonacciRetryPolicy, EncodedLength, TowerRequestSettings},
46};
47
48type Svc = Buffer<
49    Vec<InputLogEvent>,
50    <ConcurrencyLimit<
51        RateLimit<
52            Retry<
53                FibonacciRetryPolicy<CloudwatchRetryLogic<Vec<InputLogEvent>, ()>>,
54                Buffer<
55                    Vec<InputLogEvent>,
56                    <Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,
57                >,
58            >,
59        >,
60    > as Service<Vec<InputLogEvent>>>::Future,
61>;
62
/// Errors produced while talking to CloudWatch Logs.
///
/// Each SDK-backed variant wraps the error returned by the CloudWatch Logs
/// operation it is named after; `Display` prefixes it with the variant name.
#[derive(Debug)]
pub enum CloudwatchError {
    /// A `PutLogEvents` call failed.
    Put(SdkError<PutLogEventsError, HttpResponse>),
    /// A `DescribeLogStreams` call failed.
    DescribeLogStreams(SdkError<DescribeLogStreamsError, HttpResponse>),
    /// A `CreateLogStream` call failed.
    CreateStream(SdkError<CreateLogStreamError, HttpResponse>),
    /// A `CreateLogGroup` call failed.
    CreateGroup(SdkError<CreateLogGroupError, HttpResponse>),
    /// A `PutRetentionPolicy` call failed.
    PutRetentionPolicy(SdkError<PutRetentionPolicyError, HttpResponse>),
    /// No log streams were found.
    // NOTE(review): presumably raised when a DescribeLogStreams lookup returns no
    // matching stream; the code that constructs it is outside this file — confirm.
    NoStreamsFound,
}
72
73impl fmt::Display for CloudwatchError {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        match self {
76            CloudwatchError::Put(error) => write!(f, "CloudwatchError::Put: {error}"),
77            CloudwatchError::DescribeLogStreams(error) => {
78                write!(f, "CloudwatchError::DescribeLogStreams: {error}")
79            }
80            CloudwatchError::CreateStream(error) => {
81                write!(f, "CloudwatchError::CreateStream: {error}")
82            }
83            CloudwatchError::CreateGroup(error) => {
84                write!(f, "CloudwatchError::CreateGroup: {error}")
85            }
86            CloudwatchError::NoStreamsFound => write!(f, "CloudwatchError: No Streams Found"),
87            CloudwatchError::PutRetentionPolicy(error) => {
88                write!(f, "CloudwatchError::PutRetentionPolicy: {error}")
89            }
90        }
91    }
92}
93
94impl std::error::Error for CloudwatchError {}
95
96impl From<SdkError<PutLogEventsError, HttpResponse>> for CloudwatchError {
97    fn from(error: SdkError<PutLogEventsError, HttpResponse>) -> Self {
98        CloudwatchError::Put(error)
99    }
100}
101
102impl From<SdkError<DescribeLogStreamsError, HttpResponse>> for CloudwatchError {
103    fn from(error: SdkError<DescribeLogStreamsError, HttpResponse>) -> Self {
104        CloudwatchError::DescribeLogStreams(error)
105    }
106}
107
108#[derive(Debug)]
109pub struct CloudwatchResponse {
110    events_byte_size: GroupedCountByteSize,
111}
112
113impl crate::sinks::util::sink::Response for CloudwatchResponse {
114    fn is_successful(&self) -> bool {
115        true
116    }
117
118    fn is_transient(&self) -> bool {
119        false
120    }
121}
122
123impl DriverResponse for CloudwatchResponse {
124    fn event_status(&self) -> EventStatus {
125        EventStatus::Delivered
126    }
127
128    fn events_sent(&self) -> &GroupedCountByteSize {
129        &self.events_byte_size
130    }
131}
132
/// Reasons a configured request header (from `config.request.headers`) cannot be
/// converted into an HTTP header.
#[derive(Snafu, Debug)]
enum HeaderError {
    /// The configured name is not a valid HTTP header name.
    #[snafu(display("invalid header name {source}"))]
    InvalidName { source: InvalidHeaderName },
    /// The configured value is not a valid HTTP header value.
    #[snafu(display("invalid header value {source}"))]
    InvalidValue { source: InvalidHeaderValue },
}
140
141impl CloudwatchLogsPartitionSvc {
142    pub fn new(
143        config: CloudwatchLogsSinkConfig,
144        client: CloudwatchLogsClient,
145    ) -> crate::Result<Self> {
146        let request_settings = config.request.tower.into_settings();
147
148        let headers = config
149            .request
150            .headers
151            .iter()
152            .map(|(name, value)| {
153                Ok((
154                    HeaderName::from_bytes(name.as_bytes()).context(InvalidNameSnafu {})?,
155                    HeaderValue::from_str(value.as_str()).context(InvalidValueSnafu {})?,
156                ))
157            })
158            .collect::<Result<IndexMap<_, _>, HeaderError>>()?;
159
160        Ok(Self {
161            config,
162            clients: HashMap::new(),
163            request_settings,
164            client,
165            headers,
166        })
167    }
168}
169
170impl Service<BatchCloudwatchRequest> for CloudwatchLogsPartitionSvc {
171    type Response = CloudwatchResponse;
172    type Error = crate::Error;
173    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
174
175    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
176        Poll::Ready(Ok(()))
177    }
178
179    fn call(&mut self, mut req: BatchCloudwatchRequest) -> Self::Future {
180        let metadata = std::mem::take(req.metadata_mut());
181        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
182
183        let key = req.key;
184        let events = req
185            .events
186            .into_iter()
187            .map(|req| {
188                InputLogEvent::builder()
189                    .message(req.message)
190                    .timestamp(req.timestamp)
191                    .build()
192                    .expect("all builder fields specified")
193            })
194            .collect();
195
196        let svc = if let Some(svc) = &mut self.clients.get_mut(&key) {
197            svc.clone()
198        } else {
199            // Concurrency limit is 1 because we need token from previous request.
200            let svc = ServiceBuilder::new()
201                .buffer(1)
202                .concurrency_limit(1)
203                .rate_limit(
204                    self.request_settings.rate_limit_num,
205                    self.request_settings.rate_limit_duration,
206                )
207                .retry(
208                    self.request_settings
209                        .retry_policy(CloudwatchRetryLogic::new()),
210                )
211                .buffer(1)
212                .timeout(self.request_settings.timeout)
213                .service(CloudwatchLogsSvc::new(
214                    self.config.clone(),
215                    &key,
216                    self.client.clone(),
217                    self.headers.clone(),
218                ));
219
220            self.clients.insert(key, svc.clone());
221            svc
222        };
223
224        svc.oneshot(events)
225            .map_ok(move |_x| CloudwatchResponse { events_byte_size })
226            .map_err(Into::into)
227            .boxed()
228    }
229}
230
231impl CloudwatchLogsSvc {
232    pub fn new(
233        config: CloudwatchLogsSinkConfig,
234        key: &CloudwatchKey,
235        client: CloudwatchLogsClient,
236        headers: IndexMap<HeaderName, HeaderValue>,
237    ) -> Self {
238        let group_name = key.group.clone();
239        let stream_name = key.stream.clone();
240
241        let create_missing_group = config.create_missing_group;
242        let create_missing_stream = config.create_missing_stream;
243
244        let retention = config.retention.clone();
245
246        let kms_key = config.kms_key.clone();
247        let tags = config.tags.clone();
248
249        CloudwatchLogsSvc {
250            headers,
251            client,
252            stream_name,
253            group_name,
254            create_missing_group,
255            create_missing_stream,
256            retention,
257            kms_key,
258            tags,
259            token: None,
260            token_rx: None,
261        }
262    }
263
264    pub fn process_events(&self, mut events: Vec<InputLogEvent>) -> Vec<Vec<InputLogEvent>> {
265        // Sort by timestamp
266        events.sort_by_key(|e| e.timestamp);
267
268        info!(message = "Sending events.", events = %events.len());
269
270        let mut event_batches = Vec::new();
271
272        // We will split events into 24h batches.
273        // Relies on log_events being sorted by timestamp in ascending order.
274        while let Some(oldest) = events.first() {
275            let limit = oldest.timestamp + Duration::days(1).num_milliseconds();
276
277            if events.last().expect("Events can't be empty").timestamp <= limit {
278                // Fast path.
279                // In most cases the difference between oldest and newest event
280                // is less than 24h.
281                event_batches.push(events);
282                break;
283            }
284
285            // At this point we know that an event older than the limit exists.
286            //
287            // We will find none or one of the events with timestamp==limit.
288            // In the case of more events with limit, we can just split them
289            // at found event, and send those before at with this batch, and
290            // those after at with the next batch.
291            let at = events
292                .binary_search_by_key(&limit, |e| e.timestamp)
293                .unwrap_or_else(|at| at);
294
295            // Can't be empty
296            let remainder = events.split_off(at);
297            event_batches.push(events);
298            events = remainder;
299        }
300
301        event_batches
302    }
303}
304
305impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
306    type Response = ();
307    type Error = CloudwatchError;
308    type Future = request::CloudwatchFuture;
309
310    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
311        if let Some(rx) = &mut self.token_rx {
312            self.token = ready!(rx.poll_unpin(cx)).ok().flatten();
313            self.token_rx = None;
314        }
315        Poll::Ready(Ok(()))
316    }
317
318    fn call(&mut self, req: Vec<InputLogEvent>) -> Self::Future {
319        if self.token_rx.is_none() {
320            let event_batches = self.process_events(req);
321
322            let (tx, rx) = oneshot::channel();
323            self.token_rx = Some(rx);
324
325            request::CloudwatchFuture::new(
326                self.client.clone(),
327                self.headers.clone(),
328                self.stream_name.clone(),
329                self.group_name.clone(),
330                self.create_missing_group,
331                self.create_missing_stream,
332                self.retention.clone(),
333                self.kms_key.clone(),
334                self.tags.clone(),
335                event_batches,
336                self.token.take(),
337                tx,
338            )
339        } else {
340            panic!("poll_ready was not called; this is a bug!");
341        }
342    }
343}
344
/// Per-partition service that submits batches of `InputLogEvent`s for a single
/// log group/stream pair through `request::CloudwatchFuture`.
///
/// Requests are serialized through a token hand-off: `call` stores the receiving
/// half of a oneshot channel in `token_rx` and panics if one is already stored,
/// and `poll_ready` waits on it to obtain the token for the next request.
pub struct CloudwatchLogsSvc {
    /// SDK client forwarded to every `CloudwatchFuture`.
    client: CloudwatchLogsClient,
    /// Validated custom HTTP headers forwarded to every `CloudwatchFuture`.
    headers: IndexMap<HeaderName, HeaderValue>,
    /// Target stream name, taken from the partition key.
    stream_name: String,
    /// Target group name, taken from the partition key.
    group_name: String,
    // The flags and settings below are copied from the sink config and forwarded
    // to `CloudwatchFuture`; presumably they control creating a missing group/stream
    // and the retention/KMS/tags applied to it — confirm in `request.rs`.
    create_missing_group: bool,
    create_missing_stream: bool,
    retention: Retention,
    kms_key: Option<String>,
    tags: Option<HashMap<String, String>>,
    /// Token to pass to the next request; `call` takes it.
    token: Option<String>,
    /// Receiver for the token produced by the in-flight request; `Some` from `call`
    /// until `poll_ready` has received from it.
    token_rx: Option<oneshot::Receiver<Option<String>>>,
}
358
359impl EncodedLength for InputLogEvent {
360    fn encoded_length(&self) -> usize {
361        self.message.len() + 26
362    }
363}
364
/// Front-end service that routes each `BatchCloudwatchRequest` to a per-key
/// (group/stream) inner service stack, building and caching that stack on first use.
///
/// Cloning copies the cache, so clones share the already-built per-key handles.
#[derive(Clone)]
pub struct CloudwatchLogsPartitionSvc {
    /// Sink configuration; cloned into every newly built `CloudwatchLogsSvc`.
    config: CloudwatchLogsSinkConfig,
    /// Validated custom request headers; cloned into every newly built `CloudwatchLogsSvc`.
    headers: IndexMap<HeaderName, HeaderValue>,
    /// Cached per-key service stacks.
    // NOTE(review): nothing in this file removes entries, so this grows with the
    // number of distinct keys seen — confirm that is bounded in practice.
    clients: HashMap<CloudwatchKey, Svc>,
    /// Rate-limit, retry-policy and timeout settings used when building each stack.
    request_settings: TowerRequestSettings,
    /// SDK client; cloned into every newly built `CloudwatchLogsSvc`.
    client: CloudwatchLogsClient,
}