vector/sinks/humio/logs.rs

1use vector_lib::codecs::JsonSerializerConfig;
2use vector_lib::configurable::configurable_component;
3use vector_lib::lookup::lookup_v2::{ConfigValuePath, OptionalTargetPath};
4use vector_lib::sensitive_string::SensitiveString;
5
6use super::config_host_key_target_path;
7use crate::sinks::splunk_hec::common::config_timestamp_key_target_path;
8use crate::{
9    codecs::EncodingConfig,
10    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
11    sinks::{
12        splunk_hec::{
13            common::{
14                acknowledgements::HecClientAcknowledgementsConfig, EndpointTarget,
15                SplunkHecDefaultBatchSettings,
16            },
17            logs::config::HecLogsSinkConfig,
18        },
19        util::{BatchConfig, Compression, TowerRequestConfig},
20        Healthcheck, VectorSink,
21    },
22    template::Template,
23    tls::TlsConfig,
24};
25
26pub(super) const HOST: &str = "https://cloud.humio.com";
27
28/// Configuration for the `humio_logs` sink.
29#[configurable_component(sink("humio_logs", "Deliver log event data to Humio."))]
30#[derive(Clone, Debug)]
31#[serde(deny_unknown_fields)]
32pub struct HumioLogsConfig {
33    /// The Humio ingestion token.
34    #[configurable(metadata(
35        docs::examples = "${HUMIO_TOKEN}",
36        docs::examples = "A94A8FE5CCB19BA61C4C08"
37    ))]
38    pub token: SensitiveString,
39
40    /// The base URL of the Humio instance.
41    ///
42    /// The scheme (`http` or `https`) must be specified. No path should be included since the paths defined
43    /// by the [`Splunk`][splunk] API are used.
44    ///
45    /// [splunk]: https://docs.splunk.com/Documentation/Splunk/8.0.0/Data/HECRESTendpoints
46    #[serde(alias = "host")]
47    #[serde(default = "default_endpoint")]
48    #[configurable(metadata(
49        docs::examples = "http://127.0.0.1",
50        docs::examples = "https://example.com",
51    ))]
52    pub endpoint: String,
53
54    /// The source of events sent to this sink.
55    ///
56    /// Typically the filename the logs originated from. Maps to `@source` in Humio.
57    pub source: Option<Template>,
58
59    #[configurable(derived)]
60    pub encoding: EncodingConfig,
61
62    /// The type of events sent to this sink. Humio uses this as the name of the parser to use to ingest the data.
63    ///
64    /// If unset, Humio defaults it to none.
65    #[configurable(metadata(
66        docs::examples = "json",
67        docs::examples = "none",
68        docs::examples = "{{ event_type }}"
69    ))]
70    pub event_type: Option<Template>,
71
72    /// Overrides the name of the log field used to retrieve the hostname to send to Humio.
73    ///
74    /// By default, the [global `log_schema.host_key` option][global_host_key] is used if log
75    /// events are Legacy namespaced, or the semantic meaning of "host" is used, if defined.
76    ///
77    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
78    #[serde(default = "config_host_key_target_path")]
79    pub host_key: OptionalTargetPath,
80
81    /// Event fields to be added to Humio’s extra fields.
82    ///
83    /// Can be used to tag events by specifying fields starting with `#`.
84    ///
85    /// For more information, see [Humio’s Format of Data][humio_data_format].
86    ///
87    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
88    #[serde(default)]
89    pub indexed_fields: Vec<ConfigValuePath>,
90
91    /// Optional name of the repository to ingest into.
92    ///
93    /// In public-facing APIs, this must (if present) be equal to the repository used to create the ingest token used for authentication.
94    ///
95    /// In private cluster setups, Humio can be configured to allow these to be different.
96    ///
97    /// For more information, see [Humio’s Format of Data][humio_data_format].
98    ///
99    /// [humio_data_format]: https://docs.humio.com/integrations/data-shippers/hec/#format-of-data
100    #[serde(default)]
101    #[configurable(metadata(docs::examples = "{{ host }}", docs::examples = "custom_index"))]
102    pub index: Option<Template>,
103
104    #[configurable(derived)]
105    #[serde(default)]
106    pub compression: Compression,
107
108    #[configurable(derived)]
109    #[serde(default)]
110    pub request: TowerRequestConfig,
111
112    #[configurable(derived)]
113    #[serde(default)]
114    pub batch: BatchConfig<SplunkHecDefaultBatchSettings>,
115
116    #[configurable(derived)]
117    pub tls: Option<TlsConfig>,
118
119    /// Overrides the name of the log field used to retrieve the nanosecond-enabled timestamp to send to Humio.
120    #[serde(default = "timestamp_nanos_key")]
121    pub timestamp_nanos_key: Option<String>,
122
123    #[configurable(derived)]
124    #[serde(
125        default,
126        deserialize_with = "crate::serde::bool_or_struct",
127        skip_serializing_if = "crate::serde::is_default"
128    )]
129    pub acknowledgements: AcknowledgementsConfig,
130
131    /// Overrides the name of the log field used to retrieve the timestamp to send to Humio.
132    /// When set to `“”`, a timestamp is not set in the events sent to Humio.
133    ///
134    /// By default, either the [global `log_schema.timestamp_key` option][global_timestamp_key] is used
135    /// if log events are Legacy namespaced, or the semantic meaning of "timestamp" is used, if defined.
136    ///
137    /// [global_timestamp_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.timestamp_key
138    #[serde(default = "config_timestamp_key_target_path")]
139    pub timestamp_key: OptionalTargetPath,
140}
141
142fn default_endpoint() -> String {
143    HOST.to_string()
144}
145
/// Default value for the `timestamp_nanos_key` option: the `@timestamp.nanos` field.
pub fn timestamp_nanos_key() -> Option<String> {
    let field = String::from("@timestamp.nanos");
    Some(field)
}
149
150impl GenerateConfig for HumioLogsConfig {
151    fn generate_config() -> toml::Value {
152        toml::Value::try_from(Self {
153            token: "${HUMIO_TOKEN}".to_owned().into(),
154            endpoint: default_endpoint(),
155            source: None,
156            encoding: JsonSerializerConfig::default().into(),
157            event_type: None,
158            indexed_fields: vec![],
159            index: None,
160            host_key: config_host_key_target_path(),
161            compression: Compression::default(),
162            request: TowerRequestConfig::default(),
163            batch: BatchConfig::default(),
164            tls: None,
165            timestamp_nanos_key: None,
166            acknowledgements: Default::default(),
167            timestamp_key: config_timestamp_key_target_path(),
168        })
169        .unwrap()
170    }
171}
172
173#[async_trait::async_trait]
174#[typetag::serde(name = "humio_logs")]
175impl SinkConfig for HumioLogsConfig {
176    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
177        self.build_hec_config().build(cx).await
178    }
179
180    fn input(&self) -> Input {
181        Input::new(self.encoding.config().input_type() & DataType::Log)
182    }
183
184    fn acknowledgements(&self) -> &AcknowledgementsConfig {
185        &self.acknowledgements
186    }
187}
188
189impl HumioLogsConfig {
190    fn build_hec_config(&self) -> HecLogsSinkConfig {
191        HecLogsSinkConfig {
192            default_token: self.token.clone(),
193            endpoint: self.endpoint.clone(),
194            host_key: Some(self.host_key.clone()),
195            indexed_fields: self.indexed_fields.clone(),
196            index: self.index.clone(),
197            sourcetype: self.event_type.clone(),
198            source: self.source.clone(),
199            timestamp_nanos_key: self.timestamp_nanos_key.clone(),
200            encoding: self.encoding.clone(),
201            compression: self.compression,
202            batch: self.batch,
203            request: self.request,
204            tls: self.tls.clone(),
205            acknowledgements: HecClientAcknowledgementsConfig {
206                indexer_acknowledgements_enabled: false,
207                ..Default::default()
208            },
209            timestamp_key: Some(config_timestamp_key_target_path()),
210            endpoint_target: EndpointTarget::Event,
211            auto_extract_timestamp: None,
212        }
213    }
214}
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219
220    #[test]
221    fn generate_config() {
222        crate::test_util::test_generate_config::<HumioLogsConfig>();
223    }
224}
225
226#[cfg(test)]
227#[cfg(feature = "humio-integration-tests")]
228mod integration_tests {
229    use chrono::{TimeZone, Utc};
230    use futures::{future::ready, stream};
231    use indoc::indoc;
232    use serde::Deserialize;
233    use serde_json::{json, Value as JsonValue};
234    use std::{collections::HashMap, convert::TryFrom};
235    use tokio::time::Duration;
236
237    use super::*;
238    use crate::{
239        config::{log_schema, SinkConfig, SinkContext},
240        event::LogEvent,
241        sinks::util::Compression,
242        test_util::{
243            components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
244            random_string,
245        },
246    };
247
248    fn humio_address() -> String {
249        std::env::var("HUMIO_ADDRESS").unwrap_or_else(|_| "http://localhost:8080".into())
250    }
251
252    #[tokio::test]
253    async fn humio_insert_message() {
254        wait_ready().await;
255
256        let cx = SinkContext::default();
257
258        let repo = create_repository().await;
259
260        let config = config(&repo.default_ingest_token);
261
262        let (sink, _) = config.build(cx).await.unwrap();
263
264        let message = random_string(100);
265        let host = "192.168.1.1".to_string();
266        let mut event = LogEvent::from(message.clone());
267        event.insert(log_schema().host_key_target_path().unwrap(), host.clone());
268
269        let ts = Utc.timestamp_nanos(Utc::now().timestamp_millis() * 1_000_000 + 132_456);
270        event.insert(log_schema().timestamp_key_target_path().unwrap(), ts);
271
272        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
273
274        let entry = find_entry(repo.name.as_str(), message.as_str()).await;
275
276        assert_eq!(
277            message,
278            entry
279                .fields
280                .get("message")
281                .expect("no message key")
282                .as_str()
283                .unwrap()
284        );
285        assert!(
286            entry.error.is_none(),
287            "Humio encountered an error parsing this message: {}",
288            entry
289                .error_msg
290                .unwrap_or_else(|| "no error message".to_string())
291        );
292        assert_eq!(Some(host), entry.host);
293        assert_eq!("132456", entry.timestamp_nanos);
294    }
295
296    #[tokio::test]
297    async fn humio_insert_source() {
298        wait_ready().await;
299
300        let cx = SinkContext::default();
301
302        let repo = create_repository().await;
303
304        let mut config = config(&repo.default_ingest_token);
305        config.source = Template::try_from("/var/log/syslog".to_string()).ok();
306
307        let (sink, _) = config.build(cx).await.unwrap();
308
309        let message = random_string(100);
310        let event = LogEvent::from(message.clone());
311        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
312
313        let entry = find_entry(repo.name.as_str(), message.as_str()).await;
314
315        assert_eq!(entry.source, Some("/var/log/syslog".to_owned()));
316        assert!(
317            entry.error.is_none(),
318            "Humio encountered an error parsing this message: {}",
319            entry
320                .error_msg
321                .unwrap_or_else(|| "no error message".to_string())
322        );
323    }
324
325    #[tokio::test]
326    async fn humio_type() {
327        wait_ready().await;
328
329        let repo = create_repository().await;
330
331        // sets type
332        {
333            let mut config = config(&repo.default_ingest_token);
334            config.event_type = Template::try_from("json".to_string()).ok();
335
336            let (sink, _) = config.build(SinkContext::default()).await.unwrap();
337
338            let message = random_string(100);
339            let mut event = LogEvent::from(message.clone());
340            // Humio expects to find an @timestamp field for JSON lines
341            // https://docs.humio.com/ingesting-data/parsers/built-in-parsers/#json
342            event.insert("@timestamp", Utc::now().to_rfc3339());
343
344            run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
345
346            let entry = find_entry(repo.name.as_str(), message.as_str()).await;
347
348            assert_eq!(entry.humio_type, "json");
349            assert!(
350                entry.error.is_none(),
351                "Humio encountered an error parsing this message: {}",
352                entry
353                    .error_msg
354                    .unwrap_or_else(|| "no error message".to_string())
355            );
356        }
357
358        // defaults to none
359        {
360            let config = config(&repo.default_ingest_token);
361
362            let (sink, _) = config.build(SinkContext::default()).await.unwrap();
363
364            let message = random_string(100);
365            let event = LogEvent::from(message.clone());
366
367            run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await;
368
369            let entry = find_entry(repo.name.as_str(), message.as_str()).await;
370
371            assert_eq!(entry.humio_type, "none");
372        }
373    }
374
375    /// create a new test config with the given ingest token
376    fn config(token: &str) -> super::HumioLogsConfig {
377        let mut batch = BatchConfig::default();
378        batch.max_events = Some(1);
379
380        HumioLogsConfig {
381            token: token.to_string().into(),
382            endpoint: humio_address(),
383            source: None,
384            encoding: JsonSerializerConfig::default().into(),
385            event_type: None,
386            host_key: OptionalTargetPath {
387                path: log_schema().host_key_target_path().cloned(),
388            },
389            indexed_fields: vec![],
390            index: None,
391            compression: Compression::None,
392            request: TowerRequestConfig::default(),
393            batch,
394            tls: None,
395            timestamp_nanos_key: timestamp_nanos_key(),
396            acknowledgements: Default::default(),
397            timestamp_key: Default::default(),
398        }
399    }
400
401    async fn wait_ready() {
402        crate::test_util::retry_until(
403            || async {
404                reqwest::get(format!("{}/api/v1/status", humio_address()))
405                    .await
406                    .map_err(|err| err.to_string())
407                    .and_then(|res| {
408                        if res.status().is_success() {
409                            Ok(())
410                        } else {
411                            Err("server not ready...".into())
412                        }
413                    })
414            },
415            Duration::from_secs(1),
416            Duration::from_secs(30),
417        )
418        .await;
419    }
420
421    /// create a new test humio repository to publish to
422    async fn create_repository() -> HumioRepository {
423        let client = reqwest::Client::builder().build().unwrap();
424
425        // https://docs.humio.com/api/graphql/
426        let graphql_url = format!("{}/graphql", humio_address());
427
428        let name = random_string(50);
429
430        let params = json!({
431        "query": format!(
432            indoc!{ r#"
433                mutation {{
434                  createRepository(name:"{}") {{
435                    repository {{
436                      name
437                      type
438                      ingestTokens {{
439                        name
440                        token
441                      }}
442                    }}
443                  }}
444                }}
445            "#},
446            name
447        ),
448        });
449
450        let res = client
451            .post(&graphql_url)
452            .json(&params)
453            .send()
454            .await
455            .unwrap();
456
457        let json: JsonValue = res.json().await.unwrap();
458        let repository = &json["data"]["createRepository"]["repository"];
459
460        let token = repository["ingestTokens"].as_array().unwrap()[0]["token"]
461            .as_str()
462            .unwrap()
463            .to_string();
464
465        HumioRepository {
466            name: repository["name"].as_str().unwrap().to_string(),
467            default_ingest_token: token,
468        }
469    }
470
471    /// fetch event from the repository that has a matching message value
472    async fn find_entry(repository_name: &str, message: &str) -> HumioLog {
473        let client = reqwest::Client::builder().build().unwrap();
474
475        // https://docs.humio.com/api/using-the-search-api-with-humio
476        let search_url = format!(
477            "{}/api/v1/repositories/{}/query",
478            humio_address(),
479            repository_name
480        );
481        let search_query = format!(r#"message="{message}""#);
482
483        // events are not available to search API immediately
484        // poll up 200 times for event to show up
485        for _ in 0..200usize {
486            let res = client
487                .post(&search_url)
488                .json(&json!({
489                    "queryString": search_query,
490                }))
491                .header(reqwest::header::ACCEPT, "application/json")
492                .send()
493                .await
494                .unwrap();
495
496            let logs: Vec<HumioLog> = res.json().await.unwrap();
497
498            if !logs.is_empty() {
499                return logs[0].clone();
500            }
501        }
502        panic!("did not find event in Humio repository {repository_name} with message {message}");
503    }
504
505    #[derive(Debug)]
506    struct HumioRepository {
507        name: String,
508        default_ingest_token: String,
509    }
510
511    #[derive(Clone, Deserialize)]
512    #[allow(dead_code)] // deserialize all fields
513    struct HumioLog {
514        #[serde(rename = "#repo")]
515        humio_repo: String,
516
517        #[serde(rename = "#type")]
518        humio_type: String,
519
520        #[serde(rename = "@error")]
521        error: Option<String>,
522
523        #[serde(rename = "@error_msg")]
524        error_msg: Option<String>,
525
526        #[serde(rename = "@rawstring")]
527        rawstring: String,
528
529        #[serde(rename = "@id")]
530        id: String,
531
532        #[serde(rename = "@timestamp")]
533        timestamp_millis: u64,
534
535        #[serde(rename = "@timestamp.nanos")]
536        timestamp_nanos: String,
537
538        #[serde(rename = "@timezone")]
539        timezone: String,
540
541        #[serde(rename = "@source")]
542        source: Option<String>,
543
544        #[serde(rename = "@host")]
545        host: Option<String>,
546
547        // fields parsed from ingested log
548        #[serde(flatten)]
549        fields: HashMap<String, JsonValue>,
550    }
551}