use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath};
use vector_lib::sensitive_string::SensitiveString;
use super::config_host_key_target_path;
use crate::sinks::splunk_hec::common::config_timestamp_key_target_path;
use crate::{
codecs::EncodingConfig,
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
sinks::{
splunk_hec::{
common::{
acknowledgements::HecClientAcknowledgementsConfig, EndpointTarget,
SplunkHecDefaultBatchSettings,
},
logs::config::HecLogsSinkConfig,
},
util::{BatchConfig, Compression, TowerRequestConfig},
Healthcheck, VectorSink,
},
template::Template,
tls::TlsConfig,
};
pub(super) const HOST: &str = "https://cloud.humio.com";
#[configurable_component(sink("humio_logs", "Deliver log event data to Humio."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HumioLogsConfig {
#[configurable(metadata(
docs::examples = "${HUMIO_TOKEN}",
docs::examples = "A94A8FE5CCB19BA61C4C08"
))]
pub(super) token: SensitiveString,
#[serde(alias = "host")]
#[serde(default = "default_endpoint")]
#[configurable(metadata(
docs::examples = "http://127.0.0.1",
docs::examples = "https://example.com",
))]
pub(super) endpoint: String,
pub(super) source: Option<Template>,
#[configurable(derived)]
pub(super) encoding: EncodingConfig,
#[configurable(metadata(
docs::examples = "json",
docs::examples = "none",
docs::examples = "{{ event_type }}"
))]
pub(super) event_type: Option<Template>,
#[serde(default = "config_host_key_target_path")]
pub(super) host_key: OptionalTargetPath,
#[serde(default)]
pub(super) indexed_fields: Vec<ConfigValuePath>,
#[serde(default)]
#[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
pub(super) index: Option<Template>,
#[configurable(derived)]
#[serde(default)]
pub(super) compression: Compression,
#[configurable(derived)]
#[serde(default)]
pub(super) request: TowerRequestConfig,
#[configurable(derived)]
#[serde(default)]
pub(super) batch: BatchConfig<SplunkHecDefaultBatchSettings>,
#[configurable(derived)]
pub(super) tls: Option<TlsConfig>,
#[serde(default = "timestamp_nanos_key")]
pub(super) timestamp_nanos_key: Option<String>,
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
#[serde(default = "config_timestamp_key_target_path")]
pub(super) timestamp_key: OptionalTargetPath,
}
fn default_endpoint() -> String {
HOST.to_string()
}
pub fn timestamp_nanos_key() -> Option<String> {
Some("@timestamp.nanos".to_string())
}
impl GenerateConfig for HumioLogsConfig {
fn generate_config() -> toml::Value {
toml::Value::try_from(Self {
token: "${HUMIO_TOKEN}".to_owned().into(),
endpoint: default_endpoint(),
source: None,
encoding: JsonSerializerConfig::default().into(),
event_type: None,
indexed_fields: vec![],
index: None,
host_key: config_host_key_target_path(),
compression: Compression::default(),
request: TowerRequestConfig::default(),
batch: BatchConfig::default(),
tls: None,
timestamp_nanos_key: None,
acknowledgements: Default::default(),
timestamp_key: config_timestamp_key_target_path(),
})
.unwrap()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "humio_logs")]
impl SinkConfig for HumioLogsConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
self.build_hec_config().build(cx).await
}
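    // Accepts only log events, further narrowed by what the configured encoder supports.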
fn input(&self) -> Input {
Input::new(self.encoding.config().input_type() & DataType::Log)
}
fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}
impl HumioLogsConfig {
fn build_hec_config(&self) -> HecLogsSinkConfig {
HecLogsSinkConfig {
default_token: self.token.clone(),
endpoint: self.endpoint.clone(),
host_key: Some(self.host_key.clone()),
indexed_fields: self.indexed_fields.clone(),
index: self.index.clone(),
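            // The Humio event type is forwarded as the HEC `sourcetype`.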
sourcetype: self.event_type.clone(),
source: self.source.clone(),
timestamp_nanos_key: self.timestamp_nanos_key.clone(),
encoding: self.encoding.clone(),
compression: self.compression,
batch: self.batch,
request: self.request,
tls: self.tls.clone(),
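            // Splunk indexer acknowledgements are always disabled here; the remaining
            // acknowledgement settings stay at their defaults.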
acknowledgements: HecClientAcknowledgementsConfig {
indexer_acknowledgements_enabled: false,
..Default::default()
},
timestamp_key: Some(config_timestamp_key_target_path()),
endpoint_target: EndpointTarget::Event,
auto_extract_timestamp: None,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<HumioLogsConfig>();
}
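
    // A minimal sketch of deserializing this sink's TOML configuration (the token value is
    // just a placeholder): with only the required fields set, the endpoint and
    // `timestamp_nanos_key` defaults should apply, and the derived Splunk HEC config should
    // keep indexer acknowledgements disabled.
    #[test]
    fn parse_minimal_config() {
        let config: HumioLogsConfig = toml::from_str(
            r#"
                token = "${HUMIO_TOKEN}"
                encoding.codec = "json"
            "#,
        )
        .unwrap();

        assert_eq!(config.endpoint, default_endpoint());
        assert_eq!(config.timestamp_nanos_key, timestamp_nanos_key());

        let hec_config = config.build_hec_config();
        assert_eq!(hec_config.endpoint, default_endpoint());
        assert!(!hec_config.acknowledgements.indexer_acknowledgements_enabled);
    }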
}
#[cfg(test)]
#[cfg(feature = "humio-integration-tests")]
mod integration_tests {
use chrono::{TimeZone, Utc};
use futures::{future::ready, stream};
use indoc::indoc;
use serde::Deserialize;
use serde_json::{json, Value as JsonValue};
use std::{collections::HashMap, convert::TryFrom};
use tokio::time::Duration;
use super::*;
use crate::{
config::{log_schema, SinkConfig, SinkContext},
event::LogEvent,
sinks::util::Compression,
test_util::{
components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
random_string,
},
};
fn humio_address() -> String {
std::env::var("HUMIO_ADDRESS").unwrap_or_else(|_| "http://localhost:8080".into())
}
#[tokio::test]
async fn humio_insert_message() {
wait_ready().await;
let cx = SinkContext::default();
let repo = create_repository().await;
let config = config(&repo.default_ingest_token);
let (sink, _) = config.build(cx).await.unwrap();
let message = random_string(100);
let host = "192.168.1.1".to_string();
let mut event = LogEvent::from(message.clone());
event.insert(log_schema().host_key_target_path().unwrap(), host.clone());
let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
event.insert(log_schema().timestamp_key_target_path().unwrap(), ts);
run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
let entry = find_entry(repo.name.as_str(), message.as_str()).await;
assert_eq!(
message,
entry
.fields
.get("message")
.expect("no message key")
.as_str()
.unwrap()
);
assert!(
entry.error.is_none(),
"Humio encountered an error parsing this message: {}",
entry
.error_msg
.unwrap_or_else(|| "no error message".to_string())
);
assert_eq!(Some(host), entry.host);
assert_eq!("132456", entry.timestamp_nanos);
}
#[tokio::test]
async fn humio_insert_source() {
wait_ready().await;
let cx = SinkContext::default();
let repo = create_repository().await;
let mut config = config(&repo.default_ingest_token);
config.source = Template::try_from("/var/log/syslog".to_string()).ok();
let (sink, _) = config.build(cx).await.unwrap();
let message = random_string(100);
let event = LogEvent::from(message.clone());
run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
let entry = find_entry(repo.name.as_str(), message.as_str()).await;
assert_eq!(entry.source, Some("/var/log/syslog".to_owned()));
assert!(
entry.error.is_none(),
"Humio encountered an error parsing this message: {}",
entry
.error_msg
.unwrap_or_else(|| "no error message".to_string())
);
}
#[tokio::test]
async fn humio_type() {
wait_ready().await;
let repo = create_repository().await;
{
let mut config = config(&repo.default_ingest_token);
config.event_type = Template::try_from("json".to_string()).ok();
let (sink, _) = config.build(SinkContext::default()).await.unwrap();
let message = random_string(100);
let mut event = LogEvent::from(message.clone());
event.insert("@timestamp", Utc::now().to_rfc3339());
run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
let entry = find_entry(repo.name.as_str(), message.as_str()).await;
assert_eq!(entry.humio_type, "json");
assert!(
entry.error.is_none(),
"Humio encountered an error parsing this message: {}",
entry
.error_msg
.unwrap_or_else(|| "no error message".to_string())
);
}
{
let config = config(&repo.default_ingest_token);
let (sink, _) = config.build(SinkContext::default()).await.unwrap();
let message = random_string(100);
let event = LogEvent::from(message.clone());
run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
let entry = find_entry(repo.name.as_str(), message.as_str()).await;
assert_eq!(entry.humio_type, "none");
}
}
fn config(token: &str) -> super::HumioLogsConfig {
let mut batch = BatchConfig::default();
batch.max_events = Some(1);
HumioLogsConfig {
token: token.to_string().into(),
endpoint: humio_address(),
source: None,
encoding: JsonSerializerConfig::default().into(),
event_type: None,
host_key: OptionalTargetPath {
path: log_schema().host_key_target_path().cloned(),
},
indexed_fields: vec![],
index: None,
compression: Compression::None,
request: TowerRequestConfig::default(),
batch,
tls: None,
timestamp_nanos_key: timestamp_nanos_key(),
acknowledgements: Default::default(),
timestamp_key: Default::default(),
}
}
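    // Blocks until the Humio instance's status endpoint reports a successful response.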
async fn wait_ready() {
crate::test_util::retry_until(
|| async {
reqwest::get(format!("{}/api/v1/status", humio_address()))
.await
.map_err(|err| err.to_string())
.and_then(|res| {
if res.status().is_success() {
Ok(())
} else {
Err("server not ready...".into())
}
})
},
Duration::from_secs(1),
Duration::from_secs(30),
)
.await;
}
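    // Creates a throwaway repository via Humio's GraphQL API and returns its name along
    // with the ingest token generated for it.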
async fn create_repository() -> HumioRepository {
let client = reqwest::Client::builder().build().unwrap();
let graphql_url = format!("{}/graphql", humio_address());
let name = random_string(50);
let params = json!({
"query": format!(
indoc!{ r#"
mutation {{
createRepository(name:"{}") {{
repository {{
name
type
ingestTokens {{
name
token
}}
}}
}}
}}
"#},
name
),
});
let res = client
.post(&graphql_url)
.json(&params)
.send()
.await
.unwrap();
let json: JsonValue = res.json().await.unwrap();
let repository = &json["data"]["createRepository"]["repository"];
let token = repository["ingestTokens"].as_array().unwrap()[0]["token"]
.as_str()
.unwrap()
.to_string();
HumioRepository {
name: repository["name"].as_str().unwrap().to_string(),
default_ingest_token: token,
}
}
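    // Repeatedly queries the repository for the given message until Humio has ingested it,
    // panicking if it never shows up.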
async fn find_entry(repository_name: &str, message: &str) -> HumioLog {
let client = reqwest::Client::builder().build().unwrap();
let search_url = format!(
"{}/api/v1/repositories/{}/query",
humio_address(),
repository_name
);
let search_query = format!(r#"message="{}""#, message);
for _ in 0..200usize {
let res = client
.post(&search_url)
.json(&json!({
"queryString": search_query,
}))
.header(reqwest::header::ACCEPT, "application/json")
.send()
.await
.unwrap();
let logs: Vec<HumioLog> = res.json().await.unwrap();
if !logs.is_empty() {
return logs[0].clone();
}
}
panic!(
"did not find event in Humio repository {} with message {}",
repository_name, message
);
}
#[derive(Debug)]
struct HumioRepository {
name: String,
default_ingest_token: String,
}
#[derive(Clone, Deserialize)]
    #[allow(dead_code)]
    struct HumioLog {
#[serde(rename = "#repo")]
humio_repo: String,
#[serde(rename = "#type")]
humio_type: String,
#[serde(rename = "@error")]
error: Option<String>,
#[serde(rename = "@error_msg")]
error_msg: Option<String>,
#[serde(rename = "@rawstring")]
rawstring: String,
#[serde(rename = "@id")]
id: String,
#[serde(rename = "@timestamp")]
timestamp_millis: u64,
#[serde(rename = "@timestamp.nanos")]
timestamp_nanos: String,
#[serde(rename = "@timezone")]
timezone: String,
#[serde(rename = "@source")]
source: Option<String>,
#[serde(rename = "@host")]
host: Option<String>,
#[serde(flatten)]
fields: HashMap<String, JsonValue>,
}
}