vector/sinks/splunk_hec/common/
util.rs

1use std::{borrow::Cow, sync::Arc};
2
3use bytes::Bytes;
4use futures_util::future::BoxFuture;
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use snafu::{ResultExt, Snafu};
8use vector_lib::lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
9use vector_lib::{config::proxy::ProxyConfig, event::EventRef};
10
11use super::{
12    request::HecRequest,
13    service::{HttpRequestBuilder, MetadataFields},
14    EndpointTarget,
15};
16use crate::{
17    http::HttpClient,
18    internal_events::TemplateRenderingError,
19    sinks::{
20        self,
21        util::{http::HttpBatchService, SinkBatchSettings},
22        UriParseSnafu,
23    },
24    template::Template,
25    tls::{TlsConfig, TlsSettings},
26};
27
/// Zero-sized marker type that names the default batching parameters used by
/// the Splunk HEC sinks.
///
/// The actual values live in this type's `SinkBatchSettings` implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct SplunkHecDefaultBatchSettings;
30
31impl SinkBatchSettings for SplunkHecDefaultBatchSettings {
32    const MAX_EVENTS: Option<usize> = None;
33    const MAX_BYTES: Option<usize> = Some(1_000_000);
34    const TIMEOUT_SECS: f64 = 1.0;
35}
36
37#[derive(Debug, Snafu)]
38pub enum HealthcheckError {
39    #[snafu(display("Invalid HEC token"))]
40    InvalidToken,
41    #[snafu(display("Queues are full"))]
42    QueuesFull,
43}
44
45pub fn create_client(
46    tls: Option<&TlsConfig>,
47    proxy_config: &ProxyConfig,
48) -> crate::Result<HttpClient> {
49    let tls_settings = TlsSettings::from_options(tls)?;
50    Ok(HttpClient::new(tls_settings, proxy_config)?)
51}
52
53// TODO: `HttpBatchService` has been deprecated for direct use in sinks.
54//       This sink should undergo a refactor to utilize the `HttpService`
55//       instead, which extracts much of the boilerplate code for `Service`.
56pub fn build_http_batch_service(
57    client: HttpClient,
58    http_request_builder: Arc<HttpRequestBuilder>,
59    endpoint_target: EndpointTarget,
60    auto_extract_timestamp: bool,
61) -> HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HecRequest> {
62    HttpBatchService::new(client, move |req: HecRequest| {
63        let request_builder = Arc::clone(&http_request_builder);
64        let future: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
65            Box::pin(async move {
66                request_builder.build_request(
67                    req.body,
68                    match endpoint_target {
69                        EndpointTarget::Event => "/services/collector/event",
70                        EndpointTarget::Raw => "/services/collector/raw",
71                    },
72                    req.passthrough_token,
73                    MetadataFields {
74                        source: req.source,
75                        sourcetype: req.sourcetype,
76                        index: req.index,
77                        host: req.host,
78                    },
79                    auto_extract_timestamp,
80                )
81            });
82        future
83    })
84}
85
86pub async fn build_healthcheck(
87    endpoint: String,
88    token: String,
89    client: HttpClient,
90) -> crate::Result<()> {
91    let uri = build_uri(endpoint.as_str(), "/services/collector/health/1.0", None)
92        .context(UriParseSnafu)?;
93
94    let request = Request::get(uri)
95        .header("Authorization", format!("Splunk {token}"))
96        .body(Body::empty())
97        .unwrap();
98
99    let response = client.send(request).await?;
100    match response.status() {
101        StatusCode::OK => Ok(()),
102        StatusCode::BAD_REQUEST => Err(HealthcheckError::InvalidToken.into()),
103        StatusCode::SERVICE_UNAVAILABLE => Err(HealthcheckError::QueuesFull.into()),
104        other => Err(sinks::HealthcheckError::UnexpectedStatus { status: other }.into()),
105    }
106}
107
108pub fn build_uri(
109    host: &str,
110    path: &str,
111    query: impl IntoIterator<Item = (&'static str, String)>,
112) -> Result<Uri, http::uri::InvalidUri> {
113    let mut uri = format!("{}{}", host.trim_end_matches('/'), path);
114
115    let mut first = true;
116
117    for (key, value) in query.into_iter() {
118        if first {
119            uri.push('?');
120            first = false;
121        } else {
122            uri.push('&');
123        }
124        uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
125            key,
126            percent_encoding::NON_ALPHANUMERIC,
127        )));
128        uri.push('=');
129        uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
130            &value,
131            percent_encoding::NON_ALPHANUMERIC,
132        )));
133    }
134
135    uri.parse::<Uri>()
136}
137
138pub fn config_host_key() -> OptionalValuePath {
139    OptionalValuePath {
140        path: crate::config::log_schema().host_key().cloned(),
141    }
142}
143
144pub fn config_timestamp_key_target_path() -> OptionalTargetPath {
145    OptionalTargetPath {
146        path: crate::config::log_schema()
147            .timestamp_key_target_path()
148            .cloned(),
149    }
150}
151
152pub fn render_template_string<'a>(
153    template: &Template,
154    event: impl Into<EventRef<'a>>,
155    field_name: &str,
156) -> Option<String> {
157    template
158        .render_string(event)
159        .map_err(|error| {
160            emit!(TemplateRenderingError {
161                error,
162                field: Some(field_name),
163                drop_event: false
164            });
165        })
166        .ok()
167}
168
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::{HeaderValue, Request, Uri};
    use vector_lib::config::proxy::ProxyConfig;
    use wiremock::{
        matchers::{header, method, path},
        Mock, MockServer, ResponseTemplate,
    };

    use crate::sinks::{
        splunk_hec::common::{
            build_healthcheck, build_uri, create_client,
            service::{HttpRequestBuilder, MetadataFields},
            EndpointTarget, HOST_FIELD, SOURCE_FIELD,
        },
        util::Compression,
    };

    /// Path the healthcheck is expected to query.
    const HEALTH_PATH: &str = "/services/collector/health/1.0";
    /// Path used by the request-building tests.
    const EVENT_PATH: &str = "/services/collector/event";

    /// Starts a mock server that answers the health endpoint with `status`
    /// and runs `build_healthcheck` against it using the token `"token"`.
    async fn run_healthcheck_against(status: u16) -> crate::Result<()> {
        let server = MockServer::start().await;

        Mock::given(method("GET"))
            .and(path(HEALTH_PATH))
            .and(header("Authorization", "Splunk token"))
            .respond_with(ResponseTemplate::new(status))
            .mount(&server)
            .await;

        let client = create_client(None, &ProxyConfig::default()).unwrap();
        build_healthcheck(server.uri(), "token".to_string(), client).await
    }

    /// Builds an event-endpoint request for `body` with the token `"token"`,
    /// no passthrough token, default metadata and timestamp extraction off.
    fn build_event_request(
        endpoint: &str,
        compression: Compression,
        body: Bytes,
    ) -> Result<Request<Bytes>, crate::Error> {
        let builder = HttpRequestBuilder::new(
            String::from(endpoint),
            EndpointTarget::default(),
            String::from("token"),
            compression,
        );

        builder.build_request(body, EVENT_PATH, None, MetadataFields::default(), false)
    }

    /// Checks the parts of a built request that do not depend on compression.
    fn assert_common_request_parts(request: &Request<Bytes>, expected_body: &Bytes) {
        assert_eq!(
            request.uri(),
            &Uri::from_static("http://localhost:8888/services/collector/event")
        );

        assert_eq!(
            request.headers().get("Content-Type"),
            Some(&HeaderValue::from_static("application/json"))
        );

        assert_eq!(
            request.headers().get("Authorization"),
            Some(&HeaderValue::from_static("Splunk token"))
        );

        assert_eq!(request.body(), expected_body);
    }

    #[tokio::test]
    async fn test_build_healthcheck_200_response_returns_ok() {
        assert!(run_healthcheck_against(200).await.is_ok())
    }

    #[tokio::test]
    async fn test_build_healthcheck_400_response_returns_error() {
        let error = run_healthcheck_against(400).await.unwrap_err();

        assert_eq!(error.to_string(), "Invalid HEC token");
    }

    #[tokio::test]
    async fn test_build_healthcheck_503_response_returns_error() {
        let error = run_healthcheck_against(503).await.unwrap_err();

        assert_eq!(error.to_string(), "Queues are full");
    }

    #[tokio::test]
    async fn test_build_healthcheck_500_response_returns_error() {
        let error = run_healthcheck_against(500).await.unwrap_err();

        assert_eq!(
            error.to_string(),
            "Unexpected status: 500 Internal Server Error"
        );
    }

    #[tokio::test]
    async fn test_build_request_compression_none_returns_expected_request() {
        let events = Bytes::from("events");

        let request =
            build_event_request("http://localhost:8888", Compression::None, events.clone())
                .unwrap();

        assert_common_request_parts(&request, &events);
        assert_eq!(request.headers().get("Content-Encoding"), None);
    }

    #[tokio::test]
    async fn test_build_request_compression_gzip_returns_expected_request() {
        let events = Bytes::from("events");

        let request = build_event_request(
            "http://localhost:8888",
            Compression::gzip_default(),
            events.clone(),
        )
        .unwrap();

        assert_common_request_parts(&request, &events);
        assert_eq!(
            request.headers().get("Content-Encoding"),
            Some(&HeaderValue::from_static("gzip"))
        );
    }

    #[tokio::test]
    async fn test_build_request_uri_invalid_uri_returns_error() {
        let error = build_event_request(
            "invalid",
            Compression::gzip_default(),
            Bytes::from("events"),
        )
        .unwrap_err();

        assert_eq!(error.to_string(), "URI parse error: invalid format")
    }

    #[test]
    fn test_build_uri() {
        let pairs = [
            (HOST_FIELD, "zork flork".to_string()),
            (SOURCE_FIELD, "zam".to_string()),
        ];
        let built = build_uri("http://sproink.com", "/thing/thang", pairs).unwrap();

        let expected: Uri = "http://sproink.com/thing/thang?host=zork%20flork&source=zam"
            .parse()
            .unwrap();

        assert_eq!(expected, built);
    }
}
395
#[cfg(all(test, feature = "splunk-integration-tests"))]
mod integration_tests {
    use std::net::SocketAddr;

