vector/sources/splunk_hec/
mod.rs

1use std::{
2    collections::HashMap,
3    convert::Infallible,
4    io::Read,
5    net::{Ipv4Addr, SocketAddr},
6    sync::Arc,
7    time::Duration,
8};
9
10use bytes::{Buf, Bytes};
11use chrono::{DateTime, TimeZone, Utc};
12use flate2::read::MultiGzDecoder;
13use futures::FutureExt;
14use http::StatusCode;
15use hyper::{service::make_service_fn, Server};
16use serde::de::DeserializeOwned;
17use serde::Serialize;
18use serde_json::{
19    de::{Read as JsonRead, StrRead},
20    Deserializer, Value as JsonValue,
21};
22use snafu::Snafu;
23use tokio::net::TcpStream;
24use tower::ServiceBuilder;
25use tracing::Span;
26use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
27use vector_lib::lookup::lookup_v2::OptionalValuePath;
28use vector_lib::lookup::{self, event_path, owned_value_path};
29use vector_lib::sensitive_string::SensitiveString;
30use vector_lib::{
31    config::{LegacyKey, LogNamespace},
32    event::BatchNotifier,
33    schema::meaning,
34    EstimatedJsonEncodedSizeOf,
35};
36use vector_lib::{configurable::configurable_component, tls::MaybeTlsIncomingStream};
37use vrl::path::OwnedTargetPath;
38use vrl::value::{kind::Collection, Kind};
39use warp::http::header::{HeaderValue, CONTENT_TYPE};
40use warp::{filters::BoxedFilter, path, reject::Rejection, reply::Response, Filter, Reply};
41
42use self::{
43    acknowledgements::{
44        HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
45        IndexerAcknowledgement,
46    },
47    splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
48};
49use crate::{
50    config::{log_schema, DataType, Resource, SourceConfig, SourceContext, SourceOutput},
51    event::{Event, LogEvent, Value},
52    http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer},
53    internal_events::{
54        EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
55    },
56    serde::bool_or_struct,
57    source_sender::ClosedError,
58    tls::{MaybeTlsSettings, TlsEnableableConfig},
59    SourceSender,
60};
61
62mod acknowledgements;
63
// Event fields unique to the `splunk_hec` source. In `outputs` these are the
// legacy-namespace keys paired with the `channel`, `index`, `source` and
// `sourcetype` source-metadata paths.

/// Legacy event field that holds the Splunk channel of the request/event.
pub const CHANNEL: &str = "splunk_channel";
/// Legacy event field that holds the Splunk index.
pub const INDEX: &str = "splunk_index";
/// Legacy event field that holds the Splunk source.
pub const SOURCE: &str = "splunk_source";
/// Legacy event field that holds the Splunk sourcetype (not to be confused with `source_type`).
pub const SOURCETYPE: &str = "splunk_sourcetype";

/// Name of the request header from which the channel is read (a `channel`
/// query parameter is consulted as a fallback by the request filters).
const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
71
/// Configuration for the `splunk_hec` source.
// `deny_unknown_fields` rejects unrecognised keys, and the struct-level
// `default` fills any omitted field from the `Default` impl of this type.
#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    /// The socket address to listen for connections on.
    ///
    /// The address _must_ include a port.
    #[serde(default = "default_socket_address")]
    pub address: SocketAddr,

    /// Optional authorization token.
    ///
    /// If supplied, incoming requests must supply this token in the `Authorization` header, just as a client would if
    /// it was communicating with the Splunk HEC endpoint directly.
    ///
    /// If _not_ supplied, the `Authorization` header is ignored and requests are not authenticated.
    // Still honoured: `SplunkSource::new` chains this token onto `valid_tokens`
    // when it builds the list of accepted credentials.
    #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
    token: Option<SensitiveString>,

    /// A list of valid authorization tokens.
    ///
    /// If supplied, incoming requests must supply one of these tokens in the `Authorization` header, just as a client
    /// would if it was communicating with the Splunk HEC endpoint directly.
    ///
    /// If _not_ supplied, the `Authorization` header is ignored and requests are not authenticated.
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    valid_tokens: Option<Vec<SensitiveString>>,

    /// Whether or not to forward the Splunk HEC authentication token with events.
    ///
    /// If set to `true`, when incoming requests contain a Splunk HEC token, the token used is kept in the
    /// event metadata and preferentially used if the event is sent to a Splunk HEC sink.
    store_hec_token: bool,

    // TLS settings for the listener; handed to `MaybeTlsSettings::from_config`
    // in `build`.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Indexer acknowledgement settings, deserialized through `bool_or_struct`.
    #[configurable(derived)]
    #[serde(deserialize_with = "bool_or_struct")]
    acknowledgements: HecAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global settings.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Connection keepalive settings; `build` uses them to configure the
    // optional `MaxConnectionAgeLayer`.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
123
124impl_generate_config_from_default!(SplunkConfig);
125
126impl Default for SplunkConfig {
127    fn default() -> Self {
128        SplunkConfig {
129            address: default_socket_address(),
130            token: None,
131            valid_tokens: None,
132            tls: None,
133            acknowledgements: Default::default(),
134            store_hec_token: false,
135            log_namespace: None,
136            keepalive: Default::default(),
137        }
138    }
139}
140
/// Returns the default listen address: the unspecified IPv4 address
/// (`0.0.0.0`) on port 8088.
fn default_socket_address() -> SocketAddr {
    SocketAddr::from((Ipv4Addr::UNSPECIFIED, 8088))
}
144
145#[async_trait::async_trait]
146#[typetag::serde(name = "splunk_hec")]
147impl SourceConfig for SplunkConfig {
148    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
149        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
150        let shutdown = cx.shutdown.clone();
151        let out = cx.out.clone();
152        let source = SplunkSource::new(self, tls.http_protocol_name(), cx);
153
154        let event_service = source.event_service(out.clone());
155        let raw_service = source.raw_service(out);
156        let health_service = source.health_service();
157        let ack_service = source.ack_service();
158        let options = SplunkSource::options();
159
160        let services = path!("services" / "collector" / ..)
161            .and(
162                event_service
163                    .or(raw_service)
164                    .unify()
165                    .or(health_service)
166                    .unify()
167                    .or(ack_service)
168                    .unify()
169                    .or(options)
170                    .unify(),
171            )
172            .or_else(finish_err);
173
174        let listener = tls.bind(&self.address).await?;
175
176        let keepalive_settings = self.keepalive.clone();
177        Ok(Box::pin(async move {
178            let span = Span::current();
179            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
180                let svc = ServiceBuilder::new()
181                    .layer(build_http_trace_layer(span.clone()))
182                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
183                        MaxConnectionAgeLayer::new(
184                            Duration::from_secs(secs),
185                            keepalive_settings.max_connection_age_jitter_factor,
186                            conn.peer_addr(),
187                        )
188                    }))
189                    .service(warp::service(services.clone()));
190                futures_util::future::ok::<_, Infallible>(svc)
191            });
192
193            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
194                .serve(make_svc)
195                .with_graceful_shutdown(shutdown.map(|_| ()))
196                .await
197                .map_err(|err| {
198                    error!("An error occurred: {:?}.", err);
199                })?;
200
201            Ok(())
202        }))
203    }
204
205    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
206        let log_namespace = global_log_namespace.merge(self.log_namespace);
207
208        let schema_definition = match log_namespace {
209            LogNamespace::Legacy => {
210                let definition = vector_lib::schema::Definition::empty_legacy_namespace()
211                    .with_event_field(
212                        &owned_value_path!("line"),
213                        Kind::object(Collection::empty())
214                            .or_array(Collection::empty())
215                            .or_undefined(),
216                        None,
217                    );
218
219                if let Some(message_key) = log_schema().message_key() {
220                    definition.with_event_field(
221                        message_key,
222                        Kind::bytes().or_undefined(),
223                        Some(meaning::MESSAGE),
224                    )
225                } else {
226                    definition
227                }
228            }
229            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
230                Kind::bytes().or_object(Collection::empty()),
231                [log_namespace],
232            )
233            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
234        }
235        .with_standard_vector_source_metadata()
236        .with_source_metadata(
237            SplunkConfig::NAME,
238            log_schema()
239                .host_key()
240                .cloned()
241                .map(LegacyKey::InsertIfEmpty),
242            &owned_value_path!("host"),
243            Kind::bytes(),
244            Some(meaning::HOST),
245        )
246        .with_source_metadata(
247            SplunkConfig::NAME,
248            Some(LegacyKey::Overwrite(owned_value_path!(CHANNEL))),
249            &owned_value_path!("channel"),
250            Kind::bytes(),
251            None,
252        )
253        .with_source_metadata(
254            SplunkConfig::NAME,
255            Some(LegacyKey::Overwrite(owned_value_path!(INDEX))),
256            &owned_value_path!("index"),
257            Kind::bytes(),
258            None,
259        )
260        .with_source_metadata(
261            SplunkConfig::NAME,
262            Some(LegacyKey::Overwrite(owned_value_path!(SOURCE))),
263            &owned_value_path!("source"),
264            Kind::bytes(),
265            Some(meaning::SERVICE),
266        )
267        // Not to be confused with `source_type`.
268        .with_source_metadata(
269            SplunkConfig::NAME,
270            Some(LegacyKey::Overwrite(owned_value_path!(SOURCETYPE))),
271            &owned_value_path!("sourcetype"),
272            Kind::bytes(),
273            None,
274        );
275
276        vec![SourceOutput::new_maybe_logs(
277            DataType::Log,
278            schema_definition,
279        )]
280    }
281
282    fn resources(&self) -> Vec<Resource> {
283        vec![Resource::tcp(self.address)]
284    }
285
    /// Always returns `true`: when acknowledgements are enabled this source
    /// tracks them through `IndexerAcknowledgement` (see `SplunkSource::new`).
    fn can_acknowledge(&self) -> bool {
        true
    }
