1use crate::common::http::{server_auth::HttpServerAuthConfig, ErrorMessage};
2use std::{collections::HashMap, net::SocketAddr};
3
4use bytes::{Bytes, BytesMut};
5use chrono::Utc;
6use http::StatusCode;
7use http_serde;
8use tokio_util::codec::Decoder as _;
9use vrl::value::{kind::Collection, Kind};
10use warp::http::HeaderMap;
11
12use vector_lib::codecs::{
13 decoding::{DeserializerConfig, FramingConfig},
14 BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
15 NewlineDelimitedDecoderConfig,
16};
17use vector_lib::configurable::configurable_component;
18use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
19use vector_lib::{
20 config::{DataType, LegacyKey, LogNamespace},
21 schema::Definition,
22};
23
24use crate::{
25 codecs::{Decoder, DecodingConfig},
26 config::{
27 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
28 SourceOutput,
29 },
30 event::Event,
31 http::KeepaliveConfig,
32 serde::{bool_or_struct, default_decoding},
33 sources::util::{
34 http::{add_headers, add_query_parameters, HttpMethod},
35 Encoding, HttpSource,
36 },
37 tls::TlsEnableableConfig,
38};
39
/// Configuration for the deprecated `http` source.
///
/// This is a newtype over [`SimpleHttpConfig`]; its trait implementations delegate to the
/// wrapped `http_server` configuration.
#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
#[configurable(metadata(deprecated))]
#[derive(Clone, Debug)]
pub struct HttpConfig(SimpleHttpConfig);
45
46impl GenerateConfig for HttpConfig {
47 fn generate_config() -> toml::Value {
48 <SimpleHttpConfig as GenerateConfig>::generate_config()
49 }
50}
51
52#[async_trait::async_trait]
53#[typetag::serde(name = "http")]
54impl SourceConfig for HttpConfig {
55 async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
56 self.0.build(cx).await
57 }
58
59 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
60 self.0.outputs(global_log_namespace)
61 }
62
63 fn resources(&self) -> Vec<Resource> {
64 self.0.resources()
65 }
66
67 fn can_acknowledge(&self) -> bool {
68 self.0.can_acknowledge()
69 }
70}
71
/// Configuration for the `http_server` source.
#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
#[derive(Clone, Debug)]
pub struct SimpleHttpConfig {
    /// The socket address the HTTP server listens on.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// Legacy body encoding.
    ///
    /// Deprecated in favor of `framing` and `decoding`; setting it together with either of them
    /// is rejected when the decoding configuration is built.
    #[serde(default)]
    encoding: Option<Encoding>,

    /// Names of request headers to add to each event.
    ///
    /// Entries containing `*` are compiled as glob patterns; all other entries are matched as
    /// exact names. Duplicate entries are removed with a warning.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "User-Agent"))]
    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
    #[configurable(metadata(docs::examples = "X-*"))]
    #[configurable(metadata(docs::examples = "*"))]
    headers: Vec<String>,

    /// Names of URL query parameters to add to each event.
    ///
    /// Entries containing `*` are compiled as glob patterns; all other entries are matched as
    /// exact names. Duplicate entries are removed with a warning.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    // Optional authentication settings, handed to the HTTP server when the source is run.
    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    /// Whether the request path must match `path` exactly.
    ///
    /// Enabled by default. The tests in this file show that when disabled, requests to paths
    /// below `path` (for example `/event/path1` for `/event`) are accepted as well.
    #[serde(default = "crate::serde::default_true")]
    strict_path: bool,

    /// The URL path on which requests are accepted. Defaults to `/`.
    #[serde(default = "default_path")]
    #[configurable(metadata(docs::examples = "/event/path"))]
    #[configurable(metadata(docs::examples = "/logs"))]
    path: String,

    /// The event field in which the request path is stored. Defaults to `path`.
    #[serde(default = "default_path_key")]
    #[configurable(metadata(docs::examples = "vector_http_path"))]
    path_key: OptionalValuePath,

    /// The event field in which the IP address of the remote peer is stored.
    ///
    /// Unset by default, in which case the source IP is not collected.
    #[serde(default = "default_host_key")]
    #[configurable(metadata(docs::examples = "hostname"))]
    host_key: OptionalValuePath,

    /// The HTTP method the server is configured with. Defaults to `POST`.
    #[serde(default = "default_http_method")]
    method: HttpMethod,

    /// The HTTP status code handed to the server as the response code. Defaults to `200 OK`.
    #[configurable(metadata(docs::examples = 202))]
    #[configurable(metadata(docs::numeric_type = "uint"))]
    #[serde(with = "http_serde::status_code")]
    #[serde(default = "default_http_response_code")]
    response_code: StatusCode,

    // Optional TLS settings, handed to the HTTP server when the source is run.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Framing used to split the request body; when unset, the decoder's default stream framing
    // is used (see `get_decoding_config`).
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Deserializer applied to each frame; when unset, `default_decoding()` is used.
    #[configurable(derived)]
    decoding: Option<DeserializerConfig>,

