vector/sinks/aws_cloudwatch_logs/
service.rs

1use std::{
2    collections::HashMap,
3    fmt,
4    task::{Context, Poll, ready},
5};
6
7use aws_sdk_cloudwatchlogs::{
8    Client as CloudwatchLogsClient,
9    operation::{
10        create_log_group::CreateLogGroupError, create_log_stream::CreateLogStreamError,
11        describe_log_streams::DescribeLogStreamsError, put_log_events::PutLogEventsError,
12        put_retention_policy::PutRetentionPolicyError,
13    },
14    types::InputLogEvent,
15};
16use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
17use chrono::Duration;
18use futures::{FutureExt, future::BoxFuture};
19use futures_util::TryFutureExt;
20use http::{
21    HeaderValue,
22    header::{HeaderName, InvalidHeaderName, InvalidHeaderValue},
23};
24use indexmap::IndexMap;
25use snafu::{ResultExt, Snafu};
26use tokio::sync::oneshot;
27use tower::{
28    Service, ServiceBuilder, ServiceExt,
29    buffer::Buffer,
30    limit::{ConcurrencyLimit, RateLimit},
31    retry::Retry,
32    timeout::Timeout,
33};
34use vector_lib::{
35    finalization::EventStatus,
36    request_metadata::{GroupedCountByteSize, MetaDescriptive},
37    stream::DriverResponse,
38};
39
40use crate::sinks::{
41    aws_cloudwatch_logs::{
42        CloudwatchKey,
43        config::{CloudwatchLogsSinkConfig, Retention},
44        request,
45        retry::CloudwatchRetryLogic,
46        sink::BatchCloudwatchRequest,
47    },
48    util::{EncodedLength, TowerRequestSettings, retries::FibonacciRetryPolicy},
49};
50
/// Concrete type of the per-key service stack cached by
/// `CloudwatchLogsPartitionSvc`.
///
/// It mirrors the `ServiceBuilder` chain assembled in
/// `CloudwatchLogsPartitionSvc::call`, from the outside in:
/// buffer -> concurrency limit -> rate limit -> retry -> buffer -> timeout ->
/// `CloudwatchLogsSvc`. The second type parameter of each `Buffer` is the
/// future type of the service it wraps.
type Svc = Buffer<
    Vec<InputLogEvent>,
    <ConcurrencyLimit<
        RateLimit<
            Retry<
                FibonacciRetryPolicy<CloudwatchRetryLogic<Vec<InputLogEvent>, ()>>,
                Buffer<
                    Vec<InputLogEvent>,
                    <Timeout<CloudwatchLogsSvc> as Service<Vec<InputLogEvent>>>::Future,
                >,
            >,
        >,
    > as Service<Vec<InputLogEvent>>>::Future,
>;
65
/// Errors produced by `CloudwatchLogsSvc`.
///
/// Every variant except `NoStreamsFound` wraps the SDK error returned by the
/// corresponding CloudWatch Logs operation.
#[derive(Debug)]
pub enum CloudwatchError {
    /// A `PutLogEvents` call failed.
    Put(SdkError<PutLogEventsError, HttpResponse>),
    /// A `DescribeLogStreams` call failed.
    DescribeLogStreams(SdkError<DescribeLogStreamsError, HttpResponse>),
    /// A `CreateLogStream` call failed.
    CreateStream(SdkError<CreateLogStreamError, HttpResponse>),
    /// A `CreateLogGroup` call failed.
    CreateGroup(SdkError<CreateLogGroupError, HttpResponse>),
    /// A `PutRetentionPolicy` call failed.
    PutRetentionPolicy(SdkError<PutRetentionPolicyError, HttpResponse>),
    /// No log streams were found. This variant is constructed outside this
    /// file; NOTE(review): presumably raised when a stream lookup returns no
    /// match -- confirm in the `request` module.
    NoStreamsFound,
}
75
76impl fmt::Display for CloudwatchError {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        match self {
79            CloudwatchError::Put(error) => write!(f, "CloudwatchError::Put: {error}"),
80            CloudwatchError::DescribeLogStreams(error) => {
81                write!(f, "CloudwatchError::DescribeLogStreams: {error}")
82            }
83            CloudwatchError::CreateStream(error) => {
84                write!(f, "CloudwatchError::CreateStream: {error}")
85            }
86            CloudwatchError::CreateGroup(error) => {
87                write!(f, "CloudwatchError::CreateGroup: {error}")
88            }
89            CloudwatchError::NoStreamsFound => write!(f, "CloudwatchError: No Streams Found"),
90            CloudwatchError::PutRetentionPolicy(error) => {
91                write!(f, "CloudwatchError::PutRetentionPolicy: {error}")
92            }
93        }
94    }
95}
96
// Marker impl so `CloudwatchError` can be used as a standard error type; it
// relies on the `Debug` derive and the `Display` impl and does not override
// `source()`.
impl std::error::Error for CloudwatchError {}
98
99impl From<SdkError<PutLogEventsError, HttpResponse>> for CloudwatchError {
100    fn from(error: SdkError<PutLogEventsError, HttpResponse>) -> Self {
101        CloudwatchError::Put(error)
102    }
103}
104
105impl From<SdkError<DescribeLogStreamsError, HttpResponse>> for CloudwatchError {
106    fn from(error: SdkError<DescribeLogStreamsError, HttpResponse>) -> Self {
107        CloudwatchError::DescribeLogStreams(error)
108    }
109}
110
/// Successful response returned by `CloudwatchLogsPartitionSvc` for one batch.
#[derive(Debug)]
pub struct CloudwatchResponse {
    /// Grouped event count / estimated JSON-encoded byte size taken from the
    /// request metadata; reported back through `DriverResponse::events_sent`.
    events_byte_size: GroupedCountByteSize,
}
115
// A `CloudwatchResponse` is only ever built on the success path (see the
// `map_ok` in `CloudwatchLogsPartitionSvc::call`), so both answers are fixed.
impl crate::sinks::util::sink::Response for CloudwatchResponse {
    /// Always `true`.
    fn is_successful(&self) -> bool {
        true
    }