289}
290
/// Shared data for responding to requests.
struct SplunkSource {
    /// Accepted `Authorization` header values, each formatted as
    /// `Splunk <token>`. When empty, requests are not authenticated.
    valid_credentials: Vec<String>,
    /// Protocol name reported in `HttpBytesReceived` events; taken from the
    /// TLS settings' `http_protocol_name()` in `build`.
    protocol: &'static str,
    /// Indexer acknowledgement state; `Some` only when acknowledgements are
    /// enabled for this source.
    idx_ack: Option<Arc<IndexerAcknowledgement>>,
    /// Whether the request's HEC token is kept on the produced events.
    store_hec_token: bool,
    /// Log namespace that produced events are written in.
    log_namespace: LogNamespace,
    /// Registered handle for the `EventsReceived` internal event.
    events_received: Registered<EventsReceived>,
}
300
301impl SplunkSource {
302    fn new(config: &SplunkConfig, protocol: &'static str, cx: SourceContext) -> Self {
303        let log_namespace = cx.log_namespace(config.log_namespace);
304        let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
305        let shutdown = cx.shutdown;
306        let valid_tokens = config
307            .valid_tokens
308            .iter()
309            .flatten()
310            .chain(config.token.iter());
311
312        let idx_ack = acknowledgements.then(|| {
313            Arc::new(IndexerAcknowledgement::new(
314                config.acknowledgements.clone(),
315                shutdown,
316            ))
317        });
318
319        SplunkSource {
320            valid_credentials: valid_tokens
321                .map(|token| format!("Splunk {}", token.inner()))
322                .collect(),
323            protocol,
324            idx_ack,
325            store_hec_token: config.store_hec_token,
326            log_namespace,
327            events_received: register!(EventsReceived),
328        }
329    }
330
331    fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
332        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
333            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
334        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
335
336        let splunk_channel = splunk_channel_header
337            .and(splunk_channel_query_param)
338            .map(|header: Option<String>, query_param| header.or(query_param));
339
340        let protocol = self.protocol;
341        let idx_ack = self.idx_ack.clone();
342        let store_hec_token = self.store_hec_token;
343        let log_namespace = self.log_namespace;
344        let events_received = self.events_received.clone();
345
346        warp::post()
347            .and(
348                path!("event")
349                    .or(path!("event" / "1.0"))
350                    .or(warp::path::end()),
351            )
352            .and(self.authorization())
353            .and(splunk_channel)
354            .and(warp::addr::remote())
355            .and(warp::header::optional::<String>("X-Forwarded-For"))
356            .and(self.gzip())
357            .and(warp::body::bytes())
358            .and(warp::path::full())
359            .and_then(
360                move |_,
361                      token: Option<String>,
362                      channel: Option<String>,
363                      remote: Option<SocketAddr>,
364                      remote_addr: Option<String>,
365                      gzip: bool,
366                      body: Bytes,
367                      path: warp::path::FullPath| {
368                    let mut out = out.clone();
369                    let idx_ack = idx_ack.clone();
370                    let events_received = events_received.clone();
371
372                    async move {
373                        if idx_ack.is_some() && channel.is_none() {
374                            return Err(Rejection::from(ApiError::MissingChannel));
375                        }
376
377                        let mut data = Vec::new();
378                        let (byte_size, body) = if gzip {
379                            MultiGzDecoder::new(body.reader())
380                                .read_to_end(&mut data)
381                                .map_err(|_| Rejection::from(ApiError::BadRequest))?;
382                            (data.len(), String::from_utf8_lossy(data.as_slice()))
383                        } else {
384                            (body.len(), String::from_utf8_lossy(body.as_ref()))
385                        };
386                        emit!(HttpBytesReceived {
387                            byte_size,
388                            http_path: path.as_str(),
389                            protocol,
390                        });
391
392                        let (batch, receiver) =
393                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
394                        let maybe_ack_id = match (idx_ack, receiver, channel.clone()) {
395                            (Some(idx_ack), Some(receiver), Some(channel_id)) => {
396                                match idx_ack.get_ack_id_from_channel(channel_id, receiver).await {
397                                    Ok(ack_id) => Some(ack_id),
398                                    Err(rej) => return Err(rej),
399                                }
400                            }
401                            _ => None,
402                        };
403
404                        let mut error = None;
405                        let mut events = Vec::new();
406
407                        let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
408                            deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
409                            channel,
410                            remote,
411                            remote_addr,
412                            batch,
413                            token: token.filter(|_| store_hec_token).map(Into::into),
414                            log_namespace,
415                            events_received,
416                        }
417                        .into();
418
419                        for result in iter {
420                            match result {
421                                Ok(event) => events.push(event),
422                                Err(err) => {
423                                    error = Some(err);
424                                    break;
425                                }
426                            }
427                        }
428
429                        if !events.is_empty() {
430                            if let Err(ClosedError) = out.send_batch(events).await {
431                                return Err(Rejection::from(ApiError::ServerShutdown));
432                            }
433                        }
434
435                        if let Some(error) = error {
436                            Err(error)
437                        } else {
438                            Ok(maybe_ack_id)
439                        }
440                    }
441                },
442            )
443            .map(finish_ok)
444            .boxed()
445    }
446
447    fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
448        let protocol = self.protocol;
449        let idx_ack = self.idx_ack.clone();
450        let store_hec_token = self.store_hec_token;
451        let events_received = self.events_received.clone();
452        let log_namespace = self.log_namespace;
453
454        warp::post()
455            .and(path!("raw" / "1.0").or(path!("raw")))
456            .and(self.authorization())
457            .and(SplunkSource::required_channel())
458            .and(warp::addr::remote())
459            .and(warp::header::optional::<String>("X-Forwarded-For"))
460            .and(self.gzip())
461            .and(warp::body::bytes())
462            .and(warp::path::full())
463            .and_then(
464                move |_,
465                      token: Option<String>,
466                      channel_id: String,
467                      remote: Option<SocketAddr>,
468                      xff: Option<String>,
469                      gzip: bool,
470                      body: Bytes,
471                      path: warp::path::FullPath| {
472                    let mut out = out.clone();
473                    let idx_ack = idx_ack.clone();
474                    let events_received = events_received.clone();
475                    emit!(HttpBytesReceived {
476                        byte_size: body.len(),
477                        http_path: path.as_str(),
478                        protocol,
479                    });
480
481                    async move {
482                        let (batch, receiver) =
483                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
484                        let maybe_ack_id = match (idx_ack, receiver) {
485                            (Some(idx_ack), Some(receiver)) => Some(
486                                idx_ack
487                                    .get_ack_id_from_channel(channel_id.clone(), receiver)
488                                    .await?,
489                            ),
490                            _ => None,
491                        };
492                        let mut event = raw_event(
493                            body,
494                            gzip,
495                            channel_id,
496                            remote,
497                            xff,
498                            batch,
499                            log_namespace,
500                            &events_received,
501                        )?;
502                        if let Some(token) = token.filter(|_| store_hec_token) {
503                            event.metadata_mut().set_splunk_hec_token(token.into());
504                        }
505
506                        let res = out.send_event(event).await;
507                        res.map(|_| maybe_ack_id)
508                            .map_err(|_| Rejection::from(ApiError::ServerShutdown))
509                    }
510                },
511            )
512            .map(finish_ok)
513            .boxed()
514    }
515
516    fn health_service(&self) -> BoxedFilter<(Response,)> {
517        // The Splunk docs document this endpoint as returning a 400 if given an invalid Splunk
518        // token, but, in practice, it seems to ignore the token altogether
519        //
520        // The response body was taken from Splunk 8.2.4
521        //
522        // https://docs.splunk.com/Documentation/Splunk/8.2.5/RESTREF/RESTinput#services.2Fcollector.2Fhealth
523        warp::get()
524            .and(path!("health" / "1.0").or(path!("health")))
525            .map(move |_| {
526                http::Response::builder()
527                    .header(http::header::CONTENT_TYPE, "application/json")
528                    .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
529                    .expect("static response")
530            })
531            .boxed()
532    }
533
534    fn lenient_json_content_type_check<T>() -> impl Filter<Extract = (T,), Error = Rejection> + Clone
535    where
536        T: Send + DeserializeOwned + 'static,
537    {
538        warp::header::optional::<HeaderValue>(CONTENT_TYPE.as_str())
539            .and(warp::body::bytes())
540            .and_then(
541                |ctype: Option<HeaderValue>, body: bytes::Bytes| async move {
542                    let ok = ctype
543                        .as_ref()
544                        .and_then(|v| v.to_str().ok())
545                        .map(|h| h.to_ascii_lowercase().contains("application/json"))
546                        .unwrap_or(true);
547
548                    if !ok {
549                        return Err(warp::reject::custom(ApiError::UnsupportedContentType));
550                    }
551
552                    let value = serde_json::from_slice::<T>(&body)
553                        .map_err(|_| warp::reject::custom(ApiError::BadRequest))?;
554
555                    Ok(value)
556                },
557            )
558    }
559
560    fn ack_service(&self) -> BoxedFilter<(Response,)> {
561        let idx_ack = self.idx_ack.clone();
562
563        warp::post()
564            .and(warp::path!("ack"))
565            .and(self.authorization())
566            .and(SplunkSource::required_channel())
567            .and(Self::lenient_json_content_type_check::<HecAckStatusRequest>())
568            .and_then(move |_, channel: String, req: HecAckStatusRequest| {
569                let idx_ack = idx_ack.clone();
570                async move {
571                    if let Some(idx_ack) = idx_ack {
572                        let acks = idx_ack
573                            .get_acks_status_from_channel(channel, &req.acks)
574                            .await?;
575                        Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
576                    } else {
577                        Err(warp::reject::custom(ApiError::AckIsDisabled))
578                    }
579                }
580            })
581            .boxed()
582    }
583
584    fn options() -> BoxedFilter<(Response,)> {
585        let post = warp::options()
586            .and(
587                path!("event")
588                    .or(path!("event" / "1.0"))
589                    .or(path!("raw" / "1.0"))
590                    .or(path!("raw")),
591            )
592            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());
593
594        let get = warp::options()
595            .and(path!("health").or(path!("health" / "1.0")))
596            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());
597
598        post.or(get).unify().boxed()
599    }
600
601    /// Authorize request
602    fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
603        let valid_credentials = self.valid_credentials.clone();
604        warp::header::optional("Authorization")
605            .and_then(move |token: Option<String>| {
606                let valid_credentials = valid_credentials.clone();
607                async move {
608                    match (token, valid_credentials.is_empty()) {
609                        // Remove the "Splunk " prefix if present as it is not
610                        // part of the token itself
611                        (token, true) => {
612                            Ok(token
613                                .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
614                        }
615                        (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
616                            token
617                                .strip_prefix("Splunk ")
618                                .map(Into::into)
619                                .unwrap_or(token),
620                        )),
621                        (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
622                        (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
623                    }
624                }
625            })
626            .boxed()
627    }
628
629    /// Is body encoded with gzip
630    fn gzip(&self) -> BoxedFilter<(bool,)> {
631        warp::header::optional::<String>("Content-Encoding")
632            .and_then(|encoding: Option<String>| async move {
633                match encoding {
634                    Some(s) if s.as_bytes() == b"gzip" => Ok(true),
635                    Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
636                    None => Ok(false),
637                }
638            })
639            .boxed()
640    }
641
642    fn required_channel() -> BoxedFilter<(String,)> {
643        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
644            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
645        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
646
647        splunk_channel_header
648            .and(splunk_channel_query_param)
649            .and_then(|header: Option<String>, query_param| async move {
650                header
651                    .or(query_param)
652                    .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
653            })
654            .boxed()
655    }
656}
/// Constructs one or more events from the JSON values coming from the reader.
/// Once an error occurs, the iterator is done with the input.
struct EventIterator<'de, R: JsonRead<'de>> {
    /// Remaining request with JSON events
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Count of sent events; also reported as the event index in
    /// `InvalidDataFormat` errors.
    events: usize,
    /// Optional channel from headers; used when an event carries no
    /// `channel` field of its own.
    channel: Option<Value>,
    /// Default time: starts as the time the iterator was created and is
    /// replaced whenever an event provides its own `time`.
    time: Time,
    /// Remaining extracted default values (host, index, source, sourcetype)
    extractors: [DefaultExtractor; 4],
    /// Event finalization
    batch: Option<BatchNotifier>,
    /// Splunk HEC Token for passthrough
    token: Option<Arc<str>>,
    /// Log namespace to put the events in
    log_namespace: LogNamespace,
    /// Handle to the `EventsReceived` registry
    events_received: Registered<EventsReceived>,
}
679
/// Intermediate struct to generate an `EventIterator`
///
/// Holds the per-request inputs; the `From` conversion turns them into the
/// iterator's initial state.
struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
    /// Stream of JSON values parsed from the request body.
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Channel taken from the request, if any.
    channel: Option<String>,
    /// Batch notifier for event finalization, if acknowledgements are tracked.
    batch: Option<BatchNotifier>,
    /// Splunk HEC token to pass through with the events, if any.
    token: Option<Arc<str>>,
    /// Log namespace to put the events in.
    log_namespace: LogNamespace,
    /// Handle to the `EventsReceived` registry.
    events_received: Registered<EventsReceived>,
    /// Remote socket address of the request, as provided by warp.
    remote: Option<SocketAddr>,
    /// Value of the `X-Forwarded-For` request header, if present; preferred
    /// over `remote` as the default host.
    remote_addr: Option<String>,
}
691
692impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
693    fn from(f: EventIteratorGenerator<'de, R>) -> Self {
694        Self {
695            deserializer: f.deserializer,
696            events: 0,
697            channel: f.channel.map(Value::from),
698            time: Time::Now(Utc::now()),
699            extractors: [
700                // Extract the host field with the given priority:
701                // 1. The host field is present in the event payload
702                // 2. The x-forwarded-for header is present in the incoming request
703                // 3. Use the `remote`: SocketAddr value provided by warp
704                DefaultExtractor::new_with(
705                    "host",
706                    log_schema().host_key().cloned().into(),
707                    f.remote_addr
708                        .or_else(|| f.remote.map(|addr| addr.to_string()))
709                        .map(Value::from),
710                    f.log_namespace,
711                ),
712                DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace),
713                DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace),
714                DefaultExtractor::new(
715                    "sourcetype",
716                    OptionalValuePath::new(SOURCETYPE),
717                    f.log_namespace,
718                ),
719            ],
720            batch: f.batch,
721            token: f.token,
722            log_namespace: f.log_namespace,
723            events_received: f.events_received,
724        }
725    }
726}
727
728impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
729    fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
730        // Construct Event from parsed json event
731        let mut log = match self.log_namespace {
732            LogNamespace::Vector => self.build_log_vector(&mut json)?,
733            LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
734        };
735
736        // Add source type
737        self.log_namespace.insert_vector_metadata(
738            &mut log,
739            log_schema().source_type_key(),
740            &owned_value_path!("source_type"),
741            SplunkConfig::NAME,
742        );
743
744        // Process channel field
745        let channel_path = owned_value_path!(CHANNEL);
746        if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
747            self.log_namespace.insert_source_metadata(
748                SplunkConfig::NAME,
749                &mut log,
750                Some(LegacyKey::Overwrite(&channel_path)),
751                lookup::path!(CHANNEL),
752                guid,
753            );
754        } else if let Some(guid) = self.channel.as_ref() {
755            self.log_namespace.insert_source_metadata(
756                SplunkConfig::NAME,
757                &mut log,
758                Some(LegacyKey::Overwrite(&channel_path)),
759                lookup::path!(CHANNEL),
760                guid.clone(),
761            );
762        }
763
764        // Process fields field
765        if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
766            for (key, value) in object {
767                self.log_namespace.insert_source_metadata(
768                    SplunkConfig::NAME,
769                    &mut log,
770                    Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
771                    lookup::path!(key.as_str()),
772                    value,
773                );
774            }
775        }
776
777        // Process time field
778        let parsed_time = match json.get_mut("time").map(JsonValue::take) {
779            Some(JsonValue::Number(time)) => Some(Some(time)),
780            Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
781            _ => None,
782        };
783
784        match parsed_time {
785            None => (),
786            Some(Some(t)) => {
787                if let Some(t) = t.as_u64() {
788                    let time = parse_timestamp(t as i64)
789                        .ok_or(ApiError::InvalidDataFormat { event: self.events })?;
790
791                    self.time = Time::Provided(time);
792                } else if let Some(t) = t.as_f64() {
793                    self.time = Time::Provided(
794                        Utc.timestamp_opt(
795                            t.floor() as i64,
796                            (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
797                        )
798                        .single()
799                        .expect("invalid timestamp"),
800                    );
801                } else {
802                    return Err(ApiError::InvalidDataFormat { event: self.events }.into());
803                }
804            }
805            Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
806        }
807
808        // Add time field
809        let timestamp = match self.time.clone() {
810            Time::Provided(time) => time,
811            Time::Now(time) => time,
812        };
813
814        self.log_namespace.insert_source_metadata(
815            SplunkConfig::NAME,
816            &mut log,
817            log_schema().timestamp_key().map(LegacyKey::Overwrite),
818            lookup::path!("timestamp"),
819            timestamp,
820        );
821
822        // Extract default extracted fields
823        for de in self.extractors.iter_mut() {
824            de.extract(&mut log, &mut json);
825        }
826
827        // Add passthrough token if present
828        if let Some(token) = &self.token {
829            log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
830        }
831
832        if let Some(batch) = self.batch.clone() {
833            log = log.with_batch_notifier(&batch);
834        }
835
836        self.events += 1;
837
838        Ok(log.into())
839    }
840
841    /// Build the log event for the vector namespace.
842    /// In this namespace the log event is created entirely from the event field.
843    /// No renaming of the `line` field is done.
844    fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
845        match json.get("event") {
846            Some(event) => {
847                let event: Value = event.into();
848                let mut log = LogEvent::from(event);
849
850                // EstimatedJsonSizeOf must be calculated before enrichment
851                self.events_received
852                    .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
853
854                // The timestamp is extracted from the message for the Legacy namespace.
855                self.log_namespace.insert_vector_metadata(
856                    &mut log,
857                    log_schema().timestamp_key(),
858                    lookup::path!("ingest_timestamp"),
859                    chrono::Utc::now(),
860                );
861
862                Ok(log)
863            }
864            None => Err(ApiError::MissingEventField { event: self.events }.into()),
865        }
866    }
867
868    /// Build the log event for the legacy namespace.
869    /// If the event is a string, or the event contains a field called `line` that is a string
870    /// (the docker splunk logger places the message in the event.line field) that string
871    /// is placed in the message field.
872    fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
873        let mut log = LogEvent::default();
874        match json.get_mut("event") {
875            Some(event) => match event.take() {
876                JsonValue::String(string) => {
877                    if string.is_empty() {
878                        return Err(ApiError::EmptyEventField { event: self.events }.into());
879                    }
880                    log.maybe_insert(log_schema().message_key_target_path(), string);
881                }
882                JsonValue::Object(mut object) => {
883                    if object.is_empty() {
884                        return Err(ApiError::EmptyEventField { event: self.events }.into());
885                    }
886
887                    // Add 'line' value as 'event::schema().message_key'
888                    if let Some(line) = object.remove("line") {
889                        match line {
890                            // This don't quite fit the meaning of a event::schema().message_key
891                            JsonValue::Array(_) | JsonValue::Object(_) => {
892                                log.insert(event_path!("line"), line);
893                            }
894                            _ => {
895                                log.maybe_insert(log_schema().message_key_target_path(), line);
896                            }
897                        }
898                    }
899
900                    for (key, value) in object {
901                        log.insert(event_path!(key.as_str()), value);
902                    }
903                }
904                _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
905            },
906            None => return Err(ApiError::MissingEventField { event: self.events }.into()),
907        };
908
909        // EstimatedJsonSizeOf must be calculated before enrichment
910        self.events_received
911            .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
912
913        Ok(log)
914    }
915}
916
917impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
918    type Item = Result<Event, Rejection>;
919
920    fn next(&mut self) -> Option<Self::Item> {
921        match self.deserializer.next() {
922            Some(Ok(json)) => Some(self.build_event(json)),
923            None => {
924                if self.events == 0 {
925                    Some(Err(ApiError::NoData.into()))
926                } else {
927                    None
928                }
929            }
930            Some(Err(error)) => {
931                emit!(SplunkHecRequestBodyInvalidError {
932                    error: error.into()
933                });
934                Some(Err(
935                    ApiError::InvalidDataFormat { event: self.events }.into()
936                ))
937            }
938        }
939    }
940}
941
942/// Parse a `i64` unix timestamp that can either be in seconds, milliseconds or
943/// nanoseconds.
944///
945/// This attempts to parse timestamps based on what cutoff range they fall into.
946/// For seconds to be parsed the timestamp must be less than the unix epoch of
947/// the year `2400`. For this to parse milliseconds the time must be smaller
948/// than the year `10,000` in unix epoch milliseconds. If the value is larger
949/// than both we attempt to parse it as nanoseconds.
950///
951/// Returns `None` if `t` is negative.
952fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
953    // Utc.ymd(2400, 1, 1).and_hms(0,0,0).timestamp();
954    const SEC_CUTOFF: i64 = 13569465600;
955    // Utc.ymd(10_000, 1, 1).and_hms(0,0,0).timestamp_millis();
956    const MILLISEC_CUTOFF: i64 = 253402300800000;
957
958    // Timestamps can't be negative!
959    if t < 0 {
960        return None;
961    }
962
963    let ts = if t < SEC_CUTOFF {
964        Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
965    } else if t < MILLISEC_CUTOFF {
966        Utc.timestamp_millis_opt(t)
967            .single()
968            .expect("invalid timestamp")
969    } else {
970        Utc.timestamp_nanos(t)
971    };
972
973    Some(ts)
974}
975
/// Maintains last known extracted value of field and uses it in the absence of field.
struct DefaultExtractor {
    /// Key looked up in each JSON envelope handed to `extract`.
    field: &'static str,
    /// Destination of the value on the log event; when its `path` is `None`
    /// nothing is written.
    to_field: OptionalValuePath,
    /// Most recent string value seen for `field`, or the initial value
    /// supplied through `new_with`.
    value: Option<Value>,
    /// Namespace used when inserting the value as source metadata.
    log_namespace: LogNamespace,
}
983
984impl DefaultExtractor {
985    const fn new(
986        field: &'static str,
987        to_field: OptionalValuePath,
988        log_namespace: LogNamespace,
989    ) -> Self {
990        DefaultExtractor {
991            field,
992            to_field,
993            value: None,
994            log_namespace,
995        }
996    }
997
998    fn new_with(
999        field: &'static str,
1000        to_field: OptionalValuePath,
1001        value: impl Into<Option<Value>>,
1002        log_namespace: LogNamespace,
1003    ) -> Self {
1004        DefaultExtractor {
1005            field,
1006            to_field,
1007            value: value.into(),
1008            log_namespace,
1009        }
1010    }
1011
1012    fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
1013        // Process json_field
1014        if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
1015            self.value = Some(new_value.into());
1016        }
1017
1018        // Add data field
1019        if let Some(index) = self.value.as_ref() {
1020            if let Some(metadata_key) = self.to_field.path.as_ref() {
1021                self.log_namespace.insert_source_metadata(
1022                    SplunkConfig::NAME,
1023                    log,
1024                    Some(LegacyKey::Overwrite(metadata_key)),
1025                    &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
1026                    index.clone(),
1027                )
1028            }
1029        }
1030    }
1031}
1032
/// For tracking origin of the timestamp
///
/// Both variants carry the timestamp that gets attached to an event; the
/// variant only records where the value came from.
#[derive(Clone, Debug)]
enum Time {
    /// Backup: used while no `time` field has been provided.
    /// NOTE(review): presumably captured when the request is received —
    /// confirm at the construction site.
    Now(DateTime<Utc>),
    /// Provided in the request via the envelope's `time` field.
    Provided(DateTime<Utc>),
}
1041
1042/// Creates event from raw request
1043#[allow(clippy::too_many_arguments)]
1044fn raw_event(
1045    bytes: Bytes,
1046    gzip: bool,
1047    channel: String,
1048    remote: Option<SocketAddr>,
1049    xff: Option<String>,
1050    batch: Option<BatchNotifier>,
1051    log_namespace: LogNamespace,
1052    events_received: &Registered<EventsReceived>,
1053) -> Result<Event, Rejection> {
1054    // Process gzip
1055    let message: Value = if gzip {
1056        let mut data = Vec::new();
1057        match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
1058            Ok(0) => return Err(ApiError::NoData.into()),
1059            Ok(_) => Value::from(Bytes::from(data)),
1060            Err(error) => {
1061                emit!(SplunkHecRequestBodyInvalidError { error });
1062                return Err(ApiError::InvalidDataFormat { event: 0 }.into());
1063            }
1064        }
1065    } else {
1066        bytes.into()
1067    };
1068
1069    // Construct event
1070    let mut log = match log_namespace {
1071        LogNamespace::Vector => LogEvent::from(message),
1072        LogNamespace::Legacy => {
1073            let mut log = LogEvent::default();
1074            log.maybe_insert(log_schema().message_key_target_path(), message);
1075            log
1076        }
1077    };
1078    // We need to calculate the estimated json size of the event BEFORE enrichment.
1079    events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1080
1081    // Add channel
1082    log_namespace.insert_source_metadata(
1083        SplunkConfig::NAME,
1084        &mut log,
1085        Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
1086        lookup::path!(CHANNEL),
1087        channel,
1088    );
1089
1090    // host-field priority for raw endpoint:
1091    // - x-forwarded-for is set to `host` field first, if present. If not present:
1092    // - set remote addr to host field
1093    let host = if let Some(remote_address) = xff {
1094        Some(remote_address)
1095    } else {
1096        remote.map(|remote| remote.to_string())
1097    };
1098
1099    if let Some(host) = host {
1100        log_namespace.insert_source_metadata(
1101            SplunkConfig::NAME,
1102            &mut log,
1103            log_schema().host_key().map(LegacyKey::InsertIfEmpty),
1104            lookup::path!("host"),
1105            host,
1106        );
1107    }
1108
1109    log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, Utc::now());
1110
1111    if let Some(batch) = batch {
1112        log = log.with_batch_notifier(&batch);
1113    }
1114
1115    Ok(Event::from(log))
1116}
1117
// Request failures of the Splunk HEC source. Each variant is turned into an
// HTTP response by `finish_err`; the notes below summarise that mapping.
//
// NOTE(review): plain `//` comments are used on purpose. `Snafu` may use
// `///` doc comments as the `Display` text of a variant, so converting these
// to doc comments could change rendered error messages — confirm first.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    // 401 with the "Token is required" body.
    MissingAuthorization,
    // 401 with the "Invalid authorization" body.
    InvalidAuthorization,
    // 415 without a body.
    UnsupportedEncoding,
    // 415 with a plain-text explanation.
    UnsupportedContentType,
    // 400 with the "Data channel is missing" body.
    MissingChannel,
    // 400 with the "No data" body.
    NoData,
    // 400 with the "Invalid data format" body; `event` is reported as
    // `invalid-event-number`.
    InvalidDataFormat { event: usize },
    // 503 without a body.
    ServerShutdown,
    // 400 with the "Event field cannot be blank" body; `event` is reported as
    // `invalid-event-number`.
    EmptyEventField { event: usize },
    // 400 with the "Event field is required" body; `event` is reported as
    // `invalid-event-number`.
    MissingEventField { event: usize },
    // 400 without a body.
    BadRequest,
    // 503 with the "Server is busy" body.
    ServiceUnavailable,
    // 400 with the "Ack is disabled" body.
    AckIsDisabled,
}

