vector/sinks/aws_cloudwatch_logs/
sink.rs

1use std::fmt;
2
3use async_trait::async_trait;
4use chrono::{Duration, Utc};
5use futures::{StreamExt, future, stream::BoxStream};
6use tower::Service;
7use vector_lib::{
8    partition::Partitioner,
9    request_metadata::{MetaDescriptive, RequestMetadata},
10    sink::StreamSink,
11    stream::{BatcherSettings, DriverResponse},
12};
13
14use crate::{
15    event::{Event, EventFinalizers, Finalizable},
16    sinks::{
17        aws_cloudwatch_logs::{
18            CloudwatchKey,
19            request_builder::{CloudwatchRequest, CloudwatchRequestBuilder},
20        },
21        util::SinkBuilderExt,
22    },
23};
24
25pub struct CloudwatchSink<S> {
26    pub batcher_settings: BatcherSettings,
27    pub(super) request_builder: CloudwatchRequestBuilder,
28    pub service: S,
29}
30
31impl<S> CloudwatchSink<S>
32where
33    S: Service<BatchCloudwatchRequest> + Send + 'static,
34    S::Future: Send + 'static,
35    S::Response: DriverResponse + Send + 'static,
36    S::Error: fmt::Debug + Into<crate::Error> + Send,
37{
38    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
39        let mut request_builder = self.request_builder;
40        let batcher_settings = self.batcher_settings;
41        let service = self.service;
42
43        input
44            .filter_map(|event| future::ready(request_builder.build(event)))
45            .filter(|req| {
46                let now = Utc::now();
47                let start = (now - Duration::days(14) + Duration::minutes(5)).timestamp_millis();
48                let end = (now + Duration::hours(2)).timestamp_millis();
49                let age_range = start..end;
50                future::ready(age_range.contains(&req.timestamp))
51            })
52            .batched_partitioned(CloudwatchPartitioner, || {
53                batcher_settings.as_byte_size_config()
54            })
55            .map(|(key, events)| {
56                let metadata = RequestMetadata::from_batch(
57                    events.iter().map(|req| req.get_metadata().clone()),
58                );
59
60                BatchCloudwatchRequest {
61                    key,
62                    events,
63                    metadata,
64                }
65            })
66            .into_driver(service)
67            .run()
68            .await
69    }
70}
71
72#[derive(Clone)]
73pub struct BatchCloudwatchRequest {
74    pub key: CloudwatchKey,
75    pub events: Vec<CloudwatchRequest>,
76    metadata: RequestMetadata,
77}
78
79impl Finalizable for BatchCloudwatchRequest {
80    fn take_finalizers(&mut self) -> EventFinalizers {
81        self.events.take_finalizers()
82    }
83}
84
85impl MetaDescriptive for BatchCloudwatchRequest {
86    fn get_metadata(&self) -> &RequestMetadata {
87        &self.metadata
88    }
89
90    fn metadata_mut(&mut self) -> &mut RequestMetadata {
91        &mut self.metadata
92    }
93}
94
95struct CloudwatchPartitioner;
96
97impl Partitioner for CloudwatchPartitioner {
98    type Item = CloudwatchRequest;
99    type Key = CloudwatchKey;
100
101    fn partition(&self, item: &Self::Item) -> Self::Key {
102        item.key.clone()
103    }
104}
105
106#[async_trait]
107impl<S> StreamSink<Event> for CloudwatchSink<S>
108where
109    S: Service<BatchCloudwatchRequest> + Send + 'static,
110    S::Future: Send + 'static,
111    S::Response: DriverResponse + Send + 'static,
112    S::Error: fmt::Debug + Into<crate::Error> + Send,
113{
114    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
115        self.run_inner(input).await
116    }
117}