vector/sources/
http_server.rs

1use crate::common::http::{server_auth::HttpServerAuthConfig, ErrorMessage};
2use std::{collections::HashMap, net::SocketAddr};
3
4use bytes::{Bytes, BytesMut};
5use chrono::Utc;
6use http::StatusCode;
7use http_serde;
8use tokio_util::codec::Decoder as _;
9use vrl::value::{kind::Collection, Kind};
10use warp::http::HeaderMap;
11
12use vector_lib::codecs::{
13    decoding::{DeserializerConfig, FramingConfig},
14    BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
15    NewlineDelimitedDecoderConfig,
16};
17use vector_lib::configurable::configurable_component;
18use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
19use vector_lib::{
20    config::{DataType, LegacyKey, LogNamespace},
21    schema::Definition,
22};
23
24use crate::{
25    codecs::{Decoder, DecodingConfig},
26    config::{
27        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
28        SourceOutput,
29    },
30    event::Event,
31    http::KeepaliveConfig,
32    serde::{bool_or_struct, default_decoding},
33    sources::util::{
34        http::{add_headers, add_query_parameters, HttpMethod},
35        Encoding, HttpSource,
36    },
37    tls::TlsEnableableConfig,
38};
39
40/// Configuration for the `http` source.
41#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
42#[configurable(metadata(deprecated))]
43#[derive(Clone, Debug)]
44pub struct HttpConfig(SimpleHttpConfig);
45
46impl GenerateConfig for HttpConfig {
47    fn generate_config() -> toml::Value {
48        <SimpleHttpConfig as GenerateConfig>::generate_config()
49    }
50}
51
52#[async_trait::async_trait]
53#[typetag::serde(name = "http")]
54impl SourceConfig for HttpConfig {
55    async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
56        self.0.build(cx).await
57    }
58
59    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
60        self.0.outputs(global_log_namespace)
61    }
62
63    fn resources(&self) -> Vec<Resource> {
64        self.0.resources()
65    }
66
67    fn can_acknowledge(&self) -> bool {
68        self.0.can_acknowledge()
69    }
70}
71
72/// Configuration for the `http_server` source.
73#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
74#[derive(Clone, Debug)]
75pub struct SimpleHttpConfig {
76    /// The socket address to listen for connections on.
77    ///
78    /// It _must_ include a port.
79    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
80    #[configurable(metadata(docs::examples = "localhost:80"))]
81    address: SocketAddr,
82
83    /// The expected encoding of received data.
84    ///
85    /// For `json` and `ndjson` encodings, the fields of the JSON objects are output as separate fields.
86    #[serde(default)]
87    encoding: Option<Encoding>,
88
89    /// A list of HTTP headers to include in the log event.
90    ///
91    /// Accepts the wildcard (`*`) character for headers matching a specified pattern.
92    ///
93    /// Specifying "*" results in all headers included in the log event.
94    ///
95    /// These headers are not included in the JSON payload if a field with a conflicting name exists.
96    #[serde(default)]
97    #[configurable(metadata(docs::examples = "User-Agent"))]
98    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
99    #[configurable(metadata(docs::examples = "X-*"))]
100    #[configurable(metadata(docs::examples = "*"))]
101    headers: Vec<String>,
102
103    /// A list of URL query parameters to include in the log event.
104    ///
105    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
106    ///
107    /// Specifying "*" results in all query parameters included in the log event.
108    ///
109    /// These override any values included in the body with conflicting names.
110    #[serde(default)]
111    #[configurable(metadata(docs::examples = "application"))]
112    #[configurable(metadata(docs::examples = "source"))]
113    #[configurable(metadata(docs::examples = "param*"))]
114    #[configurable(metadata(docs::examples = "*"))]
115    query_parameters: Vec<String>,
116
117    #[configurable(derived)]
118    auth: Option<HttpServerAuthConfig>,
119
120    /// Whether or not to treat the configured `path` as an absolute path.
121    ///
122    /// If set to `true`, only requests using the exact URL path specified in `path` are accepted. Otherwise,
123    /// requests sent to a URL path that starts with the value of `path` are accepted.
124    ///
125    /// With `strict_path` set to `false` and `path` set to `""`, the configured HTTP source accepts requests from
126    /// any URL path.
127    #[serde(default = "crate::serde::default_true")]
128    strict_path: bool,
129
130    /// The URL path on which log event POST requests are sent.
131    #[serde(default = "default_path")]
132    #[configurable(metadata(docs::examples = "/event/path"))]
133    #[configurable(metadata(docs::examples = "/logs"))]
134    path: String,
135
136    /// The event key in which the requested URL path used to send the request is stored.
137    #[serde(default = "default_path_key")]
138    #[configurable(metadata(docs::examples = "vector_http_path"))]
139    path_key: OptionalValuePath,
140
141    /// If set, the name of the log field used to add the remote IP to each event
142    #[serde(default = "default_host_key")]
143    #[configurable(metadata(docs::examples = "hostname"))]
144    host_key: OptionalValuePath,
145
146    /// Specifies the action of the HTTP request.
147    #[serde(default = "default_http_method")]
148    method: HttpMethod,
149
150    /// Specifies the HTTP response status code that will be returned on successful requests.
151    #[configurable(metadata(docs::examples = 202))]
152    #[configurable(metadata(docs::numeric_type = "uint"))]
153    #[serde(with = "http_serde::status_code")]
154    #[serde(default = "default_http_response_code")]
155    response_code: StatusCode,
156
157    #[configurable(derived)]
158    tls: Option<TlsEnableableConfig>,
159
160    #[configurable(derived)]
161    framing: Option<FramingConfig>,
162
163    #[configurable(derived)]
164    decoding: Option<DeserializerConfig>,
165
166    #[configurable(derived)]
167    #[serde(default, deserialize_with = "bool_or_struct")]
168    acknowledgements: SourceAcknowledgementsConfig,
169
170    /// The namespace to use for logs. This overrides the global setting.
171    #[configurable(metadata(docs::hidden))]
172    #[serde(default)]
173    log_namespace: Option<bool>,
174
175    #[configurable(derived)]
176    #[serde(default)]
177    keepalive: KeepaliveConfig,
178}
179
180impl SimpleHttpConfig {
181    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
182    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
183        let mut schema_definition = self
184            .decoding
185            .as_ref()
186            .unwrap_or(&default_decoding())
187            .schema_definition(log_namespace)
188            .with_source_metadata(
189                SimpleHttpConfig::NAME,
190                self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
191                &owned_value_path!("path"),
192                Kind::bytes(),
193                None,
194            )
195            // for metadata that is added to the events dynamically from the self.headers
196            .with_source_metadata(
197                SimpleHttpConfig::NAME,
198                None,
199                &owned_value_path!("headers"),
200                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
201                None,
202            )
203            // for metadata that is added to the events dynamically from the self.query_parameters
204            .with_source_metadata(
205                SimpleHttpConfig::NAME,
206                None,
207                &owned_value_path!("query_parameters"),
208                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
209                None,
210            )
211            .with_source_metadata(
212                SimpleHttpConfig::NAME,
213                self.host_key.path.clone().map(LegacyKey::Overwrite),
214                &owned_value_path!("host"),
215                Kind::bytes().or_undefined(),
216                None,
217            )
218            .with_standard_vector_source_metadata();
219
220        // for metadata that is added to the events dynamically from config options
221        if log_namespace == LogNamespace::Legacy {
222            schema_definition = schema_definition.unknown_fields(Kind::bytes());
223        }
224
225        schema_definition
226    }
227
228    fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
229        if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
230            return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
231        }
232
233        let (framing, decoding) = if let Some(encoding) = self.encoding {
234            match encoding {
235                Encoding::Text => (
236                    NewlineDelimitedDecoderConfig::new().into(),
237                    BytesDeserializerConfig::new().into(),
238                ),
239                Encoding::Json => (
240                    BytesDecoderConfig::new().into(),
241                    JsonDeserializerConfig::default().into(),
242                ),
243                Encoding::Ndjson => (
244                    NewlineDelimitedDecoderConfig::new().into(),
245                    JsonDeserializerConfig::default().into(),
246                ),
247                Encoding::Binary => (
248                    BytesDecoderConfig::new().into(),
249                    BytesDeserializerConfig::new().into(),
250                ),
251            }
252        } else {
253            let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
254            let framing = self
255                .framing
256                .clone()
257                .unwrap_or_else(|| decoding.default_stream_framing());
258            (framing, decoding)
259        };
260
261        Ok(DecodingConfig::new(
262            framing,
263            decoding,
264            self.log_namespace.unwrap_or(false).into(),
265        ))
266    }
267}
268
269impl Default for SimpleHttpConfig {
270    fn default() -> Self {
271        Self {
272            address: "0.0.0.0:8080".parse().unwrap(),
273            encoding: None,
274            headers: Vec::new(),
275            query_parameters: Vec::new(),
276            tls: None,
277            auth: None,
278            path: default_path(),
279            path_key: default_path_key(),
280            host_key: default_host_key(),
281            method: default_http_method(),
282            response_code: default_http_response_code(),
283            strict_path: true,
284            framing: None,
285            decoding: Some(default_decoding()),
286            acknowledgements: SourceAcknowledgementsConfig::default(),
287            log_namespace: None,
288            keepalive: KeepaliveConfig::default(),
289        }
290    }
291}
292
293impl_generate_config_from_default!(SimpleHttpConfig);
294
295const fn default_http_method() -> HttpMethod {
296    HttpMethod::Post
297}
298
299fn default_path() -> String {
300    "/".to_string()
301}
302
303fn default_path_key() -> OptionalValuePath {
304    OptionalValuePath::from(owned_value_path!("path"))
305}
306
307fn default_host_key() -> OptionalValuePath {
308    OptionalValuePath::none()
309}
310
311const fn default_http_response_code() -> StatusCode {
312    StatusCode::OK
313}
314
315/// Removes duplicates from the list, and logs a `warn!()` for each duplicate removed.
316pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
317    list.sort();
318
319    let mut dedup = false;
320    for (idx, name) in list.iter().enumerate() {
321        if idx < list.len() - 1 && list[idx] == list[idx + 1] {
322            warn!(
323                "`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
324                list_name, name
325            );
326            dedup = true;
327        }
328    }
329
330    if dedup {
331        list.dedup();
332    }
333    list
334}
335
336/// Convert [`SocketAddr`] into a string, returning only the IP address.
337fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
338    addr.ip().to_string()
339}
340
341#[derive(Clone)]
342pub enum HttpConfigParamKind {
343    Glob(glob::Pattern),
344    Exact(String),
345}
346
347pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
348    list.iter()
349        .map(|s| match s.contains('*') {
350            true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
351            false => Ok(HttpConfigParamKind::Exact(s.to_string())),
352        })
353        .collect::<crate::Result<Vec<HttpConfigParamKind>>>()
354}
355
356#[async_trait::async_trait]
357#[typetag::serde(name = "http_server")]
358impl SourceConfig for SimpleHttpConfig {
359    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
360        let log_namespace = cx.log_namespace(self.log_namespace);
361        let decoder = self
362            .get_decoding_config()?
363            .build()?
364            .with_log_namespace(log_namespace);
365
366        let source = SimpleHttpSource {
367            headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
368            query_parameters: build_param_matcher(&remove_duplicates(
369                self.query_parameters.clone(),
370                "query_parameters",
371            ))?,
372            path_key: self.path_key.clone(),
373            host_key: self.host_key.clone(),
374            decoder,
375            log_namespace,
376        };
377        source.run(
378            self.address,
379            self.path.as_str(),
380            self.method,
381            self.response_code,
382            self.strict_path,
383            self.tls.as_ref(),
384            self.auth.as_ref(),
385            cx,
386            self.acknowledgements,
387            self.keepalive.clone(),
388        )
389    }
390
391    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
392        // There is a global and per-source `log_namespace` config.
393        // The source config overrides the global setting and is merged here.
394        let log_namespace = global_log_namespace.merge(self.log_namespace);
395
396        let schema_definition = self.schema_definition(log_namespace);
397
398        vec![SourceOutput::new_maybe_logs(
399            self.decoding
400                .as_ref()
401                .map(|d| d.output_type())
402                .unwrap_or(DataType::Log),
403            schema_definition,
404        )]
405    }
406
407    fn resources(&self) -> Vec<Resource> {
408        vec![Resource::tcp(self.address)]
409    }
410
411    fn can_acknowledge(&self) -> bool {
412        true
413    }
414}
415
416#[derive(Clone)]
417struct SimpleHttpSource {
418    headers: Vec<HttpConfigParamKind>,
419    query_parameters: Vec<HttpConfigParamKind>,
420    path_key: OptionalValuePath,
421    host_key: OptionalValuePath,
422    decoder: Decoder,
423    log_namespace: LogNamespace,
424}
425
426impl HttpSource for SimpleHttpSource {
427    /// Enriches the log events with metadata for the `request_path` and for each of the headers.
428    /// Non-log events are skipped.
429    fn enrich_events(
430        &self,
431        events: &mut [Event],
432        request_path: &str,
433        headers: &HeaderMap,
434        query_parameters: &HashMap<String, String>,
435        source_ip: Option<&SocketAddr>,
436    ) {
437        let now = Utc::now();
438        for event in events.iter_mut() {
439            match event {
440                Event::Log(log) => {
441                    // add request_path to each event
442                    self.log_namespace.insert_source_metadata(
443                        SimpleHttpConfig::NAME,
444                        log,
445                        self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
446                        path!("path"),
447                        request_path.to_owned(),
448                    );
449
450                    self.log_namespace.insert_standard_vector_source_metadata(
451                        log,
452                        SimpleHttpConfig::NAME,
453                        now,
454                    );
455
456                    if let Some(addr) = source_ip {
457                        self.log_namespace.insert_source_metadata(
458                            SimpleHttpConfig::NAME,
459                            log,
460                            self.host_key.path.as_ref().map(LegacyKey::Overwrite),
461                            path!("host"),
462                            socket_addr_to_ip_string(addr),
463                        );
464                    }
465                }
466                _ => {
467                    continue;
468                }
469            }
470        }
471
472        add_headers(
473            events,
474            &self.headers,
475            headers,
476            self.log_namespace,
477            SimpleHttpConfig::NAME,
478        );
479
480        add_query_parameters(
481            events,
482            &self.query_parameters,
483            query_parameters,
484            self.log_namespace,
485            SimpleHttpConfig::NAME,
486        );
487    }
488
489    fn build_events(
490        &self,
491        body: Bytes,
492        _header_map: &HeaderMap,
493        _query_parameters: &HashMap<String, String>,
494        _request_path: &str,
495    ) -> Result<Vec<Event>, ErrorMessage> {
496        let mut decoder = self.decoder.clone();
497        let mut events = Vec::new();
498        let mut bytes = BytesMut::new();
499        bytes.extend_from_slice(&body);
500
501        loop {
502            match decoder.decode_eof(&mut bytes) {
503                Ok(Some((next, _))) => {
504                    events.extend(next);
505                }
506                Ok(None) => break,
507                Err(error) => {
508                    // Error is logged / emitted by `crate::codecs::Decoder`, no further
509                    // handling is needed here
510                    return Err(ErrorMessage::new(
511                        StatusCode::BAD_REQUEST,
512                        format!("Failed decoding body: {error}"),
513                    ));
514                }
515            }
516        }
517
518        Ok(events)
519    }
520
521    fn enable_source_ip(&self) -> bool {
522        self.host_key.path.is_some()
523    }
524}
525
526#[cfg(test)]
527mod tests {
528    use std::str::FromStr;
529    use std::{io::Write, net::SocketAddr};
530
531    use flate2::{
532        write::{GzEncoder, ZlibEncoder},
533        Compression,
534    };
535    use futures::Stream;
536    use headers::authorization::Credentials;
537    use headers::Authorization;
538    use http::header::AUTHORIZATION;
539    use http::{HeaderMap, Method, StatusCode, Uri};
540    use similar_asserts::assert_eq;
541    use vector_lib::codecs::{
542        decoding::{DeserializerConfig, FramingConfig},
543        BytesDecoderConfig, JsonDeserializerConfig,
544    };
545    use vector_lib::config::LogNamespace;
546    use vector_lib::event::LogEvent;
547    use vector_lib::lookup::lookup_v2::OptionalValuePath;
548    use vector_lib::lookup::{event_path, owned_value_path, OwnedTargetPath, PathPrefix};
549    use vector_lib::schema::Definition;
550    use vrl::value::{kind::Collection, Kind, ObjectMap};
551
552    use crate::common::http::server_auth::HttpServerAuthConfig;
553    use crate::sources::http_server::HttpMethod;
554    use crate::{
555        components::validation::prelude::*,
556        config::{log_schema, SourceConfig, SourceContext},
557        event::{Event, EventStatus, Value},
558        test_util::{
559            components::{self, assert_source_compliance, HTTP_PUSH_SOURCE_TAGS},
560            next_addr, spawn_collect_n, wait_for_tcp,
561        },
562        SourceSender,
563    };
564
565    use super::{remove_duplicates, SimpleHttpConfig};
566
567    #[test]
568    fn generate_config() {
569        crate::test_util::test_generate_config::<SimpleHttpConfig>();
570    }
571
572    #[allow(clippy::too_many_arguments)]
573    async fn source<'a>(
574        headers: Vec<String>,
575        query_parameters: Vec<String>,
576        path_key: &'a str,
577        host_key: &'a str,
578        path: &'a str,
579        method: &'a str,
580        response_code: StatusCode,
581        auth: Option<HttpServerAuthConfig>,
582        strict_path: bool,
583        status: EventStatus,
584        acknowledgements: bool,
585        framing: Option<FramingConfig>,
586        decoding: Option<DeserializerConfig>,
587    ) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
588        let (sender, recv) = SourceSender::new_test_finalize(status);
589        let address = next_addr();
590        let path = path.to_owned();
591        let host_key = OptionalValuePath::from(owned_value_path!(host_key));
592        let path_key = OptionalValuePath::from(owned_value_path!(path_key));
593        let context = SourceContext::new_test(sender, None);
594        let method = match Method::from_str(method).unwrap() {
595            Method::GET => HttpMethod::Get,
596            Method::POST => HttpMethod::Post,
597            _ => HttpMethod::Post,
598        };
599
600        tokio::spawn(async move {
601            SimpleHttpConfig {
602                address,
603                headers,
604                encoding: None,
605                query_parameters,
606                response_code,
607                tls: None,
608                auth,
609                strict_path,
610                path_key,
611                host_key,
612                path,
613                method,
614                framing,
615                decoding,
616                acknowledgements: acknowledgements.into(),
617                log_namespace: None,
618                keepalive: Default::default(),
619            }
620            .build(context)
621            .await
622            .unwrap()
623            .await
624            .unwrap();
625        });
626        wait_for_tcp(address).await;
627        (recv, address)
628    }
629
630    async fn send(address: SocketAddr, body: &str) -> u16 {
631        reqwest::Client::new()
632            .post(format!("http://{address}/"))
633            .body(body.to_owned())
634            .send()
635            .await
636            .unwrap()
637            .status()
638            .as_u16()
639    }
640
641    async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
642        reqwest::Client::new()
643            .post(format!("http://{address}/"))
644            .headers(headers)
645            .body(body.to_owned())
646            .send()
647            .await
648            .unwrap()
649            .status()
650            .as_u16()
651    }
652
653    async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
654        reqwest::Client::new()
655            .post(format!("http://{address}?{query}"))
656            .body(body.to_owned())
657            .send()
658            .await
659            .unwrap()
660            .status()
661            .as_u16()
662    }
663
664    async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
665        reqwest::Client::new()
666            .post(format!("http://{address}{path}"))
667            .body(body.to_owned())
668            .send()
669            .await
670            .unwrap()
671            .status()
672            .as_u16()
673    }
674
675    async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
676        let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
677        reqwest::Client::new()
678            .request(method, format!("http://{address}{path}"))
679            .body(body.to_owned())
680            .send()
681            .await
682            .unwrap()
683            .status()
684            .as_u16()
685    }
686
687    async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
688        reqwest::Client::new()
689            .post(format!("http://{address}/"))
690            .headers(headers)
691            .body(body)
692            .send()
693            .await
694            .unwrap()
695            .status()
696            .as_u16()
697    }
698
699    async fn spawn_ok_collect_n(
700        send: impl std::future::Future<Output = u16> + Send + 'static,
701        rx: impl Stream<Item = Event> + Unpin,
702        n: usize,
703    ) -> Vec<Event> {
704        spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
705    }
706
707    #[tokio::test]
708    async fn http_multiline_text() {
709        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
710            let body = "test body\ntest body 2";
711
712            let (rx, addr) = source(
713                vec![],
714                vec![],
715                "http_path",
716                "remote_ip",
717                "/",
718                "POST",
719                StatusCode::OK,
720                None,
721                true,
722                EventStatus::Delivered,
723                true,
724                None,
725                None,
726            )
727            .await;
728
729            spawn_ok_collect_n(send(addr, body), rx, 2).await
730        })
731        .await;
732
733        {
734            let event = events.remove(0);
735            let log = event.as_log();
736            assert_eq!(*log.get_message().unwrap(), "test body".into());
737            assert!(log.get_timestamp().is_some());
738            assert_eq!(
739                *log.get_source_type().unwrap(),
740                SimpleHttpConfig::NAME.into()
741            );
742            assert_eq!(log["http_path"], "/".into());
743            assert_event_metadata(log).await;
744        }
745        {
746            let event = events.remove(0);
747            let log = event.as_log();
748            assert_eq!(*log.get_message().unwrap(), "test body 2".into());
749            assert_event_metadata(log).await;
750        }
751    }
752
753    #[tokio::test]
754    async fn http_multiline_text2() {
755        //same as above test but with a newline at the end
756        let body = "test body\ntest body 2\n";
757
758        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
759            let (rx, addr) = source(
760                vec![],
761                vec![],
762                "http_path",
763                "remote_ip",
764                "/",
765                "POST",
766                StatusCode::OK,
767                None,
768                true,
769                EventStatus::Delivered,
770                true,
771                None,
772                None,
773            )
774            .await;
775
776            spawn_ok_collect_n(send(addr, body), rx, 2).await
777        })
778        .await;
779
780        {
781            let event = events.remove(0);
782            let log = event.as_log();
783            assert_eq!(*log.get_message().unwrap(), "test body".into());
784            assert_event_metadata(log).await;
785        }
786        {
787            let event = events.remove(0);
788            let log = event.as_log();
789            assert_eq!(*log.get_message().unwrap(), "test body 2".into());
790            assert_event_metadata(log).await;
791        }
792    }
793
794    #[tokio::test]
795    async fn http_bytes_codec_preserves_newlines() {
796        let body = "foo\nbar";
797
798        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
799            let (rx, addr) = source(
800                vec![],
801                vec![],
802                "http_path",
803                "remote_ip",
804                "/",
805                "POST",
806                StatusCode::OK,
807                None,
808                true,
809                EventStatus::Delivered,
810                true,
811                Some(BytesDecoderConfig::new().into()),
812                None,
813            )
814            .await;
815
816            spawn_ok_collect_n(send(addr, body), rx, 1).await
817        })
818        .await;
819
820        assert_eq!(events.len(), 1);
821
822        {
823            let event = events.remove(0);
824            let log = event.as_log();
825            assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
826            assert_event_metadata(log).await;
827        }
828    }
829
830    #[tokio::test]
831    async fn http_json_parsing() {
832        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
833            let (rx, addr) = source(
834                vec![],
835                vec![],
836                "http_path",
837                "remote_ip",
838                "/",
839                "POST",
840                StatusCode::OK,
841                None,
842                true,
843                EventStatus::Delivered,
844                true,
845                None,
846                Some(JsonDeserializerConfig::default().into()),
847            )
848            .await;
849
850            spawn_collect_n(
851                async move {
852                    assert_eq!(400, send(addr, "{").await); //malformed
853                    assert_eq!(400, send(addr, r#"{"key"}"#).await); //key without value
854
855                    assert_eq!(200, send(addr, "{}").await); //can be one object or array of objects
856                    assert_eq!(200, send(addr, "[{},{},{}]").await);
857                },
858                rx,
859                2,
860            )
861            .await
862        })
863        .await;
864
865        assert!(events.remove(1).as_log().get_timestamp().is_some());
866        assert!(events.remove(0).as_log().get_timestamp().is_some());
867    }
868
869    #[tokio::test]
870    async fn http_json_values() {
871        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
872            let (rx, addr) = source(
873                vec![],
874                vec![],
875                "http_path",
876                "remote_ip",
877                "/",
878                "POST",
879                StatusCode::OK,
880                None,
881                true,
882                EventStatus::Delivered,
883                true,
884                None,
885                Some(JsonDeserializerConfig::default().into()),
886            )
887            .await;
888
889            spawn_collect_n(
890                async move {
891                    assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
892                    assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
893                },
894                rx,
895                2,
896            )
897            .await
898        })
899        .await;
900
901        {
902            let event = events.remove(0);
903            let log = event.as_log();
904            assert_eq!(log["key"], "value".into());
905            assert_event_metadata(log).await;
906        }
907        {
908            let event = events.remove(0);
909            let log = event.as_log();
910            assert_eq!(log["key2"], "value2".into());
911            assert_event_metadata(log).await;
912        }
913    }
914
915    #[tokio::test]
916    async fn http_json_dotted_keys() {
917        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
918            let (rx, addr) = source(
919                vec![],
920                vec![],
921                "http_path",
922                "remote_ip",
923                "/",
924                "POST",
925                StatusCode::OK,
926                None,
927                true,
928                EventStatus::Delivered,
929                true,
930                None,
931                Some(JsonDeserializerConfig::default().into()),
932            )
933            .await;
934
935            spawn_collect_n(
936                async move {
937                    assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
938                    assert_eq!(
939                        200,
940                        send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
941                    );
942                },
943                rx,
944                2,
945            )
946            .await
947        })
948        .await;
949
950        {
951            let event = events.remove(0);
952            let log = event.as_log();
953            assert_eq!(
954                log.get(event_path!("dotted.key")).unwrap(),
955                &Value::from("value")
956            );
957        }
958        {
959            let event = events.remove(0);
960            let log = event.as_log();
961            let mut map = ObjectMap::new();
962            map.insert("dotted.key2".into(), Value::from("value2"));
963            assert_eq!(log["nested"], map.into());
964        }
965    }
966
967    #[tokio::test]
968    async fn http_ndjson() {
969        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
970            let (rx, addr) = source(
971                vec![],
972                vec![],
973                "http_path",
974                "remote_ip",
975                "/",
976                "POST",
977                StatusCode::OK,
978                None,
979                true,
980                EventStatus::Delivered,
981                true,
982                None,
983                Some(JsonDeserializerConfig::default().into()),
984            )
985            .await;
986
987            spawn_collect_n(
988                async move {
989                    assert_eq!(
990                        200,
991                        send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
992                    );
993
994                    assert_eq!(
995                        200,
996                        send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
997                    );
998                },
999                rx,
1000                4,
1001            )
1002            .await
1003        })
1004        .await;
1005
1006        {
1007            let event = events.remove(0);
1008            let log = event.as_log();
1009            assert_eq!(log["key1"], "value1".into());
1010            assert_event_metadata(log).await;
1011        }
1012        {
1013            let event = events.remove(0);
1014            let log = event.as_log();
1015            assert_eq!(log["key2"], "value2".into());
1016            assert_event_metadata(log).await;
1017        }
1018        {
1019            let event = events.remove(0);
1020            let log = event.as_log();
1021            assert_eq!(log["key1"], "value1".into());
1022            assert_event_metadata(log).await;
1023        }
1024        {
1025            let event = events.remove(0);
1026            let log = event.as_log();
1027            assert_eq!(log["key2"], "value2".into());
1028            assert_event_metadata(log).await;
1029        }
1030    }
1031
1032    async fn assert_event_metadata(log: &LogEvent) {
1033        assert!(log.get_timestamp().is_some());
1034
1035        let source_type_key_value = log
1036            .get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
1037            .unwrap()
1038            .as_str()
1039            .unwrap();
1040        assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
1041        assert_eq!(log["http_path"], "/".into());
1042    }
1043
1044    #[tokio::test]
1045    async fn http_headers() {
1046        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1047            let mut headers = HeaderMap::new();
1048            headers.insert("User-Agent", "test_client".parse().unwrap());
1049            headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
1050            headers.insert("X-Test-Header", "true".parse().unwrap());
1051
1052            let (rx, addr) = source(
1053                vec![
1054                    "User-Agent".to_string(),
1055                    "Upgrade-Insecure-Requests".to_string(),
1056                    "X-*".to_string(),
1057                    "AbsentHeader".to_string(),
1058                ],
1059                vec![],
1060                "http_path",
1061                "remote_ip",
1062                "/",
1063                "POST",
1064                StatusCode::OK,
1065                None,
1066                true,
1067                EventStatus::Delivered,
1068                true,
1069                None,
1070                Some(JsonDeserializerConfig::default().into()),
1071            )
1072            .await;
1073
1074            spawn_ok_collect_n(
1075                send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1076                rx,
1077                1,
1078            )
1079            .await
1080        })
1081        .await;
1082
1083        {
1084            let event = events.remove(0);
1085            let log = event.as_log();
1086            assert_eq!(log["key1"], "value1".into());
1087            assert_eq!(log["\"User-Agent\""], "test_client".into());
1088            assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
1089            assert_eq!(log["\"x-test-header\""], "true".into());
1090            assert_eq!(log["AbsentHeader"], Value::Null);
1091            assert_event_metadata(log).await;
1092        }
1093    }
1094
1095    #[tokio::test]
1096    async fn http_headers_wildcard() {
1097        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1098            let mut headers = HeaderMap::new();
1099            headers.insert("User-Agent", "test_client".parse().unwrap());
1100            headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
1101            // Header that conflicts with an existing field.
1102            headers.insert("key1", "value_from_header".parse().unwrap());
1103
1104            let (rx, addr) = source(
1105                vec!["*".to_string()],
1106                vec![],
1107                "http_path",
1108                "remote_ip",
1109                "/",
1110                "POST",
1111                StatusCode::OK,
1112                None,
1113                true,
1114                EventStatus::Delivered,
1115                true,
1116                None,
1117                Some(JsonDeserializerConfig::default().into()),
1118            )
1119            .await;
1120
1121            spawn_ok_collect_n(
1122                send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1123                rx,
1124                1,
1125            )
1126            .await
1127        })
1128        .await;
1129
1130        {
1131            let event = events.remove(0);
1132            let log = event.as_log();
1133            assert_eq!(log["key1"], "value1".into());
1134            assert_eq!(log["\"user-agent\""], "test_client".into());
1135            assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
1136            assert_event_metadata(log).await;
1137        }
1138    }
1139
1140    #[tokio::test]
1141    async fn http_query() {
1142        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1143            let (rx, addr) = source(
1144                vec![],
1145                vec![
1146                    "source".to_string(),
1147                    "region".to_string(),
1148                    "absent".to_string(),
1149                ],
1150                "http_path",
1151                "remote_ip",
1152                "/",
1153                "POST",
1154                StatusCode::OK,
1155                None,
1156                true,
1157                EventStatus::Delivered,
1158                true,
1159                None,
1160                Some(JsonDeserializerConfig::default().into()),
1161            )
1162            .await;
1163
1164            spawn_ok_collect_n(
1165                send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging&region=gb"),
1166                rx,
1167                1,
1168            )
1169            .await
1170        })
1171        .await;
1172
1173        {
1174            let event = events.remove(0);
1175            let log = event.as_log();
1176            assert_eq!(log["key1"], "value1".into());
1177            assert_eq!(log["source"], "staging".into());
1178            assert_eq!(log["region"], "gb".into());
1179            assert_eq!(log["absent"], Value::Null);
1180            assert_event_metadata(log).await;
1181        }
1182    }
1183
1184    #[tokio::test]
1185    async fn http_query_wildcard() {
1186        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1187            let (rx, addr) = source(
1188                vec![],
1189                vec!["*".to_string()],
1190                "http_path",
1191                "remote_ip",
1192                "/",
1193                "POST",
1194                StatusCode::OK,
1195                None,
1196                true,
1197                EventStatus::Delivered,
1198                true,
1199                None,
1200                Some(JsonDeserializerConfig::default().into()),
1201            )
1202            .await;
1203
1204            spawn_ok_collect_n(
1205                send_with_query(
1206                    addr,
1207                    "{\"key1\":\"value1\",\"key2\":\"value2\"}",
1208                    "source=staging&region=gb&key1=value_from_query",
1209                ),
1210                rx,
1211                1,
1212            )
1213            .await
1214        })
1215        .await;
1216
1217        {
1218            let event = events.remove(0);
1219            let log = event.as_log();
1220            assert_eq!(log["key1"], "value_from_query".into());
1221            assert_eq!(log["key2"], "value2".into());
1222            assert_eq!(log["source"], "staging".into());
1223            assert_eq!(log["region"], "gb".into());
1224            assert_event_metadata(log).await;
1225        }
1226    }
1227
1228    #[tokio::test]
1229    async fn http_gzip_deflate() {
1230        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1231            let body = "test body";
1232
1233            let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1234            encoder.write_all(body.as_bytes()).unwrap();
1235            let body = encoder.finish().unwrap();
1236
1237            let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1238            encoder.write_all(body.as_slice()).unwrap();
1239            let body = encoder.finish().unwrap();
1240
1241            let mut headers = HeaderMap::new();
1242            headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());
1243
1244            let (rx, addr) = source(
1245                vec![],
1246                vec![],
1247                "http_path",
1248                "remote_ip",
1249                "/",
1250                "POST",
1251                StatusCode::OK,
1252                None,
1253                true,
1254                EventStatus::Delivered,
1255                true,
1256                None,
1257                None,
1258            )
1259            .await;
1260
1261            spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
1262        })
1263        .await;
1264
1265        {
1266            let event = events.remove(0);
1267            let log = event.as_log();
1268            assert_eq!(*log.get_message().unwrap(), "test body".into());
1269            assert_event_metadata(log).await;
1270        }
1271    }
1272
1273    #[tokio::test]
1274    async fn http_path() {
1275        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1276            let (rx, addr) = source(
1277                vec![],
1278                vec![],
1279                "vector_http_path",
1280                "vector_remote_ip",
1281                "/event/path",
1282                "POST",
1283                StatusCode::OK,
1284                None,
1285                true,
1286                EventStatus::Delivered,
1287                true,
1288                None,
1289                Some(JsonDeserializerConfig::default().into()),
1290            )
1291            .await;
1292
1293            spawn_ok_collect_n(
1294                send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
1295                rx,
1296                1,
1297            )
1298            .await
1299        })
1300        .await;
1301
1302        {
1303            let event = events.remove(0);
1304            let log = event.as_log();
1305            assert_eq!(log["key1"], "value1".into());
1306            assert_eq!(log["vector_http_path"], "/event/path".into());
1307            assert!(log.get_timestamp().is_some());
1308            assert_eq!(
1309                *log.get_source_type().unwrap(),
1310                SimpleHttpConfig::NAME.into()
1311            );
1312        }
1313    }
1314
1315    #[tokio::test]
1316    async fn http_path_no_restriction() {
1317        let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1318            let (rx, addr) = source(
1319                vec![],
1320                vec![],
1321                "vector_http_path",
1322                "vector_remote_ip",
1323                "/event",
1324                "POST",
1325                StatusCode::OK,
1326                None,
1327                false,
1328                EventStatus::Delivered,
1329                true,
1330                None,
1331                Some(JsonDeserializerConfig::default().into()),
1332            )
1333            .await;
1334
1335            spawn_collect_n(
1336                async move {
1337                    assert_eq!(
1338                        200,
1339                        send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
1340                    );
1341                    assert_eq!(
1342                        200,
1343                        send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
1344                    );
1345                },
1346                rx,
1347                2,
1348            )
1349            .await
1350        })
1351        .await;
1352
1353        {
1354            let event = events.remove(0);
1355            let log = event.as_log();
1356            assert_eq!(log["key1"], "value1".into());
1357            assert_eq!(log["vector_http_path"], "/event/path1".into());
1358            assert!(log.get_timestamp().is_some());
1359            assert_eq!(
1360                *log.get_source_type().unwrap(),
1361                SimpleHttpConfig::NAME.into()
1362            );
1363        }
1364        {
1365            let event = events.remove(0);
1366            let log = event.as_log();
1367            assert_eq!(log["key2"], "value2".into());
1368            assert_eq!(log["vector_http_path"], "/event/path2".into());
1369            assert!(log.get_timestamp().is_some());
1370            assert_eq!(
1371                *log.get_source_type().unwrap(),
1372                SimpleHttpConfig::NAME.into()
1373            );
1374        }
1375    }
1376
1377    #[tokio::test]
1378    async fn http_wrong_path() {
1379        components::init_test();
1380        let (_rx, addr) = source(
1381            vec![],
1382            vec![],
1383            "vector_http_path",
1384            "vector_remote_ip",
1385            "/",
1386            "POST",
1387            StatusCode::OK,
1388            None,
1389            true,
1390            EventStatus::Delivered,
1391            true,
1392            None,
1393            Some(JsonDeserializerConfig::default().into()),
1394        )
1395        .await;
1396
1397        assert_eq!(
1398            404,
1399            send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
1400        );
1401    }
1402
1403    #[tokio::test]
1404    async fn http_status_code() {
1405        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
1406            let (rx, addr) = source(
1407                vec![],
1408                vec![],
1409                "http_path",
1410                "remote_ip",
1411                "/",
1412                "POST",
1413                StatusCode::ACCEPTED,
1414                None,
1415                true,
1416                EventStatus::Delivered,
1417                true,
1418                None,
1419                None,
1420            )
1421            .await;
1422
1423            spawn_collect_n(
1424                async move {
1425                    assert_eq!(
1426                        StatusCode::ACCEPTED,
1427                        send(addr, "{\"key1\":\"value1\"}").await
1428                    );
1429                },
1430                rx,
1431                1,
1432            )
1433            .await;
1434        })
1435        .await;
1436    }
1437
1438    #[tokio::test]
1439    async fn http_delivery_failure() {
1440        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1441            let (rx, addr) = source(
1442                vec![],
1443                vec![],
1444                "http_path",
1445                "remote_ip",
1446                "/",
1447                "POST",
1448                StatusCode::OK,
1449                None,
1450                true,
1451                EventStatus::Rejected,
1452                true,
1453                None,
1454                None,
1455            )
1456            .await;
1457
1458            spawn_collect_n(
1459                async move {
1460                    assert_eq!(400, send(addr, "test body\n").await);
1461                },
1462                rx,
1463                1,
1464            )
1465            .await;
1466        })
1467        .await;
1468    }
1469
1470    #[tokio::test]
1471    async fn ignores_disabled_acknowledgements() {
1472        let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1473            let (rx, addr) = source(
1474                vec![],
1475                vec![],
1476                "http_path",
1477                "remote_ip",
1478                "/",
1479                "POST",
1480                StatusCode::OK,
1481                None,
1482                true,
1483                EventStatus::Rejected,
1484                false,
1485                None,
1486                None,
1487            )
1488            .await;
1489
1490            spawn_collect_n(
1491                async move {
1492                    assert_eq!(200, send(addr, "test body\n").await);
1493                },
1494                rx,
1495                1,
1496            )
1497            .await
1498        })
1499        .await;
1500
1501        assert_eq!(events.len(), 1);
1502    }
1503
1504    #[tokio::test]
1505    async fn http_get_method() {
1506        components::init_test();
1507        let (_rx, addr) = source(
1508            vec![],
1509            vec![],
1510            "http_path",
1511            "remote_ip",
1512            "/",
1513            "GET",
1514            StatusCode::OK,
1515            None,
1516            true,
1517            EventStatus::Delivered,
1518            true,
1519            None,
1520            None,
1521        )
1522        .await;
1523
1524        assert_eq!(200, send_request(addr, "GET", "", "/").await);
1525    }
1526
1527    #[tokio::test]
1528    async fn returns_401_when_required_auth_is_missing() {
1529        components::init_test();
1530        let (_rx, addr) = source(
1531            vec![],
1532            vec![],
1533            "http_path",
1534            "remote_ip",
1535            "/",
1536            "GET",
1537            StatusCode::OK,
1538            Some(HttpServerAuthConfig::Basic {
1539                username: "test".to_string(),
1540                password: "test".to_string().into(),
1541            }),
1542            true,
1543            EventStatus::Delivered,
1544            true,
1545            None,
1546            None,
1547        )
1548        .await;
1549
1550        assert_eq!(401, send_request(addr, "GET", "", "/").await);
1551    }
1552
1553    #[tokio::test]
1554    async fn returns_401_when_required_auth_is_wrong() {
1555        components::init_test();
1556        let (_rx, addr) = source(
1557            vec![],
1558            vec![],
1559            "http_path",
1560            "remote_ip",
1561            "/",
1562            "POST",
1563            StatusCode::OK,
1564            Some(HttpServerAuthConfig::Basic {
1565                username: "test".to_string(),
1566                password: "test".to_string().into(),
1567            }),
1568            true,
1569            EventStatus::Delivered,
1570            true,
1571            None,
1572            None,
1573        )
1574        .await;
1575
1576        let mut headers = HeaderMap::new();
1577        headers.insert(
1578            AUTHORIZATION,
1579            Authorization::basic("wrong", "test").0.encode(),
1580        );
1581        assert_eq!(401, send_with_headers(addr, "", headers).await);
1582    }
1583
1584    #[tokio::test]
1585    async fn http_get_with_correct_auth() {
1586        components::init_test();
1587        let (_rx, addr) = source(
1588            vec![],
1589            vec![],
1590            "http_path",
1591            "remote_ip",
1592            "/",
1593            "POST",
1594            StatusCode::OK,
1595            Some(HttpServerAuthConfig::Basic {
1596                username: "test".to_string(),
1597                password: "test".to_string().into(),
1598            }),
1599            true,
1600            EventStatus::Delivered,
1601            true,
1602            None,
1603            None,
1604        )
1605        .await;
1606
1607        let mut headers = HeaderMap::new();
1608        headers.insert(
1609            AUTHORIZATION,
1610            Authorization::basic("test", "test").0.encode(),
1611        );
1612        assert_eq!(200, send_with_headers(addr, "", headers).await);
1613    }
1614
1615    #[test]
1616    fn output_schema_definition_vector_namespace() {
1617        let config = SimpleHttpConfig {
1618            log_namespace: Some(true),
1619            ..Default::default()
1620        };
1621
1622        let definitions = config
1623            .outputs(LogNamespace::Vector)
1624            .remove(0)
1625            .schema_definition(true);
1626
1627        let expected_definition =
1628            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1629                .with_meaning(OwnedTargetPath::event_root(), "message")
1630                .with_metadata_field(
1631                    &owned_value_path!("vector", "source_type"),
1632                    Kind::bytes(),
1633                    None,
1634                )
1635                .with_metadata_field(
1636                    &owned_value_path!(SimpleHttpConfig::NAME, "path"),
1637                    Kind::bytes(),
1638                    None,
1639                )
1640                .with_metadata_field(
1641                    &owned_value_path!(SimpleHttpConfig::NAME, "headers"),
1642                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1643                    None,
1644                )
1645                .with_metadata_field(
1646                    &owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
1647                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1648                    None,
1649                )
1650                .with_metadata_field(
1651                    &owned_value_path!(SimpleHttpConfig::NAME, "host"),
1652                    Kind::bytes().or_undefined(),
1653                    None,
1654                )
1655                .with_metadata_field(
1656                    &owned_value_path!("vector", "ingest_timestamp"),
1657                    Kind::timestamp(),
1658                    None,
1659                );
1660
1661        assert_eq!(definitions, Some(expected_definition))
1662    }
1663
1664    #[test]
1665    fn output_schema_definition_legacy_namespace() {
1666        let config = SimpleHttpConfig::default();
1667
1668        let definitions = config
1669            .outputs(LogNamespace::Legacy)
1670            .remove(0)
1671            .schema_definition(true);
1672
1673        let expected_definition = Definition::new_with_default_metadata(
1674            Kind::object(Collection::empty()),
1675            [LogNamespace::Legacy],
1676        )
1677        .with_event_field(
1678            &owned_value_path!("message"),
1679            Kind::bytes(),
1680            Some("message"),
1681        )
1682        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1683        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1684        .with_event_field(&owned_value_path!("path"), Kind::bytes(), None)
1685        .with_event_field(
1686            &owned_value_path!("host"),
1687            Kind::bytes().or_undefined(),
1688            None,
1689        )
1690        .unknown_fields(Kind::bytes());
1691
1692        assert_eq!(definitions, Some(expected_definition))
1693    }
1694
1695    #[test]
1696    fn validate_remove_duplicates() {
1697        let mut list = vec![
1698            "a".to_owned(),
1699            "b".to_owned(),
1700            "c".to_owned(),
1701            "d".to_owned(),
1702        ];
1703
1704        // no duplicates should be identical
1705        {
1706            let list_dedup = remove_duplicates(list.clone(), "foo");
1707
1708            assert_eq!(list, list_dedup);
1709        }
1710
1711        list.push("b".to_owned());
1712
1713        // remove duplicate "b"
1714        {
1715            let list_dedup = remove_duplicates(list.clone(), "foo");
1716            assert_eq!(
1717                vec![
1718                    "a".to_owned(),
1719                    "b".to_owned(),
1720                    "c".to_owned(),
1721                    "d".to_owned()
1722                ],
1723                list_dedup
1724            );
1725        }
1726    }
1727
1728    impl ValidatableComponent for SimpleHttpConfig {
1729        fn validation_configuration() -> ValidationConfiguration {
1730            let config = Self {
1731                decoding: Some(DeserializerConfig::Json(Default::default())),
1732                ..Default::default()
1733            };
1734
1735            let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
1736
1737            let listen_addr_http = format!("http://{}/", config.address);
1738            let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
1739
1740            let external_resource = ExternalResource::new(
1741                ResourceDirection::Push,
1742                HttpResourceConfig::from_parts(uri, Some(config.method.into())),
1743                config
1744                    .get_decoding_config()
1745                    .expect("should not fail to get decoding config"),
1746            );
1747
1748            ValidationConfiguration::from_source(
1749                Self::NAME,
1750                log_namespace,
1751                vec![ComponentTestCaseConfig::from_source(
1752                    config,
1753                    None,
1754                    Some(external_resource),
1755                )],
1756            )
1757        }
1758    }
1759
1760    register_validatable_component!(SimpleHttpConfig);
1761}