1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::fmt;

use async_trait::async_trait;
use chrono::{Duration, Utc};
use futures::{future, stream::BoxStream, StreamExt};
use tower::Service;
use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
use vector_lib::stream::{BatcherSettings, DriverResponse};
use vector_lib::{partition::Partitioner, sink::StreamSink};

use crate::{
    event::{Event, EventFinalizers, Finalizable},
    sinks::{
        aws_cloudwatch_logs::{
            request_builder::{CloudwatchRequest, CloudwatchRequestBuilder},
            CloudwatchKey,
        },
        util::SinkBuilderExt,
    },
};

/// Stream sink that converts events into CloudWatch requests, groups them into
/// per-key batches, and hands every batch to the wrapped service `S`.
pub struct CloudwatchSink<S> {
    /// Settings from which the partitioned batcher's configuration is derived.
    pub batcher_settings: BatcherSettings,
    /// Builds a `CloudwatchRequest` from each incoming event; events for which
    /// it yields nothing are skipped.
    pub(super) request_builder: CloudwatchRequestBuilder,
    /// Service that receives each assembled `BatchCloudwatchRequest`.
    pub service: S,
}

impl<S> CloudwatchSink<S>
where
    S: Service<BatchCloudwatchRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Consumes the sink and drives `input` to completion.
    ///
    /// Pipeline stages, in order:
    /// 1. Build a `CloudwatchRequest` from each event, skipping events for
    ///    which the builder returns `None`.
    /// 2. Keep only requests whose `timestamp` (milliseconds) lies in the
    ///    half-open window `[now - 14 days + 5 minutes, now + 2 hours)`,
    ///    where `now` is sampled per request.
    /// 3. Batch the survivors per `CloudwatchKey`.
    /// 4. Wrap each batch, together with metadata aggregated from its
    ///    requests, in a `BatchCloudwatchRequest`.
    /// 5. Send every batch through `service` via the driver.
    ///
    /// Returns whatever the driver's `run` returns.
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        // Move all fields out of the box in one step.
        let Self {
            mut request_builder,
            batcher_settings,
            service,
        } = *self;

        input
            .filter_map(|event| future::ready(request_builder.build(event)))
            .filter(|request| {
                // NOTE(review): the bounds presumably mirror the CloudWatch Logs
                // PutLogEvents timestamp limits, with a 5 minute safety margin on
                // the old side -- confirm against the AWS documentation.
                // NOTE(review): requests outside the window are discarded here
                // without any log or metric emitted in this block.
                let now = Utc::now();
                let oldest_allowed = now - Duration::days(14) + Duration::minutes(5);
                let newest_allowed = now + Duration::hours(2);
                let accepted_window =
                    oldest_allowed.timestamp_millis()..newest_allowed.timestamp_millis();
                future::ready(accepted_window.contains(&request.timestamp))
            })
            .batched_partitioned(CloudwatchPartitioner, || {
                batcher_settings.as_byte_size_config()
            })
            .map(|(partition_key, requests)| {
                // Combine the per-request metadata into one value for the batch.
                let batch_metadata = RequestMetadata::from_batch(
                    requests.iter().map(|request| request.get_metadata().clone()),
                );

                BatchCloudwatchRequest {
                    key: partition_key,
                    events: requests,
                    metadata: batch_metadata,
                }
            })
            .into_driver(service)
            .run()
            .await
    }
}

/// A group of CloudWatch requests that share one partition key, together with
/// the request metadata describing the group as a whole.
#[derive(Clone)]
pub struct BatchCloudwatchRequest {
    /// Partition key common to every request in `events`.
    pub key: CloudwatchKey,
    /// The individual requests that make up this batch.
    pub events: Vec<CloudwatchRequest>,
    /// Metadata for the batch, exposed through the `MetaDescriptive` impl.
    metadata: RequestMetadata,
}

impl Finalizable for BatchCloudwatchRequest {
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.events.take_finalizers()
    }
}

impl MetaDescriptive for BatchCloudwatchRequest {
    /// Borrows the metadata stored on this batch.
    fn get_metadata(&self) -> &RequestMetadata {
        let Self { metadata, .. } = self;
        metadata
    }

    /// Mutably borrows the metadata stored on this batch.
    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        let Self { metadata, .. } = self;
        metadata
    }
}

struct CloudwatchPartitioner;

impl Partitioner for CloudwatchPartitioner {
    type Item = CloudwatchRequest;
    type Key = CloudwatchKey;

    fn partition(&self, item: &Self::Item) -> Self::Key {
        item.key.clone()
    }
}

#[async_trait]
impl<S> StreamSink<Event> for CloudwatchSink<S>
where
    S: Service<BatchCloudwatchRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// `StreamSink` entry point; forwards to the inherent `run_inner`, which
    /// holds the actual pipeline.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        Self::run_inner(self, input).await
    }
}