1use std::{borrow::Cow, sync::Arc};
2
3use bytes::Bytes;
4use futures_util::future::BoxFuture;
5use http::{Request, StatusCode, Uri};
6use hyper::Body;
7use snafu::{ResultExt, Snafu};
8use vector_lib::lookup::lookup_v2::{OptionalTargetPath, OptionalValuePath};
9use vector_lib::{config::proxy::ProxyConfig, event::EventRef};
10
11use super::{
12 request::HecRequest,
13 service::{HttpRequestBuilder, MetadataFields},
14 EndpointTarget,
15};
16use crate::{
17 http::HttpClient,
18 internal_events::TemplateRenderingError,
19 sinks::{
20 self,
21 util::{http::HttpBatchService, SinkBatchSettings},
22 UriParseSnafu,
23 },
24 template::Template,
25 tls::{TlsConfig, TlsSettings},
26};
27
/// Marker type whose [`SinkBatchSettings`] implementation supplies the batch
/// constants (`MAX_EVENTS`, `MAX_BYTES`, `TIMEOUT_SECS`) for Splunk HEC.
#[derive(Clone, Copy, Debug, Default)]
pub struct SplunkHecDefaultBatchSettings;
30
// Batch constants for Splunk HEC. How `None` is interpreted is decided by the
// code consuming `SinkBatchSettings` (presumably "no limit" — confirm there).
impl SinkBatchSettings for SplunkHecDefaultBatchSettings {
    // No event-count value is configured.
    const MAX_EVENTS: Option<usize> = None;
    // 1_000_000 bytes.
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    // One second.
    const TIMEOUT_SECS: f64 = 1.0;
}
36
/// Splunk-specific failures reported by [`build_healthcheck`].
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    /// Returned when the health endpoint answers `400 Bad Request`.
    #[snafu(display("Invalid HEC token"))]
    InvalidToken,
    /// Returned when the health endpoint answers `503 Service Unavailable`.
    #[snafu(display("Queues are full"))]
    QueuesFull,
}
44
45pub fn create_client(
46 tls: Option<&TlsConfig>,
47 proxy_config: &ProxyConfig,
48) -> crate::Result<HttpClient> {
49 let tls_settings = TlsSettings::from_options(tls)?;
50 Ok(HttpClient::new(tls_settings, proxy_config)?)
51}
52
53pub fn build_http_batch_service(
57 client: HttpClient,
58 http_request_builder: Arc<HttpRequestBuilder>,
59 endpoint_target: EndpointTarget,
60 auto_extract_timestamp: bool,
61) -> HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HecRequest> {
62 HttpBatchService::new(client, move |req: HecRequest| {
63 let request_builder = Arc::clone(&http_request_builder);
64 let future: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
65 Box::pin(async move {
66 request_builder.build_request(
67 req.body,
68 match endpoint_target {
69 EndpointTarget::Event => "/services/collector/event",
70 EndpointTarget::Raw => "/services/collector/raw",
71 },
72 req.passthrough_token,
73 MetadataFields {
74 source: req.source,
75 sourcetype: req.sourcetype,
76 index: req.index,
77 host: req.host,
78 },
79 auto_extract_timestamp,
80 )
81 });
82 future
83 })
84}
85
86pub async fn build_healthcheck(
87 endpoint: String,
88 token: String,
89 client: HttpClient,
90) -> crate::Result<()> {
91 let uri = build_uri(endpoint.as_str(), "/services/collector/health/1.0", None)
92 .context(UriParseSnafu)?;
93
94 let request = Request::get(uri)
95 .header("Authorization", format!("Splunk {token}"))
96 .body(Body::empty())
97 .unwrap();
98
99 let response = client.send(request).await?;
100 match response.status() {
101 StatusCode::OK => Ok(()),
102 StatusCode::BAD_REQUEST => Err(HealthcheckError::InvalidToken.into()),
103 StatusCode::SERVICE_UNAVAILABLE => Err(HealthcheckError::QueuesFull.into()),
104 other => Err(sinks::HealthcheckError::UnexpectedStatus { status: other }.into()),
105 }
106}
107
108pub fn build_uri(
109 host: &str,
110 path: &str,
111 query: impl IntoIterator<Item = (&'static str, String)>,
112) -> Result<Uri, http::uri::InvalidUri> {
113 let mut uri = format!("{}{}", host.trim_end_matches('/'), path);
114
115 let mut first = true;
116
117 for (key, value) in query.into_iter() {
118 if first {
119 uri.push('?');
120 first = false;
121 } else {
122 uri.push('&');
123 }
124 uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
125 key,
126 percent_encoding::NON_ALPHANUMERIC,
127 )));
128 uri.push('=');
129 uri.push_str(&Cow::<str>::from(percent_encoding::utf8_percent_encode(
130 &value,
131 percent_encoding::NON_ALPHANUMERIC,
132 )));
133 }
134
135 uri.parse::<Uri>()
136}
137
138pub fn config_host_key() -> OptionalValuePath {
139 OptionalValuePath {
140 path: crate::config::log_schema().host_key().cloned(),
141 }
142}
143
144pub fn config_timestamp_key_target_path() -> OptionalTargetPath {
145 OptionalTargetPath {
146 path: crate::config::log_schema()
147 .timestamp_key_target_path()
148 .cloned(),
149 }
150}
151
152pub fn render_template_string<'a>(
153 template: &Template,
154 event: impl Into<EventRef<'a>>,
155 field_name: &str,
156) -> Option<String> {
157 template
158 .render_string(event)
159 .map_err(|error| {
160 emit!(TemplateRenderingError {
161 error,
162 field: Some(field_name),
163 drop_event: false
164 });
165 })
166 .ok()
167}
168
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use http::{HeaderValue, Uri};
    use vector_lib::config::proxy::ProxyConfig;
    use wiremock::{
        matchers::{header, method, path},
        Mock, MockServer, ResponseTemplate,
    };

    use crate::sinks::{
        splunk_hec::common::{
            build_healthcheck, build_uri, create_client,
            service::{HttpRequestBuilder, MetadataFields},
            EndpointTarget, HOST_FIELD, SOURCE_FIELD,
        },
        util::Compression,
    };

    /// Runs the healthcheck against a mock server whose health route answers
    /// with `status`, and returns the healthcheck's outcome.
    async fn healthcheck_with_status(status: u16) -> crate::Result<()> {
        let mock_server = MockServer::start().await;

        Mock::given(method("GET"))
            .and(path("/services/collector/health/1.0"))
            .and(header("Authorization", "Splunk token"))
            .respond_with(ResponseTemplate::new(status))
            .mount(&mock_server)
            .await;

        let client = create_client(None, &ProxyConfig::default()).unwrap();
        build_healthcheck(mock_server.uri(), "token".to_string(), client).await
    }

    /// Builds an event-endpoint request for `events` with the token `"token"`,
    /// default metadata, no passthrough token and timestamp extraction off.
    fn build_event_request(
        endpoint: &str,
        compression: Compression,
        events: Bytes,
    ) -> Result<http::Request<Bytes>, crate::Error> {
        let http_request_builder = HttpRequestBuilder::new(
            String::from(endpoint),
            EndpointTarget::default(),
            String::from("token"),
            compression,
        );

        http_request_builder.build_request(
            events,
            "/services/collector/event",
            None,
            MetadataFields::default(),
            false,
        )
    }

    /// Checks the URI, content type and authorization header that every
    /// successfully built request in these tests is expected to carry.
    #[track_caller]
    fn assert_common_request_parts(request: &http::Request<Bytes>) {
        assert_eq!(
            request.uri(),
            &Uri::from_static("http://localhost:8888/services/collector/event")
        );

        assert_eq!(
            request.headers().get("Content-Type"),
            Some(&HeaderValue::from_static("application/json"))
        );

        assert_eq!(
            request.headers().get("Authorization"),
            Some(&HeaderValue::from_static("Splunk token"))
        );
    }

    #[tokio::test]
    async fn test_build_healthcheck_200_response_returns_ok() {
        assert!(healthcheck_with_status(200).await.is_ok())
    }

    #[tokio::test]
    async fn test_build_healthcheck_400_response_returns_error() {
        let error = healthcheck_with_status(400).await.unwrap_err();

        assert_eq!(error.to_string(), "Invalid HEC token");
    }

    #[tokio::test]
    async fn test_build_healthcheck_503_response_returns_error() {
        let error = healthcheck_with_status(503).await.unwrap_err();

        assert_eq!(error.to_string(), "Queues are full");
    }

    #[tokio::test]
    async fn test_build_healthcheck_500_response_returns_error() {
        let error = healthcheck_with_status(500).await.unwrap_err();

        assert_eq!(
            error.to_string(),
            "Unexpected status: 500 Internal Server Error"
        );
    }

    #[tokio::test]
    async fn test_build_request_compression_none_returns_expected_request() {
        let events = Bytes::from("events");

        let request =
            build_event_request("http://localhost:8888", Compression::None, events.clone())
                .unwrap();

        assert_common_request_parts(&request);
        assert_eq!(request.headers().get("Content-Encoding"), None);
        assert_eq!(request.body(), &events)
    }

    #[tokio::test]
    async fn test_build_request_compression_gzip_returns_expected_request() {
        let events = Bytes::from("events");

        let request = build_event_request(
            "http://localhost:8888",
            Compression::gzip_default(),
            events.clone(),
        )
        .unwrap();

        assert_common_request_parts(&request);
        assert_eq!(
            request.headers().get("Content-Encoding"),
            Some(&HeaderValue::from_static("gzip"))
        );
        assert_eq!(request.body(), &events)
    }

    #[tokio::test]
    async fn test_build_request_uri_invalid_uri_returns_error() {
        let error = build_event_request(
            "invalid",
            Compression::gzip_default(),
            Bytes::from("events"),
        )
        .unwrap_err();

        assert_eq!(error.to_string(), "URI parse error: invalid format")
    }

    #[test]
    fn test_build_uri() {
        let expected = "http://sproink.com/thing/thang?host=zork%20flork&source=zam"
            .parse::<Uri>()
            .unwrap();

        let actual = build_uri(
            "http://sproink.com",
            "/thing/thang",
            [
                (HOST_FIELD, "zork flork".to_string()),
                (SOURCE_FIELD, "zam".to_string()),
            ],
        )
        .unwrap();

        assert_eq!(expected, actual);
    }
}
395
#[cfg(all(test, feature = "splunk-integration-tests"))]
mod integration_tests {
    use std::net::SocketAddr;

