1use std::{
2 collections::HashMap,
3 convert::Infallible,
4 io::Read,
5 net::{Ipv4Addr, SocketAddr},
6 sync::Arc,
7 time::Duration,
8};
9
10use bytes::{Buf, Bytes};
11use chrono::{DateTime, TimeZone, Utc};
12use flate2::read::MultiGzDecoder;
13use futures::FutureExt;
14use http::StatusCode;
15use hyper::{service::make_service_fn, Server};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use serde_json::{
19 de::{Read as JsonRead, StrRead},
20 Deserializer, Value as JsonValue,
21};
22use snafu::Snafu;
23use tokio::net::TcpStream;
24use tower::ServiceBuilder;
25use tracing::Span;
26use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
27use vector_lib::lookup::lookup_v2::OptionalValuePath;
28use vector_lib::lookup::{self, event_path, owned_value_path};
29use vector_lib::sensitive_string::SensitiveString;
30use vector_lib::{
31 config::{LegacyKey, LogNamespace},
32 event::BatchNotifier,
33 schema::meaning,
34 EstimatedJsonEncodedSizeOf,
35};
36use vector_lib::{configurable::configurable_component, tls::MaybeTlsIncomingStream};
37use vrl::path::OwnedTargetPath;
38use vrl::value::{kind::Collection, Kind};
39use warp::http::header::{HeaderValue, CONTENT_TYPE};
40use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply};
41
42use self::{
43 acknowledgements::{
44 HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
45 IndexerAcknowledgement,
46 },
47 splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
48};
49use crate::{
50 config::{log_schema, DataType, Resource, SourceConfig, SourceContext, SourceOutput},
51 event::{Event, LogEvent, Value},
52 http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer},
53 internal_events::{
54 EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
55 },
56 serde::bool_or_struct,
57 source_sender::ClosedError,
58 tls::{MaybeTlsSettings, TlsEnableableConfig},
59 SourceSender,
60};
61
62mod acknowledgements;
63
/// Legacy-namespace event field holding the Splunk channel GUID.
pub const CHANNEL: &str = "splunk_channel";
/// Legacy-namespace event field holding the Splunk index.
pub const INDEX: &str = "splunk_index";
/// Legacy-namespace event field holding the Splunk source.
pub const SOURCE: &str = "splunk_source";
/// Legacy-namespace event field holding the Splunk sourcetype.
pub const SOURCETYPE: &str = "splunk_sourcetype";

/// HTTP header Splunk clients use to convey the channel GUID.
const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
71
/// Configuration for the `splunk_hec` source.
#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    /// The socket address to listen for HEC requests on.
    #[serde(default = "default_socket_address")]
    pub address: SocketAddr,

    /// Optional authorization token.
    ///
    /// If set, requests must carry `Authorization: Splunk <token>`.
    #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
    token: Option<SensitiveString>,

    /// Optional list of valid authorization tokens; a request is accepted if it
    /// presents any one of them.
    ///
    /// When neither this nor `token` is configured, requests are accepted
    /// without authentication.
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    valid_tokens: Option<Vec<SensitiveString>>,

    /// Whether to attach the HEC token presented by a request to the metadata
    /// of the events it produced.
    store_hec_token: bool,

    /// TLS configuration for the listener.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    /// Splunk indexer-acknowledgements behavior.
    ///
    /// Accepts either a bare boolean or the full config struct.
    #[configurable(derived)]
    #[serde(deserialize_with = "bool_or_struct")]
    acknowledgements: HecAcknowledgementsConfig,

    /// The log namespace override for this source; merged with the global
    /// setting in `outputs`.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    /// HTTP keepalive / maximum connection age settings.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
123
124impl_generate_config_from_default!(SplunkConfig);
125
126impl Default for SplunkConfig {
127 fn default() -> Self {
128 SplunkConfig {
129 address: default_socket_address(),
130 token: None,
131 valid_tokens: None,
132 tls: None,
133 acknowledgements: Default::default(),
134 store_hec_token: false,
135 log_namespace: None,
136 keepalive: Default::default(),
137 }
138 }
139}
140
/// Default HEC listen address: every IPv4 interface on Splunk's standard
/// HEC port, 8088.
fn default_socket_address() -> SocketAddr {
    SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8088))
}
144
#[async_trait::async_trait]
#[typetag::serde(name = "splunk_hec")]
impl SourceConfig for SplunkConfig {
    /// Builds the HTTP(S) server task that serves the Splunk HEC endpoints.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        // `true`: TLS config is optional here; fall back to plain TCP when absent.
        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
        let shutdown = cx.shutdown.clone();
        let out = cx.out.clone();
        let source = SplunkSource::new(self, tls.http_protocol_name(), cx);

        let event_service = source.event_service(out.clone());
        let raw_service = source.raw_service(out);
        let health_service = source.health_service();
        let ack_service = source.ack_service();
        let options = SplunkSource::options();

        // All endpoints live under `/services/collector/...`, mirroring
        // Splunk's own HEC URL layout. `unify` collapses the alternatives to a
        // single extract type; rejections are rendered by `finish_err`.
        let services = path!("services" / "collector" / ..)
            .and(
                event_service
                    .or(raw_service)
                    .unify()
                    .or(health_service)
                    .unify()
                    .or(ack_service)
                    .unify()
                    .or(options)
                    .unify(),
            )
            .or_else(finish_err);

        let listener = tls.bind(&self.address).await?;

        let keepalive_settings = self.keepalive.clone();
        Ok(Box::pin(async move {
            let span = Span::current();
            // Per-connection service: tracing layer plus an optional layer that
            // closes connections after a (jittered) maximum age.
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            conn.peer_addr(),
                        )
                    }))
                    .service(warp::service(services.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });

            // Serve until the shutdown signal resolves.
            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;

            Ok(())
        }))
    }

    /// Describes the schema of events this source emits, per log namespace.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);

        let schema_definition = match log_namespace {
            LogNamespace::Legacy => {
                // `line` only appears when an event object carries an
                // array/object `line` value (see `build_log_legacy`).
                let definition = vector_lib::schema::Definition::empty_legacy_namespace()
                    .with_event_field(
                        &owned_value_path!("line"),
                        Kind::object(Collection::empty())
                            .or_array(Collection::empty())
                            .or_undefined(),
                        None,
                    );

                if let Some(message_key) = log_schema().message_key() {
                    definition.with_event_field(
                        message_key,
                        Kind::bytes().or_undefined(),
                        Some(meaning::MESSAGE),
                    )
                } else {
                    definition
                }
            }
            // Vector namespace: the event root is the raw message itself.
            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
                Kind::bytes().or_object(Collection::empty()),
                [log_namespace],
            )
            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
        }
        .with_standard_vector_source_metadata()
        .with_source_metadata(
            SplunkConfig::NAME,
            log_schema()
                .host_key()
                .cloned()
                .map(LegacyKey::InsertIfEmpty),
            &owned_value_path!("host"),
            Kind::bytes(),
            Some(meaning::HOST),
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(CHANNEL))),
            &owned_value_path!("channel"),
            Kind::bytes(),
            None,
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(INDEX))),
            &owned_value_path!("index"),
            Kind::bytes(),
            None,
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(SOURCE))),
            &owned_value_path!("source"),
            Kind::bytes(),
            Some(meaning::SERVICE),
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(SOURCETYPE))),
            &owned_value_path!("sourcetype"),
            Kind::bytes(),
            None,
        );

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    /// The TCP port this source binds, for conflict detection.
    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    /// This source supports end-to-end acknowledgements.
    fn can_acknowledge(&self) -> bool {
        true
    }
}
290
/// Shared state from which the warp filters for each HEC endpoint are built.
struct SplunkSource {
    /// Accepted `Authorization` header values, pre-formatted as
    /// `"Splunk <token>"`. Empty means authentication is disabled.
    valid_credentials: Vec<String>,
    /// Protocol name reported in `HttpBytesReceived` telemetry.
    protocol: &'static str,
    /// Indexer-acknowledgements state; `None` when acks are disabled.
    idx_ack: Option<Arc<IndexerAcknowledgement>>,
    /// Whether to attach the request's HEC token to event metadata.
    store_hec_token: bool,
    /// Namespace controlling where metadata fields are written.
    log_namespace: LogNamespace,
    /// Pre-registered handle for the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
}
300
impl SplunkSource {
    /// Builds the shared endpoint state from the source configuration.
    fn new(config: &SplunkConfig, protocol: &'static str, cx: SourceContext) -> Self {
        let log_namespace = cx.log_namespace(config.log_namespace);
        let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
        let shutdown = cx.shutdown;
        // Both `valid_tokens` and the deprecated single `token` are honored.
        let valid_tokens = config
            .valid_tokens
            .iter()
            .flatten()
            .chain(config.token.iter());

        let idx_ack = acknowledgements.then(|| {
            Arc::new(IndexerAcknowledgement::new(
                config.acknowledgements.clone(),
                shutdown,
            ))
        });

        SplunkSource {
            // Pre-format tokens into full header values for O(1)-per-request
            // comparison in `authorization`.
            valid_credentials: valid_tokens
                .map(|token| format!("Splunk {}", token.inner()))
                .collect(),
            protocol,
            idx_ack,
            store_hec_token: config.store_hec_token,
            log_namespace,
            events_received: register!(EventsReceived),
        }
    }

