use std::{
    collections::HashMap,
    convert::Infallible,
    io::Read,
    net::{Ipv4Addr, SocketAddr},
    sync::Arc,
    time::Duration,
};

use bytes::{Buf, Bytes};
use chrono::{DateTime, TimeZone, Utc};
use flate2::read::MultiGzDecoder;
use futures::FutureExt;
use http::StatusCode;
use hyper::{Server, service::make_service_fn};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::{
    Deserializer, Value as JsonValue,
    de::{Read as JsonRead, StrRead},
};
use snafu::Snafu;
use tokio::net::TcpStream;
use tower::ServiceBuilder;
use tracing::Span;
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    event::BatchNotifier,
    internal_event::{CountByteSize, InternalEventHandle as _, Registered},
    lookup::{self, event_path, lookup_v2::OptionalValuePath, owned_value_path},
    schema::meaning,
    sensitive_string::SensitiveString,
    tls::MaybeTlsIncomingStream,
};
use vrl::{
    path::OwnedTargetPath,
    value::{Kind, kind::Collection},
};
use warp::{
    Filter, Reply,
    filters::BoxedFilter,
    http::header::{CONTENT_TYPE, HeaderValue},
    path,
    reject::Rejection,
    reply::Response,
};

use self::{
    acknowledgements::{
        HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
        IndexerAcknowledgement,
    },
    splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
};
use crate::{
    SourceSender,
    config::{DataType, Resource, SourceConfig, SourceContext, SourceOutput, log_schema},
    event::{Event, LogEvent, Value},
    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
    internal_events::{
        EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
    },
    serde::bool_or_struct,
    source_sender::ClosedError,
    tls::{MaybeTlsSettings, TlsEnableableConfig},
};
mod acknowledgements;

// Metadata keys under which Splunk HEC fields are stored on events.
pub const CHANNEL: &str = "splunk_channel";
pub const INDEX: &str = "splunk_index";
pub const SOURCE: &str = "splunk_source";
pub const SOURCETYPE: &str = "splunk_sourcetype";

// Request header used by HEC clients to pass the channel identifier.
const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
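
// Illustrative configuration for this source (field names follow `SplunkConfig`
// below; the values are examples only, shown in TOML form):
//
//   [sources.my_splunk_hec]
//   type = "splunk_hec"
//   address = "0.0.0.0:8088"
//   valid_tokens = ["A94A8FE5CCB19BA61C4C08"]
//   store_hec_token = false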
/// Configuration for the `splunk_hec` source.
#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    /// The socket address to listen for HEC requests on.
    #[serde(default = "default_socket_address")]
    pub address: SocketAddr,

    /// A single valid authorization token, matched against the
    /// `Authorization: Splunk <token>` request header.
    #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
    token: Option<SensitiveString>,

    /// A list of valid authorization tokens; a request is accepted if it presents any one of them.
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    valid_tokens: Option<Vec<SensitiveString>>,

    /// Whether to attach the token used by a request to the emitted events as metadata.
    store_hec_token: bool,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    #[serde(deserialize_with = "bool_or_struct")]
    acknowledgements: HecAcknowledgementsConfig,

    /// Overrides the global log namespace setting for this source.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}

impl_generate_config_from_default!(SplunkConfig);

impl Default for SplunkConfig {
    fn default() -> Self {
        SplunkConfig {
            address: default_socket_address(),
            token: None,
            valid_tokens: None,
            tls: None,
            acknowledgements: Default::default(),
            store_hec_token: false,
            log_namespace: None,
            keepalive: Default::default(),
        }
    }
}

// 8088 is the port Splunk itself uses for HEC by default.
fn default_socket_address() -> SocketAddr {
    SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 8088)
}

#[async_trait::async_trait]
#[typetag::serde(name = "splunk_hec")]
impl SourceConfig for SplunkConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
        let shutdown = cx.shutdown.clone();
        let out = cx.out.clone();
        let source = SplunkSource::new(self, tls.http_protocol_name(), cx);

        let event_service = source.event_service(out.clone());
        let raw_service = source.raw_service(out);
        let health_service = source.health_service();
        let ack_service = source.ack_service();
        let options = SplunkSource::options();

        let services = path!("services" / "collector" / ..)
            .and(
                event_service
                    .or(raw_service)
                    .unify()
                    .or(health_service)
                    .unify()
                    .or(ack_service)
                    .unify()
                    .or(options)
                    .unify(),
            )
            .or_else(finish_err);

        let listener = tls.bind(&self.address).await?;

        let keepalive_settings = self.keepalive.clone();
        Ok(Box::pin(async move {
            let span = Span::current();
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            conn.peer_addr(),
                        )
                    }))
                    .service(warp::service(services.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });

            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;

            Ok(())
        }))
    }
211
212 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
213 let log_namespace = global_log_namespace.merge(self.log_namespace);
214
215 let schema_definition = match log_namespace {
216 LogNamespace::Legacy => {
217 let definition = vector_lib::schema::Definition::empty_legacy_namespace()
218 .with_event_field(
219 &owned_value_path!("line"),
220 Kind::object(Collection::empty())
221 .or_array(Collection::empty())
222 .or_undefined(),
223 None,
224 );
225
226 if let Some(message_key) = log_schema().message_key() {
227 definition.with_event_field(
228 message_key,
229 Kind::bytes().or_undefined(),
230 Some(meaning::MESSAGE),
231 )
232 } else {
233 definition
234 }
235 }
236 LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
237 Kind::bytes().or_object(Collection::empty()),
238 [log_namespace],
239 )
240 .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
241 }
242 .with_standard_vector_source_metadata()
243 .with_source_metadata(
244 SplunkConfig::NAME,
245 log_schema()
246 .host_key()
247 .cloned()
248 .map(LegacyKey::InsertIfEmpty),
249 &owned_value_path!("host"),
250 Kind::bytes(),
251 Some(meaning::HOST),
252 )
253 .with_source_metadata(
254 SplunkConfig::NAME,
255 Some(LegacyKey::Overwrite(owned_value_path!(CHANNEL))),
256 &owned_value_path!("channel"),
257 Kind::bytes(),
258 None,
259 )
260 .with_source_metadata(
261 SplunkConfig::NAME,
262 Some(LegacyKey::Overwrite(owned_value_path!(INDEX))),
263 &owned_value_path!("index"),
264 Kind::bytes(),
265 None,
266 )
267 .with_source_metadata(
268 SplunkConfig::NAME,
269 Some(LegacyKey::Overwrite(owned_value_path!(SOURCE))),
270 &owned_value_path!("source"),
271 Kind::bytes(),
272 Some(meaning::SERVICE),
273 )
274 .with_source_metadata(
276 SplunkConfig::NAME,
277 Some(LegacyKey::Overwrite(owned_value_path!(SOURCETYPE))),
278 &owned_value_path!("sourcetype"),
279 Kind::bytes(),
280 None,
281 );
282
283 vec![SourceOutput::new_maybe_logs(
284 DataType::Log,
285 schema_definition,
286 )]
287 }
288
289 fn resources(&self) -> Vec<Resource> {
290 vec![Resource::tcp(self.address)]
291 }
292
293 fn can_acknowledge(&self) -> bool {
294 true
295 }
296}
297
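/// Shared state behind the HEC endpoints. Under the `services/collector` prefix
/// set up in `build`, requests are routed to:
///   POST `/event`, `/event/1.0`, or the bare prefix -> `event_service`
///   POST `/raw`, `/raw/1.0`                          -> `raw_service`
///   GET  `/health`, `/health/1.0`                    -> `health_service`
///   POST `/ack`                                      -> `ack_service`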
struct SplunkSource {
    valid_credentials: Vec<String>,
    protocol: &'static str,
    idx_ack: Option<Arc<IndexerAcknowledgement>>,
    store_hec_token: bool,
    log_namespace: LogNamespace,
    events_received: Registered<EventsReceived>,
}

impl SplunkSource {
    fn new(config: &SplunkConfig, protocol: &'static str, cx: SourceContext) -> Self {
        let log_namespace = cx.log_namespace(config.log_namespace);
        let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
        let shutdown = cx.shutdown;
        let valid_tokens = config
            .valid_tokens
            .iter()
            .flatten()
            .chain(config.token.iter());

        let idx_ack = acknowledgements.then(|| {
            Arc::new(IndexerAcknowledgement::new(
                config.acknowledgements.clone(),
                shutdown,
            ))
        });

        SplunkSource {
            valid_credentials: valid_tokens
                .map(|token| format!("Splunk {}", token.inner()))
                .collect(),
            protocol,
            idx_ack,
            store_hec_token: config.store_hec_token,
            log_namespace,
            events_received: register!(EventsReceived),
        }
    }
337
338 fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
339 let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
340 .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
341 let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
342
343 let splunk_channel = splunk_channel_header
344 .and(splunk_channel_query_param)
345 .map(|header: Option<String>, query_param| header.or(query_param));
346
347 let protocol = self.protocol;
348 let idx_ack = self.idx_ack.clone();
349 let store_hec_token = self.store_hec_token;
350 let log_namespace = self.log_namespace;
351 let events_received = self.events_received.clone();
352
353 warp::post()
354 .and(
355 path!("event")
356 .or(path!("event" / "1.0"))
357 .or(warp::path::end()),
358 )
359 .and(self.authorization())
360 .and(splunk_channel)
361 .and(warp::addr::remote())
362 .and(warp::header::optional::<String>("X-Forwarded-For"))
363 .and(self.gzip())
364 .and(warp::body::bytes())
365 .and(warp::path::full())
366 .and_then(
367 move |_,
368 token: Option<String>,
369 channel: Option<String>,
370 remote: Option<SocketAddr>,
371 remote_addr: Option<String>,
372 gzip: bool,
373 body: Bytes,
374 path: warp::path::FullPath| {
375 let mut out = out.clone();
376 let idx_ack = idx_ack.clone();
377 let events_received = events_received.clone();
378
379 async move {
380 if idx_ack.is_some() && channel.is_none() {
381 return Err(Rejection::from(ApiError::MissingChannel));
382 }
383
384 let mut data = Vec::new();
385 let (byte_size, body) = if gzip {
386 MultiGzDecoder::new(body.reader())
387 .read_to_end(&mut data)
388 .map_err(|_| Rejection::from(ApiError::BadRequest))?;
389 (data.len(), String::from_utf8_lossy(data.as_slice()))
390 } else {
391 (body.len(), String::from_utf8_lossy(body.as_ref()))
392 };
393 emit!(HttpBytesReceived {
394 byte_size,
395 http_path: path.as_str(),
396 protocol,
397 });
398
399 let (batch, receiver) =
400 BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
401 let maybe_ack_id = match (idx_ack, receiver, channel.clone()) {
402 (Some(idx_ack), Some(receiver), Some(channel_id)) => {
403 match idx_ack.get_ack_id_from_channel(channel_id, receiver).await {
404 Ok(ack_id) => Some(ack_id),
405 Err(rej) => return Err(rej),
406 }
407 }
408 _ => None,
409 };
410
411 let mut error = None;
412 let mut events = Vec::new();
413
414 let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
415 deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
416 channel,
417 remote,
418 remote_addr,
419 batch,
420 token: token.filter(|_| store_hec_token).map(Into::into),
421 log_namespace,
422 events_received,
423 }
424 .into();
425
426 for result in iter {
427 match result {
428 Ok(event) => events.push(event),
429 Err(err) => {
430 error = Some(err);
431 break;
432 }
433 }
434 }
435
436 if !events.is_empty()
437 && let Err(ClosedError) = out.send_batch(events).await
438 {
439 return Err(Rejection::from(ApiError::ServerShutdown));
440 }
441
442 if let Some(error) = error {
443 Err(error)
444 } else {
445 Ok(maybe_ack_id)
446 }
447 }
448 },
449 )
450 .map(finish_ok)
451 .boxed()
452 }
453
454 fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
455 let protocol = self.protocol;
456 let idx_ack = self.idx_ack.clone();
457 let store_hec_token = self.store_hec_token;
458 let events_received = self.events_received.clone();
459 let log_namespace = self.log_namespace;
460
461 warp::post()
462 .and(path!("raw" / "1.0").or(path!("raw")))
463 .and(self.authorization())
464 .and(SplunkSource::required_channel())
465 .and(warp::addr::remote())
466 .and(warp::header::optional::<String>("X-Forwarded-For"))
467 .and(self.gzip())
468 .and(warp::body::bytes())
469 .and(warp::path::full())
470 .and_then(
471 move |_,
472 token: Option<String>,
473 channel_id: String,
474 remote: Option<SocketAddr>,
475 xff: Option<String>,
476 gzip: bool,
477 body: Bytes,
478 path: warp::path::FullPath| {
479 let mut out = out.clone();
480 let idx_ack = idx_ack.clone();
481 let events_received = events_received.clone();
482 emit!(HttpBytesReceived {
483 byte_size: body.len(),
484 http_path: path.as_str(),
485 protocol,
486 });
487
488 async move {
489 let (batch, receiver) =
490 BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
491 let maybe_ack_id = match (idx_ack, receiver) {
492 (Some(idx_ack), Some(receiver)) => Some(
493 idx_ack
494 .get_ack_id_from_channel(channel_id.clone(), receiver)
495 .await?,
496 ),
497 _ => None,
498 };
499 let mut event = raw_event(
500 body,
501 gzip,
502 channel_id,
503 remote,
504 xff,
505 batch,
506 log_namespace,
507 &events_received,
508 )?;
509 if let Some(token) = token.filter(|_| store_hec_token) {
510 event.metadata_mut().set_splunk_hec_token(token.into());
511 }
512
513 let res = out.send_event(event).await;
514 res.map(|_| maybe_ack_id)
515 .map_err(|_| Rejection::from(ApiError::ServerShutdown))
516 }
517 },
518 )
519 .map(finish_ok)
520 .boxed()
521 }
522
523 fn health_service(&self) -> BoxedFilter<(Response,)> {
524 warp::get()
531 .and(path!("health" / "1.0").or(path!("health")))
532 .map(move |_| {
533 http::Response::builder()
534 .header(http::header::CONTENT_TYPE, "application/json")
535 .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
536 .expect("static response")
537 })
538 .boxed()
539 }
540
541 fn lenient_json_content_type_check<T>() -> impl Filter<Extract = (T,), Error = Rejection> + Clone
542 where
543 T: Send + DeserializeOwned + 'static,
544 {
545 warp::header::optional::<HeaderValue>(CONTENT_TYPE.as_str())
546 .and(warp::body::bytes())
547 .and_then(
548 |ctype: Option<HeaderValue>, body: bytes::Bytes| async move {
549 let ok = ctype
550 .as_ref()
551 .and_then(|v| v.to_str().ok())
552 .map(|h| h.to_ascii_lowercase().contains("application/json"))
553 .unwrap_or(true);
554
555 if !ok {
556 return Err(warp::reject::custom(ApiError::UnsupportedContentType));
557 }
558
559 let value = serde_json::from_slice::<T>(&body)
560 .map_err(|_| warp::reject::custom(ApiError::BadRequest))?;
561
562 Ok(value)
563 },
564 )
565 }
566
567 fn ack_service(&self) -> BoxedFilter<(Response,)> {
568 let idx_ack = self.idx_ack.clone();
569
570 warp::post()
571 .and(warp::path!("ack"))
572 .and(self.authorization())
573 .and(SplunkSource::required_channel())
574 .and(Self::lenient_json_content_type_check::<HecAckStatusRequest>())
575 .and_then(move |_, channel: String, req: HecAckStatusRequest| {
576 let idx_ack = idx_ack.clone();
577 async move {
578 if let Some(idx_ack) = idx_ack {
579 let acks = idx_ack
580 .get_acks_status_from_channel(channel, &req.acks)
581 .await?;
582 Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
583 } else {
584 Err(warp::reject::custom(ApiError::AckIsDisabled))
585 }
586 }
587 })
588 .boxed()
589 }
590
591 fn options() -> BoxedFilter<(Response,)> {
592 let post = warp::options()
593 .and(
594 path!("event")
595 .or(path!("event" / "1.0"))
596 .or(path!("raw" / "1.0"))
597 .or(path!("raw")),
598 )
599 .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());
600
601 let get = warp::options()
602 .and(path!("health").or(path!("health" / "1.0")))
603 .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());
604
605 post.or(get).unify().boxed()
606 }
607
608 fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
610 let valid_credentials = self.valid_credentials.clone();
611 warp::header::optional("Authorization")
612 .and_then(move |token: Option<String>| {
613 let valid_credentials = valid_credentials.clone();
614 async move {
615 match (token, valid_credentials.is_empty()) {
616 (token, true) => {
619 Ok(token
620 .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
621 }
622 (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
623 token
624 .strip_prefix("Splunk ")
625 .map(Into::into)
626 .unwrap_or(token),
627 )),
628 (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
629 (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
630 }
631 }
632 })
633 .boxed()
634 }
635
636 fn gzip(&self) -> BoxedFilter<(bool,)> {
638 warp::header::optional::<String>("Content-Encoding")
639 .and_then(|encoding: Option<String>| async move {
640 match encoding {
641 Some(s) if s.as_bytes() == b"gzip" => Ok(true),
642 Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
643 None => Ok(false),
644 }
645 })
646 .boxed()
647 }
648
649 fn required_channel() -> BoxedFilter<(String,)> {
650 let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
651 .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
652 let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
653
654 splunk_channel_header
655 .and(splunk_channel_query_param)
656 .and_then(|header: Option<String>, query_param| async move {
657 header
658 .or(query_param)
659 .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
660 })
661 .boxed()
662 }
663}
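
/// Iterator over the events in a `services/collector/event` request body: it
/// streams JSON values out of the body and converts each into a Vector `Event`,
/// carrying channel, timestamp, and default-field state across events.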
struct EventIterator<'de, R: JsonRead<'de>> {
    /// Remaining JSON values in the request body.
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Number of events produced so far; reported back in error responses.
    events: usize,
    /// Channel identifier taken from the request, if any.
    channel: Option<Value>,
    /// Timestamp applied to events; sticky once an event provides its own `time`.
    time: Time,
    /// Default values for `host`, `index`, `source`, and `sourcetype`.
    extractors: [DefaultExtractor; 4],
    /// Notifier used for indexer acknowledgements, if enabled.
    batch: Option<BatchNotifier>,
    /// HEC token to attach to event metadata, if passthrough is enabled.
    token: Option<Arc<str>>,
    log_namespace: LogNamespace,
    events_received: Registered<EventsReceived>,
}

/// The request-scoped inputs from which an `EventIterator` is built.
struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    channel: Option<String>,
    batch: Option<BatchNotifier>,
    token: Option<Arc<str>>,
    log_namespace: LogNamespace,
    events_received: Registered<EventsReceived>,
    remote: Option<SocketAddr>,
    remote_addr: Option<String>,
}
698
699impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
700 fn from(f: EventIteratorGenerator<'de, R>) -> Self {
701 Self {
702 deserializer: f.deserializer,
703 events: 0,
704 channel: f.channel.map(Value::from),
705 time: Time::Now(Utc::now()),
706 extractors: [
707 DefaultExtractor::new_with(
712 "host",
713 log_schema().host_key().cloned().into(),
714 f.remote_addr
715 .or_else(|| f.remote.map(|addr| addr.to_string()))
716 .map(Value::from),
717 f.log_namespace,
718 ),
719 DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace),
720 DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace),
721 DefaultExtractor::new(
722 "sourcetype",
723 OptionalValuePath::new(SOURCETYPE),
724 f.log_namespace,
725 ),
726 ],
727 batch: f.batch,
728 token: f.token,
729 log_namespace: f.log_namespace,
730 events_received: f.events_received,
731 }
732 }
733}
734
735impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
736 fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
737 let mut log = match self.log_namespace {
739 LogNamespace::Vector => self.build_log_vector(&mut json)?,
740 LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
741 };
742
743 self.log_namespace.insert_vector_metadata(
745 &mut log,
746 log_schema().source_type_key(),
747 &owned_value_path!("source_type"),
748 SplunkConfig::NAME,
749 );
750
751 let channel_path = owned_value_path!(CHANNEL);
753 if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
754 self.log_namespace.insert_source_metadata(
755 SplunkConfig::NAME,
756 &mut log,
757 Some(LegacyKey::Overwrite(&channel_path)),
758 lookup::path!(CHANNEL),
759 guid,
760 );
761 } else if let Some(guid) = self.channel.as_ref() {
762 self.log_namespace.insert_source_metadata(
763 SplunkConfig::NAME,
764 &mut log,
765 Some(LegacyKey::Overwrite(&channel_path)),
766 lookup::path!(CHANNEL),
767 guid.clone(),
768 );
769 }
770
771 if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
773 for (key, value) in object {
774 self.log_namespace.insert_source_metadata(
775 SplunkConfig::NAME,
776 &mut log,
777 Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
778 lookup::path!(key.as_str()),
779 value,
780 );
781 }
782 }
783
784 let parsed_time = match json.get_mut("time").map(JsonValue::take) {
786 Some(JsonValue::Number(time)) => Some(Some(time)),
787 Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
788 _ => None,
789 };
790
791 match parsed_time {
792 None => (),
793 Some(Some(t)) => {
794 if let Some(t) = t.as_u64() {
795 let time = parse_timestamp(t as i64)
796 .ok_or(ApiError::InvalidDataFormat { event: self.events })?;
797
798 self.time = Time::Provided(time);
799 } else if let Some(t) = t.as_f64() {
800 self.time = Time::Provided(
801 Utc.timestamp_opt(
802 t.floor() as i64,
803 (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
804 )
805 .single()
806 .expect("invalid timestamp"),
807 );
808 } else {
809 return Err(ApiError::InvalidDataFormat { event: self.events }.into());
810 }
811 }
812 Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
813 }
814
815 let timestamp = match self.time.clone() {
817 Time::Provided(time) => time,
818 Time::Now(time) => time,
819 };
820
821 self.log_namespace.insert_source_metadata(
822 SplunkConfig::NAME,
823 &mut log,
824 log_schema().timestamp_key().map(LegacyKey::Overwrite),
825 lookup::path!("timestamp"),
826 timestamp,
827 );
828
829 for de in self.extractors.iter_mut() {
831 de.extract(&mut log, &mut json);
832 }
833
834 if let Some(token) = &self.token {
836 log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
837 }
838
839 if let Some(batch) = self.batch.clone() {
840 log = log.with_batch_notifier(&batch);
841 }
842
843 self.events += 1;
844
845 Ok(log.into())
846 }
847
848 fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
852 match json.get("event") {
853 Some(event) => {
854 let event: Value = event.into();
855 let mut log = LogEvent::from(event);
856
857 self.events_received
859 .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
860
861 self.log_namespace.insert_vector_metadata(
863 &mut log,
864 log_schema().timestamp_key(),
865 lookup::path!("ingest_timestamp"),
866 chrono::Utc::now(),
867 );
868
869 Ok(log)
870 }
871 None => Err(ApiError::MissingEventField { event: self.events }.into()),
872 }
873 }
874
875 fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
880 let mut log = LogEvent::default();
881 match json.get_mut("event") {
882 Some(event) => match event.take() {
883 JsonValue::String(string) => {
884 if string.is_empty() {
885 return Err(ApiError::EmptyEventField { event: self.events }.into());
886 }
887 log.maybe_insert(log_schema().message_key_target_path(), string);
888 }
889 JsonValue::Object(mut object) => {
890 if object.is_empty() {
891 return Err(ApiError::EmptyEventField { event: self.events }.into());
892 }
893
894 if let Some(line) = object.remove("line") {
896 match line {
897 JsonValue::Array(_) | JsonValue::Object(_) => {
899 log.insert(event_path!("line"), line);
900 }
901 _ => {
902 log.maybe_insert(log_schema().message_key_target_path(), line);
903 }
904 }
905 }
906
907 for (key, value) in object {
908 log.insert(event_path!(key.as_str()), value);
909 }
910 }
911 _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
912 },
913 None => return Err(ApiError::MissingEventField { event: self.events }.into()),
914 };
915
916 self.events_received
918 .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
919
920 Ok(log)
921 }
922}
923
924impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
925 type Item = Result<Event, Rejection>;
926
927 fn next(&mut self) -> Option<Self::Item> {
928 match self.deserializer.next() {
929 Some(Ok(json)) => Some(self.build_event(json)),
930 None => {
931 if self.events == 0 {
932 Some(Err(ApiError::NoData.into()))
933 } else {
934 None
935 }
936 }
937 Some(Err(error)) => {
938 emit!(SplunkHecRequestBodyInvalidError {
939 error: error.into()
940 });
941 Some(Err(
942 ApiError::InvalidDataFormat { event: self.events }.into()
943 ))
944 }
945 }
946 }
947}
948
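
// An integer `time` value on an HEC event is interpreted by magnitude: below
// SEC_CUTOFF it is read as seconds, below MILLISEC_CUTOFF as milliseconds, and
// anything larger as nanoseconds. Illustrative values (all around late 2021):
//   1_638_366_000              -> seconds
//   1_638_366_000_000          -> milliseconds
//   1_638_366_000_000_000_000  -> nanoseconds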
fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
    // Cutoffs for the unit heuristic: SEC_CUTOFF covers dates up to roughly the
    // year 2400 when read as seconds; MILLISEC_CUTOFF covers dates up to roughly
    // the year 10000 when read as milliseconds.
    const SEC_CUTOFF: i64 = 13569465600;
    const MILLISEC_CUTOFF: i64 = 253402300800000;

    if t < 0 {
        return None;
    }

    let ts = if t < SEC_CUTOFF {
        Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
    } else if t < MILLISEC_CUTOFF {
        Utc.timestamp_millis_opt(t)
            .single()
            .expect("invalid timestamp")
    } else {
        Utc.timestamp_nanos(t)
    };

    Some(ts)
}

/// A default-able HEC field (`host`, `index`, `source`, or `sourcetype`). Once an
/// event in a request provides a value, it sticks and is also applied to the
/// following events in the same request unless they override it.
struct DefaultExtractor {
    field: &'static str,
    to_field: OptionalValuePath,
    value: Option<Value>,
    log_namespace: LogNamespace,
}
990
991impl DefaultExtractor {
992 const fn new(
993 field: &'static str,
994 to_field: OptionalValuePath,
995 log_namespace: LogNamespace,
996 ) -> Self {
997 DefaultExtractor {
998 field,
999 to_field,
1000 value: None,
1001 log_namespace,
1002 }
1003 }
1004
1005 fn new_with(
1006 field: &'static str,
1007 to_field: OptionalValuePath,
1008 value: impl Into<Option<Value>>,
1009 log_namespace: LogNamespace,
1010 ) -> Self {
1011 DefaultExtractor {
1012 field,
1013 to_field,
1014 value: value.into(),
1015 log_namespace,
1016 }
1017 }
1018
1019 fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
1020 if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
1022 self.value = Some(new_value.into());
1023 }
1024
1025 if let Some(index) = self.value.as_ref()
1027 && let Some(metadata_key) = self.to_field.path.as_ref()
1028 {
1029 self.log_namespace.insert_source_metadata(
1030 SplunkConfig::NAME,
1031 log,
1032 Some(LegacyKey::Overwrite(metadata_key)),
1033 &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
1034 index.clone(),
1035 )
1036 }
1037 }
1038}
1039
/// Timestamp applied to events in a request.
#[derive(Clone, Debug)]
enum Time {
    /// Ingestion time, used until an event provides its own `time` field.
    Now(DateTime<Utc>),
    /// Time taken from an event's `time` field; reused for subsequent events until overridden.
    Provided(DateTime<Utc>),
}
1048
1049#[allow(clippy::too_many_arguments)]
1051fn raw_event(
1052 bytes: Bytes,
1053 gzip: bool,
1054 channel: String,
1055 remote: Option<SocketAddr>,
1056 xff: Option<String>,
1057 batch: Option<BatchNotifier>,
1058 log_namespace: LogNamespace,
1059 events_received: &Registered<EventsReceived>,
1060) -> Result<Event, Rejection> {
1061 let message: Value = if gzip {
1063 let mut data = Vec::new();
1064 match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
1065 Ok(0) => return Err(ApiError::NoData.into()),
1066 Ok(_) => Value::from(Bytes::from(data)),
1067 Err(error) => {
1068 emit!(SplunkHecRequestBodyInvalidError { error });
1069 return Err(ApiError::InvalidDataFormat { event: 0 }.into());
1070 }
1071 }
1072 } else {
1073 bytes.into()
1074 };
1075
1076 let mut log = match log_namespace {
1078 LogNamespace::Vector => LogEvent::from(message),
1079 LogNamespace::Legacy => {
1080 let mut log = LogEvent::default();
1081 log.maybe_insert(log_schema().message_key_target_path(), message);
1082 log
1083 }
1084 };
1085 events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1087
1088 log_namespace.insert_source_metadata(
1090 SplunkConfig::NAME,
1091 &mut log,
1092 Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
1093 lookup::path!(CHANNEL),
1094 channel,
1095 );
1096
1097 let host = if let Some(remote_address) = xff {
1101 Some(remote_address)
1102 } else {
1103 remote.map(|remote| remote.to_string())
1104 };
1105
1106 if let Some(host) = host {
1107 log_namespace.insert_source_metadata(
1108 SplunkConfig::NAME,
1109 &mut log,
1110 log_schema().host_key().map(LegacyKey::InsertIfEmpty),
1111 lookup::path!("host"),
1112 host,
1113 );
1114 }
1115
1116 log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, Utc::now());
1117
1118 if let Some(batch) = batch {
1119 log = log.with_batch_notifier(&batch);
1120 }
1121
1122 Ok(Event::from(log))
1123}
1124
/// Error conditions surfaced to HEC clients. Each variant is mapped to an HTTP
/// status code and response body by `finish_err` below.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    MissingAuthorization,
    InvalidAuthorization,
    UnsupportedEncoding,
    UnsupportedContentType,
    MissingChannel,
    NoData,
    InvalidDataFormat { event: usize },
    ServerShutdown,
    EmptyEventField { event: usize },
    MissingEventField { event: usize },
    BadRequest,
    ServiceUnavailable,
    AckIsDisabled,
}

impl warp::reject::Reject for ApiError {}
1143
1144mod splunk_response {
1146 use serde::Serialize;
1147
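    // These discriminants mirror the numeric status codes that Splunk's own HEC
    // endpoint returns in its JSON responses.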
    pub enum HecStatusCode {
        Success = 0,
        TokenIsRequired = 2,
        InvalidAuthorization = 3,
        NoData = 5,
        InvalidDataFormat = 6,
        ServerIsBusy = 9,
        DataChannelIsMissing = 10,
        EventFieldIsRequired = 12,
        EventFieldCannotBeBlank = 13,
        AckIsDisabled = 14,
    }
1161
1162 #[derive(Serialize)]
1163 pub enum HecResponseMetadata {
1164 #[serde(rename = "ackId")]
1165 AckId(u64),
1166 #[serde(rename = "invalid-event-number")]
1167 InvalidEventNumber(usize),
1168 }
1169
1170 #[derive(Serialize)]
1171 pub struct HecResponse {
1172 text: &'static str,
1173 code: u8,
1174 #[serde(skip_serializing_if = "Option::is_none", flatten)]
1175 pub metadata: Option<HecResponseMetadata>,
1176 }
1177
1178 impl HecResponse {
1179 pub const fn new(code: HecStatusCode) -> Self {
1180 let text = match code {
1181 HecStatusCode::Success => "Success",
1182 HecStatusCode::TokenIsRequired => "Token is required",
1183 HecStatusCode::InvalidAuthorization => "Invalid authorization",
1184 HecStatusCode::NoData => "No data",
1185 HecStatusCode::InvalidDataFormat => "Invalid data format",
1186 HecStatusCode::DataChannelIsMissing => "Data channel is missing",
1187 HecStatusCode::EventFieldIsRequired => "Event field is required",
1188 HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
1189 HecStatusCode::ServerIsBusy => "Server is busy",
1190 HecStatusCode::AckIsDisabled => "Ack is disabled",
1191 };
1192
1193 Self {
1194 text,
1195 code: code as u8,
1196 metadata: None,
1197 }
1198 }
1199
1200 pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
1201 self.metadata = Some(metadata);
1202 self
1203 }
1204 }
1205
1206 pub const INVALID_AUTHORIZATION: HecResponse =
1207 HecResponse::new(HecStatusCode::InvalidAuthorization);
1208 pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
1209 pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
1210 pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
1211 pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
1212 pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
1213 pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
1214}
1215
1216fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1217 let body = if let Some(ack_id) = maybe_ack_id {
1218 HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
1219 } else {
1220 splunk_response::SUCCESS
1221 };
1222 response_json(StatusCode::OK, body)
1223}
1224
1225fn response_plain(code: StatusCode, msg: &'static str) -> Response {
1226 warp::reply::with_status(
1227 warp::reply::with_header(msg, http::header::CONTENT_TYPE, "text/plain; charset=utf-8"),
1228 code,
1229 )
1230 .into_response()
1231}
1232
1233async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1234 if let Some(&error) = rejection.find::<ApiError>() {
1235 emit!(SplunkHecRequestError { error });
1236 Ok((match error {
1237 ApiError::MissingAuthorization => {
1238 response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
1239 }
1240 ApiError::InvalidAuthorization => response_json(
1241 StatusCode::UNAUTHORIZED,
1242 splunk_response::INVALID_AUTHORIZATION,
1243 ),
1244 ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
1245 ApiError::UnsupportedContentType => response_plain(
1246 StatusCode::UNSUPPORTED_MEDIA_TYPE,
1247 "The request's content-type is not supported",
1248 ),
1249 ApiError::MissingChannel => {
1250 response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
1251 }
1252 ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
1253 ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
1254 ApiError::InvalidDataFormat { event } => response_json(
1255 StatusCode::BAD_REQUEST,
1256 HecResponse::new(HecStatusCode::InvalidDataFormat)
1257 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1258 ),
1259 ApiError::EmptyEventField { event } => response_json(
1260 StatusCode::BAD_REQUEST,
1261 HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
1262 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1263 ),
1264 ApiError::MissingEventField { event } => response_json(
1265 StatusCode::BAD_REQUEST,
1266 HecResponse::new(HecStatusCode::EventFieldIsRequired)
1267 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1268 ),
1269 ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
1270 ApiError::ServiceUnavailable => response_json(
1271 StatusCode::SERVICE_UNAVAILABLE,
1272 splunk_response::SERVER_IS_BUSY,
1273 ),
1274 ApiError::AckIsDisabled => {
1275 response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
1276 }
1277 },))
1278 } else {
1279 Err(rejection)
1280 }
1281}
1282
1283fn empty_response(code: StatusCode) -> Response {
1285 let mut res = Response::default();
1286 *res.status_mut() = code;
1287 res
1288}
1289
1290fn response_json(code: StatusCode, body: impl Serialize) -> Response {
1292 warp::reply::with_status(warp::reply::json(&body), code).into_response()
1293}
1294
1295#[cfg(feature = "sinks-splunk_hec")]
1296#[cfg(test)]
1297mod tests {
1298 use std::{net::SocketAddr, num::NonZeroU64};
1299
1300 use chrono::{TimeZone, Utc};
1301 use futures_util::Stream;
1302 use http::Uri;
1303 use reqwest::{RequestBuilder, Response};
1304 use serde::Deserialize;
1305 use vector_lib::{
1306 codecs::{
1307 BytesDecoderConfig, JsonSerializerConfig, TextSerializerConfig,
1308 decoding::DeserializerConfig,
1309 },
1310 event::EventStatus,
1311 schema::Definition,
1312 sensitive_string::SensitiveString,
1313 };
1314 use vrl::path::PathPrefix;
1315
1316 use super::*;
1317 use crate::{
1318 SourceSender,
1319 codecs::{DecodingConfig, EncodingConfig},
1320 components::validation::prelude::*,
1321 config::{SinkConfig, SinkContext, SourceConfig, SourceContext, log_schema},
1322 event::{Event, LogEvent},
1323 sinks::{
1324 Healthcheck, VectorSink,
1325 splunk_hec::logs::config::HecLogsSinkConfig,
1326 util::{BatchConfig, Compression, TowerRequestConfig},
1327 },
1328 sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
1329 test_util::{
1330 collect_n,
1331 components::{
1332 COMPONENT_ERROR_TAGS, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance,
1333 assert_source_error,
1334 },
1335 next_addr, wait_for_tcp,
1336 },
1337 };
1338
1339 #[test]
1340 fn generate_config() {
1341 crate::test_util::test_generate_config::<SplunkConfig>();
1342 }
1343
1344 const TOKEN: &str = "token";
1346 const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
1347
1348 async fn source(
1349 acknowledgements: Option<HecAcknowledgementsConfig>,
1350 ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
1351 source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
1352 }
1353
1354 async fn source_with(
1355 token: Option<SensitiveString>,
1356 valid_tokens: Option<&[&str]>,
1357 acknowledgements: Option<HecAcknowledgementsConfig>,
1358 store_hec_token: bool,
1359 ) -> (impl Stream<Item = Event> + Unpin + use<>, SocketAddr) {
1360 let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
1361 let address = next_addr();
1362 let valid_tokens =
1363 valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
1364 let cx = SourceContext::new_test(sender, None);
1365 tokio::spawn(async move {
1366 SplunkConfig {
1367 address,
1368 token,
1369 valid_tokens,
1370 tls: None,
1371 acknowledgements: acknowledgements.unwrap_or_default(),
1372 store_hec_token,
1373 log_namespace: None,
1374 keepalive: Default::default(),
1375 }
1376 .build(cx)
1377 .await
1378 .unwrap()
1379 .await
1380 .unwrap()
1381 });
1382 wait_for_tcp(address).await;
1383 (recv, address)
1384 }
1385
1386 async fn sink(
1387 address: SocketAddr,
1388 encoding: EncodingConfig,
1389 compression: Compression,
1390 ) -> (VectorSink, Healthcheck) {
1391 HecLogsSinkConfig {
1392 default_token: TOKEN.to_owned().into(),
1393 endpoint: format!("http://{address}"),
1394 host_key: None,
1395 indexed_fields: vec![],
1396 index: None,
1397 sourcetype: None,
1398 source: None,
1399 encoding,
1400 compression,
1401 batch: BatchConfig::default(),
1402 request: TowerRequestConfig::default(),
1403 tls: None,
1404 acknowledgements: Default::default(),
1405 timestamp_nanos_key: None,
1406 timestamp_key: None,
1407 auto_extract_timestamp: None,
1408 endpoint_target: Default::default(),
1409 }
1410 .build(SinkContext::default())
1411 .await
1412 .unwrap()
1413 }
1414
1415 async fn start(
1416 encoding: EncodingConfig,
1417 compression: Compression,
1418 acknowledgements: Option<HecAcknowledgementsConfig>,
1419 ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
1420 let (source, address) = source(acknowledgements).await;
1421 let (sink, health) = sink(address, encoding, compression).await;
1422 assert!(health.await.is_ok());
1423 (sink, source)
1424 }
1425
1426 async fn channel_n(
1427 messages: Vec<impl Into<String> + Send + 'static>,
1428 sink: VectorSink,
1429 source: impl Stream<Item = Event> + Unpin,
1430 ) -> Vec<Event> {
1431 let n = messages.len();
1432
1433 tokio::spawn(async move {
1434 sink.run_events(
1435 messages
1436 .into_iter()
1437 .map(|s| Event::Log(LogEvent::from(s.into()))),
1438 )
1439 .await
1440 .unwrap();
1441 });
1442
1443 let events = collect_n(source, n).await;
1444 assert_eq!(n, events.len());
1445
1446 events
1447 }
1448
1449 #[derive(Clone, Copy, Debug)]
1450 enum Channel<'a> {
1451 Header(&'a str),
1452 QueryParam(&'a str),
1453 }
1454
1455 #[derive(Default)]
1456 struct SendWithOpts<'a> {
1457 channel: Option<Channel<'a>>,
1458 forwarded_for: Option<String>,
1459 }
1460
1461 async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
1462 let channel = Channel::Header("channel");
1463 let options = SendWithOpts {
1464 channel: Some(channel),
1465 forwarded_for: None,
1466 };
1467 send_with(address, api, message, TOKEN, &options).await
1468 }
1469
1470 fn build_request(
1471 address: SocketAddr,
1472 api: &str,
1473 message: &str,
1474 token: &str,
1475 opts: &SendWithOpts<'_>,
1476 ) -> RequestBuilder {
1477 let mut b = reqwest::Client::new()
1478 .post(format!("http://{address}/{api}"))
1479 .header("Authorization", format!("Splunk {token}"));
1480
1481 b = match opts.channel {
1482 Some(c) => match c {
1483 Channel::Header(v) => b.header("x-splunk-request-channel", v),
1484 Channel::QueryParam(v) => b.query(&[("channel", v)]),
1485 },
1486 None => b,
1487 };
1488
1489 b = match &opts.forwarded_for {
1490 Some(f) => b.header("X-Forwarded-For", f),
1491 None => b,
1492 };
1493
1494 b.body(message.to_owned())
1495 }
1496
1497 async fn send_with(
1498 address: SocketAddr,
1499 api: &str,
1500 message: &str,
1501 token: &str,
1502 opts: &SendWithOpts<'_>,
1503 ) -> u16 {
1504 let b = build_request(address, api, message, token, opts);
1505 b.send().await.unwrap().status().as_u16()
1506 }
1507
1508 async fn send_with_response(
1509 address: SocketAddr,
1510 api: &str,
1511 message: &str,
1512 token: &str,
1513 opts: &SendWithOpts<'_>,
1514 ) -> Response {
1515 let b = build_request(address, api, message, token, opts);
1516 b.send().await.unwrap()
1517 }
1518
1519 #[tokio::test]
1520 async fn no_compression_text_event() {
1521 let message = "gzip_text_event";
1522 let (sink, source) = start(
1523 TextSerializerConfig::default().into(),
1524 Compression::None,
1525 None,
1526 )
1527 .await;
1528
1529 let event = channel_n(vec![message], sink, source).await.remove(0);
1530
1531 assert_eq!(
1532 event.as_log()[log_schema().message_key().unwrap().to_string()],
1533 message.into()
1534 );
1535 assert!(event.as_log().get_timestamp().is_some());
1536 assert_eq!(
1537 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1538 "splunk_hec".into()
1539 );
1540 assert!(event.metadata().splunk_hec_token().is_none());
1541 }
1542
1543 #[tokio::test]
1544 async fn one_simple_text_event() {
1545 let message = "one_simple_text_event";
1546 let (sink, source) = start(
1547 TextSerializerConfig::default().into(),
1548 Compression::gzip_default(),
1549 None,
1550 )
1551 .await;
1552
1553 let event = channel_n(vec![message], sink, source).await.remove(0);
1554
1555 assert_eq!(
1556 event.as_log()[log_schema().message_key().unwrap().to_string()],
1557 message.into()
1558 );
1559 assert!(event.as_log().get_timestamp().is_some());
1560 assert_eq!(
1561 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1562 "splunk_hec".into()
1563 );
1564 assert!(event.metadata().splunk_hec_token().is_none());
1565 }
1566
1567 #[tokio::test]
1568 async fn multiple_simple_text_event() {
1569 let n = 200;
1570 let (sink, source) = start(
1571 TextSerializerConfig::default().into(),
1572 Compression::None,
1573 None,
1574 )
1575 .await;
1576
1577 let messages = (0..n)
1578 .map(|i| format!("multiple_simple_text_event_{i}"))
1579 .collect::<Vec<_>>();
1580 let events = channel_n(messages.clone(), sink, source).await;
1581
1582 for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1583 assert_eq!(
1584 event.as_log()[log_schema().message_key().unwrap().to_string()],
1585 msg.into()
1586 );
1587 assert!(event.as_log().get_timestamp().is_some());
1588 assert_eq!(
1589 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1590 "splunk_hec".into()
1591 );
1592 assert!(event.metadata().splunk_hec_token().is_none());
1593 }
1594 }
1595
1596 #[tokio::test]
1597 async fn one_simple_json_event() {
1598 let message = "one_simple_json_event";
1599 let (sink, source) = start(
1600 JsonSerializerConfig::default().into(),
1601 Compression::gzip_default(),
1602 None,
1603 )
1604 .await;
1605
1606 let event = channel_n(vec![message], sink, source).await.remove(0);
1607
1608 assert_eq!(
1609 event.as_log()[log_schema().message_key().unwrap().to_string()],
1610 message.into()
1611 );
1612 assert!(event.as_log().get_timestamp().is_some());
1613 assert_eq!(
1614 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1615 "splunk_hec".into()
1616 );
1617 assert!(event.metadata().splunk_hec_token().is_none());
1618 }
1619
1620 #[tokio::test]
1621 async fn multiple_simple_json_event() {
1622 let n = 200;
1623 let (sink, source) = start(
1624 JsonSerializerConfig::default().into(),
1625 Compression::gzip_default(),
1626 None,
1627 )
1628 .await;
1629
1630 let messages = (0..n)
1631 .map(|i| format!("multiple_simple_json_event{i}"))
1632 .collect::<Vec<_>>();
1633 let events = channel_n(messages.clone(), sink, source).await;
1634
1635 for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1636 assert_eq!(
1637 event.as_log()[log_schema().message_key().unwrap().to_string()],
1638 msg.into()
1639 );
1640 assert!(event.as_log().get_timestamp().is_some());
1641 assert_eq!(
1642 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1643 "splunk_hec".into()
1644 );
1645 assert!(event.metadata().splunk_hec_token().is_none());
1646 }
1647 }
1648
1649 #[tokio::test]
1650 async fn json_event() {
1651 let (sink, source) = start(
1652 JsonSerializerConfig::default().into(),
1653 Compression::gzip_default(),
1654 None,
1655 )
1656 .await;
1657
1658 let mut log = LogEvent::default();
1659 log.insert("greeting", "hello");
1660 log.insert("name", "bob");
1661 sink.run_events(vec![log.into()]).await.unwrap();
1662
1663 let event = collect_n(source, 1).await.remove(0).into_log();
1664 assert_eq!(event["greeting"], "hello".into());
1665 assert_eq!(event["name"], "bob".into());
1666 assert!(event.get_timestamp().is_some());
1667 assert_eq!(
1668 event[log_schema().source_type_key().unwrap().to_string()],
1669 "splunk_hec".into()
1670 );
1671 assert!(event.metadata().splunk_hec_token().is_none());
1672 }
1673
1674 #[tokio::test]
1675 async fn json_invalid_path_event() {
1676 let (sink, source) = start(
1677 JsonSerializerConfig::default().into(),
1678 Compression::gzip_default(),
1679 None,
1680 )
1681 .await;
1682
1683 let mut log = LogEvent::default();
1684 log.insert(event_path!("(greeting | thing"), "hello");
1687 sink.run_events(vec![log.into()]).await.unwrap();
1688
1689 let event = collect_n(source, 1).await.remove(0).into_log();
1690 assert_eq!(
1691 event.get(event_path!("(greeting | thing")),
1692 Some(&Value::from("hello"))
1693 );
1694 }
1695
1696 #[tokio::test]
1697 async fn line_to_message() {
1698 let (sink, source) = start(
1699 JsonSerializerConfig::default().into(),
1700 Compression::gzip_default(),
1701 None,
1702 )
1703 .await;
1704
1705 let mut event = LogEvent::default();
1706 event.insert("line", "hello");
1707 sink.run_events(vec![event.into()]).await.unwrap();
1708
1709 let event = collect_n(source, 1).await.remove(0);
1710 assert_eq!(
1711 event.as_log()[log_schema().message_key().unwrap().to_string()],
1712 "hello".into()
1713 );
1714 assert!(event.metadata().splunk_hec_token().is_none());
1715 }
1716
1717 #[tokio::test]
1718 async fn raw() {
1719 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1720 let message = "raw";
1721 let (source, address) = source(None).await;
1722
1723 assert_eq!(200, post(address, "services/collector/raw", message).await);
1724
1725 let event = collect_n(source, 1).await.remove(0);
1726 assert_eq!(
1727 event.as_log()[log_schema().message_key().unwrap().to_string()],
1728 message.into()
1729 );
1730 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1731 assert!(event.as_log().get_timestamp().is_some());
1732 assert_eq!(
1733 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1734 "splunk_hec".into()
1735 );
1736 assert!(event.metadata().splunk_hec_token().is_none());
1737 })
1738 .await;
1739 }
1740
1741 #[tokio::test]
1742 async fn root() {
1743 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1744 let message = r#"{ "event": { "message": "root"} }"#;
1745 let (source, address) = source(None).await;
1746
1747 assert_eq!(200, post(address, "services/collector", message).await);
1748
1749 let event = collect_n(source, 1).await.remove(0);
1750 assert_eq!(
1751 event.as_log()[log_schema().message_key().unwrap().to_string()],
1752 "root".into()
1753 );
1754 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1755 assert!(event.as_log().get_timestamp().is_some());
1756 assert_eq!(
1757 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1758 "splunk_hec".into()
1759 );
1760 assert!(event.metadata().splunk_hec_token().is_none());
1761 })
1762 .await;
1763 }
1764
1765 #[tokio::test]
1766 async fn channel_header() {
1767 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1768 let message = "raw";
1769 let (source, address) = source(None).await;
1770
1771 let opts = SendWithOpts {
1772 channel: Some(Channel::Header("guid")),
1773 forwarded_for: None,
1774 };
1775
1776 assert_eq!(
1777 200,
1778 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1779 );
1780
1781 let event = collect_n(source, 1).await.remove(0);
1782 assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1783 })
1784 .await;
1785 }
1786
1787 #[tokio::test]
1788 async fn xff_header_raw() {
1789 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1790 let message = "raw";
1791 let (source, address) = source(None).await;
1792
1793 let opts = SendWithOpts {
1794 channel: Some(Channel::Header("guid")),
1795 forwarded_for: Some(String::from("10.0.0.1")),
1796 };
1797
1798 assert_eq!(
1799 200,
1800 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1801 );
1802
1803 let event = collect_n(source, 1).await.remove(0);
1804 assert_eq!(
1805 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1806 "10.0.0.1".into()
1807 );
1808 })
1809 .await;
1810 }
1811
1812 #[tokio::test]
1814 async fn xff_header_event_with_host_field() {
1815 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1816 let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
1817 let (source, address) = source(None).await;
1818
1819 let opts = SendWithOpts {
1820 channel: Some(Channel::Header("guid")),
1821 forwarded_for: Some(String::from("10.0.0.1")),
1822 };
1823
1824 assert_eq!(
1825 200,
1826 send_with(address, "services/collector/event", message, TOKEN, &opts).await
1827 );
1828
1829 let event = collect_n(source, 1).await.remove(0);
1830 assert_eq!(
1831 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1832 "10.1.0.2".into()
1833 );
1834 })
1835 .await;
1836 }
1837
1838 #[tokio::test]
1840 async fn xff_header_event_without_host_field() {
1841 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1842 let message = r#"{"event":"first", "color": "blue"}"#;
1843 let (source, address) = source(None).await;
1844
1845 let opts = SendWithOpts {
1846 channel: Some(Channel::Header("guid")),
1847 forwarded_for: Some(String::from("10.0.0.1")),
1848 };
1849
1850 assert_eq!(
1851 200,
1852 send_with(address, "services/collector/event", message, TOKEN, &opts).await
1853 );
1854
1855 let event = collect_n(source, 1).await.remove(0);
1856 assert_eq!(
1857 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1858 "10.0.0.1".into()
1859 );
1860 })
1861 .await;
1862 }
1863
1864 #[tokio::test]
1865 async fn channel_query_param() {
1866 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1867 let message = "raw";
1868 let (source, address) = source(None).await;
1869
1870 let opts = SendWithOpts {
1871 channel: Some(Channel::QueryParam("guid")),
1872 forwarded_for: None,
1873 };
1874
1875 assert_eq!(
1876 200,
1877 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1878 );
1879
1880 let event = collect_n(source, 1).await.remove(0);
1881 assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1882 })
1883 .await;
1884 }
1885
1886 #[tokio::test]
1887 async fn no_data() {
1888 let (_source, address) = source(None).await;
1889
1890 assert_eq!(400, post(address, "services/collector/event", "").await);
1891 }
1892
1893 #[tokio::test]
1894 async fn invalid_token() {
1895 assert_source_error(&COMPONENT_ERROR_TAGS, async {
1896 let (_source, address) = source(None).await;
1897 let opts = SendWithOpts {
1898 channel: Some(Channel::Header("channel")),
1899 forwarded_for: None,
1900 };
1901
1902 assert_eq!(
1903 401,
1904 send_with(address, "services/collector/event", "", "nope", &opts).await
1905 );
1906 })
1907 .await;
1908 }
1909
1910 #[tokio::test]
1911 async fn health_ignores_token() {
1912 let (_source, address) = source(None).await;
1913
1914 let res = reqwest::Client::new()
1915 .get(format!("http://{address}/services/collector/health"))
1916 .header("Authorization", format!("Splunk {}", "invalid token"))
1917 .send()
1918 .await
1919 .unwrap();
1920
1921 assert_eq!(200, res.status().as_u16());
1922 }
1923
1924 #[tokio::test]
1925 async fn health() {
1926 let (_source, address) = source(None).await;
1927
1928 let res = reqwest::Client::new()
1929 .get(format!("http://{address}/services/collector/health"))
1930 .send()
1931 .await
1932 .unwrap();
1933
1934 assert_eq!(200, res.status().as_u16());
1935 }
1936
1937 #[tokio::test]
1938 async fn secondary_token() {
1939 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1940 let message = r#"{"event":"first", "color": "blue"}"#;
1941 let (_source, address) = source_with(None, Some(VALID_TOKENS), None, false).await;
1942 let options = SendWithOpts {
1943 channel: None,
1944 forwarded_for: None,
1945 };
1946
1947 assert_eq!(
1948 200,
1949 send_with(
1950 address,
1951 "services/collector/event",
1952 message,
1953 VALID_TOKENS.get(1).unwrap(),
1954 &options
1955 )
1956 .await
1957 );
1958 })
1959 .await;
1960 }
1961
1962 #[tokio::test]
1963 async fn event_service_token_passthrough_enabled() {
1964 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1965 let message = "passthrough_token_enabled";
1966 let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
1967 let (sink, health) = sink(
1968 address,
1969 TextSerializerConfig::default().into(),
1970 Compression::gzip_default(),
1971 )
1972 .await;
1973 assert!(health.await.is_ok());
1974
1975 let event = channel_n(vec![message], sink, source).await.remove(0);
1976
1977 assert_eq!(
1978 event.as_log()[log_schema().message_key().unwrap().to_string()],
1979 message.into()
1980 );
1981 assert_eq!(
1982 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
1983 TOKEN
1984 );
1985 })
1986 .await;
1987 }
1988
1989 #[tokio::test]
1990 async fn raw_service_token_passthrough_enabled() {
1991 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1992 let message = "raw";
1993 let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
1994
1995 assert_eq!(200, post(address, "services/collector/raw", message).await);
1996
1997 let event = collect_n(source, 1).await.remove(0);
1998 assert_eq!(
1999 event.as_log()[log_schema().message_key().unwrap().to_string()],
2000 message.into()
2001 );
2002 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2003 assert!(event.as_log().get_timestamp().is_some());
2004 assert_eq!(
2005 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2006 "splunk_hec".into()
2007 );
2008 assert_eq!(
2009 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2010 TOKEN
2011 );
2012 })
2013 .await;
2014 }
2015
2016 #[tokio::test]
2017 async fn no_authorization() {
2018 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2019 let message = "no_authorization";
2020 let (source, address) = source_with(None, None, None, false).await;
2021 let (sink, health) = sink(
2022 address,
2023 TextSerializerConfig::default().into(),
2024 Compression::gzip_default(),
2025 )
2026 .await;
2027 assert!(health.await.is_ok());
2028
2029 let event = channel_n(vec![message], sink, source).await.remove(0);
2030
2031 assert_eq!(
2032 event.as_log()[log_schema().message_key().unwrap().to_string()],
2033 message.into()
2034 );
2035 assert!(event.metadata().splunk_hec_token().is_none());
2036 })
2037 .await;
2038 }
2039
2040 #[tokio::test]
2041 async fn no_authorization_token_passthrough_enabled() {
2042 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2043 let message = "no_authorization";
2044 let (source, address) = source_with(None, None, None, true).await;
2045 let (sink, health) = sink(
2046 address,
2047 TextSerializerConfig::default().into(),
2048 Compression::gzip_default(),
2049 )
2050 .await;
2051 assert!(health.await.is_ok());
2052
2053 let event = channel_n(vec![message], sink, source).await.remove(0);
2054
2055 assert_eq!(
2056 event.as_log()[log_schema().message_key().unwrap().to_string()],
2057 message.into()
2058 );
2059 assert_eq!(
2060 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2061 TOKEN
2062 );
2063 })
2064 .await;
2065 }
2066
2067 #[tokio::test]
2068 async fn partial() {
2069 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2070 let message = r#"{"event":"first"}{"event":"second""#;
2071 let (source, address) = source(None).await;
2072
2073 assert_eq!(
2074 400,
2075 post(address, "services/collector/event", message).await
2076 );
2077
2078 let event = collect_n(source, 1).await.remove(0);
2079 assert_eq!(
2080 event.as_log()[log_schema().message_key().unwrap().to_string()],
2081 "first".into()
2082 );
2083 assert!(event.as_log().get_timestamp().is_some());
2084 assert_eq!(
2085 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2086 "splunk_hec".into()
2087 );
2088 })
2089 .await;
2090 }
2091
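    // A JSON event surrounded by newlines is parsed successfully.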
2092 #[tokio::test]
2093 async fn handles_newlines() {
2094 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2095 let message = r#"
2096{"event":"first"}
2097 "#;
2098 let (source, address) = source(None).await;
2099
2100 assert_eq!(
2101 200,
2102 post(address, "services/collector/event", message).await
2103 );
2104
2105 let event = collect_n(source, 1).await.remove(0);
2106 assert_eq!(
2107 event.as_log()[log_schema().message_key().unwrap().to_string()],
2108 "first".into()
2109 );
2110 assert!(event.as_log().get_timestamp().is_some());
2111 assert_eq!(
2112 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2113 "splunk_hec".into()
2114 );
2115 })
2116 .await;
2117 }
2118
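    // A JSON event surrounded by spaces is parsed successfully.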
2119 #[tokio::test]
2120 async fn handles_spaces() {
2121 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2122 let message = r#" {"event":"first"} "#;
2123 let (source, address) = source(None).await;
2124
2125 assert_eq!(
2126 200,
2127 post(address, "services/collector/event", message).await
2128 );
2129
2130 let event = collect_n(source, 1).await.remove(0);
2131 assert_eq!(
2132 event.as_log()[log_schema().message_key().unwrap().to_string()],
2133 "first".into()
2134 );
2135 assert!(event.as_log().get_timestamp().is_some());
2136 assert_eq!(
2137 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2138 "splunk_hec".into()
2139 );
2140 })
2141 .await;
2142 }
2143
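    // Invalid UTF-8 bytes in the request body are replaced with U+FFFD while the rest of
    // the event parses normally.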
2144 #[tokio::test]
2145 async fn handles_non_utf8() {
2146 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2147 let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
2148 let (source, address) = source(None).await;
2149
2150 let b = reqwest::Client::new()
2151 .post(format!(
2152 "http://{}/{}",
2153 address, "services/collector/event"
2154 ))
2155 .header("Authorization", format!("Splunk {TOKEN}"))
2156 .body::<&[u8]>(message);
2157
2158 assert_eq!(200, b.send().await.unwrap().status().as_u16());
2159
2160 let event = collect_n(source, 1).await.remove(0);
2161 assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
2162 assert_eq!(event.as_log()["number"], 2.into());
2163 assert_eq!(event.as_log()["bool"], true.into());
2164 assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some());
2165 assert_eq!(
2166 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2167 "splunk_hec".into()
2168 );
2169 }).await;
2170 }
2171
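    // Fields such as `source` carry over to later events in the same request until a new
    // value is supplied.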
2172 #[tokio::test]
2173 async fn default() {
2174 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2175 let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
2176 let (source, address) = source(None).await;
2177
2178 assert_eq!(
2179 200,
2180 post(address, "services/collector/event", message).await
2181 );
2182
2183 let events = collect_n(source, 3).await;
2184
2185 assert_eq!(
2186 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
2187 "first".into()
2188 );
2189 assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());
2190
2191 assert_eq!(
2192 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
2193 "second".into()
2194 );
2195 assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());
2196
2197 assert_eq!(
2198 events[2].as_log()[log_schema().message_key().unwrap().to_string()],
2199 "third".into()
2200 );
2201 assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
2202 }).await;
2203 }
2204
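    // `parse_timestamp` accepts second, millisecond, and nanosecond precision values and
    // rejects negative timestamps.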
2205 #[test]
2206 fn parse_timestamps() {
2207 let cases = vec![
2208 Utc::now(),
2209 Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
2210 .single()
2211 .expect("invalid timestamp"),
2212 Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
2213 .single()
2214 .expect("invalid timestamp"),
2215 Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
2216 .single()
2217 .expect("invalid timestamp"),
2218 ];
2219
2220 for case in cases {
2221 let sec = case.timestamp();
2222 let millis = case.timestamp_millis();
2223 let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");
2224
2225 assert_eq!(parse_timestamp(sec).unwrap().timestamp(), case.timestamp());
2226 assert_eq!(
2227 parse_timestamp(millis).unwrap().timestamp_millis(),
2228 case.timestamp_millis()
2229 );
2230 assert_eq!(
2231 parse_timestamp(nano)
2232 .unwrap()
2233 .timestamp_nanos_opt()
2234 .unwrap(),
2235 case.timestamp_nanos_opt().expect("Timestamp out of range")
2236 );
2237 }
2238
2239 assert!(parse_timestamp(-1).is_none());
2240 }
2241
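    // Events received from the HEC sink arrive without a host field set.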
2242 #[tokio::test]
2249 async fn host_test() {
2250 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2251 let message = "for the host";
2252 let (sink, source) = start(
2253 TextSerializerConfig::default().into(),
2254 Compression::gzip_default(),
2255 None,
2256 )
2257 .await;
2258
2259 let event = channel_n(vec![message], sink, source).await.remove(0);
2260
2261 assert_eq!(
2262 event.as_log()[log_schema().message_key().unwrap().to_string()],
2263 message.into()
2264 );
2265 assert!(
2266 event
2267 .as_log()
2268 .get((PathPrefix::Event, log_schema().host_key().unwrap()))
2269 .is_none()
2270 );
2271 })
2272 .await;
2273 }
2274
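    // Subset of the HEC response returned by the event and raw endpoints when indexer
    // acknowledgements are enabled.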
2275 #[derive(Deserialize)]
2276 struct HecAckEventResponse {
2277 text: String,
2278 code: u8,
2279 #[serde(rename = "ackId")]
2280 ack_id: u64,
2281 }
2282
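    // An event request returns an ackId, and the ack endpoint reports it as acknowledged
    // once the event has been consumed.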
2283 #[tokio::test]
2284 async fn ack_json_event() {
2285 let ack_config = HecAcknowledgementsConfig {
2286 enabled: Some(true),
2287 ..Default::default()
2288 };
2289 let (source, address) = source(Some(ack_config)).await;
2290 let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
2291 let opts = SendWithOpts {
2292 channel: Some(Channel::Header("guid")),
2293 forwarded_for: None,
2294 };
2295 let event_res = send_with_response(
2296 address,
2297 "services/collector/event",
2298 event_message,
2299 TOKEN,
2300 &opts,
2301 )
2302 .await
2303 .json::<HecAckEventResponse>()
2304 .await
2305 .unwrap();
2306 assert_eq!("Success", event_res.text.as_str());
2307 assert_eq!(0, event_res.code);
2308 _ = collect_n(source, 1).await;
2309
2310 let ack_message = serde_json::to_string(&HecAckStatusRequest {
2311 acks: vec![event_res.ack_id],
2312 })
2313 .unwrap();
2314 let ack_res = send_with_response(
2315 address,
2316 "services/collector/ack",
2317 ack_message.as_str(),
2318 TOKEN,
2319 &opts,
2320 )
2321 .await
2322 .json::<HecAckStatusResponse>()
2323 .await
2324 .unwrap();
2325 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2326 }
2327
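    // Same as `ack_json_event`, but for the raw endpoint.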
2328 #[tokio::test]
2329 async fn ack_raw_event() {
2330 let ack_config = HecAcknowledgementsConfig {
2331 enabled: Some(true),
2332 ..Default::default()
2333 };
2334 let (source, address) = source(Some(ack_config)).await;
2335 let event_message = "raw event message";
2336 let opts = SendWithOpts {
2337 channel: Some(Channel::Header("guid")),
2338 forwarded_for: None,
2339 };
2340 let event_res = send_with_response(
2341 address,
2342 "services/collector/raw",
2343 event_message,
2344 TOKEN,
2345 &opts,
2346 )
2347 .await
2348 .json::<HecAckEventResponse>()
2349 .await
2350 .unwrap();
2351 assert_eq!("Success", event_res.text.as_str());
2352 assert_eq!(0, event_res.code);
2353 _ = collect_n(source, 1).await;
2354
2355 let ack_message = serde_json::to_string(&HecAckStatusRequest {
2356 acks: vec![event_res.ack_id],
2357 })
2358 .unwrap();
2359 let ack_res = send_with_response(
2360 address,
2361 "services/collector/ack",
2362 ack_message.as_str(),
2363 TOKEN,
2364 &opts,
2365 )
2366 .await
2367 .json::<HecAckStatusResponse>()
2368 .await
2369 .unwrap();
2370 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2371 }
2372
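    // Querying an ack id a second time reports it as unacknowledged: ack statuses are
    // dropped once they have been queried.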
2373 #[tokio::test]
2374 async fn ack_repeat_ack_query() {
2375 let ack_config = HecAcknowledgementsConfig {
2376 enabled: Some(true),
2377 ..Default::default()
2378 };
2379 let (source, address) = source(Some(ack_config)).await;
2380 let event_message = "raw event message";
2381 let opts = SendWithOpts {
2382 channel: Some(Channel::Header("guid")),
2383 forwarded_for: None,
2384 };
2385 let event_res = send_with_response(
2386 address,
2387 "services/collector/raw",
2388 event_message,
2389 TOKEN,
2390 &opts,
2391 )
2392 .await
2393 .json::<HecAckEventResponse>()
2394 .await
2395 .unwrap();
2396 _ = collect_n(source, 1).await;
2397
2398 let ack_message = serde_json::to_string(&HecAckStatusRequest {
2399 acks: vec![event_res.ack_id],
2400 })
2401 .unwrap();
2402 let ack_res = send_with_response(
2403 address,
2404 "services/collector/ack",
2405 ack_message.as_str(),
2406 TOKEN,
2407 &opts,
2408 )
2409 .await
2410 .json::<HecAckStatusResponse>()
2411 .await
2412 .unwrap();
2413 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2414
2415 let ack_res = send_with_response(
2416 address,
2417 "services/collector/ack",
2418 ack_message.as_str(),
2419 TOKEN,
2420 &opts,
2421 )
2422 .await
2423 .json::<HecAckStatusResponse>()
2424 .await
2425 .unwrap();
2426 assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
2427 }
2428
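    // Once `max_number_of_ack_channels` is reached, requests on a new channel are
    // rejected with 503.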
2429 #[tokio::test]
2430 async fn ack_exceed_max_number_of_ack_channels() {
2431 let ack_config = HecAcknowledgementsConfig {
2432 enabled: Some(true),
2433 max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
2434 ..Default::default()
2435 };
2436
2437 let (_source, address) = source(Some(ack_config)).await;
2438 let mut opts = SendWithOpts {
2439 channel: Some(Channel::Header("guid")),
2440 forwarded_for: None,
2441 };
2442 assert_eq!(
2443 200,
2444 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2445 );
2446
2447 opts.channel = Some(Channel::Header("other-guid"));
2448 assert_eq!(
2449 503,
2450 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2451 );
2452 assert_eq!(
2453 503,
2454 send_with(
2455 address,
2456 "services/collector/event",
2457 r#"{"event":"first"}"#,
2458 TOKEN,
2459 &opts
2460 )
2461 .await
2462 );
2463 }
2464
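    // When `max_pending_acks_per_channel` is exceeded, older pending acks are dropped and
    // only the most recent ack id remains queryable.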
2465 #[tokio::test]
2466 async fn ack_exceed_max_pending_acks_per_channel() {
2467 let ack_config = HecAcknowledgementsConfig {
2468 enabled: Some(true),
2469 max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
2470 ..Default::default()
2471 };
2472
2473 let (source, address) = source(Some(ack_config)).await;
2474 let opts = SendWithOpts {
2475 channel: Some(Channel::Header("guid")),
2476 forwarded_for: None,
2477 };
2478 for _ in 0..5 {
2479 send_with(
2480 address,
2481 "services/collector/event",
2482 r#"{"event":"first"}"#,
2483 TOKEN,
2484 &opts,
2485 )
2486 .await;
2487 }
2488 for _ in 0..5 {
2489 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
2490 }
2491 let event_res = send_with_response(
2492 address,
2493 "services/collector/event",
2494 r#"{"event":"this will be acked"}"#,
2495 TOKEN,
2496 &opts,
2497 )
2498 .await
2499 .json::<HecAckEventResponse>()
2500 .await
2501 .unwrap();
2502 _ = collect_n(source, 11).await;
2503
2504 let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
2505 acks: (0..10).collect::<Vec<u64>>(),
2506 })
2507 .unwrap();
2508 let ack_res = send_with_response(
2509 address,
2510 "services/collector/ack",
2511 ack_message_dropped.as_str(),
2512 TOKEN,
2513 &opts,
2514 )
2515 .await
2516 .json::<HecAckStatusResponse>()
2517 .await
2518 .unwrap();
2519 assert!(ack_res.acks.values().all(|ack_status| !*ack_status));
2520
2521 let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
2522 acks: vec![event_res.ack_id],
2523 })
2524 .unwrap();
2525 let ack_res = send_with_response(
2526 address,
2527 "services/collector/ack",
2528 ack_message_acked.as_str(),
2529 TOKEN,
2530 &opts,
2531 )
2532 .await
2533 .json::<HecAckStatusResponse>()
2534 .await
2535 .unwrap();
2536 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2537 }
2538
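    // The ack endpoint accepts a `Content-Type` header that carries extra parameters
    // after `application/json`.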
2539 #[tokio::test]
2540 async fn ack_service_accepts_parameterized_content_type() {
2541 let ack_config = HecAcknowledgementsConfig {
2542 enabled: Some(true),
2543 ..Default::default()
2544 };
2545 let (source, address) = source(Some(ack_config)).await;
2546 let opts = SendWithOpts {
2547 channel: Some(Channel::Header("guid")),
2548 forwarded_for: None,
2549 };
2550
2551 let event_res = send_with_response(
2552 address,
2553 "services/collector/event",
2554 r#"{"event":"param-test"}"#,
2555 TOKEN,
2556 &opts,
2557 )
2558 .await
2559 .json::<HecAckEventResponse>()
2560 .await
2561 .unwrap();
2562 let _ = collect_n(source, 1).await;
2563
2564 let body = serde_json::to_string(&HecAckStatusRequest {
2565 acks: vec![event_res.ack_id],
2566 })
2567 .unwrap();
2568
2569 let res = reqwest::Client::new()
2570 .post(format!("http://{address}/services/collector/ack"))
2571 .header("Authorization", format!("Splunk {TOKEN}"))
2572 .header("x-splunk-request-channel", "guid")
2573 .header("Content-Type", "application/json; some-random-text; hello")
2574 .body(body)
2575 .send()
2576 .await
2577 .unwrap();
2578
2579 assert_eq!(200, res.status().as_u16());
2580
2581 let _parsed: HecAckStatusResponse = res.json().await.unwrap();
2582 }
2583
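    // When acknowledgements are enabled, event requests without a channel are rejected
    // with 400.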
2584 #[tokio::test]
2585 async fn event_service_acknowledgements_enabled_channel_required() {
2586 let message = r#"{"event":"first", "color": "blue"}"#;
2587 let ack_config = HecAcknowledgementsConfig {
2588 enabled: Some(true),
2589 ..Default::default()
2590 };
2591 let (_, address) = source(Some(ack_config)).await;
2592
2593 let opts = SendWithOpts {
2594 channel: None,
2595 forwarded_for: None,
2596 };
2597
2598 assert_eq!(
2599 400,
2600 send_with(address, "services/collector/event", message, TOKEN, &opts).await
2601 );
2602 }
2603
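    // Ack status queries are rejected with 400 when acknowledgements are not enabled on
    // the source.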
2604 #[tokio::test]
2605 async fn ack_service_acknowledgements_disabled() {
2606 let message = r#" {"acks":[0]} "#;
2607 let (_, address) = source(None).await;
2608
2609 let opts = SendWithOpts {
2610 channel: Some(Channel::Header("guid")),
2611 forwarded_for: None,
2612 };
2613
2614 assert_eq!(
2615 400,
2616 send_with(address, "services/collector/ack", message, TOKEN, &opts).await
2617 );
2618 }
2619
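    // The schema definition advertised for the Vector log namespace places HEC metadata
    // under the `splunk_hec` metadata namespace.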
2620 #[test]
2621 fn output_schema_definition_vector_namespace() {
2622 let config = SplunkConfig {
2623 log_namespace: Some(true),
2624 ..Default::default()
2625 };
2626
2627 let definition = config
2628 .outputs(LogNamespace::Vector)
2629 .remove(0)
2630 .schema_definition(true);
2631
2632 let expected_definition = Definition::new_with_default_metadata(
2633 Kind::object(Collection::empty()).or_bytes(),
2634 [LogNamespace::Vector],
2635 )
2636 .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE)
2637 .with_metadata_field(
2638 &owned_value_path!("vector", "source_type"),
2639 Kind::bytes(),
2640 None,
2641 )
2642 .with_metadata_field(
2643 &owned_value_path!("vector", "ingest_timestamp"),
2644 Kind::timestamp(),
2645 None,
2646 )
2647 .with_metadata_field(
2648 &owned_value_path!("splunk_hec", "host"),
2649 Kind::bytes(),
2650 Some("host"),
2651 )
2652 .with_metadata_field(
2653 &owned_value_path!("splunk_hec", "index"),
2654 Kind::bytes(),
2655 None,
2656 )
2657 .with_metadata_field(
2658 &owned_value_path!("splunk_hec", "source"),
2659 Kind::bytes(),
2660 Some("service"),
2661 )
2662 .with_metadata_field(
2663 &owned_value_path!("splunk_hec", "channel"),
2664 Kind::bytes(),
2665 None,
2666 )
2667 .with_metadata_field(
2668 &owned_value_path!("splunk_hec", "sourcetype"),
2669 Kind::bytes(),
2670 None,
2671 );
2672
2673 assert_eq!(definition, Some(expected_definition));
2674 }
2675
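    // The schema definition advertised for the Legacy log namespace places HEC metadata
    // in top-level event fields.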
2676 #[test]
2677 fn output_schema_definition_legacy_namespace() {
2678 let config = SplunkConfig::default();
 2679         let definition = config
2680 .outputs(LogNamespace::Legacy)
2681 .remove(0)
2682 .schema_definition(true);
2683
2684 let expected_definition = Definition::new_with_default_metadata(
2685 Kind::object(Collection::empty()),
2686 [LogNamespace::Legacy],
2687 )
2688 .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
2689 .with_event_field(
2690 &owned_value_path!("message"),
2691 Kind::bytes().or_undefined(),
2692 Some("message"),
2693 )
2694 .with_event_field(
2695 &owned_value_path!("line"),
2696 Kind::array(Collection::empty())
2697 .or_object(Collection::empty())
2698 .or_undefined(),
2699 None,
2700 )
2701 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
2702 .with_event_field(&owned_value_path!("splunk_channel"), Kind::bytes(), None)
2703 .with_event_field(&owned_value_path!("splunk_index"), Kind::bytes(), None)
2704 .with_event_field(
2705 &owned_value_path!("splunk_source"),
2706 Kind::bytes(),
2707 Some("service"),
2708 )
2709 .with_event_field(&owned_value_path!("splunk_sourcetype"), Kind::bytes(), None)
2710 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None);
2711
 2712         assert_eq!(definition, Some(expected_definition));
2713 }
2714
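    // Wires the source into the component validation framework: requests are pushed to
    // the event endpoint with the `x-splunk-request-channel` header set.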
2715 impl ValidatableComponent for SplunkConfig {
2716 fn validation_configuration() -> ValidationConfiguration {
2717 let config = Self {
2718 address: default_socket_address(),
2719 ..Default::default()
2720 };
2721
2722 let listen_addr_http = format!("http://{}/services/collector/event", config.address);
2723 let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
2724
2725 let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
2726 let framing = BytesDecoderConfig::new().into();
2727 let decoding = DeserializerConfig::Json(Default::default());
2728
2729 let external_resource = ExternalResource::new(
2730 ResourceDirection::Push,
2731 HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
2732 X_SPLUNK_REQUEST_CHANNEL.to_string(),
2733 "channel".to_string(),
2734 )])),
2735 DecodingConfig::new(framing, decoding, false.into()),
2736 );
2737
2738 ValidationConfiguration::from_source(
2739 Self::NAME,
2740 log_namespace,
2741 vec![ComponentTestCaseConfig::from_source(
2742 config,
2743 None,
2744 Some(external_resource),
2745 )],
2746 )
2747 }
2748 }
2749
2750 register_validatable_component!(SplunkConfig);
2751}