vector/sources/splunk_hec/
mod.rs

1use std::{
2    collections::HashMap,
3    convert::Infallible,
4    io::Read,
5    net::{Ipv4Addr, SocketAddr},
6    sync::Arc,
7    time::Duration,
8};
9
10use bytes::{Buf, Bytes};
11use chrono::{DateTime, TimeZone, Utc};
12use flate2::read::MultiGzDecoder;
13use futures::FutureExt;
14use http::StatusCode;
15use hyper::{Server, service::make_service_fn};
16use serde::{Serialize, de::DeserializeOwned};
17use serde_json::{
18    Deserializer, Value as JsonValue,
19    de::{Read as JsonRead, StrRead},
20};
21use snafu::Snafu;
22use tokio::net::TcpStream;
23use tower::ServiceBuilder;
24use tracing::Span;
25use vector_lib::{
26    EstimatedJsonEncodedSizeOf,
27    config::{LegacyKey, LogNamespace},
28    configurable::configurable_component,
29    event::BatchNotifier,
30    internal_event::{CountByteSize, InternalEventHandle as _, Registered},
31    lookup::{self, event_path, lookup_v2::OptionalValuePath, owned_value_path},
32    schema::meaning,
33    sensitive_string::SensitiveString,
34    source_sender::SendError,
35    tls::MaybeTlsIncomingStream,
36};
37use vrl::{
38    path::OwnedTargetPath,
39    value::{Kind, kind::Collection},
40};
41use warp::{
42    Filter, Reply,
43    filters::BoxedFilter,
44    http::header::{CONTENT_TYPE, HeaderValue},
45    path,
46    reject::Rejection,
47    reply::Response,
48};
49
50use self::{
51    acknowledgements::{
52        HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
53        IndexerAcknowledgement,
54    },
55    splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
56};
57use crate::{
58    SourceSender,
59    config::{DataType, Resource, SourceConfig, SourceContext, SourceOutput, log_schema},
60    event::{Event, LogEvent, Value},
61    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
62    internal_events::{
63        EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
64    },
65    serde::bool_or_struct,
66    tls::{MaybeTlsSettings, TlsEnableableConfig},
67};
68
69mod acknowledgements;
70
// Event fields unique to the `splunk_hec` source.

/// Legacy-namespace event field receiving the Splunk channel (metadata key `channel`).
pub const CHANNEL: &str = "splunk_channel";
/// Legacy-namespace event field receiving the Splunk index (metadata key `index`).
pub const INDEX: &str = "splunk_index";
/// Legacy-namespace event field receiving the Splunk source (metadata key `source`).
pub const SOURCE: &str = "splunk_source";
/// Legacy-namespace event field receiving the Splunk sourcetype (metadata key `sourcetype`).
pub const SOURCETYPE: &str = "splunk_sourcetype";

/// Request header carrying the client's Splunk channel; the `channel` query parameter is the
/// fallback when this header is absent.
const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
78
// NOTE(review): the `///` comments below presumably feed the generated configuration
// documentation via `configurable_component`; treat edits to them as user-facing changes.
/// Configuration for the `splunk_hec` source.
#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields, default)]
pub struct SplunkConfig {
    // The field-level default yields the same value as `SplunkConfig::default()`, which the
    // container-level `#[serde(default)]` already applies; kept explicit for clarity.
    /// The socket address to listen for connections on.
    ///
    /// The address _must_ include a port.
    #[serde(default = "default_socket_address")]
    pub address: SocketAddr,

    // Merged with `valid_tokens` when building `SplunkSource`, so supplying only this
    // deprecated option still turns authentication on.
    /// Optional authorization token.
    ///
    /// If supplied, incoming requests must supply this token in the `Authorization` header, just as a client would if
    /// it was communicating with the Splunk HEC endpoint directly.
    ///
    /// If _not_ supplied, the `Authorization` header is ignored and requests are not authenticated.
    #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
    token: Option<SensitiveString>,

    // NOTE(review): authentication is also enforced when only the deprecated `token` is set,
    // which the "If _not_ supplied" sentence below does not mention.
    /// A list of valid authorization tokens.
    ///
    /// If supplied, incoming requests must supply one of these tokens in the `Authorization` header, just as a client
    /// would if it was communicating with the Splunk HEC endpoint directly.
    ///
    /// If _not_ supplied, the `Authorization` header is ignored and requests are not authenticated.
    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
    valid_tokens: Option<Vec<SensitiveString>>,

    /// Whether or not to forward the Splunk HEC authentication token with events.
    ///
    /// If set to `true`, when incoming requests contain a Splunk HEC token, the token used is kept in the
    /// event metadata and preferentially used if the event is sent to a Splunk HEC sink.
    store_hec_token: bool,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Accepts either a plain boolean or the full acknowledgements table.
    #[configurable(derived)]
    #[serde(deserialize_with = "bool_or_struct")]
    acknowledgements: HecAcknowledgementsConfig,

    // `None` defers to the global log namespace setting.
    /// The namespace to use for logs. This overrides the global settings.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
130
131impl_generate_config_from_default!(SplunkConfig);
132
133impl Default for SplunkConfig {
134    fn default() -> Self {
135        SplunkConfig {
136            address: default_socket_address(),
137            token: None,
138            valid_tokens: None,
139            tls: None,
140            acknowledgements: Default::default(),
141            store_hec_token: false,
142            log_namespace: None,
143            keepalive: Default::default(),
144        }
145    }
146}
147
/// Default listen address: all IPv4 interfaces on the standard Splunk HEC port, 8088.
fn default_socket_address() -> SocketAddr {
    const DEFAULT_HEC_PORT: u16 = 8088;
    SocketAddr::from((Ipv4Addr::UNSPECIFIED, DEFAULT_HEC_PORT))
}
151
// Registers this configuration under the `splunk_hec` type tag and hooks it into Vector's
// source machinery.
#[async_trait::async_trait]
#[typetag::serde(name = "splunk_hec")]
impl SourceConfig for SplunkConfig {
    /// Binds the listener and returns the future that serves HEC requests under
    /// `/services/collector/` until shutdown is signalled.
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
        // `SplunkSource::new` takes `cx` by value, so clone what is still needed beforehand.
        let shutdown = cx.shutdown.clone();
        let out = cx.out.clone();
        let source = SplunkSource::new(self, tls.http_protocol_name(), cx);

        let event_service = source.event_service(out.clone());
        let raw_service = source.raw_service(out);
        let health_service = source.health_service();
        let ack_service = source.ack_service();
        let options = SplunkSource::options();

        // Filters are tried in order and the first one that does not reject handles the
        // request; a rejection that survives all of them is converted by `finish_err`.
        let services = path!("services" / "collector" / ..)
            .and(
                event_service
                    .or(raw_service)
                    .unify()
                    .or(health_service)
                    .unify()
                    .or(ack_service)
                    .unify()
                    .or(options)
                    .unify(),
            )
            .or_else(finish_err);

        let listener = tls.bind(&self.address).await?;

