// vector/sinks/aws_cloudwatch_logs/config.rs
use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
2use futures::FutureExt;
3use serde::{de, Deserialize, Deserializer};
4use std::collections::HashMap;
5use tower::ServiceBuilder;
6use vector_lib::codecs::JsonSerializerConfig;
7use vector_lib::configurable::configurable_component;
8use vector_lib::schema;
9use vrl::value::Kind;
10
11use crate::{
12 aws::{create_client, AwsAuthentication, ClientBuilder, RegionOrEndpoint},
13 codecs::{Encoder, EncodingConfig},
14 config::{
15 AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
16 SinkContext,
17 },
18 sinks::{
19 aws_cloudwatch_logs::{
20 healthcheck::healthcheck, request_builder::CloudwatchRequestBuilder,
21 retry::CloudwatchRetryLogic, service::CloudwatchLogsPartitionSvc, sink::CloudwatchSink,
22 },
23 util::{
24 http::RequestConfig, BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings,
25 },
26 Healthcheck, VectorSink,
27 },
28 template::Template,
29 tls::TlsConfig,
30};
31
32pub struct CloudwatchLogsClientBuilder;
33
34impl ClientBuilder for CloudwatchLogsClientBuilder {
35 type Client = aws_sdk_cloudwatchlogs::client::Client;
36
37 fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
38 aws_sdk_cloudwatchlogs::client::Client::new(config)
39 }
40}
41
42#[configurable_component]
43#[derive(Clone, Debug, Default)]
44pub struct Retention {
46 #[serde(default)]
48 pub enabled: bool,
49
50 #[serde(
52 default,
53 deserialize_with = "retention_days",
54 skip_serializing_if = "crate::serde::is_default"
55 )]
56 pub days: u32,
57}
58
59fn retention_days<'de, D>(deserializer: D) -> Result<u32, D::Error>
60where
61 D: Deserializer<'de>,
62{
63 let days: u32 = Deserialize::deserialize(deserializer)?;
64 const ALLOWED_VALUES: &[u32] = &[
65 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557,
66 2922, 3288, 3653,
67 ];
68 if ALLOWED_VALUES.contains(&days) {
69 Ok(days)
70 } else {
71 let msg = format!("one of allowed values: {ALLOWED_VALUES:?}").to_owned();
72 let expected: &str = &msg[..];
73 Err(de::Error::invalid_value(
74 de::Unexpected::Signed(days.into()),
75 &expected,
76 ))
77 }
78}
79
80#[configurable_component(sink(
82 "aws_cloudwatch_logs",
83 "Publish log events to AWS CloudWatch Logs."
84))]
85#[derive(Clone, Debug)]
86#[serde(deny_unknown_fields)]
87pub struct CloudwatchLogsSinkConfig {
88 #[configurable(metadata(docs::examples = "group-name"))]
92 #[configurable(metadata(docs::examples = "{{ file }}"))]
93 pub group_name: Template,
94
95 #[configurable(metadata(docs::examples = "{{ host }}"))]
103 #[configurable(metadata(docs::examples = "%Y-%m-%d"))]
104 #[configurable(metadata(docs::examples = "stream-name"))]
105 pub stream_name: Template,
106
107 #[serde(flatten)]
111 pub region: RegionOrEndpoint,
112
113 #[serde(default = "crate::serde::default_true")]
120 pub create_missing_group: bool,
121
122 #[serde(default = "crate::serde::default_true")]
126 pub create_missing_stream: bool,
127
128 #[configurable(derived)]
129 #[serde(default)]
130 pub retention: Retention,
131
132 #[configurable(derived)]
133 pub encoding: EncodingConfig,
134
135 #[configurable(derived)]
136 #[serde(default)]
137 pub compression: Compression,
138
139 #[configurable(derived)]
140 #[serde(default)]
141 pub batch: BatchConfig<CloudwatchLogsDefaultBatchSettings>,
142
143 #[configurable(derived)]
144 #[serde(default)]
145 pub request: RequestConfig,
146
147 #[configurable(derived)]
148 pub tls: Option<TlsConfig>,
149
150 #[configurable(deprecated)]
154 #[configurable(metadata(docs::hidden))]
155 pub assume_role: Option<String>,
156
157 #[configurable(derived)]
158 #[serde(default)]
159 pub auth: AwsAuthentication,
160
161 #[configurable(derived)]
162 #[serde(
163 default,
164 deserialize_with = "crate::serde::bool_or_struct",
165 skip_serializing_if = "crate::serde::is_default"
166 )]
167 pub acknowledgements: AcknowledgementsConfig,
168
169 #[configurable(derived)]
174 #[serde(default)]
175 pub kms_key: Option<String>,
176
177 #[configurable(derived)]
181 #[serde(default)]
182 #[configurable(metadata(
183 docs::additional_props_description = "A tag represented as a key-value pair"
184 ))]
185 pub tags: Option<HashMap<String, String>>,
186}
187
188impl CloudwatchLogsSinkConfig {
189 pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchLogsClient> {
190 create_client::<CloudwatchLogsClientBuilder>(
191 &CloudwatchLogsClientBuilder {},
192 &self.auth,
193 self.region.region(),
194 self.region.endpoint(),
195 proxy,
196 self.tls.as_ref(),
197 None,
198 )
199 .await
200 }
201}
202
203#[async_trait::async_trait]
204#[typetag::serde(name = "aws_cloudwatch_logs")]
205impl SinkConfig for CloudwatchLogsSinkConfig {
206 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
207 let batcher_settings = self.batch.into_batcher_settings()?;
208 let request_settings = self.request.tower.into_settings();
209 let client = self.create_client(cx.proxy()).await?;
210 let svc = ServiceBuilder::new()
211 .settings(request_settings, CloudwatchRetryLogic::new())
212 .service(CloudwatchLogsPartitionSvc::new(
213 self.clone(),
214 client.clone(),
215 )?);
216 let transformer = self.encoding.transformer();
217 let serializer = self.encoding.build()?;
218 let encoder = Encoder::<()>::new(serializer);
219 let healthcheck = healthcheck(self.clone(), client).boxed();
220 let sink = CloudwatchSink {
221 batcher_settings,
222 request_builder: CloudwatchRequestBuilder {
223 group_template: self.group_name.clone(),
224 stream_template: self.stream_name.clone(),
225 transformer,
226 encoder,
227 },
228
229 service: svc,
230 };
231
232 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
233 }
234
235 fn input(&self) -> Input {
236 let requirement =
237 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
238
239 Input::new(self.encoding.config().input_type() & DataType::Log)
240 .with_schema_requirement(requirement)
241 }
242
243 fn acknowledgements(&self) -> &AcknowledgementsConfig {
244 &self.acknowledgements
245 }
246}
247
248impl GenerateConfig for CloudwatchLogsSinkConfig {
249 fn generate_config() -> toml::Value {
250 toml::Value::try_from(default_config(JsonSerializerConfig::default().into())).unwrap()
251 }
252}
253
254fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
255 CloudwatchLogsSinkConfig {
256 encoding,
257 group_name: Default::default(),
258 stream_name: Default::default(),
259 region: Default::default(),
260 create_missing_group: true,
261 create_missing_stream: true,
262 retention: Default::default(),
263 compression: Default::default(),
264 batch: Default::default(),
265 request: Default::default(),
266 tls: Default::default(),
267 assume_role: Default::default(),
268 auth: Default::default(),
269 acknowledgements: Default::default(),
270 kms_key: Default::default(),
271 tags: Default::default(),
272 }
273}
274
275#[derive(Clone, Copy, Debug, Default)]
276pub struct CloudwatchLogsDefaultBatchSettings;
277
278impl SinkBatchSettings for CloudwatchLogsDefaultBatchSettings {
279 const MAX_EVENTS: Option<usize> = Some(10_000);
280 const MAX_BYTES: Option<usize> = Some(1_048_576);
281 const TIMEOUT_SECS: f64 = 1.0;
282}
283
#[cfg(test)]
mod tests {
    use super::CloudwatchLogsSinkConfig;

    /// Runs the shared `test_generate_config` helper against this sink's config type.
    #[test]
    fn test_generate_config() {
        crate::test_util::test_generate_config::<CloudwatchLogsSinkConfig>();
    }
}