// Lets an `ApiError` travel inside a warp `Rejection`; `finish_err` recovers
// it with `Rejection::find`.
impl warp::reject::Reject for ApiError {}
1136
/// Cached bodies for common responses
///
/// Also defines the HEC status codes and the JSON response body type the
/// cached constants are built from.
mod splunk_response {
    use serde::Serialize;

    // https://docs.splunk.com/Documentation/Splunk/8.2.3/Data/TroubleshootHTTPEventCollector#Possible_error_codes
    /// Numeric status codes placed in the `code` field of a response body.
    /// The discriminants are the values written on the wire.
    pub enum HecStatusCode {
        Success = 0,
        TokenIsRequired = 2,
        InvalidAuthorization = 3,
        NoData = 5,
        InvalidDataFormat = 6,
        ServerIsBusy = 9,
        DataChannelIsMissing = 10,
        EventFieldIsRequired = 12,
        EventFieldCannotBeBlank = 13,
        AckIsDisabled = 14,
    }

    /// Optional extra entry of a response body. It is flattened into
    /// `HecResponse`, so it is expected to appear as an additional top-level
    /// key named after the `rename` attribute of the variant.
    #[derive(Serialize)]
    pub enum HecResponseMetadata {
        /// Acknowledgement identifier, serialized under `ackId`.
        #[serde(rename = "ackId")]
        AckId(u64),
        /// Index of the offending event, serialized under `invalid-event-number`.
        #[serde(rename = "invalid-event-number")]
        InvalidEventNumber(usize),
    }

