vector/sources/
http_server.rs

1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::{Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use http_serde;
7use tokio_util::codec::Decoder as _;
8use vector_lib::{
9    codecs::{
10        BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
11        NewlineDelimitedDecoderConfig,
12        decoding::{DeserializerConfig, FramingConfig},
13    },
14    config::{DataType, LegacyKey, LogNamespace},
15    configurable::configurable_component,
16    lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
17    schema::Definition,
18};
19use vrl::value::{Kind, kind::Collection};
20use warp::http::HeaderMap;
21
22use crate::{
23    codecs::{Decoder, DecodingConfig},
24    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
25    config::{
26        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
27        SourceOutput,
28    },
29    event::Event,
30    http::KeepaliveConfig,
31    serde::{bool_or_struct, default_decoding},
32    sources::util::{
33        Encoding, HttpSource,
34        http::{HttpMethod, add_headers, add_query_parameters},
35    },
36    tls::TlsEnableableConfig,
37};
38
39/// Configuration for the `http` source.
40#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
41#[configurable(metadata(deprecated))]
42#[derive(Clone, Debug)]
43pub struct HttpConfig(SimpleHttpConfig);
44
45impl GenerateConfig for HttpConfig {
46    fn generate_config() -> toml::Value {
47        <SimpleHttpConfig as GenerateConfig>::generate_config()
48    }
49}
50
51#[async_trait::async_trait]
52#[typetag::serde(name = "http")]
53impl SourceConfig for HttpConfig {
54    async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
55        self.0.build(cx).await
56    }
57
58    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
59        self.0.outputs(global_log_namespace)
60    }
61
62    fn resources(&self) -> Vec<Resource> {
63        self.0.resources()
64    }
65
66    fn can_acknowledge(&self) -> bool {
67        self.0.can_acknowledge()
68    }
69}
70
71/// Configuration for the `http_server` source.
72#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
73#[derive(Clone, Debug)]
74pub struct SimpleHttpConfig {
75    /// The socket address to listen for connections on.
76    ///
77    /// It _must_ include a port.
78    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
79    #[configurable(metadata(docs::examples = "localhost:80"))]
80    address: SocketAddr,
81
82    /// The expected encoding of received data.
83    ///
84    /// For `json` and `ndjson` encodings, the fields of the JSON objects are output as separate fields.
85    #[configurable(deprecated)]
86    #[serde(default)]
87    encoding: Option<Encoding>,
88
89    /// A list of HTTP headers to include in the log event.
90    ///
91    /// Accepts the wildcard (`*`) character for headers matching a specified pattern.
92    ///
93    /// Specifying "*" results in all headers included in the log event.
94    ///
95    /// These headers are not included in the JSON payload if a field with a conflicting name exists.
96    #[serde(default)]
97    #[configurable(metadata(docs::examples = "User-Agent"))]
98    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
99    #[configurable(metadata(docs::examples = "X-*"))]
100    #[configurable(metadata(docs::examples = "*"))]
101    headers: Vec<String>,
102
103    /// A list of URL query parameters to include in the log event.
104    ///
105    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
106    ///
107    /// Specifying "*" results in all query parameters included in the log event.
108    ///
109    /// These override any values included in the body with conflicting names.
110    #[serde(default)]
111    #[configurable(metadata(docs::examples = "application"))]
112    #[configurable(metadata(docs::examples = "source"))]
113    #[configurable(metadata(docs::examples = "param*"))]
114    #[configurable(metadata(docs::examples = "*"))]
115    query_parameters: Vec<String>,
116
117    #[configurable(derived)]
118    auth: Option<HttpServerAuthConfig>,
119
120    /// Whether or not to treat the configured `path` as an absolute path.
121    ///
122    /// If set to `true`, only requests using the exact URL path specified in `path` are accepted. Otherwise,
123    /// requests sent to a URL path that starts with the value of `path` are accepted.
124    ///
125    /// With `strict_path` set to `false` and `path` set to `""`, the configured HTTP source accepts requests from
126    /// any URL path.
127    #[serde(default = "crate::serde::default_true")]
128    strict_path: bool,
129
130    /// The URL path on which log event POST requests are sent.
131    #[serde(default = "default_path")]
132    #[configurable(metadata(docs::examples = "/event/path"))]
133    #[configurable(metadata(docs::examples = "/logs"))]
134    path: String,
135
136    /// The event key in which the requested URL path used to send the request is stored.
137    #[serde(default = "default_path_key")]
138    #[configurable(metadata(docs::examples = "vector_http_path"))]
139    path_key: OptionalValuePath,
140
141    /// If set, the name of the log field used to add the remote IP to each event
142    #[serde(default = "default_host_key")]
143    #[configurable(metadata(docs::examples = "hostname"))]
144    host_key: OptionalValuePath,
145
146    /// Specifies the action of the HTTP request.
147    #[serde(default = "default_http_method")]
148    method: HttpMethod,
149
150    /// Specifies the HTTP response status code that will be returned on successful requests.
151    #[configurable(metadata(docs::examples = 202))]
152    #[configurable(metadata(docs::numeric_type = "uint"))]
153    #[serde(with = "http_serde::status_code")]
154    #[serde(default = "default_http_response_code")]
155    response_code: StatusCode,
156
157    #[configurable(derived)]
158    tls: Option<TlsEnableableConfig>,
159
160    #[configurable(derived)]
161    framing: Option<FramingConfig>,
162
163    #[configurable(derived)]
164    decoding: Option<DeserializerConfig>,
165
166    #[configurable(derived)]
167    #[serde(default, deserialize_with = "bool_or_struct")]
168    acknowledgements: SourceAcknowledgementsConfig,
169
170    /// The namespace to use for logs. This overrides the global setting.
171    #[configurable(metadata(docs::hidden))]
172    #[serde(default)]
173    log_namespace: Option<bool>,
174
175    #[configurable(derived)]
176    #[serde(default)]
177    keepalive: KeepaliveConfig,
178}
179
180impl SimpleHttpConfig {
181    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
182    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
183        let mut schema_definition = self
184            .decoding
185            .as_ref()
186            .unwrap_or(&default_decoding())
187            .schema_definition(log_namespace)
188            .with_source_metadata(
189                SimpleHttpConfig::NAME,
190                self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
191                &owned_value_path!("path"),
192                Kind::bytes(),
193                None,
194            )
195            // for metadata that is added to the events dynamically from the self.headers
196            .with_source_metadata(
197                SimpleHttpConfig::NAME,
198                None,
199                &owned_value_path!("headers"),
200                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
201                None,
202            )
203            // for metadata that is added to the events dynamically from the self.query_parameters
204            .with_source_metadata(
205                SimpleHttpConfig::NAME,
206                None,
207                &owned_value_path!("query_parameters"),
208                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
209                None,
210            )
211            .with_source_metadata(
212                SimpleHttpConfig::NAME,
213                self.host_key.path.clone().map(LegacyKey::Overwrite),
214                &owned_value_path!("host"),
215                Kind::bytes().or_undefined(),
216                None,
217            )
218            .with_standard_vector_source_metadata();
219
220        // for metadata that is added to the events dynamically from config options
221        if log_namespace == LogNamespace::Legacy {
222            schema_definition = schema_definition.unknown_fields(Kind::bytes());
223        }
224
225        schema_definition
226    }
227
228    fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
229        if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
230            return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
231        }
232
233        let (framing, decoding) = if let Some(encoding) = self.encoding {
234            match encoding {
235                Encoding::Text => (
236                    NewlineDelimitedDecoderConfig::new().into(),
237                    BytesDeserializerConfig::new().into(),
238                ),
239                Encoding::Json => (
240                    BytesDecoderConfig::new().into(),
241                    JsonDeserializerConfig::default().into(),
242                ),
243                Encoding::Ndjson => (
244                    NewlineDelimitedDecoderConfig::new().into(),
245                    JsonDeserializerConfig::default().into(),
246                ),
247                Encoding::Binary => (
248                    BytesDecoderConfig::new().into(),
249                    BytesDeserializerConfig::new().into(),
250                ),
251            }
252        } else {
253            let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
254            let framing = self
255                .framing
256                .clone()
257                .unwrap_or_else(|| decoding.default_stream_framing());
258            (framing, decoding)
259        };
260
261        Ok(DecodingConfig::new(
262            framing,
263            decoding,
264            self.log_namespace.unwrap_or(false).into(),
265        ))
266    }
267}
268
269impl Default for SimpleHttpConfig {
270    fn default() -> Self {
271        Self {
272            address: "0.0.0.0:8080".parse().unwrap(),
273            encoding: None,
274            headers: Vec::new(),
275            query_parameters: Vec::new(),
276            tls: None,
277            auth: None,
278            path: default_path(),
279            path_key: default_path_key(),
280            host_key: default_host_key(),
281            method: default_http_method(),
282            response_code: default_http_response_code(),
283            strict_path: true,
284            framing: None,
285            decoding: Some(default_decoding()),
286            acknowledgements: SourceAcknowledgementsConfig::default(),
287            log_namespace: None,
288            keepalive: KeepaliveConfig::default(),
289        }
290    }
291}
292
293impl_generate_config_from_default!(SimpleHttpConfig);
294
295const fn default_http_method() -> HttpMethod {
296    HttpMethod::Post
297}
298
299fn default_path() -> String {
300    "/".to_string()
301}
302
303fn default_path_key() -> OptionalValuePath {
304    OptionalValuePath::from(owned_value_path!("path"))
305}
306
307fn default_host_key() -> OptionalValuePath {
308    OptionalValuePath::none()
309}
310
311const fn default_http_response_code() -> StatusCode {
312    StatusCode::OK
313}
314
315/// Removes duplicates from the list, and logs a `warn!()` for each duplicate removed.
316pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
317    list.sort();
318
319    let mut dedup = false;
320    for (idx, name) in list.iter().enumerate() {
321        if idx < list.len() - 1 && list[idx] == list[idx + 1] {
322            warn!(
323                "`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
324                list_name, name
325            );
326            dedup = true;
327        }
328    }
329
330    if dedup {
331        list.dedup();
332    }
333    list
334}
335
336/// Convert [`SocketAddr`] into a string, returning only the IP address.
337fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
338    addr.ip().to_string()
339}
340
341#[derive(Clone)]
342pub enum HttpConfigParamKind {
343    Glob(glob::Pattern),
344    Exact(String),
345}
346
347pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
348    list.iter()
349        .map(|s| match s.contains('*') {
350            true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
351            false => Ok(HttpConfigParamKind::Exact(s.to_string())),
352        })
353        .collect::<crate::Result<Vec<HttpConfigParamKind>>>()
354}
355
356#[async_trait::async_trait]
357#[typetag::serde(name = "http_server")]
358impl SourceConfig for SimpleHttpConfig {
359    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
360        let log_namespace = cx.log_namespace(self.log_namespace);
361        let decoder = self
362            .get_decoding_config()?
363            .build()?
364            .with_log_namespace(log_namespace);
365
366        let source = SimpleHttpSource {
367            headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
368            query_parameters: build_param_matcher(&remove_duplicates(
369                self.query_parameters.clone(),
370                "query_parameters",
371            ))?,
372            path_key: self.path_key.clone(),
373            host_key: self.host_key.clone(),
374            decoder,
375            log_namespace,
376        };
377        source.run(
378            self.address,
379            self.path.as_str(),
380            self.method,
381            self.response_code,
382            self.strict_path,
383            self.tls.as_ref(),
384            self.auth.as_ref(),
385            cx,
386            self.acknowledgements,
387            self.keepalive.clone(),
388        )
389    }
390
391    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
392        // There is a global and per-source `log_namespace` config.
393        // The source config overrides the global setting and is merged here.
394        let log_namespace = global_log_namespace.merge(self.log_namespace);
395
396        let schema_definition = self.schema_definition(log_namespace);
397
398        vec![SourceOutput::new_maybe_logs(
399            self.decoding
400                .as_ref()
401                .map(|d| d.output_type())
402                .unwrap_or(DataType::Log),
403            schema_definition,
404        )]
405    }
406
407    fn resources(&self) -> Vec<Resource> {
408        vec![Resource::tcp(self.address)]
409    }
410
411    fn can_acknowledge(&self) -> bool {
412        true
413    }
414}
415
416#[derive(Clone)]
417struct SimpleHttpSource {
418    headers: Vec<HttpConfigParamKind>,
419    query_parameters: Vec<HttpConfigParamKind>,
420    path_key: OptionalValuePath,
421    host_key: OptionalValuePath,
422    decoder: Decoder,
423    log_namespace: LogNamespace,
424}
425
426impl HttpSource for SimpleHttpSource {
427    /// Enriches the log events with metadata for the `request_path` and for each of the headers.
428    /// Non-log events are skipped.
429    fn enrich_events(
430        &self,
431        events: &mut [Event],
432        request_path: &str,
433        headers: &HeaderMap,
434        query_parameters: &HashMap<String, String>,
435        source_ip: Option<&SocketAddr>,
436    ) {
437        let now = Utc::now();
438        for event in events.iter_mut() {
439            match event {
440                Event::Log(log) => {
441                    // add request_path to each event
442                    self.log_namespace.insert_source_metadata(
443                        SimpleHttpConfig::NAME,
444                        log,
445                        self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
446                        path!("path"),
447                        request_path.to_owned(),
448                    );
449
450                    self.log_namespace.insert_standard_vector_source_metadata(
451                        log,
452                        SimpleHttpConfig::NAME,
453                        now,
454                    );
455
456                    if let Some(addr) = source_ip {
457                        self.log_namespace.insert_source_metadata(
458                            SimpleHttpConfig::NAME,
459                            log,
460                            self.host_key.path.as_ref().map(LegacyKey::Overwrite),
461                            path!("host"),
462                            socket_addr_to_ip_string(addr),
463                        );
464                    }
465                }
466                _ => {
467                    continue;
468                }
469            }
470        }
471
472        add_headers(
473            events,
474            &self.headers,
475            headers,
476            self.log_namespace,
477            SimpleHttpConfig::NAME,
478        );
479
480        add_query_parameters(
481            events,
482            &self.query_parameters,
483            query_parameters,
484            self.log_namespace,
485            SimpleHttpConfig::NAME,
486        );
487    }
488
489    fn build_events(
490        &self,
491        body: Bytes,
492        _header_map: &HeaderMap,
493        _query_parameters: &HashMap<String, String>,
494        _request_path: &str,
495    ) -> Result<Vec<Event>, ErrorMessage> {
496        let mut decoder = self.decoder.clone();
497        let mut events = Vec::new();
498        let mut bytes = BytesMut::new();
499        bytes.extend_from_slice(&body);
500
501        loop {
502            match decoder.decode_eof(&mut bytes) {
503                Ok(Some((next, _))) => {
504                    events.extend(next);
505                }
506                Ok(None) => break,
507                Err(error) => {
508                    // Error is logged / emitted by `crate::codecs::Decoder`, no further
509                    // handling is needed here
510                    return Err(ErrorMessage::new(
511                        StatusCode::BAD_REQUEST,
512                        format!("Failed decoding body: {error}"),
513                    ));
514                }
515            }
516        }
517
518        Ok(events)
519    }
520
521    fn enable_source_ip(&self) -> bool {
522        self.host_key.path.is_some()
523    }
524}
525
526#[cfg(test)]
527mod tests {
528    use std::{io::Write, net::SocketAddr, str::FromStr};
529
530    use flate2::{
531        Compression,
532        write::{GzEncoder, ZlibEncoder},
533    };
534    use futures::Stream;
535    use headers::{Authorization, authorization::Credentials};
536    use http::{HeaderMap, Method, StatusCode, Uri, header::AUTHORIZATION};
537    use similar_asserts::assert_eq;
538    use vector_lib::{
539        codecs::{
540            BytesDecoderConfig, JsonDeserializerConfig,
541            decoding::{DeserializerConfig, FramingConfig},
542        },
543        config::LogNamespace,
544        event::LogEvent,
545        lookup::{
546            OwnedTargetPath, PathPrefix, event_path, lookup_v2::OptionalValuePath, owned_value_path,
547        },
548        schema::Definition,
549    };
550    use vrl::value::{Kind, ObjectMap, kind::Collection};
551
552    use super::{SimpleHttpConfig, remove_duplicates};
553    use crate::{
554        SourceSender,
555        common::http::server_auth::HttpServerAuthConfig,
556        components::validation::prelude::*,
557        config::{SourceConfig, SourceContext, log_schema},
558        event::{Event, EventStatus, Value},
559        sources::http_server::HttpMethod,
560        test_util::{
561            components::{self, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
562            next_addr, spawn_collect_n, wait_for_tcp,
563        },
564    };
565
566    #[test]
567    fn generate_config() {
568        crate::test_util::test_generate_config::<SimpleHttpConfig>();
569    }
570
571    #[allow(clippy::too_many_arguments)]
572    async fn source<'a>(
573        headers: Vec<String>,
574        query_parameters: Vec<String>,
575        path_key: &'a str,
576        host_key: &'a str,
577        path: &'a str,
578        method: &'a str,
579        response_code: StatusCode,
580        auth: Option<HttpServerAuthConfig>,
581        strict_path: bool,
582        status: EventStatus,
583        acknowledgements: bool,
584        framing: Option<FramingConfig>,
585        decoding: Option<DeserializerConfig>,
586    ) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
587        let (sender, recv) = SourceSender::new_test_finalize(status);
588        let address = next_addr();
589        let path = path.to_owned();
590        let host_key = OptionalValuePath::from(owned_value_path!(host_key));
591        let path_key = OptionalValuePath::from(owned_value_path!(path_key));
592        let context = SourceContext::new_test(sender, None);
593        let method = match Method::from_str(method).unwrap() {
594            Method::GET => HttpMethod::Get,
595            Method::POST => HttpMethod::Post,
596            _ => HttpMethod::Post,
597        };
598
599        tokio::spawn(async move {
600            SimpleHttpConfig {
601                address,
602                headers,
603                encoding: None,
604                query_parameters,
605                response_code,
606                tls: None,
607                auth,
608                strict_path,
609                path_key,
610                host_key,
611                path,
612                method,
613                framing,
614                decoding,
615                acknowledgements: acknowledgements.into(),
616                log_namespace: None,
617                keepalive: Default::default(),
618            }
619            .build(context)
620            .await
621            .unwrap()
622            .await
623            .unwrap();
624        });
625        wait_for_tcp(address).await;
626        (recv, address)
627    }
628
629    async fn send(address: SocketAddr, body: &str) -> u16 {
630        reqwest::Client::new()
631            .post(format!("http://{address}/"))
632            .body(body.to_owned())
633            .send()
634            .await
635            .unwrap()
636            .status()
637            .as_u16()
638    }
639
640    async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
641        reqwest::Client::new()
642            .post(format!("http://{address}/"))
643            .headers(headers)
644            .body(body.to_owned())
645            .send()
646            .await
647            .unwrap()
648            .status()
649            .as_u16()
650    }
651
652    async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
653        reqwest::Client::new()
654            .post(format!("http://{address}?{query}"))
655            .body(body.to_owned())
656            .send()
657            .await
658            .unwrap()
659            .status()
660            .as_u16()
661    }
662
663    async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
664        reqwest::Client::new()
665            .post(format!("http://{address}{path}"))
666            .body(body.to_owned())
667            .send()
668            .await
669            .unwrap()
670            .status()
671            .as_u16()
672    }
673
674    async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
675        let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
676        reqwest::Client::new()
677            .request(method, format!("http://{address}{path}"))
678            .body(body.to_owned())
679            .send()
680            .await
681            .unwrap()
682            .status()
683            .as_u16()
684    }
685
686    async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
687        reqwest::Client::new()
688            .post(format!("http://{address}/"))
689            .headers(headers)
690            .body(body)
691            .send()
692            .await
693            .unwrap()
694            .status()
695            .as_u16()
696    }
697
698    async fn spawn_ok_collect_n(
699        send: impl std::future::Future<Output = u16> + Send + 'static,
700        rx: impl Stream<Item = Event> + Unpin,
701        n: usize,
702    ) -> Vec<Event> {
703        spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
704    }
705
706    #[tokio::test]
707    async fn http_multiline_text() {
708        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
709            let body = "test body\ntest body 2";
710
711            let (rx, addr) = source(
712                vec![],
713                vec![],
714                "http_path",
715                "remote_ip",
716                "/",
717                "POST",
718                StatusCode::OK,
719                None,
720                true,
721                EventStatus::Delivered,
722                true,
723                None,
724                None,
725            )
726            .await;
727
728            spawn_ok_collect_n(send(addr, body), rx, 2).await
729        })
730        .await;
731
732        {
733            let event = events.remove(0);
734            let log = event.as_log();
735            assert_eq!(*log.get_message().unwrap(), "test body".into());
736            assert!(log.get_timestamp().is_some());
737            assert_eq!(
738                *log.get_source_type().unwrap(),
739                SimpleHttpConfig::NAME.into()
740            );
741            assert_eq!(log["http_path"], "/".into());
742            assert_event_metadata(log).await;
743        }
744        {
745            let event = events.remove(0);
746            let log = event.as_log();
747            assert_eq!(*log.get_message().unwrap(), "test body 2".into());
748            assert_event_metadata(log).await;
749        }
750    }
751
752    #[tokio::test]
753    async fn http_multiline_text2() {
754        //same as above test but with a newline at the end
755        let body = "test body\ntest body 2\n";
756
757        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
758            let (rx, addr) = source(
759                vec![],
760                vec![],
761                "http_path",
762                "remote_ip",
763                "/",
764                "POST",
765                StatusCode::OK,
766                None,
767                true,
768                EventStatus::Delivered,
769                true,
770                None,
771                None,
772            )
773            .await;
774
775            spawn_ok_collect_n(send(addr, body), rx, 2).await
776        })
777        .await;
778
779        {
780            let event = events.remove(0);
781            let log = event.as_log();
782            assert_eq!(*log.get_message().unwrap(), "test body".into());
783            assert_event_metadata(log).await;
784        }
785        {
786            let event = events.remove(0);
787            let log = event.as_log();
788            assert_eq!(*log.get_message().unwrap(), "test body 2".into());
789            assert_event_metadata(log).await;
790        }
791    }
792
793    #[tokio::test]
794    async fn http_bytes_codec_preserves_newlines() {
795        let body = "foo\nbar";
796
797        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
798            let (rx, addr) = source(
799                vec![],
800                vec![],
801                "http_path",
802                "remote_ip",
803                "/",
804                "POST",
805                StatusCode::OK,
806                None,
807                true,
808                EventStatus::Delivered,
809                true,
810                Some(BytesDecoderConfig::new().into()),
811                None,
812            )
813            .await;
814
815            spawn_ok_collect_n(send(addr, body), rx, 1).await
816        })
817        .await;
818
819        assert_eq!(events.len(), 1);
820
821        {
822            let event = events.remove(0);
823            let log = event.as_log();
824            assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
825            assert_event_metadata(log).await;
826        }
827    }
828
829    #[tokio::test]
830    async fn http_json_parsing() {
831        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
832            let (rx, addr) = source(
833                vec![],
834                vec![],
835                "http_path",
836                "remote_ip",
837                "/",
838                "POST",
839                StatusCode::OK,
840                None,
841                true,
842                EventStatus::Delivered,
843                true,
844                None,
845                Some(JsonDeserializerConfig::default().into()),
846            )
847            .await;
848
849            spawn_collect_n(
850                async move {
851                    assert_eq!(400, send(addr, "{").await); //malformed
852                    assert_eq!(400, send(addr, r#"{"key"}"#).await); //key without value
853
854                    assert_eq!(200, send(addr, "{}").await); //can be one object or array of objects
855                    assert_eq!(200, send(addr, "[{},{},{}]").await);
856                },
857                rx,
858                2,
859            )
860            .await
861        })
862        .await;
863
864        assert!(events.remove(1).as_log().get_timestamp().is_some());
865        assert!(events.remove(0).as_log().get_timestamp().is_some());
866    }
867
868    #[tokio::test]
869    async fn http_json_values() {
870        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
871            let (rx, addr) = source(
872                vec![],
873                vec![],
874                "http_path",
875                "remote_ip",
876                "/",
877                "POST",
878                StatusCode::OK,
879                None,
880                true,
881                EventStatus::Delivered,
882                true,
883                None,
884                Some(JsonDeserializerConfig::default().into()),
885            )
886            .await;
887
888            spawn_collect_n(
889                async move {
890                    assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
891                    assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
892                },
893                rx,
894                2,
895            )
896            .await
897        })
898        .await;
899
900        {
901            let event = events.remove(0);
902            let log = event.as_log();
903            assert_eq!(log["key"], "value".into());
904            assert_event_metadata(log).await;
905        }
906        {
907            let event = events.remove(0);
908            let log = event.as_log();
909            assert_eq!(log["key2"], "value2".into());
910            assert_event_metadata(log).await;
911        }
912    }
913
914    #[tokio::test]
915    async fn http_json_dotted_keys() {
916        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
917            let (rx, addr) = source(
918                vec![],
919                vec![],
920                "http_path",
921                "remote_ip",
922                "/",
923                "POST",
924                StatusCode::OK,
925                None,
926                true,
927                EventStatus::Delivered,
928                true,
929                None,
930                Some(JsonDeserializerConfig::default().into()),
931            )
932            .await;
933
934            spawn_collect_n(
935                async move {
936                    assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
937                    assert_eq!(
938                        200,
939                        send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
940                    );
941                },
942                rx,
943                2,
944            )
945            .await
946        })
947        .await;
948
949        {
950            let event = events.remove(0);
951            let log = event.as_log();
952            assert_eq!(
953                log.get(event_path!("dotted.key")).unwrap(),
954                &Value::from("value")
955            );
956        }
957        {
958            let event = events.remove(0);
959            let log = event.as_log();
960            let mut map = ObjectMap::new();
961            map.insert("dotted.key2".into(), Value::from("value2"));
962            assert_eq!(log["nested"], map.into());
963        }
964    }
965
966    #[tokio::test]
967    async fn http_ndjson() {
968        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
969            let (rx, addr) = source(
970                vec![],
971                vec![],
972                "http_path",
973                "remote_ip",
974                "/",
975                "POST",
976                StatusCode::OK,
977                None,
978                true,
979                EventStatus::Delivered,
980                true,
981                None,
982                Some(JsonDeserializerConfig::default().into()),
983            )
984            .await;
985
986            spawn_collect_n(
987                async move {
988                    assert_eq!(
989                        200,
990                        send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
991                    );
992
993                    assert_eq!(
994                        200,
995                        send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
996                    );
997                },
998                rx,
999                4,
1000            )
1001            .await
1002        })
1003        .await;
1004
1005        {
1006            let event = events.remove(0);
1007            let log = event.as_log();
1008            assert_eq!(log["key1"], "value1".into());
1009            assert_event_metadata(log).await;
1010        }
1011        {
1012            let event = events.remove(0);
1013            let log = event.as_log();
1014            assert_eq!(log["key2"], "value2".into());
1015            assert_event_metadata(log).await;
1016        }
1017        {
1018            let event = events.remove(0);
1019            let log = event.as_log();
1020            assert_eq!(log["key1"], "value1".into());
1021            assert_event_metadata(log).await;
1022        }
1023        {
1024            let event = events.remove(0);
1025            let log = event.as_log();
1026            assert_eq!(log["key2"], "value2".into());
1027            assert_event_metadata(log).await;
1028        }
1029    }
1030
1031    async fn assert_event_metadata(log: &LogEvent) {
1032        assert!(log.get_timestamp().is_some());
1033
1034        let source_type_key_value = log
1035            .get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
1036            .unwrap()
1037            .as_str()
1038            .unwrap();
1039        assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
1040        assert_eq!(log["http_path"], "/".into());
1041    }
1042
1043    #[tokio::test]
1044    async fn http_headers() {
1045        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1046            let mut headers = HeaderMap::new();
1047            headers.insert("User-Agent", "test_client".parse().unwrap());
1048            headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
1049            headers.insert("X-Test-Header", "true".parse().unwrap());
1050
1051            let (rx, addr) = source(
1052                vec![
1053                    "User-Agent".to_string(),
1054                    "Upgrade-Insecure-Requests".to_string(),
1055                    "X-*".to_string(),
1056                    "AbsentHeader".to_string(),
1057                ],
1058                vec![],
1059                "http_path",
1060                "remote_ip",
1061                "/",
1062                "POST",
1063                StatusCode::OK,
1064                None,
1065                true,
1066                EventStatus::Delivered,
1067                true,
1068                None,
1069                Some(JsonDeserializerConfig::default().into()),
1070            )
1071            .await;
1072
1073            spawn_ok_collect_n(
1074                send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1075                rx,
1076                1,
1077            )
1078            .await
1079        })
1080        .await;
1081
1082        {
1083            let event = events.remove(0);
1084            let log = event.as_log();
1085            assert_eq!(log["key1"], "value1".into());
1086            assert_eq!(log["\"User-Agent\""], "test_client".into());
1087            assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
1088            assert_eq!(log["\"x-test-header\""], "true".into());
1089            assert_eq!(log["AbsentHeader"], Value::Null);
1090            assert_event_metadata(log).await;
1091        }
1092    }
1093
1094    #[tokio::test]
1095    async fn http_headers_wildcard() {
1096        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1097            let mut headers = HeaderMap::new();
1098            headers.insert("User-Agent", "test_client".parse().unwrap());
1099            headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
1100            // Header that conflicts with an existing field.
1101            headers.insert("key1", "value_from_header".parse().unwrap());
1102
1103            let (rx, addr) = source(
1104                vec!["*".to_string()],
1105                vec![],
1106                "http_path",
1107                "remote_ip",
1108                "/",
1109                "POST",
1110                StatusCode::OK,
1111                None,
1112                true,
1113                EventStatus::Delivered,
1114                true,
1115                None,
1116                Some(JsonDeserializerConfig::default().into()),
1117            )
1118            .await;
1119
1120            spawn_ok_collect_n(
1121                send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1122                rx,
1123                1,
1124            )
1125            .await
1126        })
1127        .await;
1128
1129        {
1130            let event = events.remove(0);
1131            let log = event.as_log();
1132            assert_eq!(log["key1"], "value1".into());
1133            assert_eq!(log["\"user-agent\""], "test_client".into());
1134            assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
1135            assert_event_metadata(log).await;
1136        }
1137    }
1138
1139    #[tokio::test]
1140    async fn http_query() {
1141        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1142            let (rx, addr) = source(
1143                vec![],
1144                vec![
1145                    "source".to_string(),
1146                    "region".to_string(),
1147                    "absent".to_string(),
1148                ],
1149                "http_path",
1150                "remote_ip",
1151                "/",
1152                "POST",
1153                StatusCode::OK,
1154                None,
1155                true,
1156                EventStatus::Delivered,
1157                true,
1158                None,
1159                Some(JsonDeserializerConfig::default().into()),
1160            )
1161            .await;
1162
1163            spawn_ok_collect_n(
1164                send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging&region=gb"),
1165                rx,
1166                1,
1167            )
1168            .await
1169        })
1170        .await;
1171
1172        {
1173            let event = events.remove(0);
1174            let log = event.as_log();
1175            assert_eq!(log["key1"], "value1".into());
1176            assert_eq!(log["source"], "staging".into());
1177            assert_eq!(log["region"], "gb".into());
1178            assert_eq!(log["absent"], Value::Null);
1179            assert_event_metadata(log).await;
1180        }
1181    }
1182
1183    #[tokio::test]
1184    async fn http_query_wildcard() {
1185        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1186            let (rx, addr) = source(
1187                vec![],
1188                vec!["*".to_string()],
1189                "http_path",
1190                "remote_ip",
1191                "/",
1192                "POST",
1193                StatusCode::OK,
1194                None,
1195                true,
1196                EventStatus::Delivered,
1197                true,
1198                None,
1199                Some(JsonDeserializerConfig::default().into()),
1200            )
1201            .await;
1202
1203            spawn_ok_collect_n(
1204                send_with_query(
1205                    addr,
1206                    "{\"key1\":\"value1\",\"key2\":\"value2\"}",
1207                    "source=staging&region=gb&key1=value_from_query",
1208                ),
1209                rx,
1210                1,
1211            )
1212            .await
1213        })
1214        .await;
1215
1216        {
1217            let event = events.remove(0);
1218            let log = event.as_log();
1219            assert_eq!(log["key1"], "value_from_query".into());
1220            assert_eq!(log["key2"], "value2".into());
1221            assert_eq!(log["source"], "staging".into());
1222            assert_eq!(log["region"], "gb".into());
1223            assert_event_metadata(log).await;
1224        }
1225    }
1226
1227    #[tokio::test]
1228    async fn http_gzip_deflate() {
1229        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1230            let body = "test body";
1231
1232            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1233            encoder.write_all(body.as_bytes()).unwrap();
1234            let body = encoder.finish().unwrap();
1235
1236            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1237            encoder.write_all(body.as_slice()).unwrap();
1238            let body = encoder.finish().unwrap();
1239
1240            let mut headers = HeaderMap::new();
1241            headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());
1242
1243            let (rx, addr) = source(
1244                vec![],
1245                vec![],
1246                "http_path",
1247                "remote_ip",
1248                "/",
1249                "POST",
1250                StatusCode::OK,
1251                None,
1252                true,
1253                EventStatus::Delivered,
1254                true,
1255                None,
1256                None,
1257            )
1258            .await;
1259
1260            spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
1261        })
1262        .await;
1263
1264        {
1265            let event = events.remove(0);
1266            let log = event.as_log();
1267            assert_eq!(*log.get_message().unwrap(), "test body".into());
1268            assert_event_metadata(log).await;
1269        }
1270    }
1271
1272    #[tokio::test]
1273    async fn http_path() {
1274        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1275            let (rx, addr) = source(
1276                vec![],
1277                vec![],
1278                "vector_http_path",
1279                "vector_remote_ip",
1280                "/event/path",
1281                "POST",
1282                StatusCode::OK,
1283                None,
1284                true,
1285                EventStatus::Delivered,
1286                true,
1287                None,
1288                Some(JsonDeserializerConfig::default().into()),
1289            )
1290            .await;
1291
1292            spawn_ok_collect_n(
1293                send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
1294                rx,
1295                1,
1296            )
1297            .await
1298        })
1299        .await;
1300
1301        {
1302            let event = events.remove(0);
1303            let log = event.as_log();
1304            assert_eq!(log["key1"], "value1".into());
1305            assert_eq!(log["vector_http_path"], "/event/path".into());
1306            assert!(log.get_timestamp().is_some());
1307            assert_eq!(
1308                *log.get_source_type().unwrap(),
1309                SimpleHttpConfig::NAME.into()
1310            );
1311        }
1312    }
1313
1314    #[tokio::test]
1315    async fn http_path_no_restriction() {
1316        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1317            let (rx, addr) = source(
1318                vec![],
1319                vec![],
1320                "vector_http_path",
1321                "vector_remote_ip",
1322                "/event",
1323                "POST",
1324                StatusCode::OK,
1325                None,
1326                false,
1327                EventStatus::Delivered,
1328                true,
1329                None,
1330                Some(JsonDeserializerConfig::default().into()),
1331            )
1332            .await;
1333
1334            spawn_collect_n(
1335                async move {
1336                    assert_eq!(
1337                        200,
1338                        send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
1339                    );
1340                    assert_eq!(
1341                        200,
1342                        send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
1343                    );
1344                },
1345                rx,
1346                2,
1347            )
1348            .await
1349        })
1350        .await;
1351
1352        {
1353            let event = events.remove(0);
1354            let log = event.as_log();
1355            assert_eq!(log["key1"], "value1".into());
1356            assert_eq!(log["vector_http_path"], "/event/path1".into());
1357            assert!(log.get_timestamp().is_some());
1358            assert_eq!(
1359                *log.get_source_type().unwrap(),
1360                SimpleHttpConfig::NAME.into()
1361            );
1362        }
1363        {
1364            let event = events.remove(0);
1365            let log = event.as_log();
1366            assert_eq!(log["key2"], "value2".into());
1367            assert_eq!(log["vector_http_path"], "/event/path2".into());
1368            assert!(log.get_timestamp().is_some());
1369            assert_eq!(
1370                *log.get_source_type().unwrap(),
1371                SimpleHttpConfig::NAME.into()
1372            );
1373        }
1374    }
1375
1376    #[tokio::test]
1377    async fn http_wrong_path() {
1378        components::init_test();
1379        let (_rx, addr) = source(
1380            vec![],
1381            vec![],
1382            "vector_http_path",
1383            "vector_remote_ip",
1384            "/",
1385            "POST",
1386            StatusCode::OK,
1387            None,
1388            true,
1389            EventStatus::Delivered,
1390            true,
1391            None,
1392            Some(JsonDeserializerConfig::default().into()),
1393        )
1394        .await;
1395
1396        assert_eq!(
1397            404,
1398            send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
1399        );
1400    }
1401
1402    #[tokio::test]
1403    async fn http_status_code() {
1404        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
1405            let (rx, addr) = source(
1406                vec![],
1407                vec![],
1408                "http_path",
1409                "remote_ip",
1410                "/",
1411                "POST",
1412                StatusCode::ACCEPTED,
1413                None,
1414                true,
1415                EventStatus::Delivered,
1416                true,
1417                None,
1418                None,
1419            )
1420            .await;
1421
1422            spawn_collect_n(
1423                async move {
1424                    assert_eq!(
1425                        StatusCode::ACCEPTED,
1426                        send(addr, "{\"key1\":\"value1\"}").await
1427                    );
1428                },
1429                rx,
1430                1,
1431            )
1432            .await;
1433        })
1434        .await;
1435    }
1436
1437    #[tokio::test]
1438    async fn http_delivery_failure() {
1439        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1440            let (rx, addr) = source(
1441                vec![],
1442                vec![],
1443                "http_path",
1444                "remote_ip",
1445                "/",
1446                "POST",
1447                StatusCode::OK,
1448                None,
1449                true,
1450                EventStatus::Rejected,
1451                true,
1452                None,
1453                None,
1454            )
1455            .await;
1456
1457            spawn_collect_n(
1458                async move {
1459                    assert_eq!(400, send(addr, "test body\n").await);
1460                },
1461                rx,
1462                1,
1463            )
1464            .await;
1465        })
1466        .await;
1467    }
1468
1469    #[tokio::test]
1470    async fn ignores_disabled_acknowledgements() {
1471        let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1472            let (rx, addr) = source(
1473                vec![],
1474                vec![],
1475                "http_path",
1476                "remote_ip",
1477                "/",
1478                "POST",
1479                StatusCode::OK,
1480                None,
1481                true,
1482                EventStatus::Rejected,
1483                false,
1484                None,
1485                None,
1486            )
1487            .await;
1488
1489            spawn_collect_n(
1490                async move {
1491                    assert_eq!(200, send(addr, "test body\n").await);
1492                },
1493                rx,
1494                1,
1495            )
1496            .await
1497        })
1498        .await;
1499
1500        assert_eq!(events.len(), 1);
1501    }
1502
1503    #[tokio::test]
1504    async fn http_get_method() {
1505        components::init_test();
1506        let (_rx, addr) = source(
1507            vec![],
1508            vec![],
1509            "http_path",
1510            "remote_ip",
1511            "/",
1512            "GET",
1513            StatusCode::OK,
1514            None,
1515            true,
1516            EventStatus::Delivered,
1517            true,
1518            None,
1519            None,
1520        )
1521        .await;
1522
1523        assert_eq!(200, send_request(addr, "GET", "", "/").await);
1524    }
1525
1526    #[tokio::test]
1527    async fn returns_401_when_required_auth_is_missing() {
1528        components::init_test();
1529        let (_rx, addr) = source(
1530            vec![],
1531            vec![],
1532            "http_path",
1533            "remote_ip",
1534            "/",
1535            "GET",
1536            StatusCode::OK,
1537            Some(HttpServerAuthConfig::Basic {
1538                username: "test".to_string(),
1539                password: "test".to_string().into(),
1540            }),
1541            true,
1542            EventStatus::Delivered,
1543            true,
1544            None,
1545            None,
1546        )
1547        .await;
1548
1549        assert_eq!(401, send_request(addr, "GET", "", "/").await);
1550    }
1551
1552    #[tokio::test]
1553    async fn returns_401_when_required_auth_is_wrong() {
1554        components::init_test();
1555        let (_rx, addr) = source(
1556            vec![],
1557            vec![],
1558            "http_path",
1559            "remote_ip",
1560            "/",
1561            "POST",
1562            StatusCode::OK,
1563            Some(HttpServerAuthConfig::Basic {
1564                username: "test".to_string(),
1565                password: "test".to_string().into(),
1566            }),
1567            true,
1568            EventStatus::Delivered,
1569            true,
1570            None,
1571            None,
1572        )
1573        .await;
1574
1575        let mut headers = HeaderMap::new();
1576        headers.insert(
1577            AUTHORIZATION,
1578            Authorization::basic("wrong", "test").0.encode(),
1579        );
1580        assert_eq!(401, send_with_headers(addr, "", headers).await);
1581    }
1582
1583    #[tokio::test]
1584    async fn http_get_with_correct_auth() {
1585        components::init_test();
1586        let (_rx, addr) = source(
1587            vec![],
1588            vec![],
1589            "http_path",
1590            "remote_ip",
1591            "/",
1592            "POST",
1593            StatusCode::OK,
1594            Some(HttpServerAuthConfig::Basic {
1595                username: "test".to_string(),
1596                password: "test".to_string().into(),
1597            }),
1598            true,
1599            EventStatus::Delivered,
1600            true,
1601            None,
1602            None,
1603        )
1604        .await;
1605
1606        let mut headers = HeaderMap::new();
1607        headers.insert(
1608            AUTHORIZATION,
1609            Authorization::basic("test", "test").0.encode(),
1610        );
1611        assert_eq!(200, send_with_headers(addr, "", headers).await);
1612    }
1613
1614    #[test]
1615    fn output_schema_definition_vector_namespace() {
1616        let config = SimpleHttpConfig {
1617            log_namespace: Some(true),
1618            ..Default::default()
1619        };
1620
1621        let definitions = config
1622            .outputs(LogNamespace::Vector)
1623            .remove(0)
1624            .schema_definition(true);
1625
1626        let expected_definition =
1627            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1628                .with_meaning(OwnedTargetPath::event_root(), "message")
1629                .with_metadata_field(
1630                    &owned_value_path!("vector", "source_type"),
1631                    Kind::bytes(),
1632                    None,
1633                )
1634                .with_metadata_field(
1635                    &owned_value_path!(SimpleHttpConfig::NAME, "path"),
1636                    Kind::bytes(),
1637                    None,
1638                )
1639                .with_metadata_field(
1640                    &owned_value_path!(SimpleHttpConfig::NAME, "headers"),
1641                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1642                    None,
1643                )
1644                .with_metadata_field(
1645                    &owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
1646                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1647                    None,
1648                )
1649                .with_metadata_field(
1650                    &owned_value_path!(SimpleHttpConfig::NAME, "host"),
1651                    Kind::bytes().or_undefined(),
1652                    None,
1653                )
1654                .with_metadata_field(
1655                    &owned_value_path!("vector", "ingest_timestamp"),
1656                    Kind::timestamp(),
1657                    None,
1658                );
1659
1660        assert_eq!(definitions, Some(expected_definition))
1661    }
1662
1663    #[test]
1664    fn output_schema_definition_legacy_namespace() {
1665        let config = SimpleHttpConfig::default();
1666
1667        let definitions = config
1668            .outputs(LogNamespace::Legacy)
1669            .remove(0)
1670            .schema_definition(true);
1671
1672        let expected_definition = Definition::new_with_default_metadata(
1673            Kind::object(Collection::empty()),
1674            [LogNamespace::Legacy],
1675        )
1676        .with_event_field(
1677            &owned_value_path!("message"),
1678            Kind::bytes(),
1679            Some("message"),
1680        )
1681        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1682        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1683        .with_event_field(&owned_value_path!("path"), Kind::bytes(), None)
1684        .with_event_field(
1685            &owned_value_path!("host"),
1686            Kind::bytes().or_undefined(),
1687            None,
1688        )
1689        .unknown_fields(Kind::bytes());
1690
1691        assert_eq!(definitions, Some(expected_definition))
1692    }
1693
1694    #[test]
1695    fn validate_remove_duplicates() {
1696        let mut list = vec![
1697            "a".to_owned(),
1698            "b".to_owned(),
1699            "c".to_owned(),
1700            "d".to_owned(),
1701        ];
1702
1703        // no duplicates should be identical
1704        {
1705            let list_dedup = remove_duplicates(list.clone(), "foo");
1706
1707            assert_eq!(list, list_dedup);
1708        }
1709
1710        list.push("b".to_owned());
1711
1712        // remove duplicate "b"
1713        {
1714            let list_dedup = remove_duplicates(list.clone(), "foo");
1715            assert_eq!(
1716                vec![
1717                    "a".to_owned(),
1718                    "b".to_owned(),
1719                    "c".to_owned(),
1720                    "d".to_owned()
1721                ],
1722                list_dedup
1723            );
1724        }
1725    }
1726
1727    impl ValidatableComponent for SimpleHttpConfig {
1728        fn validation_configuration() -> ValidationConfiguration {
1729            let config = Self {
1730                decoding: Some(DeserializerConfig::Json(Default::default())),
1731                ..Default::default()
1732            };
1733
1734            let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
1735
1736            let listen_addr_http = format!("http://{}/", config.address);
1737            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
1738
1739            let external_resource = ExternalResource::new(
1740                ResourceDirection::Push,
1741                HttpResourceConfig::from_parts(uri, Some(config.method.into())),
1742                config
1743                    .get_decoding_config()
1744                    .expect("should not fail to get decoding config"),
1745            );
1746
1747            ValidationConfiguration::from_source(
1748                Self::NAME,
1749                log_namespace,
1750                vec![ComponentTestCaseConfig::from_source(
1751                    config,
1752                    None,
1753                    Some(external_resource),
1754                )],
1755            )
1756        }
1757    }
1758
1759    register_validatable_component!(SimpleHttpConfig);
1760}