vector/sources/splunk_hec/
mod.rs

1use std::{
2    collections::HashMap,
3    convert::Infallible,
4    io::Read,
5    net::{Ipv4Addr, SocketAddr},
6    sync::Arc,
7    time::Duration,
8};
9
10use bytes::{Buf, Bytes};
11use chrono::{DateTime, TimeZone, Utc};
12use flate2::read::MultiGzDecoder;
13use futures::FutureExt;
14use http::StatusCode;
15use hyper::{Server, service::make_service_fn};
16use serde::{Serialize, de::DeserializeOwned};
17use serde_json::{
18    Deserializer, Value as JsonValue,
19    de::{Read as JsonRead, StrRead},
20};
21use snafu::Snafu;
22use tokio::net::TcpStream;
23use tower::ServiceBuilder;
24use tracing::Span;
25use vector_lib::{
26    EstimatedJsonEncodedSizeOf,
27    config::{LegacyKey, LogNamespace},
28    configurable::configurable_component,
29    event::BatchNotifier,
30    internal_event::{CountByteSize, InternalEventHandle as _, Registered},
31    lookup::{self, event_path, lookup_v2::OptionalValuePath, owned_value_path},
32    schema::meaning,
33    sensitive_string::SensitiveString,
34    tls::MaybeTlsIncomingStream,
35};
36use vrl::{
37    path::OwnedTargetPath,
38    value::{Kind, kind::Collection},
39};
40use warp::{
41    Filter, Reply,
42    filters::BoxedFilter,
43    http::header::{CONTENT_TYPE, HeaderValue},
44    path,
45    reject::Rejection,
46    reply::Response,
47};
48
49use self::{
50    acknowledgements::{
51        HecAckStatusRequest, HecAckStatusResponse, HecAcknowledgementsConfig,
52        IndexerAcknowledgement,
53    },
54    splunk_response::{HecResponse, HecResponseMetadata, HecStatusCode},
55};
56use crate::{
57    SourceSender,
58    config::{DataType, Resource, SourceConfig, SourceContext, SourceOutput, log_schema},
59    event::{Event, LogEvent, Value},
60    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
61    internal_events::{
62        EventsReceived, HttpBytesReceived, SplunkHecRequestBodyInvalidError, SplunkHecRequestError,
63    },
64    serde::bool_or_struct,
65    source_sender::ClosedError,
66    tls::{MaybeTlsSettings, TlsEnableableConfig},
67};
68
69mod acknowledgements;
70
71// Event fields unique to splunk_hec source
72pub const CHANNEL: &str = "splunk_channel";
73pub const INDEX: &str = "splunk_index";
74pub const SOURCE: &str = "splunk_source";
75pub const SOURCETYPE: &str = "splunk_sourcetype";
76
77const X_SPLUNK_REQUEST_CHANNEL: &str = "x-splunk-request-channel";
78
79/// Configuration for the `splunk_hec` source.
80#[configurable_component(source("splunk_hec", "Receive logs from Splunk."))]
81#[derive(Clone, Debug)]
82#[serde(deny_unknown_fields, default)]
83pub struct SplunkConfig {
84    /// The socket address to listen for connections on.
85    ///
86    /// The address _must_ include a port.
87    #[serde(default = "default_socket_address")]
88    pub address: SocketAddr,
89
90    /// Optional authorization token.
91    ///
92    /// If supplied, incoming requests must supply this token in the `Authorization` header, just as a client would if
93    /// it was communicating with the Splunk HEC endpoint directly.
94    ///
95    /// If _not_ supplied, the `Authorization` header is ignored and requests are not authenticated.
96    #[configurable(deprecated = "This option has been deprecated, use `valid_tokens` instead.")]
97    token: Option<SensitiveString>,
98
99    /// A list of valid authorization tokens.
100    ///
101    /// If supplied, incoming requests must supply one of these tokens in the `Authorization` header, just as a client
102    /// would if it was communicating with the Splunk HEC endpoint directly.
103    ///
104    /// If _not_ supplied, the `Authorization` header is ignored and requests are not authenticated.
105    #[configurable(metadata(docs::examples = "A94A8FE5CCB19BA61C4C08"))]
106    valid_tokens: Option<Vec<SensitiveString>>,
107
108    /// Whether or not to forward the Splunk HEC authentication token with events.
109    ///
110    /// If set to `true`, when incoming requests contain a Splunk HEC token, the token used is kept in the
111    /// event metadata and preferentially used if the event is sent to a Splunk HEC sink.
112    store_hec_token: bool,
113
114    #[configurable(derived)]
115    tls: Option<TlsEnableableConfig>,
116
117    #[configurable(derived)]
118    #[serde(deserialize_with = "bool_or_struct")]
119    acknowledgements: HecAcknowledgementsConfig,
120
121    /// The namespace to use for logs. This overrides the global settings.
122    #[configurable(metadata(docs::hidden))]
123    #[serde(default)]
124    log_namespace: Option<bool>,
125
126    #[configurable(derived)]
127    #[serde(default)]
128    keepalive: KeepaliveConfig,
129}
130
131impl_generate_config_from_default!(SplunkConfig);
132
133impl Default for SplunkConfig {
134    fn default() -> Self {
135        SplunkConfig {
136            address: default_socket_address(),
137            token: None,
138            valid_tokens: None,
139            tls: None,
140            acknowledgements: Default::default(),
141            store_hec_token: false,
142            log_namespace: None,
143            keepalive: Default::default(),
144        }
145    }
146}
147
148fn default_socket_address() -> SocketAddr {
149    SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), 8088)
150}
151
152#[async_trait::async_trait]
153#[typetag::serde(name = "splunk_hec")]
154impl SourceConfig for SplunkConfig {
155    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
156        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
157        let shutdown = cx.shutdown.clone();
158        let out = cx.out.clone();
159        let source = SplunkSource::new(self, tls.http_protocol_name(), cx);
160
161        let event_service = source.event_service(out.clone());
162        let raw_service = source.raw_service(out);
163        let health_service = source.health_service();
164        let ack_service = source.ack_service();
165        let options = SplunkSource::options();
166
167        let services = path!("services" / "collector" / ..)
168            .and(
169                event_service
170                    .or(raw_service)
171                    .unify()
172                    .or(health_service)
173                    .unify()
174                    .or(ack_service)
175                    .unify()
176                    .or(options)
177                    .unify(),
178            )
179            .or_else(finish_err);
180
181        let listener = tls.bind(&self.address).await?;
182
183        let keepalive_settings = self.keepalive.clone();
184        Ok(Box::pin(async move {
185            let span = Span::current();
186            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
187                let svc = ServiceBuilder::new()
188                    .layer(build_http_trace_layer(span.clone()))
189                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
190                        MaxConnectionAgeLayer::new(
191                            Duration::from_secs(secs),
192                            keepalive_settings.max_connection_age_jitter_factor,
193                            conn.peer_addr(),
194                        )
195                    }))
196                    .service(warp::service(services.clone()));
197                futures_util::future::ok::<_, Infallible>(svc)
198            });
199
200            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
201                .serve(make_svc)
202                .with_graceful_shutdown(shutdown.map(|_| ()))
203                .await
204                .map_err(|err| {
205                    error!("An error occurred: {:?}.", err);
206                })?;
207
208            Ok(())
209        }))
210    }
211
212    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
213        let log_namespace = global_log_namespace.merge(self.log_namespace);
214
215        let schema_definition = match log_namespace {
216            LogNamespace::Legacy => {
217                let definition = vector_lib::schema::Definition::empty_legacy_namespace()
218                    .with_event_field(
219                        &owned_value_path!("line"),
220                        Kind::object(Collection::empty())
221                            .or_array(Collection::empty())
222                            .or_undefined(),
223                        None,
224                    );
225
226                if let Some(message_key) = log_schema().message_key() {
227                    definition.with_event_field(
228                        message_key,
229                        Kind::bytes().or_undefined(),
230                        Some(meaning::MESSAGE),
231                    )
232                } else {
233                    definition
234                }
235            }
236            LogNamespace::Vector => vector_lib::schema::Definition::new_with_default_metadata(
237                Kind::bytes().or_object(Collection::empty()),
238                [log_namespace],
239            )
240            .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE),
241        }
242        .with_standard_vector_source_metadata()
243        .with_source_metadata(
244            SplunkConfig::NAME,
245            log_schema()
246                .host_key()
247                .cloned()
248                .map(LegacyKey::InsertIfEmpty),
249            &owned_value_path!("host"),
250            Kind::bytes(),
251            Some(meaning::HOST),
252        )
253        .with_source_metadata(
254            SplunkConfig::NAME,
255            Some(LegacyKey::Overwrite(owned_value_path!(CHANNEL))),
256            &owned_value_path!("channel"),
257            Kind::bytes(),
258            None,
259        )
260        .with_source_metadata(
261            SplunkConfig::NAME,
262            Some(LegacyKey::Overwrite(owned_value_path!(INDEX))),
263            &owned_value_path!("index"),
264            Kind::bytes(),
265            None,
266        )
267        .with_source_metadata(
268            SplunkConfig::NAME,
269            Some(LegacyKey::Overwrite(owned_value_path!(SOURCE))),
270            &owned_value_path!("source"),
271            Kind::bytes(),
272            Some(meaning::SERVICE),
273        )
274        // Not to be confused with `source_type`.
275        .with_source_metadata(
276            SplunkConfig::NAME,
277            Some(LegacyKey::Overwrite(owned_value_path!(SOURCETYPE))),
278            &owned_value_path!("sourcetype"),
279            Kind::bytes(),
280            None,
281        );
282
283        vec![SourceOutput::new_maybe_logs(
284            DataType::Log,
285            schema_definition,
286        )]
287    }
288
289    fn resources(&self) -> Vec<Resource> {
290        vec![Resource::tcp(self.address)]
291    }
292
293    fn can_acknowledge(&self) -> bool {
294        true
295    }
296}
297
298/// Shared data for responding to requests.
299struct SplunkSource {
300    valid_credentials: Vec<String>,
301    protocol: &'static str,
302    idx_ack: Option<Arc<IndexerAcknowledgement>>,
303    store_hec_token: bool,
304    log_namespace: LogNamespace,
305    events_received: Registered<EventsReceived>,
306}
307
308impl SplunkSource {
309    fn new(config: &SplunkConfig, protocol: &'static str, cx: SourceContext) -> Self {
310        let log_namespace = cx.log_namespace(config.log_namespace);
311        let acknowledgements = cx.do_acknowledgements(config.acknowledgements.enabled.into());
312        let shutdown = cx.shutdown;
313        let valid_tokens = config
314            .valid_tokens
315            .iter()
316            .flatten()
317            .chain(config.token.iter());
318
319        let idx_ack = acknowledgements.then(|| {
320            Arc::new(IndexerAcknowledgement::new(
321                config.acknowledgements.clone(),
322                shutdown,
323            ))
324        });
325
326        SplunkSource {
327            valid_credentials: valid_tokens
328                .map(|token| format!("Splunk {}", token.inner()))
329                .collect(),
330            protocol,
331            idx_ack,
332            store_hec_token: config.store_hec_token,
333            log_namespace,
334            events_received: register!(EventsReceived),
335        }
336    }
337
338    fn event_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
339        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
340            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
341        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
342
343        let splunk_channel = splunk_channel_header
344            .and(splunk_channel_query_param)
345            .map(|header: Option<String>, query_param| header.or(query_param));
346
347        let protocol = self.protocol;
348        let idx_ack = self.idx_ack.clone();
349        let store_hec_token = self.store_hec_token;
350        let log_namespace = self.log_namespace;
351        let events_received = self.events_received.clone();
352
353        warp::post()
354            .and(
355                path!("event")
356                    .or(path!("event" / "1.0"))
357                    .or(warp::path::end()),
358            )
359            .and(self.authorization())
360            .and(splunk_channel)
361            .and(warp::addr::remote())
362            .and(warp::header::optional::<String>("X-Forwarded-For"))
363            .and(self.gzip())
364            .and(warp::body::bytes())
365            .and(warp::path::full())
366            .and_then(
367                move |_,
368                      token: Option<String>,
369                      channel: Option<String>,
370                      remote: Option<SocketAddr>,
371                      remote_addr: Option<String>,
372                      gzip: bool,
373                      body: Bytes,
374                      path: warp::path::FullPath| {
375                    let mut out = out.clone();
376                    let idx_ack = idx_ack.clone();
377                    let events_received = events_received.clone();
378
379                    async move {
380                        if idx_ack.is_some() && channel.is_none() {
381                            return Err(Rejection::from(ApiError::MissingChannel));
382                        }
383
384                        let mut data = Vec::new();
385                        let (byte_size, body) = if gzip {
386                            MultiGzDecoder::new(body.reader())
387                                .read_to_end(&mut data)
388                                .map_err(|_| Rejection::from(ApiError::BadRequest))?;
389                            (data.len(), String::from_utf8_lossy(data.as_slice()))
390                        } else {
391                            (body.len(), String::from_utf8_lossy(body.as_ref()))
392                        };
393                        emit!(HttpBytesReceived {
394                            byte_size,
395                            http_path: path.as_str(),
396                            protocol,
397                        });
398
399                        let (batch, receiver) =
400                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
401                        let maybe_ack_id = match (idx_ack, receiver, channel.clone()) {
402                            (Some(idx_ack), Some(receiver), Some(channel_id)) => {
403                                match idx_ack.get_ack_id_from_channel(channel_id, receiver).await {
404                                    Ok(ack_id) => Some(ack_id),
405                                    Err(rej) => return Err(rej),
406                                }
407                            }
408                            _ => None,
409                        };
410
411                        let mut error = None;
412                        let mut events = Vec::new();
413
414                        let iter: EventIterator<'_, StrRead<'_>> = EventIteratorGenerator {
415                            deserializer: Deserializer::from_str(&body).into_iter::<JsonValue>(),
416                            channel,
417                            remote,
418                            remote_addr,
419                            batch,
420                            token: token.filter(|_| store_hec_token).map(Into::into),
421                            log_namespace,
422                            events_received,
423                        }
424                        .into();
425
426                        for result in iter {
427                            match result {
428                                Ok(event) => events.push(event),
429                                Err(err) => {
430                                    error = Some(err);
431                                    break;
432                                }
433                            }
434                        }
435
436                        if !events.is_empty()
437                            && let Err(ClosedError) = out.send_batch(events).await
438                        {
439                            return Err(Rejection::from(ApiError::ServerShutdown));
440                        }
441
442                        if let Some(error) = error {
443                            Err(error)
444                        } else {
445                            Ok(maybe_ack_id)
446                        }
447                    }
448                },
449            )
450            .map(finish_ok)
451            .boxed()
452    }
453
454    fn raw_service(&self, out: SourceSender) -> BoxedFilter<(Response,)> {
455        let protocol = self.protocol;
456        let idx_ack = self.idx_ack.clone();
457        let store_hec_token = self.store_hec_token;
458        let events_received = self.events_received.clone();
459        let log_namespace = self.log_namespace;
460
461        warp::post()
462            .and(path!("raw" / "1.0").or(path!("raw")))
463            .and(self.authorization())
464            .and(SplunkSource::required_channel())
465            .and(warp::addr::remote())
466            .and(warp::header::optional::<String>("X-Forwarded-For"))
467            .and(self.gzip())
468            .and(warp::body::bytes())
469            .and(warp::path::full())
470            .and_then(
471                move |_,
472                      token: Option<String>,
473                      channel_id: String,
474                      remote: Option<SocketAddr>,
475                      xff: Option<String>,
476                      gzip: bool,
477                      body: Bytes,
478                      path: warp::path::FullPath| {
479                    let mut out = out.clone();
480                    let idx_ack = idx_ack.clone();
481                    let events_received = events_received.clone();
482                    emit!(HttpBytesReceived {
483                        byte_size: body.len(),
484                        http_path: path.as_str(),
485                        protocol,
486                    });
487
488                    async move {
489                        let (batch, receiver) =
490                            BatchNotifier::maybe_new_with_receiver(idx_ack.is_some());
491                        let maybe_ack_id = match (idx_ack, receiver) {
492                            (Some(idx_ack), Some(receiver)) => Some(
493                                idx_ack
494                                    .get_ack_id_from_channel(channel_id.clone(), receiver)
495                                    .await?,
496                            ),
497                            _ => None,
498                        };
499                        let mut event = raw_event(
500                            body,
501                            gzip,
502                            channel_id,
503                            remote,
504                            xff,
505                            batch,
506                            log_namespace,
507                            &events_received,
508                        )?;
509                        if let Some(token) = token.filter(|_| store_hec_token) {
510                            event.metadata_mut().set_splunk_hec_token(token.into());
511                        }
512
513                        let res = out.send_event(event).await;
514                        res.map(|_| maybe_ack_id)
515                            .map_err(|_| Rejection::from(ApiError::ServerShutdown))
516                    }
517                },
518            )
519            .map(finish_ok)
520            .boxed()
521    }
522
523    fn health_service(&self) -> BoxedFilter<(Response,)> {
524        // The Splunk docs document this endpoint as returning a 400 if given an invalid Splunk
525        // token, but, in practice, it seems to ignore the token altogether
526        //
527        // The response body was taken from Splunk 8.2.4
528        //
529        // https://docs.splunk.com/Documentation/Splunk/8.2.5/RESTREF/RESTinput#services.2Fcollector.2Fhealth
530        warp::get()
531            .and(path!("health" / "1.0").or(path!("health")))
532            .map(move |_| {
533                http::Response::builder()
534                    .header(http::header::CONTENT_TYPE, "application/json")
535                    .body(hyper::Body::from(r#"{"text":"HEC is healthy","code":17}"#))
536                    .expect("static response")
537            })
538            .boxed()
539    }
540
541    fn lenient_json_content_type_check<T>() -> impl Filter<Extract = (T,), Error = Rejection> + Clone
542    where
543        T: Send + DeserializeOwned + 'static,
544    {
545        warp::header::optional::<HeaderValue>(CONTENT_TYPE.as_str())
546            .and(warp::body::bytes())
547            .and_then(
548                |ctype: Option<HeaderValue>, body: bytes::Bytes| async move {
549                    let ok = ctype
550                        .as_ref()
551                        .and_then(|v| v.to_str().ok())
552                        .map(|h| h.to_ascii_lowercase().contains("application/json"))
553                        .unwrap_or(true);
554
555                    if !ok {
556                        return Err(warp::reject::custom(ApiError::UnsupportedContentType));
557                    }
558
559                    let value = serde_json::from_slice::<T>(&body)
560                        .map_err(|_| warp::reject::custom(ApiError::BadRequest))?;
561
562                    Ok(value)
563                },
564            )
565    }
566
567    fn ack_service(&self) -> BoxedFilter<(Response,)> {
568        let idx_ack = self.idx_ack.clone();
569
570        warp::post()
571            .and(warp::path!("ack"))
572            .and(self.authorization())
573            .and(SplunkSource::required_channel())
574            .and(Self::lenient_json_content_type_check::<HecAckStatusRequest>())
575            .and_then(move |_, channel: String, req: HecAckStatusRequest| {
576                let idx_ack = idx_ack.clone();
577                async move {
578                    if let Some(idx_ack) = idx_ack {
579                        let acks = idx_ack
580                            .get_acks_status_from_channel(channel, &req.acks)
581                            .await?;
582                        Ok(warp::reply::json(&HecAckStatusResponse { acks }).into_response())
583                    } else {
584                        Err(warp::reject::custom(ApiError::AckIsDisabled))
585                    }
586                }
587            })
588            .boxed()
589    }
590
591    fn options() -> BoxedFilter<(Response,)> {
592        let post = warp::options()
593            .and(
594                path!("event")
595                    .or(path!("event" / "1.0"))
596                    .or(path!("raw" / "1.0"))
597                    .or(path!("raw")),
598            )
599            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "POST").into_response());
600
601        let get = warp::options()
602            .and(path!("health").or(path!("health" / "1.0")))
603            .map(|_| warp::reply::with_header(warp::reply(), "Allow", "GET").into_response());
604
605        post.or(get).unify().boxed()
606    }
607
608    /// Authorize request
609    fn authorization(&self) -> BoxedFilter<(Option<String>,)> {
610        let valid_credentials = self.valid_credentials.clone();
611        warp::header::optional("Authorization")
612            .and_then(move |token: Option<String>| {
613                let valid_credentials = valid_credentials.clone();
614                async move {
615                    match (token, valid_credentials.is_empty()) {
616                        // Remove the "Splunk " prefix if present as it is not
617                        // part of the token itself
618                        (token, true) => {
619                            Ok(token
620                                .map(|t| t.strip_prefix("Splunk ").map(Into::into).unwrap_or(t)))
621                        }
622                        (Some(token), false) if valid_credentials.contains(&token) => Ok(Some(
623                            token
624                                .strip_prefix("Splunk ")
625                                .map(Into::into)
626                                .unwrap_or(token),
627                        )),
628                        (Some(_), false) => Err(Rejection::from(ApiError::InvalidAuthorization)),
629                        (None, false) => Err(Rejection::from(ApiError::MissingAuthorization)),
630                    }
631                }
632            })
633            .boxed()
634    }
635
636    /// Is body encoded with gzip
637    fn gzip(&self) -> BoxedFilter<(bool,)> {
638        warp::header::optional::<String>("Content-Encoding")
639            .and_then(|encoding: Option<String>| async move {
640                match encoding {
641                    Some(s) if s.as_bytes() == b"gzip" => Ok(true),
642                    Some(_) => Err(Rejection::from(ApiError::UnsupportedEncoding)),
643                    None => Ok(false),
644                }
645            })
646            .boxed()
647    }
648
649    fn required_channel() -> BoxedFilter<(String,)> {
650        let splunk_channel_query_param = warp::query::<HashMap<String, String>>()
651            .map(|qs: HashMap<String, String>| qs.get("channel").map(|v| v.to_owned()));
652        let splunk_channel_header = warp::header::optional::<String>(X_SPLUNK_REQUEST_CHANNEL);
653
654        splunk_channel_header
655            .and(splunk_channel_query_param)
656            .and_then(|header: Option<String>, query_param| async move {
657                header
658                    .or(query_param)
659                    .ok_or_else(|| Rejection::from(ApiError::MissingChannel))
660            })
661            .boxed()
662    }
663}
664/// Constructs one or more events from json-s coming from reader.
665/// If errors, it's done with input.
666struct EventIterator<'de, R: JsonRead<'de>> {
667    /// Remaining request with JSON events
668    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
669    /// Count of sent events
670    events: usize,
671    /// Optional channel from headers
672    channel: Option<Value>,
673    /// Default time
674    time: Time,
675    /// Remaining extracted default values
676    extractors: [DefaultExtractor; 4],
677    /// Event finalization
678    batch: Option<BatchNotifier>,
679    /// Splunk HEC Token for passthrough
680    token: Option<Arc<str>>,
681    /// Lognamespace to put the events in
682    log_namespace: LogNamespace,
683    /// handle to EventsReceived registry
684    events_received: Registered<EventsReceived>,
685}
686
687/// Intermediate struct to generate an `EventIterator`
688struct EventIteratorGenerator<'de, R: JsonRead<'de>> {
689    deserializer: serde_json::StreamDeserializer<'de, R, JsonValue>,
690    channel: Option<String>,
691    batch: Option<BatchNotifier>,
692    token: Option<Arc<str>>,
693    log_namespace: LogNamespace,
694    events_received: Registered<EventsReceived>,
695    remote: Option<SocketAddr>,
696    remote_addr: Option<String>,
697}
698
699impl<'de, R: JsonRead<'de>> From<EventIteratorGenerator<'de, R>> for EventIterator<'de, R> {
700    fn from(f: EventIteratorGenerator<'de, R>) -> Self {
701        Self {
702            deserializer: f.deserializer,
703            events: 0,
704            channel: f.channel.map(Value::from),
705            time: Time::Now(Utc::now()),
706            extractors: [
707                // Extract the host field with the given priority:
708                // 1. The host field is present in the event payload
709                // 2. The x-forwarded-for header is present in the incoming request
710                // 3. Use the `remote`: SocketAddr value provided by warp
711                DefaultExtractor::new_with(
712                    "host",
713                    log_schema().host_key().cloned().into(),
714                    f.remote_addr
715                        .or_else(|| f.remote.map(|addr| addr.to_string()))
716                        .map(Value::from),
717                    f.log_namespace,
718                ),
719                DefaultExtractor::new("index", OptionalValuePath::new(INDEX), f.log_namespace),
720                DefaultExtractor::new("source", OptionalValuePath::new(SOURCE), f.log_namespace),
721                DefaultExtractor::new(
722                    "sourcetype",
723                    OptionalValuePath::new(SOURCETYPE),
724                    f.log_namespace,
725                ),
726            ],
727            batch: f.batch,
728            token: f.token,
729            log_namespace: f.log_namespace,
730            events_received: f.events_received,
731        }
732    }
733}
734
735impl<'de, R: JsonRead<'de>> EventIterator<'de, R> {
736    fn build_event(&mut self, mut json: JsonValue) -> Result<Event, Rejection> {
737        // Construct Event from parsed json event
738        let mut log = match self.log_namespace {
739            LogNamespace::Vector => self.build_log_vector(&mut json)?,
740            LogNamespace::Legacy => self.build_log_legacy(&mut json)?,
741        };
742
743        // Add source type
744        self.log_namespace.insert_vector_metadata(
745            &mut log,
746            log_schema().source_type_key(),
747            &owned_value_path!("source_type"),
748            SplunkConfig::NAME,
749        );
750
751        // Process channel field
752        let channel_path = owned_value_path!(CHANNEL);
753        if let Some(JsonValue::String(guid)) = json.get_mut("channel").map(JsonValue::take) {
754            self.log_namespace.insert_source_metadata(
755                SplunkConfig::NAME,
756                &mut log,
757                Some(LegacyKey::Overwrite(&channel_path)),
758                lookup::path!(CHANNEL),
759                guid,
760            );
761        } else if let Some(guid) = self.channel.as_ref() {
762            self.log_namespace.insert_source_metadata(
763                SplunkConfig::NAME,
764                &mut log,
765                Some(LegacyKey::Overwrite(&channel_path)),
766                lookup::path!(CHANNEL),
767                guid.clone(),
768            );
769        }
770
771        // Process fields field
772        if let Some(JsonValue::Object(object)) = json.get_mut("fields").map(JsonValue::take) {
773            for (key, value) in object {
774                self.log_namespace.insert_source_metadata(
775                    SplunkConfig::NAME,
776                    &mut log,
777                    Some(LegacyKey::Overwrite(&owned_value_path!(key.as_str()))),
778                    lookup::path!(key.as_str()),
779                    value,
780                );
781            }
782        }
783
784        // Process time field
785        let parsed_time = match json.get_mut("time").map(JsonValue::take) {
786            Some(JsonValue::Number(time)) => Some(Some(time)),
787            Some(JsonValue::String(time)) => Some(time.parse::<serde_json::Number>().ok()),
788            _ => None,
789        };
790
791        match parsed_time {
792            None => (),
793            Some(Some(t)) => {
794                if let Some(t) = t.as_u64() {
795                    let time = parse_timestamp(t as i64)
796                        .ok_or(ApiError::InvalidDataFormat { event: self.events })?;
797
798                    self.time = Time::Provided(time);
799                } else if let Some(t) = t.as_f64() {
800                    self.time = Time::Provided(
801                        Utc.timestamp_opt(
802                            t.floor() as i64,
803                            (t.fract() * 1000.0 * 1000.0 * 1000.0) as u32,
804                        )
805                        .single()
806                        .expect("invalid timestamp"),
807                    );
808                } else {
809                    return Err(ApiError::InvalidDataFormat { event: self.events }.into());
810                }
811            }
812            Some(None) => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
813        }
814
815        // Add time field
816        let timestamp = match self.time.clone() {
817            Time::Provided(time) => time,
818            Time::Now(time) => time,
819        };
820
821        self.log_namespace.insert_source_metadata(
822            SplunkConfig::NAME,
823            &mut log,
824            log_schema().timestamp_key().map(LegacyKey::Overwrite),
825            lookup::path!("timestamp"),
826            timestamp,
827        );
828
829        // Extract default extracted fields
830        for de in self.extractors.iter_mut() {
831            de.extract(&mut log, &mut json);
832        }
833
834        // Add passthrough token if present
835        if let Some(token) = &self.token {
836            log.metadata_mut().set_splunk_hec_token(Arc::clone(token));
837        }
838
839        if let Some(batch) = self.batch.clone() {
840            log = log.with_batch_notifier(&batch);
841        }
842
843        self.events += 1;
844
845        Ok(log.into())
846    }
847
848    /// Build the log event for the vector namespace.
849    /// In this namespace the log event is created entirely from the event field.
850    /// No renaming of the `line` field is done.
851    fn build_log_vector(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
852        match json.get("event") {
853            Some(event) => {
854                let event: Value = event.into();
855                let mut log = LogEvent::from(event);
856
857                // EstimatedJsonSizeOf must be calculated before enrichment
858                self.events_received
859                    .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
860
861                // The timestamp is extracted from the message for the Legacy namespace.
862                self.log_namespace.insert_vector_metadata(
863                    &mut log,
864                    log_schema().timestamp_key(),
865                    lookup::path!("ingest_timestamp"),
866                    chrono::Utc::now(),
867                );
868
869                Ok(log)
870            }
871            None => Err(ApiError::MissingEventField { event: self.events }.into()),
872        }
873    }
874
875    /// Build the log event for the legacy namespace.
876    /// If the event is a string, or the event contains a field called `line` that is a string
877    /// (the docker splunk logger places the message in the event.line field) that string
878    /// is placed in the message field.
879    fn build_log_legacy(&mut self, json: &mut JsonValue) -> Result<LogEvent, Rejection> {
880        let mut log = LogEvent::default();
881        match json.get_mut("event") {
882            Some(event) => match event.take() {
883                JsonValue::String(string) => {
884                    if string.is_empty() {
885                        return Err(ApiError::EmptyEventField { event: self.events }.into());
886                    }
887                    log.maybe_insert(log_schema().message_key_target_path(), string);
888                }
889                JsonValue::Object(mut object) => {
890                    if object.is_empty() {
891                        return Err(ApiError::EmptyEventField { event: self.events }.into());
892                    }
893
894                    // Add 'line' value as 'event::schema().message_key'
895                    if let Some(line) = object.remove("line") {
896                        match line {
897                            // This don't quite fit the meaning of a event::schema().message_key
898                            JsonValue::Array(_) | JsonValue::Object(_) => {
899                                log.insert(event_path!("line"), line);
900                            }
901                            _ => {
902                                log.maybe_insert(log_schema().message_key_target_path(), line);
903                            }
904                        }
905                    }
906
907                    for (key, value) in object {
908                        log.insert(event_path!(key.as_str()), value);
909                    }
910                }
911                _ => return Err(ApiError::InvalidDataFormat { event: self.events }.into()),
912            },
913            None => return Err(ApiError::MissingEventField { event: self.events }.into()),
914        };
915
916        // EstimatedJsonSizeOf must be calculated before enrichment
917        self.events_received
918            .emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
919
920        Ok(log)
921    }
922}
923
924impl<'de, R: JsonRead<'de>> Iterator for EventIterator<'de, R> {
925    type Item = Result<Event, Rejection>;
926
927    fn next(&mut self) -> Option<Self::Item> {
928        match self.deserializer.next() {
929            Some(Ok(json)) => Some(self.build_event(json)),
930            None => {
931                if self.events == 0 {
932                    Some(Err(ApiError::NoData.into()))
933                } else {
934                    None
935                }
936            }
937            Some(Err(error)) => {
938                emit!(SplunkHecRequestBodyInvalidError {
939                    error: error.into()
940                });
941                Some(Err(
942                    ApiError::InvalidDataFormat { event: self.events }.into()
943                ))
944            }
945        }
946    }
947}
948
949/// Parse a `i64` unix timestamp that can either be in seconds, milliseconds or
950/// nanoseconds.
951///
952/// This attempts to parse timestamps based on what cutoff range they fall into.
953/// For seconds to be parsed the timestamp must be less than the unix epoch of
954/// the year `2400`. For this to parse milliseconds the time must be smaller
955/// than the year `10,000` in unix epoch milliseconds. If the value is larger
956/// than both we attempt to parse it as nanoseconds.
957///
958/// Returns `None` if `t` is negative.
959fn parse_timestamp(t: i64) -> Option<DateTime<Utc>> {
960    // Utc.ymd(2400, 1, 1).and_hms(0,0,0).timestamp();
961    const SEC_CUTOFF: i64 = 13569465600;
962    // Utc.ymd(10_000, 1, 1).and_hms(0,0,0).timestamp_millis();
963    const MILLISEC_CUTOFF: i64 = 253402300800000;
964
965    // Timestamps can't be negative!
966    if t < 0 {
967        return None;
968    }
969
970    let ts = if t < SEC_CUTOFF {
971        Utc.timestamp_opt(t, 0).single().expect("invalid timestamp")
972    } else if t < MILLISEC_CUTOFF {
973        Utc.timestamp_millis_opt(t)
974            .single()
975            .expect("invalid timestamp")
976    } else {
977        Utc.timestamp_nanos(t)
978    };
979
980    Some(ts)
981}
982
983/// Maintains last known extracted value of field and uses it in the absence of field.
984struct DefaultExtractor {
985    field: &'static str,
986    to_field: OptionalValuePath,
987    value: Option<Value>,
988    log_namespace: LogNamespace,
989}
990
991impl DefaultExtractor {
992    const fn new(
993        field: &'static str,
994        to_field: OptionalValuePath,
995        log_namespace: LogNamespace,
996    ) -> Self {
997        DefaultExtractor {
998            field,
999            to_field,
1000            value: None,
1001            log_namespace,
1002        }
1003    }
1004
1005    fn new_with(
1006        field: &'static str,
1007        to_field: OptionalValuePath,
1008        value: impl Into<Option<Value>>,
1009        log_namespace: LogNamespace,
1010    ) -> Self {
1011        DefaultExtractor {
1012            field,
1013            to_field,
1014            value: value.into(),
1015            log_namespace,
1016        }
1017    }
1018
1019    fn extract(&mut self, log: &mut LogEvent, value: &mut JsonValue) {
1020        // Process json_field
1021        if let Some(JsonValue::String(new_value)) = value.get_mut(self.field).map(JsonValue::take) {
1022            self.value = Some(new_value.into());
1023        }
1024
1025        // Add data field
1026        if let Some(index) = self.value.as_ref()
1027            && let Some(metadata_key) = self.to_field.path.as_ref()
1028        {
1029            self.log_namespace.insert_source_metadata(
1030                SplunkConfig::NAME,
1031                log,
1032                Some(LegacyKey::Overwrite(metadata_key)),
1033                &self.to_field.path.clone().unwrap_or(owned_value_path!("")),
1034                index.clone(),
1035            )
1036        }
1037    }
1038}
1039
1040/// For tracking origin of the timestamp
1041#[derive(Clone, Debug)]
1042enum Time {
1043    /// Backup
1044    Now(DateTime<Utc>),
1045    /// Provided in the request
1046    Provided(DateTime<Utc>),
1047}
1048
1049/// Creates event from raw request
1050#[allow(clippy::too_many_arguments)]
1051fn raw_event(
1052    bytes: Bytes,
1053    gzip: bool,
1054    channel: String,
1055    remote: Option<SocketAddr>,
1056    xff: Option<String>,
1057    batch: Option<BatchNotifier>,
1058    log_namespace: LogNamespace,
1059    events_received: &Registered<EventsReceived>,
1060) -> Result<Event, Rejection> {
1061    // Process gzip
1062    let message: Value = if gzip {
1063        let mut data = Vec::new();
1064        match MultiGzDecoder::new(bytes.reader()).read_to_end(&mut data) {
1065            Ok(0) => return Err(ApiError::NoData.into()),
1066            Ok(_) => Value::from(Bytes::from(data)),
1067            Err(error) => {
1068                emit!(SplunkHecRequestBodyInvalidError { error });
1069                return Err(ApiError::InvalidDataFormat { event: 0 }.into());
1070            }
1071        }
1072    } else {
1073        bytes.into()
1074    };
1075
1076    // Construct event
1077    let mut log = match log_namespace {
1078        LogNamespace::Vector => LogEvent::from(message),
1079        LogNamespace::Legacy => {
1080            let mut log = LogEvent::default();
1081            log.maybe_insert(log_schema().message_key_target_path(), message);
1082            log
1083        }
1084    };
1085    // We need to calculate the estimated json size of the event BEFORE enrichment.
1086    events_received.emit(CountByteSize(1, log.estimated_json_encoded_size_of()));
1087
1088    // Add channel
1089    log_namespace.insert_source_metadata(
1090        SplunkConfig::NAME,
1091        &mut log,
1092        Some(LegacyKey::Overwrite(&owned_value_path!(CHANNEL))),
1093        lookup::path!(CHANNEL),
1094        channel,
1095    );
1096
1097    // host-field priority for raw endpoint:
1098    // - x-forwarded-for is set to `host` field first, if present. If not present:
1099    // - set remote addr to host field
1100    let host = if let Some(remote_address) = xff {
1101        Some(remote_address)
1102    } else {
1103        remote.map(|remote| remote.to_string())
1104    };
1105
1106    if let Some(host) = host {
1107        log_namespace.insert_source_metadata(
1108            SplunkConfig::NAME,
1109            &mut log,
1110            log_schema().host_key().map(LegacyKey::InsertIfEmpty),
1111            lookup::path!("host"),
1112            host,
1113        );
1114    }
1115
1116    log_namespace.insert_standard_vector_source_metadata(&mut log, SplunkConfig::NAME, Utc::now());
1117
1118    if let Some(batch) = batch {
1119        log = log.with_batch_notifier(&batch);
1120    }
1121
1122    Ok(Event::from(log))
1123}
1124
/// Errors raised while handling a HEC request.
///
/// `finish_err` turns each variant into an HTTP status and, for most variants, a HEC JSON
/// response body.
// NOTE: the per-variant notes below deliberately use `//` instead of `///`: snafu can use a
// variant's doc comment as its `Display` text, which would change the emitted error output.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    // 401, HEC code 2 "Token is required".
    MissingAuthorization,
    // 401, HEC code 3 "Invalid authorization".
    InvalidAuthorization,
    // 415 with an empty body.
    UnsupportedEncoding,
    // 415 with a plain-text explanation.
    UnsupportedContentType,
    // 400, HEC code 10 "Data channel is missing".
    MissingChannel,
    // 400, HEC code 5 "No data": an empty decompressed raw body, or an event body from
    // which no event could be built before the input ended.
    NoData,
    // 400, HEC code 6. `event` is the number of events built successfully before the
    // failure (0 for raw requests) and is echoed back as `invalid-event-number`.
    InvalidDataFormat { event: usize },
    // 503 with an empty body.
    ServerShutdown,
    // 400, HEC code 13: `event` was an empty string or an empty object.
    EmptyEventField { event: usize },
    // 400, HEC code 12: the event object had no `event` field.
    MissingEventField { event: usize },
    // 400 with an empty body.
    BadRequest,
    // 503, HEC code 9 "Server is busy".
    ServiceUnavailable,
    // 400, HEC code 14 "Ack is disabled".
    AckIsDisabled,
}