    use http::StatusCode;
    use tokio::time::Duration;
    use vector_lib::config::proxy::ProxyConfig;
    use warp::Filter;

    use super::{
        build_healthcheck, create_client,
        integration_test_helpers::{get_token, splunk_hec_address},
    };
    use crate::{
        assert_downcast_matches, sinks::splunk_hec::common::HealthcheckError,
        test_util::retry_until,
    };

    /// Builds a client with no TLS options and the default proxy configuration.
    fn default_client() -> crate::http::HttpClient {
        create_client(None, &ProxyConfig::default()).unwrap()
    }

    /// The healthcheck is retried every 500 ms, for up to 30 s, until it passes.
    #[tokio::test]
    async fn splunk_healthcheck_ok() {
        let client = default_client();
        let address = splunk_hec_address();
        let token = get_token().await;

        let attempt = || build_healthcheck(address.clone(), token.clone(), client.clone());
        retry_until(attempt, Duration::from_millis(500), Duration::from_secs(30)).await;
    }

    /// Nothing is expected to listen on port 1111, so the healthcheck must fail.
    #[tokio::test]
    async fn splunk_healthcheck_server_not_listening() {
        let client = default_client();
        let token = get_token().await;

        let outcome = build_healthcheck("http://localhost:1111/".to_string(), token, client).await;

        outcome.unwrap_err();
    }