    /// JSON body of a HEC response: a human-readable `text`, the numeric
    /// `code`, and optional metadata that is omitted when `None`.
    #[derive(Serialize)]
    pub struct HecResponse {
        text: &'static str,
        code: u8,
        #[serde(skip_serializing_if = "Option::is_none", flatten)]
        pub metadata: Option<HecResponseMetadata>,
    }

    impl HecResponse {
        /// Builds the body for `code` with its fixed text and no metadata.
        /// `const` so the common bodies below can be compile-time constants.
        pub const fn new(code: HecStatusCode) -> Self {
            let text = match code {
                HecStatusCode::Success => "Success",
                HecStatusCode::TokenIsRequired => "Token is required",
                HecStatusCode::InvalidAuthorization => "Invalid authorization",
                HecStatusCode::NoData => "No data",
                HecStatusCode::InvalidDataFormat => "Invalid data format",
                HecStatusCode::DataChannelIsMissing => "Data channel is missing",
                HecStatusCode::EventFieldIsRequired => "Event field is required",
                HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
                HecStatusCode::ServerIsBusy => "Server is busy",
                HecStatusCode::AckIsDisabled => "Ack is disabled",
            };

            Self {
                text,
                code: code as u8,
                metadata: None,
            }
        }

        /// Returns the response with `metadata` attached, replacing any
        /// metadata that was already set.
        pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
            self.metadata = Some(metadata);
            self
        }
    }

    // Metadata-free bodies for the responses that never vary.
    pub const INVALID_AUTHORIZATION: HecResponse =
        HecResponse::new(HecStatusCode::InvalidAuthorization);
    pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
    pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
    pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
    pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
    pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
    pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
}
1208
1209fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1210    let body = if let Some(ack_id) = maybe_ack_id {
1211        HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
1212    } else {
1213        splunk_response::SUCCESS
1214    };
1215    response_json(StatusCode::OK, body)
1216}
1217
1218fn response_plain(code: StatusCode, msg: &'static str) -> Response {
1219    warp::reply::with_status(
1220        warp::reply::with_header(msg, http::header::CONTENT_TYPE, "text/plain; charset=utf-8"),
1221        code,
1222    )
1223    .into_response()
1224}
1225
1226async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1227    if let Some(&error) = rejection.find::<ApiError>() {
1228        emit!(SplunkHecRequestError { error });
1229        Ok((match error {
1230            ApiError::MissingAuthorization => {
1231                response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
1232            }
1233            ApiError::InvalidAuthorization => response_json(
1234                StatusCode::UNAUTHORIZED,
1235                splunk_response::INVALID_AUTHORIZATION,
1236            ),
1237            ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
1238            ApiError::UnsupportedContentType => response_plain(
1239                StatusCode::UNSUPPORTED_MEDIA_TYPE,
1240                "The request's content-type is not supported",
1241            ),
1242            ApiError::MissingChannel => {
1243                response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
1244            }
1245            ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
1246            ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
1247            ApiError::InvalidDataFormat { event } => response_json(
1248                StatusCode::BAD_REQUEST,
1249                HecResponse::new(HecStatusCode::InvalidDataFormat)
1250                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1251            ),
1252            ApiError::EmptyEventField { event } => response_json(
1253                StatusCode::BAD_REQUEST,
1254                HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
1255                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1256            ),
1257            ApiError::MissingEventField { event } => response_json(
1258                StatusCode::BAD_REQUEST,
1259                HecResponse::new(HecStatusCode::EventFieldIsRequired)
1260                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1261            ),
1262            ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
1263            ApiError::ServiceUnavailable => response_json(
1264                StatusCode::SERVICE_UNAVAILABLE,
1265                splunk_response::SERVER_IS_BUSY,
1266            ),
1267            ApiError::AckIsDisabled => {
1268                response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
1269            }
1270        },))
1271    } else {
1272        Err(rejection)
1273    }
1274}
1275
1276/// Response without body
1277fn empty_response(code: StatusCode) -> Response {
1278    let mut res = Response::default();
1279    *res.status_mut() = code;
1280    res
1281}
1282
1283/// Response with body
1284fn response_json(code: StatusCode, body: impl Serialize) -> Response {
1285    warp::reply::with_status(warp::reply::json(&body), code).into_response()
1286}
1287
1288#[cfg(feature = "sinks-splunk_hec")]
1289#[cfg(test)]
1290mod tests {
1291    use std::{net::SocketAddr, num::NonZeroU64};
1292
1293    use chrono::{TimeZone, Utc};
1294    use futures_util::Stream;
1295    use http::Uri;
1296    use reqwest::{RequestBuilder, Response};
1297    use serde::Deserialize;
1298    use vector_lib::codecs::{
1299        decoding::DeserializerConfig, BytesDecoderConfig, JsonSerializerConfig,
1300        TextSerializerConfig,
1301    };
1302    use vector_lib::sensitive_string::SensitiveString;
1303    use vector_lib::{event::EventStatus, schema::Definition};
1304    use vrl::path::PathPrefix;
1305
1306    use super::*;
1307    use crate::{
1308        codecs::{DecodingConfig, EncodingConfig},
1309        components::validation::prelude::*,
1310        config::{log_schema, SinkConfig, SinkContext, SourceConfig, SourceContext},
1311        event::{Event, LogEvent},
1312        sinks::{
1313            splunk_hec::logs::config::HecLogsSinkConfig,
1314            util::{BatchConfig, Compression, TowerRequestConfig},
1315            Healthcheck, VectorSink,
1316        },
1317        sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
1318        test_util::{
1319            collect_n,
1320            components::{
1321                assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
1322                HTTP_PUSH_SOURCE_TAGS,
1323            },
1324            next_addr, wait_for_tcp,
1325        },
1326        SourceSender,
1327    };
1328
    // Runs the shared `test_generate_config` helper against `SplunkConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SplunkConfig>();
    }
