// vector/sinks/splunk_hec/common/util.rs

1use std::{borrow::Cow, sync::Arc};
2
3use bytes::Bytes;
4use futures_util::future::BoxFuture;
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use snafu::{ResultExt, Snafu};
8use vector_lib::{
9    config::proxy::ProxyConfig,
10    event::EventRef,
11    lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath},
12};
13
14use super::{
15    EndpointTarget,
16    request::HecRequest,
17    service::{HttpRequestBuilder, MetadataFields},
18};
19use crate::{
20    http::HttpClient,
21    internal_events::TemplateRenderingError,
22    sinks::{
23        self, UriParseSnafu,
24        util::{SinkBatchSettings, http::HttpBatchService},
25    },
26    template::Template,
27    tls::{TlsConfig, TlsSettings},
28};
29
/// Marker type that carries the default batch settings for the Splunk HEC sinks.
///
/// It holds no data; the defaults themselves are provided by its
/// [`SinkBatchSettings`] implementation.
#[derive(Debug, Default, Clone, Copy)]
pub struct SplunkHecDefaultBatchSettings;
32
33impl SinkBatchSettings for SplunkHecDefaultBatchSettings {
34    const MAX_EVENTS: Option<usize> = None;
35    const MAX_BYTES: Option<usize> = Some(1_000_000);
36    const TIMEOUT_SECS: f64 = 1.0;
37}
38
/// Splunk-specific failures reported by [`build_healthcheck`] for particular
/// response statuses of the HEC health endpoint.
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    /// The health endpoint answered `400 Bad Request`.
    #[snafu(display("Invalid HEC token"))]
    InvalidToken,
    /// The health endpoint answered `503 Service Unavailable`.
    #[snafu(display("Queues are full"))]
    QueuesFull,
}
46
47pub fn create_client(
48    tls: Option<&TlsConfig>,
49    proxy_config: &ProxyConfig,
50) -> crate::Result<HttpClient> {
51    let tls_settings = TlsSettings::from_options(tls)?;
52    Ok(HttpClient::new(tls_settings, proxy_config)?)
53}
54
55// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
56//       This sink should undergo a refactor to utilize the `HttpService`
57//       instead, which extracts much of the boilerplate code for `Service`.
58pub fn build_http_batch_service(
59    client: HttpClient,
60    http_request_builder: Arc<HttpRequestBuilder>,
61    endpoint_target: EndpointTarget,
62    auto_extract_timestamp: bool,
63) -> HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HecRequest> {
64    HttpBatchService::new(client, move |req: HecRequest| {
65        let request_builder = Arc::clone(&http_request_builder);
66        let future: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
67            Box::pin(async move {
68                request_builder.build_request(
69                    req.body,
70                    match endpoint_target {
71                        EndpointTarget::Event => "/services/collector/event",
72                        EndpointTarget::Raw => "/services/collector/raw",
73                    },
74                    req.passthrough_token,
75                    MetadataFields {
76                        source: req.source,
77                        sourcetype: req.sourcetype,
78                        index: req.index,
79                        host: req.host,
80                    },
81                    auto_extract_timestamp,
82                )
83            });
84        future
85    })
86}
87
88pub async fn build_healthcheck(
89    endpoint: String,
90    token: String,
91    client: HttpClient,
92) -> crate::Result<()> {
93    let uri = build_uri(endpoint.as_str(), "/services/collector/health/1.0", None)
94        .context(UriParseSnafu)?;
95
96    let request = Request::get(uri)
97        .header("Authorization", format!("Splunk {token}"))
98        .body(Body::empty())
99        .unwrap();
100
101    let response = client.send(request).await?;
102    match response.status() {
103        StatusCode::OK => Ok(()),
104        StatusCode::BAD_REQUEST => Err(HealthcheckError::InvalidToken.into()),
105        StatusCode::SERVICE_UNAVAILABLE => Err(HealthcheckError::QueuesFull.into()),
106        other => Err(sinks::HealthcheckError::UnexpectedStatus { status: other }.into()),
107    }
108}
109
110pub fn build_uri(
111    host: &str,
112    path: &str,
113    query: impl IntoIterator<Item = (&'static str, String)>,
114) -> Result<Uri, http::uri::InvalidUri> {
115    let mut uri = format!("{}{}", host.trim_end_matches('/'), path);
116
117    let mut first = true;
118
119    for (key, value) in query.into_iter() {
120        if first {
121            uri.push('?');
122            first = false;
123        } else {
124            uri.push('&');
125        }
126        uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
127            key,
128            percent_encoding::NON_ALPHANUMERIC,
129        )));
130        uri.push('=');
131        uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
132            &value,
133            percent_encoding::NON_ALPHANUMERIC,
134        )));
135    }
136
137    uri.parse::<Uri>()
138}
139
140pub fn config_host_key() -> OptionalValuePath {
141    OptionalValuePath {
142        path: crate::config::log_schema().host_key().cloned(),
143    }
144}
145
146pub fn config_timestamp_key_target_path() -> OptionalTargetPath {
147    OptionalTargetPath {
148        path: crate::config::log_schema()
149            .timestamp_key_target_path()
150            .cloned(),
151    }
152}
153
154pub fn render_template_string<'a>(
155    template: &Template,
156    event: impl Into<EventRef<'a>>,
157    field_name: &str,
158) -> Option<String> {
159    template
160        .render_string(event)
161        .map_err(|error| {
162            emit!(TemplateRenderingError {
163                error,
164                field: Some(field_name),
165                drop_event: false
166            });
167        })
168        .ok()
169}
170
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::{HeaderValue, Uri};
    use vector_lib::config::proxy::ProxyConfig;
    use wiremock::{
        Mock, MockServer, ResponseTemplate,
        matchers::{header, method, path},
    };

    use crate::sinks::{
        splunk_hec::common::{
            EndpointTarget, HOST_FIELD, SOURCE_FIELD, build_healthcheck, build_uri, create_client,
            service::{HttpRequestBuilder, MetadataFields},
        },
        util::Compression,
    };

    /// Runs the healthcheck against a mock server whose health endpoint answers with
    /// `status` when called with the `Splunk token` authorization header.
    async fn healthcheck_with_status(status: u16) -> crate::Result<()> {
        let server = MockServer::start().await;

        Mock::given(method("GET"))
            .and(path("/services/collector/health/1.0"))
            .and(header("Authorization", "Splunk token"))
            .respond_with(ResponseTemplate::new(status))
            .mount(&server)
            .await;

        let client = create_client(None, &ProxyConfig::default()).unwrap();
        build_healthcheck(server.uri(), "token".to_string(), client).await
    }

    /// Builds an event-endpoint request for `events` with a builder configured for
    /// `endpoint`, the token `"token"` and the given `compression`.
    fn build_event_request(
        endpoint: &str,
        compression: Compression,
        events: Bytes,
    ) -> Result<http::Request<Bytes>, crate::Error> {
        let builder = HttpRequestBuilder::new(
            String::from(endpoint),
            EndpointTarget::default(),
            String::from("token"),
            compression,
        );

        builder.build_request(
            events,
            "/services/collector/event",
            None,
            MetadataFields::default(),
            false,
        )
    }

    /// Asserts the URI, content type and authorization header shared by the
    /// successful request-building tests.
    fn assert_common_request_parts(request: &http::Request<Bytes>) {
        assert_eq!(
            request.uri(),
            &Uri::from_static("http://localhost:8888/services/collector/event")
        );

        assert_eq!(
            request.headers().get("Content-Type"),
            Some(&HeaderValue::from_static("application/json"))
        );

        assert_eq!(
            request.headers().get("Authorization"),
            Some(&HeaderValue::from_static("Splunk token"))
        );
    }

    #[tokio::test]
    async fn test_build_healthcheck_200_response_returns_ok() {
        assert!(healthcheck_with_status(200).await.is_ok())
    }

    #[tokio::test]
    async fn test_build_healthcheck_400_response_returns_error() {
        let error = healthcheck_with_status(400).await.unwrap_err();

        assert_eq!(&error.to_string(), "Invalid HEC token");
    }

    #[tokio::test]
    async fn test_build_healthcheck_503_response_returns_error() {
        let error = healthcheck_with_status(503).await.unwrap_err();

        assert_eq!(&error.to_string(), "Queues are full");
    }

    #[tokio::test]
    async fn test_build_healthcheck_500_response_returns_error() {
        let error = healthcheck_with_status(500).await.unwrap_err();

        assert_eq!(
            &error.to_string(),
            "Unexpected status: 500 Internal Server Error"
        );
    }

    #[tokio::test]
    async fn test_build_request_compression_none_returns_expected_request() {
        let events = Bytes::from("events");

        let request =
            build_event_request("http://localhost:8888", Compression::None, events.clone())
                .unwrap();

        assert_common_request_parts(&request);
        assert_eq!(request.headers().get("Content-Encoding"), None);
        assert_eq!(request.body(), &events)
    }

    #[tokio::test]
    async fn test_build_request_compression_gzip_returns_expected_request() {
        let events = Bytes::from("events");

        let request = build_event_request(
            "http://localhost:8888",
            Compression::gzip_default(),
            events.clone(),
        )
        .unwrap();

        assert_common_request_parts(&request);
        assert_eq!(
            request.headers().get("Content-Encoding"),
            Some(&HeaderValue::from_static("gzip"))
        );
        assert_eq!(request.body(), &events)
    }

    #[tokio::test]
    async fn test_build_request_uri_invalid_uri_returns_error() {
        let err = build_event_request(
            "invalid",
            Compression::gzip_default(),
            Bytes::from("events"),
        )
        .unwrap_err();

        assert_eq!(err.to_string(), "URI parse error: invalid format")
    }

    #[test]
    fn test_build_uri() {
        let expected = "http://sproink.com/thing/thang?host=zork%20flork&source=zam"
            .parse::<Uri>()
            .unwrap();

        let actual = build_uri(
            "http://sproink.com",
            "/thing/thang",
            [
                (HOST_FIELD, "zork flork".to_string()),
                (SOURCE_FIELD, "zam".to_string()),
            ],
        )
        .unwrap();

        assert_eq!(expected, actual);
    }
}
396
#[cfg(all(test, feature = "splunk-integration-tests"))]
mod integration_tests {
    use std::net::SocketAddr;