    /// Filter for `POST /event[/1.0]` (and the collector root): parses the
    /// body as a stream of JSON event documents.
    fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
        // Channel may arrive via the `channel` query parameter or the
        // `x-splunk-request-channel` header; the header takes precedence.
        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);

        let splunk_channel = splunk_channel_header
            .and(splunk_channel_query_param)
            .map(|header: Option<String>, query_param| header.or(query_param));

        // Clone the shared state into the request closure below.
        let protocol = self.protocol;
        let idx_ack = self.idx_ack.clone();
        let store_hec_token = self.store_hec_token;
        let log_namespace = self.log_namespace;
        let events_received = self.events_received.clone();

        warp::post()
            .and(
                path!("event")
                    .or(path!("event" / "1.0"))
                    .or(warp::path::end()),
            )
            .and(self.authorization())
            .and(splunk_channel)
            .and(warp::addr::remote())
            .and(warp::header::optional::<String>("X-Forwarded-For"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and(warp::path::full())
            .and_then(
                move |_,
                      token: Option<String>,
                      channel: Option<String>,
                      remote: Option<SocketAddr>,
                      remote_addr: Option<String>,
                      gzip: bool,
                      body: Bytes,
                      path: warp::path::FullPath| {
                    let mut out = out.clone();
                    let idx_ack = idx_ack.clone();
                    let events_received = events_received.clone();

                    async move {
                        // Indexer acknowledgements require a channel.
                        if idx_ack.is_some() && channel.is_none() {
                            return Err(Rejection::from(ApiError::MissingChannel));
                        }

                        // Decompress gzip bodies before parsing.
                        // NOTE(review): for gzip requests `byte_size` is the
                        // decompressed length, not the wire size — confirm
                        // this is the intended telemetry.
                        let mut data = Vec::new();
                        let (byte_size, body) = if gzip {
                            MultiGzDecoder::new(body.reader())
                                .read_to_end(&mut data)
                                .map_err(|_| Rejection::from(ApiError::BadRequest))?;
                            (data.len(), String::from_utf8_lossy(data.as_slice()))
                        } else {
                            (body.len(), String::from_utf8_lossy(body.as_ref()))
                        };
                        emit!(HttpBytesReceived {
                            byte_size,
                            http_path: path.as_str(),
                            protocol,
                        });

                        // With acks enabled, allocate an ack id tied to a
                        // batch notifier covering every event in this request.
                        let (batch, receiver) =
                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
                        let maybe_ack_id = match (idx_ack, receiver, channel.clone()) {
                            (Some(idx_ack), Some(receiver), Some(channel_id)) => {
                                match idx_ack.get_ack_id_from_channel(channel_id, receiver).await {
                                    Ok(ack_id) => Some(ack_id),
                                    Err(rej) => return Err(rej),
                                }
                            }
                            _ => None,
                        };

                        let mut error = None;
                        let mut events = Vec::new();

                        // Stream-deserialize the body: one JSON document per
                        // event, stopping at the first malformed document.
                        let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
                            deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
                            channel,
                            remote,
                            remote_addr,
                            batch,
                            token: token.filter(|_| store_hec_token).map(Into::into),
                            log_namespace,
                            events_received,
                        }
                        .into();

                        for result in iter {
                            match result {
                                Ok(event) => events.push(event),
                                Err(err) => {
                                    error = Some(err);
                                    break;
                                }
                            }
                        }

                        // Forward whatever parsed successfully even when a
                        // later document failed.
                        if !events.is_empty() {
                            if let Err(ClosedError) = out.send_batch(events).await {
                                return Err(Rejection::from(ApiError::ServerShutdown));
                            }
                        }

                        if let Some(error) = error {
                            Err(error)
                        } else {
                            Ok(maybe_ack_id)
                        }
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }

    /// Filter for `POST /raw[/1.0]`: treats the entire body as a single
    /// opaque event. A channel is mandatory on this endpoint.
    fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
        let protocol = self.protocol;
        let idx_ack = self.idx_ack.clone();
        let store_hec_token = self.store_hec_token;
        let events_received = self.events_received.clone();
        let log_namespace = self.log_namespace;

        warp::post()
            .and(path!("raw" / "1.0").or(path!("raw")))
            .and(self.authorization())
            .and(SplunkSource::required_channel())
            .and(warp::addr::remote())
            .and(warp::header::optional::<String>("X-Forwarded-For"))
            .and(self.gzip())
            .and(warp::body::bytes())
            .and(warp::path::full())
            .and_then(
                move |_,
                      token: Option<String>,
                      channel_id: String,
                      remote: Option<SocketAddr>,
                      xff: Option<String>,
                      gzip: bool,
                      body: Bytes,
                      path: warp::path::FullPath| {
                    let mut out = out.clone();
                    let idx_ack = idx_ack.clone();
                    let events_received = events_received.clone();
                    // Raw bodies report the wire size (possibly compressed).
                    emit!(HttpBytesReceived {
                        byte_size: body.len(),
                        http_path: path.as_str(),
                        protocol,
                    });

                    async move {
                        let (batch, receiver) =
                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
                        let maybe_ack_id = match (idx_ack, receiver) {
                            (Some(idx_ack), Some(receiver)) => Some(
                                idx_ack
                                    .get_ack_id_from_channel(channel_id.clone(), receiver)
                                    .await?,
                            ),
                            _ => None,
                        };
                        let mut event = raw_event(
                            body,
                            gzip,
                            channel_id,
                            remote,
                            xff,
                            batch,
                            log_namespace,
                            &events_received,
                        )?;
                        if let Some(token) = token.filter(|_| store_hec_token) {
                            event.metadata_mut().set_splunk_hec_token(token.into());
                        }

                        let res = out.send_event(event).await;
                        res.map(|_| maybe_ack_id)
                            .map_err(|_| Rejection::from(ApiError::ServerShutdown))
                    }
                },
            )
            .map(finish_ok)
            .boxed()
    }

    /// Filter for `GET /health[/1.0]`: unauthenticated static health reply
    /// matching Splunk's own response body.
    fn health_service(&self) -> BoxedFilter<(Response,)> {
        warp::get()
            .and(path!("health" / "1.0").or(path!("health")))
            .map(move |_| {
                http::Response::builder()
                    .header(http::header::CONTENT_TYPE, "application/json")
                    .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
                    .expect("static response")
            })
            .boxed()
    }

    /// Extracts a JSON body of type `T`, accepting a missing `Content-Type`
    /// but rejecting an explicit non-JSON one.
    fn lenient_json_content_type_check<T>() -> impl Filter<Extract = (T,), Error = Rejection> + Clone
    where
        T: Send + DeserializeOwned + 'static,
    {
        warp::header::optional::<HeaderValue>(CONTENT_TYPE.as_str())
            .and(warp::body::bytes())
            .and_then(
                |ctype: Option<HeaderValue>, body: bytes::Bytes| async move {
                    // Absent header defaults to "ok"; present header must
                    // mention application/json (case-insensitively).
                    let ok = ctype
                        .as_ref()
                        .and_then(|v| v.to_str().ok())
                        .map(|h| h.to_ascii_lowercase().contains("application/json"))
                        .unwrap_or(true);

                    if !ok {
                        return Err(warp::reject::custom(ApiError::UnsupportedContentType));
                    }

                    let value = serde_json::from_slice::<T>(&body)
                        .map_err(|_| warp::reject::custom(ApiError::BadRequest))?;

                    Ok(value)
                },
            )
    }

    /// Filter for `POST /ack`: reports which ack ids have been delivered.
    /// Rejected outright when acknowledgements are disabled.
    fn ack_service(&self) -> BoxedFilter<(Response,)> {
        let idx_ack = self.idx_ack.clone();

        warp::post()
            .and(warp::path!("ack"))
            .and(self.authorization())
            .and(SplunkSource::required_channel())
            .and(Self::lenient_json_content_type_check::<HecAckStatusRequest>())
            .and_then(move |_, channel: String, req: HecAckStatusRequest| {
                let idx_ack = idx_ack.clone();
                async move {
                    if let Some(idx_ack) = idx_ack {
                        let acks = idx_ack
                            .get_acks_status_from_channel(channel, &req.acks)
                            .await?;
                        Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
                    } else {
                        Err(warp::reject::custom(ApiError::AckIsDisabled))
                    }
                }
            })
            .boxed()
    }

    /// Filter answering `OPTIONS` preflight requests with the allowed method
    /// for each endpoint.
    fn options() -> BoxedFilter<(Response,)> {
        let post = warp::options()
            .and(
                path!("event")
                    .or(path!("event" / "1.0"))
                    .or(path!("raw" / "1.0"))
                    .or(path!("raw")),
            )
            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());

        let get = warp::options()
            .and(path!("health").or(path!("health" / "1.0")))
            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());

        post.or(get).unify().boxed()
    }

    /// Validates the `Authorization` header against the configured tokens and
    /// extracts the bare token (the `"Splunk "` prefix stripped).
    fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
        let valid_credentials = self.valid_credentials.clone();
        warp::header::optional("Authorization")
            .and_then(move |token: Option<String>| {
                let valid_credentials = valid_credentials.clone();
                async move {
                    match (token, valid_credentials.is_empty()) {
                        // No tokens configured: accept anything, still pass
                        // the (stripped) token through for `store_hec_token`.
                        (token, true) => {
                            Ok(token
                                .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
                        }
                        (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
                            token
                                .strip_prefix("Splunk ")
                                .map(Into::into)
                                .unwrap_or(token),
                        )),
                        (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
                        (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
                    }
                }
            })
            .boxed()
    }

    /// Extracts whether the body is gzip-compressed; any other
    /// `Content-Encoding` value is rejected.
    fn gzip(&self) -> BoxedFilter<(bool,)> {
        warp::header::optional::<String>("Content-Encoding")
            .and_then(|encoding: Option<String>| async move {
                match encoding {
                    Some(s) if s.as_bytes() == b"gzip" => Ok(true),
                    Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
                    None => Ok(false),
                }
            })
            .boxed()
    }

    /// Like the channel extraction in `event_service`, but the channel is
    /// mandatory; its absence becomes `ApiError::MissingChannel`.
    fn required_channel() -> BoxedFilter<(String,)> {
        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);

        splunk_channel_header
            .and(splunk_channel_query_param)
            .and_then(|header: Option<String>, query_param| async move {
                header
                    .or(query_param)
                    .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
            })
            .boxed()
    }
}
/// Streaming iterator turning the JSON documents of an HEC `event` request
/// body into Vector `Event`s.
struct EventIterator<'de, R: JsonRead<'de>> {
    /// Lazy JSON stream over the request body (one document per event).
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Number of events successfully built so far; reported in errors.
    events: usize,
    /// Request-level channel GUID, used when an event carries none.
    channel: Option<Value>,
    /// Timestamp to apply; sticky once an event supplies an explicit `time`.
    time: Time,
    /// Sticky default extractors for `host`, `index`, `source`, `sourcetype`.
    extractors: [DefaultExtractor; 4],
    /// Batch notifier for indexer acknowledgements, if enabled.
    batch: Option<BatchNotifier>,
    /// HEC token to stamp on event metadata, when configured.
    token: Option<Arc<str>>,
    /// Namespace controlling where metadata fields are written.
    log_namespace: LogNamespace,
    /// Pre-registered `EventsReceived` handle.
    events_received: Registered<EventsReceived>,
}
679
/// Raw per-request inputs from which an [`EventIterator`] is constructed
/// (see the `From` impl below).
struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
    /// JSON stream over the request body.
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Channel GUID from the request header/query string, if any.
    channel: Option<String>,
    /// Batch notifier for indexer acknowledgements, if enabled.
    batch: Option<BatchNotifier>,
    /// HEC token to attach to event metadata, if configured.
    token: Option<Arc<str>>,
    /// Namespace controlling where metadata fields are written.
    log_namespace: LogNamespace,
    /// Pre-registered `EventsReceived` handle.
    events_received: Registered<EventsReceived>,
    /// Peer socket address of the connection.
    remote: Option<SocketAddr>,
    /// `X-Forwarded-For` header value, preferred over `remote` for `host`.
    remote_addr: Option<String>,
}
691
692impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
693 fn from(f: EventIteratorGenerator<'de, R>) -> Self {
694 Self {
695 deserializer: f.deserializer,
696 events: 0,
697 channel: f.channel.map(Value::from),
698 time: Time::Now(Utc::now()),
699 extractors: [
700 DefaultExtractor::new_with(
705 "host",
706 log_schema().host_key().cloned().into(),
707 f.remote_addr
708 .or_else(|| f.remote.map(|addr| addr.to_string()))
709 .map(Value::from),
710 f.log_namespace,
711 ),
712 DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace),
713 DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace),
714 DefaultExtractor::new(
715 "sourcetype",
716 OptionalValuePath::new(SOURCETYPE),
717 f.log_namespace,
718 ),
719 ],
720 batch: f.batch,
721 token: f.token,
722 log_namespace: f.log_namespace,
723 events_received: f.events_received,
724 }
725 }
726}
727
impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
    /// Converts one parsed JSON document into a Vector `Event`, applying
    /// channel/fields/time handling and the sticky default extractors.
    fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
        let mut log = match self.log_namespace {
            LogNamespace::Vector => self.build_log_vector(&mut json)?,
            LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
        };

        self.log_namespace.insert_vector_metadata(
            &mut log,
            log_schema().source_type_key(),
            &owned_value_path!("source_type"),
            SplunkConfig::NAME,
        );

        // A per-event `channel` field overrides the request-level channel.
        let channel_path = owned_value_path!(CHANNEL);
        if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
            self.log_namespace.insert_source_metadata(
                SplunkConfig::NAME,
                &mut log,
                Some(LegacyKey::Overwrite(&channel_path)),
                lookup::path!(CHANNEL),
                guid,
            );
        } else if let Some(guid) = self.channel.as_ref() {
            self.log_namespace.insert_source_metadata(
                SplunkConfig::NAME,
                &mut log,
                Some(LegacyKey::Overwrite(&channel_path)),
                lookup::path!(CHANNEL),
                guid.clone(),
            );
        }

        // Splunk-style indexed `fields` are copied onto the event one by one.
        if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
            for (key, value) in object {
                self.log_namespace.insert_source_metadata(
                    SplunkConfig::NAME,
                    &mut log,
                    Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
                    lookup::path!(key.as_str()),
                    value,
                );
            }
        }

        // `time` may be a JSON number or a numeric string.
        // Outer Some = field present; inner Some = it parsed as a number.
        let parsed_time = match json.get_mut("time").map(JsonValue::take) {
            Some(JsonValue::Number(time)) => Some(Some(time)),
            Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
            _ => None,
        };

        match parsed_time {
            // No `time` field: keep the current (possibly sticky) timestamp.
            None => (),
            Some(Some(t)) => {
                if let Some(t) = t.as_u64() {
                    // Integer timestamps: unit (s/ms/ns) inferred by magnitude.
                    let time = parse_timestamp(t as i64)
                        .ok_or(ApiError::InvalidDataFormat { event: self.events })?;

                    self.time = Time::Provided(time);
                } else if let Some(t) = t.as_f64() {
                    // Fractional timestamps: seconds with sub-second precision.
                    // NOTE(review): `.expect` panics for float values outside
                    // chrono's representable range — confirm inputs are
                    // validated upstream.
                    self.time = Time::Provided(
                        Utc.timestamp_opt(
                            t.floor() as i64,
                            (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
                        )
                        .single()
                        .expect("invalid timestamp"),
                    );
                } else {
                    return Err(ApiError::InvalidDataFormat { event: self.events }.into());
                }
            }
            // `time` was present but not a parsable number.
            Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
        }

        // An explicit time stays in effect for later events in this request.
        let timestamp = match self.time.clone() {
            Time::Provided(time) => time,
            Time::Now(time) => time,
        };

        self.log_namespace.insert_source_metadata(
            SplunkConfig::NAME,
            &mut log,
            log_schema().timestamp_key().map(LegacyKey::Overwrite),
            lookup::path!("timestamp"),
            timestamp,
        );

        // Apply (and update) the sticky host/index/source/sourcetype defaults.
        for de in self.extractors.iter_mut() {
            de.extract(&mut log, &mut json);
        }

        if let Some(token) = &self.token {
            log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
        }

        if let Some(batch) = self.batch.clone() {
            log = log.with_batch_notifier(&batch);
        }

        self.events += 1;

        Ok(log.into())
    }

    /// Vector namespace: the `event` value becomes the log root, with the
    /// ingest timestamp stored as metadata.
    fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
        match json.get("event") {
            Some(event) => {
                let event: Value = event.into();
                let mut log = LogEvent::from(event);

                self.events_received
                    .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));

                self.log_namespace.insert_vector_metadata(
                    &mut log,
                    log_schema().timestamp_key(),
                    lookup::path!("ingest_timestamp"),
                    chrono::Utc::now(),
                );

                Ok(log)
            }
            None => Err(ApiError::MissingEventField { event: self.events }.into()),
        }
    }

    /// Legacy namespace: strings land in the message key; objects are
    /// flattened onto the event, with `line` handled specially.
    fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
        let mut log = LogEvent::default();
        match json.get_mut("event") {
            Some(event) => match event.take() {
                JsonValue::String(string) => {
                    // Splunk rejects empty event fields; mirror that.
                    if string.is_empty() {
                        return Err(ApiError::EmptyEventField { event: self.events }.into());
                    }
                    log.maybe_insert(log_schema().message_key_target_path(), string);
                }
                JsonValue::Object(mut object) => {
                    if object.is_empty() {
                        return Err(ApiError::EmptyEventField { event: self.events }.into());
                    }

                    // Structured `line` values stay under `line`; scalar
                    // `line` values become the message.
                    if let Some(line) = object.remove("line") {
                        match line {
                            JsonValue::Array(_) | JsonValue::Object(_) => {
                                log.insert(event_path!("line"), line);
                            }
                            _ => {
                                log.maybe_insert(log_schema().message_key_target_path(), line);
                            }
                        }
                    }

                    for (key, value) in object {
                        log.insert(event_path!(key.as_str()), value);
                    }
                }
                _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
            },
            None => return Err(ApiError::MissingEventField { event: self.events }.into()),
        };

        self.events_received
            .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));

        Ok(log)
    }
}
916
917impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
918 type Item = Result<Event, Rejection>;
919
920 fn next(&mut self) -> Option<Self::Item> {
921 match self.deserializer.next() {
922 Some(Ok(json)) => Some(self.build_event(json)),
923 None => {
924 if self.events == 0 {
925 Some(Err(ApiError::NoData.into()))
926 } else {
927 None
928 }
929 }
930 Some(Err(error)) => {
931 emit!(SplunkHecRequestBodyInvalidError {
932 error: error.into()
933 });
934 Some(Err(
935 ApiError::InvalidDataFormat { event: self.events }.into()
936 ))
937 }
938 }
939 }
940}
941
942fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
953 const SEC_CUTOFF: i64 = 13569465600;
955 const MILLISEC_CUTOFF: i64 = 253402300800000;
957
958 if t < 0 {
960 return None;
961 }
962
963 let ts = if t < SEC_CUTOFF {
964 Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
965 } else if t < MILLISEC_CUTOFF {
966 Utc.timestamp_millis_opt(t)
967 .single()
968 .expect("invalid timestamp")
969 } else {
970 Utc.timestamp_nanos(t)
971 };
972
973 Some(ts)
974}
975
/// Extracts a field (e.g. `host`, `index`) from incoming JSON events,
/// remembering the last seen value as a sticky default for later events in
/// the same request.
struct DefaultExtractor {
    /// Name of the JSON field to look for in each event document.
    field: &'static str,
    /// Destination path on the log event; `None` disables insertion.
    to_field: OptionalValuePath,
    /// Current sticky value (seeded or learned from a prior event).
    value: Option<Value>,
    /// Namespace controlling where the value is written.
    log_namespace: LogNamespace,
}
983
984impl DefaultExtractor {
985 const fn new(
986 field: &'static str,
987 to_field: OptionalValuePath,
988 log_namespace: LogNamespace,
989 ) -> Self {
990 DefaultExtractor {
991 field,
992 to_field,
993 value: None,
994 log_namespace,
995 }
996 }
997
998 fn new_with(
999 field: &'static str,
1000 to_field: OptionalValuePath,
1001 value: impl Into<Option<Value>>,
1002 log_namespace: LogNamespace,
1003 ) -> Self {
1004 DefaultExtractor {
1005 field,
1006 to_field,
1007 value: value.into(),
1008 log_namespace,
1009 }
1010 }
1011
1012 fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
1013 if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
1015 self.value = Some(new_value.into());
1016 }
1017
1018 if let Some(index) = self.value.as_ref() {
1020 if let Some(metadata_key) = self.to_field.path.as_ref() {
1021 self.log_namespace.insert_source_metadata(
1022 SplunkConfig::NAME,
1023 log,
1024 Some(LegacyKey::Overwrite(metadata_key)),
1025 &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
1026 index.clone(),
1027 )
1028 }
1029 }
1030 }
1031}
1032
/// Timestamp applied to events while iterating one request body.
#[derive(Clone, Debug)]
enum Time {
    /// Time when the request was received; used until an event supplies
    /// an explicit `time`.
    Now(DateTime<Utc>),
    /// Time provided by an event's `time` field; reused for subsequent
    /// events in the same request until overridden.
    Provided(DateTime<Utc>),
}
1041
1042#[allow(clippy::too_many_arguments)]
1044fn raw_event(
1045 bytes: Bytes,
1046 gzip: bool,
1047 channel: String,
1048 remote: Option<SocketAddr>,
1049 xff: Option<String>,
1050 batch: Option<BatchNotifier>,
1051 log_namespace: LogNamespace,
1052 events_received: &Registered<EventsReceived>,
1053) -> Result<Event, Rejection> {
1054 let message: Value = if gzip {
1056 let mut data = Vec::new();
1057 match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
1058 Ok(0) => return Err(ApiError::NoData.into()),
1059 Ok(_) => Value::from(Bytes::from(data)),
1060 Err(error) => {
1061 emit!(SplunkHecRequestBodyInvalidError { error });
1062 return Err(ApiError::InvalidDataFormat { event: 0 }.into());
1063 }
1064 }
1065 } else {
1066 bytes.into()
1067 };
1068
1069 let mut log = match log_namespace {
1071 LogNamespace::Vector => LogEvent::from(message),
1072 LogNamespace::Legacy => {
1073 let mut log = LogEvent::default();
1074 log.maybe_insert(log_schema().message_key_target_path(), message);
1075 log
1076 }
1077 };
1078 events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1080
1081 log_namespace.insert_source_metadata(
1083 SplunkConfig::NAME,
1084 &mut log,
1085 Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
1086 lookup::path!(CHANNEL),
1087 channel,
1088 );
1089
1090 let host = if let Some(remote_address) = xff {
1094 Some(remote_address)
1095 } else {
1096 remote.map(|remote| remote.to_string())
1097 };
1098
1099 if let Some(host) = host {
1100 log_namespace.insert_source_metadata(
1101 SplunkConfig::NAME,
1102 &mut log,
1103 log_schema().host_key().map(LegacyKey::InsertIfEmpty),
1104 lookup::path!("host"),
1105 host,
1106 );
1107 }
1108
1109 log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, Utc::now());
1110
1111 if let Some(batch) = batch {
1112 log = log.with_batch_notifier(&batch);
1113 }
1114
1115 Ok(Event::from(log))
1116}
1117
/// Request-level failures, surfaced to clients as warp rejections and
/// rendered into HEC-style JSON replies by `finish_err`.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    /// No `Authorization` header while tokens are configured.
    MissingAuthorization,
    /// `Authorization` header did not match any configured token.
    InvalidAuthorization,
    /// `Content-Encoding` other than gzip.
    UnsupportedEncoding,
    /// Explicit non-JSON `Content-Type` on a JSON endpoint.
    UnsupportedContentType,
    /// No channel supplied where one is required (acks, `/raw`).
    MissingChannel,
    /// Empty request body.
    NoData,
    /// Malformed event document; `event` is the index of the offender.
    InvalidDataFormat { event: usize },
    /// The source's output channel closed during processing.
    ServerShutdown,
    /// `event` field present but empty.
    EmptyEventField { event: usize },
    /// `event` field missing entirely.
    MissingEventField { event: usize },
    /// Body failed to decompress or deserialize.
    BadRequest,
    ServiceUnavailable,
    /// `/ack` called while acknowledgements are disabled.
    AckIsDisabled,
}
1134
1135impl warp::reject::Reject for ApiError {}
1136
/// HEC-style JSON response bodies and application-level status codes,
/// mirroring the replies a real Splunk HEC endpoint produces.
mod splunk_response {
    use serde::Serialize;

