1use std::{
2 collections::HashMap,
3 convert::Infallible,
4 io::Read,
5 net::{Ipv4Addr, SocketAddr},
6 sync::Arc,
7 time::Duration,
8};
9
10use bytes::{Buf, Bytes};
11use chrono::{DateTime, TimeZone, Utc};
12use flate2::read::MultiGzDecoder;
13use futures::FutureExt;
14use http::StatusCode;
15use hyper::{Server, service::make_service_fn};
16use serde::{Serialize, de::DeserializeOwned};
17use serde_json::{
18 Deserializer, Value as JsonValue,
19 de::{Read as JsonRead, StrRead},
20};
21use snafu::Snafu;
22use tokio::net::TcpStream;
23use tower::ServiceBuilder;
24use tracing::Span;
25use vector_lib::{
26 EstimatedJsonEncodedSizeOf,
27 config::{LegacyKey, LogNamespace},
28 configurable::configurable_component,
29 event::BatchNotifier,
30 internal_event::{CountByteSize, InternalEventHandle as _, Registered},
31 lookup::{self, event_path, lookup_v2::OptionalValuePath, owned_value_path},
32 schema::meaning,
33 sensitive_string::SensitiveString,
34 source_sender::SendError,
35 tls::MaybeTlsIncomingStream,
36};
37use vrl::{
38 path::OwnedTargetPath,
39 value::{Kind, kind::Collection},
40};
41use warp::{
42 Filter, Reply,
43 filters::BoxedFilter,
44 http::header::{CONTENT_TYPE, HeaderValue},
45 path,
46 reject::Rejection,
47 reply::Response,
48};
49
50use self::{
51 acknowledgements::{
52 HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
53 IndexerAcknowledgement,
54 },
55 splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
56};
57use crate::{
58 SourceSender,
59 config::{DataType, Resource, SourceConfig, SourceContext, SourceOutput, log_schema},
60 event::{Event, LogEvent, Value},
61 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
62 internal_events::{
63 EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
64 },
65 serde::bool_or_struct,
66 tls::{MaybeTlsSettings, TlsEnableableConfig},
67};
68
69mod acknowledgements;
70
71pub const CHANNEL: &str = "splunk_channel";
73pub const INDEX: &str = "splunk_index";
74pub const SOURCE: &str = "splunk_source";
75pub const SOURCETYPE: &str = "splunk_sourcetype";
76
77const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
78
79#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
81#[derive(Clone, Debug)]
82#[serde(deny_unknown_fields, default)]
83pub struct SplunkConfig {
84 #[serde(default = "default_socket_address")]
88 pub address: SocketAddr,
89
90 #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
97 token: Option<SensitiveString>,
98
99 #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
106 valid_tokens: Option<Vec<SensitiveString>>,
107
108 store_hec_token: bool,
113
114 #[configurable(derived)]
115 tls: Option<TlsEnableableConfig>,
116
117 #[configurable(derived)]
118 #[serde(deserialize_with = "bool_or_struct")]
119 acknowledgements: HecAcknowledgementsConfig,
120
121 #[configurable(metadata(docs::hidden))]
123 #[serde(default)]
124 log_namespace: Option<bool>,
125
126 #[configurable(derived)]
127 #[serde(default)]
128 keepalive: KeepaliveConfig,
129}
130
131impl_generate_config_from_default!(SplunkConfig);
132
133impl Default for SplunkConfig {
134 fn default() -> Self {
135 SplunkConfig {
136 address: default_socket_address(),
137 token: None,
138 valid_tokens: None,
139 tls: None,
140 acknowledgements: Default::default(),
141 store_hec_token: false,
142 log_namespace: None,
143 keepalive: Default::default(),
144 }
145 }
146}
147
148fn default_socket_address() -> SocketAddr {
149 SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 8088)
150}
151
152#[async_trait::async_trait]
153#[typetag::serde(name = "splunk_hec")]
154impl SourceConfig for SplunkConfig {
155 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
156 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
157 let shutdown = cx.shutdown.clone();
158 let out = cx.out.clone();
159 let source = SplunkSource::new(self, tls.http_protocol_name(), cx);
160
161 let event_service = source.event_service(out.clone());
162 let raw_service = source.raw_service(out);
163 let health_service = source.health_service();
164 let ack_service = source.ack_service();
165 let options = SplunkSource::options();
166
167 let services = path!("services" / "collector" / ..)
168 .and(
169 event_service
170 .or(raw_service)
171 .unify()
172 .or(health_service)
173 .unify()
174 .or(ack_service)
175 .unify()
176 .or(options)
177 .unify(),
178 )
179 .or_else(finish_err);
180
181 let listener = tls.bind(&self.address).await?;
182
183 let keepalive_settings = self.keepalive.clone();
184 Ok(Box::pin(async move {
185 let span = Span::current();
186 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
187 let svc = ServiceBuilder::new()
188 .layer(build_http_trace_layer(span.clone()))
189 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
190 MaxConnectionAgeLayer::new(
191 Duration::from_secs(secs),
192 keepalive_settings.max_connection_age_jitter_factor,
193 conn.peer_addr(),
194 )
195 }))
196 .service(warp::service(services.clone()));
197 futures_util::future::ok::<_, Infallible>(svc)
198 });
199
200 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
201 .serve(make_svc)
202 .with_graceful_shutdown(shutdown.map(|_| ()))
203 .await
204 .map_err(|err| {
205 error!("An error occurred: {:?}.", err);
206 })?;
207
208 Ok(())
209 }))
210 }
211
212 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
213 let log_namespace = global_log_namespace.merge(self.log_namespace);
214
215 let schema_definition = match log_namespace {
216 LogNamespace::Legacy => {
217 let definition = vector_lib::schema::Definition::empty_legacy_namespace()
218 .with_event_field(
219 &owned_value_path!("line"),
220 Kind::object(Collection::empty())
221 .or_array(Collection::empty())
222 .or_undefined(),
223 None,
224 );
225
226 if let Some(message_key) = log_schema().message_key() {
227 definition.with_event_field(
228 message_key,
229 Kind::bytes().or_undefined(),
230 Some(meaning::MESSAGE),
231 )
232 } else {
233 definition
234 }
235 }
236 LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
237 Kind::bytes().or_object(Collection::empty()),
238 [log_namespace],
239 )
240 .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
241 }
242 .with_standard_vector_source_metadata()
243 .with_source_metadata(
244 SplunkConfig::NAME,
245 log_schema()
246 .host_key()
247 .cloned()
248 .map(LegacyKey::InsertIfEmpty),
249 &owned_value_path!("host"),
250 Kind::bytes(),
251 Some(meaning::HOST),
252 )
253 .with_source_metadata(
254 SplunkConfig::NAME,
255 Some(LegacyKey::Overwrite(owned_value_path!(CHANNEL))),
256 &owned_value_path!("channel"),
257 Kind::bytes(),
258 None,
259 )
260 .with_source_metadata(
261 SplunkConfig::NAME,
262 Some(LegacyKey::Overwrite(owned_value_path!(INDEX))),
263 &owned_value_path!("index"),
264 Kind::bytes(),
265 None,
266 )
267 .with_source_metadata(
268 SplunkConfig::NAME,
269 Some(LegacyKey::Overwrite(owned_value_path!(SOURCE))),
270 &owned_value_path!("source"),
271 Kind::bytes(),
272 Some(meaning::SERVICE),
273 )
274 .with_source_metadata(
276 SplunkConfig::NAME,
277 Some(LegacyKey::Overwrite(owned_value_path!(SOURCETYPE))),
278 &owned_value_path!("sourcetype"),
279 Kind::bytes(),
280 None,
281 );
282
283 vec![SourceOutput::new_maybe_logs(
284 DataType::Log,
285 schema_definition,
286 )]
287 }
288
289 fn resources(&self) -> Vec<Resource> {
290 vec![Resource::tcp(self.address)]
291 }
292
293 fn can_acknowledge(&self) -> bool {
294 true
295 }
296}
297
298struct SplunkSource {
300 valid_credentials: Vec<String>,
301 protocol: &'static str,
302 idx_ack: Option<Arc<IndexerAcknowledgement>>,
303 store_hec_token: bool,
304 log_namespace: LogNamespace,
305 events_received: Registered<EventsReceived>,
306}
307
308impl SplunkSource {
309 fn new(config: &SplunkConfig, protocol: &'static str, cx: SourceContext) -> Self {
310 let log_namespace = cx.log_namespace(config.log_namespace);
311 let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
312 let shutdown = cx.shutdown;
313 let valid_tokens = config
314 .valid_tokens
315 .iter()
316 .flatten()
317 .chain(config.token.iter());
318
319 let idx_ack = acknowledgements.then(|| {
320 Arc::new(IndexerAcknowledgement::new(
321 config.acknowledgements.clone(),
322 shutdown,
323 ))
324 });
325
326 SplunkSource {
327 valid_credentials: valid_tokens
328 .map(|token| format!("Splunk {}", token.inner()))
329 .collect(),
330 protocol,
331 idx_ack,
332 store_hec_token: config.store_hec_token,
333 log_namespace,
334 events_received: register!(EventsReceived),
335 }
336 }
337
338 fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
339 let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
340 .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
341 let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
342
343 let splunk_channel = splunk_channel_header
344 .and(splunk_channel_query_param)
345 .map(|header: Option<String>, query_param| header.or(query_param));
346
347 let protocol = self.protocol;
348 let idx_ack = self.idx_ack.clone();
349 let store_hec_token = self.store_hec_token;
350 let log_namespace = self.log_namespace;
351 let events_received = self.events_received.clone();
352
353 warp::post()
354 .and(
355 path!("event")
356 .or(path!("event" / "1.0"))
357 .or(warp::path::end()),
358 )
359 .and(self.authorization())
360 .and(splunk_channel)
361 .and(warp::addr::remote())
362 .and(warp::header::optional::<String>("X-Forwarded-For"))
363 .and(self.gzip())
364 .and(warp::body::bytes())
365 .and(warp::path::full())
366 .and_then(
367 move |_,
368 token: Option<String>,
369 channel: Option<String>,
370 remote: Option<SocketAddr>,
371 remote_addr: Option<String>,
372 gzip: bool,
373 body: Bytes,
374 path: warp::path::FullPath| {
375 let mut out = out.clone();
376 let idx_ack = idx_ack.clone();
377 let events_received = events_received.clone();
378
379 async move {
380 if idx_ack.is_some() && channel.is_none() {
381 return Err(Rejection::from(ApiError::MissingChannel));
382 }
383
384 let mut data = Vec::new();
385 let (byte_size, body) = if gzip {
386 MultiGzDecoder::new(body.reader())
387 .read_to_end(&mut data)
388 .map_err(|_| Rejection::from(ApiError::BadRequest))?;
389 (data.len(), String::from_utf8_lossy(data.as_slice()))
390 } else {
391 (body.len(), String::from_utf8_lossy(body.as_ref()))
392 };
393 emit!(HttpBytesReceived {
394 byte_size,
395 http_path: path.as_str(),
396 protocol,
397 });
398
399 let (batch, receiver) =
400 BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
401 let maybe_ack_id = match (idx_ack, receiver, channel.clone()) {
402 (Some(idx_ack), Some(receiver), Some(channel_id)) => {
403 match idx_ack.get_ack_id_from_channel(channel_id, receiver).await {
404 Ok(ack_id) => Some(ack_id),
405 Err(rej) => return Err(rej),
406 }
407 }
408 _ => None,
409 };
410
411 let mut error = None;
412 let mut events = Vec::new();
413
414 let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
415 deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
416 channel,
417 remote,
418 remote_addr,
419 batch,
420 token: token.filter(|_| store_hec_token).map(Into::into),
421 log_namespace,
422 events_received,
423 }
424 .into();
425
426 for result in iter {
427 match result {
428 Ok(event) => events.push(event),
429 Err(err) => {
430 error = Some(err);
431 break;
432 }
433 }
434 }
435
436 if !events.is_empty() {
437 match out.send_batch(events).await {
438 Ok(()) => (),
439 Err(SendError::Closed) => {
440 return Err(Rejection::from(ApiError::ServerShutdown));
441 }
442 Err(SendError::Timeout) => {
443 unreachable!("No timeout is configured for this source.")
444 }
445 }
446 }
447
448 if let Some(error) = error {
449 Err(error)
450 } else {
451 Ok(maybe_ack_id)
452 }
453 }
454 },
455 )
456 .map(finish_ok)
457 .boxed()
458 }
459
460 fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
461 let protocol = self.protocol;
462 let idx_ack = self.idx_ack.clone();
463 let store_hec_token = self.store_hec_token;
464 let events_received = self.events_received.clone();
465 let log_namespace = self.log_namespace;
466
467 warp::post()
468 .and(path!("raw" / "1.0").or(path!("raw")))
469 .and(self.authorization())
470 .and(SplunkSource::required_channel())
471 .and(warp::addr::remote())
472 .and(warp::header::optional::<String>("X-Forwarded-For"))
473 .and(self.gzip())
474 .and(warp::body::bytes())
475 .and(warp::path::full())
476 .and_then(
477 move |_,
478 token: Option<String>,
479 channel_id: String,
480 remote: Option<SocketAddr>,
481 xff: Option<String>,
482 gzip: bool,
483 body: Bytes,
484 path: warp::path::FullPath| {
485 let mut out = out.clone();
486 let idx_ack = idx_ack.clone();
487 let events_received = events_received.clone();
488 emit!(HttpBytesReceived {
489 byte_size: body.len(),
490 http_path: path.as_str(),
491 protocol,
492 });
493
494 async move {
495 let (batch, receiver) =
496 BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
497 let maybe_ack_id = match (idx_ack, receiver) {
498 (Some(idx_ack), Some(receiver)) => Some(
499 idx_ack
500 .get_ack_id_from_channel(channel_id.clone(), receiver)
501 .await?,
502 ),
503 _ => None,
504 };
505 let mut event = raw_event(
506 body,
507 gzip,
508 channel_id,
509 remote,
510 xff,
511 batch,
512 log_namespace,
513 &events_received,
514 )?;
515 if let Some(token) = token.filter(|_| store_hec_token) {
516 event.metadata_mut().set_splunk_hec_token(token.into());
517 }
518
519 let res = out.send_event(event).await;
520 res.map(|_| maybe_ack_id)
521 .map_err(|_| Rejection::from(ApiError::ServerShutdown))
522 }
523 },
524 )
525 .map(finish_ok)
526 .boxed()
527 }
528
529 fn health_service(&self) -> BoxedFilter<(Response,)> {
530 warp::get()
537 .and(path!("health" / "1.0").or(path!("health")))
538 .map(move |_| {
539 http::Response::builder()
540 .header(http::header::CONTENT_TYPE, "application/json")
541 .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
542 .expect("static response")
543 })
544 .boxed()
545 }
546
547 fn lenient_json_content_type_check<T>() -> impl Filter<Extract = (T,), Error = Rejection> + Clone
548 where
549 T: Send + DeserializeOwned + 'static,
550 {
551 warp::header::optional::<HeaderValue>(CONTENT_TYPE.as_str())
552 .and(warp::body::bytes())
553 .and_then(
554 |ctype: Option<HeaderValue>, body: bytes::Bytes| async move {
555 let ok = ctype
556 .as_ref()
557 .and_then(|v| v.to_str().ok())
558 .map(|h| h.to_ascii_lowercase().contains("application/json"))
559 .unwrap_or(true);
560
561 if !ok {
562 return Err(warp::reject::custom(ApiError::UnsupportedContentType));
563 }
564
565 let value = serde_json::from_slice::<T>(&body)
566 .map_err(|_| warp::reject::custom(ApiError::BadRequest))?;
567
568 Ok(value)
569 },
570 )
571 }
572
573 fn ack_service(&self) -> BoxedFilter<(Response,)> {
574 let idx_ack = self.idx_ack.clone();
575
576 warp::post()
577 .and(warp::path!("ack"))
578 .and(self.authorization())
579 .and(SplunkSource::required_channel())
580 .and(Self::lenient_json_content_type_check::<HecAckStatusRequest>())
581 .and_then(move |_, channel: String, req: HecAckStatusRequest| {
582 let idx_ack = idx_ack.clone();
583 async move {
584 if let Some(idx_ack) = idx_ack {
585 let acks = idx_ack
586 .get_acks_status_from_channel(channel, &req.acks)
587 .await?;
588 Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
589 } else {
590 Err(warp::reject::custom(ApiError::AckIsDisabled))
591 }
592 }
593 })
594 .boxed()
595 }
596
597 fn options() -> BoxedFilter<(Response,)> {
598 let post = warp::options()
599 .and(
600 path!("event")
601 .or(path!("event" / "1.0"))
602 .or(path!("raw" / "1.0"))
603 .or(path!("raw")),
604 )
605 .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());
606
607 let get = warp::options()
608 .and(path!("health").or(path!("health" / "1.0")))
609 .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());
610
611 post.or(get).unify().boxed()
612 }
613
614 fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
616 let valid_credentials = self.valid_credentials.clone();
617 warp::header::optional("Authorization")
618 .and_then(move |token: Option<String>| {
619 let valid_credentials = valid_credentials.clone();
620 async move {
621 match (token, valid_credentials.is_empty()) {
622 (token, true) => {
625 Ok(token
626 .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
627 }
628 (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
629 token
630 .strip_prefix("Splunk ")
631 .map(Into::into)
632 .unwrap_or(token),
633 )),
634 (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
635 (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
636 }
637 }
638 })
639 .boxed()
640 }
641
642 fn gzip(&self) -> BoxedFilter<(bool,)> {
644 warp::header::optional::<String>("Content-Encoding")
645 .and_then(|encoding: Option<String>| async move {
646 match encoding {
647 Some(s) if s.as_bytes() == b"gzip" => Ok(true),
648 Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
649 None => Ok(false),
650 }
651 })
652 .boxed()
653 }
654
655 fn required_channel() -> BoxedFilter<(String,)> {
656 let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
657 .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
658 let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
659
660 splunk_channel_header
661 .and(splunk_channel_query_param)
662 .and_then(|header: Option<String>, query_param| async move {
663 header
664 .or(query_param)
665 .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
666 })
667 .boxed()
668 }
669}
670struct EventIterator<'de, R: JsonRead<'de>> {
673 deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
675 events: usize,
677 channel: Option<Value>,
679 time: Time,
681 extractors: [DefaultExtractor; 4],
683 batch: Option<BatchNotifier>,
685 token: Option<Arc<str>>,
687 log_namespace: LogNamespace,
689 events_received: Registered<EventsReceived>,
691}
692
693struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
695 deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
696 channel: Option<String>,
697 batch: Option<BatchNotifier>,
698 token: Option<Arc<str>>,
699 log_namespace: LogNamespace,
700 events_received: Registered<EventsReceived>,
701 remote: Option<SocketAddr>,
702 remote_addr: Option<String>,
703}
704
705impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
706 fn from(f: EventIteratorGenerator<'de, R>) -> Self {
707 Self {
708 deserializer: f.deserializer,
709 events: 0,
710 channel: f.channel.map(Value::from),
711 time: Time::Now(Utc::now()),
712 extractors: [
713 DefaultExtractor::new_with(
718 "host",
719 log_schema().host_key().cloned().into(),
720 f.remote_addr
721 .or_else(|| f.remote.map(|addr| addr.to_string()))
722 .map(Value::from),
723 f.log_namespace,
724 ),
725 DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace),
726 DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace),
727 DefaultExtractor::new(
728 "sourcetype",
729 OptionalValuePath::new(SOURCETYPE),
730 f.log_namespace,
731 ),
732 ],
733 batch: f.batch,
734 token: f.token,
735 log_namespace: f.log_namespace,
736 events_received: f.events_received,
737 }
738 }
739}
740
741impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
742 fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
743 let mut log = match self.log_namespace {
745 LogNamespace::Vector => self.build_log_vector(&mut json)?,
746 LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
747 };
748
749 self.log_namespace.insert_vector_metadata(
751 &mut log,
752 log_schema().source_type_key(),
753 &owned_value_path!("source_type"),
754 SplunkConfig::NAME,
755 );
756
757 let channel_path = owned_value_path!(CHANNEL);
759 if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
760 self.log_namespace.insert_source_metadata(
761 SplunkConfig::NAME,
762 &mut log,
763 Some(LegacyKey::Overwrite(&channel_path)),
764 lookup::path!(CHANNEL),
765 guid,
766 );
767 } else if let Some(guid) = self.channel.as_ref() {
768 self.log_namespace.insert_source_metadata(
769 SplunkConfig::NAME,
770 &mut log,
771 Some(LegacyKey::Overwrite(&channel_path)),
772 lookup::path!(CHANNEL),
773 guid.clone(),
774 );
775 }
776
777 if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
779 for (key, value) in object {
780 self.log_namespace.insert_source_metadata(
781 SplunkConfig::NAME,
782 &mut log,
783 Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
784 lookup::path!(key.as_str()),
785 value,
786 );
787 }
788 }
789
790 let parsed_time = match json.get_mut("time").map(JsonValue::take) {
792 Some(JsonValue::Number(time)) => Some(Some(time)),
793 Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
794 _ => None,
795 };
796
797 match parsed_time {
798 None => (),
799 Some(Some(t)) => {
800 if let Some(t) = t.as_u64() {
801 let time = parse_timestamp(t as i64)
802 .ok_or(ApiError::InvalidDataFormat { event: self.events })?;
803
804 self.time = Time::Provided(time);
805 } else if let Some(t) = t.as_f64() {
806 self.time = Time::Provided(
807 Utc.timestamp_opt(
808 t.floor() as i64,
809 (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
810 )
811 .single()
812 .expect("invalid timestamp"),
813 );
814 } else {
815 return Err(ApiError::InvalidDataFormat { event: self.events }.into());
816 }
817 }
818 Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
819 }
820
821 let timestamp = match self.time.clone() {
823 Time::Provided(time) => time,
824 Time::Now(time) => time,
825 };
826
827 self.log_namespace.insert_source_metadata(
828 SplunkConfig::NAME,
829 &mut log,
830 log_schema().timestamp_key().map(LegacyKey::Overwrite),
831 lookup::path!("timestamp"),
832 timestamp,
833 );
834
835 for de in self.extractors.iter_mut() {
837 de.extract(&mut log, &mut json);
838 }
839
840 if let Some(token) = &self.token {
842 log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
843 }
844
845 if let Some(batch) = self.batch.clone() {
846 log = log.with_batch_notifier(&batch);
847 }
848
849 self.events += 1;
850
851 Ok(log.into())
852 }
853
854 fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
858 match json.get("event") {
859 Some(event) => {
860 let event: Value = event.into();
861 let mut log = LogEvent::from(event);
862
863 self.events_received
865 .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
866
867 self.log_namespace.insert_vector_metadata(
869 &mut log,
870 log_schema().timestamp_key(),
871 lookup::path!("ingest_timestamp"),
872 chrono::Utc::now(),
873 );
874
875 Ok(log)
876 }
877 None => Err(ApiError::MissingEventField { event: self.events }.into()),
878 }
879 }
880
881 fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
886 let mut log = LogEvent::default();
887 match json.get_mut("event") {
888 Some(event) => match event.take() {
889 JsonValue::String(string) => {
890 if string.is_empty() {
891 return Err(ApiError::EmptyEventField { event: self.events }.into());
892 }
893 log.maybe_insert(log_schema().message_key_target_path(), string);
894 }
895 JsonValue::Object(mut object) => {
896 if object.is_empty() {
897 return Err(ApiError::EmptyEventField { event: self.events }.into());
898 }
899
900 if let Some(line) = object.remove("line") {
902 match line {
903 JsonValue::Array(_) | JsonValue::Object(_) => {
905 log.insert(event_path!("line"), line);
906 }
907 _ => {
908 log.maybe_insert(log_schema().message_key_target_path(), line);
909 }
910 }
911 }
912
913 for (key, value) in object {
914 log.insert(event_path!(key.as_str()), value);
915 }
916 }
917 _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
918 },
919 None => return Err(ApiError::MissingEventField { event: self.events }.into()),
920 };
921
922 self.events_received
924 .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
925
926 Ok(log)
927 }
928}
929
930impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
931 type Item = Result<Event, Rejection>;
932
933 fn next(&mut self) -> Option<Self::Item> {
934 match self.deserializer.next() {
935 Some(Ok(json)) => Some(self.build_event(json)),
936 None => {
937 if self.events == 0 {
938 Some(Err(ApiError::NoData.into()))
939 } else {
940 None
941 }
942 }
943 Some(Err(error)) => {
944 emit!(SplunkHecRequestBodyInvalidError {
945 error: error.into()
946 });
947 Some(Err(
948 ApiError::InvalidDataFormat { event: self.events }.into()
949 ))
950 }
951 }
952 }
953}
954
955fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
966 const SEC_CUTOFF: i64 = 13569465600;
968 const MILLISEC_CUTOFF: i64 = 253402300800000;
970
971 if t < 0 {
973 return None;
974 }
975
976 let ts = if t < SEC_CUTOFF {
977 Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
978 } else if t < MILLISEC_CUTOFF {
979 Utc.timestamp_millis_opt(t)
980 .single()
981 .expect("invalid timestamp")
982 } else {
983 Utc.timestamp_nanos(t)
984 };
985
986 Some(ts)
987}
988
989struct DefaultExtractor {
991 field: &'static str,
992 to_field: OptionalValuePath,
993 value: Option<Value>,
994 log_namespace: LogNamespace,
995}
996
997impl DefaultExtractor {
998 const fn new(
999 field: &'static str,
1000 to_field: OptionalValuePath,
1001 log_namespace: LogNamespace,
1002 ) -> Self {
1003 DefaultExtractor {
1004 field,
1005 to_field,
1006 value: None,
1007 log_namespace,
1008 }
1009 }
1010
1011 fn new_with(
1012 field: &'static str,
1013 to_field: OptionalValuePath,
1014 value: impl Into<Option<Value>>,
1015 log_namespace: LogNamespace,
1016 ) -> Self {
1017 DefaultExtractor {
1018 field,
1019 to_field,
1020 value: value.into(),
1021 log_namespace,
1022 }
1023 }
1024
1025 fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
1026 if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
1028 self.value = Some(new_value.into());
1029 }
1030
1031 if let Some(index) = self.value.as_ref()
1033 && let Some(metadata_key) = self.to_field.path.as_ref()
1034 {
1035 self.log_namespace.insert_source_metadata(
1036 SplunkConfig::NAME,
1037 log,
1038 Some(LegacyKey::Overwrite(metadata_key)),
1039 &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
1040 index.clone(),
1041 )
1042 }
1043 }
1044}
1045
1046#[derive(Clone, Debug)]
1048enum Time {
1049 Now(DateTime<Utc>),
1051 Provided(DateTime<Utc>),
1053}
1054
1055#[allow(clippy::too_many_arguments)]
1057fn raw_event(
1058 bytes: Bytes,
1059 gzip: bool,
1060 channel: String,
1061 remote: Option<SocketAddr>,
1062 xff: Option<String>,
1063 batch: Option<BatchNotifier>,
1064 log_namespace: LogNamespace,
1065 events_received: &Registered<EventsReceived>,
1066) -> Result<Event, Rejection> {
1067 let message: Value = if gzip {
1069 let mut data = Vec::new();
1070 match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
1071 Ok(0) => return Err(ApiError::NoData.into()),
1072 Ok(_) => Value::from(Bytes::from(data)),
1073 Err(error) => {
1074 emit!(SplunkHecRequestBodyInvalidError { error });
1075 return Err(ApiError::InvalidDataFormat { event: 0 }.into());
1076 }
1077 }
1078 } else {
1079 bytes.into()
1080 };
1081
1082 let mut log = match log_namespace {
1084 LogNamespace::Vector => LogEvent::from(message),
1085 LogNamespace::Legacy => {
1086 let mut log = LogEvent::default();
1087 log.maybe_insert(log_schema().message_key_target_path(), message);
1088 log
1089 }
1090 };
1091 events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1093
1094 log_namespace.insert_source_metadata(
1096 SplunkConfig::NAME,
1097 &mut log,
1098 Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
1099 lookup::path!(CHANNEL),
1100 channel,
1101 );
1102
1103 let host = if let Some(remote_address) = xff {
1107 Some(remote_address)
1108 } else {
1109 remote.map(|remote| remote.to_string())
1110 };
1111
1112 if let Some(host) = host {
1113 log_namespace.insert_source_metadata(
1114 SplunkConfig::NAME,
1115 &mut log,
1116 log_schema().host_key().map(LegacyKey::InsertIfEmpty),
1117 lookup::path!("host"),
1118 host,
1119 );
1120 }
1121
1122 log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, Utc::now());
1123
1124 if let Some(batch) = batch {
1125 log = log.with_batch_notifier(&batch);
1126 }
1127
1128 Ok(Event::from(log))
1129}
1130
1131#[derive(Clone, Copy, Debug, Snafu)]
1132pub(crate) enum ApiError {
1133 MissingAuthorization,
1134 InvalidAuthorization,
1135 UnsupportedEncoding,
1136 UnsupportedContentType,
1137 MissingChannel,
1138 NoData,
1139 InvalidDataFormat { event: usize },
1140 ServerShutdown,
1141 EmptyEventField { event: usize },
1142 MissingEventField { event: usize },
1143 BadRequest,
1144 ServiceUnavailable,
1145 AckIsDisabled,
1146}
1147
1148impl warp::reject::Reject for ApiError {}
1149
1150mod splunk_response {
1152 use serde::Serialize;
1153
1154 pub enum HecStatusCode {
1156 Success = 0,
1157 TokenIsRequired = 2,
1158 InvalidAuthorization = 3,
1159 NoData = 5,
1160 InvalidDataFormat = 6,
1161 ServerIsBusy = 9,
1162 DataChannelIsMissing = 10,
1163 EventFieldIsRequired = 12,
1164 EventFieldCannotBeBlank = 13,
1165 AckIsDisabled = 14,
1166 }
1167
1168 #[derive(Serialize)]
1169 pub enum HecResponseMetadata {
1170 #[serde(rename = "ackId")]
1171 AckId(u64),
1172 #[serde(rename = "invalid-event-number")]
1173 InvalidEventNumber(usize),
1174 }
1175
1176 #[derive(Serialize)]
1177 pub struct HecResponse {
1178 text: &'static str,
1179 code: u8,
1180 #[serde(skip_serializing_if = "Option::is_none", flatten)]
1181 pub metadata: Option<HecResponseMetadata>,
1182 }
1183
1184 impl HecResponse {
1185 pub const fn new(code: HecStatusCode) -> Self {
1186 let text = match code {
1187 HecStatusCode::Success => "Success",
1188 HecStatusCode::TokenIsRequired => "Token is required",
1189 HecStatusCode::InvalidAuthorization => "Invalid authorization",
1190 HecStatusCode::NoData => "No data",
1191 HecStatusCode::InvalidDataFormat => "Invalid data format",
1192 HecStatusCode::DataChannelIsMissing => "Data channel is missing",
1193 HecStatusCode::EventFieldIsRequired => "Event field is required",
1194 HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
1195 HecStatusCode::ServerIsBusy => "Server is busy",
1196 HecStatusCode::AckIsDisabled => "Ack is disabled",
1197 };
1198
1199 Self {
1200 text,
1201 code: code as u8,
1202 metadata: None,
1203 }
1204 }
1205
1206 pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
1207 self.metadata = Some(metadata);
1208 self
1209 }
1210 }
1211
1212 pub const INVALID_AUTHORIZATION: HecResponse =
1213 HecResponse::new(HecStatusCode::InvalidAuthorization);
1214 pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
1215 pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
1216 pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
1217 pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
1218 pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
1219 pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
1220}
1221
1222fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1223 let body = if let Some(ack_id) = maybe_ack_id {
1224 HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
1225 } else {
1226 splunk_response::SUCCESS
1227 };
1228 response_json(StatusCode::OK, body)
1229}
1230
1231fn response_plain(code: StatusCode, msg: &'static str) -> Response {
1232 warp::reply::with_status(
1233 warp::reply::with_header(msg, http::header::CONTENT_TYPE, "text/plain; charset=utf-8"),
1234 code,
1235 )
1236 .into_response()
1237}
1238
1239async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1240 if let Some(&error) = rejection.find::<ApiError>() {
1241 emit!(SplunkHecRequestError { error });
1242 Ok((match error {
1243 ApiError::MissingAuthorization => {
1244 response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
1245 }
1246 ApiError::InvalidAuthorization => response_json(
1247 StatusCode::UNAUTHORIZED,
1248 splunk_response::INVALID_AUTHORIZATION,
1249 ),
1250 ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
1251 ApiError::UnsupportedContentType => response_plain(
1252 StatusCode::UNSUPPORTED_MEDIA_TYPE,
1253 "The request's content-type is not supported",
1254 ),
1255 ApiError::MissingChannel => {
1256 response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
1257 }
1258 ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
1259 ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
1260 ApiError::InvalidDataFormat { event } => response_json(
1261 StatusCode::BAD_REQUEST,
1262 HecResponse::new(HecStatusCode::InvalidDataFormat)
1263 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1264 ),
1265 ApiError::EmptyEventField { event } => response_json(
1266 StatusCode::BAD_REQUEST,
1267 HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
1268 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1269 ),
1270 ApiError::MissingEventField { event } => response_json(
1271 StatusCode::BAD_REQUEST,
1272 HecResponse::new(HecStatusCode::EventFieldIsRequired)
1273 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1274 ),
1275 ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
1276 ApiError::ServiceUnavailable => response_json(
1277 StatusCode::SERVICE_UNAVAILABLE,
1278 splunk_response::SERVER_IS_BUSY,
1279 ),
1280 ApiError::AckIsDisabled => {
1281 response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
1282 }
1283 },))
1284 } else {
1285 Err(rejection)
1286 }
1287}
1288
1289fn empty_response(code: StatusCode) -> Response {
1291 let mut res = Response::default();
1292 *res.status_mut() = code;
1293 res
1294}
1295
1296fn response_json(code: StatusCode, body: impl Serialize) -> Response {
1298 warp::reply::with_status(warp::reply::json(&body), code).into_response()
1299}
1300
1301#[cfg(feature = "sinks-splunk_hec")]
1302#[cfg(test)]
1303mod tests {
1304 use std::{net::SocketAddr, num::NonZeroU64};
1305
1306 use chrono::{TimeZone, Utc};
1307 use futures_util::Stream;
1308 use http::Uri;
1309 use reqwest::{RequestBuilder, Response};
1310 use serde::Deserialize;
1311 use vector_lib::{
1312 codecs::{
1313 BytesDecoderConfig, JsonSerializerConfig, TextSerializerConfig,
1314 decoding::DeserializerConfig,
1315 },
1316 event::EventStatus,
1317 schema::Definition,
1318 sensitive_string::SensitiveString,
1319 };
1320 use vrl::path::PathPrefix;
1321
1322 use super::*;
1323 use crate::{
1324 SourceSender,
1325 codecs::{DecodingConfig, EncodingConfig},
1326 components::validation::prelude::*,
1327 config::{SinkConfig, SinkContext, SourceConfig, SourceContext, log_schema},
1328 event::{Event, LogEvent},
1329 sinks::{
1330 Healthcheck, VectorSink,
1331 splunk_hec::logs::config::HecLogsSinkConfig,
1332 util::{BatchConfig, Compression, TowerRequestConfig},
1333 },
1334 sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
1335 test_util::{
1336 addr::{PortGuard, next_addr},
1337 collect_n,
1338 components::{
1339 COMPONENT_ERROR_TAGS, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance,
1340 assert_source_error,
1341 },
1342 wait_for_tcp,
1343 },
1344 };
1345
1346 #[test]
1347 fn generate_config() {
1348 crate::test_util::test_generate_config::<SplunkConfig>();
1349 }
1350
1351 const TOKEN: &str = "token";
1353 const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
1354
1355 async fn source(
1356 acknowledgements: Option<HecAcknowledgementsConfig>,
1357 ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
1358 source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
1359 }
1360
1361 async fn source_with(
1362 token: Option<SensitiveString>,
1363 valid_tokens: Option<&[&str]>,
1364 acknowledgements: Option<HecAcknowledgementsConfig>,
1365 store_hec_token: bool,
1366 ) -> (
1367 impl Stream<Item = Event> + Unpin + use<>,
1368 SocketAddr,
1369 PortGuard,
1370 ) {
1371 let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
1372 let (_guard, address) = next_addr();
1373 let valid_tokens =
1374 valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
1375 let cx = SourceContext::new_test(sender, None);
1376 tokio::spawn(async move {
1377 SplunkConfig {
1378 address,
1379 token,
1380 valid_tokens,
1381 tls: None,
1382 acknowledgements: acknowledgements.unwrap_or_default(),
1383 store_hec_token,
1384 log_namespace: None,
1385 keepalive: Default::default(),
1386 }
1387 .build(cx)
1388 .await
1389 .unwrap()
1390 .await
1391 .unwrap()
1392 });
1393 wait_for_tcp(address).await;
1394 (recv, address, _guard)
1395 }
1396
1397 async fn sink(
1398 address: SocketAddr,
1399 encoding: EncodingConfig,
1400 compression: Compression,
1401 ) -> (VectorSink, Healthcheck) {
1402 HecLogsSinkConfig {
1403 default_token: TOKEN.to_owned().into(),
1404 endpoint: format!("http://{address}"),
1405 host_key: None,
1406 indexed_fields: vec![],
1407 index: None,
1408 sourcetype: None,
1409 source: None,
1410 encoding,
1411 compression,
1412 batch: BatchConfig::default(),
1413 request: TowerRequestConfig::default(),
1414 tls: None,
1415 acknowledgements: Default::default(),
1416 timestamp_nanos_key: None,
1417 timestamp_key: None,
1418 auto_extract_timestamp: None,
1419 endpoint_target: Default::default(),
1420 }
1421 .build(SinkContext::default())
1422 .await
1423 .unwrap()
1424 }
1425
1426 async fn start(
1427 encoding: EncodingConfig,
1428 compression: Compression,
1429 acknowledgements: Option<HecAcknowledgementsConfig>,
1430 ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
1431 let (source, address, _guard) = source(acknowledgements).await;
1432 let (sink, health) = sink(address, encoding, compression).await;
1433 assert!(health.await.is_ok());
1434 (sink, source)
1435 }
1436
1437 async fn channel_n(
1438 messages: Vec<impl Into<String> + Send + 'static>,
1439 sink: VectorSink,
1440 source: impl Stream<Item = Event> + Unpin,
1441 ) -> Vec<Event> {
1442 let n = messages.len();
1443
1444 tokio::spawn(async move {
1445 sink.run_events(
1446 messages
1447 .into_iter()
1448 .map(|s| Event::Log(LogEvent::from(s.into()))),
1449 )
1450 .await
1451 .unwrap();
1452 });
1453
1454 let events = collect_n(source, n).await;
1455 assert_eq!(n, events.len());
1456
1457 events
1458 }
1459
1460 #[derive(Clone, Copy, Debug)]
1461 enum Channel<'a> {
1462 Header(&'a str),
1463 QueryParam(&'a str),
1464 }
1465
1466 #[derive(Default)]
1467 struct SendWithOpts<'a> {
1468 channel: Option<Channel<'a>>,
1469 forwarded_for: Option<String>,
1470 }
1471
1472 async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
1473 let channel = Channel::Header("channel");
1474 let options = SendWithOpts {
1475 channel: Some(channel),
1476 forwarded_for: None,
1477 };
1478 send_with(address, api, message, TOKEN, &options).await
1479 }
1480
1481 fn build_request(
1482 address: SocketAddr,
1483 api: &str,
1484 message: &str,
1485 token: &str,
1486 opts: &SendWithOpts<'_>,
1487 ) -> RequestBuilder {
1488 let mut b = reqwest::Client::new()
1489 .post(format!("http://{address}/{api}"))
1490 .header("Authorization", format!("Splunk {token}"));
1491
1492 b = match opts.channel {
1493 Some(c) => match c {
1494 Channel::Header(v) => b.header("x-splunk-request-channel", v),
1495 Channel::QueryParam(v) => b.query(&[("channel", v)]),
1496 },
1497 None => b,
1498 };
1499
1500 b = match &opts.forwarded_for {
1501 Some(f) => b.header("X-Forwarded-For", f),
1502 None => b,
1503 };
1504
1505 b.body(message.to_owned())
1506 }
1507
1508 async fn send_with(
1509 address: SocketAddr,
1510 api: &str,
1511 message: &str,
1512 token: &str,
1513 opts: &SendWithOpts<'_>,
1514 ) -> u16 {
1515 let b = build_request(address, api, message, token, opts);
1516 b.send().await.unwrap().status().as_u16()
1517 }
1518
1519 async fn send_with_response(
1520 address: SocketAddr,
1521 api: &str,
1522 message: &str,
1523 token: &str,
1524 opts: &SendWithOpts<'_>,
1525 ) -> Response {
1526 let b = build_request(address, api, message, token, opts);
1527 b.send().await.unwrap()
1528 }
1529
1530 #[tokio::test]
1531 async fn no_compression_text_event() {
1532 let message = "gzip_text_event";
1533 let (sink, source) = start(
1534 TextSerializerConfig::default().into(),
1535 Compression::None,
1536 None,
1537 )
1538 .await;
1539
1540 let event = channel_n(vec![message], sink, source).await.remove(0);
1541
1542 assert_eq!(
1543 event.as_log()[log_schema().message_key().unwrap().to_string()],
1544 message.into()
1545 );
1546 assert!(event.as_log().get_timestamp().is_some());
1547 assert_eq!(
1548 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1549 "splunk_hec".into()
1550 );
1551 assert!(event.metadata().splunk_hec_token().is_none());
1552 }
1553
1554 #[tokio::test]
1555 async fn one_simple_text_event() {
1556 let message = "one_simple_text_event";
1557 let (sink, source) = start(
1558 TextSerializerConfig::default().into(),
1559 Compression::gzip_default(),
1560 None,
1561 )
1562 .await;
1563
1564 let event = channel_n(vec![message], sink, source).await.remove(0);
1565
1566 assert_eq!(
1567 event.as_log()[log_schema().message_key().unwrap().to_string()],
1568 message.into()
1569 );
1570 assert!(event.as_log().get_timestamp().is_some());
1571 assert_eq!(
1572 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1573 "splunk_hec".into()
1574 );
1575 assert!(event.metadata().splunk_hec_token().is_none());
1576 }
1577
1578 #[tokio::test]
1579 async fn multiple_simple_text_event() {
1580 let n = 200;
1581 let (sink, source) = start(
1582 TextSerializerConfig::default().into(),
1583 Compression::None,
1584 None,
1585 )
1586 .await;
1587
1588 let messages = (0..n)
1589 .map(|i| format!("multiple_simple_text_event_{i}"))
1590 .collect::<Vec<_>>();
1591 let events = channel_n(messages.clone(), sink, source).await;
1592
1593 for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1594 assert_eq!(
1595 event.as_log()[log_schema().message_key().unwrap().to_string()],
1596 msg.into()
1597 );
1598 assert!(event.as_log().get_timestamp().is_some());
1599 assert_eq!(
1600 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1601 "splunk_hec".into()
1602 );
1603 assert!(event.metadata().splunk_hec_token().is_none());
1604 }
1605 }
1606
1607 #[tokio::test]
1608 async fn one_simple_json_event() {
1609 let message = "one_simple_json_event";
1610 let (sink, source) = start(
1611 JsonSerializerConfig::default().into(),
1612 Compression::gzip_default(),
1613 None,
1614 )
1615 .await;
1616
1617 let event = channel_n(vec![message], sink, source).await.remove(0);
1618
1619 assert_eq!(
1620 event.as_log()[log_schema().message_key().unwrap().to_string()],
1621 message.into()
1622 );
1623 assert!(event.as_log().get_timestamp().is_some());
1624 assert_eq!(
1625 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1626 "splunk_hec".into()
1627 );
1628 assert!(event.metadata().splunk_hec_token().is_none());
1629 }
1630
1631 #[tokio::test]
1632 async fn multiple_simple_json_event() {
1633 let n = 200;
1634 let (sink, source) = start(
1635 JsonSerializerConfig::default().into(),
1636 Compression::gzip_default(),
1637 None,
1638 )
1639 .await;
1640
1641 let messages = (0..n)
1642 .map(|i| format!("multiple_simple_json_event{i}"))
1643 .collect::<Vec<_>>();
1644 let events = channel_n(messages.clone(), sink, source).await;
1645
1646 for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1647 assert_eq!(
1648 event.as_log()[log_schema().message_key().unwrap().to_string()],
1649 msg.into()
1650 );
1651 assert!(event.as_log().get_timestamp().is_some());
1652 assert_eq!(
1653 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1654 "splunk_hec".into()
1655 );
1656 assert!(event.metadata().splunk_hec_token().is_none());
1657 }
1658 }
1659
1660 #[tokio::test]
1661 async fn json_event() {
1662 let (sink, source) = start(
1663 JsonSerializerConfig::default().into(),
1664 Compression::gzip_default(),
1665 None,
1666 )
1667 .await;
1668
1669 let mut log = LogEvent::default();
1670 log.insert("greeting", "hello");
1671 log.insert("name", "bob");
1672 sink.run_events(vec![log.into()]).await.unwrap();
1673
1674 let event = collect_n(source, 1).await.remove(0).into_log();
1675 assert_eq!(event["greeting"], "hello".into());
1676 assert_eq!(event["name"], "bob".into());
1677 assert!(event.get_timestamp().is_some());
1678 assert_eq!(
1679 event[log_schema().source_type_key().unwrap().to_string()],
1680 "splunk_hec".into()
1681 );
1682 assert!(event.metadata().splunk_hec_token().is_none());
1683 }
1684
1685 #[tokio::test]
1686 async fn json_invalid_path_event() {
1687 let (sink, source) = start(
1688 JsonSerializerConfig::default().into(),
1689 Compression::gzip_default(),
1690 None,
1691 )
1692 .await;
1693
1694 let mut log = LogEvent::default();
1695 log.insert(event_path!("(greeting | thing"), "hello");
1698 sink.run_events(vec![log.into()]).await.unwrap();
1699
1700 let event = collect_n(source, 1).await.remove(0).into_log();
1701 assert_eq!(
1702 event.get(event_path!("(greeting | thing")),
1703 Some(&Value::from("hello"))
1704 );
1705 }
1706
1707 #[tokio::test]
1708 async fn line_to_message() {
1709 let (sink, source) = start(
1710 JsonSerializerConfig::default().into(),
1711 Compression::gzip_default(),
1712 None,
1713 )
1714 .await;
1715
1716 let mut event = LogEvent::default();
1717 event.insert("line", "hello");
1718 sink.run_events(vec![event.into()]).await.unwrap();
1719
1720 let event = collect_n(source, 1).await.remove(0);
1721 assert_eq!(
1722 event.as_log()[log_schema().message_key().unwrap().to_string()],
1723 "hello".into()
1724 );
1725 assert!(event.metadata().splunk_hec_token().is_none());
1726 }
1727
1728 #[tokio::test]
1729 async fn raw() {
1730 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1731 let message = "raw";
1732 let (source, address, _guard) = source(None).await;
1733
1734 assert_eq!(200, post(address, "services/collector/raw", message).await);
1735
1736 let event = collect_n(source, 1).await.remove(0);
1737 assert_eq!(
1738 event.as_log()[log_schema().message_key().unwrap().to_string()],
1739 message.into()
1740 );
1741 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1742 assert!(event.as_log().get_timestamp().is_some());
1743 assert_eq!(
1744 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1745 "splunk_hec".into()
1746 );
1747 assert!(event.metadata().splunk_hec_token().is_none());
1748 })
1749 .await;
1750 }
1751
1752 #[tokio::test]
1753 async fn root() {
1754 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1755 let message = r#"{ "event": { "message": "root"} }"#;
1756 let (source, address, _guard) = source(None).await;
1757
1758 assert_eq!(200, post(address, "services/collector", message).await);
1759
1760 let event = collect_n(source, 1).await.remove(0);
1761 assert_eq!(
1762 event.as_log()[log_schema().message_key().unwrap().to_string()],
1763 "root".into()
1764 );
1765 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1766 assert!(event.as_log().get_timestamp().is_some());
1767 assert_eq!(
1768 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1769 "splunk_hec".into()
1770 );
1771 assert!(event.metadata().splunk_hec_token().is_none());
1772 })
1773 .await;
1774 }
1775
1776 #[tokio::test]
1777 async fn channel_header() {
1778 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1779 let message = "raw";
1780 let (source, address, _guard) = source(None).await;
1781
1782 let opts = SendWithOpts {
1783 channel: Some(Channel::Header("guid")),
1784 forwarded_for: None,
1785 };
1786
1787 assert_eq!(
1788 200,
1789 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1790 );
1791
1792 let event = collect_n(source, 1).await.remove(0);
1793 assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1794 })
1795 .await;
1796 }
1797
1798 #[tokio::test]
1799 async fn xff_header_raw() {
1800 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1801 let message = "raw";
1802 let (source, address, _guard) = source(None).await;
1803
1804 let opts = SendWithOpts {
1805 channel: Some(Channel::Header("guid")),
1806 forwarded_for: Some(String::from("10.0.0.1")),
1807 };
1808
1809 assert_eq!(
1810 200,
1811 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1812 );
1813
1814 let event = collect_n(source, 1).await.remove(0);
1815 assert_eq!(
1816 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1817 "10.0.0.1".into()
1818 );
1819 })
1820 .await;
1821 }
1822
1823 #[tokio::test]
1825 async fn xff_header_event_with_host_field() {
1826 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1827 let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
1828 let (source, address, _guard) = source(None).await;
1829
1830 let opts = SendWithOpts {
1831 channel: Some(Channel::Header("guid")),
1832 forwarded_for: Some(String::from("10.0.0.1")),
1833 };
1834
1835 assert_eq!(
1836 200,
1837 send_with(address, "services/collector/event", message, TOKEN, &opts).await
1838 );
1839
1840 let event = collect_n(source, 1).await.remove(0);
1841 assert_eq!(
1842 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1843 "10.1.0.2".into()
1844 );
1845 })
1846 .await;
1847 }
1848
1849 #[tokio::test]
1851 async fn xff_header_event_without_host_field() {
1852 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1853 let message = r#"{"event":"first", "color": "blue"}"#;
1854 let (source, address, _guard) = source(None).await;
1855
1856 let opts = SendWithOpts {
1857 channel: Some(Channel::Header("guid")),
1858 forwarded_for: Some(String::from("10.0.0.1")),
1859 };
1860
1861 assert_eq!(
1862 200,
1863 send_with(address, "services/collector/event", message, TOKEN, &opts).await
1864 );
1865
1866 let event = collect_n(source, 1).await.remove(0);
1867 assert_eq!(
1868 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1869 "10.0.0.1".into()
1870 );
1871 })
1872 .await;
1873 }
1874
1875 #[tokio::test]
1876 async fn channel_query_param() {
1877 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1878 let message = "raw";
1879 let (source, address, _guard) = source(None).await;
1880
1881 let opts = SendWithOpts {
1882 channel: Some(Channel::QueryParam("guid")),
1883 forwarded_for: None,
1884 };
1885
1886 assert_eq!(
1887 200,
1888 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1889 );
1890
1891 let event = collect_n(source, 1).await.remove(0);
1892 assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1893 })
1894 .await;
1895 }
1896
1897 #[tokio::test]
1898 async fn no_data() {
1899 let (_source, address, _guard) = source(None).await;
1900
1901 assert_eq!(400, post(address, "services/collector/event", "").await);
1902 }
1903
1904 #[tokio::test]
1905 async fn invalid_token() {
1906 assert_source_error(&COMPONENT_ERROR_TAGS, async {
1907 let (_source, address, _guard) = source(None).await;
1908 let opts = SendWithOpts {
1909 channel: Some(Channel::Header("channel")),
1910 forwarded_for: None,
1911 };
1912
1913 assert_eq!(
1914 401,
1915 send_with(address, "services/collector/event", "", "nope", &opts).await
1916 );
1917 })
1918 .await;
1919 }
1920
1921 #[tokio::test]
1922 async fn health_ignores_token() {
1923 let (_source, address, _guard) = source(None).await;
1924
1925 let res = reqwest::Client::new()
1926 .get(format!("http://{address}/services/collector/health"))
1927 .header("Authorization", format!("Splunk {}", "invalid token"))
1928 .send()
1929 .await
1930 .unwrap();
1931
1932 assert_eq!(200, res.status().as_u16());
1933 }
1934
1935 #[tokio::test]
1936 async fn health() {
1937 let (_source, address, _guard) = source(None).await;
1938
1939 let res = reqwest::Client::new()
1940 .get(format!("http://{address}/services/collector/health"))
1941 .send()
1942 .await
1943 .unwrap();
1944
1945 assert_eq!(200, res.status().as_u16());
1946 }
1947
1948 #[tokio::test]
1949 async fn secondary_token() {
1950 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1951 let message = r#"{"event":"first", "color": "blue"}"#;
1952 let (_source, address, _guard) =
1953 source_with(None, Some(VALID_TOKENS), None, false).await;
1954 let options = SendWithOpts {
1955 channel: None,
1956 forwarded_for: None,
1957 };
1958
1959 assert_eq!(
1960 200,
1961 send_with(
1962 address,
1963 "services/collector/event",
1964 message,
1965 VALID_TOKENS.get(1).unwrap(),
1966 &options
1967 )
1968 .await
1969 );
1970 })
1971 .await;
1972 }
1973
1974 #[tokio::test]
1975 async fn event_service_token_passthrough_enabled() {
1976 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1977 let message = "passthrough_token_enabled";
1978 let (source, address, _guard) = source_with(None, Some(VALID_TOKENS), None, true).await;
1979 let (sink, health) = sink(
1980 address,
1981 TextSerializerConfig::default().into(),
1982 Compression::gzip_default(),
1983 )
1984 .await;
1985 assert!(health.await.is_ok());
1986
1987 let event = channel_n(vec![message], sink, source).await.remove(0);
1988
1989 assert_eq!(
1990 event.as_log()[log_schema().message_key().unwrap().to_string()],
1991 message.into()
1992 );
1993 assert_eq!(
1994 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
1995 TOKEN
1996 );
1997 })
1998 .await;
1999 }
2000
2001 #[tokio::test]
2002 async fn raw_service_token_passthrough_enabled() {
2003 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2004 let message = "raw";
2005 let (source, address, _guard) = source_with(None, Some(VALID_TOKENS), None, true).await;
2006
2007 assert_eq!(200, post(address, "services/collector/raw", message).await);
2008
2009 let event = collect_n(source, 1).await.remove(0);
2010 assert_eq!(
2011 event.as_log()[log_schema().message_key().unwrap().to_string()],
2012 message.into()
2013 );
2014 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2015 assert!(event.as_log().get_timestamp().is_some());
2016 assert_eq!(
2017 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2018 "splunk_hec".into()
2019 );
2020 assert_eq!(
2021 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2022 TOKEN
2023 );
2024 })
2025 .await;
2026 }
2027
2028 #[tokio::test]
2029 async fn no_authorization() {
2030 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2031 let message = "no_authorization";
2032 let (source, address, _guard) = source_with(None, None, None, false).await;
2033 let (sink, health) = sink(
2034 address,
2035 TextSerializerConfig::default().into(),
2036 Compression::gzip_default(),
2037 )
2038 .await;
2039 assert!(health.await.is_ok());
2040
2041 let event = channel_n(vec![message], sink, source).await.remove(0);
2042
2043 assert_eq!(
2044 event.as_log()[log_schema().message_key().unwrap().to_string()],
2045 message.into()
2046 );
2047 assert!(event.metadata().splunk_hec_token().is_none());
2048 })
2049 .await;
2050 }
2051
2052 #[tokio::test]
2053 async fn no_authorization_token_passthrough_enabled() {
2054 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2055 let message = "no_authorization";
2056 let (source, address, _guard) = source_with(None, None, None, true).await;
2057 let (sink, health) = sink(
2058 address,
2059 TextSerializerConfig::default().into(),
2060 Compression::gzip_default(),
2061 )
2062 .await;
2063 assert!(health.await.is_ok());
2064
2065 let event = channel_n(vec![message], sink, source).await.remove(0);
2066
2067 assert_eq!(
2068 event.as_log()[log_schema().message_key().unwrap().to_string()],
2069 message.into()
2070 );
2071 assert_eq!(
2072 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2073 TOKEN
2074 );
2075 })
2076 .await;
2077 }
2078
    #[tokio::test]
    async fn partial() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"{"event":"first"}{"event":"second""#;
            let (source, address, _guard) = source(None).await;

            assert_eq!(
                400,
                post(address, "services/collector/event", message).await
            );

            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "first".into()
            );
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
        })
        .await;
    }

    #[tokio::test]
    async fn handles_newlines() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"
{"event":"first"}
            "#;
            let (source, address, _guard) = source(None).await;

            assert_eq!(
                200,
                post(address, "services/collector/event", message).await
            );

            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "first".into()
            );
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
        })
        .await;
    }

    #[tokio::test]
    async fn handles_spaces() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#" {"event":"first"} "#;
            let (source, address, _guard) = source(None).await;

            assert_eq!(
                200,
                post(address, "services/collector/event", message).await
            );

            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                "first".into()
            );
            assert!(event.as_log().get_timestamp().is_some());
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
        })
        .await;
    }
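
    // Invalid UTF-8 bytes in the request body are replaced lossily (U+FFFD) instead of
    // causing the event to be rejected.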
    #[tokio::test]
    async fn handles_non_utf8() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
            let (source, address, _guard) = source(None).await;

            let b = reqwest::Client::new()
                .post(format!(
                    "http://{}/{}",
                    address, "services/collector/event"
                ))
                .header("Authorization", format!("Splunk {TOKEN}"))
                .body::<&[u8]>(message);

            assert_eq!(200, b.send().await.unwrap().status().as_u16());

            let event = collect_n(source, 1).await.remove(0);
            assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
            assert_eq!(event.as_log()["number"], 2.into());
            assert_eq!(event.as_log()["bool"], true.into());
            assert!(
                event
                    .as_log()
                    .get((
                        lookup::PathPrefix::Event,
                        log_schema().timestamp_key().unwrap()
                    ))
                    .is_some()
            );
            assert_eq!(
                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                "splunk_hec".into()
            );
        })
        .await;
    }
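
    // Fields supplied in one event of a batch (here `source`) carry over to subsequent
    // events in the same request until they are overridden.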
    #[tokio::test]
    async fn default() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
            let (source, address, _guard) = source(None).await;

            assert_eq!(
                200,
                post(address, "services/collector/event", message).await
            );

            let events = collect_n(source, 3).await;

            assert_eq!(
                events[0].as_log()[log_schema().message_key().unwrap().to_string()],
                "first".into()
            );
            assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());

            assert_eq!(
                events[1].as_log()[log_schema().message_key().unwrap().to_string()],
                "second".into()
            );
            assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());

            assert_eq!(
                events[2].as_log()[log_schema().message_key().unwrap().to_string()],
                "third".into()
            );
            assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
        })
        .await;
    }
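
    // `parse_timestamp` accepts the same instant expressed in seconds, milliseconds, or
    // nanoseconds and rejects negative values.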
    #[test]
    fn parse_timestamps() {
        let cases = vec![
            Utc::now(),
            Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
                .single()
                .expect("invalid timestamp"),
            Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
                .single()
                .expect("invalid timestamp"),
            Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
                .single()
                .expect("invalid timestamp"),
        ];

        for case in cases {
            let sec = case.timestamp();
            let millis = case.timestamp_millis();
            let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");

            assert_eq!(parse_timestamp(sec).unwrap().timestamp(), case.timestamp());
            assert_eq!(
                parse_timestamp(millis).unwrap().timestamp_millis(),
                case.timestamp_millis()
            );
            assert_eq!(
                parse_timestamp(nano)
                    .unwrap()
                    .timestamp_nanos_opt()
                    .unwrap(),
                case.timestamp_nanos_opt().expect("Timestamp out of range")
            );
        }

        assert!(parse_timestamp(-1).is_none());
    }
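
    // Round-tripping through the HEC sink does not populate the log schema `host` field on
    // the received event.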
    #[tokio::test]
    async fn host_test() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let message = "for the host";
            let (sink, source) = start(
                TextSerializerConfig::default().into(),
                Compression::gzip_default(),
                None,
            )
            .await;

            let event = channel_n(vec![message], sink, source).await.remove(0);

            assert_eq!(
                event.as_log()[log_schema().message_key().unwrap().to_string()],
                message.into()
            );
            assert!(
                event
                    .as_log()
                    .get((PathPrefix::Event, log_schema().host_key().unwrap()))
                    .is_none()
            );
        })
        .await;
    }
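
    // Shape of the indexer acknowledgement response returned by the event and raw endpoints,
    // e.g. `{"text":"Success","code":0,"ackId":0}`.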
    #[derive(Deserialize)]
    struct HecAckEventResponse {
        text: String,
        code: u8,
        #[serde(rename = "ackId")]
        ack_id: u64,
    }
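
    // End-to-end acknowledgement flow: post an event, read the `ackId` from the response,
    // then query `services/collector/ack` and expect that id to be reported as true.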
    #[tokio::test]
    async fn ack_json_event() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (source, address, _guard) = source(Some(ack_config)).await;
        let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        let event_res = send_with_response(
            address,
            "services/collector/event",
            event_message,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        assert_eq!("Success", event_res.text.as_str());
        assert_eq!(0, event_res.code);
        _ = collect_n(source, 1).await;

        let ack_message = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
    }

    #[tokio::test]
    async fn ack_raw_event() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (source, address, _guard) = source(Some(ack_config)).await;
        let event_message = "raw event message";
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        let event_res = send_with_response(
            address,
            "services/collector/raw",
            event_message,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        assert_eq!("Success", event_res.text.as_str());
        assert_eq!(0, event_res.code);
        _ = collect_n(source, 1).await;

        let ack_message = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
    }
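
    // Ack ids are single-use: once a status query reports an ack as true, a repeated query
    // for the same id reports false.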
    #[tokio::test]
    async fn ack_repeat_ack_query() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (source, address, _guard) = source(Some(ack_config)).await;
        let event_message = "raw event message";
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        let event_res = send_with_response(
            address,
            "services/collector/raw",
            event_message,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        _ = collect_n(source, 1).await;

        let ack_message = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());

        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
    }
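
    // With `max_number_of_ack_channels` set to 1, requests on a second channel are rejected
    // with 503 on both the raw and event endpoints.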
    #[tokio::test]
    async fn ack_exceed_max_number_of_ack_channels() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };

        let (_source, address, _guard) = source(Some(ack_config)).await;
        let mut opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        assert_eq!(
            200,
            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
        );

        opts.channel = Some(Channel::Header("other-guid"));
        assert_eq!(
            503,
            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
        );
        assert_eq!(
            503,
            send_with(
                address,
                "services/collector/event",
                r#"{"event":"first"}"#,
                TOKEN,
                &opts
            )
            .await
        );
    }
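
    // With `max_pending_acks_per_channel` set to 1, older pending ack ids are dropped and
    // report false; only the most recent ack id remains queryable.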
    #[tokio::test]
    async fn ack_exceed_max_pending_acks_per_channel() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
            ..Default::default()
        };

        let (source, address, _guard) = source(Some(ack_config)).await;
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };
        for _ in 0..5 {
            send_with(
                address,
                "services/collector/event",
                r#"{"event":"first"}"#,
                TOKEN,
                &opts,
            )
            .await;
        }
        for _ in 0..5 {
            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
        }
        let event_res = send_with_response(
            address,
            "services/collector/event",
            r#"{"event":"this will be acked"}"#,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        _ = collect_n(source, 11).await;

        let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
            acks: (0..10).collect::<Vec<u64>>(),
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message_dropped.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.values().all(|ack_status| !*ack_status));

        let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();
        let ack_res = send_with_response(
            address,
            "services/collector/ack",
            ack_message_acked.as_str(),
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckStatusResponse>()
        .await
        .unwrap();
        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
    }
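
    // A `Content-Type` of `application/json` with extra parameters is still accepted by the
    // ack endpoint.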
    #[tokio::test]
    async fn ack_service_accepts_parameterized_content_type() {
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (source, address, _guard) = source(Some(ack_config)).await;
        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };

        let event_res = send_with_response(
            address,
            "services/collector/event",
            r#"{"event":"param-test"}"#,
            TOKEN,
            &opts,
        )
        .await
        .json::<HecAckEventResponse>()
        .await
        .unwrap();
        let _ = collect_n(source, 1).await;

        let body = serde_json::to_string(&HecAckStatusRequest {
            acks: vec![event_res.ack_id],
        })
        .unwrap();

        let res = reqwest::Client::new()
            .post(format!("http://{address}/services/collector/ack"))
            .header("Authorization", format!("Splunk {TOKEN}"))
            .header("x-splunk-request-channel", "guid")
            .header("Content-Type", "application/json; some-random-text; hello")
            .body(body)
            .send()
            .await
            .unwrap();

        assert_eq!(200, res.status().as_u16());

        let _parsed: HecAckStatusResponse = res.json().await.unwrap();
    }
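
    // When acknowledgements are enabled, event requests without a channel are rejected
    // with 400.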
    #[tokio::test]
    async fn event_service_acknowledgements_enabled_channel_required() {
        let message = r#"{"event":"first", "color": "blue"}"#;
        let ack_config = HecAcknowledgementsConfig {
            enabled: Some(true),
            ..Default::default()
        };
        let (_, address, _guard) = source(Some(ack_config)).await;

        let opts = SendWithOpts {
            channel: None,
            forwarded_for: None,
        };

        assert_eq!(
            400,
            send_with(address, "services/collector/event", message, TOKEN, &opts).await
        );
    }

    #[tokio::test]
    async fn ack_service_acknowledgements_disabled() {
        let message = r#" {"acks":[0]} "#;
        let (_, address, _guard) = source(None).await;

        let opts = SendWithOpts {
            channel: Some(Channel::Header("guid")),
            forwarded_for: None,
        };

        assert_eq!(
            400,
            send_with(address, "services/collector/ack", message, TOKEN, &opts).await
        );
    }
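
    // The schema definition advertised for the Vector log namespace: the event root carries
    // the message meaning, and HEC fields are exposed as event metadata.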
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = SplunkConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definition = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()).or_bytes(),
            [LogNamespace::Vector],
        )
        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE)
        .with_metadata_field(
            &owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "host"),
            Kind::bytes(),
            Some("host"),
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "index"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "source"),
            Kind::bytes(),
            Some("service"),
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "channel"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "sourcetype"),
            Kind::bytes(),
            None,
        );

        assert_eq!(definition, Some(expected_definition));
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = SplunkConfig::default();
        let definition = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes().or_undefined(),
            Some("message"),
        )
        .with_event_field(
            &owned_value_path!("line"),
            Kind::array(Collection::empty())
                .or_object(Collection::empty())
                .or_undefined(),
            None,
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("splunk_channel"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("splunk_index"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("splunk_source"),
            Kind::bytes(),
            Some("service"),
        )
        .with_event_field(&owned_value_path!("splunk_sourcetype"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None);

        assert_eq!(definition, Some(expected_definition));
    }
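
    // Hooks the source into the component validation framework: events are pushed over HTTP
    // to the collector event endpoint with the channel header set, and decoded as JSON.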
    impl ValidatableComponent for SplunkConfig {
        fn validation_configuration() -> ValidationConfiguration {
            let config = Self {
                address: default_socket_address(),
                ..Default::default()
            };

            let listen_addr_http = format!("http://{}/services/collector/event", config.address);
            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");

            let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
            let framing = BytesDecoderConfig::new().into();
            let decoding = DeserializerConfig::Json(Default::default());

            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
                    X_SPLUNK_REQUEST_CHANNEL.to_string(),
                    "channel".to_string(),
                )])),
                DecodingConfig::new(framing, decoding, false.into()),
            );

            ValidationConfiguration::from_source(
                Self::NAME,
                log_namespace,
                vec![ComponentTestCaseConfig::from_source(
                    config,
                    None,
                    Some(external_resource),
                )],
            )
        }
    }

    register_validatable_component!(SplunkConfig);
}