    use http::StatusCode;
    use tokio::time::Duration;
    use vector_lib::config::proxy::ProxyConfig;
    use warp::Filter;

    use super::{
        build_healthcheck, create_client,
        integration_test_helpers::{get_token, splunk_hec_address},
    };
    use crate::{
        assert_downcast_matches, sinks::splunk_hec::common::HealthcheckError,
        test_util::retry_until,
    };

    /// Creates an HTTP client with no TLS configuration and the default proxy settings.
    fn default_client() -> crate::http::HttpClient {
        create_client(None, &ProxyConfig::default()).unwrap()
    }

    /// Retries the healthcheck against the configured Splunk HEC address every 500 ms,
    /// for up to 30 seconds.
    #[tokio::test]
    async fn splunk_healthcheck_ok() {
        let client = default_client();
        let hec_address = splunk_hec_address();
        let hec_token = get_token().await;

        let attempt = || build_healthcheck(hec_address.clone(), hec_token.clone(), client.clone());
        retry_until(
            attempt,
            Duration::from_millis(500),
            Duration::from_secs(30),
        )
        .await;
    }

    /// The healthcheck against `http://localhost:1111/` must fail.
    #[tokio::test]
    async fn splunk_healthcheck_server_not_listening() {
        let client = default_client();
        let endpoint = "http://localhost:1111/".to_string();

        let outcome = build_healthcheck(endpoint, get_token().await, client).await;

        outcome.unwrap_err();
    }