    /// Splunk HEC application status codes (distinct from HTTP status codes).
    pub enum HecStatusCode {
        Success = 0,
        TokenIsRequired = 2,
        InvalidAuthorization = 3,
        NoData = 5,
        InvalidDataFormat = 6,
        ServerIsBusy = 9,
        DataChannelIsMissing = 10,
        EventFieldIsRequired = 12,
        EventFieldCannotBeBlank = 13,
        AckIsDisabled = 14,
    }

    /// Extra key flattened into the response body alongside `text`/`code`.
    #[derive(Serialize)]
    pub enum HecResponseMetadata {
        /// Ack id allocated for this request (`"ackId"`).
        #[serde(rename = "ackId")]
        AckId(u64),
        /// Index of the first offending event (`"invalid-event-number"`).
        #[serde(rename = "invalid-event-number")]
        InvalidEventNumber(usize),
    }

    /// A HEC JSON reply: `{"text": ..., "code": ...}` plus optional
    /// flattened metadata.
    #[derive(Serialize)]
    pub struct HecResponse {
        text: &'static str,
        code: u8,
        #[serde(skip_serializing_if = "Option::is_none", flatten)]
        pub metadata: Option<HecResponseMetadata>,
    }

    impl HecResponse {
        /// Builds the canonical reply for a status code, with the message
        /// text Splunk itself uses. `const` so the presets below can be
        /// module-level constants.
        pub const fn new(code: HecStatusCode) -> Self {
            let text = match code {
                HecStatusCode::Success => "Success",
                HecStatusCode::TokenIsRequired => "Token is required",
                HecStatusCode::InvalidAuthorization => "Invalid authorization",
                HecStatusCode::NoData => "No data",
                HecStatusCode::InvalidDataFormat => "Invalid data format",
                HecStatusCode::DataChannelIsMissing => "Data channel is missing",
                HecStatusCode::EventFieldIsRequired => "Event field is required",
                HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
                HecStatusCode::ServerIsBusy => "Server is busy",
                HecStatusCode::AckIsDisabled => "Ack is disabled",
            };

            Self {
                text,
                code: code as u8,
                metadata: None,
            }
        }

