1use vector_lib::{
2 codecs::JsonSerializerConfig,
3 configurable::configurable_component,
4 lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath},
5 sensitive_string::SensitiveString,
6};
7
8use super::config_host_key_target_path;
9use crate::{
10 codecs::EncodingConfig,
11 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
12 sinks::{
13 Healthcheck, VectorSink,
14 splunk_hec::{
15 common::{
16 EndpointTarget, SplunkHecDefaultBatchSettings,
17 acknowledgements::HecClientAcknowledgementsConfig,
18 config_timestamp_key_target_path,
19 },
20 logs::config::HecLogsSinkConfig,
21 },
22 util::{BatchConfig, Compression, TowerRequestConfig},
23 },
24 template::Template,
25 tls::TlsConfig,
26};
27
/// Humio cloud URL; `default_endpoint` returns it as the fallback value for the
/// sink's `endpoint` option.
pub(super) const HOST: &str = "https://cloud.humio.com";
29
/// Configuration for the `humio_logs` sink.
#[configurable_component(sink("humio_logs", "Deliver log event data to Humio."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HumioLogsConfig {
    /// The Humio ingestion token.
    #[configurable(metadata(
        docs::examples = "${HUMIO_TOKEN}",
        docs::examples = "A94A8FE5CCB19BA61C4C08"
    ))]
    pub token: SensitiveString,

    /// The base URL of the Humio instance.
    ///
    /// `host` is accepted as an alias for this option. When the option is omitted, the Humio
    /// cloud URL is used.
    #[serde(alias = "host")]
    #[serde(default = "default_endpoint")]
    #[configurable(metadata(
        docs::examples = "http://127.0.0.1",
        docs::examples = "https://example.com",
    ))]
    pub endpoint: String,

    /// The source of events sent to this sink.
    ///
    /// Optional, and may be a template.
    pub source: Option<Template>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,

    /// The type of events sent to this sink.
    ///
    /// Optional, and may be a template.
    // Forwarded to the underlying HEC sink as its `sourcetype`.
    #[configurable(metadata(
        docs::examples = "json",
        docs::examples = "none",
        docs::examples = "{{ event_type }}"
    ))]
    pub event_type: Option<Template>,

    /// Path of the event field that holds the hostname to send to Humio.
    ///
    /// When omitted, the value returned by `config_host_key_target_path` is used.
    #[serde(default = "config_host_key_target_path")]
    pub host_key: OptionalTargetPath,

    /// Paths of event fields to send as indexed fields.
    ///
    /// Empty by default.
    #[serde(default)]
    pub indexed_fields: Vec<ConfigValuePath>,

    /// Optional name of the index that events are sent to.
    ///
    /// May be a template.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
    pub index: Option<Template>,

    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// Name of the field used for the nanosecond part of the event timestamp.
    ///
    /// Defaults to `@timestamp.nanos`.
    // NOTE(review): presumably this carries only the sub-millisecond remainder of the
    // timestamp -- confirm against the HEC encoder.
    #[serde(default = "timestamp_nanos_key")]
    pub timestamp_nanos_key: Option<String>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,

    /// Path of the event field used to retrieve the timestamp to send to Humio.
    ///
    /// When omitted, the value returned by `config_timestamp_key_target_path` is used.
    // NOTE(review): `build_hec_config` currently never reads this field and always forwards
    // `config_timestamp_key_target_path()` instead -- confirm whether that is intended.
    #[serde(default = "config_timestamp_key_target_path")]
    pub timestamp_key: OptionalTargetPath,
}
143
144fn default_endpoint() -> String {
145 HOST.to_string()
146}
147
/// Serde default for `HumioLogsConfig::timestamp_nanos_key`.
///
/// Always returns `Some("@timestamp.nanos")`.
pub fn timestamp_nanos_key() -> Option<String> {
    let key = String::from("@timestamp.nanos");
    Some(key)
}
151
152impl GenerateConfig for HumioLogsConfig {
153 fn generate_config() -> toml::Value {
154 toml::Value::try_from(Self {
155 token: "${HUMIO_TOKEN}".to_owned().into(),
156 endpoint: default_endpoint(),
157 source: None,
158 encoding: JsonSerializerConfig::default().into(),
159 event_type: None,
160 indexed_fields: vec![],
161 index: None,
162 host_key: config_host_key_target_path(),
163 compression: Compression::default(),
164 request: TowerRequestConfig::default(),
165 batch: BatchConfig::default(),
166 tls: None,
167 timestamp_nanos_key: None,
168 acknowledgements: Default::default(),
169 timestamp_key: config_timestamp_key_target_path(),
170 })
171 .unwrap()
172 }
173}
174
175#[async_trait::async_trait]
176#[typetag::serde(name = "humio_logs")]
177impl SinkConfig for HumioLogsConfig {
178 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
179 self.build_hec_config().build(cx).await
180 }
181
182 fn input(&self) -> Input {
183 Input::new(self.encoding.config().input_type() & DataType::Log)
184 }
185
186 fn acknowledgements(&self) -> &AcknowledgementsConfig {
187 &self.acknowledgements
188 }
189}
190
191impl HumioLogsConfig {
192 fn build_hec_config(&self) -> HecLogsSinkConfig {
193 HecLogsSinkConfig {
194 default_token: self.token.clone(),
195 endpoint: self.endpoint.clone(),
196 host_key: Some(self.host_key.clone()),
197 indexed_fields: self.indexed_fields.clone(),
198 index: self.index.clone(),
199 sourcetype: self.event_type.clone(),
200 source: self.source.clone(),
201 timestamp_nanos_key: self.timestamp_nanos_key.clone(),
202 encoding: self.encoding.clone(),
203 compression: self.compression,
204 batch: self.batch,
205 request: self.request,
206 tls: self.tls.clone(),
207 acknowledgements: HecClientAcknowledgementsConfig {
208 indexer_acknowledgements_enabled: false,
209 ..Default::default()
210 },
211 timestamp_key: Some(config_timestamp_key_target_path()),
212 endpoint_target: EndpointTarget::Event,
213 auto_extract_timestamp: None,
214 }
215 }
216}
217
#[cfg(test)]
mod tests {
    use super::HumioLogsConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` check against `HumioLogsConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<HumioLogsConfig>();
    }
}
227
228#[cfg(test)]
229#[cfg(feature = "humio-integration-tests")]
230mod integration_tests {
231 use std::{collections::HashMap, convert::TryFrom};
232
233 use chrono::{TimeZone, Utc};
234 use futures::{future::ready, stream};
235 use indoc::indoc;
236 use serde::Deserialize;
237 use serde_json::{Value as JsonValue, json};
238 use tokio::time::Duration;
239
240 use super::*;
241 use crate::{
242 config::{SinkConfig, SinkContext, log_schema},
243 event::LogEvent,
244 sinks::util::Compression,
245 test_util::{
246 components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
247 random_string,
248 },
249 };
250
251 fn humio_address() -> String {
252 std::env::var("HUMIO_ADDRESS").unwrap_or_else(|_| "http://localhost:8080".into())
253 }
254
255 #[tokio::test]
256 async fn humio_insert_message() {
257 wait_ready().await;
258
259 let cx = SinkContext::default();
260
261 let repo = create_repository().await;
262
263 let config = config(&repo.default_ingest_token);
264
265 let (sink, _) = config.build(cx).await.unwrap();
266
267 let message = random_string(100);
268 let host = "192.168.1.1".to_string();
269 let mut event = LogEvent::from(message.clone());
270 event.insert(log_schema().host_key_target_path().unwrap(), host.clone());
271
272 let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
273 event.insert(log_schema().timestamp_key_target_path().unwrap(), ts);
274
275 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
276
277 let entry = find_entry(repo.name.as_str(), message.as_str()).await;
278
279 assert_eq!(
280 message,
281 entry
282 .fields
283 .get("message")
284 .expect("no message key")
285 .as_str()
286 .unwrap()
287 );
288 assert!(
289 entry.error.is_none(),
290 "Humio encountered an error parsing this message: {}",
291 entry
292 .error_msg
293 .unwrap_or_else(|| "no error message".to_string())
294 );
295 assert_eq!(Some(host), entry.host);
296 assert_eq!("132456", entry.timestamp_nanos);
297 }
298
299 #[tokio::test]
300 async fn humio_insert_source() {
301 wait_ready().await;
302
303 let cx = SinkContext::default();
304
305 let repo = create_repository().await;
306
307 let mut config = config(&repo.default_ingest_token);
308 config.source = Template::try_from("/var/log/syslog".to_string()).ok();
309
310 let (sink, _) = config.build(cx).await.unwrap();
311
312 let message = random_string(100);
313 let event = LogEvent::from(message.clone());
314 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
315
316 let entry = find_entry(repo.name.as_str(), message.as_str()).await;
317
318 assert_eq!(entry.source, Some("/var/log/syslog".to_owned()));
319 assert!(
320 entry.error.is_none(),
321 "Humio encountered an error parsing this message: {}",
322 entry
323 .error_msg
324 .unwrap_or_else(|| "no error message".to_string())
325 );
326 }
327
328 #[tokio::test]
329 async fn humio_type() {
330 wait_ready().await;
331
332 let repo = create_repository().await;
333
334 {
336 let mut config = config(&repo.default_ingest_token);
337 config.event_type = Template::try_from("json".to_string()).ok();
338
339 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
340
341 let message = random_string(100);
342 let mut event = LogEvent::from(message.clone());
343 event.insert("@timestamp", Utc::now().to_rfc3339());
346
347 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
348
349 let entry = find_entry(repo.name.as_str(), message.as_str()).await;
350
351 assert_eq!(entry.humio_type, "json");
352 assert!(
353 entry.error.is_none(),
354 "Humio encountered an error parsing this message: {}",
355 entry
356 .error_msg
357 .unwrap_or_else(|| "no error message".to_string())
358 );
359 }
360
361 {
363 let config = config(&repo.default_ingest_token);
364
365 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
366
367 let message = random_string(100);
368 let event = LogEvent::from(message.clone());
369
370 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
371
372 let entry = find_entry(repo.name.as_str(), message.as_str()).await;
373
374 assert_eq!(entry.humio_type, "none");
375 }
376 }
377
378 fn config(token: &str) -> super::HumioLogsConfig {
380 let mut batch = BatchConfig::default();
381 batch.max_events = Some(1);
382
383 HumioLogsConfig {
384 token: token.to_string().into(),
385 endpoint: humio_address(),
386 source: None,
387 encoding: JsonSerializerConfig::default().into(),
388 event_type: None,
389 host_key: OptionalTargetPath {
390 path: log_schema().host_key_target_path().cloned(),
391 },
392 indexed_fields: vec![],
393 index: None,
394 compression: Compression::None,
395 request: TowerRequestConfig::default(),
396 batch,
397 tls: None,
398 timestamp_nanos_key: timestamp_nanos_key(),
399 acknowledgements: Default::default(),
400 timestamp_key: Default::default(),
401 }
402 }
403
404 async fn wait_ready() {
405 crate::test_util::retry_until(
406 || async {
407 reqwest::get(format!("{}/api/v1/status", humio_address()))
408 .await
409 .map_err(|err| err.to_string())
410 .and_then(|res| {
411 if res.status().is_success() {
412 Ok(())
413 } else {
414 Err("server not ready...".into())
415 }
416 })
417 },
418 Duration::from_secs(1),
419 Duration::from_secs(30),
420 )
421 .await;
422 }
423
424 async fn create_repository() -> HumioRepository {
426 let client = reqwest::Client::builder().build().unwrap();
427
428 let graphql_url = format!("{}/graphql", humio_address());
430
431 let name = random_string(50);
432
433 let params = json!({
434 "query": format!(
435 indoc!{ r#"
436 mutation {{
437 createRepository(name:"{}") {{
438 repository {{
439 name
440 type
441 ingestTokens {{
442 name
443 token
444 }}
445 }}
446 }}
447 }}
448 "#},
449 name
450 ),
451 });
452
453 let res = client
454 .post(&graphql_url)
455 .json(¶ms)
456 .send()
457 .await
458 .unwrap();
459
460 let json: JsonValue = res.json().await.unwrap();
461 let repository = &json["data"]["createRepository"]["repository"];
462
463 let token = repository["ingestTokens"].as_array().unwrap()[0]["token"]
464 .as_str()
465 .unwrap()
466 .to_string();
467
468 HumioRepository {
469 name: repository["name"].as_str().unwrap().to_string(),
470 default_ingest_token: token,
471 }
472 }
473
474 async fn find_entry(repository_name: &str, message: &str) -> HumioLog {
476 let client = reqwest::Client::builder().build().unwrap();
477
478 let search_url = format!(
480 "{}/api/v1/repositories/{}/query",
481 humio_address(),
482 repository_name
483 );
484 let search_query = format!(r#"message="{message}""#);
485
486 for _ in 0..200usize {
489 let res = client
490 .post(&search_url)
491 .json(&json!({
492 "queryString": search_query,
493 }))
494 .header(reqwest::header::ACCEPT, "application/json")
495 .send()
496 .await
497 .unwrap();
498
499 let logs: Vec<HumioLog> = res.json().await.unwrap();
500
501 if !logs.is_empty() {
502 return logs[0].clone();
503 }
504 }
505 panic!("did not find event in Humio repository {repository_name} with message {message}");
506 }
507
    /// A Humio repository created for a single test run, as returned by
    /// `create_repository`.
    #[derive(Debug)]
    struct HumioRepository {
        /// Repository name as reported back by Humio.
        name: String,
        /// First ingest token listed for the repository.
        default_ingest_token: String,
    }
513
    /// One event as returned by the Humio query API, deserialized from JSON.
    ///
    /// Each named field is mapped from the Humio key given in its `rename`
    /// attribute; every remaining key of the JSON object lands in `fields`.
    #[derive(Clone, Deserialize)]
    // Not every field is read by the tests in this module.
    #[allow(dead_code)]
    struct HumioLog {
        #[serde(rename = "#repo")]
        humio_repo: String,

        // Compared against the sink's `event_type` in `humio_type`.
        #[serde(rename = "#type")]
        humio_type: String,

        // The tests expect this to be absent for successfully parsed events.
        #[serde(rename = "@error")]
        error: Option<String>,

        // Printed in assertion messages when `error` is present.
        #[serde(rename = "@error_msg")]
        error_msg: Option<String>,

        #[serde(rename = "@rawstring")]
        rawstring: String,

        #[serde(rename = "@id")]
        id: String,

        #[serde(rename = "@timestamp")]
        timestamp_millis: u64,

        // Deserialized as a string; `humio_insert_message` compares it to "132456".
        #[serde(rename = "@timestamp.nanos")]
        timestamp_nanos: String,

        #[serde(rename = "@timezone")]
        timezone: String,

        #[serde(rename = "@source")]
        source: Option<String>,

        #[serde(rename = "@host")]
        host: Option<String>,

        // Catch-all for every key not matched above, e.g. `message`.
        #[serde(flatten)]
        fields: HashMap<String, JsonValue>,
    }
554}