    use http::StatusCode;
    use tokio::time::Duration;
    use vector_lib::config::proxy::ProxyConfig;
    use warp::Filter;

    use super::{
        build_healthcheck, create_client,
        integration_test_helpers::{get_token, splunk_hec_address},
    };
    use crate::{
        assert_downcast_matches, sinks::splunk_hec::common::HealthcheckError,
        test_util::retry_until,
    };

    /// Retries the healthcheck against the configured Splunk HEC address
    /// every 500 ms, for up to 30 s.
    #[tokio::test]
    async fn splunk_healthcheck_ok() {
        let http_client = create_client(None, &ProxyConfig::default()).unwrap();
        let hec_address = splunk_hec_address();
        let hec_token = get_token().await;

        let attempt = || {
            build_healthcheck(
                hec_address.clone(),
                hec_token.clone(),
                http_client.clone(),
            )
        };

        retry_until(
            attempt,
            Duration::from_millis(500),
            Duration::from_secs(30),
        )
        .await;
    }

    /// The healthcheck against `localhost:1111` must fail; the test starts no
    /// server on that port.
    #[tokio::test]
    async fn splunk_healthcheck_server_not_listening() {
        let http_client = create_client(None, &ProxyConfig::default()).unwrap();
        let hec_token = get_token().await;

        let outcome = build_healthcheck(
            "http://localhost:1111/".to_string(),
            hec_token,
            http_client,
        )
        .await;

        outcome.unwrap_err();
    }