    // Acknowledgement settings; accepts either a bare boolean or a struct (`bool_or_struct`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// Per-source log namespace setting; it is merged with the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // HTTP keepalive settings, handed to the HTTP server when the source is run.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
179
180impl SimpleHttpConfig {
181 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
183 let mut schema_definition = self
184 .decoding
185 .as_ref()
186 .unwrap_or(&default_decoding())
187 .schema_definition(log_namespace)
188 .with_source_metadata(
189 SimpleHttpConfig::NAME,
190 self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
191 &owned_value_path!("path"),
192 Kind::bytes(),
193 None,
194 )
195 .with_source_metadata(
197 SimpleHttpConfig::NAME,
198 None,
199 &owned_value_path!("headers"),
200 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
201 None,
202 )
203 .with_source_metadata(
205 SimpleHttpConfig::NAME,
206 None,
207 &owned_value_path!("query_parameters"),
208 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
209 None,
210 )
211 .with_source_metadata(
212 SimpleHttpConfig::NAME,
213 self.host_key.path.clone().map(LegacyKey::Overwrite),
214 &owned_value_path!("host"),
215 Kind::bytes().or_undefined(),
216 None,
217 )
218 .with_standard_vector_source_metadata();
219
220 if log_namespace == LogNamespace::Legacy {
222 schema_definition = schema_definition.unknown_fields(Kind::bytes());
223 }
224
225 schema_definition
226 }
227
228 fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
229 if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
230 return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
231 }
232
233 let (framing, decoding) = if let Some(encoding) = self.encoding {
234 match encoding {
235 Encoding::Text => (
236 NewlineDelimitedDecoderConfig::new().into(),
237 BytesDeserializerConfig::new().into(),
238 ),
239 Encoding::Json => (
240 BytesDecoderConfig::new().into(),
241 JsonDeserializerConfig::default().into(),
242 ),
243 Encoding::Ndjson => (
244 NewlineDelimitedDecoderConfig::new().into(),
245 JsonDeserializerConfig::default().into(),
246 ),
247 Encoding::Binary => (
248 BytesDecoderConfig::new().into(),
249 BytesDeserializerConfig::new().into(),
250 ),
251 }
252 } else {
253 let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
254 let framing = self
255 .framing
256 .clone()
257 .unwrap_or_else(|| decoding.default_stream_framing());
258 (framing, decoding)
259 };
260
261 Ok(DecodingConfig::new(
262 framing,
263 decoding,
264 self.log_namespace.unwrap_or(false).into(),
265 ))
266 }
267}
268
269impl Default for SimpleHttpConfig {
270 fn default() -> Self {
271 Self {
272 address: "0.0.0.0:8080".parse().unwrap(),
273 encoding: None,
274 headers: Vec::new(),
275 query_parameters: Vec::new(),
276 tls: None,
277 auth: None,
278 path: default_path(),
279 path_key: default_path_key(),
280 host_key: default_host_key(),
281 method: default_http_method(),
282 response_code: default_http_response_code(),
283 strict_path: true,
284 framing: None,
285 decoding: Some(default_decoding()),
286 acknowledgements: SourceAcknowledgementsConfig::default(),
287 log_namespace: None,
288 keepalive: KeepaliveConfig::default(),
289 }
290 }
291}
292
// Provides the `GenerateConfig` impl for `SimpleHttpConfig` (used by `HttpConfig` and the
// `generate_config` test). NOTE(review): the macro is defined elsewhere; judging by its name the
// example config is derived from the `Default` impl — confirm.
impl_generate_config_from_default!(SimpleHttpConfig);
294
/// Serde default for `SimpleHttpConfig::method`: `POST`.
const fn default_http_method() -> HttpMethod {
    HttpMethod::Post
}
298
/// Serde default for `SimpleHttpConfig::path`: the root path `/`.
fn default_path() -> String {
    String::from("/")
}
302
303fn default_path_key() -> OptionalValuePath {
304 OptionalValuePath::from(owned_value_path!("path"))
305}
306
/// Serde default for `SimpleHttpConfig::host_key`: no path, so the remote IP is not recorded
/// unless a key is configured.
fn default_host_key() -> OptionalValuePath {
    OptionalValuePath::none()
}
310
/// Serde default for `SimpleHttpConfig::response_code`: `200 OK`.
const fn default_http_response_code() -> StatusCode {
    StatusCode::OK
}
314
/// Sorts `list` and removes duplicate entries, warning about each duplicate found.
///
/// `list_name` is the name of the configuration option the list came from and is only used in
/// the warning message. Note that the returned list is sorted, so the original order of the
/// entries is not preserved.
pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
    list.sort();

    // After sorting, duplicates are adjacent, so comparing neighbours finds all of them.
    let mut found_duplicate = false;
    for pair in list.windows(2) {
        if pair[0] == pair[1] {
            warn!(
                "`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
                list_name, pair[0]
            );
            found_duplicate = true;
        }
    }

    if found_duplicate {
        list.dedup();
    }
    list
}
335
/// Formats the IP part of `addr` as a string, dropping the port.
fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
    let ip = addr.ip();
    ip.to_string()
}
340
/// How a configured header or query-parameter name is matched against a request.
#[derive(Clone)]
pub enum HttpConfigParamKind {
    /// A compiled glob pattern, used for configured names that contain `*`.
    Glob(glob::Pattern),
    /// A literal name, used for configured names without `*`.
    Exact(String),
}
346
347pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
348 list.iter()
349 .map(|s| match s.contains('*') {
350 true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
351 false => Ok(HttpConfigParamKind::Exact(s.to_string())),
352 })
353 .collect::<crate::Result<Vec<HttpConfigParamKind>>>()
354}
355
356#[async_trait::async_trait]
357#[typetag::serde(name = "http_server")]
358impl SourceConfig for SimpleHttpConfig {
359 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
360 let log_namespace = cx.log_namespace(self.log_namespace);
361 let decoder = self
362 .get_decoding_config()?
363 .build()?
364 .with_log_namespace(log_namespace);
365
366 let source = SimpleHttpSource {
367 headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
368 query_parameters: build_param_matcher(&remove_duplicates(
369 self.query_parameters.clone(),
370 "query_parameters",
371 ))?,
372 path_key: self.path_key.clone(),
373 host_key: self.host_key.clone(),
374 decoder,
375 log_namespace,
376 };
377 source.run(
378 self.address,
379 self.path.as_str(),
380 self.method,
381 self.response_code,
382 self.strict_path,
383 self.tls.as_ref(),
384 self.auth.as_ref(),
385 cx,
386 self.acknowledgements,
387 self.keepalive.clone(),
388 )
389 }
390
391 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
392 let log_namespace = global_log_namespace.merge(self.log_namespace);
395
396 let schema_definition = self.schema_definition(log_namespace);
397
398 vec![SourceOutput::new_maybe_logs(
399 self.decoding
400 .as_ref()
401 .map(|d| d.output_type())
402 .unwrap_or(DataType::Log),
403 schema_definition,
404 )]
405 }
406
407 fn resources(&self) -> Vec<Resource> {
408 vec![Resource::tcp(self.address)]
409 }
410
411 fn can_acknowledge(&self) -> bool {
412 true
413 }
414}
415
/// Runtime state of the `http_server` source, created from `SimpleHttpConfig` in `build`.
#[derive(Clone)]
struct SimpleHttpSource {
    /// Matchers selecting which request headers are added to events.
    headers: Vec<HttpConfigParamKind>,
    /// Matchers selecting which query parameters are added to events.
    query_parameters: Vec<HttpConfigParamKind>,
    /// Legacy-namespace field for the request path.
    path_key: OptionalValuePath,
    /// Legacy-namespace field for the remote IP; when unset, the source IP is not requested.
    host_key: OptionalValuePath,
    /// Decoder cloned for every request body.
    decoder: Decoder,
    /// Namespace used when inserting metadata into events.
    log_namespace: LogNamespace,
}
425
426impl HttpSource for SimpleHttpSource {
427 fn enrich_events(
430 &self,
431 events: &mut [Event],
432 request_path: &str,
433 headers: &HeaderMap,
434 query_parameters: &HashMap<String, String>,
435 source_ip: Option<&SocketAddr>,
436 ) {
437 let now = Utc::now();
438 for event in events.iter_mut() {
439 match event {
440 Event::Log(log) => {
441 self.log_namespace.insert_source_metadata(
443 SimpleHttpConfig::NAME,
444 log,
445 self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
446 path!("path"),
447 request_path.to_owned(),
448 );
449
450 self.log_namespace.insert_standard_vector_source_metadata(
451 log,
452 SimpleHttpConfig::NAME,
453 now,
454 );
455
456 if let Some(addr) = source_ip {
457 self.log_namespace.insert_source_metadata(
458 SimpleHttpConfig::NAME,
459 log,
460 self.host_key.path.as_ref().map(LegacyKey::Overwrite),
461 path!("host"),
462 socket_addr_to_ip_string(addr),
463 );
464 }
465 }
466 _ => {
467 continue;
468 }
469 }
470 }
471
472 add_headers(
473 events,
474 &self.headers,
475 headers,
476 self.log_namespace,
477 SimpleHttpConfig::NAME,
478 );
479
480 add_query_parameters(
481 events,
482 &self.query_parameters,
483 query_parameters,
484 self.log_namespace,
485 SimpleHttpConfig::NAME,
486 );
487 }
488
489 fn build_events(
490 &self,
491 body: Bytes,
492 _header_map: &HeaderMap,
493 _query_parameters: &HashMap<String, String>,
494 _request_path: &str,
495 ) -> Result<Vec<Event>, ErrorMessage> {
496 let mut decoder = self.decoder.clone();
497 let mut events = Vec::new();
498 let mut bytes = BytesMut::new();
499 bytes.extend_from_slice(&body);
500
501 loop {
502 match decoder.decode_eof(&mut bytes) {
503 Ok(Some((next, _))) => {
504 events.extend(next);
505 }
506 Ok(None) => break,
507 Err(error) => {
508 return Err(ErrorMessage::new(
511 StatusCode::BAD_REQUEST,
512 format!("Failed decoding body: {error}"),
513 ));
514 }
515 }
516 }
517
518 Ok(events)
519 }
520
521 fn enable_source_ip(&self) -> bool {
522 self.host_key.path.is_some()
523 }
524}
525
526#[cfg(test)]
527mod tests {
528 use std::str::FromStr;
529 use std::{io::Write, net::SocketAddr};
530
531 use flate2::{
532 write::{GzEncoder, ZlibEncoder},
533 Compression,
534 };
535 use futures::Stream;
536 use headers::authorization::Credentials;
537 use headers::Authorization;
538 use http::header::AUTHORIZATION;
539 use http::{HeaderMap, Method, StatusCode, Uri};
540 use similar_asserts::assert_eq;
541 use vector_lib::codecs::{
542 decoding::{DeserializerConfig, FramingConfig},
543 BytesDecoderConfig, JsonDeserializerConfig,
544 };
545 use vector_lib::config::LogNamespace;
546 use vector_lib::event::LogEvent;
547 use vector_lib::lookup::lookup_v2::OptionalValuePath;
548 use vector_lib::lookup::{event_path, owned_value_path, OwnedTargetPath, PathPrefix};
549 use vector_lib::schema::Definition;
550 use vrl::value::{kind::Collection, Kind, ObjectMap};
551
552 use crate::common::http::server_auth::HttpServerAuthConfig;
553 use crate::sources::http_server::HttpMethod;
554 use crate::{
555 components::validation::prelude::*,
556 config::{log_schema, SourceConfig, SourceContext},
557 event::{Event, EventStatus, Value},
558 test_util::{
559 components::{self, assert_source_compliance, HTTP_PUSH_SOURCE_TAGS},
560 next_addr, spawn_collect_n, wait_for_tcp,
561 },
562 SourceSender,
563 };
564
565 use super::{remove_duplicates, SimpleHttpConfig};
566
/// Runs the shared `test_generate_config` check against `SimpleHttpConfig`.
#[test]
fn generate_config() {
    crate::test_util::test_generate_config::<SimpleHttpConfig>();
}
571
572 #[allow(clippy::too_many_arguments)]
573 async fn source<'a>(
574 headers: Vec<String>,
575 query_parameters: Vec<String>,
576 path_key: &'a str,
577 host_key: &'a str,
578 path: &'a str,
579 method: &'a str,
580 response_code: StatusCode,
581 auth: Option<HttpServerAuthConfig>,
582 strict_path: bool,
583 status: EventStatus,
584 acknowledgements: bool,
585 framing: Option<FramingConfig>,
586 decoding: Option<DeserializerConfig>,
587 ) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
588 let (sender, recv) = SourceSender::new_test_finalize(status);
589 let address = next_addr();
590 let path = path.to_owned();
591 let host_key = OptionalValuePath::from(owned_value_path!(host_key));
592 let path_key = OptionalValuePath::from(owned_value_path!(path_key));
593 let context = SourceContext::new_test(sender, None);
594 let method = match Method::from_str(method).unwrap() {
595 Method::GET => HttpMethod::Get,
596 Method::POST => HttpMethod::Post,
597 _ => HttpMethod::Post,
598 };
599
600 tokio::spawn(async move {
601 SimpleHttpConfig {
602 address,
603 headers,
604 encoding: None,
605 query_parameters,
606 response_code,
607 tls: None,
608 auth,
609 strict_path,
610 path_key,
611 host_key,
612 path,
613 method,
614 framing,
615 decoding,
616 acknowledgements: acknowledgements.into(),
617 log_namespace: None,
618 keepalive: Default::default(),
619 }
620 .build(context)
621 .await
622 .unwrap()
623 .await
624 .unwrap();
625 });
626 wait_for_tcp(address).await;
627 (recv, address)
628 }
629
630 async fn send(address: SocketAddr, body: &str) -> u16 {
631 reqwest::Client::new()
632 .post(format!("http://{address}/"))
633 .body(body.to_owned())
634 .send()
635 .await
636 .unwrap()
637 .status()
638 .as_u16()
639 }
640
641 async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
642 reqwest::Client::new()
643 .post(format!("http://{address}/"))
644 .headers(headers)
645 .body(body.to_owned())
646 .send()
647 .await
648 .unwrap()
649 .status()
650 .as_u16()
651 }
652
653 async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
654 reqwest::Client::new()
655 .post(format!("http://{address}?{query}"))
656 .body(body.to_owned())
657 .send()
658 .await
659 .unwrap()
660 .status()
661 .as_u16()
662 }
663
664 async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
665 reqwest::Client::new()
666 .post(format!("http://{address}{path}"))
667 .body(body.to_owned())
668 .send()
669 .await
670 .unwrap()
671 .status()
672 .as_u16()
673 }
674
675 async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
676 let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
677 reqwest::Client::new()
678 .request(method, format!("http://{address}{path}"))
679 .body(body.to_owned())
680 .send()
681 .await
682 .unwrap()
683 .status()
684 .as_u16()
685 }
686
687 async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
688 reqwest::Client::new()
689 .post(format!("http://{address}/"))
690 .headers(headers)
691 .body(body)
692 .send()
693 .await
694 .unwrap()
695 .status()
696 .as_u16()
697 }
698
699 async fn spawn_ok_collect_n(
700 send: impl std::future::Future<Output = u16> + Send + 'static,
701 rx: impl Stream<Item = Event> + Unpin,
702 n: usize,
703 ) -> Vec<Event> {
704 spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
705 }
706
/// With no framing or decoding configured, a body of two newline-separated lines yields two
/// events, one per line, each carrying the standard metadata.
#[tokio::test]
async fn http_multiline_text() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
        let body = "test body\ntest body 2";

        let (rx, addr) = source(
            vec![],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            None,
        )
        .await;

        spawn_ok_collect_n(send(addr, body), rx, 2).await
    })
    .await;

    // First line of the body.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(*log.get_message().unwrap(), "test body".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(
            *log.get_source_type().unwrap(),
            SimpleHttpConfig::NAME.into()
        );
        assert_eq!(log["http_path"], "/".into());
        assert_event_metadata(log).await;
    }
    // Second line of the body.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(*log.get_message().unwrap(), "test body 2".into());
        assert_event_metadata(log).await;
    }
}
752
/// Same as `http_multiline_text`, but the body ends with a trailing newline; it must still
/// yield exactly two events.
#[tokio::test]
async fn http_multiline_text2() {
    // Note the trailing newline after the second line.
    let body = "test body\ntest body 2\n";

    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
        let (rx, addr) = source(
            vec![],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            None,
        )
        .await;

        spawn_ok_collect_n(send(addr, body), rx, 2).await
    })
    .await;

    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(*log.get_message().unwrap(), "test body".into());
        assert_event_metadata(log).await;
    }
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(*log.get_message().unwrap(), "test body 2".into());
        assert_event_metadata(log).await;
    }
}
793
/// With the bytes framing configured explicitly, the body is not split on newlines: a body
/// containing `\n` yields a single event whose message keeps the newline.
#[tokio::test]
async fn http_bytes_codec_preserves_newlines() {
    let body = "foo\nbar";

    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
        let (rx, addr) = source(
            vec![],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            Some(BytesDecoderConfig::new().into()),
            None,
        )
        .await;

        spawn_ok_collect_n(send(addr, body), rx, 1).await
    })
    .await;

    assert_eq!(events.len(), 1);

    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
        assert_event_metadata(log).await;
    }
}
829
/// With JSON decoding, malformed bodies are rejected with `400`, while a single object or an
/// array of objects is accepted with `200`.
#[tokio::test]
async fn http_json_parsing() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let (rx, addr) = source(
            vec![],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            Some(JsonDeserializerConfig::default().into()),
        )
        .await;

        spawn_collect_n(
            async move {
                // Unterminated object.
                assert_eq!(400, send(addr, "{").await);
                // Key without a value.
                assert_eq!(400, send(addr, r#"{"key"}"#).await);
                // A body may be a single object...
                assert_eq!(200, send(addr, "{}").await);
                // ...or an array of objects.
                assert_eq!(200, send(addr, "[{},{},{}]").await);
            },
            rx,
            2,
        )
        .await
    })
    .await;

    // Only the first two events are collected; both must carry a timestamp.
    assert!(events.remove(1).as_log().get_timestamp().is_some());
    assert!(events.remove(0).as_log().get_timestamp().is_some());
}
868
/// With JSON decoding, the fields of the sent objects end up as fields of the events, both for
/// an array body and for a single-object body.
#[tokio::test]
async fn http_json_values() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let (rx, addr) = source(
            vec![],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            Some(JsonDeserializerConfig::default().into()),
        )
        .await;

        spawn_collect_n(
            async move {
                assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
                assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
            },
            rx,
            2,
        )
        .await
    })
    .await;

    // Event from the array body.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key"], "value".into());
        assert_event_metadata(log).await;
    }
    // Event from the single-object body.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key2"], "value2".into());
        assert_event_metadata(log).await;
    }
}
914
/// JSON keys containing dots are kept as single literal keys instead of being split into nested
/// paths, both at the top level and inside a nested object.
#[tokio::test]
async fn http_json_dotted_keys() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let (rx, addr) = source(
            vec![],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            Some(JsonDeserializerConfig::default().into()),
        )
        .await;

        spawn_collect_n(
            async move {
                assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
                assert_eq!(
                    200,
                    send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
                );
            },
            rx,
            2,
        )
        .await
    })
    .await;

    // Top-level dotted key, looked up as one path segment.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(
            log.get(event_path!("dotted.key")).unwrap(),
            &Value::from("value")
        );
    }
    // Dotted key inside a nested object.
    {
        let event = events.remove(0);
        let log = event.as_log();
        let mut map = ObjectMap::new();
        map.insert("dotted.key2".into(), Value::from("value2"));
        assert_eq!(log["nested"], map.into());
    }
}
966
/// With JSON decoding and no explicit framing, both a JSON array and newline-delimited objects
/// yield one event per object; the empty line between the two objects produces no event.
#[tokio::test]
async fn http_ndjson() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let (rx, addr) = source(
            vec![],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            Some(JsonDeserializerConfig::default().into()),
        )
        .await;

        spawn_collect_n(
            async move {
                // Two objects in one array.
                assert_eq!(
                    200,
                    send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
                );

                // Two objects separated by newlines, with an empty line in between.
                assert_eq!(
                    200,
                    send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
                );
            },
            rx,
            4,
        )
        .await
    })
    .await;

    // Events from the array body.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key1"], "value1".into());
        assert_event_metadata(log).await;
    }
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key2"], "value2".into());
        assert_event_metadata(log).await;
    }
    // Events from the newline-delimited body.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key1"], "value1".into());
        assert_event_metadata(log).await;
    }
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key2"], "value2".into());
        assert_event_metadata(log).await;
    }
}
1031
1032 async fn assert_event_metadata(log: &LogEvent) {
1033 assert!(log.get_timestamp().is_some());
1034
1035 let source_type_key_value = log
1036 .get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
1037 .unwrap()
1038 .as_str()
1039 .unwrap();
1040 assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
1041 assert_eq!(log["http_path"], "/".into());
1042 }
1043
/// Configured headers are added to the event: exactly named headers under their configured
/// name, glob-matched headers under their lower-case name, and a configured header that is
/// absent from the request as a null value.
#[tokio::test]
async fn http_headers() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let mut headers = HeaderMap::new();
        headers.insert("User-Agent", "test_client".parse().unwrap());
        headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
        headers.insert("X-Test-Header", "true".parse().unwrap());

        let (rx, addr) = source(
            vec![
                "User-Agent".to_string(),
                "Upgrade-Insecure-Requests".to_string(),
                "X-*".to_string(),
                "AbsentHeader".to_string(),
            ],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            Some(JsonDeserializerConfig::default().into()),
        )
        .await;

        spawn_ok_collect_n(
            send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
            rx,
            1,
        )
        .await
    })
    .await;

    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key1"], "value1".into());
        // Header names contain `-`, so they are quoted in the lookup path.
        assert_eq!(log["\"User-Agent\""], "test_client".into());
        assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
        // Matched through the `X-*` glob.
        assert_eq!(log["\"x-test-header\""], "true".into());
        assert_eq!(log["AbsentHeader"], Value::Null);
        assert_event_metadata(log).await;
    }
}
1094
/// With the `*` wildcard, request headers are added under their lower-case names with their
/// values unchanged, and a header named like a body field (`key1`) does not replace that field.
#[tokio::test]
async fn http_headers_wildcard() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let mut headers = HeaderMap::new();
        headers.insert("User-Agent", "test_client".parse().unwrap());
        headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
        // Collides with the `key1` field of the JSON body sent below.
        headers.insert("key1", "value_from_header".parse().unwrap());

        let (rx, addr) = source(
            vec!["*".to_string()],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            Some(JsonDeserializerConfig::default().into()),
        )
        .await;

        spawn_ok_collect_n(
            send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
            rx,
            1,
        )
        .await
    })
    .await;

    {
        let event = events.remove(0);
        let log = event.as_log();
        // The body value wins over the colliding header.
        assert_eq!(log["key1"], "value1".into());
        assert_eq!(log["\"user-agent\""], "test_client".into());
        assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
        assert_event_metadata(log).await;
    }
}
1139
1140 #[tokio::test]
1141 async fn http_query() {
1142 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1143 let (rx, addr) = source(
1144 vec![],
1145 vec![
1146 "source".to_string(),
1147 "region".to_string(),
1148 "absent".to_string(),
1149 ],
1150 "http_path",
1151 "remote_ip",
1152 "/",
1153 "POST",
1154 StatusCode::OK,
1155 None,
1156 true,
1157 EventStatus::Delivered,
1158 true,
1159 None,
1160 Some(JsonDeserializerConfig::default().into()),
1161 )
1162 .await;
1163
1164 spawn_ok_collect_n(
1165 send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging®ion=gb"),
1166 rx,
1167 1,
1168 )
1169 .await
1170 })
1171 .await;
1172
1173 {
1174 let event = events.remove(0);
1175 let log = event.as_log();
1176 assert_eq!(log["key1"], "value1".into());
1177 assert_eq!(log["source"], "staging".into());
1178 assert_eq!(log["region"], "gb".into());
1179 assert_eq!(log["absent"], Value::Null);
1180 assert_event_metadata(log).await;
1181 }
1182 }
1183
1184 #[tokio::test]
1185 async fn http_query_wildcard() {
1186 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1187 let (rx, addr) = source(
1188 vec![],
1189 vec!["*".to_string()],
1190 "http_path",
1191 "remote_ip",
1192 "/",
1193 "POST",
1194 StatusCode::OK,
1195 None,
1196 true,
1197 EventStatus::Delivered,
1198 true,
1199 None,
1200 Some(JsonDeserializerConfig::default().into()),
1201 )
1202 .await;
1203
1204 spawn_ok_collect_n(
1205 send_with_query(
1206 addr,
1207 "{\"key1\":\"value1\",\"key2\":\"value2\"}",
1208 "source=staging®ion=gb&key1=value_from_query",
1209 ),
1210 rx,
1211 1,
1212 )
1213 .await
1214 })
1215 .await;
1216
1217 {
1218 let event = events.remove(0);
1219 let log = event.as_log();
1220 assert_eq!(log["key1"], "value_from_query".into());
1221 assert_eq!(log["key2"], "value2".into());
1222 assert_eq!(log["source"], "staging".into());
1223 assert_eq!(log["region"], "gb".into());
1224 assert_event_metadata(log).await;
1225 }
1226 }
1227
/// A body that was compressed with gzip and then with zlib, sent with
/// `Content-Encoding: gzip, deflate`, is decoded back to the original text.
#[tokio::test]
async fn http_gzip_deflate() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let body = "test body";

        // First encoding step: gzip.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(body.as_bytes()).unwrap();
        let body = encoder.finish().unwrap();

        // Second encoding step: zlib, applied to the gzip output.
        let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(body.as_slice()).unwrap();
        let body = encoder.finish().unwrap();

        let mut headers = HeaderMap::new();
        headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());

        let (rx, addr) = source(
            vec![],
            vec![],
            "http_path",
            "remote_ip",
            "/",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            None,
        )
        .await;

        spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
    })
    .await;

    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(*log.get_message().unwrap(), "test body".into());
        assert_event_metadata(log).await;
    }
}
1272
/// A source configured with a custom path accepts requests on that path and stores the request
/// path under the configured path key.
#[tokio::test]
async fn http_path() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let (rx, addr) = source(
            vec![],
            vec![],
            "vector_http_path",
            "vector_remote_ip",
            "/event/path",
            "POST",
            StatusCode::OK,
            None,
            true,
            EventStatus::Delivered,
            true,
            None,
            Some(JsonDeserializerConfig::default().into()),
        )
        .await;

        spawn_ok_collect_n(
            send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
            rx,
            1,
        )
        .await
    })
    .await;

    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key1"], "value1".into());
        assert_eq!(log["vector_http_path"], "/event/path".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(
            *log.get_source_type().unwrap(),
            SimpleHttpConfig::NAME.into()
        );
    }
}
1314
/// With `strict_path` disabled, requests to paths below the configured path are accepted, and
/// each event records the full path of its own request.
#[tokio::test]
async fn http_path_no_restriction() {
    let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
        let (rx, addr) = source(
            vec![],
            vec![],
            "vector_http_path",
            "vector_remote_ip",
            "/event",
            "POST",
            StatusCode::OK,
            None,
            false,
            EventStatus::Delivered,
            true,
            None,
            Some(JsonDeserializerConfig::default().into()),
        )
        .await;

        spawn_collect_n(
            async move {
                assert_eq!(
                    200,
                    send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
                );
                assert_eq!(
                    200,
                    send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
                );
            },
            rx,
            2,
        )
        .await
    })
    .await;

    // Event of the request to `/event/path1`.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key1"], "value1".into());
        assert_eq!(log["vector_http_path"], "/event/path1".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(
            *log.get_source_type().unwrap(),
            SimpleHttpConfig::NAME.into()
        );
    }
    // Event of the request to `/event/path2`.
    {
        let event = events.remove(0);
        let log = event.as_log();
        assert_eq!(log["key2"], "value2".into());
        assert_eq!(log["vector_http_path"], "/event/path2".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(
            *log.get_source_type().unwrap(),
            SimpleHttpConfig::NAME.into()
        );
    }
}
1376
1377 #[tokio::test]
1378 async fn http_wrong_path() {
1379 components::init_test();
1380 let (_rx, addr) = source(
1381 vec![],
1382 vec![],
1383 "vector_http_path",
1384 "vector_remote_ip",
1385 "/",
1386 "POST",
1387 StatusCode::OK,
1388 None,
1389 true,
1390 EventStatus::Delivered,
1391 true,
1392 None,
1393 Some(JsonDeserializerConfig::default().into()),
1394 )
1395 .await;
1396
1397 assert_eq!(
1398 404,
1399 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
1400 );
1401 }
1402
1403 #[tokio::test]
1404 async fn http_status_code() {
1405 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
1406 let (rx, addr) = source(
1407 vec![],
1408 vec![],
1409 "http_path",
1410 "remote_ip",
1411 "/",
1412 "POST",
1413 StatusCode::ACCEPTED,
1414 None,
1415 true,
1416 EventStatus::Delivered,
1417 true,
1418 None,
1419 None,
1420 )
1421 .await;
1422
1423 spawn_collect_n(
1424 async move {
1425 assert_eq!(
1426 StatusCode::ACCEPTED,
1427 send(addr, "{\"key1\":\"value1\"}").await
1428 );
1429 },
1430 rx,
1431 1,
1432 )
1433 .await;
1434 })
1435 .await;
1436 }
1437
1438 #[tokio::test]
1439 async fn http_delivery_failure() {
1440 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1441 let (rx, addr) = source(
1442 vec![],
1443 vec![],
1444 "http_path",
1445 "remote_ip",
1446 "/",
1447 "POST",
1448 StatusCode::OK,
1449 None,
1450 true,
1451 EventStatus::Rejected,
1452 true,
1453 None,
1454 None,
1455 )
1456 .await;
1457
1458 spawn_collect_n(
1459 async move {
1460 assert_eq!(400, send(addr, "test body\n").await);
1461 },
1462 rx,
1463 1,
1464 )
1465 .await;
1466 })
1467 .await;
1468 }
1469
1470 #[tokio::test]
1471 async fn ignores_disabled_acknowledgements() {
1472 let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1473 let (rx, addr) = source(
1474 vec![],
1475 vec![],
1476 "http_path",
1477 "remote_ip",
1478 "/",
1479 "POST",
1480 StatusCode::OK,
1481 None,
1482 true,
1483 EventStatus::Rejected,
1484 false,
1485 None,
1486 None,
1487 )
1488 .await;
1489
1490 spawn_collect_n(
1491 async move {
1492 assert_eq!(200, send(addr, "test body\n").await);
1493 },
1494 rx,
1495 1,
1496 )
1497 .await
1498 })
1499 .await;
1500
1501 assert_eq!(events.len(), 1);
1502 }
1503
1504 #[tokio::test]
1505 async fn http_get_method() {
1506 components::init_test();
1507 let (_rx, addr) = source(
1508 vec![],
1509 vec![],
1510 "http_path",
1511 "remote_ip",
1512 "/",
1513 "GET",
1514 StatusCode::OK,
1515 None,
1516 true,
1517 EventStatus::Delivered,
1518 true,
1519 None,
1520 None,
1521 )
1522 .await;
1523
1524 assert_eq!(200, send_request(addr, "GET", "", "/").await);
1525 }
1526
1527 #[tokio::test]
1528 async fn returns_401_when_required_auth_is_missing() {
1529 components::init_test();
1530 let (_rx, addr) = source(
1531 vec![],
1532 vec![],
1533 "http_path",
1534 "remote_ip",
1535 "/",
1536 "GET",
1537 StatusCode::OK,
1538 Some(HttpServerAuthConfig::Basic {
1539 username: "test".to_string(),
1540 password: "test".to_string().into(),
1541 }),
1542 true,
1543 EventStatus::Delivered,
1544 true,
1545 None,
1546 None,
1547 )
1548 .await;
1549
1550 assert_eq!(401, send_request(addr, "GET", "", "/").await);
1551 }
1552
1553 #[tokio::test]
1554 async fn returns_401_when_required_auth_is_wrong() {
1555 components::init_test();
1556 let (_rx, addr) = source(
1557 vec![],
1558 vec![],
1559 "http_path",
1560 "remote_ip",
1561 "/",
1562 "POST",
1563 StatusCode::OK,
1564 Some(HttpServerAuthConfig::Basic {
1565 username: "test".to_string(),
1566 password: "test".to_string().into(),
1567 }),
1568 true,
1569 EventStatus::Delivered,
1570 true,
1571 None,
1572 None,
1573 )
1574 .await;
1575
1576 let mut headers = HeaderMap::new();
1577 headers.insert(
1578 AUTHORIZATION,
1579 Authorization::basic("wrong", "test").0.encode(),
1580 );
1581 assert_eq!(401, send_with_headers(addr, "", headers).await);
1582 }
1583
1584 #[tokio::test]
1585 async fn http_get_with_correct_auth() {
1586 components::init_test();
1587 let (_rx, addr) = source(
1588 vec![],
1589 vec![],
1590 "http_path",
1591 "remote_ip",
1592 "/",
1593 "POST",
1594 StatusCode::OK,
1595 Some(HttpServerAuthConfig::Basic {
1596 username: "test".to_string(),
1597 password: "test".to_string().into(),
1598 }),
1599 true,
1600 EventStatus::Delivered,
1601 true,
1602 None,
1603 None,
1604 )
1605 .await;
1606
1607 let mut headers = HeaderMap::new();
1608 headers.insert(
1609 AUTHORIZATION,
1610 Authorization::basic("test", "test").0.encode(),
1611 );
1612 assert_eq!(200, send_with_headers(addr, "", headers).await);
1613 }
1614
/// With `log_namespace` enabled, the first output's schema definition must match
/// a hand-built `Vector`-namespace definition: a bytes event root with the
/// `message` meaning, plus metadata fields for source type, path, headers, query
/// parameters, host and ingest timestamp.
#[test]
fn output_schema_definition_vector_namespace() {
    let config = SimpleHttpConfig {
        log_namespace: Some(true),
        ..Default::default()
    };

    let mut outputs = config.outputs(LogNamespace::Vector);
    let actual = outputs.remove(0).schema_definition(true);

    // Headers and query parameters share this kind: an optional object whose
    // unknown fields are bytes.
    let optional_bytes_map =
        Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    let root = Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
        .with_meaning(OwnedTargetPath::event_root(), "message");

    let expected = root
        .with_metadata_field(
            &owned_value_path!("vector", "source_type"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!(SimpleHttpConfig::NAME, "path"),
            Kind::bytes(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!(SimpleHttpConfig::NAME, "headers"),
            optional_bytes_map.clone(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
            optional_bytes_map,
            None,
        )
        .with_metadata_field(
            &owned_value_path!(SimpleHttpConfig::NAME, "host"),
            Kind::bytes().or_undefined(),
            None,
        )
        .with_metadata_field(
            &owned_value_path!("vector", "ingest_timestamp"),
            Kind::timestamp(),
            None,
        );

    assert_eq!(actual, Some(expected));
}
1663
/// With the default configuration, the first output's schema definition must
/// match a hand-built `Legacy`-namespace definition: an object root with
/// `message`, `source_type`, `timestamp`, `path` and optional `host` event
/// fields, and bytes for any unknown field.
#[test]
fn output_schema_definition_legacy_namespace() {
    let mut outputs = SimpleHttpConfig::default().outputs(LogNamespace::Legacy);
    let actual = outputs.remove(0).schema_definition(true);

    let root = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );

    let expected = root
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("path"), Kind::bytes(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            None,
        )
        .unknown_fields(Kind::bytes());

    assert_eq!(actual, Some(expected));
}
1694
/// `remove_duplicates` must return a duplicate-free list unchanged, and must
/// return `a, b, c, d` when a second `b` has been appended to that list.
#[test]
fn validate_remove_duplicates() {
    let unique: Vec<String> = ["a", "b", "c", "d"]
        .iter()
        .copied()
        .map(String::from)
        .collect();

    // Nothing to remove: the output equals the input.
    let untouched = remove_duplicates(unique.clone(), "foo");
    assert_eq!(unique, untouched);

    // Same list with a repeated "b" appended at the end.
    let mut with_repeat = unique.clone();
    with_repeat.push(String::from("b"));

    let deduplicated = remove_duplicates(with_repeat.clone(), "foo");
    let expected: Vec<String> = vec![
        String::from("a"),
        String::from("b"),
        String::from("c"),
        String::from("d"),
    ];
    assert_eq!(expected, deduplicated);
}
1727
1728 impl ValidatableComponent for SimpleHttpConfig {
1729 fn validation_configuration() -> ValidationConfiguration {
1730 let config = Self {
1731 decoding: Some(DeserializerConfig::Json(Default::default())),
1732 ..Default::default()
1733 };
1734
1735 let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
1736
1737 let listen_addr_http = format!("http://{}/", config.address);
1738 let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
1739
1740 let external_resource = ExternalResource::new(
1741 ResourceDirection::Push,
1742 HttpResourceConfig::from_parts(uri, Some(config.method.into())),
1743 config
1744 .get_decoding_config()
1745 .expect("should not fail to get decoding config"),
1746 );
1747
1748 ValidationConfiguration::from_source(
1749 Self::NAME,
1750 log_namespace,
1751 vec![ComponentTestCaseConfig::from_source(
1752 config,
1753 None,
1754 Some(external_resource),
1755 )],
1756 )
1757 }
1758 }
1759
1760 register_validatable_component!(SimpleHttpConfig);
1761}