1use vector_lib::codecs::JsonSerializerConfig;
2use vector_lib::configurable::configurable_component;
3use vector_lib::lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath};
4use vector_lib::sensitive_string::SensitiveString;
5
6use super::config_host_key_target_path;
7use crate::sinks::splunk_hec::common::config_timestamp_key_target_path;
8use crate::{
9 codecs::EncodingConfig,
10 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
11 sinks::{
12 splunk_hec::{
13 common::{
14 acknowledgements::HecClientAcknowledgementsConfig, EndpointTarget,
15 SplunkHecDefaultBatchSettings,
16 },
17 logs::config::HecLogsSinkConfig,
18 },
19 util::{BatchConfig, Compression, TowerRequestConfig},
20 Healthcheck, VectorSink,
21 },
22 template::Template,
23 tls::TlsConfig,
24};
25
/// Default Humio endpoint, used as the sink's `endpoint` when none is configured.
pub(super) const HOST: &str = "https://cloud.humio.com";
27
#[configurable_component(sink("humio_logs", "Deliver log event data to Humio."))]
/// Configuration for the `humio_logs` sink.
///
/// When the sink is built, this configuration is translated into a Splunk HEC logs sink
/// configuration that targets the HEC event endpoint.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HumioLogsConfig {
    /// The Humio ingestion token.
    #[configurable(metadata(
        docs::examples = "${HUMIO_TOKEN}",
        docs::examples = "A94A8FE5CCB19BA61C4C08"
    ))]
    pub token: SensitiveString,

    /// The base URL of the Humio instance, including the scheme (`http` or `https`).
    ///
    /// Defaults to `https://cloud.humio.com`. This option may also be written as `host`.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(
        docs::examples = "http://127.0.0.1",
        docs::examples = "https://example.com",
    ))]
    pub endpoint: String,

    /// The source of events sent to this sink.
    ///
    /// Sent as the HEC `source`; Humio reports it as the event's `@source` field.
    pub source: Option<Template>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,

    /// The type of events sent to this sink.
    ///
    /// Sent as the HEC `sourcetype`; Humio reports it as the event's `#type` field. When
    /// unset, Humio reports a `#type` of `none`.
    // NOTE(review): Humio presumably uses this value to select the ingest parser — confirm.
    #[configurable(metadata(
        docs::examples = "json",
        docs::examples = "none",
        docs::examples = "{{ event_type }}"
    ))]
    pub event_type: Option<Template>,

    /// Overrides the name of the log field used to retrieve the hostname to send to Humio.
    ///
    /// Humio reports the hostname as the event's `@host` field.
    // NOTE(review): the default comes from `config_host_key_target_path()`; presumably the
    // global `log_schema.host_key` — confirm.
    #[serde(default = "config_host_key_target_path")]
    pub host_key: OptionalTargetPath,

    /// Event fields to send to Humio as indexed fields (passed through as the HEC
    /// `indexed_fields` option).
    #[serde(default)]
    pub indexed_fields: Vec<ConfigValuePath>,

    /// Optional index to ingest events into, sent as the HEC `index`.
    // NOTE(review): presumably this names the target Humio repository; confirm whether it
    // must match the repository the ingest token was created for.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
    pub index: Option<Template>,

    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// Name of the field in which the sub-millisecond (nanosecond) part of the event
    /// timestamp is sent to Humio.
    ///
    /// Defaults to `@timestamp.nanos`.
    #[serde(default = "timestamp_nanos_key")]
    pub timestamp_nanos_key: Option<String>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// Overrides the name of the log field used to retrieve the timestamp to send to Humio.
    // NOTE(review): defaults to `config_timestamp_key_target_path()`; check that
    // `build_hec_config` forwards this value instead of always using the default.
    #[serde(default = "config_timestamp_key_target_path")]
    pub timestamp_key: OptionalTargetPath,
}
141
142fn default_endpoint() -> String {
143 HOST.to_string()
144}
145
/// Serde default for `timestamp_nanos_key`: the `@timestamp.nanos` field name.
pub fn timestamp_nanos_key() -> Option<String> {
    let field_name = "@timestamp.nanos";
    Some(String::from(field_name))
}
149
150impl GenerateConfig for HumioLogsConfig {
151 fn generate_config() -> toml::Value {
152 toml::Value::try_from(Self {
153 token: "${HUMIO_TOKEN}".to_owned().into(),
154 endpoint: default_endpoint(),
155 source: None,
156 encoding: JsonSerializerConfig::default().into(),
157 event_type: None,
158 indexed_fields: vec![],
159 index: None,
160 host_key: config_host_key_target_path(),
161 compression: Compression::default(),
162 request: TowerRequestConfig::default(),
163 batch: BatchConfig::default(),
164 tls: None,
165 timestamp_nanos_key: None,
166 acknowledgements: Default::default(),
167 timestamp_key: config_timestamp_key_target_path(),
168 })
169 .unwrap()
170 }
171}
172
173#[async_trait::async_trait]
174#[typetag::serde(name = "humio_logs")]
175impl SinkConfig for HumioLogsConfig {
176 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
177 self.build_hec_config().build(cx).await
178 }
179
180 fn input(&self) -> Input {
181 Input::new(self.encoding.config().input_type() & DataType::Log)
182 }
183
184 fn acknowledgements(&self) -> &AcknowledgementsConfig {
185 &self.acknowledgements
186 }
187}
188
189impl HumioLogsConfig {
190 fn build_hec_config(&self) -> HecLogsSinkConfig {
191 HecLogsSinkConfig {
192 default_token: self.token.clone(),
193 endpoint: self.endpoint.clone(),
194 host_key: Some(self.host_key.clone()),
195 indexed_fields: self.indexed_fields.clone(),
196 index: self.index.clone(),
197 sourcetype: self.event_type.clone(),
198 source: self.source.clone(),
199 timestamp_nanos_key: self.timestamp_nanos_key.clone(),
200 encoding: self.encoding.clone(),
201 compression: self.compression,
202 batch: self.batch,
203 request: self.request,
204 tls: self.tls.clone(),
205 acknowledgements: HecClientAcknowledgementsConfig {
206 indexer_acknowledgements_enabled: false,
207 ..Default::default()
208 },
209 timestamp_key: Some(config_timestamp_key_target_path()),
210 endpoint_target: EndpointTarget::Event,
211 auto_extract_timestamp: None,
212 }
213 }
214}
215
#[cfg(test)]
mod tests {
    use super::HumioLogsConfig;

