use std::sync::Arc;
use vector_lib::{
codecs::TextSerializerConfig,
lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath},
sensitive_string::SensitiveString,
};
use crate::{
http::HttpClient,
sinks::{
prelude::*,
splunk_hec::common::{
acknowledgements::HecClientAcknowledgementsConfig,
build_healthcheck, build_http_batch_service, create_client,
service::{HecService, HttpRequestBuilder},
EndpointTarget, SplunkHecDefaultBatchSettings,
},
util::http::HttpRetryLogic,
},
};
use super::{encoder::HecLogsEncoder, request_builder::HecLogsRequestBuilder, sink::HecLogsSink};
/// Configuration for the `splunk_hec_logs` sink.
#[configurable_component(sink(
    "splunk_hec_logs",
    "Deliver log data to Splunk's HTTP Event Collector."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HecLogsSinkConfig {
    /// Default token for Splunk HEC.
    ///
    /// Used both by the healthcheck and by the builder of outgoing HEC requests.
    /// May also be written as `token`.
    #[serde(alias = "token")]
    pub default_token: SensitiveString,
    /// Base URI of the Splunk HEC instance.
    ///
    /// Must be a valid URI. The examples are base URLs that do not include a collector path.
    #[configurable(metadata(
        docs::examples = "https://http-inputs-hec.splunkcloud.com",
        docs::examples = "https://hec.splunk.com:8088",
        docs::examples = "http://example.com"
    ))]
    #[configurable(validation(format = "uri"))]
    pub endpoint: String,
    /// Optional path of the event field the sink uses for the host value.
    // NOTE(review): how `None` vs. an empty path is interpreted is decided by `HecLogsSink`;
    // confirm there before documenting further.
    #[configurable(metadata(docs::advanced))]
    pub host_key: Option<OptionalTargetPath>,
    /// Paths of event fields handed to the sink as its indexed fields.
    ///
    /// Defaults to an empty list.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    #[configurable(metadata(docs::examples = "field1", docs::examples = "field2"))]
    pub indexed_fields: Vec<ConfigValuePath>,
    /// Optional template for the Splunk index.
    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
    pub index: Option<Template>,
    /// Optional template for the Splunk source type.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "{{ sourcetype }}", docs::examples = "_json",))]
    pub sourcetype: Option<Template>,
    /// Optional template for the Splunk source.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::examples = "{{ file }}",
        docs::examples = "/var/log/syslog",
        docs::examples = "UDP:514"
    ))]
    pub source: Option<Template>,
    // Serializer and transformer settings used to build the event encoder; they also
    // determine which input types the sink accepts.
    #[configurable(derived)]
    pub encoding: EncodingConfig,
    // Compression used by both the request builder and the HTTP request builder.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,
    // Batching limits, converted into batcher settings when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,
    // Request settings (e.g. timeout, retry attempts) applied to the HTTP service layer.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,
    // Optional TLS settings used when creating the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,
    // Acknowledgement settings. When indexer acknowledgements are enabled, the HEC service is
    // also given a client for acknowledgement handling.
    #[configurable(derived)]
    #[serde(default)]
    pub acknowledgements: HecClientAcknowledgementsConfig,
    /// Internal-only override handed to the sink as its `timestamp_nanos_key`.
    ///
    /// Skipped by serde, so it cannot be set from configuration and deserializes as `None`.
    #[serde(skip)]
    pub timestamp_nanos_key: Option<String>,
    /// Optional path of the event field the sink uses for the event timestamp.
    ///
    /// The examples include an empty string.
    // NOTE(review): the meaning of an empty path is defined by the sink; confirm there.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "timestamp", docs::examples = ""))]
    pub timestamp_key: Option<OptionalTargetPath>,
    /// Timestamp auto-extraction flag, passed to the encoder, the HTTP batch service and the sink.
    ///
    /// When unset it is treated as `false`. Setting it at all, to either value, is rejected when
    /// `endpoint_target` is `raw`.
    #[serde(default)]
    pub auto_extract_timestamp: Option<bool>,
    // Which HEC endpoint requests are sent to; defaults to `EndpointTarget::Event` through
    // `default_endpoint_target`.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_endpoint_target")]
    pub endpoint_target: EndpointTarget,
}
/// Serde default for `HecLogsSinkConfig::endpoint_target`: the `Event` endpoint target.
const fn default_endpoint_target() -> EndpointTarget {
    EndpointTarget::Event
}
impl GenerateConfig for HecLogsSinkConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
default_token: "${VECTOR_SPLUNK_HEC_TOKEN}".to_owned().into(),
endpoint: "endpoint".to_owned(),
host_key: None,
indexed_fields: vec![],
index: None,
sourcetype: None,
source: None,
encoding: TextSerializerConfig::default().into(),
compression: Compression::default(),
batch: BatchConfig::default(),
request: TowerRequestConfig::default(),
tls: None,
acknowledgements: Default::default(),
timestamp_nanos_key: None,
timestamp_key: None,
auto_extract_timestamp: None,
endpoint_target: EndpointTarget::Event,
})
.unwrap()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "splunk_hec_logs")]
impl SinkConfig for HecLogsSinkConfig {
    /// Validates the configuration, then builds the sink together with its healthcheck.
    ///
    /// # Errors
    ///
    /// Returns an error when `auto_extract_timestamp` is set while the raw endpoint is targeted,
    /// when the HTTP client cannot be created, or when `build_processor` fails.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let targets_raw_endpoint = self.endpoint_target == EndpointTarget::Raw;
        if targets_raw_endpoint && self.auto_extract_timestamp.is_some() {
            return Err("`auto_extract_timestamp` cannot be set for the `raw` endpoint.".into());
        }

