use aws_sdk_cloudwatchlogs::Client as CloudwatchLogsClient;
use futures::FutureExt;
use serde::{de, Deserialize, Deserializer};
use tower::ServiceBuilder;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
use vector_lib::schema;
use vrl::value::Kind;
use crate::{
aws::{create_client, AwsAuthentication, ClientBuilder, RegionOrEndpoint},
codecs::{Encoder, EncodingConfig},
config::{
AcknowledgementsConfig, DataType, GenerateConfig, Input, ProxyConfig, SinkConfig,
SinkContext,
},
sinks::{
aws_cloudwatch_logs::{
healthcheck::healthcheck, request_builder::CloudwatchRequestBuilder,
retry::CloudwatchRetryLogic, service::CloudwatchLogsPartitionSvc, sink::CloudwatchSink,
},
util::{
http::RequestConfig, BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings,
},
Healthcheck, VectorSink,
},
template::Template,
tls::TlsConfig,
};
/// Zero-sized [`ClientBuilder`] implementation that yields CloudWatch Logs SDK clients.
pub struct CloudwatchLogsClientBuilder;

impl ClientBuilder for CloudwatchLogsClientBuilder {
    /// The concrete SDK client type handed back by [`ClientBuilder::build`].
    type Client = aws_sdk_cloudwatchlogs::client::Client;