1333
    /// Splunk token configured on the default test source and sent by the
    /// request helpers.
    const TOKEN: &str = "token";
    /// `TOKEN` plus one additional token value.
    /// NOTE(review): presumably passed as `valid_tokens` to `source_with` by
    /// tests outside this excerpt — confirm.
    const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
1337
1338    async fn source(
1339        acknowledgements: Option<HecAcknowledgementsConfig>,
1340    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
1341        source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
1342    }
1343
1344    async fn source_with(
1345        token: Option<SensitiveString>,
1346        valid_tokens: Option<&[&str]>,
1347        acknowledgements: Option<HecAcknowledgementsConfig>,
1348        store_hec_token: bool,
1349    ) -> (impl Stream<Item = Event> + Unpin + use<>, SocketAddr) {
1350        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
1351        let address = next_addr();
1352        let valid_tokens =
1353            valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
1354        let cx = SourceContext::new_test(sender, None);
1355        tokio::spawn(async move {
1356            SplunkConfig {
1357                address,
1358                token,
1359                valid_tokens,
1360                tls: None,
1361                acknowledgements: acknowledgements.unwrap_or_default(),
1362                store_hec_token,
1363                log_namespace: None,
1364                keepalive: Default::default(),
1365            }
1366            .build(cx)
1367            .await
1368            .unwrap()
1369            .await
1370            .unwrap()
1371        });
1372        wait_for_tcp(address).await;
1373        (recv, address)
1374    }
1375
1376    async fn sink(
1377        address: SocketAddr,
1378        encoding: EncodingConfig,
1379        compression: Compression,
1380    ) -> (VectorSink, Healthcheck) {
1381        HecLogsSinkConfig {
1382            default_token: TOKEN.to_owned().into(),
1383            endpoint: format!("http://{address}"),
1384            host_key: None,
1385            indexed_fields: vec![],
1386            index: None,
1387            sourcetype: None,
1388            source: None,
1389            encoding,
1390            compression,
1391            batch: BatchConfig::default(),
1392            request: TowerRequestConfig::default(),
1393            tls: None,
1394            acknowledgements: Default::default(),
1395            timestamp_nanos_key: None,
1396            timestamp_key: None,
1397            auto_extract_timestamp: None,
1398            endpoint_target: Default::default(),
1399        }
1400        .build(SinkContext::default())
1401        .await
1402        .unwrap()
1403    }
1404
1405    async fn start(
1406        encoding: EncodingConfig,
1407        compression: Compression,
1408        acknowledgements: Option<HecAcknowledgementsConfig>,
1409    ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
1410        let (source, address) = source(acknowledgements).await;
1411        let (sink, health) = sink(address, encoding, compression).await;
1412        assert!(health.await.is_ok());
1413        (sink, source)
1414    }
1415
1416    async fn channel_n(
1417        messages: Vec<impl Into<String> + Send + 'static>,
1418        sink: VectorSink,
1419        source: impl Stream<Item = Event> + Unpin,
1420    ) -> Vec<Event> {
1421        let n = messages.len();
1422
1423        tokio::spawn(async move {
1424            sink.run_events(
1425                messages
1426                    .into_iter()
1427                    .map(|s| Event::Log(LogEvent::from(s.into()))),
1428            )
1429            .await
1430            .unwrap();
1431        });
1432
1433        let events = collect_n(source, n).await;
1434        assert_eq!(n, events.len());
1435
1436        events
1437    }
1438
    /// How a test request carries its channel identifier.
    #[derive(Clone, Copy, Debug)]
    enum Channel<'a> {
        /// Sent in the `x-splunk-request-channel` header.
        Header(&'a str),
        /// Sent as the `channel` query parameter.
        QueryParam(&'a str),
    }
1444
    /// Optional parts of a test request; the default sets neither.
    #[derive(Default)]
    struct SendWithOpts<'a> {
        /// Channel to attach, and where to put it; `None` sends no channel.
        channel: Option<Channel<'a>>,
        /// Value for the `X-Forwarded-For` header; `None` omits the header.
        forwarded_for: Option<String>,
    }