// Lets `ApiError` be carried inside a `warp::Rejection` and recovered in `finish_err`.
impl warp::reject::Reject for ApiError {}
1143
1144/// Cached bodies for common responses
1145mod splunk_response {
1146    use serde::Serialize;
1147
1148    // https://docs.splunk.com/Documentation/Splunk/8.2.3/Data/TroubleshootHTTPEventCollector#Possible_error_codes
1149    pub enum HecStatusCode {
1150        Success = 0,
1151        TokenIsRequired = 2,
1152        InvalidAuthorization = 3,
1153        NoData = 5,
1154        InvalidDataFormat = 6,
1155        ServerIsBusy = 9,
1156        DataChannelIsMissing = 10,
1157        EventFieldIsRequired = 12,
1158        EventFieldCannotBeBlank = 13,
1159        AckIsDisabled = 14,
1160    }
1161
1162    #[derive(Serialize)]
1163    pub enum HecResponseMetadata {
1164        #[serde(rename = "ackId")]
1165        AckId(u64),
1166        #[serde(rename = "invalid-event-number")]
1167        InvalidEventNumber(usize),
1168    }
1169
1170    #[derive(Serialize)]
1171    pub struct HecResponse {
1172        text: &'static str,
1173        code: u8,
1174        #[serde(skip_serializing_if = "Option::is_none", flatten)]
1175        pub metadata: Option<HecResponseMetadata>,
1176    }
1177
1178    impl HecResponse {
1179        pub const fn new(code: HecStatusCode) -> Self {
1180            let text = match code {
1181                HecStatusCode::Success => "Success",
1182                HecStatusCode::TokenIsRequired => "Token is required",
1183                HecStatusCode::InvalidAuthorization => "Invalid authorization",
1184                HecStatusCode::NoData => "No data",
1185                HecStatusCode::InvalidDataFormat => "Invalid data format",
1186                HecStatusCode::DataChannelIsMissing => "Data channel is missing",
1187                HecStatusCode::EventFieldIsRequired => "Event field is required",
1188                HecStatusCode::EventFieldCannotBeBlank => "Event field cannot be blank",
1189                HecStatusCode::ServerIsBusy => "Server is busy",
1190                HecStatusCode::AckIsDisabled => "Ack is disabled",
1191            };
1192
1193            Self {
1194                text,
1195                code: code as u8,
1196                metadata: None,
1197            }
1198        }
1199
1200        pub const fn with_metadata(mut self, metadata: HecResponseMetadata) -> Self {
1201            self.metadata = Some(metadata);
1202            self
1203        }
1204    }
1205
1206    pub const INVALID_AUTHORIZATION: HecResponse =
1207        HecResponse::new(HecStatusCode::InvalidAuthorization);
1208    pub const TOKEN_IS_REQUIRED: HecResponse = HecResponse::new(HecStatusCode::TokenIsRequired);
1209    pub const NO_DATA: HecResponse = HecResponse::new(HecStatusCode::NoData);
1210    pub const SUCCESS: HecResponse = HecResponse::new(HecStatusCode::Success);
1211    pub const SERVER_IS_BUSY: HecResponse = HecResponse::new(HecStatusCode::ServerIsBusy);
1212    pub const NO_CHANNEL: HecResponse = HecResponse::new(HecStatusCode::DataChannelIsMissing);
1213    pub const ACK_IS_DISABLED: HecResponse = HecResponse::new(HecStatusCode::AckIsDisabled);
1214}
1215
1216fn finish_ok(maybe_ack_id: Option<u64>) -> Response {
1217    let body = if let Some(ack_id) = maybe_ack_id {
1218        HecResponse::new(HecStatusCode::Success).with_metadata(HecResponseMetadata::AckId(ack_id))
1219    } else {
1220        splunk_response::SUCCESS
1221    };
1222    response_json(StatusCode::OK, body)
1223}
1224
1225fn response_plain(code: StatusCode, msg: &'static str) -> Response {
1226    warp::reply::with_status(
1227        warp::reply::with_header(msg, http::header::CONTENT_TYPE, "text/plain; charset=utf-8"),
1228        code,
1229    )
1230    .into_response()
1231}
1232
1233async fn finish_err(rejection: Rejection) -> Result<(Response,), Rejection> {
1234    if let Some(&error) = rejection.find::<ApiError>() {
1235        emit!(SplunkHecRequestError { error });
1236        Ok((match error {
1237            ApiError::MissingAuthorization => {
1238                response_json(StatusCode::UNAUTHORIZED, splunk_response::TOKEN_IS_REQUIRED)
1239            }
1240            ApiError::InvalidAuthorization => response_json(
1241                StatusCode::UNAUTHORIZED,
1242                splunk_response::INVALID_AUTHORIZATION,
1243            ),
1244            ApiError::UnsupportedEncoding => empty_response(StatusCode::UNSUPPORTED_MEDIA_TYPE),
1245            ApiError::UnsupportedContentType => response_plain(
1246                StatusCode::UNSUPPORTED_MEDIA_TYPE,
1247                "The request's content-type is not supported",
1248            ),
1249            ApiError::MissingChannel => {
1250                response_json(StatusCode::BAD_REQUEST, splunk_response::NO_CHANNEL)
1251            }
1252            ApiError::NoData => response_json(StatusCode::BAD_REQUEST, splunk_response::NO_DATA),
1253            ApiError::ServerShutdown => empty_response(StatusCode::SERVICE_UNAVAILABLE),
1254            ApiError::InvalidDataFormat { event } => response_json(
1255                StatusCode::BAD_REQUEST,
1256                HecResponse::new(HecStatusCode::InvalidDataFormat)
1257                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1258            ),
1259            ApiError::EmptyEventField { event } => response_json(
1260                StatusCode::BAD_REQUEST,
1261                HecResponse::new(HecStatusCode::EventFieldCannotBeBlank)
1262                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1263            ),
1264            ApiError::MissingEventField { event } => response_json(
1265                StatusCode::BAD_REQUEST,
1266                HecResponse::new(HecStatusCode::EventFieldIsRequired)
1267                    .with_metadata(HecResponseMetadata::InvalidEventNumber(event)),
1268            ),
1269            ApiError::BadRequest => empty_response(StatusCode::BAD_REQUEST),
1270            ApiError::ServiceUnavailable => response_json(
1271                StatusCode::SERVICE_UNAVAILABLE,
1272                splunk_response::SERVER_IS_BUSY,
1273            ),
1274            ApiError::AckIsDisabled => {
1275                response_json(StatusCode::BAD_REQUEST, splunk_response::ACK_IS_DISABLED)
1276            }
1277        },))
1278    } else {
1279        Err(rejection)
1280    }
1281}
1282
1283/// Response without body
1284fn empty_response(code: StatusCode) -> Response {
1285    let mut res = Response::default();
1286    *res.status_mut() = code;
1287    res
1288}
1289
1290/// Response with body
1291fn response_json(code: StatusCode, body: impl Serialize) -> Response {
1292    warp::reply::with_status(warp::reply::json(&body), code).into_response()
1293}
1294
1295#[cfg(feature = "sinks-splunk_hec")]
1296#[cfg(test)]
1297mod tests {
1298    use std::{net::SocketAddr, num::NonZeroU64};
1299
1300    use chrono::{TimeZone, Utc};
1301    use futures_util::Stream;
1302    use http::Uri;
1303    use reqwest::{RequestBuilder, Response};
1304    use serde::Deserialize;
1305    use vector_lib::{
1306        codecs::{
1307            BytesDecoderConfig, JsonSerializerConfig, TextSerializerConfig,
1308            decoding::DeserializerConfig,
1309        },
1310        event::EventStatus,
1311        schema::Definition,
1312        sensitive_string::SensitiveString,
1313    };
1314    use vrl::path::PathPrefix;
1315
1316    use super::*;
1317    use crate::{
1318        SourceSender,
1319        codecs::{DecodingConfig, EncodingConfig},
1320        components::validation::prelude::*,
1321        config::{SinkConfig, SinkContext, SourceConfig, SourceContext, log_schema},
1322        event::{Event, LogEvent},
1323        sinks::{
1324            Healthcheck, VectorSink,
1325            splunk_hec::logs::config::HecLogsSinkConfig,
1326            util::{BatchConfig, Compression, TowerRequestConfig},
1327        },
1328        sources::splunk_hec::acknowledgements::{HecAckStatusRequest, HecAckStatusResponse},
1329        test_util::{
1330            collect_n,
1331            components::{
1332                COMPONENT_ERROR_TAGS, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance,
1333                assert_source_error,
1334            },
1335            next_addr, wait_for_tcp,
1336        },
1337    };
1338
1339    #[test]
1340    fn generate_config() {
1341        crate::test_util::test_generate_config::<SplunkConfig>();
1342    }
1343
1344    /// Splunk token
1345    const TOKEN: &str = "token";
1346    const VALID_TOKENS: &[&str; 2] = &[TOKEN, "secondary-token"];
1347
1348    async fn source(
1349        acknowledgements: Option<HecAcknowledgementsConfig>,
1350    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
1351        source_with(Some(TOKEN.to_owned().into()), None, acknowledgements, false).await
1352    }
1353
1354    async fn source_with(
1355        token: Option<SensitiveString>,
1356        valid_tokens: Option<&[&str]>,
1357        acknowledgements: Option<HecAcknowledgementsConfig>,
1358        store_hec_token: bool,
1359    ) -> (impl Stream<Item = Event> + Unpin + use<>, SocketAddr) {
1360        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
1361        let address = next_addr();
1362        let valid_tokens =
1363            valid_tokens.map(|tokens| tokens.iter().map(|v| v.to_string().into()).collect());
1364        let cx = SourceContext::new_test(sender, None);
1365        tokio::spawn(async move {
1366            SplunkConfig {
1367                address,
1368                token,
1369                valid_tokens,
1370                tls: None,
1371                acknowledgements: acknowledgements.unwrap_or_default(),
1372                store_hec_token,
1373                log_namespace: None,
1374                keepalive: Default::default(),
1375            }
1376            .build(cx)
1377            .await
1378            .unwrap()
1379            .await
1380            .unwrap()
1381        });
1382        wait_for_tcp(address).await;
1383        (recv, address)
1384    }
1385
1386    async fn sink(
1387        address: SocketAddr,
1388        encoding: EncodingConfig,
1389        compression: Compression,
1390    ) -> (VectorSink, Healthcheck) {
1391        HecLogsSinkConfig {
1392            default_token: TOKEN.to_owned().into(),
1393            endpoint: format!("http://{address}"),
1394            host_key: None,
1395            indexed_fields: vec![],
1396            index: None,
1397            sourcetype: None,
1398            source: None,
1399            encoding,
1400            compression,
1401            batch: BatchConfig::default(),
1402            request: TowerRequestConfig::default(),
1403            tls: None,
1404            acknowledgements: Default::default(),
1405            timestamp_nanos_key: None,
1406            timestamp_key: None,
1407            auto_extract_timestamp: None,
1408            endpoint_target: Default::default(),
1409        }
1410        .build(SinkContext::default())
1411        .await
1412        .unwrap()
1413    }
1414
1415    async fn start(
1416        encoding: EncodingConfig,
1417        compression: Compression,
1418        acknowledgements: Option<HecAcknowledgementsConfig>,
1419    ) -> (VectorSink, impl Stream<Item = Event> + Unpin) {
1420        let (source, address) = source(acknowledgements).await;
1421        let (sink, health) = sink(address, encoding, compression).await;
1422        assert!(health.await.is_ok());
1423        (sink, source)
1424    }
1425
1426    async fn channel_n(
1427        messages: Vec<impl Into<String> + Send + 'static>,
1428        sink: VectorSink,
1429        source: impl Stream<Item = Event> + Unpin,
1430    ) -> Vec<Event> {
1431        let n = messages.len();
1432
1433        tokio::spawn(async move {
1434            sink.run_events(
1435                messages
1436                    .into_iter()
1437                    .map(|s| Event::Log(LogEvent::from(s.into()))),
1438            )
1439            .await
1440            .unwrap();
1441        });
1442
1443        let events = collect_n(source, n).await;
1444        assert_eq!(n, events.len());
1445
1446        events
1447    }
1448
1449    #[derive(Clone, Copy, Debug)]
1450    enum Channel<'a> {
1451        Header(&'a str),
1452        QueryParam(&'a str),
1453    }
1454
1455    #[derive(Default)]
1456    struct SendWithOpts<'a> {
1457        channel: Option<Channel<'a>>,
1458        forwarded_for: Option<String>,
1459    }
1460
1461    async fn post(address: SocketAddr, api: &str, message: &str) -> u16 {
1462        let channel = Channel::Header("channel");
1463        let options = SendWithOpts {
1464            channel: Some(channel),
1465            forwarded_for: None,
1466        };
1467        send_with(address, api, message, TOKEN, &options).await
1468    }
1469
1470    fn build_request(
1471        address: SocketAddr,
1472        api: &str,
1473        message: &str,
1474        token: &str,
1475        opts: &SendWithOpts<'_>,
1476    ) -> RequestBuilder {
1477        let mut b = reqwest::Client::new()
1478            .post(format!("http://{address}/{api}"))
1479            .header("Authorization", format!("Splunk {token}"));
1480
1481        b = match opts.channel {
1482            Some(c) => match c {
1483                Channel::Header(v) => b.header("x-splunk-request-channel", v),
1484                Channel::QueryParam(v) => b.query(&[("channel", v)]),
1485            },
1486            None => b,
1487        };
1488
1489        b = match &opts.forwarded_for {
1490            Some(f) => b.header("X-Forwarded-For", f),
1491            None => b,
1492        };
1493
1494        b.body(message.to_owned())
1495    }
1496
1497    async fn send_with(
1498        address: SocketAddr,
1499        api: &str,
1500        message: &str,
1501        token: &str,
1502        opts: &SendWithOpts<'_>,
1503    ) -> u16 {
1504        let b = build_request(address, api, message, token, opts);
1505        b.send().await.unwrap().status().as_u16()
1506    }
1507
1508    async fn send_with_response(
1509        address: SocketAddr,
1510        api: &str,
1511        message: &str,
1512        token: &str,
1513        opts: &SendWithOpts<'_>,
1514    ) -> Response {
1515        let b = build_request(address, api, message, token, opts);
1516        b.send().await.unwrap()
1517    }
1518
1519    #[tokio::test]
1520    async fn no_compression_text_event() {
1521        let message = "gzip_text_event";
1522        let (sink, source) = start(
1523            TextSerializerConfig::default().into(),
1524            Compression::None,
1525            None,
1526        )
1527        .await;
1528
1529        let event = channel_n(vec![message], sink, source).await.remove(0);
1530
1531        assert_eq!(
1532            event.as_log()[log_schema().message_key().unwrap().to_string()],
1533            message.into()
1534        );
1535        assert!(event.as_log().get_timestamp().is_some());
1536        assert_eq!(
1537            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1538            "splunk_hec".into()
1539        );
1540        assert!(event.metadata().splunk_hec_token().is_none());
1541    }
1542
1543    #[tokio::test]
1544    async fn one_simple_text_event() {
1545        let message = "one_simple_text_event";
1546        let (sink, source) = start(
1547            TextSerializerConfig::default().into(),
1548            Compression::gzip_default(),
1549            None,
1550        )
1551        .await;
1552
1553        let event = channel_n(vec![message], sink, source).await.remove(0);
1554
1555        assert_eq!(
1556            event.as_log()[log_schema().message_key().unwrap().to_string()],
1557            message.into()
1558        );
1559        assert!(event.as_log().get_timestamp().is_some());
1560        assert_eq!(
1561            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1562            "splunk_hec".into()
1563        );
1564        assert!(event.metadata().splunk_hec_token().is_none());
1565    }
1566
1567    #[tokio::test]
1568    async fn multiple_simple_text_event() {
1569        let n = 200;
1570        let (sink, source) = start(
1571            TextSerializerConfig::default().into(),
1572            Compression::None,
1573            None,
1574        )
1575        .await;
1576
1577        let messages = (0..n)
1578            .map(|i| format!("multiple_simple_text_event_{i}"))
1579            .collect::<Vec<_>>();
1580        let events = channel_n(messages.clone(), sink, source).await;
1581
1582        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1583            assert_eq!(
1584                event.as_log()[log_schema().message_key().unwrap().to_string()],
1585                msg.into()
1586            );
1587            assert!(event.as_log().get_timestamp().is_some());
1588            assert_eq!(
1589                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1590                "splunk_hec".into()
1591            );
1592            assert!(event.metadata().splunk_hec_token().is_none());
1593        }
1594    }
1595
1596    #[tokio::test]
1597    async fn one_simple_json_event() {
1598        let message = "one_simple_json_event";
1599        let (sink, source) = start(
1600            JsonSerializerConfig::default().into(),
1601            Compression::gzip_default(),
1602            None,
1603        )
1604        .await;
1605
1606        let event = channel_n(vec![message], sink, source).await.remove(0);
1607
1608        assert_eq!(
1609            event.as_log()[log_schema().message_key().unwrap().to_string()],
1610            message.into()
1611        );
1612        assert!(event.as_log().get_timestamp().is_some());
1613        assert_eq!(
1614            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1615            "splunk_hec".into()
1616        );
1617        assert!(event.metadata().splunk_hec_token().is_none());
1618    }
1619
1620    #[tokio::test]
1621    async fn multiple_simple_json_event() {
1622        let n = 200;
1623        let (sink, source) = start(
1624            JsonSerializerConfig::default().into(),
1625            Compression::gzip_default(),
1626            None,
1627        )
1628        .await;
1629
1630        let messages = (0..n)
1631            .map(|i| format!("multiple_simple_json_event{i}"))
1632            .collect::<Vec<_>>();
1633        let events = channel_n(messages.clone(), sink, source).await;
1634
1635        for (msg, event) in messages.into_iter().zip(events.into_iter()) {
1636            assert_eq!(
1637                event.as_log()[log_schema().message_key().unwrap().to_string()],
1638                msg.into()
1639            );
1640            assert!(event.as_log().get_timestamp().is_some());
1641            assert_eq!(
1642                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1643                "splunk_hec".into()
1644            );
1645            assert!(event.metadata().splunk_hec_token().is_none());
1646        }
1647    }
1648
1649    #[tokio::test]
1650    async fn json_event() {
1651        let (sink, source) = start(
1652            JsonSerializerConfig::default().into(),
1653            Compression::gzip_default(),
1654            None,
1655        )
1656        .await;
1657
1658        let mut log = LogEvent::default();
1659        log.insert("greeting", "hello");
1660        log.insert("name", "bob");
1661        sink.run_events(vec![log.into()]).await.unwrap();
1662
1663        let event = collect_n(source, 1).await.remove(0).into_log();
1664        assert_eq!(event["greeting"], "hello".into());
1665        assert_eq!(event["name"], "bob".into());
1666        assert!(event.get_timestamp().is_some());
1667        assert_eq!(
1668            event[log_schema().source_type_key().unwrap().to_string()],
1669            "splunk_hec".into()
1670        );
1671        assert!(event.metadata().splunk_hec_token().is_none());
1672    }
1673
1674    #[tokio::test]
1675    async fn json_invalid_path_event() {
1676        let (sink, source) = start(
1677            JsonSerializerConfig::default().into(),
1678            Compression::gzip_default(),
1679            None,
1680        )
1681        .await;
1682
1683        let mut log = LogEvent::default();
1684        // Test with a field that would be considered an invalid path if it were to
1685        // be treated as a path and not a simple field name.
1686        log.insert(event_path!("(greeting | thing"), "hello");
1687        sink.run_events(vec![log.into()]).await.unwrap();
1688
1689        let event = collect_n(source, 1).await.remove(0).into_log();
1690        assert_eq!(
1691            event.get(event_path!("(greeting | thing")),
1692            Some(&Value::from("hello"))
1693        );
1694    }
1695
1696    #[tokio::test]
1697    async fn line_to_message() {
1698        let (sink, source) = start(
1699            JsonSerializerConfig::default().into(),
1700            Compression::gzip_default(),
1701            None,
1702        )
1703        .await;
1704
1705        let mut event = LogEvent::default();
1706        event.insert("line", "hello");
1707        sink.run_events(vec![event.into()]).await.unwrap();
1708
1709        let event = collect_n(source, 1).await.remove(0);
1710        assert_eq!(
1711            event.as_log()[log_schema().message_key().unwrap().to_string()],
1712            "hello".into()
1713        );
1714        assert!(event.metadata().splunk_hec_token().is_none());
1715    }
1716
1717    #[tokio::test]
1718    async fn raw() {
1719        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1720            let message = "raw";
1721            let (source, address) = source(None).await;
1722
1723            assert_eq!(200, post(address, "services/collector/raw", message).await);
1724
1725            let event = collect_n(source, 1).await.remove(0);
1726            assert_eq!(
1727                event.as_log()[log_schema().message_key().unwrap().to_string()],
1728                message.into()
1729            );
1730            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1731            assert!(event.as_log().get_timestamp().is_some());
1732            assert_eq!(
1733                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1734                "splunk_hec".into()
1735            );
1736            assert!(event.metadata().splunk_hec_token().is_none());
1737        })
1738        .await;
1739    }
1740
1741    #[tokio::test]
1742    async fn root() {
1743        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1744            let message = r#"{ "event": { "message": "root"} }"#;
1745            let (source, address) = source(None).await;
1746
1747            assert_eq!(200, post(address, "services/collector", message).await);
1748
1749            let event = collect_n(source, 1).await.remove(0);
1750            assert_eq!(
1751                event.as_log()[log_schema().message_key().unwrap().to_string()],
1752                "root".into()
1753            );
1754            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
1755            assert!(event.as_log().get_timestamp().is_some());
1756            assert_eq!(
1757                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1758                "splunk_hec".into()
1759            );
1760            assert!(event.metadata().splunk_hec_token().is_none());
1761        })
1762        .await;
1763    }
1764
1765    #[tokio::test]
1766    async fn channel_header() {
1767        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1768            let message = "raw";
1769            let (source, address) = source(None).await;
1770
1771            let opts = SendWithOpts {
1772                channel: Some(Channel::Header("guid")),
1773                forwarded_for: None,
1774            };
1775
1776            assert_eq!(
1777                200,
1778                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1779            );
1780
1781            let event = collect_n(source, 1).await.remove(0);
1782            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1783        })
1784        .await;
1785    }
1786
1787    #[tokio::test]
1788    async fn xff_header_raw() {
1789        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1790            let message = "raw";
1791            let (source, address) = source(None).await;
1792
1793            let opts = SendWithOpts {
1794                channel: Some(Channel::Header("guid")),
1795                forwarded_for: Some(String::from("10.0.0.1")),
1796            };
1797
1798            assert_eq!(
1799                200,
1800                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1801            );
1802
1803            let event = collect_n(source, 1).await.remove(0);
1804            assert_eq!(
1805                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1806                "10.0.0.1".into()
1807            );
1808        })
1809        .await;
1810    }
1811
1812    // Test helps to illustrate that a payload's `host` value should override an x-forwarded-for header
1813    #[tokio::test]
1814    async fn xff_header_event_with_host_field() {
1815        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1816            let message = r#"{"event":"first", "host": "10.1.0.2"}"#;
1817            let (source, address) = source(None).await;
1818
1819            let opts = SendWithOpts {
1820                channel: Some(Channel::Header("guid")),
1821                forwarded_for: Some(String::from("10.0.0.1")),
1822            };
1823
1824            assert_eq!(
1825                200,
1826                send_with(address, "services/collector/event", message, TOKEN, &opts).await
1827            );
1828
1829            let event = collect_n(source, 1).await.remove(0);
1830            assert_eq!(
1831                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1832                "10.1.0.2".into()
1833            );
1834        })
1835        .await;
1836    }
1837
1838    // Test helps to illustrate that a payload's `host` value should override an x-forwarded-for header
1839    #[tokio::test]
1840    async fn xff_header_event_without_host_field() {
1841        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1842            let message = r#"{"event":"first", "color": "blue"}"#;
1843            let (source, address) = source(None).await;
1844
1845            let opts = SendWithOpts {
1846                channel: Some(Channel::Header("guid")),
1847                forwarded_for: Some(String::from("10.0.0.1")),
1848            };
1849
1850            assert_eq!(
1851                200,
1852                send_with(address, "services/collector/event", message, TOKEN, &opts).await
1853            );
1854
1855            let event = collect_n(source, 1).await.remove(0);
1856            assert_eq!(
1857                event.as_log()[log_schema().host_key().unwrap().to_string().as_str()],
1858                "10.0.0.1".into()
1859            );
1860        })
1861        .await;
1862    }
1863
1864    #[tokio::test]
1865    async fn channel_query_param() {
1866        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1867            let message = "raw";
1868            let (source, address) = source(None).await;
1869
1870            let opts = SendWithOpts {
1871                channel: Some(Channel::QueryParam("guid")),
1872                forwarded_for: None,
1873            };
1874
1875            assert_eq!(
1876                200,
1877                send_with(address, "services/collector/raw", message, TOKEN, &opts).await
1878            );
1879
1880            let event = collect_n(source, 1).await.remove(0);
1881            assert_eq!(event.as_log()[&super::CHANNEL], "guid".into());
1882        })
1883        .await;
1884    }
1885
1886    #[tokio::test]
1887    async fn no_data() {
1888        let (_source, address) = source(None).await;
1889
1890        assert_eq!(400, post(address, "services/collector/event", "").await);
1891    }
1892
1893    #[tokio::test]
1894    async fn invalid_token() {
1895        assert_source_error(&COMPONENT_ERROR_TAGS, async {
1896            let (_source, address) = source(None).await;
1897            let opts = SendWithOpts {
1898                channel: Some(Channel::Header("channel")),
1899                forwarded_for: None,
1900            };
1901
1902            assert_eq!(
1903                401,
1904                send_with(address, "services/collector/event", "", "nope", &opts).await
1905            );
1906        })
1907        .await;
1908    }
1909
1910    #[tokio::test]
1911    async fn health_ignores_token() {
1912        let (_source, address) = source(None).await;
1913
1914        let res = reqwest::Client::new()
1915            .get(format!("http://{address}/services/collector/health"))
1916            .header("Authorization", format!("Splunk {}", "invalid token"))
1917            .send()
1918            .await
1919            .unwrap();
1920
1921        assert_eq!(200, res.status().as_u16());
1922    }
1923
1924    #[tokio::test]
1925    async fn health() {
1926        let (_source, address) = source(None).await;
1927
1928        let res = reqwest::Client::new()
1929            .get(format!("http://{address}/services/collector/health"))
1930            .send()
1931            .await
1932            .unwrap();
1933
1934        assert_eq!(200, res.status().as_u16());
1935    }
1936
1937    #[tokio::test]
1938    async fn secondary_token() {
1939        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1940            let message = r#"{"event":"first", "color": "blue"}"#;
1941            let (_source, address) = source_with(None, Some(VALID_TOKENS), None, false).await;
1942            let options = SendWithOpts {
1943                channel: None,
1944                forwarded_for: None,
1945            };
1946
1947            assert_eq!(
1948                200,
1949                send_with(
1950                    address,
1951                    "services/collector/event",
1952                    message,
1953                    VALID_TOKENS.get(1).unwrap(),
1954                    &options
1955                )
1956                .await
1957            );
1958        })
1959        .await;
1960    }
1961
1962    #[tokio::test]
1963    async fn event_service_token_passthrough_enabled() {
1964        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1965            let message = "passthrough_token_enabled";
1966            let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
1967            let (sink, health) = sink(
1968                address,
1969                TextSerializerConfig::default().into(),
1970                Compression::gzip_default(),
1971            )
1972            .await;
1973            assert!(health.await.is_ok());
1974
1975            let event = channel_n(vec![message], sink, source).await.remove(0);
1976
1977            assert_eq!(
1978                event.as_log()[log_schema().message_key().unwrap().to_string()],
1979                message.into()
1980            );
1981            assert_eq!(
1982                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
1983                TOKEN
1984            );
1985        })
1986        .await;
1987    }
1988
1989    #[tokio::test]
1990    async fn raw_service_token_passthrough_enabled() {
1991        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1992            let message = "raw";
1993            let (source, address) = source_with(None, Some(VALID_TOKENS), None, true).await;
1994
1995            assert_eq!(200, post(address, "services/collector/raw", message).await);
1996
1997            let event = collect_n(source, 1).await.remove(0);
1998            assert_eq!(
1999                event.as_log()[log_schema().message_key().unwrap().to_string()],
2000                message.into()
2001            );
2002            assert_eq!(event.as_log()[&super::CHANNEL], "channel".into());
2003            assert!(event.as_log().get_timestamp().is_some());
2004            assert_eq!(
2005                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2006                "splunk_hec".into()
2007            );
2008            assert_eq!(
2009                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2010                TOKEN
2011            );
2012        })
2013        .await;
2014    }
2015
2016    #[tokio::test]
2017    async fn no_authorization() {
2018        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2019            let message = "no_authorization";
2020            let (source, address) = source_with(None, None, None, false).await;
2021            let (sink, health) = sink(
2022                address,
2023                TextSerializerConfig::default().into(),
2024                Compression::gzip_default(),
2025            )
2026            .await;
2027            assert!(health.await.is_ok());
2028
2029            let event = channel_n(vec![message], sink, source).await.remove(0);
2030
2031            assert_eq!(
2032                event.as_log()[log_schema().message_key().unwrap().to_string()],
2033                message.into()
2034            );
2035            assert!(event.metadata().splunk_hec_token().is_none());
2036        })
2037        .await;
2038    }
2039
2040    #[tokio::test]
2041    async fn no_authorization_token_passthrough_enabled() {
2042        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2043            let message = "no_authorization";
2044            let (source, address) = source_with(None, None, None, true).await;
2045            let (sink, health) = sink(
2046                address,
2047                TextSerializerConfig::default().into(),
2048                Compression::gzip_default(),
2049            )
2050            .await;
2051            assert!(health.await.is_ok());
2052
2053            let event = channel_n(vec![message], sink, source).await.remove(0);
2054
2055            assert_eq!(
2056                event.as_log()[log_schema().message_key().unwrap().to_string()],
2057                message.into()
2058            );
2059            assert_eq!(
2060                &event.metadata().splunk_hec_token().as_ref().unwrap()[..],
2061                TOKEN
2062            );
2063        })
2064        .await;
2065    }
2066
2067    #[tokio::test]
2068    async fn partial() {
2069        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2070            let message = r#"{"event":"first"}{"event":"second""#;
2071            let (source, address) = source(None).await;
2072
2073            assert_eq!(
2074                400,
2075                post(address, "services/collector/event", message).await
2076            );
2077
2078            let event = collect_n(source, 1).await.remove(0);
2079            assert_eq!(
2080                event.as_log()[log_schema().message_key().unwrap().to_string()],
2081                "first".into()
2082            );
2083            assert!(event.as_log().get_timestamp().is_some());
2084            assert_eq!(
2085                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2086                "splunk_hec".into()
2087            );
2088        })
2089        .await;
2090    }
2091
2092    #[tokio::test]
2093    async fn handles_newlines() {
2094        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2095            let message = r#"
2096{"event":"first"}
2097        "#;
2098            let (source, address) = source(None).await;
2099
2100            assert_eq!(
2101                200,
2102                post(address, "services/collector/event", message).await
2103            );
2104
2105            let event = collect_n(source, 1).await.remove(0);
2106            assert_eq!(
2107                event.as_log()[log_schema().message_key().unwrap().to_string()],
2108                "first".into()
2109            );
2110            assert!(event.as_log().get_timestamp().is_some());
2111            assert_eq!(
2112                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2113                "splunk_hec".into()
2114            );
2115        })
2116        .await;
2117    }
2118
2119    #[tokio::test]
2120    async fn handles_spaces() {
2121        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2122            let message = r#" {"event":"first"} "#;
2123            let (source, address) = source(None).await;
2124
2125            assert_eq!(
2126                200,
2127                post(address, "services/collector/event", message).await
2128            );
2129
2130            let event = collect_n(source, 1).await.remove(0);
2131            assert_eq!(
2132                event.as_log()[log_schema().message_key().unwrap().to_string()],
2133                "first".into()
2134            );
2135            assert!(event.as_log().get_timestamp().is_some());
2136            assert_eq!(
2137                event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2138                "splunk_hec".into()
2139            );
2140        })
2141        .await;
2142    }
2143
2144    #[tokio::test]
2145    async fn handles_non_utf8() {
2146        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2147        let message = b" {\"event\": { \"non\": \"A non UTF8 character \xE4\", \"number\": 2, \"bool\": true } } ";
2148        let (source, address) = source(None).await;
2149
2150        let b = reqwest::Client::new()
2151            .post(format!(
2152                "http://{}/{}",
2153                address, "services/collector/event"
2154            ))
2155            .header("Authorization", format!("Splunk {TOKEN}"))
2156            .body::<&[u8]>(message);
2157
2158        assert_eq!(200, b.send().await.unwrap().status().as_u16());
2159
2160        let event = collect_n(source, 1).await.remove(0);
2161        assert_eq!(event.as_log()["non"], "A non UTF8 character �".into());
2162        assert_eq!(event.as_log()["number"], 2.into());
2163        assert_eq!(event.as_log()["bool"], true.into());
2164        assert!(event.as_log().get((lookup::PathPrefix::Event, log_schema().timestamp_key().unwrap())).is_some());
2165        assert_eq!(
2166            event.as_log()[log_schema().source_type_key().unwrap().to_string()],
2167            "splunk_hec".into()
2168        );
2169    }).await;
2170    }
2171
2172    #[tokio::test]
2173    async fn default() {
2174        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2175        let message = r#"{"event":"first","source":"main"}{"event":"second"}{"event":"third","source":"secondary"}"#;
2176        let (source, address) = source(None).await;
2177
2178        assert_eq!(
2179            200,
2180            post(address, "services/collector/event", message).await
2181        );
2182
2183        let events = collect_n(source, 3).await;
2184
2185        assert_eq!(
2186            events[0].as_log()[log_schema().message_key().unwrap().to_string()],
2187            "first".into()
2188        );
2189        assert_eq!(events[0].as_log()[&super::SOURCE], "main".into());
2190
2191        assert_eq!(
2192            events[1].as_log()[log_schema().message_key().unwrap().to_string()],
2193            "second".into()
2194        );
2195        assert_eq!(events[1].as_log()[&super::SOURCE], "main".into());
2196
2197        assert_eq!(
2198            events[2].as_log()[log_schema().message_key().unwrap().to_string()],
2199            "third".into()
2200        );
2201        assert_eq!(events[2].as_log()[&super::SOURCE], "secondary".into());
2202    }).await;
2203    }
2204
2205    #[test]
2206    fn parse_timestamps() {
2207        let cases = vec![
2208            Utc::now(),
2209            Utc.with_ymd_and_hms(1971, 11, 7, 1, 1, 1)
2210                .single()
2211                .expect("invalid timestamp"),
2212            Utc.with_ymd_and_hms(2011, 8, 5, 1, 1, 1)
2213                .single()
2214                .expect("invalid timestamp"),
2215            Utc.with_ymd_and_hms(2189, 11, 4, 2, 2, 2)
2216                .single()
2217                .expect("invalid timestamp"),
2218        ];
2219
2220        for case in cases {
2221            let sec = case.timestamp();
2222            let millis = case.timestamp_millis();
2223            let nano = case.timestamp_nanos_opt().expect("Timestamp out of range");
2224
2225            assert_eq!(parse_timestamp(sec).unwrap().timestamp(), case.timestamp());
2226            assert_eq!(
2227                parse_timestamp(millis).unwrap().timestamp_millis(),
2228                case.timestamp_millis()
2229            );
2230            assert_eq!(
2231                parse_timestamp(nano)
2232                    .unwrap()
2233                    .timestamp_nanos_opt()
2234                    .unwrap(),
2235                case.timestamp_nanos_opt().expect("Timestamp out of range")
2236            );
2237        }
2238
2239        assert!(parse_timestamp(-1).is_none());
2240    }
2241
2242    /// This test will fail once `warp` crate fixes support for
2243    /// custom connection listener, at that point this test can be
2244    /// modified to pass.
2245    /// https://github.com/vectordotdev/vector/issues/7097
2246    /// https://github.com/seanmonstar/warp/issues/830
2247    /// https://github.com/seanmonstar/warp/pull/713
2248    #[tokio::test]
2249    async fn host_test() {
2250        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
2251            let message = "for the host";
2252            let (sink, source) = start(
2253                TextSerializerConfig::default().into(),
2254                Compression::gzip_default(),
2255                None,
2256            )
2257            .await;
2258
2259            let event = channel_n(vec![message], sink, source).await.remove(0);
2260
2261            assert_eq!(
2262                event.as_log()[log_schema().message_key().unwrap().to_string()],
2263                message.into()
2264            );
2265            assert!(
2266                event
2267                    .as_log()
2268                    .get((PathPrefix::Event, log_schema().host_key().unwrap()))
2269                    .is_none()
2270            );
2271        })
2272        .await;
2273    }
2274
2275    #[derive(Deserialize)]
2276    struct HecAckEventResponse {
2277        text: String,
2278        code: u8,
2279        #[serde(rename = "ackId")]
2280        ack_id: u64,
2281    }
2282
2283    #[tokio::test]
2284    async fn ack_json_event() {
2285        let ack_config = HecAcknowledgementsConfig {
2286            enabled: Some(true),
2287            ..Default::default()
2288        };
2289        let (source, address) = source(Some(ack_config)).await;
2290        let event_message = r#"{"event":"first", "color": "blue"}{"event":"second"}"#;
2291        let opts = SendWithOpts {
2292            channel: Some(Channel::Header("guid")),
2293            forwarded_for: None,
2294        };
2295        let event_res = send_with_response(
2296            address,
2297            "services/collector/event",
2298            event_message,
2299            TOKEN,
2300            &opts,
2301        )
2302        .await
2303        .json::<HecAckEventResponse>()
2304        .await
2305        .unwrap();
2306        assert_eq!("Success", event_res.text.as_str());
2307        assert_eq!(0, event_res.code);
2308        _ = collect_n(source, 1).await;
2309
2310        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2311            acks: vec![event_res.ack_id],
2312        })
2313        .unwrap();
2314        let ack_res = send_with_response(
2315            address,
2316            "services/collector/ack",
2317            ack_message.as_str(),
2318            TOKEN,
2319            &opts,
2320        )
2321        .await
2322        .json::<HecAckStatusResponse>()
2323        .await
2324        .unwrap();
2325        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2326    }
2327
2328    #[tokio::test]
2329    async fn ack_raw_event() {
2330        let ack_config = HecAcknowledgementsConfig {
2331            enabled: Some(true),
2332            ..Default::default()
2333        };
2334        let (source, address) = source(Some(ack_config)).await;
2335        let event_message = "raw event message";
2336        let opts = SendWithOpts {
2337            channel: Some(Channel::Header("guid")),
2338            forwarded_for: None,
2339        };
2340        let event_res = send_with_response(
2341            address,
2342            "services/collector/raw",
2343            event_message,
2344            TOKEN,
2345            &opts,
2346        )
2347        .await
2348        .json::<HecAckEventResponse>()
2349        .await
2350        .unwrap();
2351        assert_eq!("Success", event_res.text.as_str());
2352        assert_eq!(0, event_res.code);
2353        _ = collect_n(source, 1).await;
2354
2355        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2356            acks: vec![event_res.ack_id],
2357        })
2358        .unwrap();
2359        let ack_res = send_with_response(
2360            address,
2361            "services/collector/ack",
2362            ack_message.as_str(),
2363            TOKEN,
2364            &opts,
2365        )
2366        .await
2367        .json::<HecAckStatusResponse>()
2368        .await
2369        .unwrap();
2370        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2371    }
2372
2373    #[tokio::test]
2374    async fn ack_repeat_ack_query() {
2375        let ack_config = HecAcknowledgementsConfig {
2376            enabled: Some(true),
2377            ..Default::default()
2378        };
2379        let (source, address) = source(Some(ack_config)).await;
2380        let event_message = "raw event message";
2381        let opts = SendWithOpts {
2382            channel: Some(Channel::Header("guid")),
2383            forwarded_for: None,
2384        };
2385        let event_res = send_with_response(
2386            address,
2387            "services/collector/raw",
2388            event_message,
2389            TOKEN,
2390            &opts,
2391        )
2392        .await
2393        .json::<HecAckEventResponse>()
2394        .await
2395        .unwrap();
2396        _ = collect_n(source, 1).await;
2397
2398        let ack_message = serde_json::to_string(&HecAckStatusRequest {
2399            acks: vec![event_res.ack_id],
2400        })
2401        .unwrap();
2402        let ack_res = send_with_response(
2403            address,
2404            "services/collector/ack",
2405            ack_message.as_str(),
2406            TOKEN,
2407            &opts,
2408        )
2409        .await
2410        .json::<HecAckStatusResponse>()
2411        .await
2412        .unwrap();
2413        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2414
2415        let ack_res = send_with_response(
2416            address,
2417            "services/collector/ack",
2418            ack_message.as_str(),
2419            TOKEN,
2420            &opts,
2421        )
2422        .await
2423        .json::<HecAckStatusResponse>()
2424        .await
2425        .unwrap();
2426        assert!(!ack_res.acks.get(&event_res.ack_id).unwrap());
2427    }
2428
2429    #[tokio::test]
2430    async fn ack_exceed_max_number_of_ack_channels() {
2431        let ack_config = HecAcknowledgementsConfig {
2432            enabled: Some(true),
2433            max_number_of_ack_channels: NonZeroU64::new(1).unwrap(),
2434            ..Default::default()
2435        };
2436
2437        let (_source, address) = source(Some(ack_config)).await;
2438        let mut opts = SendWithOpts {
2439            channel: Some(Channel::Header("guid")),
2440            forwarded_for: None,
2441        };
2442        assert_eq!(
2443            200,
2444            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2445        );
2446
2447        opts.channel = Some(Channel::Header("other-guid"));
2448        assert_eq!(
2449            503,
2450            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await
2451        );
2452        assert_eq!(
2453            503,
2454            send_with(
2455                address,
2456                "services/collector/event",
2457                r#"{"event":"first"}"#,
2458                TOKEN,
2459                &opts
2460            )
2461            .await
2462        );
2463    }
2464
2465    #[tokio::test]
2466    async fn ack_exceed_max_pending_acks_per_channel() {
2467        let ack_config = HecAcknowledgementsConfig {
2468            enabled: Some(true),
2469            max_pending_acks_per_channel: NonZeroU64::new(1).unwrap(),
2470            ..Default::default()
2471        };
2472
2473        let (source, address) = source(Some(ack_config)).await;
2474        let opts = SendWithOpts {
2475            channel: Some(Channel::Header("guid")),
2476            forwarded_for: None,
2477        };
2478        for _ in 0..5 {
2479            send_with(
2480                address,
2481                "services/collector/event",
2482                r#"{"event":"first"}"#,
2483                TOKEN,
2484                &opts,
2485            )
2486            .await;
2487        }
2488        for _ in 0..5 {
2489            send_with(address, "services/collector/raw", "message", TOKEN, &opts).await;
2490        }
2491        let event_res = send_with_response(
2492            address,
2493            "services/collector/event",
2494            r#"{"event":"this will be acked"}"#,
2495            TOKEN,
2496            &opts,
2497        )
2498        .await
2499        .json::<HecAckEventResponse>()
2500        .await
2501        .unwrap();
2502        _ = collect_n(source, 11).await;
2503
2504        let ack_message_dropped = serde_json::to_string(&HecAckStatusRequest {
2505            acks: (0..10).collect::<Vec<u64>>(),
2506        })
2507        .unwrap();
2508        let ack_res = send_with_response(
2509            address,
2510            "services/collector/ack",
2511            ack_message_dropped.as_str(),
2512            TOKEN,
2513            &opts,
2514        )
2515        .await
2516        .json::<HecAckStatusResponse>()
2517        .await
2518        .unwrap();
2519        assert!(ack_res.acks.values().all(|ack_status| !*ack_status));
2520
2521        let ack_message_acked = serde_json::to_string(&HecAckStatusRequest {
2522            acks: vec![event_res.ack_id],
2523        })
2524        .unwrap();
2525        let ack_res = send_with_response(
2526            address,
2527            "services/collector/ack",
2528            ack_message_acked.as_str(),
2529            TOKEN,
2530            &opts,
2531        )
2532        .await
2533        .json::<HecAckStatusResponse>()
2534        .await
2535        .unwrap();
2536        assert!(ack_res.acks.get(&event_res.ack_id).unwrap());
2537    }
2538
2539    #[tokio::test]
2540    async fn ack_service_accepts_parameterized_content_type() {
2541        let ack_config = HecAcknowledgementsConfig {
2542            enabled: Some(true),
2543            ..Default::default()
2544        };
2545        let (source, address) = source(Some(ack_config)).await;
2546        let opts = SendWithOpts {
2547            channel: Some(Channel::Header("guid")),
2548            forwarded_for: None,
2549        };
2550
2551        let event_res = send_with_response(
2552            address,
2553            "services/collector/event",
2554            r#"{"event":"param-test"}"#,
2555            TOKEN,
2556            &opts,
2557        )
2558        .await
2559        .json::<HecAckEventResponse>()
2560        .await
2561        .unwrap();
2562        let _ = collect_n(source, 1).await;
2563
2564        let body = serde_json::to_string(&HecAckStatusRequest {
2565            acks: vec![event_res.ack_id],
2566        })
2567        .unwrap();
2568
2569        let res = reqwest::Client::new()
2570            .post(format!("http://{address}/services/collector/ack"))
2571            .header("Authorization", format!("Splunk {TOKEN}"))
2572            .header("x-splunk-request-channel", "guid")
2573            .header("Content-Type", "application/json; some-random-text; hello")
2574            .body(body)
2575            .send()
2576            .await
2577            .unwrap();
2578
2579        assert_eq!(200, res.status().as_u16());
2580
2581        let _parsed: HecAckStatusResponse = res.json().await.unwrap();
2582    }
2583
2584    #[tokio::test]
2585    async fn event_service_acknowledgements_enabled_channel_required() {
2586        let message = r#"{"event":"first", "color": "blue"}"#;
2587        let ack_config = HecAcknowledgementsConfig {
2588            enabled: Some(true),
2589            ..Default::default()
2590        };
2591        let (_, address) = source(Some(ack_config)).await;
2592
2593        let opts = SendWithOpts {
2594            channel: None,
2595            forwarded_for: None,
2596        };
2597
2598        assert_eq!(
2599            400,
2600            send_with(address, "services/collector/event", message, TOKEN, &opts).await
2601        );
2602    }
2603
2604    #[tokio::test]
2605    async fn ack_service_acknowledgements_disabled() {
2606        let message = r#" {"acks":[0]} "#;
2607        let (_, address) = source(None).await;
2608
2609        let opts = SendWithOpts {
2610            channel: Some(Channel::Header("guid")),
2611            forwarded_for: None,
2612        };
2613
2614        assert_eq!(
2615            400,
2616            send_with(address, "services/collector/ack", message, TOKEN, &opts).await
2617        );
2618    }
2619
2620    #[test]
2621    fn output_schema_definition_vector_namespace() {
2622        let config = SplunkConfig {
2623            log_namespace: Some(true),
2624            ..Default::default()
2625        };
2626
2627        let definition = config
2628            .outputs(LogNamespace::Vector)
2629            .remove(0)
2630            .schema_definition(true);
2631
2632        let expected_definition = Definition::new_with_default_metadata(
2633            Kind::object(Collection::empty()).or_bytes(),
2634            [LogNamespace::Vector],
2635        )
2636        .with_meaning(OwnedTargetPath::event_root(), meaning::MESSAGE)
2637        .with_metadata_field(
2638            &owned_value_path!("vector", "source_type"),
2639            Kind::bytes(),
2640            None,
2641        )
2642        .with_metadata_field(
2643            &owned_value_path!("vector", "ingest_timestamp"),
2644            Kind::timestamp(),
2645            None,
2646        )
2647        .with_metadata_field(
2648            &owned_value_path!("splunk_hec", "host"),
2649            Kind::bytes(),
2650            Some("host"),
2651        )
2652        .with_metadata_field(
2653            &owned_value_path!("splunk_hec", "index"),
2654            Kind::bytes(),
2655            None,
2656        )
2657        .with_metadata_field(
2658            &owned_value_path!("splunk_hec", "source"),
2659            Kind::bytes(),
2660            Some("service"),
2661        )
2662        .with_metadata_field(
2663            &owned_value_path!("splunk_hec", "channel"),
2664            Kind::bytes(),
2665            None,
2666        )
2667        .with_metadata_field(
2668            &owned_value_path!("splunk_hec", "sourcetype"),
2669            Kind::bytes(),
2670            None,
2671        );
2672
2673        assert_eq!(definition, Some(expected_definition));
2674    }
2675
2676    #[test]
2677    fn output_schema_definition_legacy_namespace() {
2678        let config = SplunkConfig::default();
2679        let definitions = config
2680            .outputs(LogNamespace::Legacy)
2681            .remove(0)
2682            .schema_definition(true);
2683
2684        let expected_definition = Definition::new_with_default_metadata(
2685            Kind::object(Collection::empty()),
2686            [LogNamespace::Legacy],
2687        )
2688        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
2689        .with_event_field(
2690            &owned_value_path!("message"),
2691            Kind::bytes().or_undefined(),
2692            Some("message"),
2693        )
2694        .with_event_field(
2695            &owned_value_path!("line"),
2696            Kind::array(Collection::empty())
2697                .or_object(Collection::empty())
2698                .or_undefined(),
2699            None,
2700        )
2701        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
2702        .with_event_field(&owned_value_path!("splunk_channel"), Kind::bytes(), None)
2703        .with_event_field(&owned_value_path!("splunk_index"), Kind::bytes(), None)
2704        .with_event_field(
2705            &owned_value_path!("splunk_source"),
2706            Kind::bytes(),
2707            Some("service"),
2708        )
2709        .with_event_field(&owned_value_path!("splunk_sourcetype"), Kind::bytes(), None)
2710        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None);
2711
2712        assert_eq!(definitions, Some(expected_definition));
2713    }
2714
2715    impl ValidatableComponent for SplunkConfig {
2716        fn validation_configuration() -> ValidationConfiguration {
2717            let config = Self {
2718                address: default_socket_address(),
2719                ..Default::default()
2720            };
2721
2722            let listen_addr_http = format!("http://{}/services/collector/event", config.address);
2723            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
2724
2725            let log_namespace: LogNamespace = config.log_namespace.unwrap_or_default().into();
2726            let framing = BytesDecoderConfig::new().into();
2727            let decoding = DeserializerConfig::Json(Default::default());
2728
2729            let external_resource = ExternalResource::new(
2730                ResourceDirection::Push,
2731                HttpResourceConfig::from_parts(uri, None).with_headers(HashMap::from([(
2732                    X_SPLUNK_REQUEST_CHANNEL.to_string(),
2733                    "channel".to_string(),
2734                )])),
2735                DecodingConfig::new(framing, decoding, false.into()),
2736            );
2737
2738            ValidationConfiguration::from_source(
2739                Self::NAME,
2740                log_namespace,
2741                vec![ComponentTestCaseConfig::from_source(
2742                    config,
2743                    None,
2744                    Some(external_resource),
2745                )],
2746            )
2747        }
2748    }
2749
2750    register_validatable_component!(SplunkConfig);
2751}