        let client = create_client(&self.tls, cx.proxy())?;

        // The healthcheck gets its own copies of the endpoint, token and client handle.
        let hc_endpoint = self.endpoint.clone();
        let hc_token = self.default_token.inner().to_owned();
        let healthcheck = build_healthcheck(hc_endpoint, hc_token, client.clone()).boxed();

        let sink = self.build_processor(client, cx)?;
        Ok((sink, healthcheck))
    }

    /// Accepts whatever the configured encoding accepts, restricted to log events.
    fn input(&self) -> Input {
        let encodable = self.encoding.config().input_type();
        Input::new(encodable & DataType::Log)
    }

    /// Exposes the generic acknowledgement settings embedded in the HEC-specific config.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements.inner
    }
}
impl HecLogsSinkConfig {
    /// Assembles the `splunk_hec_logs` sink from this configuration and an HTTP client.
    ///
    /// The pieces are wired as follows:
    /// 1. An encoder and request builder.
    /// 2. A retrying HTTP batch service.
    /// 3. A [`HecService`]. When indexer acknowledgements are enabled, it also receives a clone
    ///    of `client`.
    /// 4. The final [`HecLogsSink`].
    ///
    /// The `SinkContext` argument is accepted but unused.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured encoding cannot be built, or if the batch settings
    /// cannot be converted into batcher settings.
    pub fn build_processor(&self, client: HttpClient, _: SinkContext) -> crate::Result<VectorSink> {
        // An unset flag behaves like `false` everywhere it is consumed.
        let auto_extract_timestamp = self.auto_extract_timestamp.unwrap_or_default();

        // A dedicated client handle is only needed when indexer acknowledgements are enabled.
        let ack_client = self
            .acknowledgements
            .indexer_acknowledgements_enabled
            .then(|| client.clone());

        // Field initializers run in source order: `transformer()` before `build()?`.
        let request_builder = HecLogsRequestBuilder {
            encoder: HecLogsEncoder {
                transformer: self.encoding.transformer(),
                encoder: Encoder::<()>::new(self.encoding.build()?),
                auto_extract_timestamp,
            },
            compression: self.compression,
        };

        let tower_settings = self.request.into_settings();
        let http_request_builder = Arc::new(HttpRequestBuilder::new(
            self.endpoint.clone(),
            self.endpoint_target,
            self.default_token.inner().to_owned(),
            self.compression,
        ));
        let batch_service = build_http_batch_service(
            client,
            Arc::clone(&http_request_builder),
            self.endpoint_target,
            auto_extract_timestamp,
        );
        let http_service = ServiceBuilder::new()
            .settings(tower_settings, HttpRetryLogic)
            .service(batch_service);

        let service = HecService::new(
            http_service,
            ack_client,
            http_request_builder,
            self.acknowledgements.clone(),
        );

        let batch_settings = self.batch.into_batcher_settings()?;
        let indexed_fields = self
            .indexed_fields
            .iter()
            .map(|field| field.0.clone())
            .collect();

        Ok(VectorSink::from_event_streamsink(HecLogsSink {
            service,
            request_builder,
            batch_settings,
            sourcetype: self.sourcetype.clone(),
            source: self.source.clone(),
            index: self.index.clone(),
            indexed_fields,
            host_key: self.host_key.clone(),
            timestamp_nanos_key: self.timestamp_nanos_key.clone(),
            timestamp_key: self.timestamp_key.clone(),
            endpoint_target: self.endpoint_target,
            auto_extract_timestamp,
        }))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::components::validation::prelude::*;
    use vector_lib::{
        codecs::{encoding::format::JsonSerializerOptions, JsonSerializerConfig, MetricTagValues},
        config::LogNamespace,
    };

    /// The generated example configuration must round-trip through the config machinery.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HecLogsSinkConfig>();
    }

    impl ValidatableComponent for HecLogsSinkConfig {
        /// Component-validation setup: a raw-endpoint sink that pushes every single event as
        /// JSON, with no retries, to a local HTTP resource.
        fn validation_configuration() -> ValidationConfiguration {
            const BASE_ENDPOINT: &str = "http://127.0.0.1:9001";

            let encoding = EncodingConfig::new(
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default())
                    .into(),
                Transformer::default(),
            );

            // Flush after every event.
            let mut batch = BatchConfig::default();
            batch.max_events = Some(1);

            let config = Self {
                endpoint: BASE_ENDPOINT.to_string(),
                default_token: "i_am_an_island".to_string().into(),
                host_key: None,
                indexed_fields: vec![],
                index: None,
                sourcetype: None,
                source: None,
                encoding: encoding.clone(),
                compression: Compression::default(),
                batch,
                request: TowerRequestConfig {
                    timeout_secs: 2,
                    retry_attempts: 0,
                    ..Default::default()
                },
                tls: None,
                acknowledgements: HecClientAcknowledgementsConfig {
                    indexer_acknowledgements_enabled: false,
                    ..Default::default()
                },
                timestamp_nanos_key: None,
                timestamp_key: None,
                auto_extract_timestamp: None,
                endpoint_target: EndpointTarget::Raw,
            };

            // The external resource listens on the raw collector path of the same base endpoint.
            let raw_endpoint = format!("{BASE_ENDPOINT}/services/collector/raw");
            let raw_uri =
                http::Uri::try_from(&raw_endpoint).expect("should not fail to parse URI");
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(raw_uri, None),
                encoding,
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(config, None, Some(external_resource));
            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HecLogsSinkConfig);
}