1use std::{borrow::Cow, sync::Arc};
2
3use bytes::Bytes;
4use futures_util::future::BoxFuture;
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use snafu::{ResultExt, Snafu};
8use vector_lib::{
9 config::proxy::ProxyConfig,
10 event::EventRef,
11 lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath},
12};
13
14use super::{
15 EndpointTarget,
16 request::HecRequest,
17 service::{HttpRequestBuilder, MetadataFields},
18};
19use crate::{
20 http::HttpClient,
21 internal_events::TemplateRenderingError,
22 sinks::{
23 self, UriParseSnafu,
24 util::{SinkBatchSettings, http::HttpBatchService},
25 },
26 template::Template,
27 tls::{TlsConfig, TlsSettings},
28};
29
30#[derive(Clone, Copy, Debug, Default)]
31pub struct SplunkHecDefaultBatchSettings;
32
33impl SinkBatchSettings for SplunkHecDefaultBatchSettings {
34 const MAX_EVENTS: Option<usize> = None;
35 const MAX_BYTES: Option<usize> = Some(1_000_000);
36 const TIMEOUT_SECS: f64 = 1.0;
37}
38
39#[derive(Debug, Snafu)]
40pub enum HealthcheckError {
41 #[snafu(display("Invalid HEC token"))]
42 InvalidToken,
43 #[snafu(display("Queues are full"))]
44 QueuesFull,
45}
46
47pub fn create_client(
48 tls: Option<&TlsConfig>,
49 proxy_config: &ProxyConfig,
50) -> crate::Result<HttpClient> {
51 let tls_settings = TlsSettings::from_options(tls)?;
52 Ok(HttpClient::new(tls_settings, proxy_config)?)
53}
54
55pub fn build_http_batch_service(
59 client: HttpClient,
60 http_request_builder: Arc<HttpRequestBuilder>,
61 endpoint_target: EndpointTarget,
62 auto_extract_timestamp: bool,
63) -> HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HecRequest> {
64 HttpBatchService::new(client, move |req: HecRequest| {
65 let request_builder = Arc::clone(&http_request_builder);
66 let future: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
67 Box::pin(async move {
68 request_builder.build_request(
69 req.body,
70 match endpoint_target {
71 EndpointTarget::Event => "/services/collector/event",
72 EndpointTarget::Raw => "/services/collector/raw",
73 },
74 req.passthrough_token,
75 MetadataFields {
76 source: req.source,
77 sourcetype: req.sourcetype,
78 index: req.index,
79 host: req.host,
80 },
81 auto_extract_timestamp,
82 )
83 });
84 future
85 })
86}
87
88pub async fn build_healthcheck(
89 endpoint: String,
90 token: String,
91 client: HttpClient,
92) -> crate::Result<()> {
93 let uri = build_uri(endpoint.as_str(), "/services/collector/health/1.0", None)
94 .context(UriParseSnafu)?;
95
96 let request = Request::get(uri)
97 .header("Authorization", format!("Splunk {token}"))
98 .body(Body::empty())
99 .unwrap();
100
101 let response = client.send(request).await?;
102 match response.status() {
103 StatusCode::OK => Ok(()),
104 StatusCode::BAD_REQUEST => Err(HealthcheckError::InvalidToken.into()),
105 StatusCode::SERVICE_UNAVAILABLE => Err(HealthcheckError::QueuesFull.into()),
106 other => Err(sinks::HealthcheckError::UnexpectedStatus { status: other }.into()),
107 }
108}
109
110pub fn build_uri(
111 host: &str,
112 path: &str,
113 query: impl IntoIterator<Item = (&'static str, String)>,
114) -> Result<Uri, http::uri::InvalidUri> {
115 let mut uri = format!("{}{}", host.trim_end_matches('/'), path);
116
117 let mut first = true;
118
119 for (key, value) in query.into_iter() {
120 if first {
121 uri.push('?');
122 first = false;
123 } else {
124 uri.push('&');
125 }
126 uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
127 key,
128 percent_encoding::NON_ALPHANUMERIC,
129 )));
130 uri.push('=');
131 uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
132 &value,
133 percent_encoding::NON_ALPHANUMERIC,
134 )));
135 }
136
137 uri.parse::<Uri>()
138}
139
140pub fn config_host_key() -> OptionalValuePath {
141 OptionalValuePath {
142 path: crate::config::log_schema().host_key().cloned(),
143 }
144}
145
146pub fn config_timestamp_key_target_path() -> OptionalTargetPath {
147 OptionalTargetPath {
148 path: crate::config::log_schema()
149 .timestamp_key_target_path()
150 .cloned(),
151 }
152}
153
154pub fn render_template_string<'a>(
155 template: &Template,
156 event: impl Into<EventRef<'a>>,
157 field_name: &str,
158) -> Option<String> {
159 template
160 .render_string(event)
161 .map_err(|error| {
162 emit!(TemplateRenderingError {
163 error,
164 field: Some(field_name),
165 drop_event: false
166 });
167 })
168 .ok()
169}
170
171#[cfg(test)]
172mod tests {
173 use bytes::Bytes;
174 use http::{HeaderValue, Uri};
175 use vector_lib::config::proxy::ProxyConfig;
176 use wiremock::{
177 Mock, MockServer, ResponseTemplate,
178 matchers::{header, method, path},
179 };
180
181 use crate::sinks::{
182 splunk_hec::common::{
183 EndpointTarget, HOST_FIELD, SOURCE_FIELD, build_healthcheck, build_uri, create_client,
184 service::{HttpRequestBuilder, MetadataFields},
185 },
186 util::Compression,
187 };
188
189 #[tokio::test]
190 async fn test_build_healthcheck_200_response_returns_ok() {
191 let mock_server = MockServer::start().await;
192
193 Mock::given(method("GET"))
194 .and(path("/services/collector/health/1.0"))
195 .and(header("Authorization", "Splunk token"))
196 .respond_with(ResponseTemplate::new(200))
197 .mount(&mock_server)
198 .await;
199
200 let client = create_client(None, &ProxyConfig::default()).unwrap();
201 let healthcheck = build_healthcheck(mock_server.uri(), "token".to_string(), client);
202
203 assert!(healthcheck.await.is_ok())
204 }
205
206 #[tokio::test]
207 async fn test_build_healthcheck_400_response_returns_error() {
208 let mock_server = MockServer::start().await;
209
210 Mock::given(method("GET"))
211 .and(path("/services/collector/health/1.0"))
212 .and(header("Authorization", "Splunk token"))
213 .respond_with(ResponseTemplate::new(400))
214 .mount(&mock_server)
215 .await;
216
217 let client = create_client(None, &ProxyConfig::default()).unwrap();
218 let healthcheck = build_healthcheck(mock_server.uri(), "token".to_string(), client);
219
220 assert_eq!(
221 &healthcheck.await.unwrap_err().to_string(),
222 "Invalid HEC token"
223 );
224 }
225
226 #[tokio::test]
227 async fn test_build_healthcheck_503_response_returns_error() {
228 let mock_server = MockServer::start().await;
229
230 Mock::given(method("GET"))
231 .and(path("/services/collector/health/1.0"))
232 .and(header("Authorization", "Splunk token"))
233 .respond_with(ResponseTemplate::new(503))
234 .mount(&mock_server)
235 .await;
236
237 let client = create_client(None, &ProxyConfig::default()).unwrap();
238 let healthcheck = build_healthcheck(mock_server.uri(), "token".to_string(), client);
239
240 assert_eq!(
241 &healthcheck.await.unwrap_err().to_string(),
242 "Queues are full"
243 );
244 }
245
246 #[tokio::test]
247 async fn test_build_healthcheck_500_response_returns_error() {
248 let mock_server = MockServer::start().await;
249
250 Mock::given(method("GET"))
251 .and(path("/services/collector/health/1.0"))
252 .and(header("Authorization", "Splunk token"))
253 .respond_with(ResponseTemplate::new(500))
254 .mount(&mock_server)
255 .await;
256
257 let client = create_client(None, &ProxyConfig::default()).unwrap();
258 let healthcheck = build_healthcheck(mock_server.uri(), "token".to_string(), client);
259
260 assert_eq!(
261 &healthcheck.await.unwrap_err().to_string(),
262 "Unexpected status: 500 Internal Server Error"
263 );
264 }
265
266 #[tokio::test]
267 async fn test_build_request_compression_none_returns_expected_request() {
268 let endpoint = "http://localhost:8888";
269 let token = "token";
270 let compression = Compression::None;
271 let events = Bytes::from("events");
272 let http_request_builder = HttpRequestBuilder::new(
273 String::from(endpoint),
274 EndpointTarget::default(),
275 String::from(token),
276 compression,
277 );
278
279 let request = http_request_builder
280 .build_request(
281 events.clone(),
282 "/services/collector/event",
283 None,
284 MetadataFields::default(),
285 false,
286 )
287 .unwrap();
288
289 assert_eq!(
290 request.uri(),
291 &Uri::from_static("http://localhost:8888/services/collector/event")
292 );
293
294 assert_eq!(
295 request.headers().get("Content-Type"),
296 Some(&HeaderValue::from_static("application/json"))
297 );
298
299 assert_eq!(
300 request.headers().get("Authorization"),
301 Some(&HeaderValue::from_static("Splunk token"))
302 );
303
304 assert_eq!(request.headers().get("Content-Encoding"), None);
305
306 assert_eq!(request.body(), &events)
307 }
308
309 #[tokio::test]
310 async fn test_build_request_compression_gzip_returns_expected_request() {
311 let endpoint = "http://localhost:8888";
312 let token = "token";
313 let compression = Compression::gzip_default();
314 let events = Bytes::from("events");
315 let http_request_builder = HttpRequestBuilder::new(
316 String::from(endpoint),
317 EndpointTarget::default(),
318 String::from(token),
319 compression,
320 );
321
322 let request = http_request_builder
323 .build_request(
324 events.clone(),
325 "/services/collector/event",
326 None,
327 MetadataFields::default(),
328 false,
329 )
330 .unwrap();
331
332 assert_eq!(
333 request.uri(),
334 &Uri::from_static("http://localhost:8888/services/collector/event")
335 );
336
337 assert_eq!(
338 request.headers().get("Content-Type"),
339 Some(&HeaderValue::from_static("application/json"))
340 );
341
342 assert_eq!(
343 request.headers().get("Authorization"),
344 Some(&HeaderValue::from_static("Splunk token"))
345 );
346
347 assert_eq!(
348 request.headers().get("Content-Encoding"),
349 Some(&HeaderValue::from_static("gzip"))
350 );
351
352 assert_eq!(request.body(), &events)
353 }
354
355 #[tokio::test]
356 async fn test_build_request_uri_invalid_uri_returns_error() {
357 let endpoint = "invalid";
358 let token = "token";
359 let compression = Compression::gzip_default();
360 let events = Bytes::from("events");
361 let http_request_builder = HttpRequestBuilder::new(
362 String::from(endpoint),
363 EndpointTarget::default(),
364 String::from(token),
365 compression,
366 );
367
368 let err = http_request_builder
369 .build_request(
370 events,
371 "/services/collector/event",
372 None,
373 MetadataFields::default(),
374 false,
375 )
376 .unwrap_err();
377 assert_eq!(err.to_string(), "URI parse error: invalid format")
378 }
379
380 #[test]
381 fn test_build_uri() {
382 let query = [
383 (HOST_FIELD, "zork flork".to_string()),
384 (SOURCE_FIELD, "zam".to_string()),
385 ];
386 let uri = build_uri("http://sproink.com", "/thing/thang", query).unwrap();
387
388 assert_eq!(
389 "http://sproink.com/thing/thang?host=zork%20flork&source=zam"
390 .parse::<Uri>()
391 .unwrap(),
392 uri
393 );
394 }
395}
396
397#[cfg(all(test, feature = "splunk-integration-tests"))]
398mod integration_tests {
399 use std::net::SocketAddr;
400
401 use http::StatusCode;
402 use tokio::time::Duration;
403 use vector_lib::config::proxy::ProxyConfig;
404 use warp::Filter;
405
406 use super::{
407 build_healthcheck, create_client,
408 integration_test_helpers::{get_token, splunk_hec_address},
409 };
410 use crate::{
411 assert_downcast_matches, sinks::splunk_hec::common::HealthcheckError,
412 test_util::retry_until,
413 };
414
415 #[tokio::test]
416 async fn splunk_healthcheck_ok() {
417 let client = create_client(None, &ProxyConfig::default()).unwrap();
418 let address = splunk_hec_address();
419 let token = get_token().await;
420
421 retry_until(
422 || build_healthcheck(address.clone(), token.clone(), client.clone()),
423 Duration::from_millis(500),
424 Duration::from_secs(30),
425 )
426 .await;
427 }
428
429 #[tokio::test]
430 async fn splunk_healthcheck_server_not_listening() {
431 let client = create_client(None, &ProxyConfig::default()).unwrap();
432 let healthcheck = build_healthcheck(
433 "http://localhost:1111/".to_string(),
434 get_token().await,
435 client,
436 );
437
438 healthcheck.await.unwrap_err();
439 }
440
441 #[tokio::test]
442 async fn splunk_healthcheck_server_unavailable() {
443 let client = create_client(None, &ProxyConfig::default()).unwrap();
444 let healthcheck = build_healthcheck(
445 "http://localhost:5503/".to_string(),
446 get_token().await,
447 client,
448 );
449
450 let unhealthy = warp::any()
451 .map(|| warp::reply::with_status("i'm sad", StatusCode::SERVICE_UNAVAILABLE));
452 let server = warp::serve(unhealthy).bind("0.0.0.0:5503".parse::<SocketAddr>().unwrap());
453 tokio::spawn(server);
454
455 assert_downcast_matches!(
456 healthcheck.await.unwrap_err(),
457 HealthcheckError,
458 HealthcheckError::QueuesFull
459 );
460 }
461}
462
463#[cfg(all(test, feature = "splunk-integration-tests"))]
464pub mod integration_test_helpers {
465 use serde_json::Value as JsonValue;
466 use tokio::time::Duration;
467
468 use crate::test_util::retry_until;
469
470 const USERNAME: &str = "admin";
471 const PASSWORD: &str = "password";
472
473 pub fn splunk_hec_address() -> String {
474 std::env::var("SPLUNK_HEC_ADDRESS").unwrap_or_else(|_| "http://localhost:8088".into())
475 }
476
477 pub fn splunk_api_address() -> String {
478 std::env::var("SPLUNK_API_ADDRESS").unwrap_or_else(|_| "https://localhost:8089".into())
479 }
480
481 pub async fn get_token() -> String {
482 let client = reqwest::Client::builder()
483 .danger_accept_invalid_certs(true)
484 .build()
485 .unwrap();
486
487 let res = retry_until(
488 || {
489 client
490 .get(format!(
491 "{}/services/data/inputs/http?output_mode=json",
492 splunk_api_address()
493 ))
494 .basic_auth(USERNAME, Some(PASSWORD))
495 .send()
496 },
497 Duration::from_millis(500),
498 Duration::from_secs(30),
499 )
500 .await;
501
502 let json: JsonValue = res.json().await.unwrap();
503 let entries = json["entry"].as_array().unwrap().clone();
504
505 if entries.is_empty() {
506 panic!("You don't have any HTTP Event Collector inputs set up in Splunk");
507 }
508
509 entries[0]["content"]["token"].as_str().unwrap().to_owned()
510 }
511}