        let keepalive_settings = self.keepalive.clone();
        Ok(Box::pin(async move {
            let span = Span::current();
            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
                // Per connection: HTTP tracing, plus a max-connection-age layer (keyed on the
                // peer address) only when `max_connection_age_secs` is configured.
                let svc = ServiceBuilder::new()
                    .layer(build_http_trace_layer(span.clone()))
                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
                        MaxConnectionAgeLayer::new(
                            Duration::from_secs(secs),
                            keepalive_settings.max_connection_age_jitter_factor,
                            conn.peer_addr(),
                        )
                    }))
                    .service(warp::service(services.clone()));
                futures_util::future::ok::<_, Infallible>(svc)
            });

            // Server errors are logged here and surfaced to the caller as a unit error.
            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(make_svc)
                .with_graceful_shutdown(shutdown.map(|_| ()))
                .await
                .map_err(|err| {
                    error!("An error occurred: {:?}.", err);
                })?;

            Ok(())
        }))
    }

    /// Describes the single log output and its schema for the effective log namespace.
    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);

        let schema_definition = match log_namespace {
            // Legacy: an optional `line` object/array, plus the message key (when the global
            // log schema defines one) carrying the message meaning.
            LogNamespace::Legacy => {
                let definition = vector_lib::schema::Definition::empty_legacy_namespace()
                    .with_event_field(
                        &owned_value_path!("line"),
                        Kind::object(Collection::empty())
                            .or_array(Collection::empty())
                            .or_undefined(),
                        None,
                    );

                if let Some(message_key) = log_schema().message_key() {
                    definition.with_event_field(
                        message_key,
                        Kind::bytes().or_undefined(),
                        Some(meaning::MESSAGE),
                    )
                } else {
                    definition
                }
            }
            // Vector: the event root itself is the message (bytes or an object).
            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
                Kind::bytes().or_object(Collection::empty()),
                [log_namespace],
            )
            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
        }
        .with_standard_vector_source_metadata()
        // `host` only fills the legacy host key when it is empty; the Splunk-specific fields
        // below overwrite their legacy keys.
        .with_source_metadata(
            SplunkConfig::NAME,
            log_schema()
                .host_key()
                .cloned()
                .map(LegacyKey::InsertIfEmpty),
            &owned_value_path!("host"),
            Kind::bytes(),
            Some(meaning::HOST),
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(CHANNEL))),
            &owned_value_path!("channel"),
            Kind::bytes(),
            None,
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(INDEX))),
            &owned_value_path!("index"),
            Kind::bytes(),
            None,
        )
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(SOURCE))),
            &owned_value_path!("source"),
            Kind::bytes(),
            Some(meaning::SERVICE),
        )
        // Not to be confused with `source_type`.
        .with_source_metadata(
            SplunkConfig::NAME,
            Some(LegacyKey::Overwrite(owned_value_path!(SOURCETYPE))),
            &owned_value_path!("sourcetype"),
            Kind::bytes(),
            None,
        );

        vec![SourceOutput::new_maybe_logs(
            DataType::Log,
            schema_definition,
        )]
    }

    /// The TCP listen address is the only resource this source claims.
    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    /// This source supports end-to-end acknowledgements.
    fn can_acknowledge(&self) -> bool {
        true
    }
}
297
/// Shared data for responding to requests.
struct SplunkSource {
    /// Accepted `Authorization` header values in `Splunk <token>` form; empty disables auth.
    valid_credentials: Vec<String>,
    /// Protocol name from `MaybeTlsSettings::http_protocol_name`, reported in
    /// `HttpBytesReceived` events.
    protocol: &'static str,
    /// Indexer acknowledgement state; `None` when acknowledgements are disabled.
    idx_ack: Option<Arc<IndexerAcknowledgement>>,
    /// Whether the request's HEC token is attached to the events it produces.
    store_hec_token: bool,
    /// Log namespace that events are built in.
    log_namespace: LogNamespace,
    /// Handle used to report received events.
    events_received: Registered<EventsReceived>,
}
307
308impl SplunkSource {
309    fn new(config: &SplunkConfig, protocol: &'static str, cx: SourceContext) -> Self {
310        let log_namespace = cx.log_namespace(config.log_namespace);
311        let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
312        let shutdown = cx.shutdown;
313        let valid_tokens = config
314            .valid_tokens
315            .iter()
316            .flatten()
317            .chain(config.token.iter());
318
319        let idx_ack = acknowledgements.then(|| {
320            Arc::new(IndexerAcknowledgement::new(
321                config.acknowledgements.clone(),
322                shutdown,
323            ))
324        });
325
326        SplunkSource {
327            valid_credentials: valid_tokens
328                .map(|token| format!("Splunk {}", token.inner()))
329                .collect(),
330            protocol,
331            idx_ack,
332            store_hec_token: config.store_hec_token,
333            log_namespace,
334            events_received: register!(EventsReceived),
335        }
336    }
337
338    fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
339        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
340            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
341        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
342
343        let splunk_channel = splunk_channel_header
344            .and(splunk_channel_query_param)
345            .map(|header: Option<String>, query_param| header.or(query_param));
346
347        let protocol = self.protocol;
348        let idx_ack = self.idx_ack.clone();
349        let store_hec_token = self.store_hec_token;
350        let log_namespace = self.log_namespace;
351        let events_received = self.events_received.clone();
352
353        warp::post()
354            .and(
355                path!("event")
356                    .or(path!("event" / "1.0"))
357                    .or(warp::path::end()),
358            )
359            .and(self.authorization())
360            .and(splunk_channel)
361            .and(warp::addr::remote())
362            .and(warp::header::optional::<String>("X-Forwarded-For"))
363            .and(self.gzip())
364            .and(warp::body::bytes())
365            .and(warp::path::full())
366            .and_then(
367                move |_,
368                      token: Option<String>,
369                      channel: Option<String>,
370                      remote: Option<SocketAddr>,
371                      remote_addr: Option<String>,
372                      gzip: bool,
373                      body: Bytes,
374                      path: warp::path::FullPath| {
375                    let mut out = out.clone();
376                    let idx_ack = idx_ack.clone();
377                    let events_received = events_received.clone();
378
379                    async move {
380                        if idx_ack.is_some() && channel.is_none() {
381                            return Err(Rejection::from(ApiError::MissingChannel));
382                        }
383
384                        let mut data = Vec::new();
385                        let (byte_size, body) = if gzip {
386                            MultiGzDecoder::new(body.reader())
387                                .read_to_end(&mut data)
388                                .map_err(|_| Rejection::from(ApiError::BadRequest))?;
389                            (data.len(), String::from_utf8_lossy(data.as_slice()))
390                        } else {
391                            (body.len(), String::from_utf8_lossy(body.as_ref()))
392                        };
393                        emit!(HttpBytesReceived {
394                            byte_size,
395                            http_path: path.as_str(),
396                            protocol,
397                        });
398
399                        let (batch, receiver) =
400                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
401                        let maybe_ack_id = match (idx_ack, receiver, channel.clone()) {
402                            (Some(idx_ack), Some(receiver), Some(channel_id)) => {
403                                match idx_ack.get_ack_id_from_channel(channel_id, receiver).await {
404                                    Ok(ack_id) => Some(ack_id),
405                                    Err(rej) => return Err(rej),
406                                }
407                            }
408                            _ => None,
409                        };
410
411                        let mut error = None;
412                        let mut events = Vec::new();
413
414                        let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
415                            deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
416                            channel,
417                            remote,
418                            remote_addr,
419                            batch,
420                            token: token.filter(|_| store_hec_token).map(Into::into),
421                            log_namespace,
422                            events_received,
423                        }
424                        .into();
425
426                        for result in iter {
427                            match result {
428                                Ok(event) => events.push(event),
429                                Err(err) => {
430                                    error = Some(err);
431                                    break;
432                                }
433                            }
434                        }
435
436                        if !events.is_empty() {
437                            match out.send_batch(events).await {
438                                Ok(()) => (),
439                                Err(SendError::Closed) => {
440                                    return Err(Rejection::from(ApiError::ServerShutdown));
441                                }
442                                Err(SendError::Timeout) => {
443                                    unreachable!("No timeout is configured for this source.")
444                                }
445                            }
446                        }
447
448                        if let Some(error) = error {
449                            Err(error)
450                        } else {
451                            Ok(maybe_ack_id)
452                        }
453                    }
454                },
455            )
456            .map(finish_ok)
457            .boxed()
458    }
459
460    fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
461        let protocol = self.protocol;
462        let idx_ack = self.idx_ack.clone();
463        let store_hec_token = self.store_hec_token;
464        let events_received = self.events_received.clone();
465        let log_namespace = self.log_namespace;
466
467        warp::post()
468            .and(path!("raw" / "1.0").or(path!("raw")))
469            .and(self.authorization())
470            .and(SplunkSource::required_channel())
471            .and(warp::addr::remote())
472            .and(warp::header::optional::<String>("X-Forwarded-For"))
473            .and(self.gzip())
474            .and(warp::body::bytes())
475            .and(warp::path::full())
476            .and_then(
477                move |_,
478                      token: Option<String>,
479                      channel_id: String,
480                      remote: Option<SocketAddr>,
481                      xff: Option<String>,
482                      gzip: bool,
483                      body: Bytes,
484                      path: warp::path::FullPath| {
485                    let mut out = out.clone();
486                    let idx_ack = idx_ack.clone();
487                    let events_received = events_received.clone();
488                    emit!(HttpBytesReceived {
489                        byte_size: body.len(),
490                        http_path: path.as_str(),
491                        protocol,
492                    });
493
494                    async move {
495                        let (batch, receiver) =
496                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
497                        let maybe_ack_id = match (idx_ack, receiver) {
498                            (Some(idx_ack), Some(receiver)) => Some(
499                                idx_ack
500                                    .get_ack_id_from_channel(channel_id.clone(), receiver)
501                                    .await?,
502                            ),
503                            _ => None,
504                        };
505                        let mut event = raw_event(
506                            body,
507                            gzip,
508                            channel_id,
509                            remote,
510                            xff,
511                            batch,
512                            log_namespace,
513                            &events_received,
514                        )?;
515                        if let Some(token) = token.filter(|_| store_hec_token) {
516                            event.metadata_mut().set_splunk_hec_token(token.into());
517                        }
518
519                        let res = out.send_event(event).await;
520                        res.map(|_| maybe_ack_id)
521                            .map_err(|_| Rejection::from(ApiError::ServerShutdown))
522                    }
523                },
524            )
525            .map(finish_ok)
526            .boxed()
527    }
528
529    fn health_service(&self) -> BoxedFilter<(Response,)> {
530        // The Splunk docs document this endpoint as returning a 400 if given an invalid Splunk
531        // token, but, in practice, it seems to ignore the token altogether
532        //
533        // The response body was taken from Splunk 8.2.4
534        //
535        // https://docs.splunk.com/Documentation/Splunk/8.2.5/RESTREF/RESTinput#services.2Fcollector.2Fhealth
536        warp::get()
537            .and(path!("health" / "1.0").or(path!("health")))
538            .map(move |_| {
539                http::Response::builder()
540                    .header(http::header::CONTENT_TYPE, "application/json")
541                    .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
542                    .expect("static response")
543            })
544            .boxed()
545    }
546
547    fn lenient_json_content_type_check<T>() -> impl Filter<Extract = (T,), Error = Rejection> + Clone
548    where
549        T: Send + DeserializeOwned + 'static,
550    {
551        warp::header::optional::<HeaderValue>(CONTENT_TYPE.as_str())
552            .and(warp::body::bytes())
553            .and_then(
554                |ctype: Option<HeaderValue>, body: bytes::Bytes| async move {
555                    let ok = ctype
556                        .as_ref()
557                        .and_then(|v| v.to_str().ok())
558                        .map(|h| h.to_ascii_lowercase().contains("application/json"))
559                        .unwrap_or(true);
560
561                    if !ok {
562                        return Err(warp::reject::custom(ApiError::UnsupportedContentType));
563                    }
564
565                    let value = serde_json::from_slice::<T>(&body)
566                        .map_err(|_| warp::reject::custom(ApiError::BadRequest))?;
567
568                    Ok(value)
569                },
570            )
571    }
572
573    fn ack_service(&self) -> BoxedFilter<(Response,)> {
574        let idx_ack = self.idx_ack.clone();
575
576        warp::post()
577            .and(warp::path!("ack"))
578            .and(self.authorization())
579            .and(SplunkSource::required_channel())
580            .and(Self::lenient_json_content_type_check::<HecAckStatusRequest>())
581            .and_then(move |_, channel: String, req: HecAckStatusRequest| {
582                let idx_ack = idx_ack.clone();
583                async move {
584                    if let Some(idx_ack) = idx_ack {
585                        let acks = idx_ack
586                            .get_acks_status_from_channel(channel, &req.acks)
587                            .await?;
588                        Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
589                    } else {
590                        Err(warp::reject::custom(ApiError::AckIsDisabled))
591                    }
592                }
593            })
594            .boxed()
595    }
596
597    fn options() -> BoxedFilter<(Response,)> {
598        let post = warp::options()
599            .and(
600                path!("event")
601                    .or(path!("event" / "1.0"))
602                    .or(path!("raw" / "1.0"))
603                    .or(path!("raw")),
604            )
605            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());
606
607        let get = warp::options()
608            .and(path!("health").or(path!("health" / "1.0")))
609            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());
610
611        post.or(get).unify().boxed()
612    }
613
614    /// Authorize request
615    fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
616        let valid_credentials = self.valid_credentials.clone();
617        warp::header::optional("Authorization")
618            .and_then(move |token: Option<String>| {
619                let valid_credentials = valid_credentials.clone();
620                async move {
621                    match (token, valid_credentials.is_empty()) {
622                        // Remove the "Splunk " prefix if present as it is not
623                        // part of the token itself
624                        (token, true) => {
625                            Ok(token
626                                .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
627                        }
628                        (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
629                            token
630                                .strip_prefix("Splunk ")
631                                .map(Into::into)
632                                .unwrap_or(token),
633                        )),
634                        (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
635                        (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
636                    }
637                }
638            })
639            .boxed()
640    }
641
642    /// Is body encoded with gzip
643    fn gzip(&self) -> BoxedFilter<(bool,)> {
644        warp::header::optional::<String>("Content-Encoding")
645            .and_then(|encoding: Option<String>| async move {
646                match encoding {
647                    Some(s) if s.as_bytes() == b"gzip" => Ok(true),
648                    Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
649                    None => Ok(false),
650                }
651            })
652            .boxed()
653    }
654
655    fn required_channel() -> BoxedFilter<(String,)> {
656        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
657            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
658        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
659
660        splunk_channel_header
661            .and(splunk_channel_query_param)
662            .and_then(|header: Option<String>, query_param| async move {
663                header
664                    .or(query_param)
665                    .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
666            })
667            .boxed()
668    }
669}
/// Constructs one or more events from the JSON values in a request body.
/// Once it yields an error, it is done with the input.
struct EventIterator<'de, R: JsonRead<'de>> {
    /// Remaining request with JSON events
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    /// Count of events emitted so far; reported as the `event` index in
    /// `InvalidDataFormat` rejections.
    events: usize,
    /// Channel from the `X-Splunk-Request-Channel` header or the `channel` query parameter,
    /// used when an event carries no `channel` field of its own.
    channel: Option<Value>,
    /// Timestamp for events without a `time` field; starts at "now" and is replaced whenever
    /// an event provides a valid `time`, so later events inherit it.
    time: Time,
    /// Per-field defaults for `host`, `index`, `source` and `sourcetype`
    extractors: [DefaultExtractor; 4],
    /// Event finalization; `None` when acknowledgements are disabled
    batch: Option<BatchNotifier>,
    /// Splunk HEC token for passthrough (only set when `store_hec_token` is enabled)
    token: Option<Arc<str>>,
    /// Log namespace to put the events in
    log_namespace: LogNamespace,
    /// handle to EventsReceived registry
    events_received: Registered<EventsReceived>,
}
692
/// Intermediate struct to generate an `EventIterator`; it holds the per-request inputs
/// before defaults such as the extractors and the start time are derived.
struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
    // Stream of JSON values parsed from the request body.
    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
    // Channel from the request header or query parameter, if any.
    channel: Option<String>,
    // Finalization notifier; `None` when acknowledgements are disabled.
    batch: Option<BatchNotifier>,
    // HEC token to attach to events, already filtered by `store_hec_token`.
    token: Option<Arc<str>>,
    log_namespace: LogNamespace,
    events_received: Registered<EventsReceived>,
    // Peer socket address; the fallback default host.
    remote: Option<SocketAddr>,
    // Raw `X-Forwarded-For` header value; preferred over `remote` as the default host.
    // NOTE(review): used verbatim, so a multi-hop "client, proxy" list is not split.
    remote_addr: Option<String>,
}
704
705impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
706    fn from(f: EventIteratorGenerator<'de, R>) -> Self {
707        Self {
708            deserializer: f.deserializer,
709            events: 0,
710            channel: f.channel.map(Value::from),
711            time: Time::Now(Utc::now()),
712            extractors: [
713                // Extract the host field with the given priority:
714                // 1. The host field is present in the event payload
715                // 2. The x-forwarded-for header is present in the incoming request
716                // 3. Use the `remote`: SocketAddr value provided by warp
717                DefaultExtractor::new_with(
718                    "host",
719                    log_schema().host_key().cloned().into(),
720                    f.remote_addr
721                        .or_else(|| f.remote.map(|addr| addr.to_string()))
722                        .map(Value::from),
723                    f.log_namespace,
724                ),
725                DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace),
726                DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace),
727                DefaultExtractor::new(
728                    "sourcetype",
729                    OptionalValuePath::new(SOURCETYPE),
730                    f.log_namespace,
731                ),
732            ],
733            batch: f.batch,
734            token: f.token,
735            log_namespace: f.log_namespace,
736            events_received: f.events_received,
737        }
738    }
739}
740
741impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
742    fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
743        // Construct Event from parsed json event
744        let mut log = match self.log_namespace {
745            LogNamespace::Vector => self.build_log_vector(&mut json)?,
746            LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
747        };
748
749        // Add source type
750        self.log_namespace.insert_vector_metadata(
751            &mut log,
752            log_schema().source_type_key(),
753            &owned_value_path!("source_type"),
754            SplunkConfig::NAME,
755        );
756
757        // Process channel field
758        let channel_path = owned_value_path!(CHANNEL);
759        if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
760            self.log_namespace.insert_source_metadata(
761                SplunkConfig::NAME,
762                &mut log,
763                Some(LegacyKey::Overwrite(&channel_path)),
764                lookup::path!(CHANNEL),
765                guid,
766            );
767        } else if let Some(guid) = self.channel.as_ref() {
768            self.log_namespace.insert_source_metadata(
769                SplunkConfig::NAME,
770                &mut log,
771                Some(LegacyKey::Overwrite(&channel_path)),
772                lookup::path!(CHANNEL),
773                guid.clone(),
774            );
775        }
776
777        // Process fields field
778        if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
779            for (key, value) in object {
780                self.log_namespace.insert_source_metadata(
781                    SplunkConfig::NAME,
782                    &mut log,
783                    Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
784                    lookup::path!(key.as_str()),
785                    value,
786                );
787            }
788        }
789
790        // Process time field
791        let parsed_time = match json.get_mut("time").map(JsonValue::take) {
792            Some(JsonValue::Number(time)) => Some(Some(time)),
793            Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
794            _ => None,
795        };
796
797        match parsed_time {
798            None => (),
799            Some(Some(t)) => {
800                if let Some(t) = t.as_u64() {
801                    let time = parse_timestamp(t as i64)
802                        .ok_or(ApiError::InvalidDataFormat { event: self.events })?;
803
804                    self.time = Time::Provided(time);
805                } else if let Some(t) = t.as_f64() {
806                    self.time = Time::Provided(
807                        Utc.timestamp_opt(
808                            t.floor() as i64,
809                            (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
810                        )
811                        .single()
812                        .expect("invalid timestamp"),
813                    );
814                } else {
815                    return Err(ApiError::InvalidDataFormat { event: self.events }.into());
816                }
817            }
818            Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
819        }
820
821        // Add time field
822        let timestamp = match self.time.clone() {
823            Time::Provided(time) => time,
824            Time::Now(time) => time,
825        };
826
827        self.log_namespace.insert_source_metadata(
828            SplunkConfig::NAME,
829            &mut log,
830            log_schema().timestamp_key().map(LegacyKey::Overwrite),
831            lookup::path!("timestamp"),
832            timestamp,
833        );
834
835        // Extract default extracted fields
836        for de in self.extractors.iter_mut() {
837            de.extract(&mut log, &mut json);
838        }
839
840        // Add passthrough token if present
841        if let Some(token) = &self.token {
842            log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
843        }
844
845        if let Some(batch) = self.batch.clone() {
846            log = log.with_batch_notifier(&batch);
847        }
848
849        self.events += 1;
850
851        Ok(log.into())
852    }
853
854    /// Build the log event for the vector namespace.
855    /// In this namespace the log event is created entirely from the event field.
856    /// No renaming of the `line` field is done.
857    fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
858        match json.get("event") {
859            Some(event) => {
860                let event: Value = event.into();
861                let mut log = LogEvent::from(event);
862
863                // EstimatedJsonSizeOf must be calculated before enrichment
864                self.events_received
865                    .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
866
867                // The timestamp is extracted from the message for the Legacy namespace.
868                self.log_namespace.insert_vector_metadata(
869                    &mut log,
870                    log_schema().timestamp_key(),
871                    lookup::path!("ingest_timestamp"),
872                    chrono::Utc::now(),
873                );
874
875                Ok(log)
876            }
877            None => Err(ApiError::MissingEventField { event: self.events }.into()),
878        }
879    }
880
881    /// Build the log event for the legacy namespace.
882    /// If the event is a string, or the event contains a field called `line` that is a string
883    /// (the docker splunk logger places the message in the event.line field) that string
884    /// is placed in the message field.
885    fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
886        let mut log = LogEvent::default();
887        match json.get_mut("event") {
888            Some(event) => match event.take() {
889                JsonValue::String(string) => {
890                    if string.is_empty() {
891                        return Err(ApiError::EmptyEventField { event: self.events }.into());
892                    }
893                    log.maybe_insert(log_schema().message_key_target_path(), string);
894                }
895                JsonValue::Object(mut object) => {
896                    if object.is_empty() {
897                        return Err(ApiError::EmptyEventField { event: self.events }.into());
898                    }
899
900                    // Add 'line' value as 'event::schema().message_key'
901                    if let Some(line) = object.remove("line") {
902                        match line {
903                            // This don't quite fit the meaning of a event::schema().message_key
904                            JsonValue::Array(_) | JsonValue::Object(_) => {
905                                log.insert(event_path!("line"), line);
906                            }
907                            _ => {
908                                log.maybe_insert(log_schema().message_key_target_path(), line);
909                            }
910                        }
911                    }
912
913                    for (key, value) in object {
914                        log.insert(event_path!(key.as_str()), value);
915                    }
916                }
917                _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
918            },
919            None => return Err(ApiError::MissingEventField { event: self.events }.into()),
920        };
921
922        // EstimatedJsonSizeOf must be calculated before enrichment
923        self.events_received
924            .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
925
926        Ok(log)
927    }
928}
929
930impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
931    type Item = Result<Event, Rejection>;
932
933    fn next(&mut self) -> Option<Self::Item> {
934        match self.deserializer.next() {
935            Some(Ok(json)) => Some(self.build_event(json)),
936            None => {
937                if self.events == 0 {
938                    Some(Err(ApiError::NoData.into()))
939                } else {
940                    None
941                }
942            }
943            Some(Err(error)) => {
944                emit!(SplunkHecRequestBodyInvalidError {
945                    error: error.into()
946                });
947                Some(Err(
948                    ApiError::InvalidDataFormat { event: self.events }.into()
949                ))
950            }
951        }
952    }
953}
954
955/// Parse a `i64` unix timestamp that can either be in seconds, milliseconds or
956/// nanoseconds.
957///
958/// This attempts to parse timestamps based on what cutoff range they fall into.
959/// For seconds to be parsed the timestamp must be less than the unix epoch of
960/// the year `2400`. For this to parse milliseconds the time must be smaller
961/// than the year `10,000` in unix epoch milliseconds. If the value is larger
962/// than both we attempt to parse it as nanoseconds.
963///
964/// Returns `None` if `t` is negative.
965fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
966    // Utc.ymd(2400, 1, 1).and_hms(0,0,0).timestamp();
967    const SEC_CUTOFF: i64 = 13569465600;
968    // Utc.ymd(10_000, 1, 1).and_hms(0,0,0).timestamp_millis();
969    const MILLISEC_CUTOFF: i64 = 253402300800000;
970
971    // Timestamps can't be negative!
972    if t < 0 {
973        return None;
974    }
975
976    let ts = if t < SEC_CUTOFF {
977        Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
978    } else if t < MILLISEC_CUTOFF {
979        Utc.timestamp_millis_opt(t)
980            .single()
981            .expect("invalid timestamp")
982    } else {
983        Utc.timestamp_nanos(t)
984    };
985
986    Some(ts)
987}
988
989/// Maintains last known extracted value of field and uses it in the absence of field.
990struct DefaultExtractor {
991    field: &'static str,
992    to_field: OptionalValuePath,
993    value: Option<Value>,
994    log_namespace: LogNamespace,
995}
996
997impl DefaultExtractor {
998    const fn new(
999        field: &'static str,
1000        to_field: OptionalValuePath,
1001        log_namespace: LogNamespace,
1002    ) -> Self {
1003        DefaultExtractor {
1004            field,
1005            to_field,
1006            value: None,
1007            log_namespace,
1008        }
1009    }
1010
1011    fn new_with(
1012        field: &'static str,
1013        to_field: OptionalValuePath,
1014        value: impl Into<Option<Value>>,
1015        log_namespace: LogNamespace,
1016    ) -> Self {
1017        DefaultExtractor {
1018            field,
1019            to_field,
1020            value: value.into(),
1021            log_namespace,
1022        }
1023    }
1024
1025    fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
1026        // Process json_field
1027        if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
1028            self.value = Some(new_value.into());
1029        }
1030
1031        // Add data field
1032        if let Some(index) = self.value.as_ref()
1033            && let Some(metadata_key) = self.to_field.path.as_ref()
1034        {
1035            self.log_namespace.insert_source_metadata(
1036                SplunkConfig::NAME,
1037                log,
1038                Some(LegacyKey::Overwrite(metadata_key)),
1039                &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
1040                index.clone(),
1041            )
1042        }
1043    }
1044}
1045
1046/// For tracking origin of the timestamp
1047#[derive(Clone, Debug)]
1048enum Time {
1049    /// Backup
1050    Now(DateTime<Utc>),
1051    /// Provided in the request
1052    Provided(DateTime<Utc>),
1053}
1054
1055/// Creates event from raw request
1056#[allow(clippy::too_many_arguments)]
1057fn raw_event(
1058    bytes: Bytes,
1059    gzip: bool,
1060    channel: String,
1061    remote: Option<SocketAddr>,
1062    xff: Option<String>,
1063    batch: Option<BatchNotifier>,
1064    log_namespace: LogNamespace,
1065    events_received: &Registered<EventsReceived>,
1066) -> Result<Event, Rejection> {
1067    // Process gzip
1068    let message: Value = if gzip {
1069        let mut data = Vec::new();
1070        match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
1071            Ok(0) => return Err(ApiError::NoData.into()),
1072            Ok(_) => Value::from(Bytes::from(data)),
1073            Err(error) => {
1074                emit!(SplunkHecRequestBodyInvalidError { error });
1075                return Err(ApiError::InvalidDataFormat { event: 0 }.into());
1076            }
1077        }
1078    } else {
1079        bytes.into()
1080    };
1081
1082    // Construct event
1083    let mut log = match log_namespace {
1084        LogNamespace::Vector => LogEvent::from(message),
1085        LogNamespace::Legacy => {
1086            let mut log = LogEvent::default();
1087            log.maybe_insert(log_schema().message_key_target_path(), message);
1088            log
1089        }
1090    };
1091    // We need to calculate the estimated json size of the event BEFORE enrichment.
1092    events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1093
1094    // Add channel
1095    log_namespace.insert_source_metadata(
1096        SplunkConfig::NAME,
1097        &mut log,
1098        Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
1099        lookup::path!(CHANNEL),
1100        channel,
1101    );
1102
1103    // host-field priority for raw endpoint:
1104    // - x-forwarded-for is set to `host` field first, if present. If not present:
1105    // - set remote addr to host field
1106    let host = if let Some(remote_address) = xff {
1107        Some(remote_address)
1108    } else {
1109        remote.map(|remote| remote.to_string())
1110    };
1111
1112    if let Some(host) = host {
1113        log_namespace.insert_source_metadata(
1114            SplunkConfig::NAME,
1115            &mut log,
1116            log_schema().host_key().map(LegacyKey::InsertIfEmpty),
1117            lookup::path!("host"),
1118            host,
1119        );
1120    }
1121
1122    log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, Utc::now());
1123
1124    if let Some(batch) = batch {
1125        log = log.with_batch_notifier(&batch);
1126    }
1127
1128    Ok(Event::from(log))
1129}
1130
// Errors produced while handling HEC requests. `finish_err` turns each variant into an HTTP
// status and, where Splunk defines one, a HEC JSON body.
//
// Each `event` field holds the number of events built successfully before the failure, i.e.
// the zero-based index of the offending event in the request body.
//
// NOTE(review): plain `//` comments are used here on purpose. snafu can derive a variant's
// `Display` text from its `///` doc comment. That would change the message carried by
// `SplunkHecRequestError`, so confirm before converting these into doc comments.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    // Answered with 401 and the "Token is required" body.
    MissingAuthorization,
    // Answered with 401 and the "Invalid authorization" body.
    InvalidAuthorization,
    // Answered with 415 and an empty body.
    UnsupportedEncoding,
    // Answered with 415 and a plain-text explanation.
    UnsupportedContentType,
    // Answered with 400 and the "Data channel is missing" body.
    MissingChannel,
    // No data could be read from the request (e.g. an empty event stream or an empty gzip
    // body). Answered with 400 and the "No data" body.
    NoData,
    // Event `event` could not be decoded or had an unusable value. Answered with 400 and the
    // "Invalid data format" body.
    InvalidDataFormat { event: usize },
    // Answered with 503 and an empty body.
    ServerShutdown,
    // The `event` field of event `event` was blank. Answered with 400 and the
    // "Event field cannot be blank" body.
    EmptyEventField { event: usize },
    // Event `event` had no `event` field. Answered with 400 and the
    // "Event field is required" body.
    MissingEventField { event: usize },
    // Answered with 400 and an empty body.
    BadRequest,
    // Answered with 503 and the "Server is busy" body.
    ServiceUnavailable,
    // Answered with 400 and the "Ack is disabled" body.
    AckIsDisabled,
}