        /// Attaches flattened metadata (builder-style, consumes `self`).
        pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
            self.metadata = Some(metadata);
            self
        }
    }

    // Pre-built replies for the common, metadata-free cases.
    pub const INVALID_AUTHORIZATION: HecResponse =
        HecResponse::new(HecStatusCode::InvalidAuthorization);
    pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
    pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
    pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
    pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
    pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
    pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
}
1208
1209fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1210 let body = if let Some(ack_id) = maybe_ack_id {
1211 HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
1212 } else {
1213 splunk_response::SUCCESS
1214 };
1215 response_json(StatusCode::OK, body)
1216}
1217
1218fn response_plain(code: StatusCode, msg: &'static str) -> Response {
1219 warp::reply::with_status(
1220 warp::reply::with_header(msg, http::header::CONTENT_TYPE, "text/plain; charset=utf-8"),
1221 code,
1222 )
1223 .into_response()
1224}
1225
1226async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1227 if let Some(&error) = rejection.find::<ApiError>() {
1228 emit!(SplunkHecRequestError { error });
1229 Ok((match error {
1230 ApiError::MissingAuthorization => {
1231 response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
1232 }
1233 ApiError::InvalidAuthorization => response_json(
1234 StatusCode::UNAUTHORIZED,
1235 splunk_response::INVALID_AUTHORIZATION,
1236 ),
1237 ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
1238 ApiError::UnsupportedContentType => response_plain(
1239 StatusCode::UNSUPPORTED_MEDIA_TYPE,
1240 "The request's content-type is not supported",
1241 ),
1242 ApiError::MissingChannel => {
1243 response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
1244 }
1245 ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
1246 ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
1247 ApiError::InvalidDataFormat { event } => response_json(
1248 StatusCode::BAD_REQUEST,
1249 HecResponse::new(HecStatusCode::InvalidDataFormat)
1250 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1251 ),
1252 ApiError::EmptyEventField { event } => response_json(
1253 StatusCode::BAD_REQUEST,
1254 HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
1255 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1256 ),
1257 ApiError::MissingEventField { event } => response_json(
1258 StatusCode::BAD_REQUEST,
1259 HecResponse::new(HecStatusCode::EventFieldIsRequired)
1260 .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1261 ),
1262 ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
1263 ApiError::ServiceUnavailable => response_json(
1264 StatusCode::SERVICE_UNAVAILABLE,
1265 splunk_response::SERVER_IS_BUSY,
1266 ),
1267 ApiError::AckIsDisabled => {
1268 response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
1269 }
1270 },))
1271 } else {
1272 Err(rejection)
1273 }
1274}
1275
1276fn empty_response(code: StatusCode) -> Response {
1278 let mut res = Response::default();
1279 *res.status_mut() = code;
1280 res
1281}
1282
1283fn response_json(code: StatusCode, body: impl Serialize) -> Response {
1285 warp::reply::with_status(warp::reply::json(&body), code).into_response()
1286}
1287
1288#[cfg(feature = "sinks-splunk_hec")]
1289#[cfg(test)]
1290mod tests {
1291 use std::{net::SocketAddr, num::NonZeroU64};
1292
1293 use chrono::{TimeZone, Utc};
1294 use futures_util::Stream;
1295 use http::Uri;
1296 use reqwest::{RequestBuilder, Response};
1297 use serde::Deserialize;
1298 use vector_lib::codecs::{
1299 decoding::DeserializerConfig, BytesDecoderConfig, JsonSerializerConfig,
1300 TextSerializerConfig,
1301 };
1302 use vector_lib::sensitive_string::SensitiveString;
1303 use vector_lib::{event::EventStatus, schema::Definition};
1304 use vrl::path::PathPrefix;
1305
1306 use super::*;
1307 use crate::{
1308 codecs::{DecodingConfig, EncodingConfig},
1309 components::validation::prelude::*,
1310 config::{log_schema, SinkConfig, SinkContext, SourceConfig, SourceContext},
1311 event::{Event, LogEvent},
1312 sinks::{
1313 splunk_hec::logs::config::HecLogsSinkConfig,
1314 util::{BatchConfig, Compression, TowerRequestConfig},
1315 Healthcheck, VectorSink,
1316 },
1317 sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
1318 test_util::{
1319 collect_n,
1320 components::{
1321 assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
1322 HTTP_PUSH_SOURCE_TAGS,
1323 },
1324 next_addr, wait_for_tcp,
1325 },
1326 SourceSender,
1327 };
1328
    #[test]
    fn generate_config() {
        // Verify `SplunkConfig` produces a valid example configuration.
        crate::test_util::test_generate_config::<SplunkConfig>();
    }
1333
    // Default HEC token used by most tests; `VALID_TOKENS` adds a second
    // accepted token for the multi-token authorization tests.
    const TOKEN: &str = "token";
    const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
1337
    /// Shorthand for `source_with` using the default test token, no extra
    /// valid tokens, and token passthrough disabled.
    async fn source(
        acknowledgements: Option<HecAcknowledgementsConfig>,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
    }
1343
    /// Spawns a `splunk_hec` source on a fresh local port and returns the
    /// stream of events it produces together with its listen address.
    ///
    /// The source task runs detached for the remainder of the test; the
    /// function only returns once the TCP listener is accepting connections.
    async fn source_with(
        token: Option<SensitiveString>,
        valid_tokens: Option<&[&str]>,
        acknowledgements: Option<HecAcknowledgementsConfig>,
        store_hec_token: bool,
    ) -> (impl Stream<Item = Event> + Unpin + use<>, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let address = next_addr();
        // Convert plain &str tokens into the sensitive-string form the
        // config expects.
        let valid_tokens =
            valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
        let cx = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            SplunkConfig {
                address,
                token,
                valid_tokens,
                tls: None,
                acknowledgements: acknowledgements.unwrap_or_default(),
                store_hec_token,
                log_namespace: None,
                keepalive: Default::default(),
            }
            .build(cx)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        // Don't hand the address back until the source is reachable.
        wait_for_tcp(address).await;
        (recv, address)
    }
1375
    /// Builds a `splunk_hec` logs sink pointed at `address`, used to drive
    /// the source under test with realistic HEC client traffic.
    async fn sink(
        address: SocketAddr,
        encoding: EncodingConfig,
        compression: Compression,
    ) -> (VectorSink, Healthcheck) {
        HecLogsSinkConfig {
            default_token: TOKEN.to_owned().into(),
            endpoint: format!("http://{address}"),
            host_key: None,
            indexed_fields: vec![],
            index: None,
            sourcetype: None,
            source: None,
            encoding,
            compression,
            batch: BatchConfig::default(),
            request: TowerRequestConfig::default(),
            tls: None,
            acknowledgements: Default::default(),
            timestamp_nanos_key: None,
            timestamp_key: None,
            auto_extract_timestamp: None,
            endpoint_target: Default::default(),
        }
        .build(SinkContext::default())
        .await
        .unwrap()
    }
1404
    /// Spins up a source and a matching sink pair, asserting the sink's
    /// healthcheck against the source passes before returning.
    async fn start(
        encoding: EncodingConfig,
        compression: Compression,
        acknowledgements: Option<HecAcknowledgementsConfig>,
    ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
        let (source, address) = source(acknowledgements).await;
        let (sink, health) = sink(address, encoding, compression).await;
        assert!(health.await.is_ok());
        (sink, source)
    }
1415
    /// Sends `messages` through `sink` and collects the same number of
    /// events from `source`, asserting none were dropped.
    async fn channel_n(
        messages: Vec<impl Into<String> + Send + 'static>,
        sink: VectorSink,
        source: impl Stream<Item = Event> + Unpin,
    ) -> Vec<Event> {
        let n = messages.len();

        // Drive the sink on a separate task so `collect_n` below can make
        // progress concurrently.
        tokio::spawn(async move {
            sink.run_events(
                messages
                    .into_iter()
                    .map(|s| Event::Log(LogEvent::from(s.into()))),
            )
            .await
            .unwrap();
        });

        let events = collect_n(source, n).await;
        assert_eq!(n, events.len());

        events
    }
1438
    // How a test supplies the Splunk request channel: via the
    // `x-splunk-request-channel` header or the `channel` query parameter
    // (see `build_request`).
    #[derive(Clone, Copy, Debug)]
    enum Channel<'a> {
        Header(&'a str),
        QueryParam(&'a str),
    }
1444
    // Optional request decorations applied by `build_request`.
    #[derive(Default)]
    struct SendWithOpts<'a> {
        // Channel id to attach, if any.
        channel: Option<Channel<'a>>,
        // Value for the `X-Forwarded-For` header, if any.
        forwarded_for: Option<String>,
    }
1450
1451 async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
1452 let channel = Channel::Header("channel");
1453 let options = SendWithOpts {
1454 channel: Some(channel),
1455 forwarded_for: None,
1456 };
1457 send_with(address, api, message, TOKEN, &options).await
1458 }
1459
1460 fn build_request(
1461 address: SocketAddr,
1462 api: &str,
1463 message: &str,
1464 token: &str,
1465 opts: &SendWithOpts<'_>,
1466 ) -> RequestBuilder {
1467 let mut b = reqwest::Client::new()
1468 .post(format!("http://{address}/{api}"))
1469 .header("Authorization", format!("Splunk {token}"));
1470
1471 b = match opts.channel {
1472 Some(c) => match c {
1473 Channel::Header(v) => b.header("x-splunk-request-channel", v),
1474 Channel::QueryParam(v) => b.query(&[("channel", v)]),
1475 },
1476 None => b,
1477 };
1478
1479 b = match &opts.forwarded_for {
1480 Some(f) => b.header("X-Forwarded-For", f),
1481 None => b,
1482 };
1483
1484 b.body(message.to_owned())
1485 }
1486
1487 async fn send_with(
1488 address: SocketAddr,
1489 api: &str,
1490 message: &str,
1491 token: &str,
1492 opts: &SendWithOpts<'_>,
1493 ) -> u16 {
1494 let b = build_request(address, api, message, token, opts);
1495 b.send().await.unwrap().status().as_u16()
1496 }
1497
1498 async fn send_with_response(
1499 address: SocketAddr,
1500 api: &str,
1501 message: &str,
1502 token: &str,
1503 opts: &SendWithOpts<'_>,
1504 ) -> Response {
1505 let b = build_request(address, api, message, token, opts);
1506 b.send().await.unwrap()
1507 }
1508
1509 #[tokio::test]
1510 async fn no_compression_text_event() {
1511 let message = "gzip_text_event";
1512 let (sink, source) = start(
1513 TextSerializerConfig::default().into(),
1514 Compression::None,
1515 None,
1516 )
1517 .await;
1518
1519 let event = channel_n(vec![message], sink, source).await.remove(0);
1520
1521 assert_eq!(
1522 event.as_log()[log_schema().message_key().unwrap().to_string()],
1523 message.into()
1524 );
1525 assert!(event.as_log().get_timestamp().is_some());
1526 assert_eq!(
1527 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1528 "splunk_hec".into()
1529 );
1530 assert!(event.metadata().splunk_hec_token().is_none());
1531 }
1532
1533 #[tokio::test]
1534 async fn one_simple_text_event() {
1535 let message = "one_simple_text_event";
1536 let (sink, source) = start(
1537 TextSerializerConfig::default().into(),
1538 Compression::gzip_default(),
1539 None,
1540 )
1541 .await;
1542
1543 let event = channel_n(vec![message], sink, source).await.remove(0);
1544
1545 assert_eq!(
1546 event.as_log()[log_schema().message_key().unwrap().to_string()],
1547 message.into()
1548 );
1549 assert!(event.as_log().get_timestamp().is_some());
1550 assert_eq!(
1551 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1552 "splunk_hec".into()
1553 );
1554 assert!(event.metadata().splunk_hec_token().is_none());
1555 }
1556
1557 #[tokio::test]
1558 async fn multiple_simple_text_event() {
1559 let n = 200;
1560 let (sink, source) = start(
1561 TextSerializerConfig::default().into(),
1562 Compression::None,
1563 None,
1564 )
1565 .await;
1566
1567 let messages = (0..n)
1568 .map(|i| format!("multiple_simple_text_event_{i}"))
1569 .collect::<Vec<_>>();
1570 let events = channel_n(messages.clone(), sink, source).await;
1571
1572 for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1573 assert_eq!(
1574 event.as_log()[log_schema().message_key().unwrap().to_string()],
1575 msg.into()
1576 );
1577 assert!(event.as_log().get_timestamp().is_some());
1578 assert_eq!(
1579 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1580 "splunk_hec".into()
1581 );
1582 assert!(event.metadata().splunk_hec_token().is_none());
1583 }
1584 }
1585
1586 #[tokio::test]
1587 async fn one_simple_json_event() {
1588 let message = "one_simple_json_event";
1589 let (sink, source) = start(
1590 JsonSerializerConfig::default().into(),
1591 Compression::gzip_default(),
1592 None,
1593 )
1594 .await;
1595
1596 let event = channel_n(vec![message], sink, source).await.remove(0);
1597
1598 assert_eq!(
1599 event.as_log()[log_schema().message_key().unwrap().to_string()],
1600 message.into()
1601 );
1602 assert!(event.as_log().get_timestamp().is_some());
1603 assert_eq!(
1604 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1605 "splunk_hec".into()
1606 );
1607 assert!(event.metadata().splunk_hec_token().is_none());
1608 }
1609
1610 #[tokio::test]
1611 async fn multiple_simple_json_event() {
1612 let n = 200;
1613 let (sink, source) = start(
1614 JsonSerializerConfig::default().into(),
1615 Compression::gzip_default(),
1616 None,
1617 )
1618 .await;
1619
1620 let messages = (0..n)
1621 .map(|i| format!("multiple_simple_json_event{i}"))
1622 .collect::<Vec<_>>();
1623 let events = channel_n(messages.clone(), sink, source).await;
1624
1625 for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1626 assert_eq!(
1627 event.as_log()[log_schema().message_key().unwrap().to_string()],
1628 msg.into()
1629 );
1630 assert!(event.as_log().get_timestamp().is_some());
1631 assert_eq!(
1632 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1633 "splunk_hec".into()
1634 );
1635 assert!(event.metadata().splunk_hec_token().is_none());
1636 }
1637 }
1638
1639 #[tokio::test]
1640 async fn json_event() {
1641 let (sink, source) = start(
1642 JsonSerializerConfig::default().into(),
1643 Compression::gzip_default(),
1644 None,
1645 )
1646 .await;
1647
1648 let mut log = LogEvent::default();
1649 log.insert("greeting", "hello");
1650 log.insert("name", "bob");
1651 sink.run_events(vec![log.into()]).await.unwrap();
1652
1653 let event = collect_n(source, 1).await.remove(0).into_log();
1654 assert_eq!(event["greeting"], "hello".into());
1655 assert_eq!(event["name"], "bob".into());
1656 assert!(event.get_timestamp().is_some());
1657 assert_eq!(
1658 event[log_schema().source_type_key().unwrap().to_string()],
1659 "splunk_hec".into()
1660 );
1661 assert!(event.metadata().splunk_hec_token().is_none());
1662 }
1663
1664 #[tokio::test]
1665 async fn json_invalid_path_event() {
1666 let (sink, source) = start(
1667 JsonSerializerConfig::default().into(),
1668 Compression::gzip_default(),
1669 None,
1670 )
1671 .await;
1672
1673 let mut log = LogEvent::default();
1674 log.insert(event_path!("(greeting | thing"), "hello");
1677 sink.run_events(vec![log.into()]).await.unwrap();
1678
1679 let event = collect_n(source, 1).await.remove(0).into_log();
1680 assert_eq!(
1681 event.get(event_path!("(greeting | thing")),
1682 Some(&Value::from("hello"))
1683 );
1684 }
1685
1686 #[tokio::test]
1687 async fn line_to_message() {
1688 let (sink, source) = start(
1689 JsonSerializerConfig::default().into(),
1690 Compression::gzip_default(),
1691 None,
1692 )
1693 .await;
1694
1695 let mut event = LogEvent::default();
1696 event.insert("line", "hello");
1697 sink.run_events(vec![event.into()]).await.unwrap();
1698
1699 let event = collect_n(source, 1).await.remove(0);
1700 assert_eq!(
1701 event.as_log()[log_schema().message_key().unwrap().to_string()],
1702 "hello".into()
1703 );
1704 assert!(event.metadata().splunk_hec_token().is_none());
1705 }
1706
1707 #[tokio::test]
1708 async fn raw() {
1709 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1710 let message = "raw";
1711 let (source, address) = source(None).await;
1712
1713 assert_eq!(200, post(address, "services/collector/raw", message).await);
1714
1715 let event = collect_n(source, 1).await.remove(0);
1716 assert_eq!(
1717 event.as_log()[log_schema().message_key().unwrap().to_string()],
1718 message.into()
1719 );
1720 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1721 assert!(event.as_log().get_timestamp().is_some());
1722 assert_eq!(
1723 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1724 "splunk_hec".into()
1725 );
1726 assert!(event.metadata().splunk_hec_token().is_none());
1727 })
1728 .await;
1729 }
1730
1731 #[tokio::test]
1732 async fn root() {
1733 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1734 let message = r#"{ "event": { "message": "root"} }"#;
1735 let (source, address) = source(None).await;
1736
1737 assert_eq!(200, post(address, "services/collector", message).await);
1738
1739 let event = collect_n(source, 1).await.remove(0);
1740 assert_eq!(
1741 event.as_log()[log_schema().message_key().unwrap().to_string()],
1742 "root".into()
1743 );
1744 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1745 assert!(event.as_log().get_timestamp().is_some());
1746 assert_eq!(
1747 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1748 "splunk_hec".into()
1749 );
1750 assert!(event.metadata().splunk_hec_token().is_none());
1751 })
1752 .await;
1753 }
1754
1755 #[tokio::test]
1756 async fn channel_header() {
1757 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1758 let message = "raw";
1759 let (source, address) = source(None).await;
1760
1761 let opts = SendWithOpts {
1762 channel: Some(Channel::Header("guid")),
1763 forwarded_for: None,
1764 };
1765
1766 assert_eq!(
1767 200,
1768 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1769 );
1770
1771 let event = collect_n(source, 1).await.remove(0);
1772 assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1773 })
1774 .await;
1775 }
1776
1777 #[tokio::test]
1778 async fn xff_header_raw() {
1779 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1780 let message = "raw";
1781 let (source, address) = source(None).await;
1782
1783 let opts = SendWithOpts {
1784 channel: Some(Channel::Header("guid")),
1785 forwarded_for: Some(String::from("10.0.0.1")),
1786 };
1787
1788 assert_eq!(
1789 200,
1790 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1791 );
1792
1793 let event = collect_n(source, 1).await.remove(0);
1794 assert_eq!(
1795 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1796 "10.0.0.1".into()
1797 );
1798 })
1799 .await;
1800 }
1801
1802 #[tokio::test]
1804 async fn xff_header_event_with_host_field() {
1805 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1806 let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
1807 let (source, address) = source(None).await;
1808
1809 let opts = SendWithOpts {
1810 channel: Some(Channel::Header("guid")),
1811 forwarded_for: Some(String::from("10.0.0.1")),
1812 };
1813
1814 assert_eq!(
1815 200,
1816 send_with(address, "services/collector/event", message, TOKEN, &opts).await
1817 );
1818
1819 let event = collect_n(source, 1).await.remove(0);
1820 assert_eq!(
1821 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1822 "10.1.0.2".into()
1823 );
1824 })
1825 .await;
1826 }
1827
1828 #[tokio::test]
1830 async fn xff_header_event_without_host_field() {
1831 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1832 let message = r#"{"event":"first", "color": "blue"}"#;
1833 let (source, address) = source(None).await;
1834
1835 let opts = SendWithOpts {
1836 channel: Some(Channel::Header("guid")),
1837 forwarded_for: Some(String::from("10.0.0.1")),
1838 };
1839
1840 assert_eq!(
1841 200,
1842 send_with(address, "services/collector/event", message, TOKEN, &opts).await
1843 );
1844
1845 let event = collect_n(source, 1).await.remove(0);
1846 assert_eq!(
1847 event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1848 "10.0.0.1".into()
1849 );
1850 })
1851 .await;
1852 }
1853
1854 #[tokio::test]
1855 async fn channel_query_param() {
1856 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1857 let message = "raw";
1858 let (source, address) = source(None).await;
1859
1860 let opts = SendWithOpts {
1861 channel: Some(Channel::QueryParam("guid")),
1862 forwarded_for: None,
1863 };
1864
1865 assert_eq!(
1866 200,
1867 send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1868 );
1869
1870 let event = collect_n(source, 1).await.remove(0);
1871 assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1872 })
1873 .await;
1874 }
1875
1876 #[tokio::test]
1877 async fn no_data() {
1878 let (_source, address) = source(None).await;
1879
1880 assert_eq!(400, post(address, "services/collector/event", "").await);
1881 }
1882
1883 #[tokio::test]
1884 async fn invalid_token() {
1885 assert_source_error(&COMPONENT_ERROR_TAGS, async {
1886 let (_source, address) = source(None).await;
1887 let opts = SendWithOpts {
1888 channel: Some(Channel::Header("channel")),
1889 forwarded_for: None,
1890 };
1891
1892 assert_eq!(
1893 401,
1894 send_with(address, "services/collector/event", "", "nope", &opts).await
1895 );
1896 })
1897 .await;
1898 }
1899
1900 #[tokio::test]
1901 async fn health_ignores_token() {
1902 let (_source, address) = source(None).await;
1903
1904 let res = reqwest::Client::new()
1905 .get(format!("http://{address}/services/collector/health"))
1906 .header("Authorization", format!("Splunk {}", "invalid token"))
1907 .send()
1908 .await
1909 .unwrap();
1910
1911 assert_eq!(200, res.status().as_u16());
1912 }
1913
1914 #[tokio::test]
1915 async fn health() {
1916 let (_source, address) = source(None).await;
1917
1918 let res = reqwest::Client::new()
1919 .get(format!("http://{address}/services/collector/health"))
1920 .send()
1921 .await
1922 .unwrap();
1923
1924 assert_eq!(200, res.status().as_u16());
1925 }
1926
1927 #[tokio::test]
1928 async fn secondary_token() {
1929 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1930 let message = r#"{"event":"first", "color": "blue"}"#;
1931 let (_source, address) = source_with(None, Some(VALID_TOKENS), None, false).await;
1932 let options = SendWithOpts {
1933 channel: None,
1934 forwarded_for: None,
1935 };
1936
1937 assert_eq!(
1938 200,
1939 send_with(
1940 address,
1941 "services/collector/event",
1942 message,
1943 VALID_TOKENS.get(1).unwrap(),
1944 &options
1945 )
1946 .await
1947 );
1948 })
1949 .await;
1950 }
1951
1952 #[tokio::test]
1953 async fn event_service_token_passthrough_enabled() {
1954 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1955 let message = "passthrough_token_enabled";
1956 let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
1957 let (sink, health) = sink(
1958 address,
1959 TextSerializerConfig::default().into(),
1960 Compression::gzip_default(),
1961 )
1962 .await;
1963 assert!(health.await.is_ok());
1964
1965 let event = channel_n(vec![message], sink, source).await.remove(0);
1966
1967 assert_eq!(
1968 event.as_log()[log_schema().message_key().unwrap().to_string()],
1969 message.into()
1970 );
1971 assert_eq!(
1972 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
1973 TOKEN
1974 );
1975 })
1976 .await;
1977 }
1978
1979 #[tokio::test]
1980 async fn raw_service_token_passthrough_enabled() {
1981 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1982 let message = "raw";
1983 let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
1984
1985 assert_eq!(200, post(address, "services/collector/raw", message).await);
1986
1987 let event = collect_n(source, 1).await.remove(0);
1988 assert_eq!(
1989 event.as_log()[log_schema().message_key().unwrap().to_string()],
1990 message.into()
1991 );
1992 assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1993 assert!(event.as_log().get_timestamp().is_some());
1994 assert_eq!(
1995 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1996 "splunk_hec".into()
1997 );
1998 assert_eq!(
1999 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2000 TOKEN
2001 );
2002 })
2003 .await;
2004 }
2005
2006 #[tokio::test]
2007 async fn no_authorization() {
2008 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2009 let message = "no_authorization";
2010 let (source, address) = source_with(None, None, None, false).await;
2011 let (sink, health) = sink(
2012 address,
2013 TextSerializerConfig::default().into(),
2014 Compression::gzip_default(),
2015 )
2016 .await;
2017 assert!(health.await.is_ok());
2018
2019 let event = channel_n(vec![message], sink, source).await.remove(0);
2020
2021 assert_eq!(
2022 event.as_log()[log_schema().message_key().unwrap().to_string()],
2023 message.into()
2024 );
2025 assert!(event.metadata().splunk_hec_token().is_none());
2026 })
2027 .await;
2028 }
2029
2030 #[tokio::test]
2031 async fn no_authorization_token_passthrough_enabled() {
2032 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2033 let message = "no_authorization";
2034 let (source, address) = source_with(None, None, None, true).await;
2035 let (sink, health) = sink(
2036 address,
2037 TextSerializerConfig::default().into(),
2038 Compression::gzip_default(),
2039 )
2040 .await;
2041 assert!(health.await.is_ok());
2042
2043 let event = channel_n(vec![message], sink, source).await.remove(0);
2044
2045 assert_eq!(
2046 event.as_log()[log_schema().message_key().unwrap().to_string()],
2047 message.into()
2048 );
2049 assert_eq!(
2050 &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2051 TOKEN
2052 );
2053 })
2054 .await;
2055 }
2056
2057 #[tokio::test]
2058 async fn partial() {
2059 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2060 let message = r#"{"event":"first"}{"event":"second""#;
2061 let (source, address) = source(None).await;
2062
2063 assert_eq!(
2064 400,
2065 post(address, "services/collector/event", message).await
2066 );
2067
2068 let event = collect_n(source, 1).await.remove(0);
2069 assert_eq!(
2070 event.as_log()[log_schema().message_key().unwrap().to_string()],
2071 "first".into()
2072 );
2073 assert!(event.as_log().get_timestamp().is_some());
2074 assert_eq!(
2075 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2076 "splunk_hec".into()
2077 );
2078 })
2079 .await;
2080 }
2081
2082 #[tokio::test]
2083 async fn handles_newlines() {
2084 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2085 let message = r#"
2086{"event":"first"}
2087 "#;
2088 let (source, address) = source(None).await;
2089
2090 assert_eq!(
2091 200,
2092 post(address, "services/collector/event", message).await
2093 );
2094
2095 let event = collect_n(source, 1).await.remove(0);
2096 assert_eq!(
2097 event.as_log()[log_schema().message_key().unwrap().to_string()],
2098 "first".into()
2099 );
2100 assert!(event.as_log().get_timestamp().is_some());
2101 assert_eq!(
2102 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2103 "splunk_hec".into()
2104 );
2105 })
2106 .await;
2107 }
2108
2109 #[tokio::test]
2110 async fn handles_spaces() {
2111 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2112 let message = r#" {"event":"first"} "#;
2113 let (source, address) = source(None).await;
2114
2115 assert_eq!(
2116 200,
2117 post(address, "services/collector/event", message).await
2118 );
2119
2120 let event = collect_n(source, 1).await.remove(0);
2121 assert_eq!(
2122 event.as_log()[log_schema().message_key().unwrap().to_string()],
2123 "first".into()
2124 );
2125 assert!(event.as_log().get_timestamp().is_some());
2126 assert_eq!(
2127 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2128 "splunk_hec".into()
2129 );
2130 })
2131 .await;
2132 }
2133
2134 #[tokio::test]
2135 async fn handles_non_utf8() {
2136 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2137 let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
2138 let (source, address) = source(None).await;
2139
2140 let b = reqwest::Client::new()
2141 .post(format!(
2142 "http://{}/{}",
2143 address, "services/collector/event"
2144 ))
2145 .header("Authorization", format!("Splunk {TOKEN}"))
2146 .body::<&[u8]>(message);
2147
2148 assert_eq!(200, b.send().await.unwrap().status().as_u16());
2149
2150 let event = collect_n(source, 1).await.remove(0);
2151 assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
2152 assert_eq!(event.as_log()["number"], 2.into());
2153 assert_eq!(event.as_log()["bool"], true.into());
2154 assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some());
2155 assert_eq!(
2156 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2157 "splunk_hec".into()
2158 );
2159 }).await;
2160 }
2161
2162 #[tokio::test]
2163 async fn default() {
2164 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2165 let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
2166 let (source, address) = source(None).await;
2167
2168 assert_eq!(
2169 200,
2170 post(address, "services/collector/event", message).await
2171 );
2172
2173 let events = collect_n(source, 3).await;
2174
2175 assert_eq!(
2176 events[0].as_log()[log_schema().message_key().unwrap().to_string()],
2177 "first".into()
2178 );
2179 assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());
2180
2181 assert_eq!(
2182 events[1].as_log()[log_schema().message_key().unwrap().to_string()],
2183 "second".into()
2184 );
2185 assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());
2186
2187 assert_eq!(
2188 events[2].as_log()[log_schema().message_key().unwrap().to_string()],
2189 "third".into()
2190 );
2191 assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
2192 }).await;
2193 }
2194
    #[test]
    fn parse_timestamps() {
        // `parse_timestamp` appears to infer the precision (seconds,
        // milliseconds, or nanoseconds) from the magnitude of the integer —
        // the same instants round-trip at all three precisions below.
        let cases = vec![
            Utc::now(),
            Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
                .single()
                .expect("invalid timestamp"),
            Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
                .single()
                .expect("invalid timestamp"),
            Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
                .single()
                .expect("invalid timestamp"),
        ];

        for case in cases {
            let sec = case.timestamp();
            let millis = case.timestamp_millis();
            let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");

            assert_eq!(parse_timestamp(sec).unwrap().timestamp(), case.timestamp());
            assert_eq!(
                parse_timestamp(millis).unwrap().timestamp_millis(),
                case.timestamp_millis()
            );
            assert_eq!(
                parse_timestamp(nano)
                    .unwrap()
                    .timestamp_nanos_opt()
                    .unwrap(),
                case.timestamp_nanos_opt().expect("Timestamp out of range")
            );
        }

        // Negative timestamps are rejected.
        assert!(parse_timestamp(-1).is_none());
    }