    /// A local server answering 503 to everything must yield `QueuesFull`.
    #[tokio::test]
    async fn splunk_healthcheck_server_unavailable() {
        let client = default_client();
        let token = get_token().await;
        // Only constructed here; the request is sent once the future is awaited below.
        let healthcheck = build_healthcheck("http://localhost:5503/".to_string(), token, client);

        let always_unavailable = warp::any()
            .map(|| warp::reply::with_status("i'm sad", StatusCode::SERVICE_UNAVAILABLE));
        let listen_on = "0.0.0.0:5503".parse::<SocketAddr>().unwrap();
        tokio::spawn(warp::serve(always_unavailable).bind(listen_on));

        assert_downcast_matches!(
            healthcheck.await.unwrap_err(),
            HealthcheckError,
            HealthcheckError::QueuesFull
        );
    }
}
461
#[cfg(all(test, feature = "splunk-integration-tests"))]
pub mod integration_test_helpers {
    use serde_json::Value as JsonValue;
    use tokio::time::Duration;

    use crate::test_util::retry_until;

    // Basic-auth credentials sent to the Splunk API when listing HEC inputs.
    const USERNAME: &str = "admin";
    const PASSWORD: &str = "password";

    /// Address of the HEC endpoint: `SPLUNK_HEC_ADDRESS` if set, otherwise
    /// `http://localhost:8088`.
    pub fn splunk_hec_address() -> String {
        match std::env::var("SPLUNK_HEC_ADDRESS") {
            Ok(address) => address,
            Err(_) => "http://localhost:8088".into(),
        }
    }

    /// Address of the Splunk API: `SPLUNK_API_ADDRESS` if set, otherwise
    /// `https://localhost:8089`.
    pub fn splunk_api_address() -> String {
        match std::env::var("SPLUNK_API_ADDRESS") {
            Ok(address) => address,
            Err(_) => "https://localhost:8089".into(),
        }
    }

    /// Fetches the token of the first HTTP input listed by the Splunk API.
    ///
    /// The listing request is retried every 500 ms for up to 30 s, and
    /// certificate validation is disabled for it.
    ///
    /// # Panics
    ///
    /// Panics if the client cannot be built, the response is not the expected
    /// JSON shape, or no HTTP Event Collector input is listed.
    pub async fn get_token() -> String {
        let client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();

        let list_inputs = || {
            let url = format!(
                "{}/services/data/inputs/http?output_mode=json",
                splunk_api_address()
            );
            client
                .get(url)
                .basic_auth(USERNAME, Some(PASSWORD))
                .send()
        };
        let response = retry_until(
            list_inputs,
            Duration::from_millis(500),
            Duration::from_secs(30),
        )
        .await;

        let json: JsonValue = response.json().await.unwrap();
        let entries = json["entry"].as_array().unwrap();

        match entries.first() {
            Some(entry) => entry["content"]["token"].as_str().unwrap().to_owned(),
            None => panic!("You don't have any HTTP Event Collector inputs set up in Splunk"),
        }
    }
}