    /// A server that always answers 503 must surface as `QueuesFull`.
    #[tokio::test]
    async fn splunk_healthcheck_server_unavailable() {
        let http_client = create_client(None, &ProxyConfig::default()).unwrap();
        let hec_token = get_token().await;

        // Local stand-in server that replies 503 to every request.
        let always_unavailable = warp::any()
            .map(|| warp::reply::with_status("i'm sad", StatusCode::SERVICE_UNAVAILABLE));
        let listen_addr = "0.0.0.0:5503".parse::<SocketAddr>().unwrap();
        tokio::spawn(warp::serve(always_unavailable).bind(listen_addr));

        let outcome = build_healthcheck(
            "http://localhost:5503/".to_string(),
            hec_token,
            http_client,
        )
        .await;

        assert_downcast_matches!(
            outcome.unwrap_err(),
            HealthcheckError,
            HealthcheckError::QueuesFull
        );
    }
}
461
#[cfg(all(test, feature = "splunk-integration-tests"))]
pub mod integration_test_helpers {
    use serde_json::Value as JsonValue;
    use tokio::time::Duration;

    use crate::test_util::retry_until;

    const USERNAME: &str = "admin";
    const PASSWORD: &str = "password";

    /// Reads the environment variable `name`, falling back to `fallback` when
    /// it cannot be read.
    fn env_or(name: &str, fallback: &str) -> String {
        match std::env::var(name) {
            Ok(value) => value,
            Err(_) => fallback.into(),
        }
    }

    /// Address of the Splunk HEC endpoint: `SPLUNK_HEC_ADDRESS`, or
    /// `http://localhost:8088` when that variable is unavailable.
    pub fn splunk_hec_address() -> String {
        env_or("SPLUNK_HEC_ADDRESS", "http://localhost:8088")
    }

    /// Address of the Splunk management API: `SPLUNK_API_ADDRESS`, or
    /// `https://localhost:8089` when that variable is unavailable.
    pub fn splunk_api_address() -> String {
        env_or("SPLUNK_API_ADDRESS", "https://localhost:8089")
    }

    /// Fetches the token of the first HTTP input listed by the Splunk API.
    ///
    /// The listing request is retried every 500 ms for up to 30 s and uses
    /// basic auth with the module's fixed credentials; certificate validation
    /// is disabled on the client.
    ///
    /// # Panics
    ///
    /// Panics if the response is not the expected JSON shape or if no HTTP
    /// input is configured.
    pub async fn get_token() -> String {
        let api_client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();

        let list_inputs = || {
            let url = format!(
                "{}/services/data/inputs/http?output_mode=json",
                splunk_api_address()
            );
            api_client
                .get(url)
                .basic_auth(USERNAME, Some(PASSWORD))
                .send()
        };

        let response = retry_until(
            list_inputs,
            Duration::from_millis(500),
            Duration::from_secs(30),
        )
        .await;

        let listing: JsonValue = response.json().await.unwrap();
        let entries = listing["entry"].as_array().unwrap();

        match entries.first() {
            Some(first) => first["content"]["token"].as_str().unwrap().to_owned(),
            None => panic!("You don't have any HTTP Event Collector inputs set up in Splunk"),
        }
    }
}