    /// A server answering every request with `503` must be reported as `QueuesFull`.
    #[tokio::test]
    async fn splunk_healthcheck_server_unavailable() {
        let client = default_client();
        let endpoint = "http://localhost:5503/".to_string();
        let healthcheck = build_healthcheck(endpoint, get_token().await, client);

        let listen_addr = "0.0.0.0:5503".parse::<SocketAddr>().unwrap();
        let always_unavailable = warp::any()
            .map(|| warp::reply::with_status("i'm sad", StatusCode::SERVICE_UNAVAILABLE));
        tokio::spawn(warp::serve(always_unavailable).bind(listen_addr));

        assert_downcast_matches!(
            healthcheck.await.unwrap_err(),
            HealthcheckError,
            HealthcheckError::QueuesFull
        );
    }
}
462
#[cfg(all(test, feature = "splunk-integration-tests"))]
pub mod integration_test_helpers {
    use serde_json::Value as JsonValue;
    use tokio::time::Duration;

    use crate::test_util::retry_until;

    const USERNAME: &str = "admin";
    const PASSWORD: &str = "password";

    /// Returns the Splunk HEC address from `SPLUNK_HEC_ADDRESS`, falling back to
    /// `http://localhost:8088` when the variable cannot be read.
    pub fn splunk_hec_address() -> String {
        match std::env::var("SPLUNK_HEC_ADDRESS") {
            Ok(address) => address,
            Err(_) => "http://localhost:8088".into(),
        }
    }

    /// Returns the Splunk API address from `SPLUNK_API_ADDRESS`, falling back to
    /// `https://localhost:8089` when the variable cannot be read.
    pub fn splunk_api_address() -> String {
        match std::env::var("SPLUNK_API_ADDRESS") {
            Ok(address) => address,
            Err(_) => "https://localhost:8089".into(),
        }
    }

    /// Fetches the token of the first HTTP Event Collector input listed by the Splunk
    /// API, retrying the request every 500 ms for up to 30 seconds.
    ///
    /// # Panics
    ///
    /// Panics if the response is not the expected JSON shape or if no HTTP Event
    /// Collector inputs are listed.
    pub async fn get_token() -> String {
        // Invalid certificates are accepted by this test client.
        let api_client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();

        let list_inputs = || {
            let url = format!(
                "{}/services/data/inputs/http?output_mode=json",
                splunk_api_address()
            );
            api_client
                .get(url)
                .basic_auth(USERNAME, Some(PASSWORD))
                .send()
        };
        let response = retry_until(
            list_inputs,
            Duration::from_millis(500),
            Duration::from_secs(30),
        )
        .await;

        let payload: JsonValue = response.json().await.unwrap();
        let inputs = payload["entry"].as_array().unwrap();

        match inputs.first() {
            Some(input) => input["content"]["token"].as_str().unwrap().to_owned(),
            None => panic!("You don't have any HTTP Event Collector inputs set up in Splunk"),
        }
    }
}
511}