// Lets `ApiError` travel inside a warp `Rejection`; `finish_err` recovers it with
// `Rejection::find`.
impl warp::reject::Reject for ApiError {}
1149
1150/// Cached bodies for common responses
1151mod splunk_response {
1152    use serde::Serialize;
1153
1154    // https://docs.splunk.com/Documentation/Splunk/8.2.3/Data/TroubleshootHTTPEventCollector#Possible_error_codes
1155    pub enum HecStatusCode {
1156        Success = 0,
1157        TokenIsRequired = 2,
1158        InvalidAuthorization = 3,
1159        NoData = 5,
1160        InvalidDataFormat = 6,
1161        ServerIsBusy = 9,
1162        DataChannelIsMissing = 10,
1163        EventFieldIsRequired = 12,
1164        EventFieldCannotBeBlank = 13,
1165        AckIsDisabled = 14,
1166    }
1167
1168    #[derive(Serialize)]
1169    pub enum HecResponseMetadata {
1170        #[serde(rename = "ackId")]
1171        AckId(u64),
1172        #[serde(rename = "invalid-event-number")]
1173        InvalidEventNumber(usize),
1174    }
1175
1176    #[derive(Serialize)]
1177    pub struct HecResponse {
1178        text: &'static str,
1179        code: u8,
1180        #[serde(skip_serializing_if = "Option::is_none", flatten)]
1181        pub metadata: Option<HecResponseMetadata>,
1182    }
1183
1184    impl HecResponse {
1185        pub const fn new(code: HecStatusCode) -> Self {
1186            let text = match code {
1187                HecStatusCode::Success => "Success",
1188                HecStatusCode::TokenIsRequired => "Token is required",
1189                HecStatusCode::InvalidAuthorization => "Invalid authorization",
1190                HecStatusCode::NoData => "No data",
1191                HecStatusCode::InvalidDataFormat => "Invalid data format",
1192                HecStatusCode::DataChannelIsMissing => "Data channel is missing",
1193                HecStatusCode::EventFieldIsRequired => "Event field is required",
1194                HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
1195                HecStatusCode::ServerIsBusy => "Server is busy",
1196                HecStatusCode::AckIsDisabled => "Ack is disabled",
1197            };
1198
1199            Self {
1200                text,
1201                code: code as u8,
1202                metadata: None,
1203            }
1204        }
1205
1206        pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
1207            self.metadata = Some(metadata);
1208            self
1209        }
1210    }
1211
1212    pub const INVALID_AUTHORIZATION: HecResponse =
1213        HecResponse::new(HecStatusCode::InvalidAuthorization);
1214    pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
1215    pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
1216    pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
1217    pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
1218    pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
1219    pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
1220}
1221
1222fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1223    let body = if let Some(ack_id) = maybe_ack_id {
1224        HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
1225    } else {
1226        splunk_response::SUCCESS
1227    };
1228    response_json(StatusCode::OK, body)
1229}
1230
1231fn response_plain(code: StatusCode, msg: &'static str) -> Response {
1232    warp::reply::with_status(
1233        warp::reply::with_header(msg, http::header::CONTENT_TYPE, "text/plain; charset=utf-8"),
1234        code,
1235    )
1236    .into_response()
1237}
1238
1239async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1240    if let Some(&error) = rejection.find::<ApiError>() {
1241        emit!(SplunkHecRequestError { error });
1242        Ok((match error {
1243            ApiError::MissingAuthorization => {
1244                response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
1245            }
1246            ApiError::InvalidAuthorization => response_json(
1247                StatusCode::UNAUTHORIZED,
1248                splunk_response::INVALID_AUTHORIZATION,
1249            ),
1250            ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
1251            ApiError::UnsupportedContentType => response_plain(
1252                StatusCode::UNSUPPORTED_MEDIA_TYPE,
1253                "The request's content-type is not supported",
1254            ),
1255            ApiError::MissingChannel => {
1256                response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
1257            }
1258            ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
1259            ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
1260            ApiError::InvalidDataFormat { event } => response_json(
1261                StatusCode::BAD_REQUEST,
1262                HecResponse::new(HecStatusCode::InvalidDataFormat)
1263                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1264            ),
1265            ApiError::EmptyEventField { event } => response_json(
1266                StatusCode::BAD_REQUEST,
1267                HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
1268                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1269            ),
1270            ApiError::MissingEventField { event } => response_json(
1271                StatusCode::BAD_REQUEST,
1272                HecResponse::new(HecStatusCode::EventFieldIsRequired)
1273                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1274            ),
1275            ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
1276            ApiError::ServiceUnavailable => response_json(
1277                StatusCode::SERVICE_UNAVAILABLE,
1278                splunk_response::SERVER_IS_BUSY,
1279            ),
1280            ApiError::AckIsDisabled => {
1281                response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
1282            }
1283        },))
1284    } else {
1285        Err(rejection)
1286    }
1287}
1288
1289/// Response without body
1290fn empty_response(code: StatusCode) -> Response {
1291    let mut res = Response::default();
1292    *res.status_mut() = code;
1293    res
1294}
1295
1296/// Response with body
1297fn response_json(code: StatusCode, body: impl Serialize) -> Response {
1298    warp::reply::with_status(warp::reply::json(&body), code).into_response()
1299}
1300
1301#[cfg(feature = "sinks-splunk_hec")]
1302#[cfg(test)]
1303mod tests {
1304    use std::{net::SocketAddr, num::NonZeroU64};
1305
1306    use chrono::{TimeZone, Utc};
1307    use futures_util::Stream;
1308    use http::Uri;
1309    use reqwest::{RequestBuilder, Response};
1310    use serde::Deserialize;
1311    use vector_lib::{
1312        codecs::{
1313            BytesDecoderConfig, JsonSerializerConfig, TextSerializerConfig,
1314            decoding::DeserializerConfig,
1315        },
1316        event::EventStatus,
1317        schema::Definition,
1318        sensitive_string::SensitiveString,
1319    };
1320    use vrl::path::PathPrefix;
1321
1322    use super::*;
1323    use crate::{
1324        SourceSender,
1325        codecs::{DecodingConfig, EncodingConfig},
1326        components::validation::prelude::*,
1327        config::{SinkConfig, SinkContext, SourceConfig, SourceContext, log_schema},
1328        event::{Event, LogEvent},
1329        sinks::{
1330            Healthcheck, VectorSink,
1331            splunk_hec::logs::config::HecLogsSinkConfig,
1332            util::{BatchConfig, Compression, TowerRequestConfig},
1333        },
1334        sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
1335        test_util::{
1336            addr::{PortGuard, next_addr},
1337            collect_n,
1338            components::{
1339                COMPONENT_ERROR_TAGS, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance,
1340                assert_source_error,
1341            },
1342            wait_for_tcp,
1343        },
1344    };
1345
1346    #[test]
1347    fn generate_config() {
1348        crate::test_util::test_generate_config::<SplunkConfig>();
1349    }
1350
1351    /// Splunk token
1352    const TOKEN: &str = "token";
1353    const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
1354
1355    async fn source(
1356        acknowledgements: Option<HecAcknowledgementsConfig>,
1357    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
1358        source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
1359    }
1360
1361    async fn source_with(
1362        token: Option<SensitiveString>,
1363        valid_tokens: Option<&[&str]>,
1364        acknowledgements: Option<HecAcknowledgementsConfig>,
1365        store_hec_token: bool,
1366    ) -> (
1367        impl Stream<Item = Event> + Unpin + use<>,
1368        SocketAddr,
1369        PortGuard,
1370    ) {
1371        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
1372        let (_guard, address) = next_addr();
1373        let valid_tokens =
1374            valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
1375        let cx = SourceContext::new_test(sender, None);
1376        tokio::spawn(async move {
1377            SplunkConfig {
1378                address,
1379                token,
1380                valid_tokens,
1381                tls: None,
1382                acknowledgements: acknowledgements.unwrap_or_default(),
1383                store_hec_token,
1384                log_namespace: None,
1385                keepalive: Default::default(),
1386            }
1387            .build(cx)
1388            .await
1389            .unwrap()
1390            .await
1391            .unwrap()
1392        });
1393        wait_for_tcp(address).await;
1394        (recv, address, _guard)
1395    }
1396
1397    async fn sink(
1398        address: SocketAddr,
1399        encoding: EncodingConfig,
1400        compression: Compression,
1401    ) -> (VectorSink, Healthcheck) {
1402        HecLogsSinkConfig {
1403            default_token: TOKEN.to_owned().into(),
1404            endpoint: format!("http://{address}"),
1405            host_key: None,
1406            indexed_fields: vec![],
1407            index: None,
1408            sourcetype: None,
1409            source: None,
1410            encoding,
1411            compression,
1412            batch: BatchConfig::default(),
1413            request: TowerRequestConfig::default(),
1414            tls: None,
1415            acknowledgements: Default::default(),
1416            timestamp_nanos_key: None,
1417            timestamp_key: None,
1418            auto_extract_timestamp: None,
1419            endpoint_target: Default::default(),
1420        }
1421        .build(SinkContext::default())
1422        .await
1423        .unwrap()
1424    }
1425
1426    async fn start(
1427        encoding: EncodingConfig,
1428        compression: Compression,
1429        acknowledgements: Option<HecAcknowledgementsConfig>,
1430    ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
1431        let (source, address, _guard) = source(acknowledgements).await;
1432        let (sink, health) = sink(address, encoding, compression).await;
1433        assert!(health.await.is_ok());
1434        (sink, source)
1435    }
1436
1437    async fn channel_n(
1438        messages: Vec<impl Into<String> + Send + 'static>,
1439        sink: VectorSink,
1440        source: impl Stream<Item = Event> + Unpin,
1441    ) -> Vec<Event> {
1442        let n = messages.len();
1443
1444        tokio::spawn(async move {
1445            sink.run_events(
1446                messages
1447                    .into_iter()
1448                    .map(|s| Event::Log(LogEvent::from(s.into()))),
1449            )
1450            .await
1451            .unwrap();
1452        });
1453
1454        let events = collect_n(source, n).await;
1455        assert_eq!(n, events.len());
1456
1457        events
1458    }
1459
1460    #[derive(Clone, Copy, Debug)]
1461    enum Channel<'a> {
1462        Header(&'a str),
1463        QueryParam(&'a str),
1464    }
1465
1466    #[derive(Default)]
1467    struct SendWithOpts<'a> {
1468        channel: Option<Channel<'a>>,
1469        forwarded_for: Option<String>,
1470    }
1471
1472    async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
1473        let channel = Channel::Header("channel");
1474        let options = SendWithOpts {
1475            channel: Some(channel),
1476            forwarded_for: None,
1477        };
1478        send_with(address, api, message, TOKEN, &options).await
1479    }
1480
1481    fn build_request(
1482        address: SocketAddr,
1483        api: &str,
1484        message: &str,
1485        token: &str,
1486        opts: &SendWithOpts<'_>,
1487    ) -> RequestBuilder {
1488        let mut b = reqwest::Client::new()
1489            .post(format!("http://{address}/{api}"))
1490            .header("Authorization", format!("Splunk {token}"));
1491
1492        b = match opts.channel {
1493            Some(c) => match c {
1494                Channel::Header(v) => b.header("x-splunk-request-channel", v),
1495                Channel::QueryParam(v) => b.query(&[("channel", v)]),
1496            },
1497            None => b,
1498        };
1499
1500        b = match &opts.forwarded_for {
1501            Some(f) => b.header("X-Forwarded-For", f),
1502            None => b,
1503        };
1504
1505        b.body(message.to_owned())
1506    }
1507
1508    async fn send_with(
1509        address: SocketAddr,
1510        api: &str,
1511        message: &str,
1512        token: &str,
1513        opts: &SendWithOpts<'_>,
1514    ) -> u16 {
1515        let b = build_request(address, api, message, token, opts);
1516        b.send().await.unwrap().status().as_u16()
1517    }
1518
1519    async fn send_with_response(
1520        address: SocketAddr,
1521        api: &str,
1522        message: &str,
1523        token: &str,
1524        opts: &SendWithOpts<'_>,
1525    ) -> Response {
1526        let b = build_request(address, api, message, token, opts);
1527        b.send().await.unwrap()
1528    }
1529
1530    #[tokio::test]
1531    async fn no_compression_text_event() {
1532        let message = "gzip_text_event";
1533        let (sink, source) = start(
1534            TextSerializerConfig::default().into(),
1535            Compression::None,
1536            None,
1537        )
1538        .await;
1539
1540        let event = channel_n(vec![message], sink, source).await.remove(0);
1541
1542        assert_eq!(
1543            event.as_log()[log_schema().message_key().unwrap().to_string()],
1544            message.into()
1545        );
1546        assert!(event.as_log().get_timestamp().is_some());
1547        assert_eq!(
1548            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1549            "splunk_hec".into()
1550        );
1551        assert!(event.metadata().splunk_hec_token().is_none());
1552    }
1553
1554    #[tokio::test]
1555    async fn one_simple_text_event() {
1556        let message = "one_simple_text_event";
1557        let (sink, source) = start(
1558            TextSerializerConfig::default().into(),
1559            Compression::gzip_default(),
1560            None,
1561        )
1562        .await;
1563
1564        let event = channel_n(vec![message], sink, source).await.remove(0);
1565
1566        assert_eq!(
1567            event.as_log()[log_schema().message_key().unwrap().to_string()],
1568            message.into()
1569        );
1570        assert!(event.as_log().get_timestamp().is_some());
1571        assert_eq!(
1572            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1573            "splunk_hec".into()
1574        );
1575        assert!(event.metadata().splunk_hec_token().is_none());
1576    }
1577
1578    #[tokio::test]
1579    async fn multiple_simple_text_event() {
1580        let n = 200;
1581        let (sink, source) = start(
1582            TextSerializerConfig::default().into(),
1583            Compression::None,
1584            None,
1585        )
1586        .await;
1587
1588        let messages = (0..n)
1589            .map(|i| format!("multiple_simple_text_event_{i}"))
1590            .collect::<Vec<_>>();
1591        let events = channel_n(messages.clone(), sink, source).await;
1592
1593        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1594            assert_eq!(
1595                event.as_log()[log_schema().message_key().unwrap().to_string()],
1596                msg.into()
1597            );
1598            assert!(event.as_log().get_timestamp().is_some());
1599            assert_eq!(
1600                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1601                "splunk_hec".into()
1602            );
1603            assert!(event.metadata().splunk_hec_token().is_none());
1604        }
1605    }
1606
1607    #[tokio::test]
1608    async fn one_simple_json_event() {
1609        let message = "one_simple_json_event";
1610        let (sink, source) = start(
1611            JsonSerializerConfig::default().into(),
1612            Compression::gzip_default(),
1613            None,
1614        )
1615        .await;
1616
1617        let event = channel_n(vec![message], sink, source).await.remove(0);
1618
1619        assert_eq!(
1620            event.as_log()[log_schema().message_key().unwrap().to_string()],
1621            message.into()
1622        );
1623        assert!(event.as_log().get_timestamp().is_some());
1624        assert_eq!(
1625            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1626            "splunk_hec".into()
1627        );
1628        assert!(event.metadata().splunk_hec_token().is_none());
1629    }
1630
1631    #[tokio::test]
1632    async fn multiple_simple_json_event() {
1633        let n = 200;
1634        let (sink, source) = start(
1635            JsonSerializerConfig::default().into(),
1636            Compression::gzip_default(),
1637            None,
1638        )
1639        .await;
1640
1641        let messages = (0..n)
1642            .map(|i| format!("multiple_simple_json_event{i}"))
1643            .collect::<Vec<_>>();
1644        let events = channel_n(messages.clone(), sink, source).await;
1645
1646        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1647            assert_eq!(
1648                event.as_log()[log_schema().message_key().unwrap().to_string()],
1649                msg.into()
1650            );
1651            assert!(event.as_log().get_timestamp().is_some());
1652            assert_eq!(
1653                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1654                "splunk_hec".into()
1655            );
1656            assert!(event.metadata().splunk_hec_token().is_none());
1657        }
1658    }
1659
1660    #[tokio::test]
1661    async fn json_event() {
1662        let (sink, source) = start(
1663            JsonSerializerConfig::default().into(),
1664            Compression::gzip_default(),
1665            None,
1666        )
1667        .await;
1668
1669        let mut log = LogEvent::default();
1670        log.insert("greeting", "hello");
1671        log.insert("name", "bob");
1672        sink.run_events(vec![log.into()]).await.unwrap();
1673
1674        let event = collect_n(source, 1).await.remove(0).into_log();
1675        assert_eq!(event["greeting"], "hello".into());
1676        assert_eq!(event["name"], "bob".into());
1677        assert!(event.get_timestamp().is_some());
1678        assert_eq!(
1679            event[log_schema().source_type_key().unwrap().to_string()],
1680            "splunk_hec".into()
1681        );
1682        assert!(event.metadata().splunk_hec_token().is_none());
1683    }
1684
1685    #[tokio::test]
1686    async fn json_invalid_path_event() {
1687        let (sink, source) = start(
1688            JsonSerializerConfig::default().into(),
1689            Compression::gzip_default(),
1690            None,
1691        )
1692        .await;
1693
1694        let mut log = LogEvent::default();
1695        // Test with a field that would be considered an invalid path if it were to
1696        // be treated as a path and not a simple field name.
1697        log.insert(event_path!("(greeting | thing"), "hello");
1698        sink.run_events(vec![log.into()]).await.unwrap();
1699
1700        let event = collect_n(source, 1).await.remove(0).into_log();
1701        assert_eq!(
1702            event.get(event_path!("(greeting | thing")),
1703            Some(&Value::from("hello"))
1704        );
1705    }
1706
1707    #[tokio::test]
1708    async fn line_to_message() {
1709        let (sink, source) = start(
1710            JsonSerializerConfig::default().into(),
1711            Compression::gzip_default(),
1712            None,
1713        )
1714        .await;
1715
1716        let mut event = LogEvent::default();
1717        event.insert("line", "hello");
1718        sink.run_events(vec![event.into()]).await.unwrap();
1719
1720        let event = collect_n(source, 1).await.remove(0);
1721        assert_eq!(
1722            event.as_log()[log_schema().message_key().unwrap().to_string()],
1723            "hello".into()
1724        );
1725        assert!(event.metadata().splunk_hec_token().is_none());
1726    }
1727
1728    #[tokio::test]
1729    async fn raw() {
1730        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1731            let message = "raw";
1732            let (source, address, _guard) = source(None).await;
1733
1734            assert_eq!(200, post(address, "services/collector/raw", message).await);
1735
1736            let event = collect_n(source, 1).await.remove(0);
1737            assert_eq!(
1738                event.as_log()[log_schema().message_key().unwrap().to_string()],
1739                message.into()
1740            );
1741            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1742            assert!(event.as_log().get_timestamp().is_some());
1743            assert_eq!(
1744                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1745                "splunk_hec".into()
1746            );
1747            assert!(event.metadata().splunk_hec_token().is_none());
1748        })
1749        .await;
1750    }
1751
1752    #[tokio::test]
1753    async fn root() {
1754        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1755            let message = r#"{ "event": { "message": "root"} }"#;
1756            let (source, address, _guard) = source(None).await;
1757
1758            assert_eq!(200, post(address, "services/collector", message).await);
1759
1760            let event = collect_n(source, 1).await.remove(0);
1761            assert_eq!(
1762                event.as_log()[log_schema().message_key().unwrap().to_string()],
1763                "root".into()
1764            );
1765            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1766            assert!(event.as_log().get_timestamp().is_some());
1767            assert_eq!(
1768                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1769                "splunk_hec".into()
1770            );
1771            assert!(event.metadata().splunk_hec_token().is_none());
1772        })
1773        .await;
1774    }
1775
1776    #[tokio::test]
1777    async fn channel_header() {
1778        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1779            let message = "raw";
1780            let (source, address, _guard) = source(None).await;
1781
1782            let opts = SendWithOpts {
1783                channel: Some(Channel::Header("guid")),
1784                forwarded_for: None,
1785            };
1786
1787            assert_eq!(
1788                200,
1789                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1790            );
1791
1792            let event = collect_n(source, 1).await.remove(0);
1793            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1794        })
1795        .await;
1796    }
1797
1798    #[tokio::test]
1799    async fn xff_header_raw() {
1800        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1801            let message = "raw";
1802            let (source, address, _guard) = source(None).await;
1803
1804            let opts = SendWithOpts {
1805                channel: Some(Channel::Header("guid")),
1806                forwarded_for: Some(String::from("10.0.0.1")),
1807            };
1808
1809            assert_eq!(
1810                200,
1811                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1812            );
1813
1814            let event = collect_n(source, 1).await.remove(0);
1815            assert_eq!(
1816                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1817                "10.0.0.1".into()
1818            );
1819        })
1820        .await;
1821    }
1822
1823    // Test helps to illustrate that a payload's `host` value should override an x-forwarded-for header
1824    #[tokio::test]
1825    async fn xff_header_event_with_host_field() {
1826        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1827            let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
1828            let (source, address, _guard) = source(None).await;
1829
1830            let opts = SendWithOpts {
1831                channel: Some(Channel::Header("guid")),
1832                forwarded_for: Some(String::from("10.0.0.1")),
1833            };
1834
1835            assert_eq!(
1836                200,
1837                send_with(address, "services/collector/event", message, TOKEN, &opts).await
1838            );
1839
1840            let event = collect_n(source, 1).await.remove(0);
1841            assert_eq!(
1842                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1843                "10.1.0.2".into()
1844            );
1845        })
1846        .await;
1847    }
1848
    // Test helps to illustrate that the x-forwarded-for header is used as `host` when the payload has no `host` field
