vector/sinks/aws_cloudwatch_logs/
sink.rs

1use std::fmt;
2
3use async_trait::async_trait;
4use chrono::{Duration, Utc};
5use futures::{future, stream::BoxStream, StreamExt};
6use tower::Service;
7use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
8use vector_lib::stream::{BatcherSettings, DriverResponse};
9use vector_lib::{partition::Partitioner, sink::StreamSink};
10
11use crate::{
12    event::{Event, EventFinalizers, Finalizable},
13    sinks::{
14        aws_cloudwatch_logs::{
15            request_builder::{CloudwatchRequest, CloudwatchRequestBuilder},
16            CloudwatchKey,
17        },
18        util::SinkBuilderExt,
19    },
20};
21
22pub struct CloudwatchSink<S> {
23    pub batcher_settings: BatcherSettings,
24    pub(super) request_builder: CloudwatchRequestBuilder,
25    pub service: S,
26}
27
28impl<S> CloudwatchSink<S>
29where
30    S: Service<BatchCloudwatchRequest> + Send + 'static,
31    S::Future: Send + 'static,
32    S::Response: DriverResponse + Send + 'static,
33    S::Error: fmt::Debug + Into<crate::Error> + Send,
34{
35    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
36        let mut request_builder = self.request_builder;
37        let batcher_settings = self.batcher_settings;
38        let service = self.service;
39
40        input
41            .filter_map(|event| future::ready(request_builder.build(event)))
42            .filter(|req| {
43                let now = Utc::now();
44                let start = (now - Duration::days(14) + Duration::minutes(5)).timestamp_millis();
45                let end = (now + Duration::hours(2)).timestamp_millis();
46                let age_range = start..end;
47                future::ready(age_range.contains(&req.timestamp))
48            })
49            .batched_partitioned(CloudwatchPartitioner, || {
50                batcher_settings.as_byte_size_config()
51            })
52            .map(|(key, events)| {
53                let metadata = RequestMetadata::from_batch(
54                    events.iter().map(|req| req.get_metadata().clone()),
55                );
56
57                BatchCloudwatchRequest {
58                    key,
59                    events,
60                    metadata,
61                }
62            })
63            .into_driver(service)
64            .run()
65            .await
66    }
67}
68
69#[derive(Clone)]
70pub struct BatchCloudwatchRequest {
71    pub key: CloudwatchKey,
72    pub events: Vec<CloudwatchRequest>,
73    metadata: RequestMetadata,
74}
75
76impl Finalizable for BatchCloudwatchRequest {
77    fn take_finalizers(&mut self) -> EventFinalizers {
78        self.events.take_finalizers()
79    }
80}
81
82impl MetaDescriptive for BatchCloudwatchRequest {
83    fn get_metadata(&self) -> &RequestMetadata {
84        &self.metadata
85    }
86
87    fn metadata_mut(&mut self) -> &mut RequestMetadata {
88        &mut self.metadata
89    }
90}
91
92struct CloudwatchPartitioner;
93
94impl Partitioner for CloudwatchPartitioner {
95    type Item = CloudwatchRequest;
96    type Key = CloudwatchKey;
97
98    fn partition(&self, item: &Self::Item) -> Self::Key {
99        item.key.clone()
100    }
101}
102
103#[async_trait]
104impl<S> StreamSink<Event> for CloudwatchSink<S>
105where
106    S: Service<BatchCloudwatchRequest> + Send + 'static,
107    S::Future: Send + 'static,
108    S::Response: DriverResponse + Send + 'static,
109    S::Error: fmt::Debug + Into<crate::Error> + Send,
110{
111    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
112        self.run_inner(input).await
113    }
114}