    /// Creates a new CloudWatch Logs client from the supplied SDK configuration.
    fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
        // Go through the associated type so the constructor always matches `Self::Client`.
        <Self::Client>::new(config)
    }
}
/// Retention settings carried by the CloudWatch Logs sink configuration.
///
/// Both fields are optional when deserializing and fall back to their `Default`
/// values (`false` and `0`) when omitted.
// NOTE(review): how these values are applied to a log group is decided by code
// outside this file — confirm against the service implementation.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct Retention {
/// Switch for the retention settings; `false` when not specified.
#[serde(default)]
pub enabled: bool,
/// Retention period, in days.
///
/// A value supplied in the configuration is validated by `retention_days`, which
/// only accepts a fixed list of day counts. The field is left out of serialized
/// output when `crate::serde::is_default` reports it as the default value.
#[serde(
default,
deserialize_with = "retention_days",
skip_serializing_if = "crate::serde::is_default"
)]
pub days: u32,
}
/// Deserializes a retention period (in days) and validates it against the fixed
/// list of permitted values in `ALLOWED_VALUES`.
///
/// Intended for use with `#[serde(deserialize_with = "retention_days")]`.
///
/// # Errors
///
/// Returns the deserializer's own error if the input is not a `u32`, or an
/// `invalid_value` error naming the permitted values if the number is not one of
/// them.
fn retention_days<'de, D>(deserializer: D) -> Result<u32, D::Error>
where
    D: Deserializer<'de>,
{
    const ALLOWED_VALUES: &[u32] = &[
        1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1096, 1827, 2192, 2557,
        2922, 3288, 3653,
    ];

    let days = u32::deserialize(deserializer)?;
    if ALLOWED_VALUES.contains(&days) {
        return Ok(days);
    }

    // `&str` implements `serde::de::Expected`, so the formatted message can be passed directly.
    let expected = format!("one of allowed values: {:?}", ALLOWED_VALUES);
    Err(de::Error::invalid_value(
        // The rejected value is a `u32`, so report it as unsigned rather than signed.
        de::Unexpected::Unsigned(u64::from(days)),
        &expected.as_str(),
    ))
}
/// Configuration for the `aws_cloudwatch_logs` sink.
///
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`).
#[configurable_component(sink(
"aws_cloudwatch_logs",
"Publish log events to AWS CloudWatch Logs."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct CloudwatchLogsSinkConfig {
/// Template for the log group name.
///
/// Cloned into the request builder as its group template when the sink is built.
#[configurable(metadata(docs::examples = "group-name"))]
#[configurable(metadata(docs::examples = "{{ file }}"))]
pub group_name: Template,
/// Template for the log stream name.
///
/// Cloned into the request builder as its stream template when the sink is built.
#[configurable(metadata(docs::examples = "{{ host }}"))]
#[configurable(metadata(docs::examples = "%Y-%m-%d"))]
#[configurable(metadata(docs::examples = "stream-name"))]
pub stream_name: Template,
/// Region and/or endpoint used when creating the AWS client.
///
/// Flattened, so its fields appear at the top level of the sink configuration.
#[serde(flatten)]
pub region: RegionOrEndpoint,
/// Flag for creating the log group when it is missing; defaults to `true`.
// NOTE(review): the flag is consumed outside this file — confirm the exact
// behavior against the service implementation.
#[serde(default = "crate::serde::default_true")]
pub create_missing_group: bool,
/// Flag for creating the log stream when it is missing; defaults to `true`.
// NOTE(review): the flag is consumed outside this file — confirm the exact
// behavior against the service implementation.
#[serde(default = "crate::serde::default_true")]
pub create_missing_stream: bool,
/// Retention settings; defaults to `Retention::default()` when omitted.
#[configurable(derived)]
#[serde(default)]
pub retention: Retention,
/// Encoding configuration; supplies the transformer and serializer used by the
/// request builder, and determines the accepted input type.
#[configurable(derived)]
pub encoding: EncodingConfig,
/// Compression setting; defaults to `Compression::default()` when omitted.
// NOTE(review): not read by any code visible in this file.
#[configurable(derived)]
#[serde(default)]
pub compression: Compression,
/// Batching configuration, converted into batcher settings when the sink is
/// built. Unspecified limits come from `CloudwatchLogsDefaultBatchSettings`.
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<CloudwatchLogsDefaultBatchSettings>,
/// Request configuration; its `tower` settings configure the service stack
/// when the sink is built.
#[configurable(derived)]
#[serde(default)]
pub request: RequestConfig,
/// Optional TLS configuration, passed through when creating the AWS client.
#[configurable(derived)]
pub tls: Option<TlsConfig>,
/// Deprecated option that is hidden from the generated documentation.
// NOTE(review): not read by any code visible in this file; `auth` is what
// `create_client` passes on.
#[configurable(deprecated)]
#[configurable(metadata(docs::hidden))]
pub assume_role: Option<String>,
/// AWS authentication settings, passed through when creating the AWS client.
#[configurable(derived)]
#[serde(default)]
pub auth: AwsAuthentication,
/// Acknowledgement settings, exposed through `SinkConfig::acknowledgements`.
///
/// Deserialized with `crate::serde::bool_or_struct` and skipped during
/// serialization when `crate::serde::is_default` reports the default value.
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
}
impl CloudwatchLogsSinkConfig {
    /// Creates a CloudWatch Logs client from this configuration.
    ///
    /// The authentication, region, endpoint and TLS settings come from `self`;
    /// `proxy` is forwarded unchanged and the final optional argument of
    /// `create_client` is left as `None`.
    ///
    /// # Errors
    ///
    /// Propagates whatever error `create_client` returns.
    pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchLogsClient> {
        let builder = CloudwatchLogsClientBuilder {};
        let region = self.region.region();
        let endpoint = self.region.endpoint();

        create_client::<CloudwatchLogsClientBuilder>(
            &builder,
            &self.auth,
            region,
            endpoint,
            proxy,
            &self.tls,
            &None,
        )
        .await
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "aws_cloudwatch_logs")]
impl SinkConfig for CloudwatchLogsSinkConfig {
    /// Builds the sink together with its healthcheck future.
    ///
    /// # Errors
    ///
    /// Fails if the batch settings are invalid, the AWS client cannot be
    /// created, the partition service cannot be constructed, or the encoding
    /// configuration cannot produce a serializer.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let batcher_settings = self.batch.into_batcher_settings()?;
        let tower_settings = self.request.tower.into_settings();
        let client = self.create_client(cx.proxy()).await?;

        // Request settings and retry logic wrap the partitioning service.
        let layers = ServiceBuilder::new().settings(tower_settings, CloudwatchRetryLogic::new());
        let partition_svc = CloudwatchLogsPartitionSvc::new(self.clone(), client.clone())?;
        let service = layers.service(partition_svc);

        let transformer = self.encoding.transformer();
        let serializer = self.encoding.build()?;
        let encoder = Encoder::<()>::new(serializer);

        // The healthcheck takes ownership of the client; the service holds its own clone.
        let healthcheck = healthcheck(self.clone(), client).boxed();

        let request_builder = CloudwatchRequestBuilder {
            group_template: self.group_name.clone(),
            stream_template: self.stream_name.clone(),
            transformer,
            encoder,
        };
        let sink = CloudwatchSink {
            batcher_settings,
            request_builder,
            service,
        };

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    /// Describes the accepted input: the encoder's input type intersected with
    /// `DataType::Log`, plus an optional `timestamp` meaning of timestamp kind.
    fn input(&self) -> Input {
        let requirement =
            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
        let accepted = self.encoding.config().input_type() & DataType::Log;

        Input::new(accepted).with_schema_requirement(requirement)
    }

    /// Returns the acknowledgement settings stored in this configuration.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
impl GenerateConfig for CloudwatchLogsSinkConfig {
    /// Produces an example configuration as TOML, using the default JSON
    /// serializer for the encoding.
    ///
    /// # Panics
    ///
    /// Panics if the default configuration cannot be converted to a TOML value.
    fn generate_config() -> toml::Value {
        let encoding = JsonSerializerConfig::default().into();
        let config = default_config(encoding);
        toml::Value::try_from(config).unwrap()
    }
}
/// Builds a sink configuration around `encoding`: both `create_missing_*` flags
/// are `true` and every other field takes its `Default` value.
fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig {
    // Fields are listed in the same order as the struct declaration.
    CloudwatchLogsSinkConfig {
        group_name: Default::default(),
        stream_name: Default::default(),
        region: Default::default(),
        create_missing_group: true,
        create_missing_stream: true,
        retention: Default::default(),
        encoding,
        compression: Default::default(),
        batch: Default::default(),
        request: Default::default(),
        tls: Default::default(),
        assume_role: Default::default(),
        auth: Default::default(),
        acknowledgements: Default::default(),
    }
}
/// Marker type supplying the default batching limits for the CloudWatch Logs sink.
// NOTE(review): the event and byte caps look like they mirror the PutLogEvents
// per-request limits — confirm against the AWS documentation.
#[derive(Clone, Copy, Debug, Default)]
pub struct CloudwatchLogsDefaultBatchSettings;

impl SinkBatchSettings for CloudwatchLogsDefaultBatchSettings {
    /// Default cap on the number of events per batch (10,000).
    const MAX_EVENTS: Option<usize> = Some(10 * 1_000);
    /// Default cap on the batch size in bytes (1 MiB = 1,048,576).
    const MAX_BYTES: Option<usize> = Some(1024 * 1024);
    /// Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 1.0;
}
// Unit tests for the sink configuration.
#[cfg(test)]
mod tests {
use crate::sinks::aws_cloudwatch_logs::config::CloudwatchLogsSinkConfig;
// Runs the shared `test_generate_config` helper against this sink's config type.
#[test]
fn test_generate_config() {
crate::test_util::test_generate_config::<CloudwatchLogsSinkConfig>();
}
}