2231
2232 #[tokio::test]
2239 async fn host_test() {
2240 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2241 let message = "for the host";
2242 let (sink, source) = start(
2243 TextSerializerConfig::default().into(),
2244 Compression::gzip_default(),
2245 None,
2246 )
2247 .await;
2248
2249 let event = channel_n(vec![message], sink, source).await.remove(0);
2250
2251 assert_eq!(
2252 event.as_log()[log_schema().message_key().unwrap().to_string()],
2253 message.into()
2254 );
2255 assert!(event
2256 .as_log()
2257 .get((PathPrefix::Event, log_schema().host_key().unwrap()))
2258 .is_none());
2259 })
2260 .await;
2261 }
2262
    // Shape of the HEC response body when acknowledgements are enabled:
    // status text, numeric code, and the ack id assigned to the request.
    #[derive(Deserialize)]
    struct HecAckEventResponse {
        text: String,
        code: u8,
        #[serde(rename = "ackId")]
        ack_id: u64,
    }
2270
2271 #[tokio::test]
2272 async fn ack_json_event() {
2273 let ack_config = HecAcknowledgementsConfig {
2274 enabled: Some(true),
2275 ..Default::default()
2276 };
2277 let (source, address) = source(Some(ack_config)).await;
2278 let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
2279 let opts = SendWithOpts {
2280 channel: Some(Channel::Header("guid")),
2281 forwarded_for: None,
2282 };
2283 let event_res = send_with_response(
2284 address,
2285 "services/collector/event",
2286 event_message,
2287 TOKEN,
2288 &opts,
2289 )
2290 .await
2291 .json::<HecAckEventResponse>()
2292 .await
2293 .unwrap();
2294 assert_eq!("Success", event_res.text.as_str());
2295 assert_eq!(0, event_res.code);
2296 _ = collect_n(source, 1).await;
2297
2298 let ack_message = serde_json::to_string(&HecAckStatusRequest {
2299 acks: vec![event_res.ack_id],
2300 })
2301 .unwrap();
2302 let ack_res = send_with_response(
2303 address,
2304 "services/collector/ack",
2305 ack_message.as_str(),
2306 TOKEN,
2307 &opts,
2308 )
2309 .await
2310 .json::<HecAckStatusResponse>()
2311 .await
2312 .unwrap();
2313 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2314 }
2315
2316 #[tokio::test]
2317 async fn ack_raw_event() {
2318 let ack_config = HecAcknowledgementsConfig {
2319 enabled: Some(true),
2320 ..Default::default()
2321 };
2322 let (source, address) = source(Some(ack_config)).await;
2323 let event_message = "raw event message";
2324 let opts = SendWithOpts {
2325 channel: Some(Channel::Header("guid")),
2326 forwarded_for: None,
2327 };
2328 let event_res = send_with_response(
2329 address,
2330 "services/collector/raw",
2331 event_message,
2332 TOKEN,
2333 &opts,
2334 )
2335 .await
2336 .json::<HecAckEventResponse>()
2337 .await
2338 .unwrap();
2339 assert_eq!("Success", event_res.text.as_str());
2340 assert_eq!(0, event_res.code);
2341 _ = collect_n(source, 1).await;
2342
2343 let ack_message = serde_json::to_string(&HecAckStatusRequest {
2344 acks: vec![event_res.ack_id],
2345 })
2346 .unwrap();
2347 let ack_res = send_with_response(
2348 address,
2349 "services/collector/ack",
2350 ack_message.as_str(),
2351 TOKEN,
2352 &opts,
2353 )
2354 .await
2355 .json::<HecAckStatusResponse>()
2356 .await
2357 .unwrap();
2358 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2359 }
2360
2361 #[tokio::test]
2362 async fn ack_repeat_ack_query() {
2363 let ack_config = HecAcknowledgementsConfig {
2364 enabled: Some(true),
2365 ..Default::default()
2366 };
2367 let (source, address) = source(Some(ack_config)).await;
2368 let event_message = "raw event message";
2369 let opts = SendWithOpts {
2370 channel: Some(Channel::Header("guid")),
2371 forwarded_for: None,
2372 };
2373 let event_res = send_with_response(
2374 address,
2375 "services/collector/raw",
2376 event_message,
2377 TOKEN,
2378 &opts,
2379 )
2380 .await
2381 .json::<HecAckEventResponse>()
2382 .await
2383 .unwrap();
2384 _ = collect_n(source, 1).await;
2385
2386 let ack_message = serde_json::to_string(&HecAckStatusRequest {
2387 acks: vec![event_res.ack_id],
2388 })
2389 .unwrap();
2390 let ack_res = send_with_response(
2391 address,
2392 "services/collector/ack",
2393 ack_message.as_str(),
2394 TOKEN,
2395 &opts,
2396 )
2397 .await
2398 .json::<HecAckStatusResponse>()
2399 .await
2400 .unwrap();
2401 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2402
2403 let ack_res = send_with_response(
2404 address,
2405 "services/collector/ack",
2406 ack_message.as_str(),
2407 TOKEN,
2408 &opts,
2409 )
2410 .await
2411 .json::<HecAckStatusResponse>()
2412 .await
2413 .unwrap();
2414 assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
2415 }
2416
2417 #[tokio::test]
2418 async fn ack_exceed_max_number_of_ack_channels() {
2419 let ack_config = HecAcknowledgementsConfig {
2420 enabled: Some(true),
2421 max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
2422 ..Default::default()
2423 };
2424
2425 let (_source, address) = source(Some(ack_config)).await;
2426 let mut opts = SendWithOpts {
2427 channel: Some(Channel::Header("guid")),
2428 forwarded_for: None,
2429 };
2430 assert_eq!(
2431 200,
2432 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2433 );
2434
2435 opts.channel = Some(Channel::Header("other-guid"));
2436 assert_eq!(
2437 503,
2438 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2439 );
2440 assert_eq!(
2441 503,
2442 send_with(
2443 address,
2444 "services/collector/event",
2445 r#"{"event":"first"}"#,
2446 TOKEN,
2447 &opts
2448 )
2449 .await
2450 );
2451 }
2452
2453 #[tokio::test]
2454 async fn ack_exceed_max_pending_acks_per_channel() {
2455 let ack_config = HecAcknowledgementsConfig {
2456 enabled: Some(true),
2457 max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
2458 ..Default::default()
2459 };
2460
2461 let (source, address) = source(Some(ack_config)).await;
2462 let opts = SendWithOpts {
2463 channel: Some(Channel::Header("guid")),
2464 forwarded_for: None,
2465 };
2466 for _ in 0..5 {
2467 send_with(
2468 address,
2469 "services/collector/event",
2470 r#"{"event":"first"}"#,
2471 TOKEN,
2472 &opts,
2473 )
2474 .await;
2475 }
2476 for _ in 0..5 {
2477 send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
2478 }
2479 let event_res = send_with_response(
2480 address,
2481 "services/collector/event",
2482 r#"{"event":"this will be acked"}"#,
2483 TOKEN,
2484 &opts,
2485 )
2486 .await
2487 .json::<HecAckEventResponse>()
2488 .await
2489 .unwrap();
2490 _ = collect_n(source, 11).await;
2491
2492 let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
2493 acks: (0..10).collect::<Vec<u64>>(),
2494 })
2495 .unwrap();
2496 let ack_res = send_with_response(
2497 address,
2498 "services/collector/ack",
2499 ack_message_dropped.as_str(),
2500 TOKEN,
2501 &opts,
2502 )
2503 .await
2504 .json::<HecAckStatusResponse>()
2505 .await
2506 .unwrap();
2507 assert!(ack_res.acks.values().all(|ack_status| !*ack_status));
2508
2509 let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
2510 acks: vec![event_res.ack_id],
2511 })
2512 .unwrap();
2513 let ack_res = send_with_response(
2514 address,
2515 "services/collector/ack",
2516 ack_message_acked.as_str(),
2517 TOKEN,
2518 &opts,
2519 )
2520 .await
2521 .json::<HecAckStatusResponse>()
2522 .await
2523 .unwrap();
2524 assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2525 }
2526
2527 #[tokio::test]
2528 async fn ack_service_accepts_parameterized_content_type() {
2529 let ack_config = HecAcknowledgementsConfig {
2530 enabled: Some(true),
2531 ..Default::default()
2532 };
2533 let (source, address) = source(Some(ack_config)).await;
2534 let opts = SendWithOpts {
2535 channel: Some(Channel::Header("guid")),
2536 forwarded_for: None,
2537 };
2538
2539 let event_res = send_with_response(
2540 address,
2541 "services/collector/event",
2542 r#"{"event":"param-test"}"#,
2543 TOKEN,
2544 &opts,
2545 )
2546 .await
2547 .json::<HecAckEventResponse>()
2548 .await
2549 .unwrap();
2550 let _ = collect_n(source, 1).await;
2551
2552 let body = serde_json::to_string(&HecAckStatusRequest {
2553 acks: vec![event_res.ack_id],
2554 })
2555 .unwrap();
2556
2557 let res = reqwest::Client::new()
2558 .post(format!("http://{address}/services/collector/ack"))
2559 .header("Authorization", format!("Splunk {TOKEN}"))
2560 .header("x-splunk-request-channel", "guid")
2561 .header("Content-Type", "application/json; some-random-text; hello")
2562 .body(body)
2563 .send()
2564 .await
2565 .unwrap();
2566
2567 assert_eq!(200, res.status().as_u16());
2568
2569 let _parsed: HecAckStatusResponse = res.json().await.unwrap();
2570 }
2571
2572 #[tokio::test]
2573 async fn event_service_acknowledgements_enabled_channel_required() {
2574 let message = r#"{"event":"first", "color": "blue"}"#;
2575 let ack_config = HecAcknowledgementsConfig {
2576 enabled: Some(true),
2577 ..Default::default()
2578 };
2579 let (_, address) = source(Some(ack_config)).await;
2580
2581 let opts = SendWithOpts {
2582 channel: None,
2583 forwarded_for: None,
2584 };
2585
2586 assert_eq!(
2587 400,
2588 send_with(address, "services/collector/event", message, TOKEN, &opts).await
2589 );
2590 }
2591
2592 #[tokio::test]
2593 async fn ack_service_acknowledgements_disabled() {
2594 let message = r#" {"acks":[0]} "#;
2595 let (_, address) = source(None).await;
2596
2597 let opts = SendWithOpts {
2598 channel: Some(Channel::Header("guid")),
2599 forwarded_for: None,
2600 };
2601
2602 assert_eq!(
2603 400,
2604 send_with(address, "services/collector/ack", message, TOKEN, &opts).await
2605 );
2606 }
2607
    /// Output schema definition under the Vector log namespace: the event root
    /// is the message, and all HEC-specific fields live in metadata under the
    /// `vector` and `splunk_hec` paths.
    #[test]
    fn output_schema_definition_vector_namespace() {
        // Enable the Vector namespace on an otherwise-default config.
        let config = SplunkConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definition = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        // The event root may be an object or raw bytes, and carries the
        // "message" meaning.
        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()).or_bytes(),
            [LogNamespace::Vector],
        )
        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE)
        .with_metadata_field(
            &owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        )
        // `splunk_hec.host` additionally carries the "host" meaning.
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "host"),
            Kind::bytes(),
            Some("host"),
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "index"),
            Kind::bytes(),
            None,
        )
        // `splunk_hec.source` additionally carries the "service" meaning.
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "source"),
            Kind::bytes(),
            Some("service"),
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "channel"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("splunk_hec", "sourcetype"),
            Kind::bytes(),
            None,
        );

        assert_eq!(definition, Some(expected_definition));
    }