    /// Always `false`.
    fn is_transient(&self) -> bool {
        false
    }
}
125
impl DriverResponse for CloudwatchResponse {
    /// Always reports the batch as delivered.
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

    /// Returns the size accounting captured from the request metadata.
    fn events_sent(&self) -> &GroupedCountByteSize {
        &self.events_byte_size
    }
}
135
/// Errors raised while converting the configured request headers into typed
/// `http` header names and values (see `CloudwatchLogsPartitionSvc::new`).
#[derive(Snafu, Debug)]
enum HeaderError {
    /// The configured header name was rejected by `HeaderName::from_bytes`.
    #[snafu(display("invalid header name {source}"))]
    InvalidName { source: InvalidHeaderName },
    /// The configured header value was rejected by `HeaderValue::from_str`.
    #[snafu(display("invalid header value {source}"))]
    InvalidValue { source: InvalidHeaderValue },
}
143
144impl CloudwatchLogsPartitionSvc {
145    pub fn new(
146        config: CloudwatchLogsSinkConfig,
147        client: CloudwatchLogsClient,
148    ) -> crate::Result<Self> {
149        let request_settings = config.request.tower.into_settings();
150
151        let headers = config
152            .request
153            .headers
154            .iter()
155            .map(|(name, value)| {
156                Ok((
157                    HeaderName::from_bytes(name.as_bytes()).context(InvalidNameSnafu {})?,
158                    HeaderValue::from_str(value.as_str()).context(InvalidValueSnafu {})?,
159                ))
160            })
161            .collect::<Result<IndexMap<_, _>, HeaderError>>()?;
162
163        Ok(Self {
164            config,
165            clients: HashMap::new(),
166            request_settings,
167            client,
168            headers,
169        })
170    }
171}
172
173impl Service<BatchCloudwatchRequest> for CloudwatchLogsPartitionSvc {
174    type Response = CloudwatchResponse;
175    type Error = crate::Error;
176    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
177
178    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
179        Poll::Ready(Ok(()))
180    }
181
182    fn call(&mut self, mut req: BatchCloudwatchRequest) -> Self::Future {
183        let metadata = std::mem::take(req.metadata_mut());
184        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
185
186        let key = req.key;
187        let events = req
188            .events
189            .into_iter()
190            .map(|req| {
191                InputLogEvent::builder()
192                    .message(req.message)
193                    .timestamp(req.timestamp)
194                    .build()
195                    .expect("all builder fields specified")
196            })
197            .collect();
198
199        let svc = if let Some(svc) = &mut self.clients.get_mut(&key) {
200            svc.clone()
201        } else {
202            // Concurrency limit is 1 because we need token from previous request.
203            let svc = ServiceBuilder::new()
204                .buffer(1)
205                .concurrency_limit(1)
206                .rate_limit(
207                    self.request_settings.rate_limit_num,
208                    self.request_settings.rate_limit_duration,
209                )
210                .retry(
211                    self.request_settings
212                        .retry_policy(CloudwatchRetryLogic::new()),
213                )
214                .buffer(1)
215                .timeout(self.request_settings.timeout)
216                .service(CloudwatchLogsSvc::new(
217                    self.config.clone(),
218                    &key,
219                    self.client.clone(),
220                    self.headers.clone(),
221                ));
222
223            self.clients.insert(key, svc.clone());
224            svc
225        };
226
227        svc.oneshot(events)
228            .map_ok(move |_x| CloudwatchResponse { events_byte_size })
229            .map_err(Into::into)
230            .boxed()
231    }
232}
233
234impl CloudwatchLogsSvc {
235    pub fn new(
236        config: CloudwatchLogsSinkConfig,
237        key: &CloudwatchKey,
238        client: CloudwatchLogsClient,
239        headers: IndexMap<HeaderName, HeaderValue>,
240    ) -> Self {
241        let group_name = key.group.clone();
242        let stream_name = key.stream.clone();
243
244        let create_missing_group = config.create_missing_group;
245        let create_missing_stream = config.create_missing_stream;
246
247        let retention = config.retention.clone();
248
249        let kms_key = config.kms_key.clone();
250        let tags = config.tags.clone();
251
252        CloudwatchLogsSvc {
253            headers,
254            client,
255            stream_name,
256            group_name,
257            create_missing_group,
258            create_missing_stream,
259            retention,
260            kms_key,
261            tags,
262            token: None,
263            token_rx: None,
264        }
265    }
266
267    pub fn process_events(&self, mut events: Vec<InputLogEvent>) -> Vec<Vec<InputLogEvent>> {
268        // Sort by timestamp
269        events.sort_by_key(|e| e.timestamp);
270
271        info!(message = "Sending events.", events = %events.len());
272
273        let mut event_batches = Vec::new();
274
275        // We will split events into 24h batches.
276        // Relies on log_events being sorted by timestamp in ascending order.
277        while let Some(oldest) = events.first() {
278            let limit = oldest.timestamp + Duration::days(1).num_milliseconds();
279
280            if events.last().expect("Events can't be empty").timestamp <= limit {
281                // Fast path.
282                // In most cases the difference between oldest and newest event
283                // is less than 24h.
284                event_batches.push(events);
285                break;
286            }
287
288            // At this point we know that an event older than the limit exists.
289            //
290            // We will find none or one of the events with timestamp==limit.
291            // In the case of more events with limit, we can just split them
292            // at found event, and send those before at with this batch, and
293            // those after at with the next batch.
294            let at = events
295                .binary_search_by_key(&limit, |e| e.timestamp)
296                .unwrap_or_else(|at| at);
297
298            // Can't be empty
299            let remainder = events.split_off(at);
300            event_batches.push(events);
301            events = remainder;
302        }
303
304        event_batches
305    }
306}
307
308impl Service<Vec<InputLogEvent>> for CloudwatchLogsSvc {
309    type Response = ();
310    type Error = CloudwatchError;
311    type Future = request::CloudwatchFuture;
312
313    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
314        if let Some(rx) = &mut self.token_rx {
315            self.token = ready!(rx.poll_unpin(cx)).ok().flatten();
316            self.token_rx = None;
317        }
318        Poll::Ready(Ok(()))
319    }
320
321    fn call(&mut self, req: Vec<InputLogEvent>) -> Self::Future {
322        if self.token_rx.is_none() {
323            let event_batches = self.process_events(req);
324
325            let (tx, rx) = oneshot::channel();
326            self.token_rx = Some(rx);
327
328            request::CloudwatchFuture::new(
329                self.client.clone(),
330                self.headers.clone(),
331                self.stream_name.clone(),
332                self.group_name.clone(),
333                self.create_missing_group,
334                self.create_missing_stream,
335                self.retention.clone(),
336                self.kms_key.clone(),
337                self.tags.clone(),
338                event_batches,
339                self.token.take(),
340                tx,
341            )
342        } else {
343            panic!("poll_ready was not called; this is a bug!");
344        }
345    }
346}
347
/// Service that sends batches of log events to a single log group / stream.
pub struct CloudwatchLogsSvc {
    /// CloudWatch Logs SDK client, cloned into every request future.
    client: CloudwatchLogsClient,
    /// Parsed headers from the sink's request configuration, passed to every
    /// request future.
    headers: IndexMap<HeaderName, HeaderValue>,
    /// Target log stream name (from the `CloudwatchKey`).
    stream_name: String,
    /// Target log group name (from the `CloudwatchKey`).
    group_name: String,
    /// Copied from the sink config's `create_missing_group`.
    create_missing_group: bool,
    /// Copied from the sink config's `create_missing_stream`.
    create_missing_stream: bool,
    /// Copied from the sink config's `retention`.
    retention: Retention,
    /// Copied from the sink config's `kms_key`.
    kms_key: Option<String>,
    /// Copied from the sink config's `tags`.
    tags: Option<HashMap<String, String>>,
    /// Token reported by the previous request, handed to the next one.
    token: Option<String>,
    /// Receiver for the token of the in-flight request; `Some` from `call`
    /// until `poll_ready` has received from it.
    token_rx: Option<oneshot::Receiver<Option<String>>>,
}
361
362impl EncodedLength for InputLogEvent {
363    fn encoded_length(&self) -> usize {
364        self.message.len() + 26
365    }
366}
367
/// Front-end service that fans batches out to one service stack per
/// `CloudwatchKey`.
#[derive(Clone)]
pub struct CloudwatchLogsPartitionSvc {
    /// Sink configuration, cloned into each newly created `CloudwatchLogsSvc`.
    config: CloudwatchLogsSinkConfig,
    /// Request headers parsed once in `new` and cloned into each new stack.
    headers: IndexMap<HeaderName, HeaderValue>,
    /// Cache of already-built service stacks, keyed by partition key.
    clients: HashMap<CloudwatchKey, Svc>,
    /// Rate-limit, retry and timeout settings applied to each new stack.
    request_settings: TowerRequestSettings,
    /// CloudWatch Logs SDK client shared (by clone) with every stack.
    client: CloudwatchLogsClient,
}