1450
1451    async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
1452        let channel = Channel::Header("channel");
1453        let options = SendWithOpts {
1454            channel: Some(channel),
1455            forwarded_for: None,
1456        };
1457        send_with(address, api, message, TOKEN, &options).await
1458    }
1459
1460    fn build_request(
1461        address: SocketAddr,
1462        api: &str,
1463        message: &str,
1464        token: &str,
1465        opts: &SendWithOpts<'_>,
1466    ) -> RequestBuilder {
1467        let mut b = reqwest::Client::new()
1468            .post(format!("http://{address}/{api}"))
1469            .header("Authorization", format!("Splunk {token}"));
1470
1471        b = match opts.channel {
1472            Some(c) => match c {
1473                Channel::Header(v) => b.header("x-splunk-request-channel", v),
1474                Channel::QueryParam(v) => b.query(&[("channel", v)]),
1475            },
1476            None => b,
1477        };
1478
1479        b = match &opts.forwarded_for {
1480            Some(f) => b.header("X-Forwarded-For", f),
1481            None => b,
1482        };
1483
1484        b.body(message.to_owned())
1485    }
1486
1487    async fn send_with(
1488        address: SocketAddr,
1489        api: &str,
1490        message: &str,
1491        token: &str,
1492        opts: &SendWithOpts<'_>,
1493    ) -> u16 {
1494        let b = build_request(address, api, message, token, opts);
1495        b.send().await.unwrap().status().as_u16()
1496    }
1497
1498    async fn send_with_response(
1499        address: SocketAddr,
1500        api: &str,
1501        message: &str,
1502        token: &str,
1503        opts: &SendWithOpts<'_>,
1504    ) -> Response {
1505        let b = build_request(address, api, message, token, opts);
1506        b.send().await.unwrap()
1507    }
1508
1509    #[tokio::test]
1510    async fn no_compression_text_event() {
1511        let message = "gzip_text_event";
1512        let (sink, source) = start(
1513            TextSerializerConfig::default().into(),
1514            Compression::None,
1515            None,
1516        )
1517        .await;
1518
1519        let event = channel_n(vec![message], sink, source).await.remove(0);
1520
1521        assert_eq!(
1522            event.as_log()[log_schema().message_key().unwrap().to_string()],
1523            message.into()
1524        );
1525        assert!(event.as_log().get_timestamp().is_some());
1526        assert_eq!(
1527            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1528            "splunk_hec".into()
1529        );
1530        assert!(event.metadata().splunk_hec_token().is_none());
1531    }
1532
1533    #[tokio::test]
1534    async fn one_simple_text_event() {
1535        let message = "one_simple_text_event";
1536        let (sink, source) = start(
1537            TextSerializerConfig::default().into(),
1538            Compression::gzip_default(),
1539            None,
1540        )
1541        .await;
1542
1543        let event = channel_n(vec![message], sink, source).await.remove(0);
1544
1545        assert_eq!(
1546            event.as_log()[log_schema().message_key().unwrap().to_string()],
1547            message.into()
1548        );
1549        assert!(event.as_log().get_timestamp().is_some());
1550        assert_eq!(
1551            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1552            "splunk_hec".into()
1553        );
1554        assert!(event.metadata().splunk_hec_token().is_none());
1555    }
1556
1557    #[tokio::test]
1558    async fn multiple_simple_text_event() {
1559        let n = 200;
1560        let (sink, source) = start(
1561            TextSerializerConfig::default().into(),
1562            Compression::None,
1563            None,
1564        )
1565        .await;
1566
1567        let messages = (0..n)
1568            .map(|i| format!("multiple_simple_text_event_{i}"))
1569            .collect::<Vec<_>>();
1570        let events = channel_n(messages.clone(), sink, source).await;
1571
1572        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1573            assert_eq!(
1574                event.as_log()[log_schema().message_key().unwrap().to_string()],
1575                msg.into()
1576            );
1577            assert!(event.as_log().get_timestamp().is_some());
1578            assert_eq!(
1579                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1580                "splunk_hec".into()
1581            );
1582            assert!(event.metadata().splunk_hec_token().is_none());
1583        }
1584    }
1585
1586    #[tokio::test]
1587    async fn one_simple_json_event() {
1588        let message = "one_simple_json_event";
1589        let (sink, source) = start(
1590            JsonSerializerConfig::default().into(),
1591            Compression::gzip_default(),
1592            None,
1593        )
1594        .await;
1595
1596        let event = channel_n(vec![message], sink, source).await.remove(0);
1597
1598        assert_eq!(
1599            event.as_log()[log_schema().message_key().unwrap().to_string()],
1600            message.into()
1601        );
1602        assert!(event.as_log().get_timestamp().is_some());
1603        assert_eq!(
1604            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1605            "splunk_hec".into()
1606        );
1607        assert!(event.metadata().splunk_hec_token().is_none());
1608    }
1609
1610    #[tokio::test]
1611    async fn multiple_simple_json_event() {
1612        let n = 200;
1613        let (sink, source) = start(
1614            JsonSerializerConfig::default().into(),
1615            Compression::gzip_default(),
1616            None,
1617        )
1618        .await;
1619
1620        let messages = (0..n)
1621            .map(|i| format!("multiple_simple_json_event{i}"))
1622            .collect::<Vec<_>>();
1623        let events = channel_n(messages.clone(), sink, source).await;
1624
1625        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1626            assert_eq!(
1627                event.as_log()[log_schema().message_key().unwrap().to_string()],
1628                msg.into()
1629            );
1630            assert!(event.as_log().get_timestamp().is_some());
1631            assert_eq!(
1632                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1633                "splunk_hec".into()
1634            );
1635            assert!(event.metadata().splunk_hec_token().is_none());
1636        }
1637    }
1638
1639    #[tokio::test]
1640    async fn json_event() {
1641        let (sink, source) = start(
1642            JsonSerializerConfig::default().into(),
1643            Compression::gzip_default(),
1644            None,
1645        )
1646        .await;
1647
1648        let mut log = LogEvent::default();
1649        log.insert("greeting", "hello");
1650        log.insert("name", "bob");
1651        sink.run_events(vec![log.into()]).await.unwrap();
1652
1653        let event = collect_n(source, 1).await.remove(0).into_log();
1654        assert_eq!(event["greeting"], "hello".into());
1655        assert_eq!(event["name"], "bob".into());
1656        assert!(event.get_timestamp().is_some());
1657        assert_eq!(
1658            event[log_schema().source_type_key().unwrap().to_string()],
1659            "splunk_hec".into()
1660        );
1661        assert!(event.metadata().splunk_hec_token().is_none());
1662    }
1663
1664    #[tokio::test]
1665    async fn json_invalid_path_event() {
1666        let (sink, source) = start(
1667            JsonSerializerConfig::default().into(),
1668            Compression::gzip_default(),
1669            None,
1670        )
1671        .await;
1672
1673        let mut log = LogEvent::default();
1674        // Test with a field that would be considered an invalid path if it were to
1675        // be treated as a path and not a simple field name.
1676        log.insert(event_path!("(greeting | thing"), "hello");
1677        sink.run_events(vec![log.into()]).await.unwrap();
1678
1679        let event = collect_n(source, 1).await.remove(0).into_log();
1680        assert_eq!(
1681            event.get(event_path!("(greeting | thing")),
1682            Some(&Value::from("hello"))
1683        );
1684    }
1685
1686    #[tokio::test]
1687    async fn line_to_message() {
1688        let (sink, source) = start(
1689            JsonSerializerConfig::default().into(),
1690            Compression::gzip_default(),
1691            None,
1692        )
1693        .await;
1694
1695        let mut event = LogEvent::default();
1696        event.insert("line", "hello");
1697        sink.run_events(vec![event.into()]).await.unwrap();
1698
1699        let event = collect_n(source, 1).await.remove(0);
1700        assert_eq!(
1701            event.as_log()[log_schema().message_key().unwrap().to_string()],
1702            "hello".into()
1703        );
1704        assert!(event.metadata().splunk_hec_token().is_none());
1705    }
1706
1707    #[tokio::test]
1708    async fn raw() {
1709        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1710            let message = "raw";
1711            let (source, address) = source(None).await;
1712
1713            assert_eq!(200, post(address, "services/collector/raw", message).await);
1714
1715            let event = collect_n(source, 1).await.remove(0);
1716            assert_eq!(
1717                event.as_log()[log_schema().message_key().unwrap().to_string()],
1718                message.into()
1719            );
1720            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1721            assert!(event.as_log().get_timestamp().is_some());
1722            assert_eq!(
1723                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1724                "splunk_hec".into()
1725            );
1726            assert!(event.metadata().splunk_hec_token().is_none());
1727        })
1728        .await;
1729    }
1730
1731    #[tokio::test]
1732    async fn root() {
1733        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1734            let message = r#"{ "event": { "message": "root"} }"#;
1735            let (source, address) = source(None).await;
1736
1737            assert_eq!(200, post(address, "services/collector", message).await);
1738
1739            let event = collect_n(source, 1).await.remove(0);
1740            assert_eq!(
1741                event.as_log()[log_schema().message_key().unwrap().to_string()],
1742                "root".into()
1743            );
1744            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1745            assert!(event.as_log().get_timestamp().is_some());
1746            assert_eq!(
1747                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1748                "splunk_hec".into()
1749            );
1750            assert!(event.metadata().splunk_hec_token().is_none());
1751        })
1752        .await;
1753    }
1754
1755    #[tokio::test]
1756    async fn channel_header() {
1757        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1758            let message = "raw";
1759            let (source, address) = source(None).await;
1760
1761            let opts = SendWithOpts {
1762                channel: Some(Channel::Header("guid")),
1763                forwarded_for: None,
1764            };
1765
1766            assert_eq!(
1767                200,
1768                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1769            );
1770
1771            let event = collect_n(source, 1).await.remove(0);
1772            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1773        })
1774        .await;
1775    }
1776
1777    #[tokio::test]
1778    async fn xff_header_raw() {
1779        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1780            let message = "raw";
1781            let (source, address) = source(None).await;
1782
1783            let opts = SendWithOpts {
1784                channel: Some(Channel::Header("guid")),
1785                forwarded_for: Some(String::from("10.0.0.1")),
1786            };
1787
1788            assert_eq!(
1789                200,
1790                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1791            );
1792
1793            let event = collect_n(source, 1).await.remove(0);
1794            assert_eq!(
1795                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1796                "10.0.0.1".into()
1797            );
1798        })
1799        .await;
1800    }
1801
1802    // Test helps to illustrate that a payload's `host` value should override an x-forwarded-for header
1803    #[tokio::test]
1804    async fn xff_header_event_with_host_field() {
1805        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1806            let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
1807            let (source, address) = source(None).await;
1808
1809            let opts = SendWithOpts {
1810                channel: Some(Channel::Header("guid")),
1811                forwarded_for: Some(String::from("10.0.0.1")),
1812            };
1813
1814            assert_eq!(
1815                200,
1816                send_with(address, "services/collector/event", message, TOKEN, &opts).await
1817            );
1818
1819            let event = collect_n(source, 1).await.remove(0);
1820            assert_eq!(
1821                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1822                "10.1.0.2".into()
1823            );
1824        })
1825        .await;
1826    }
1827
1828    // Test helps to illustrate that a payload's `host` value should override an x-forwarded-for header
1829    #[tokio::test]
1830    async fn xff_header_event_without_host_field() {
1831        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1832            let message = r#"{"event":"first", "color": "blue"}"#;
1833            let (source, address) = source(None).await;
1834
1835            let opts = SendWithOpts {
1836                channel: Some(Channel::Header("guid")),
1837                forwarded_for: Some(String::from("10.0.0.1")),
1838            };
1839
1840            assert_eq!(
1841                200,
1842                send_with(address, "services/collector/event", message, TOKEN, &opts).await
1843            );
1844
1845            let event = collect_n(source, 1).await.remove(0);
1846            assert_eq!(
1847                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1848                "10.0.0.1".into()
1849            );
1850        })
1851        .await;
1852    }
1853
1854    #[tokio::test]
1855    async fn channel_query_param() {
1856        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1857            let message = "raw";
1858            let (source, address) = source(None).await;
1859
1860            let opts = SendWithOpts {
1861                channel: Some(Channel::QueryParam("guid")),
1862                forwarded_for: None,
1863            };
1864
1865            assert_eq!(
1866                200,
1867                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1868            );
1869
1870            let event = collect_n(source, 1).await.remove(0);
1871            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1872        })
1873        .await;
1874    }
1875
1876    #[tokio::test]
1877    async fn no_data() {
1878        let (_source, address) = source(None).await;
1879
1880        assert_eq!(400, post(address, "services/collector/event", "").await);
1881    }
1882
1883    #[tokio::test]
1884    async fn invalid_token() {
1885        assert_source_error(&COMPONENT_ERROR_TAGS, async {
1886            let (_source, address) = source(None).await;
1887            let opts = SendWithOpts {
1888                channel: Some(Channel::Header("channel")),
1889                forwarded_for: None,
1890            };
1891
1892            assert_eq!(
1893                401,
1894                send_with(address, "services/collector/event", "", "nope", &opts).await
1895            );
1896        })
1897        .await;
1898    }
1899
1900    #[tokio::test]
1901    async fn health_ignores_token() {
1902        let (_source, address) = source(None).await;
1903
1904        let res = reqwest::Client::new()
1905            .get(format!("http://{address}/services/collector/health"))
1906            .header("Authorization", format!("Splunk {}", "invalid token"))
1907            .send()
1908            .await
1909            .unwrap();
1910
1911        assert_eq!(200, res.status().as_u16());
1912    }
1913
1914    #[tokio::test]
1915    async fn health() {
1916        let (_source, address) = source(None).await;
1917
1918        let res = reqwest::Client::new()
1919            .get(format!("http://{address}/services/collector/health"))
1920            .send()
1921            .await
1922            .unwrap();
1923
1924        assert_eq!(200, res.status().as_u16());
1925    }
1926
1927    #[tokio::test]
1928    async fn secondary_token() {
1929        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1930            let message = r#"{"event":"first", "color": "blue"}"#;
1931            let (_source, address) = source_with(None, Some(VALID_TOKENS), None, false).await;
1932            let options = SendWithOpts {
1933                channel: None,
1934                forwarded_for: None,
1935            };
1936
1937            assert_eq!(
1938                200,
1939                send_with(
1940                    address,
1941                    "services/collector/event",
1942                    message,
1943                    VALID_TOKENS.get(1).unwrap(),
1944                    &options
1945                )
1946                .await
1947            );
1948        })
1949        .await;
1950    }
1951
1952    #[tokio::test]
1953    async fn event_service_token_passthrough_enabled() {
1954        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1955            let message = "passthrough_token_enabled";
1956            let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
1957            let (sink, health) = sink(
1958                address,
1959                TextSerializerConfig::default().into(),
1960                Compression::gzip_default(),
1961            )
1962            .await;
1963            assert!(health.await.is_ok());
1964
1965            let event = channel_n(vec![message], sink, source).await.remove(0);
1966
1967            assert_eq!(
1968                event.as_log()[log_schema().message_key().unwrap().to_string()],
1969                message.into()
1970            );
1971            assert_eq!(
1972                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
1973                TOKEN
1974            );
1975        })
1976        .await;
1977    }
1978
1979    #[tokio::test]
1980    async fn raw_service_token_passthrough_enabled() {
1981        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1982            let message = "raw";
1983            let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
1984
1985            assert_eq!(200, post(address, "services/collector/raw", message).await);
1986
1987            let event = collect_n(source, 1).await.remove(0);
1988            assert_eq!(
1989                event.as_log()[log_schema().message_key().unwrap().to_string()],
1990                message.into()
1991            );
1992            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1993            assert!(event.as_log().get_timestamp().is_some());
1994            assert_eq!(
1995                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1996                "splunk_hec".into()
1997            );
1998            assert_eq!(
1999                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2000                TOKEN
2001            );
2002        })
2003        .await;
2004    }
2005
2006    #[tokio::test]
2007    async fn no_authorization() {
2008        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2009            let message = "no_authorization";
2010            let (source, address) = source_with(None, None, None, false).await;
2011            let (sink, health) = sink(
2012                address,
2013                TextSerializerConfig::default().into(),
2014                Compression::gzip_default(),
2015            )
2016            .await;
2017            assert!(health.await.is_ok());
2018
2019            let event = channel_n(vec![message], sink, source).await.remove(0);
2020
2021            assert_eq!(
2022                event.as_log()[log_schema().message_key().unwrap().to_string()],
2023                message.into()
2024            );
2025            assert!(event.metadata().splunk_hec_token().is_none());
2026        })
2027        .await;
2028    }
2029
2030    #[tokio::test]
2031    async fn no_authorization_token_passthrough_enabled() {
2032        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2033            let message = "no_authorization";
2034            let (source, address) = source_with(None, None, None, true).await;
2035            let (sink, health) = sink(
2036                address,
2037                TextSerializerConfig::default().into(),
2038                Compression::gzip_default(),
2039            )
2040            .await;
2041            assert!(health.await.is_ok());
2042
2043            let event = channel_n(vec![message], sink, source).await.remove(0);
2044
2045            assert_eq!(
2046                event.as_log()[log_schema().message_key().unwrap().to_string()],
2047                message.into()
2048            );
2049            assert_eq!(
2050                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2051                TOKEN
2052            );
2053        })
2054        .await;
2055    }
2056
2057    #[tokio::test]
2058    async fn partial() {
2059        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2060            let message = r#"{"event":"first"}{"event":"second""#;
2061            let (source, address) = source(None).await;
2062
2063            assert_eq!(
2064                400,
2065                post(address, "services/collector/event", message).await
2066            );
2067
2068            let event = collect_n(source, 1).await.remove(0);
2069            assert_eq!(
2070                event.as_log()[log_schema().message_key().unwrap().to_string()],
2071                "first".into()
2072            );
2073            assert!(event.as_log().get_timestamp().is_some());
2074            assert_eq!(
2075                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2076                "splunk_hec".into()
2077            );
2078        })
2079        .await;
2080    }
2081
2082    #[tokio::test]
2083    async fn handles_newlines() {
2084        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2085            let message = r#"
2086{"event":"first"}
2087        "#;
2088            let (source, address) = source(None).await;
2089
2090            assert_eq!(
2091                200,
2092                post(address, "services/collector/event", message).await
2093            );
2094
2095            let event = collect_n(source, 1).await.remove(0);
2096            assert_eq!(
2097                event.as_log()[log_schema().message_key().unwrap().to_string()],
2098                "first".into()
2099            );
2100            assert!(event.as_log().get_timestamp().is_some());
2101            assert_eq!(
2102                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2103                "splunk_hec".into()
2104            );
2105        })
2106        .await;
2107    }
2108
2109    #[tokio::test]
2110    async fn handles_spaces() {
2111        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2112            let message = r#" {"event":"first"} "#;
2113            let (source, address) = source(None).await;
2114
2115            assert_eq!(
2116                200,
2117                post(address, "services/collector/event", message).await
2118            );
2119
2120            let event = collect_n(source, 1).await.remove(0);
2121            assert_eq!(
2122                event.as_log()[log_schema().message_key().unwrap().to_string()],
2123                "first".into()
2124            );
2125            assert!(event.as_log().get_timestamp().is_some());
2126            assert_eq!(
2127                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2128                "splunk_hec".into()
2129            );
2130        })
2131        .await;
2132    }
2133
2134    #[tokio::test]
2135    async fn handles_non_utf8() {
2136        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2137        let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
2138        let (source, address) = source(None).await;
2139
2140        let b = reqwest::Client::new()
2141            .post(format!(
2142                "http://{}/{}",
2143                address, "services/collector/event"
2144            ))
2145            .header("Authorization", format!("Splunk {TOKEN}"))
2146            .body::<&[u8]>(message);
2147
2148        assert_eq!(200, b.send().await.unwrap().status().as_u16());
2149
2150        let event = collect_n(source, 1).await.remove(0);
2151        assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
2152        assert_eq!(event.as_log()["number"], 2.into());
2153        assert_eq!(event.as_log()["bool"], true.into());
2154        assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some());
2155        assert_eq!(
2156            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2157            "splunk_hec".into()
2158        );
2159    }).await;
2160    }
2161
2162    #[tokio::test]
2163    async fn default() {
2164        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2165        let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
2166        let (source, address) = source(None).await;
2167
2168        assert_eq!(
2169            200,
2170            post(address, "services/collector/event", message).await
2171        );
2172
2173        let events = collect_n(source, 3).await;
2174
2175        assert_eq!(
2176            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
2177            "first".into()
2178        );
2179        assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());
2180
2181        assert_eq!(
2182            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
2183            "second".into()
2184        );
2185        assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());
2186
2187        assert_eq!(
2188            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
2189            "third".into()
2190        );
2191        assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
2192    }).await;
2193    }
2194
2195    #[test]
2196    fn parse_timestamps() {
2197        let cases = vec![
2198            Utc::now(),
2199            Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
2200                .single()
2201                .expect("invalid timestamp"),
2202            Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
2203                .single()
2204                .expect("invalid timestamp"),
2205            Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
2206                .single()
2207                .expect("invalid timestamp"),
2208        ];
2209
2210        for case in cases {
2211            let sec = case.timestamp();
2212            let millis = case.timestamp_millis();
2213            let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");
2214
2215            assert_eq!(parse_timestamp(sec).unwrap().timestamp(), case.timestamp());
2216            assert_eq!(
2217                parse_timestamp(millis).unwrap().timestamp_millis(),
2218                case.timestamp_millis()
2219            );
2220            assert_eq!(
2221                parse_timestamp(nano)
2222                    .unwrap()
2223                    .timestamp_nanos_opt()
2224                    .unwrap(),
2225                case.timestamp_nanos_opt().expect("Timestamp out of range")
2226            );
2227        }
2228
2229        assert!(parse_timestamp(-1).is_none());
2230    }
2231
2232    /// This test will fail once `warp` crate fixes support for
2233    /// custom connection listener, at that point this test can be
2234    /// modified to pass.
2235    /// https://github.com/vectordotdev/vector/issues/7097
2236    /// https://github.com/seanmonstar/warp/issues/830
2237    /// https://github.com/seanmonstar/warp/pull/713
2238    #[tokio::test]
2239    async fn host_test() {
2240        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2241            let message = "for the host";
2242            let (sink, source) = start(
2243                TextSerializerConfig::default().into(),
2244                Compression::gzip_default(),
2245                None,
2246            )
2247            .await;
2248
2249            let event = channel_n(vec![message], sink, source).await.remove(0);
2250
2251            assert_eq!(
2252                event.as_log()[log_schema().message_key().unwrap().to_string()],
2253                message.into()
2254            );
2255            assert!(event
2256                .as_log()
2257                .get((PathPrefix::Event, log_schema().host_key().unwrap()))
2258                .is_none());
2259        })
2260        .await;
2261    }
2262
2263    #[derive(Deserialize)]
2264    struct HecAckEventResponse {
2265        text: String,
2266        code: u8,
2267        #[serde(rename = "ackId")]
2268        ack_id: u64,
2269    }
2270
2271    #[tokio::test]
2272    async fn ack_json_event() {
2273        let ack_config = HecAcknowledgementsConfig {
2274            enabled: Some(true),
2275            ..Default::default()
2276        };
2277        let (source, address) = source(Some(ack_config)).await;
2278        let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
2279        let opts = SendWithOpts {
2280            channel: Some(Channel::Header("guid")),
2281            forwarded_for: None,
2282        };
2283        let event_res = send_with_response(
2284            address,
2285            "services/collector/event",
2286            event_message,
2287            TOKEN,
2288            &opts,
2289        )
2290        .await
2291        .json::<HecAckEventResponse>()
2292        .await
2293        .unwrap();
2294        assert_eq!("Success", event_res.text.as_str());
2295        assert_eq!(0, event_res.code);
2296        _ = collect_n(source, 1).await;
2297
2298        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2299            acks: vec![event_res.ack_id],
2300        })
2301        .unwrap();
2302        let ack_res = send_with_response(
2303            address,
2304            "services/collector/ack",
2305            ack_message.as_str(),
2306            TOKEN,
2307            &opts,
2308        )
2309        .await
2310        .json::<HecAckStatusResponse>()
2311        .await
2312        .unwrap();
2313        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2314    }
2315
2316    #[tokio::test]
2317    async fn ack_raw_event() {
2318        let ack_config = HecAcknowledgementsConfig {
2319            enabled: Some(true),
2320            ..Default::default()
2321        };
2322        let (source, address) = source(Some(ack_config)).await;
2323        let event_message = "raw event message";
2324        let opts = SendWithOpts {
2325            channel: Some(Channel::Header("guid")),
2326            forwarded_for: None,
2327        };
2328        let event_res = send_with_response(
2329            address,
2330            "services/collector/raw",
2331            event_message,
2332            TOKEN,
2333            &opts,
2334        )
2335        .await
2336        .json::<HecAckEventResponse>()
2337        .await
2338        .unwrap();
2339        assert_eq!("Success", event_res.text.as_str());
2340        assert_eq!(0, event_res.code);
2341        _ = collect_n(source, 1).await;
2342
2343        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2344            acks: vec![event_res.ack_id],
2345        })
2346        .unwrap();
2347        let ack_res = send_with_response(
2348            address,
2349            "services/collector/ack",
2350            ack_message.as_str(),
2351            TOKEN,
2352            &opts,
2353        )
2354        .await
2355        .json::<HecAckStatusResponse>()
2356        .await
2357        .unwrap();
2358        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2359    }
2360
2361    #[tokio::test]
2362    async fn ack_repeat_ack_query() {
2363        let ack_config = HecAcknowledgementsConfig {
2364            enabled: Some(true),
2365            ..Default::default()
2366        };
2367        let (source, address) = source(Some(ack_config)).await;
2368        let event_message = "raw event message";
2369        let opts = SendWithOpts {
2370            channel: Some(Channel::Header("guid")),
2371            forwarded_for: None,
2372        };
2373        let event_res = send_with_response(
2374            address,
2375            "services/collector/raw",
2376            event_message,
2377            TOKEN,
2378            &opts,
2379        )
2380        .await
2381        .json::<HecAckEventResponse>()
2382        .await
2383        .unwrap();
2384        _ = collect_n(source, 1).await;
2385
2386        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2387            acks: vec![event_res.ack_id],
2388        })
2389        .unwrap();
2390        let ack_res = send_with_response(
2391            address,
2392            "services/collector/ack",
2393            ack_message.as_str(),
2394            TOKEN,
2395            &opts,
2396        )
2397        .await
2398        .json::<HecAckStatusResponse>()
2399        .await
2400        .unwrap();
2401        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2402
2403        let ack_res = send_with_response(
2404            address,
2405            "services/collector/ack",
2406            ack_message.as_str(),
2407            TOKEN,
2408            &opts,
2409        )
2410        .await
2411        .json::<HecAckStatusResponse>()
2412        .await
2413        .unwrap();
2414        assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
2415    }
2416
2417    #[tokio::test]
2418    async fn ack_exceed_max_number_of_ack_channels() {
2419        let ack_config = HecAcknowledgementsConfig {
2420            enabled: Some(true),
2421            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
2422            ..Default::default()
2423        };
2424
2425        let (_source, address) = source(Some(ack_config)).await;
2426        let mut opts = SendWithOpts {
2427            channel: Some(Channel::Header("guid")),
2428            forwarded_for: None,
2429        };
2430        assert_eq!(
2431            200,
2432            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2433        );
2434
2435        opts.channel = Some(Channel::Header("other-guid"));
2436        assert_eq!(
2437            503,
2438            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2439        );
2440        assert_eq!(
2441            503,
2442            send_with(
2443                address,
2444                "services/collector/event",
2445                r#"{"event":"first"}"#,
2446                TOKEN,
2447                &opts
2448            )
2449            .await
2450        );
2451    }
2452
2453    #[tokio::test]
2454    async fn ack_exceed_max_pending_acks_per_channel() {
2455        let ack_config = HecAcknowledgementsConfig {
2456            enabled: Some(true),
2457            max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
2458            ..Default::default()
2459        };
2460
2461        let (source, address) = source(Some(ack_config)).await;
2462        let opts = SendWithOpts {
2463            channel: Some(Channel::Header("guid")),
2464            forwarded_for: None,
2465        };
2466        for _ in 0..5 {
2467            send_with(
2468                address,
2469                "services/collector/event",
2470                r#"{"event":"first"}"#,
2471                TOKEN,
2472                &opts,
2473            )
2474            .await;
2475        }
2476        for _ in 0..5 {
2477            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
2478        }
2479        let event_res = send_with_response(
2480            address,
2481            "services/collector/event",
2482            r#"{"event":"this will be acked"}"#,
2483            TOKEN,
2484            &opts,
2485        )
2486        .await
2487        .json::<HecAckEventResponse>()
2488        .await
2489        .unwrap();
2490        _ = collect_n(source, 11).await;
2491
2492        let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
2493            acks: (0..10).collect::<Vec<u64>>(),
2494        })
2495        .unwrap();
2496        let ack_res = send_with_response(
2497            address,
2498            "services/collector/ack",
2499            ack_message_dropped.as_str(),
2500            TOKEN,
2501            &opts,
2502        )
2503        .await
2504        .json::<HecAckStatusResponse>()
2505        .await
2506        .unwrap();
2507        assert!(ack_res.acks.values().all(|ack_status| !*ack_status));
2508
2509        let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
2510            acks: vec![event_res.ack_id],
2511        })
2512        .unwrap();
2513        let ack_res = send_with_response(
2514            address,
2515            "services/collector/ack",
2516            ack_message_acked.as_str(),
2517            TOKEN,
2518            &opts,
2519        )
2520        .await
2521        .json::<HecAckStatusResponse>()
2522        .await
2523        .unwrap();
2524        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2525    }
2526
2527    #[tokio::test]
2528    async fn ack_service_accepts_parameterized_content_type() {
2529        let ack_config = HecAcknowledgementsConfig {
2530            enabled: Some(true),
2531            ..Default::default()
2532        };
2533        let (source, address) = source(Some(ack_config)).await;
2534        let opts = SendWithOpts {
2535            channel: Some(Channel::Header("guid")),
2536            forwarded_for: None,
2537        };
2538
2539        let event_res = send_with_response(
2540            address,
2541            "services/collector/event",
2542            r#"{"event":"param-test"}"#,
2543            TOKEN,
2544            &opts,
2545        )
2546        .await
2547        .json::<HecAckEventResponse>()
2548        .await
2549        .unwrap();
2550        let _ = collect_n(source, 1).await;
2551
2552        let body = serde_json::to_string(&HecAckStatusRequest {
2553            acks: vec![event_res.ack_id],
2554        })
2555        .unwrap();
2556
2557        let res = reqwest::Client::new()
2558            .post(format!("http://{address}/services/collector/ack"))
2559            .header("Authorization", format!("Splunk {TOKEN}"))
2560            .header("x-splunk-request-channel", "guid")
2561            .header("Content-Type", "application/json; some-random-text; hello")
2562            .body(body)
2563            .send()
2564            .await
2565            .unwrap();
2566
2567        assert_eq!(200, res.status().as_u16());
2568
2569        let _parsed: HecAckStatusResponse = res.json().await.unwrap();
2570    }
2571
2572    #[tokio::test]
2573    async fn event_service_acknowledgements_enabled_channel_required() {
2574        let message = r#"{"event":"first", "color": "blue"}"#;
2575        let ack_config = HecAcknowledgementsConfig {
2576            enabled: Some(true),
2577            ..Default::default()
2578        };
2579        let (_, address) = source(Some(ack_config)).await;
2580
2581        let opts = SendWithOpts {
2582            channel: None,
2583            forwarded_for: None,
2584        };
2585
2586        assert_eq!(
2587            400,
2588            send_with(address, "services/collector/event", message, TOKEN, &opts).await
2589        );
2590    }
2591
2592    #[tokio::test]
2593    async fn ack_service_acknowledgements_disabled() {
2594        let message = r#" {"acks":[0]} "#;
2595        let (_, address) = source(None).await;
2596
2597        let opts = SendWithOpts {
2598            channel: Some(Channel::Header("guid")),
2599            forwarded_for: None,
2600        };
2601
2602        assert_eq!(
2603            400,
2604            send_with(address, "services/collector/ack", message, TOKEN, &opts).await
2605        );
2606    }
2607
/// Checks the schema definition of the first output when the source is
/// configured for, and queried with, the `Vector` log namespace.
#[test]
fn output_schema_definition_vector_namespace() {
    let config = SplunkConfig {
        log_namespace: Some(true),
        ..Default::default()
    };

    let mut outputs = config.outputs(LogNamespace::Vector);
    let definition = outputs.remove(0).schema_definition(true);

    // Metadata fields in registration order: (path, kind, optional meaning).
    let metadata_fields = [
        (
            owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        ),
        (
            owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        ),
        (
            owned_value_path!("splunk_hec", "host"),
            Kind::bytes(),
            Some("host"),
        ),
        (
            owned_value_path!("splunk_hec", "index"),
            Kind::bytes(),
            None,
        ),
        (
            owned_value_path!("splunk_hec", "source"),
            Kind::bytes(),
            Some("service"),
        ),
        (
            owned_value_path!("splunk_hec", "channel"),
            Kind::bytes(),
            None,
        ),
        (
            owned_value_path!("splunk_hec", "sourcetype"),
            Kind::bytes(),
            None,
        ),
    ];

    // The event root is either an object or raw bytes and carries the
    // message meaning; the metadata fields are layered on top of that.
    let base_definition = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()).or_bytes(),
        [LogNamespace::Vector],
    )
    .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE);

    let expected_definition = metadata_fields.into_iter().fold(
        base_definition,
        |partial, (path, kind, semantic)| partial.with_metadata_field(&path, kind, semantic),
    );

    assert_eq!(definition, Some(expected_definition));
}
2663
/// Checks the schema definition of the first output of a default
/// configuration when queried with the `Legacy` log namespace.
#[test]
fn output_schema_definition_legacy_namespace() {
    let mut outputs = SplunkConfig::default().outputs(LogNamespace::Legacy);
    let definitions = outputs.remove(0).schema_definition(true);

    // Event fields in registration order: (path, kind, optional meaning).
    let event_fields = [
        (owned_value_path!("host"), Kind::bytes(), Some("host")),
        (
            owned_value_path!("message"),
            Kind::bytes().or_undefined(),
            Some("message"),
        ),
        (
            owned_value_path!("line"),
            Kind::array(Collection::empty())
                .or_object(Collection::empty())
                .or_undefined(),
            None,
        ),
        (owned_value_path!("source_type"), Kind::bytes(), None),
        (owned_value_path!("splunk_channel"), Kind::bytes(), None),
        (owned_value_path!("splunk_index"), Kind::bytes(), None),
        (
            owned_value_path!("splunk_source"),
            Kind::bytes(),
            Some("service"),
        ),
        (owned_value_path!("splunk_sourcetype"), Kind::bytes(), None),
        (owned_value_path!("timestamp"), Kind::timestamp(), None),
    ];

    let mut expected_definition = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );
    for (path, kind, semantic) in event_fields {
        expected_definition = expected_definition.with_event_field(&path, kind, semantic);
    }

    assert_eq!(definitions, Some(expected_definition));
}
2702
2703    impl ValidatableComponent for SplunkConfig {
2704        fn validation_configuration() -> ValidationConfiguration {
2705            let config = Self {
2706                address: default_socket_address(),
2707                ..Default::default()
2708            };
2709
2710            let listen_addr_http = format!("http://{}/services/collector/event", config.address);
2711            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
2712
2713            let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
2714            let framing = BytesDecoderConfig::new().into();
2715            let decoding = DeserializerConfig::Json(Default::default());
2716
2717            let external_resource = ExternalResource::new(
2718                ResourceDirection::Push,
2719                HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
2720                    X_SPLUNK_REQUEST_CHANNEL.to_string(),
2721                    "channel".to_string(),
2722                )])),
2723                DecodingConfig::new(framing, decoding, false.into()),
2724            );
2725
2726            ValidationConfiguration::from_source(
2727                Self::NAME,
2728                log_namespace,
2729                vec![ComponentTestCaseConfig::from_source(
2730                    config,
2731                    None,
2732                    Some(external_resource),
2733                )],
2734            )
2735        }
2736    }
2737
2738    register_validatable_component!(SplunkConfig);
2739}