2663
    /// Output schema definition under the Legacy log namespace: all HEC fields
    /// are placed directly on the event object.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = SplunkConfig::default();
        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        // `message` may be undefined, but still carries the "message" meaning.
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes().or_undefined(),
            Some("message"),
        )
        // `line` is an array or object when present; otherwise undefined.
        .with_event_field(
            &owned_value_path!("line"),
            Kind::array(Collection::empty())
                .or_object(Collection::empty())
                .or_undefined(),
            None,
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("splunk_channel"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("splunk_index"), Kind::bytes(), None)
        // `splunk_source` additionally carries the "service" meaning.
        .with_event_field(
            &owned_value_path!("splunk_source"),
            Kind::bytes(),
            Some("service"),
        )
        .with_event_field(&owned_value_path!("splunk_sourcetype"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None);

        assert_eq!(definitions, Some(expected_definition));
    }
2702
2703 impl ValidatableComponent for SplunkConfig {
2704 fn validation_configuration() -> ValidationConfiguration {
2705 let config = Self {
2706 address: default_socket_address(),
2707 ..Default::default()
2708 };
2709
2710 let listen_addr_http = format!("http://{}/services/collector/event", config.address);
2711 let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
2712
2713 let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
2714 let framing = BytesDecoderConfig::new().into();
2715 let decoding = DeserializerConfig::Json(Default::default());
2716
2717 let external_resource = ExternalResource::new(
2718 ResourceDirection::Push,
2719 HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
2720 X_SPLUNK_REQUEST_CHANNEL.to_string(),
2721 "channel".to_string(),
2722 )])),
2723 DecodingConfig::new(framing, decoding, false.into()),
2724 );
2725
2726 ValidationConfiguration::from_source(
2727 Self::NAME,
2728 log_namespace,
2729 vec![ComponentTestCaseConfig::from_source(
2730 config,
2731 None,
2732 Some(external_resource),
2733 )],
2734 )
2735 }
2736 }
2737
    // Register this source with the component validation framework so the
    // configuration above is exercised by the shared compliance tests.
    register_validatable_component!(SplunkConfig);
2739}