vector/sinks/aws_cloudwatch_logs/
config.rs1use std::collections::HashMap;
2
3use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
4use futures::FutureExt;
5use serde::{Deserialize, Deserializer, de};
6use tower::ServiceBuilder;
7use vector_lib::{codecs::JsonSerializerConfig, configurable::configurable_component, schema};
8use vrl::value::Kind;
9
10use crate::{
11 aws::{AwsAuthentication, ClientBuilder, RegionOrEndpoint, create_client},
12 codecs::{Encoder, EncodingConfig},
13 config::{
14 AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
15 SinkContext,
16 },
17 sinks::{
18 Healthcheck, VectorSink,
19 aws_cloudwatch_logs::{
20 healthcheck::healthcheck, request_builder::CloudwatchRequestBuilder,
21 retry::CloudwatchRetryLogic, service::CloudwatchLogsPartitionSvc, sink::CloudwatchSink,
22 },
23 util::{
24 BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, http::RequestConfig,
25 },
26 },
27 template::Template,
28 tls::TlsConfig,
29};
30
31pub struct CloudwatchLogsClientBuilder;
32
33impl ClientBuilder for CloudwatchLogsClientBuilder {
34 type Client = aws_sdk_cloudwatchlogs::client::Client;
35
36 fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
37 aws_sdk_cloudwatchlogs::client::Client::new(config)
38 }
39}
40
41#[configurable_component]
42#[derive(Clone, Debug, Default)]
43pub struct Retention {
45 #[serde(default)]
47 pub enabled: bool,
48
49 #[serde(
51 default,
52 deserialize_with = "retention_days",
53 skip_serializing_if = "crate::serde::is_default"
54 )]
55 pub days: u32,
56}
57
58fn retention_days<'de, D>(deserializer: D) -> Result<u32, D::Error>
59where
60 D: Deserializer<'de>,
61{
62 let days: u32 = Deserialize::deserialize(deserializer)?;
63 const ALLOWED_VALUES: &[u32] = &[
64 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557,
65 2922, 3288, 3653,
66 ];
67 if ALLOWED_VALUES.contains(&days) {
68 Ok(days)
69 } else {
70 let msg = format!("one of allowed values: {ALLOWED_VALUES:?}").to_owned();
71 let expected: &str = &msg[..];
72 Err(de::Error::invalid_value(
73 de::Unexpected::Signed(days.into()),
74 &expected,
75 ))
76 }
77}
78
79#[configurable_component(sink(
81 "aws_cloudwatch_logs",
82 "Publish log events to AWS CloudWatch Logs."
83))]
84#[derive(Clone, Debug)]
85#[serde(deny_unknown_fields)]
86pub struct CloudwatchLogsSinkConfig {
87 #[configurable(metadata(docs::examples = "group-name"))]
91 #[configurable(metadata(docs::examples = "{{ file }}"))]
92 pub group_name: Template,
93
94 #[configurable(metadata(docs::examples = "{{ host }}"))]
102 #[configurable(metadata(docs::examples = "%Y-%m-%d"))]
103 #[configurable(metadata(docs::examples = "stream-name"))]
104 pub stream_name: Template,
105
106 #[serde(flatten)]
110 pub region: RegionOrEndpoint,
111
112 #[serde(default = "crate::serde::default_true")]
119 pub create_missing_group: bool,
120
121 #[serde(default = "crate::serde::default_true")]
125 pub create_missing_stream: bool,
126
127 #[configurable(derived)]
128 #[serde(default)]
129 pub retention: Retention,
130
131 #[configurable(derived)]
132 pub encoding: EncodingConfig,
133
134 #[configurable(derived)]
135 #[serde(default)]
136 pub compression: Compression,
137
138 #[configurable(derived)]
139 #[serde(default)]
140 pub batch: BatchConfig<CloudwatchLogsDefaultBatchSettings>,
141
142 #[configurable(derived)]
143 #[serde(default)]
144 pub request: RequestConfig,
145
146 #[configurable(derived)]
147 pub tls: Option<TlsConfig>,
148
149 #[configurable(deprecated)]
153 #[configurable(metadata(docs::hidden))]
154 pub assume_role: Option<String>,
155
156 #[configurable(derived)]
157 #[serde(default)]
158 pub auth: AwsAuthentication,
159
160 #[configurable(derived)]
161 #[serde(
162 default,
163 deserialize_with = "crate::serde::bool_or_struct",
164 skip_serializing_if = "crate::serde::is_default"
165 )]
166 pub acknowledgements: AcknowledgementsConfig,
167
168 #[configurable(derived)]
173 #[serde(default)]
174 pub kms_key: Option<String>,
175
176 #[configurable(derived)]
180 #[serde(default)]
181 #[configurable(metadata(
182 docs::additional_props_description = "A tag represented as a key-value pair"
183 ))]
184 pub tags: Option<HashMap<String, String>>,
185}
186
187impl CloudwatchLogsSinkConfig {
188 pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchLogsClient> {
189 create_client::<CloudwatchLogsClientBuilder>(
190 &CloudwatchLogsClientBuilder {},
191 &self.auth,
192 self.region.region(),
193 self.region.endpoint(),
194 proxy,
195 self.tls.as_ref(),
196 None,
197 )
198 .await
199 }
200}
201
202#[async_trait::async_trait]
203#[typetag::serde(name = "aws_cloudwatch_logs")]
204impl SinkConfig for CloudwatchLogsSinkConfig {
205 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
206 let batcher_settings = self.batch.into_batcher_settings()?;
207 let request_settings = self.request.tower.into_settings();
208 let client = self.create_client(cx.proxy()).await?;
209 let svc = ServiceBuilder::new()
210 .settings(request_settings, CloudwatchRetryLogic::new())
211 .service(CloudwatchLogsPartitionSvc::new(
212 self.clone(),
213 client.clone(),
214 )?);
215 let transformer = self.encoding.transformer();
216 let serializer = self.encoding.build()?;
217 let encoder = Encoder::<()>::new(serializer);
218 let healthcheck = healthcheck(self.clone(), client).boxed();
219 let sink = CloudwatchSink {
220 batcher_settings,
221 request_builder: CloudwatchRequestBuilder {
222 group_template: self.group_name.clone(),
223 stream_template: self.stream_name.clone(),
224 transformer,
225 encoder,
226 },
227
228 service: svc,
229 };
230
231 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
232 }
233
234 fn input(&self) -> Input {
235 let requirement =
236 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
237
238 Input::new(self.encoding.config().input_type() & DataType::Log)
239 .with_schema_requirement(requirement)
240 }
241
242 fn acknowledgements(&self) -> &AcknowledgementsConfig {
243 &self.acknowledgements
244 }
245}
246
247impl GenerateConfig for CloudwatchLogsSinkConfig {
248 fn generate_config() -> toml::Value {
249 toml::Value::try_from(default_config(JsonSerializerConfig::default().into())).unwrap()
250 }
251}
252
253fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
254 CloudwatchLogsSinkConfig {
255 encoding,
256 group_name: Default::default(),
257 stream_name: Default::default(),
258 region: Default::default(),
259 create_missing_group: true,
260 create_missing_stream: true,
261 retention: Default::default(),
262 compression: Default::default(),
263 batch: Default::default(),
264 request: Default::default(),
265 tls: Default::default(),
266 assume_role: Default::default(),
267 auth: Default::default(),
268 acknowledgements: Default::default(),
269 kms_key: Default::default(),
270 tags: Default::default(),
271 }
272}
273
274#[derive(Clone, Copy, Debug, Default)]
275pub struct CloudwatchLogsDefaultBatchSettings;
276
277impl SinkBatchSettings for CloudwatchLogsDefaultBatchSettings {
278 const MAX_EVENTS: Option<usize> = Some(10_000);
279 const MAX_BYTES: Option<usize> = Some(1_048_576);
280 const TIMEOUT_SECS: f64 = 1.0;
281}
282
#[cfg(test)]
mod tests {
    use super::CloudwatchLogsSinkConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared generated-config check against this sink's configuration type.
    #[test]
    fn test_generate_config() {
        test_generate_config::<CloudwatchLogsSinkConfig>();
    }
}
291}