1850    #[tokio::test]
1851    async fn xff_header_event_without_host_field() {
1852        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1853            let message = r#"{"event":"first", "color": "blue"}"#;
1854            let (source, address, _guard) = source(None).await;
1855
1856            let opts = SendWithOpts {
1857                channel: Some(Channel::Header("guid")),
1858                forwarded_for: Some(String::from("10.0.0.1")),
1859            };
1860
1861            assert_eq!(
1862                200,
1863                send_with(address, "services/collector/event", message, TOKEN, &opts).await
1864            );
1865
1866            let event = collect_n(source, 1).await.remove(0);
1867            assert_eq!(
1868                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1869                "10.0.0.1".into()
1870            );
1871        })
1872        .await;
1873    }
1874
1875    #[tokio::test]
1876    async fn channel_query_param() {
1877        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1878            let message = "raw";
1879            let (source, address, _guard) = source(None).await;
1880
1881            let opts = SendWithOpts {
1882                channel: Some(Channel::QueryParam("guid")),
1883                forwarded_for: None,
1884            };
1885
1886            assert_eq!(
1887                200,
1888                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1889            );
1890
1891            let event = collect_n(source, 1).await.remove(0);
1892            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1893        })
1894        .await;
1895    }
1896
1897    #[tokio::test]
1898    async fn no_data() {
1899        let (_source, address, _guard) = source(None).await;
1900
1901        assert_eq!(400, post(address, "services/collector/event", "").await);
1902    }
1903
1904    #[tokio::test]
1905    async fn invalid_token() {
1906        assert_source_error(&COMPONENT_ERROR_TAGS, async {
1907            let (_source, address, _guard) = source(None).await;
1908            let opts = SendWithOpts {
1909                channel: Some(Channel::Header("channel")),
1910                forwarded_for: None,
1911            };
1912
1913            assert_eq!(
1914                401,
1915                send_with(address, "services/collector/event", "", "nope", &opts).await
1916            );
1917        })
1918        .await;
1919    }
1920
1921    #[tokio::test]
1922    async fn health_ignores_token() {
1923        let (_source, address, _guard) = source(None).await;
1924
1925        let res = reqwest::Client::new()
1926            .get(format!("http://{address}/services/collector/health"))
1927            .header("Authorization", format!("Splunk {}", "invalid token"))
1928            .send()
1929            .await
1930            .unwrap();
1931
1932        assert_eq!(200, res.status().as_u16());
1933    }
1934
1935    #[tokio::test]
1936    async fn health() {
1937        let (_source, address, _guard) = source(None).await;
1938
1939        let res = reqwest::Client::new()
1940            .get(format!("http://{address}/services/collector/health"))
1941            .send()
1942            .await
1943            .unwrap();
1944
1945        assert_eq!(200, res.status().as_u16());
1946    }
1947
1948    #[tokio::test]
1949    async fn secondary_token() {
1950        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1951            let message = r#"{"event":"first", "color": "blue"}"#;
1952            let (_source, address, _guard) =
1953                source_with(None, Some(VALID_TOKENS), None, false).await;
1954            let options = SendWithOpts {
1955                channel: None,
1956                forwarded_for: None,
1957            };
1958
1959            assert_eq!(
1960                200,
1961                send_with(
1962                    address,
1963                    "services/collector/event",
1964                    message,
1965                    VALID_TOKENS.get(1).unwrap(),
1966                    &options
1967                )
1968                .await
1969            );
1970        })
1971        .await;
1972    }
1973
1974    #[tokio::test]
1975    async fn event_service_token_passthrough_enabled() {
1976        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1977            let message = "passthrough_token_enabled";
1978            let (source, address, _guard) = source_with(None, Some(VALID_TOKENS), None, true).await;
1979            let (sink, health) = sink(
1980                address,
1981                TextSerializerConfig::default().into(),
1982                Compression::gzip_default(),
1983            )
1984            .await;
1985            assert!(health.await.is_ok());
1986
1987            let event = channel_n(vec![message], sink, source).await.remove(0);
1988
1989            assert_eq!(
1990                event.as_log()[log_schema().message_key().unwrap().to_string()],
1991                message.into()
1992            );
1993            assert_eq!(
1994                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
1995                TOKEN
1996            );
1997        })
1998        .await;
1999    }
2000
2001    #[tokio::test]
2002    async fn raw_service_token_passthrough_enabled() {
2003        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2004            let message = "raw";
2005            let (source, address, _guard) = source_with(None, Some(VALID_TOKENS), None, true).await;
2006
2007            assert_eq!(200, post(address, "services/collector/raw", message).await);
2008
2009            let event = collect_n(source, 1).await.remove(0);
2010            assert_eq!(
2011                event.as_log()[log_schema().message_key().unwrap().to_string()],
2012                message.into()
2013            );
2014            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2015            assert!(event.as_log().get_timestamp().is_some());
2016            assert_eq!(
2017                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2018                "splunk_hec".into()
2019            );
2020            assert_eq!(
2021                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2022                TOKEN
2023            );
2024        })
2025        .await;
2026    }
2027
2028    #[tokio::test]
2029    async fn no_authorization() {
2030        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2031            let message = "no_authorization";
2032            let (source, address, _guard) = source_with(None, None, None, false).await;
2033            let (sink, health) = sink(
2034                address,
2035                TextSerializerConfig::default().into(),
2036                Compression::gzip_default(),
2037            )
2038            .await;
2039            assert!(health.await.is_ok());
2040
2041            let event = channel_n(vec![message], sink, source).await.remove(0);
2042
2043            assert_eq!(
2044                event.as_log()[log_schema().message_key().unwrap().to_string()],
2045                message.into()
2046            );
2047            assert!(event.metadata().splunk_hec_token().is_none());
2048        })
2049        .await;
2050    }
2051
2052    #[tokio::test]
2053    async fn no_authorization_token_passthrough_enabled() {
2054        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2055            let message = "no_authorization";
2056            let (source, address, _guard) = source_with(None, None, None, true).await;
2057            let (sink, health) = sink(
2058                address,
2059                TextSerializerConfig::default().into(),
2060                Compression::gzip_default(),
2061            )
2062            .await;
2063            assert!(health.await.is_ok());
2064
2065            let event = channel_n(vec![message], sink, source).await.remove(0);
2066
2067            assert_eq!(
2068                event.as_log()[log_schema().message_key().unwrap().to_string()],
2069                message.into()
2070            );
2071            assert_eq!(
2072                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2073                TOKEN
2074            );
2075        })
2076        .await;
2077    }
2078
2079    #[tokio::test]
2080    async fn partial() {
2081        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2082            let message = r#"{"event":"first"}{"event":"second""#;
2083            let (source, address, _guard) = source(None).await;
2084
2085            assert_eq!(
2086                400,
2087                post(address, "services/collector/event", message).await
2088            );
2089
2090            let event = collect_n(source, 1).await.remove(0);
2091            assert_eq!(
2092                event.as_log()[log_schema().message_key().unwrap().to_string()],
2093                "first".into()
2094            );
2095            assert!(event.as_log().get_timestamp().is_some());
2096            assert_eq!(
2097                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2098                "splunk_hec".into()
2099            );
2100        })
2101        .await;
2102    }
2103
2104    #[tokio::test]
2105    async fn handles_newlines() {
2106        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2107            let message = r#"
2108{"event":"first"}
2109        "#;
2110            let (source, address, _guard) = source(None).await;
2111
2112            assert_eq!(
2113                200,
2114                post(address, "services/collector/event", message).await
2115            );
2116
2117            let event = collect_n(source, 1).await.remove(0);
2118            assert_eq!(
2119                event.as_log()[log_schema().message_key().unwrap().to_string()],
2120                "first".into()
2121            );
2122            assert!(event.as_log().get_timestamp().is_some());
2123            assert_eq!(
2124                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2125                "splunk_hec".into()
2126            );
2127        })
2128        .await;
2129    }
2130
2131    #[tokio::test]
2132    async fn handles_spaces() {
2133        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2134            let message = r#" {"event":"first"} "#;
2135            let (source, address, _guard) = source(None).await;
2136
2137            assert_eq!(
2138                200,
2139                post(address, "services/collector/event", message).await
2140            );
2141
2142            let event = collect_n(source, 1).await.remove(0);
2143            assert_eq!(
2144                event.as_log()[log_schema().message_key().unwrap().to_string()],
2145                "first".into()
2146            );
2147            assert!(event.as_log().get_timestamp().is_some());
2148            assert_eq!(
2149                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2150                "splunk_hec".into()
2151            );
2152        })
2153        .await;
2154    }
2155
2156    #[tokio::test]
2157    async fn handles_non_utf8() {
2158        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2159        let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
2160        let (source, address, _guard) = source(None).await;
2161
2162        let b = reqwest::Client::new()
2163            .post(format!(
2164                "http://{}/{}",
2165                address, "services/collector/event"
2166            ))
2167            .header("Authorization", format!("Splunk {TOKEN}"))
2168            .body::<&[u8]>(message);
2169
2170        assert_eq!(200, b.send().await.unwrap().status().as_u16());
2171
2172        let event = collect_n(source, 1).await.remove(0);
2173        assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
2174        assert_eq!(event.as_log()["number"], 2.into());
2175        assert_eq!(event.as_log()["bool"], true.into());
2176        assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some());
2177        assert_eq!(
2178            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2179            "splunk_hec".into()
2180        );
2181    }).await;
2182    }
2183
2184    #[tokio::test]
2185    async fn default() {
2186        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2187        let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
2188        let (source, address, _guard) = source(None).await;
2189
2190        assert_eq!(
2191            200,
2192            post(address, "services/collector/event", message).await
2193        );
2194
2195        let events = collect_n(source, 3).await;
2196
2197        assert_eq!(
2198            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
2199            "first".into()
2200        );
2201        assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());
2202
2203        assert_eq!(
2204            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
2205            "second".into()
2206        );
2207        assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());
2208
2209        assert_eq!(
2210            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
2211            "third".into()
2212        );
2213        assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
2214    }).await;
2215    }
2216
2217    #[test]
2218    fn parse_timestamps() {
2219        let cases = vec![
2220            Utc::now(),
2221            Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
2222                .single()
2223                .expect("invalid timestamp"),
2224            Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
2225                .single()
2226                .expect("invalid timestamp"),
2227            Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
2228                .single()
2229                .expect("invalid timestamp"),
2230        ];
2231
2232        for case in cases {
2233            let sec = case.timestamp();
2234            let millis = case.timestamp_millis();
2235            let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");
2236
2237            assert_eq!(parse_timestamp(sec).unwrap().timestamp(), case.timestamp());
2238            assert_eq!(
2239                parse_timestamp(millis).unwrap().timestamp_millis(),
2240                case.timestamp_millis()
2241            );
2242            assert_eq!(
2243                parse_timestamp(nano)
2244                    .unwrap()
2245                    .timestamp_nanos_opt()
2246                    .unwrap(),
2247                case.timestamp_nanos_opt().expect("Timestamp out of range")
2248            );
2249        }
2250
2251        assert!(parse_timestamp(-1).is_none());
2252    }
2253
2254    /// This test will fail once `warp` crate fixes support for
2255    /// custom connection listener, at that point this test can be
2256    /// modified to pass.
2257    /// https://github.com/vectordotdev/vector/issues/7097
2258    /// https://github.com/seanmonstar/warp/issues/830
2259    /// https://github.com/seanmonstar/warp/pull/713
2260    #[tokio::test]
2261    async fn host_test() {
2262        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2263            let message = "for the host";
2264            let (sink, source) = start(
2265                TextSerializerConfig::default().into(),
2266                Compression::gzip_default(),
2267                None,
2268            )
2269            .await;
2270
2271            let event = channel_n(vec![message], sink, source).await.remove(0);
2272
2273            assert_eq!(
2274                event.as_log()[log_schema().message_key().unwrap().to_string()],
2275                message.into()
2276            );
2277            assert!(
2278                event
2279                    .as_log()
2280                    .get((PathPrefix::Event, log_schema().host_key().unwrap()))
2281                    .is_none()
2282            );
2283        })
2284        .await;
2285    }
2286
2287    #[derive(Deserialize)]
2288    struct HecAckEventResponse {
2289        text: String,
2290        code: u8,
2291        #[serde(rename = "ackId")]
2292        ack_id: u64,
2293    }
2294
2295    #[tokio::test]
2296    async fn ack_json_event() {
2297        let ack_config = HecAcknowledgementsConfig {
2298            enabled: Some(true),
2299            ..Default::default()
2300        };
2301        let (source, address, _guard) = source(Some(ack_config)).await;
2302        let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
2303        let opts = SendWithOpts {
2304            channel: Some(Channel::Header("guid")),
2305            forwarded_for: None,
2306        };
2307        let event_res = send_with_response(
2308            address,
2309            "services/collector/event",
2310            event_message,
2311            TOKEN,
2312            &opts,
2313        )
2314        .await
2315        .json::<HecAckEventResponse>()
2316        .await
2317        .unwrap();
2318        assert_eq!("Success", event_res.text.as_str());
2319        assert_eq!(0, event_res.code);
2320        _ = collect_n(source, 1).await;
2321
2322        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2323            acks: vec![event_res.ack_id],
2324        })
2325        .unwrap();
2326        let ack_res = send_with_response(
2327            address,
2328            "services/collector/ack",
2329            ack_message.as_str(),
2330            TOKEN,
2331            &opts,
2332        )
2333        .await
2334        .json::<HecAckStatusResponse>()
2335        .await
2336        .unwrap();
2337        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2338    }
2339
2340    #[tokio::test]
2341    async fn ack_raw_event() {
2342        let ack_config = HecAcknowledgementsConfig {
2343            enabled: Some(true),
2344            ..Default::default()
2345        };
2346        let (source, address, _guard) = source(Some(ack_config)).await;
2347        let event_message = "raw event message";
2348        let opts = SendWithOpts {
2349            channel: Some(Channel::Header("guid")),
2350            forwarded_for: None,
2351        };
2352        let event_res = send_with_response(
2353            address,
2354            "services/collector/raw",
2355            event_message,
2356            TOKEN,
2357            &opts,
2358        )
2359        .await
2360        .json::<HecAckEventResponse>()
2361        .await
2362        .unwrap();
2363        assert_eq!("Success", event_res.text.as_str());
2364        assert_eq!(0, event_res.code);
2365        _ = collect_n(source, 1).await;
2366
2367        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2368            acks: vec![event_res.ack_id],
2369        })
2370        .unwrap();
2371        let ack_res = send_with_response(
2372            address,
2373            "services/collector/ack",
2374            ack_message.as_str(),
2375            TOKEN,
2376            &opts,
2377        )
2378        .await
2379        .json::<HecAckStatusResponse>()
2380        .await
2381        .unwrap();
2382        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2383    }
2384
2385    #[tokio::test]
2386    async fn ack_repeat_ack_query() {
2387        let ack_config = HecAcknowledgementsConfig {
2388            enabled: Some(true),
2389            ..Default::default()
2390        };
2391        let (source, address, _guard) = source(Some(ack_config)).await;
2392        let event_message = "raw event message";
2393        let opts = SendWithOpts {
2394            channel: Some(Channel::Header("guid")),
2395            forwarded_for: None,
2396        };
2397        let event_res = send_with_response(
2398            address,
2399            "services/collector/raw",
2400            event_message,
2401            TOKEN,
2402            &opts,
2403        )
2404        .await
2405        .json::<HecAckEventResponse>()
2406        .await
2407        .unwrap();
2408        _ = collect_n(source, 1).await;
2409
2410        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2411            acks: vec![event_res.ack_id],
2412        })
2413        .unwrap();
2414        let ack_res = send_with_response(
2415            address,
2416            "services/collector/ack",
2417            ack_message.as_str(),
2418            TOKEN,
2419            &opts,
2420        )
2421        .await
2422        .json::<HecAckStatusResponse>()
2423        .await
2424        .unwrap();
2425        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2426
2427        let ack_res = send_with_response(
2428            address,
2429            "services/collector/ack",
2430            ack_message.as_str(),
2431            TOKEN,
2432            &opts,
2433        )
2434        .await
2435        .json::<HecAckStatusResponse>()
2436        .await
2437        .unwrap();
2438        assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
2439    }
2440
2441    #[tokio::test]
2442    async fn ack_exceed_max_number_of_ack_channels() {
2443        let ack_config = HecAcknowledgementsConfig {
2444            enabled: Some(true),
2445            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
2446            ..Default::default()
2447        };
2448
2449        let (_source, address, _guard) = source(Some(ack_config)).await;
2450        let mut opts = SendWithOpts {
2451            channel: Some(Channel::Header("guid")),
2452            forwarded_for: None,
2453        };
2454        assert_eq!(
2455            200,
2456            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2457        );
2458
2459        opts.channel = Some(Channel::Header("other-guid"));
2460        assert_eq!(
2461            503,
2462            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2463        );
2464        assert_eq!(
2465            503,
2466            send_with(
2467                address,
2468                "services/collector/event",
2469                r#"{"event":"first"}"#,
2470                TOKEN,
2471                &opts
2472            )
2473            .await
2474        );
2475    }
2476
2477    #[tokio::test]
2478    async fn ack_exceed_max_pending_acks_per_channel() {
2479        let ack_config = HecAcknowledgementsConfig {
2480            enabled: Some(true),
2481            max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
2482            ..Default::default()
2483        };
2484
2485        let (source, address, _guard) = source(Some(ack_config)).await;
2486        let opts = SendWithOpts {
2487            channel: Some(Channel::Header("guid")),
2488            forwarded_for: None,
2489        };
2490        for _ in 0..5 {
2491            send_with(
2492                address,
2493                "services/collector/event",
2494                r#"{"event":"first"}"#,
2495                TOKEN,
2496                &opts,
2497            )
2498            .await;
2499        }
2500        for _ in 0..5 {
2501            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
2502        }
2503        let event_res = send_with_response(
2504            address,
2505            "services/collector/event",
2506            r#"{"event":"this will be acked"}"#,
2507            TOKEN,
2508            &opts,
2509        )
2510        .await
2511        .json::<HecAckEventResponse>()
2512        .await
2513        .unwrap();
2514        _ = collect_n(source, 11).await;
2515
2516        let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
2517            acks: (0..10).collect::<Vec<u64>>(),
2518        })
2519        .unwrap();
2520        let ack_res = send_with_response(
2521            address,
2522            "services/collector/ack",
2523            ack_message_dropped.as_str(),
2524            TOKEN,
2525            &opts,
2526        )
2527        .await
2528        .json::<HecAckStatusResponse>()
2529        .await
2530        .unwrap();
2531        assert!(ack_res.acks.values().all(|ack_status| !*ack_status));
2532
2533        let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
2534            acks: vec![event_res.ack_id],
2535        })
2536        .unwrap();
2537        let ack_res = send_with_response(
2538            address,
2539            "services/collector/ack",
2540            ack_message_acked.as_str(),
2541            TOKEN,
2542            &opts,
2543        )
2544        .await
2545        .json::<HecAckStatusResponse>()
2546        .await
2547        .unwrap();
2548        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2549    }
2550
2551    #[tokio::test]
2552    async fn ack_service_accepts_parameterized_content_type() {
2553        let ack_config = HecAcknowledgementsConfig {
2554            enabled: Some(true),
2555            ..Default::default()
2556        };
2557        let (source, address, _guard) = source(Some(ack_config)).await;
2558        let opts = SendWithOpts {
2559            channel: Some(Channel::Header("guid")),
2560            forwarded_for: None,
2561        };
2562
2563        let event_res = send_with_response(
2564            address,
2565            "services/collector/event",
2566            r#"{"event":"param-test"}"#,
2567            TOKEN,
2568            &opts,
2569        )
2570        .await
2571        .json::<HecAckEventResponse>()
2572        .await
2573        .unwrap();
2574        let _ = collect_n(source, 1).await;
2575
2576        let body = serde_json::to_string(&HecAckStatusRequest {
2577            acks: vec![event_res.ack_id],
2578        })
2579        .unwrap();
2580
2581        let res = reqwest::Client::new()
2582            .post(format!("http://{address}/services/collector/ack"))
2583            .header("Authorization", format!("Splunk {TOKEN}"))
2584            .header("x-splunk-request-channel", "guid")
2585            .header("Content-Type", "application/json; some-random-text; hello")
2586            .body(body)
2587            .send()
2588            .await
2589            .unwrap();
2590
2591        assert_eq!(200, res.status().as_u16());
2592
2593        let _parsed: HecAckStatusResponse = res.json().await.unwrap();
2594    }
2595
2596    #[tokio::test]
2597    async fn event_service_acknowledgements_enabled_channel_required() {
2598        let message = r#"{"event":"first", "color": "blue"}"#;
2599        let ack_config = HecAcknowledgementsConfig {
2600            enabled: Some(true),
2601            ..Default::default()
2602        };
2603        let (_, address, _guard) = source(Some(ack_config)).await;
2604
2605        let opts = SendWithOpts {
2606            channel: None,
2607            forwarded_for: None,
2608        };
2609
2610        assert_eq!(
2611            400,
2612            send_with(address, "services/collector/event", message, TOKEN, &opts).await
2613        );
2614    }
2615
2616    #[tokio::test]
2617    async fn ack_service_acknowledgements_disabled() {
2618        let message = r#" {"acks":[0]} "#;
2619        let (_, address, _guard) = source(None).await;
2620
2621        let opts = SendWithOpts {
2622            channel: Some(Channel::Header("guid")),
2623            forwarded_for: None,
2624        };
2625
2626        assert_eq!(
2627            400,
2628            send_with(address, "services/collector/ack", message, TOKEN, &opts).await
2629        );
2630    }
2631
2632    #[test]
2633    fn output_schema_definition_vector_namespace() {
2634        let config = SplunkConfig {
2635            log_namespace: Some(true),
2636            ..Default::default()
2637        };
2638
2639        let definition = config
2640            .outputs(LogNamespace::Vector)
2641            .remove(0)
2642            .schema_definition(true);
2643
2644        let expected_definition = Definition::new_with_default_metadata(
2645            Kind::object(Collection::empty()).or_bytes(),
2646            [LogNamespace::Vector],
2647        )
2648        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE)
2649        .with_metadata_field(
2650            &owned_value_path!("vector", "source_type"),
2651            Kind::bytes(),
2652            None,
2653        )
2654        .with_metadata_field(
2655            &owned_value_path!("vector", "ingest_timestamp"),
2656            Kind::timestamp(),
2657            None,
2658        )
2659        .with_metadata_field(
2660            &owned_value_path!("splunk_hec", "host"),
2661            Kind::bytes(),
2662            Some("host"),
2663        )
2664        .with_metadata_field(
2665            &owned_value_path!("splunk_hec", "index"),
2666            Kind::bytes(),
2667            None,
2668        )
2669        .with_metadata_field(
2670            &owned_value_path!("splunk_hec", "source"),
2671            Kind::bytes(),
2672            Some("service"),
2673        )
2674        .with_metadata_field(
2675            &owned_value_path!("splunk_hec", "channel"),
2676            Kind::bytes(),
2677            None,
2678        )
2679        .with_metadata_field(
2680            &owned_value_path!("splunk_hec", "sourcetype"),
2681            Kind::bytes(),
2682            None,
2683        );
2684
2685        assert_eq!(definition, Some(expected_definition));
2686    }
2687
2688    #[test]
2689    fn output_schema_definition_legacy_namespace() {
2690        let config = SplunkConfig::default();
2691        let definitions = config
2692            .outputs(LogNamespace::Legacy)
2693            .remove(0)
2694            .schema_definition(true);
2695
2696        let expected_definition = Definition::new_with_default_metadata(
2697            Kind::object(Collection::empty()),
2698            [LogNamespace::Legacy],
2699        )
2700        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
2701        .with_event_field(
2702            &owned_value_path!("message"),
2703            Kind::bytes().or_undefined(),
2704            Some("message"),
2705        )
2706        .with_event_field(
2707            &owned_value_path!("line"),
2708            Kind::array(Collection::empty())
2709                .or_object(Collection::empty())
2710                .or_undefined(),
2711            None,
2712        )
2713        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
2714        .with_event_field(&owned_value_path!("splunk_channel"), Kind::bytes(), None)
2715        .with_event_field(&owned_value_path!("splunk_index"), Kind::bytes(), None)
2716        .with_event_field(
2717            &owned_value_path!("splunk_source"),
2718            Kind::bytes(),
2719            Some("service"),
2720        )
2721        .with_event_field(&owned_value_path!("splunk_sourcetype"), Kind::bytes(), None)
2722        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None);
2723
2724        assert_eq!(definitions, Some(expected_definition));
2725    }
2726
2727    impl ValidatableComponent for SplunkConfig {
2728        fn validation_configuration() -> ValidationConfiguration {
2729            let config = Self {
2730                address: default_socket_address(),
2731                ..Default::default()
2732            };
2733
2734            let listen_addr_http = format!("http://{}/services/collector/event", config.address);
2735            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
2736
2737            let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
2738            let framing = BytesDecoderConfig::new().into();
2739            let decoding = DeserializerConfig::Json(Default::default());
2740
2741            let external_resource = ExternalResource::new(
2742                ResourceDirection::Push,
2743                HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
2744                    X_SPLUNK_REQUEST_CHANNEL.to_string(),
2745                    "channel".to_string(),
2746                )])),
2747                DecodingConfig::new(framing, decoding, false.into()),
2748            );
2749
2750            ValidationConfiguration::from_source(
2751                Self::NAME,
2752                log_namespace,
2753                vec![ComponentTestCaseConfig::from_source(
2754                    config,
2755                    None,
2756                    Some(external_resource),
2757                )],
2758            )
2759        }
2760    }
2761
2762    register_validatable_component!(SplunkConfig);
2763}