vector/sinks/humio/
logs.rs

1use vector_lib::{
2    codecs::JsonSerializerConfig,
3    configurable::configurable_component,
4    lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath},
5    sensitive_string::SensitiveString,
6};
7
8use super::config_host_key_target_path;
9use crate::{
10    codecs::EncodingConfig,
11    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
12    sinks::{
13        Healthcheck, VectorSink,
14        splunk_hec::{
15            common::{
16                EndpointTarget, SplunkHecDefaultBatchSettings,
17                acknowledgements::HecClientAcknowledgementsConfig,
18                config_timestamp_key_target_path,
19            },
20            logs::config::HecLogsSinkConfig,
21        },
22        util::{BatchConfig, Compression, TowerRequestConfig},
23    },
24    template::Template,
25    tls::TlsConfig,
26};
27
28pub(super) const HOST: &str = "https://cloud.humio.com";
29
30/// Configuration for the `humio_logs` sink.
31#[configurable_component(sink("humio_logs", "Deliver log event data to Humio."))]
32#[derive(Clone, Debug)]
33#[serde(deny_unknown_fields)]
34pub struct HumioLogsConfig {
35    /// The Humio ingestion token.
36    #[configurable(metadata(
37        docs::examples = "${HUMIO_TOKEN}",
38        docs::examples = "A94A8FE5CCB19BA61C4C08"
39    ))]
40    pub token: SensitiveString,
41
42    /// The base URL of the Humio instance.
43    ///
44    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
45    /// by the [`Splunk`][splunk] API are used.
46    ///
47    /// [splunk]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/HECRESTendpoints
48    #[serde(alias = "host")]
49    #[serde(default = "default_endpoint")]
50    #[configurable(metadata(
51        docs::examples = "http://127.0.0.1",
52        docs::examples = "https://example.com",
53    ))]
54    pub endpoint: String,
55
56    /// The source of events sent to this sink.
57    ///
58    /// Typically the filename the logs originated from. Maps to `@source` in Humio.
59    pub source: Option<Template>,
60
61    #[configurable(derived)]
62    pub encoding: EncodingConfig,
63
64    /// The type of events sent to this sink. Humio uses this as the name of the parser to use to ingest the data.
65    ///
66    /// If unset, Humio defaults it to none.
67    #[configurable(metadata(
68        docs::examples = "json",
69        docs::examples = "none",
70        docs::examples = "{{ event_type }}"
71    ))]
72    pub event_type: Option<Template>,
73
74    /// Overrides the name of the log field used to retrieve the hostname to send to Humio.
75    ///
76    /// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
77    /// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
78    ///
79    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
80    #[serde(default = "config_host_key_target_path")]
81    pub host_key: OptionalTargetPath,
82
83    /// Event fields to be added to Humio’s extra fields.
84    ///
85    /// Can be used to tag events by specifying fields starting with `#`.
86    ///
87    /// For more information, see [Humio’s Format of Data][humio_data_format].
88    ///
89    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
90    #[serde(default)]
91    pub indexed_fields: Vec<ConfigValuePath>,
92
93    /// Optional name of the repository to ingest into.
94    ///
95    /// In public-facing APIs, this must (if present) be equal to the repository used to create the ingest token used for authentication.
96    ///
97    /// In private cluster setups, Humio can be configured to allow these to be different.
98    ///
99    /// For more information, see [Humio’s Format of Data][humio_data_format].
100    ///
101    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
102    #[serde(default)]
103    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
104    pub index: Option<Template>,
105
106    #[configurable(derived)]
107    #[serde(default)]
108    pub compression: Compression,
109
110    #[configurable(derived)]
111    #[serde(default)]
112    pub request: TowerRequestConfig,
113
114    #[configurable(derived)]
115    #[serde(default)]
116    pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,
117
118    #[configurable(derived)]
119    pub tls: Option<TlsConfig>,
120
121    /// Overrides the name of the log field used to retrieve the nanosecond-enabled timestamp to send to Humio.
122    #[serde(default = "timestamp_nanos_key")]
123    pub timestamp_nanos_key: Option<String>,
124
125    #[configurable(derived)]
126    #[serde(
127        default,
128        deserialize_with = "crate::serde::bool_or_struct",
129        skip_serializing_if = "crate::serde::is_default"
130    )]
131    pub acknowledgements: AcknowledgementsConfig,
132
133    /// Overrides the name of the log field used to retrieve the timestamp to send to Humio.
134    /// When set to `“”`, a timestamp is not set in the events sent to Humio.
135    ///
136    /// By default, either the [global `log_schema.timestamp_key` option][global_timestamp_key] is used
137    /// if log events are Legacy namespaced, or the semantic meaning of "timestamp" is used, if defined.
138    ///
139    /// [global_timestamp_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.timestamp_key
140    #[serde(default = "config_timestamp_key_target_path")]
141    pub timestamp_key: OptionalTargetPath,
142}
143
144fn default_endpoint() -> String {
145    HOST.to_string()
146}
147
/// Serde default for the `timestamp_nanos_key` option.
///
/// Returns the `@timestamp.nanos` field name wrapped in `Some`.
pub fn timestamp_nanos_key() -> Option<String> {
    let default_key = "@timestamp.nanos";
    Some(default_key.to_owned())
}
151
152impl GenerateConfig for HumioLogsConfig {
153    fn generate_config() -> toml::Value {
154        toml::Value::try_from(Self {
155            token: "${HUMIO_TOKEN}".to_owned().into(),
156            endpoint: default_endpoint(),
157            source: None,
158            encoding: JsonSerializerConfig::default().into(),
159            event_type: None,
160            indexed_fields: vec![],
161            index: None,
162            host_key: config_host_key_target_path(),
163            compression: Compression::default(),
164            request: TowerRequestConfig::default(),
165            batch: BatchConfig::default(),
166            tls: None,
167            timestamp_nanos_key: None,
168            acknowledgements: Default::default(),
169            timestamp_key: config_timestamp_key_target_path(),
170        })
171        .unwrap()
172    }
173}
174
175#[async_trait::async_trait]
176#[typetag::serde(name = "humio_logs")]
177impl SinkConfig for HumioLogsConfig {
178    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
179        self.build_hec_config().build(cx).await
180    }
181
182    fn input(&self) -> Input {
183        Input::new(self.encoding.config().input_type() & DataType::Log)
184    }
185
186    fn acknowledgements(&self) -> &AcknowledgementsConfig {
187        &self.acknowledgements
188    }
189}
190
191impl HumioLogsConfig {
192    fn build_hec_config(&self) -> HecLogsSinkConfig {
193        HecLogsSinkConfig {
194            default_token: self.token.clone(),
195            endpoint: self.endpoint.clone(),
196            host_key: Some(self.host_key.clone()),
197            indexed_fields: self.indexed_fields.clone(),
198            index: self.index.clone(),
199            sourcetype: self.event_type.clone(),
200            source: self.source.clone(),
201            timestamp_nanos_key: self.timestamp_nanos_key.clone(),
202            encoding: self.encoding.clone(),
203            compression: self.compression,
204            batch: self.batch,
205            request: self.request,
206            tls: self.tls.clone(),
207            acknowledgements: HecClientAcknowledgementsConfig {
208                indexer_acknowledgements_enabled: false,
209                ..Default::default()
210            },
211            timestamp_key: Some(config_timestamp_key_target_path()),
212            endpoint_target: EndpointTarget::Event,
213            auto_extract_timestamp: None,
214        }
215    }
216}
217
218#[cfg(test)]
219mod tests {
220    use super::*;
221
222    #[test]
223    fn generate_config() {
224        crate::test_util::test_generate_config::<HumioLogsConfig>();
225    }
226}
227
228#[cfg(test)]
229#[cfg(feature = "humio-integration-tests")]
230mod integration_tests {
231    use std::{collections::HashMap, convert::TryFrom};
232
233    use chrono::{TimeZone, Utc};
234    use futures::{future::ready, stream};
235    use indoc::indoc;
236    use serde::Deserialize;
237    use serde_json::{Value as JsonValue, json};
238    use tokio::time::Duration;
239
240    use super::*;
241    use crate::{
242        config::{SinkConfig, SinkContext, log_schema},
243        event::LogEvent,
244        sinks::util::Compression,
245        test_util::{
246            components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
247            random_string,
248        },
249    };
250
251    fn humio_address() -> String {
252        std::env::var("HUMIO_ADDRESS").unwrap_or_else(|_| "http://localhost:8080".into())
253    }
254
255    #[tokio::test]
256    async fn humio_insert_message() {
257        wait_ready().await;
258
259        let cx = SinkContext::default();
260
261        let repo = create_repository().await;
262
263        let config = config(&repo.default_ingest_token);
264
265        let (sink, _) = config.build(cx).await.unwrap();
266
267        let message = random_string(100);
268        let host = "192.168.1.1".to_string();
269        let mut event = LogEvent::from(message.clone());
270        event.insert(log_schema().host_key_target_path().unwrap(), host.clone());
271
272        let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
273        event.insert(log_schema().timestamp_key_target_path().unwrap(), ts);
274
275        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
276
277        let entry = find_entry(repo.name.as_str(), message.as_str()).await;
278
279        assert_eq!(
280            message,
281            entry
282                .fields
283                .get("message")
284                .expect("no message key")
285                .as_str()
286                .unwrap()
287        );
288        assert!(
289            entry.error.is_none(),
290            "Humio encountered an error parsing this message: {}",
291            entry
292                .error_msg
293                .unwrap_or_else(|| "no error message".to_string())
294        );
295        assert_eq!(Some(host), entry.host);
296        assert_eq!("132456", entry.timestamp_nanos);
297    }
298
299    #[tokio::test]
300    async fn humio_insert_source() {
301        wait_ready().await;
302
303        let cx = SinkContext::default();
304
305        let repo = create_repository().await;
306
307        let mut config = config(&repo.default_ingest_token);
308        config.source = Template::try_from("/var/log/syslog".to_string()).ok();
309
310        let (sink, _) = config.build(cx).await.unwrap();
311
312        let message = random_string(100);
313        let event = LogEvent::from(message.clone());
314        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
315
316        let entry = find_entry(repo.name.as_str(), message.as_str()).await;
317
318        assert_eq!(entry.source, Some("/var/log/syslog".to_owned()));
319        assert!(
320            entry.error.is_none(),
321            "Humio encountered an error parsing this message: {}",
322            entry
323                .error_msg
324                .unwrap_or_else(|| "no error message".to_string())
325        );
326    }
327
328    #[tokio::test]
329    async fn humio_type() {
330        wait_ready().await;
331
332        let repo = create_repository().await;
333
334        // sets type
335        {
336            let mut config = config(&repo.default_ingest_token);
337            config.event_type = Template::try_from("json".to_string()).ok();
338
339            let (sink, _) = config.build(SinkContext::default()).await.unwrap();
340
341            let message = random_string(100);
342            let mut event = LogEvent::from(message.clone());
343            // Humio expects to find an @timestamp field for JSON lines
344            // https://docs.humio.com/ingesting-data/parsers/built-in-parsers/#json
345            event.insert("@timestamp", Utc::now().to_rfc3339());
346
347            run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
348
349            let entry = find_entry(repo.name.as_str(), message.as_str()).await;
350
351            assert_eq!(entry.humio_type, "json");
352            assert!(
353                entry.error.is_none(),
354                "Humio encountered an error parsing this message: {}",
355                entry
356                    .error_msg
357                    .unwrap_or_else(|| "no error message".to_string())
358            );
359        }
360
361        // defaults to none
362        {
363            let config = config(&repo.default_ingest_token);
364
365            let (sink, _) = config.build(SinkContext::default()).await.unwrap();
366
367            let message = random_string(100);
368            let event = LogEvent::from(message.clone());
369
370            run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
371
372            let entry = find_entry(repo.name.as_str(), message.as_str()).await;
373
374            assert_eq!(entry.humio_type, "none");
375        }
376    }
377
378    /// create a new test config with the given ingest token
379    fn config(token: &str) -> super::HumioLogsConfig {
380        let mut batch = BatchConfig::default();
381        batch.max_events = Some(1);
382
383        HumioLogsConfig {
384            token: token.to_string().into(),
385            endpoint: humio_address(),
386            source: None,
387            encoding: JsonSerializerConfig::default().into(),
388            event_type: None,
389            host_key: OptionalTargetPath {
390                path: log_schema().host_key_target_path().cloned(),
391            },
392            indexed_fields: vec![],
393            index: None,
394            compression: Compression::None,
395            request: TowerRequestConfig::default(),
396            batch,
397            tls: None,
398            timestamp_nanos_key: timestamp_nanos_key(),
399            acknowledgements: Default::default(),
400            timestamp_key: Default::default(),
401        }
402    }
403
404    async fn wait_ready() {
405        crate::test_util::retry_until(
406            || async {
407                reqwest::get(format!("{}/api/v1/status", humio_address()))
408                    .await
409                    .map_err(|err| err.to_string())
410                    .and_then(|res| {
411                        if res.status().is_success() {
412                            Ok(())
413                        } else {
414                            Err("server not ready...".into())
415                        }
416                    })
417            },
418            Duration::from_secs(1),
419            Duration::from_secs(30),
420        )
421        .await;
422    }
423
424    /// create a new test humio repository to publish to
425    async fn create_repository() -> HumioRepository {
426        let client = reqwest::Client::builder().build().unwrap();
427
428        // https://docs.humio.com/api/graphql/
429        let graphql_url = format!("{}/graphql", humio_address());
430
431        let name = random_string(50);
432
433        let params = json!({
434        "query": format!(
435            indoc!{ r#"
436                mutation {{
437                  createRepository(name:"{}") {{
438                    repository {{
439                      name
440                      type
441                      ingestTokens {{
442                        name
443                        token
444                      }}
445                    }}
446                  }}
447                }}
448            "#},
449            name
450        ),
451        });
452
453        let res = client
454            .post(&graphql_url)
455            .json(&params)
456            .send()
457            .await
458            .unwrap();
459
460        let json: JsonValue = res.json().await.unwrap();
461        let repository = &json["data"]["createRepository"]["repository"];
462
463        let token = repository["ingestTokens"].as_array().unwrap()[0]["token"]
464            .as_str()
465            .unwrap()
466            .to_string();
467
468        HumioRepository {
469            name: repository["name"].as_str().unwrap().to_string(),
470            default_ingest_token: token,
471        }
472    }
473
474    /// fetch event from the repository that has a matching message value
475    async fn find_entry(repository_name: &str, message: &str) -> HumioLog {
476        let client = reqwest::Client::builder().build().unwrap();
477
478        // https://docs.humio.com/api/using-the-search-api-with-humio
479        let search_url = format!(
480            "{}/api/v1/repositories/{}/query",
481            humio_address(),
482            repository_name
483        );
484        let search_query = format!(r#"message="{message}""#);
485
486        // events are not available to search API immediately
487        // poll up 200 times for event to show up
488        for _ in 0..200usize {
489            let res = client
490                .post(&search_url)
491                .json(&json!({
492                    "queryString": search_query,
493                }))
494                .header(reqwest::header::ACCEPT, "application/json")
495                .send()
496                .await
497                .unwrap();
498
499            let logs: Vec<HumioLog> = res.json().await.unwrap();
500
501            if !logs.is_empty() {
502                return logs[0].clone();
503            }
504        }
505        panic!("did not find event in Humio repository {repository_name} with message {message}");
506    }
507
508    #[derive(Debug)]
509    struct HumioRepository {
510        name: String,
511        default_ingest_token: String,
512    }
513
514    #[derive(Clone, Deserialize)]
515    #[allow(dead_code)] // deserialize all fields
516    struct HumioLog {
517        #[serde(rename = "#repo")]
518        humio_repo: String,
519
520        #[serde(rename = "#type")]
521        humio_type: String,
522
523        #[serde(rename = "@error")]
524        error: Option<String>,
525
526        #[serde(rename = "@error_msg")]
527        error_msg: Option<String>,
528
529        #[serde(rename = "@rawstring")]
530        rawstring: String,
531
532        #[serde(rename = "@id")]
533        id: String,
534
535        #[serde(rename = "@timestamp")]
536        timestamp_millis: u64,
537
538        #[serde(rename = "@timestamp.nanos")]
539        timestamp_nanos: String,
540
541        #[serde(rename = "@timezone")]
542        timezone: String,
543
544        #[serde(rename = "@source")]
545        source: Option<String>,
546
547        #[serde(rename = "@host")]
548        host: Option<String>,
549
550        // fields parsed from ingested log
551        #[serde(flatten)]
552        fields: HashMap<String, JsonValue>,
553    }
554}