    /// Checks `HumioLogsConfig::generate_config` via the shared `test_generate_config` helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<HumioLogsConfig>();
    }
}
225
226#[cfg(test)]
227#[cfg(feature = "humio-integration-tests")]
228mod integration_tests {
229 use chrono::{TimeZone, Utc};
230 use futures::{future::ready, stream};
231 use indoc::indoc;
232 use serde::Deserialize;
233 use serde_json::{json, Value as JsonValue};
234 use std::{collections::HashMap, convert::TryFrom};
235 use tokio::time::Duration;
236
237 use super::*;
238 use crate::{
239 config::{log_schema, SinkConfig, SinkContext},
240 event::LogEvent,
241 sinks::util::Compression,
242 test_util::{
243 components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
244 random_string,
245 },
246 };
247
248 fn humio_address() -> String {
249 std::env::var("HUMIO_ADDRESS").unwrap_or_else(|_| "http://localhost:8080".into())
250 }
251
252 #[tokio::test]
253 async fn humio_insert_message() {
254 wait_ready().await;
255
256 let cx = SinkContext::default();
257
258 let repo = create_repository().await;
259
260 let config = config(&repo.default_ingest_token);
261
262 let (sink, _) = config.build(cx).await.unwrap();
263
264 let message = random_string(100);
265 let host = "192.168.1.1".to_string();
266 let mut event = LogEvent::from(message.clone());
267 event.insert(log_schema().host_key_target_path().unwrap(), host.clone());
268
269 let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
270 event.insert(log_schema().timestamp_key_target_path().unwrap(), ts);
271
272 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
273
274 let entry = find_entry(repo.name.as_str(), message.as_str()).await;
275
276 assert_eq!(
277 message,
278 entry
279 .fields
280 .get("message")
281 .expect("no message key")
282 .as_str()
283 .unwrap()
284 );
285 assert!(
286 entry.error.is_none(),
287 "Humio encountered an error parsing this message: {}",
288 entry
289 .error_msg
290 .unwrap_or_else(|| "no error message".to_string())
291 );
292 assert_eq!(Some(host), entry.host);
293 assert_eq!("132456", entry.timestamp_nanos);
294 }
295
296 #[tokio::test]
297 async fn humio_insert_source() {
298 wait_ready().await;
299
300 let cx = SinkContext::default();
301
302 let repo = create_repository().await;
303
304 let mut config = config(&repo.default_ingest_token);
305 config.source = Template::try_from("/var/log/syslog".to_string()).ok();
306
307 let (sink, _) = config.build(cx).await.unwrap();
308
309 let message = random_string(100);
310 let event = LogEvent::from(message.clone());
311 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
312
313 let entry = find_entry(repo.name.as_str(), message.as_str()).await;
314
315 assert_eq!(entry.source, Some("/var/log/syslog".to_owned()));
316 assert!(
317 entry.error.is_none(),
318 "Humio encountered an error parsing this message: {}",
319 entry
320 .error_msg
321 .unwrap_or_else(|| "no error message".to_string())
322 );
323 }
324
325 #[tokio::test]
326 async fn humio_type() {
327 wait_ready().await;
328
329 let repo = create_repository().await;
330
331 {
333 let mut config = config(&repo.default_ingest_token);
334 config.event_type = Template::try_from("json".to_string()).ok();
335
336 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
337
338 let message = random_string(100);
339 let mut event = LogEvent::from(message.clone());
340 event.insert("@timestamp", Utc::now().to_rfc3339());
343
344 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
345
346 let entry = find_entry(repo.name.as_str(), message.as_str()).await;
347
348 assert_eq!(entry.humio_type, "json");
349 assert!(
350 entry.error.is_none(),
351 "Humio encountered an error parsing this message: {}",
352 entry
353 .error_msg
354 .unwrap_or_else(|| "no error message".to_string())
355 );
356 }
357
358 {
360 let config = config(&repo.default_ingest_token);
361
362 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
363
364 let message = random_string(100);
365 let event = LogEvent::from(message.clone());
366
367 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
368
369 let entry = find_entry(repo.name.as_str(), message.as_str()).await;
370
371 assert_eq!(entry.humio_type, "none");
372 }
373 }
374
375 fn config(token: &str) -> super::HumioLogsConfig {
377 let mut batch = BatchConfig::default();
378 batch.max_events = Some(1);
379
380 HumioLogsConfig {
381 token: token.to_string().into(),
382 endpoint: humio_address(),
383 source: None,
384 encoding: JsonSerializerConfig::default().into(),
385 event_type: None,
386 host_key: OptionalTargetPath {
387 path: log_schema().host_key_target_path().cloned(),
388 },
389 indexed_fields: vec![],
390 index: None,
391 compression: Compression::None,
392 request: TowerRequestConfig::default(),
393 batch,
394 tls: None,
395 timestamp_nanos_key: timestamp_nanos_key(),
396 acknowledgements: Default::default(),
397 timestamp_key: Default::default(),
398 }
399 }
400
401 async fn wait_ready() {
402 crate::test_util::retry_until(
403 || async {
404 reqwest::get(format!("{}/api/v1/status", humio_address()))
405 .await
406 .map_err(|err| err.to_string())
407 .and_then(|res| {
408 if res.status().is_success() {
409 Ok(())
410 } else {
411 Err("server not ready...".into())
412 }
413 })
414 },
415 Duration::from_secs(1),
416 Duration::from_secs(30),
417 )
418 .await;
419 }
420
421 async fn create_repository() -> HumioRepository {
423 let client = reqwest::Client::builder().build().unwrap();
424
425 let graphql_url = format!("{}/graphql", humio_address());
427
428 let name = random_string(50);
429
430 let params = json!({
431 "query": format!(
432 indoc!{ r#"
433 mutation {{
434 createRepository(name:"{}") {{
435 repository {{
436 name
437 type
438 ingestTokens {{
439 name
440 token
441 }}
442 }}
443 }}
444 }}
445 "#},
446 name
447 ),
448 });
449
450 let res = client
451 .post(&graphql_url)
452 .json(¶ms)
453 .send()
454 .await
455 .unwrap();
456
457 let json: JsonValue = res.json().await.unwrap();
458 let repository = &json["data"]["createRepository"]["repository"];
459
460 let token = repository["ingestTokens"].as_array().unwrap()[0]["token"]
461 .as_str()
462 .unwrap()
463 .to_string();
464
465 HumioRepository {
466 name: repository["name"].as_str().unwrap().to_string(),
467 default_ingest_token: token,
468 }
469 }
470
471 async fn find_entry(repository_name: &str, message: &str) -> HumioLog {
473 let client = reqwest::Client::builder().build().unwrap();
474
475 let search_url = format!(
477 "{}/api/v1/repositories/{}/query",
478 humio_address(),
479 repository_name
480 );
481 let search_query = format!(r#"message="{message}""#);
482
483 for _ in 0..200usize {
486 let res = client
487 .post(&search_url)
488 .json(&json!({
489 "queryString": search_query,
490 }))
491 .header(reqwest::header::ACCEPT, "application/json")
492 .send()
493 .await
494 .unwrap();
495
496 let logs: Vec<HumioLog> = res.json().await.unwrap();
497
498 if !logs.is_empty() {
499 return logs[0].clone();
500 }
501 }
502 panic!("did not find event in Humio repository {repository_name} with message {message}");
503 }
504
    /// A Humio repository created for a test by `create_repository`.
    #[derive(Debug)]
    struct HumioRepository {
        /// Repository name; passed to `find_entry` to build the search URL.
        name: String,
        /// First ingest token returned for the repository; used as the sink's `token`.
        default_ingest_token: String,
    }
510
    /// One event returned by Humio's query API, as deserialized in `find_entry`.
    #[derive(Clone, Deserialize)]
    // Several fields (e.g. `humio_repo`, `rawstring`, `id`, `timezone`) are never read by the
    // tests. As non-`Option` fields they still make deserialization require those keys.
    #[allow(dead_code)]
    struct HumioLog {
        /// Humio's `#repo` field (presumably the repository the event was found in).
        #[serde(rename = "#repo")]
        humio_repo: String,

        /// Humio's `#type` field; compared against the configured `event_type` in `humio_type`.
        #[serde(rename = "#type")]
        humio_type: String,

        /// Humio's `@error` field; the tests assert it is absent (no parse error).
        #[serde(rename = "@error")]
        error: Option<String>,

        /// Humio's `@error_msg` field; included in assertion messages when `@error` is set.
        #[serde(rename = "@error_msg")]
        error_msg: Option<String>,

        /// Humio's `@rawstring` field.
        #[serde(rename = "@rawstring")]
        rawstring: String,

        /// Humio's `@id` field.
        #[serde(rename = "@id")]
        id: String,

        /// Humio's `@timestamp` field (presumably epoch milliseconds, per the field name).
        #[serde(rename = "@timestamp")]
        timestamp_millis: u64,

        /// Humio's `@timestamp.nanos` field, as a string; checked in `humio_insert_message`.
        #[serde(rename = "@timestamp.nanos")]
        timestamp_nanos: String,

        /// Humio's `@timezone` field.
        #[serde(rename = "@timezone")]
        timezone: String,

        /// Humio's `@source` field; compared against the configured `source`.
        #[serde(rename = "@source")]
        source: Option<String>,

        /// Humio's `@host` field; compared against the host inserted into the event.
        #[serde(rename = "@host")]
        host: Option<String>,

        /// All remaining top-level fields of the event, such as `message`.
        #[serde(flatten)]
        fields: HashMap<String, JsonValue>,
    }
551}