vector/sinks/aws_cloudwatch_logs/
sink.rs1use std::fmt;
2
3use async_trait::async_trait;
4use chrono::{Duration, Utc};
5use futures::{StreamExt, future, stream::BoxStream};
6use tower::Service;
7use vector_lib::{
8 partition::Partitioner,
9 request_metadata::{MetaDescriptive, RequestMetadata},
10 sink::StreamSink,
11 stream::{BatcherSettings, DriverResponse},
12};
13
14use crate::{
15 event::{Event, EventFinalizers, Finalizable},
16 sinks::{
17 aws_cloudwatch_logs::{
18 CloudwatchKey,
19 request_builder::{CloudwatchRequest, CloudwatchRequestBuilder},
20 },
21 util::SinkBuilderExt,
22 },
23};
24
25pub struct CloudwatchSink<S> {
26 pub batcher_settings: BatcherSettings,
27 pub(super) request_builder: CloudwatchRequestBuilder,
28 pub service: S,
29}
30
31impl<S> CloudwatchSink<S>
32where
33 S: Service<BatchCloudwatchRequest> + Send + 'static,
34 S::Future: Send + 'static,
35 S::Response: DriverResponse + Send + 'static,
36 S::Error: fmt::Debug + Into<crate::Error> + Send,
37{
38 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
39 let mut request_builder = self.request_builder;
40 let batcher_settings = self.batcher_settings;
41 let service = self.service;
42
43 input
44 .filter_map(|event| future::ready(request_builder.build(event)))
45 .filter(|req| {
46 let now = Utc::now();
47 let start = (now - Duration::days(14) + Duration::minutes(5)).timestamp_millis();
48 let end = (now + Duration::hours(2)).timestamp_millis();
49 let age_range = start..end;
50 future::ready(age_range.contains(&req.timestamp))
51 })
52 .batched_partitioned(CloudwatchPartitioner, || {
53 batcher_settings.as_byte_size_config()
54 })
55 .map(|(key, events)| {
56 let metadata = RequestMetadata::from_batch(
57 events.iter().map(|req| req.get_metadata().clone()),
58 );
59
60 BatchCloudwatchRequest {
61 key,
62 events,
63 metadata,
64 }
65 })
66 .into_driver(service)
67 .run()
68 .await
69 }
70}
71
72#[derive(Clone)]
73pub struct BatchCloudwatchRequest {
74 pub key: CloudwatchKey,
75 pub events: Vec<CloudwatchRequest>,
76 metadata: RequestMetadata,
77}
78
79impl Finalizable for BatchCloudwatchRequest {
80 fn take_finalizers(&mut self) -> EventFinalizers {
81 self.events.take_finalizers()
82 }
83}
84
85impl MetaDescriptive for BatchCloudwatchRequest {
86 fn get_metadata(&self) -> &RequestMetadata {
87 &self.metadata
88 }
89
90 fn metadata_mut(&mut self) -> &mut RequestMetadata {
91 &mut self.metadata
92 }
93}
94
95struct CloudwatchPartitioner;
96
97impl Partitioner for CloudwatchPartitioner {
98 type Item = CloudwatchRequest;
99 type Key = CloudwatchKey;
100
101 fn partition(&self, item: &Self::Item) -> Self::Key {
102 item.key.clone()
103 }
104}
105
106#[async_trait]
107impl<S> StreamSink<Event> for CloudwatchSink<S>
108where
109 S: Service<BatchCloudwatchRequest> + Send + 'static,
110 S::Future: Send + 'static,
111 S::Response: DriverResponse + Send + 'static,
112 S::Error: fmt::Debug + Into<crate::Error> + Send,
113{
114 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
115 self.run_inner(input).await
116 }
117}