1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::{Bytes, BytesMut};
4use chrono::Utc;
5use http::StatusCode;
6use http_serde;
7use tokio_util::codec::Decoder as _;
8use vector_lib::{
9 codecs::{
10 BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
11 NewlineDelimitedDecoderConfig,
12 decoding::{DeserializerConfig, FramingConfig},
13 },
14 config::{DataType, LegacyKey, LogNamespace},
15 configurable::configurable_component,
16 lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
17 schema::Definition,
18};
19use vrl::value::{Kind, kind::Collection};
20use warp::http::HeaderMap;
21
22use crate::{
23 codecs::{Decoder, DecodingConfig},
24 common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
25 config::{
26 GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
27 SourceOutput,
28 },
29 event::Event,
30 http::KeepaliveConfig,
31 serde::{bool_or_struct, default_decoding},
32 sources::util::{
33 Encoding, HttpSource,
34 http::{HttpMethod, add_headers, add_query_parameters},
35 },
36 tls::TlsEnableableConfig,
37};
38
#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
/// Configuration for the `http` source.
///
/// This is a thin wrapper around [`SimpleHttpConfig`]: every trait method implemented for it
/// delegates to the wrapped value. It is registered under the `http` name and flagged as
/// deprecated in its metadata, while `SimpleHttpConfig` is registered as `http_server`.
#[configurable(metadata(deprecated))]
#[derive(Clone, Debug)]
pub struct HttpConfig(SimpleHttpConfig);
44
impl GenerateConfig for HttpConfig {
    /// Returns the example configuration generated for [`SimpleHttpConfig`].
    fn generate_config() -> toml::Value {
        // Fully qualified so the call always resolves to the `GenerateConfig` implementation.
        <SimpleHttpConfig as GenerateConfig>::generate_config()
    }
}
50
51#[async_trait::async_trait]
52#[typetag::serde(name = "http")]
53impl SourceConfig for HttpConfig {
54 async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
55 self.0.build(cx).await
56 }
57
58 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
59 self.0.outputs(global_log_namespace)
60 }
61
62 fn resources(&self) -> Vec<Resource> {
63 self.0.resources()
64 }
65
66 fn can_acknowledge(&self) -> bool {
67 self.0.can_acknowledge()
68 }
69}
70
#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
/// Configuration for the `http_server` source.
#[derive(Clone, Debug)]
pub struct SimpleHttpConfig {
    /// The socket address the HTTP server is run on.
    ///
    /// The same address is reported as the TCP resource claimed by this source.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// Legacy way of selecting how request bodies are framed and decoded.
    ///
    /// When set on its own, it selects a fixed framing/deserializer pair. Setting it together
    /// with `framing` or `decoding` is rejected with an error when the decoder is built.
    #[configurable(deprecated)]
    #[serde(default)]
    encoding: Option<Encoding>,

    /// A list of HTTP headers to add to the events built from a request.
    ///
    /// Entries containing `*` are compiled as glob patterns; all other entries are matched as
    /// exact names. Duplicate entries are removed (with a warning) when the source is built.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "User-Agent"))]
    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
    #[configurable(metadata(docs::examples = "X-*"))]
    #[configurable(metadata(docs::examples = "*"))]
    headers: Vec<String>,

    /// A list of URL query parameters to add to the events built from a request.
    ///
    /// Entries containing `*` are compiled as glob patterns; all other entries are matched as
    /// exact names. Duplicate entries are removed (with a warning) when the source is built.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    // Optional authentication settings, handed to the HTTP server when the source is run.
    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    /// Whether `path` must match the request path exactly.
    ///
    /// Defaults to `true`. The tests in this file show a request to a different path being
    /// answered with `404` when this is `true`, and requests to sub-paths of `path` being
    /// accepted when it is `false`.
    #[serde(default = "crate::serde::default_true")]
    strict_path: bool,

    /// The URL path on which requests are accepted.
    ///
    /// Defaults to `/`.
    #[serde(default = "default_path")]
    #[configurable(metadata(docs::examples = "/event/path"))]
    #[configurable(metadata(docs::examples = "/logs"))]
    path: String,

    /// The event key in which the requested URL path is stored.
    ///
    /// Defaults to `path`. It is used as the legacy key for the `path` source metadata and is
    /// only inserted when the event does not already have a value there.
    #[serde(default = "default_path_key")]
    #[configurable(metadata(docs::examples = "vector_http_path"))]
    path_key: OptionalValuePath,

    /// The event key in which the IP address of the client is stored.
    ///
    /// Unset by default. The client address is only requested from the server when this key
    /// is set.
    #[serde(default = "default_host_key")]
    #[configurable(metadata(docs::examples = "hostname"))]
    host_key: OptionalValuePath,

    /// The HTTP method handed to the server when the source is run.
    ///
    /// Defaults to `POST`.
    #[serde(default = "default_http_method")]
    method: HttpMethod,

    /// The HTTP status code handed to the server as the response code for this source.
    ///
    /// Defaults to `200`.
    #[configurable(metadata(docs::examples = 202))]
    #[configurable(metadata(docs::numeric_type = "uint"))]
    #[serde(with = "http_serde::status_code")]
    #[serde(default = "default_http_response_code")]
    response_code: StatusCode,

    // Optional TLS settings, handed to the HTTP server when the source is run.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Optional framing; when unset, the default stream framing of the decoder is used.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    // Optional deserializer; when unset, `default_decoding()` is used.
    #[configurable(derived)]
    decoding: Option<DeserializerConfig>,

    // Acknowledgement settings; accepts either a boolean or a struct when deserialized.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. When set, it is merged over the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Keepalive settings, handed to the HTTP server when the source is run.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
179
180impl SimpleHttpConfig {
181 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
183 let mut schema_definition = self
184 .decoding
185 .as_ref()
186 .unwrap_or(&default_decoding())
187 .schema_definition(log_namespace)
188 .with_source_metadata(
189 SimpleHttpConfig::NAME,
190 self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
191 &owned_value_path!("path"),
192 Kind::bytes(),
193 None,
194 )
195 .with_source_metadata(
197 SimpleHttpConfig::NAME,
198 None,
199 &owned_value_path!("headers"),
200 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
201 None,
202 )
203 .with_source_metadata(
205 SimpleHttpConfig::NAME,
206 None,
207 &owned_value_path!("query_parameters"),
208 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
209 None,
210 )
211 .with_source_metadata(
212 SimpleHttpConfig::NAME,
213 self.host_key.path.clone().map(LegacyKey::Overwrite),
214 &owned_value_path!("host"),
215 Kind::bytes().or_undefined(),
216 None,
217 )
218 .with_standard_vector_source_metadata();
219
220 if log_namespace == LogNamespace::Legacy {
222 schema_definition = schema_definition.unknown_fields(Kind::bytes());
223 }
224
225 schema_definition
226 }
227
228 fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
229 if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
230 return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
231 }
232
233 let (framing, decoding) = if let Some(encoding) = self.encoding {
234 match encoding {
235 Encoding::Text => (
236 NewlineDelimitedDecoderConfig::new().into(),
237 BytesDeserializerConfig::new().into(),
238 ),
239 Encoding::Json => (
240 BytesDecoderConfig::new().into(),
241 JsonDeserializerConfig::default().into(),
242 ),
243 Encoding::Ndjson => (
244 NewlineDelimitedDecoderConfig::new().into(),
245 JsonDeserializerConfig::default().into(),
246 ),
247 Encoding::Binary => (
248 BytesDecoderConfig::new().into(),
249 BytesDeserializerConfig::new().into(),
250 ),
251 }
252 } else {
253 let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
254 let framing = self
255 .framing
256 .clone()
257 .unwrap_or_else(|| decoding.default_stream_framing());
258 (framing, decoding)
259 };
260
261 Ok(DecodingConfig::new(
262 framing,
263 decoding,
264 self.log_namespace.unwrap_or(false).into(),
265 ))
266 }
267}
268
269impl Default for SimpleHttpConfig {
270 fn default() -> Self {
271 Self {
272 address: "0.0.0.0:8080".parse().unwrap(),
273 encoding: None,
274 headers: Vec::new(),
275 query_parameters: Vec::new(),
276 tls: None,
277 auth: None,
278 path: default_path(),
279 path_key: default_path_key(),
280 host_key: default_host_key(),
281 method: default_http_method(),
282 response_code: default_http_response_code(),
283 strict_path: true,
284 framing: None,
285 decoding: Some(default_decoding()),
286 acknowledgements: SourceAcknowledgementsConfig::default(),
287 log_namespace: None,
288 keepalive: KeepaliveConfig::default(),
289 }
290 }
291}
292
// Provides `GenerateConfig` for `SimpleHttpConfig`; judging by the macro name, the generated
// configuration is derived from the `Default` implementation.
impl_generate_config_from_default!(SimpleHttpConfig);
294
/// Default for `method`: `POST`.
const fn default_http_method() -> HttpMethod {
    HttpMethod::Post
}
298
/// Default for `path`: the root path `/`.
fn default_path() -> String {
    String::from("/")
}
302
/// Default for `path_key`: the event path `path`.
fn default_path_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("path"))
}
306
/// Default for `host_key`: no key.
fn default_host_key() -> OptionalValuePath {
    OptionalValuePath::none()
}
310
/// Default for `response_code`: `200 OK`.
const fn default_http_response_code() -> StatusCode {
    StatusCode::OK
}
314
315pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
317 list.sort();
318
319 let mut dedup = false;
320 for (idx, name) in list.iter().enumerate() {
321 if idx < list.len() - 1 && list[idx] == list[idx + 1] {
322 warn!(
323 "`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
324 list_name, name
325 );
326 dedup = true;
327 }
328 }
329
330 if dedup {
331 list.dedup();
332 }
333 list
334}
335
/// Formats the IP part of `addr` as a string, leaving out the port.
fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
    match addr {
        SocketAddr::V4(v4) => v4.ip().to_string(),
        SocketAddr::V6(v6) => v6.ip().to_string(),
    }
}
340
/// A matcher for a configured header or query parameter name.
#[derive(Clone)]
pub enum HttpConfigParamKind {
    /// A compiled glob pattern; built for configured names that contain `*`.
    Glob(glob::Pattern),
    /// A literal name; built for configured names without `*`.
    Exact(String),
}
346
347pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
348 list.iter()
349 .map(|s| match s.contains('*') {
350 true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
351 false => Ok(HttpConfigParamKind::Exact(s.to_string())),
352 })
353 .collect::<crate::Result<Vec<HttpConfigParamKind>>>()
354}
355
356#[async_trait::async_trait]
357#[typetag::serde(name = "http_server")]
358impl SourceConfig for SimpleHttpConfig {
359 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
360 let log_namespace = cx.log_namespace(self.log_namespace);
361 let decoder = self
362 .get_decoding_config()?
363 .build()?
364 .with_log_namespace(log_namespace);
365
366 let source = SimpleHttpSource {
367 headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
368 query_parameters: build_param_matcher(&remove_duplicates(
369 self.query_parameters.clone(),
370 "query_parameters",
371 ))?,
372 path_key: self.path_key.clone(),
373 host_key: self.host_key.clone(),
374 decoder,
375 log_namespace,
376 };
377 source.run(
378 self.address,
379 self.path.as_str(),
380 self.method,
381 self.response_code,
382 self.strict_path,
383 self.tls.as_ref(),
384 self.auth.as_ref(),
385 cx,
386 self.acknowledgements,
387 self.keepalive.clone(),
388 )
389 }
390
391 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
392 let log_namespace = global_log_namespace.merge(self.log_namespace);
395
396 let schema_definition = self.schema_definition(log_namespace);
397
398 vec![SourceOutput::new_maybe_logs(
399 self.decoding
400 .as_ref()
401 .map(|d| d.output_type())
402 .unwrap_or(DataType::Log),
403 schema_definition,
404 )]
405 }
406
407 fn resources(&self) -> Vec<Resource> {
408 vec![Resource::tcp(self.address)]
409 }
410
411 fn can_acknowledge(&self) -> bool {
412 true
413 }
414}
415
/// Runtime state of the `http_server` source, built from [`SimpleHttpConfig`].
#[derive(Clone)]
struct SimpleHttpSource {
    // Matchers selecting which request headers are added to events.
    headers: Vec<HttpConfigParamKind>,
    // Matchers selecting which query parameters are added to events.
    query_parameters: Vec<HttpConfigParamKind>,
    // Legacy key under which the request path is stored.
    path_key: OptionalValuePath,
    // Legacy key under which the client IP is stored; the source IP is only enabled when set.
    host_key: OptionalValuePath,
    // Decoder cloned for every request body.
    decoder: Decoder,
    // Namespace used when inserting source metadata.
    log_namespace: LogNamespace,
}
425
426impl HttpSource for SimpleHttpSource {
427 fn enrich_events(
430 &self,
431 events: &mut [Event],
432 request_path: &str,
433 headers: &HeaderMap,
434 query_parameters: &HashMap<String, String>,
435 source_ip: Option<&SocketAddr>,
436 ) {
437 let now = Utc::now();
438 for event in events.iter_mut() {
439 match event {
440 Event::Log(log) => {
441 self.log_namespace.insert_source_metadata(
443 SimpleHttpConfig::NAME,
444 log,
445 self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
446 path!("path"),
447 request_path.to_owned(),
448 );
449
450 self.log_namespace.insert_standard_vector_source_metadata(
451 log,
452 SimpleHttpConfig::NAME,
453 now,
454 );
455
456 if let Some(addr) = source_ip {
457 self.log_namespace.insert_source_metadata(
458 SimpleHttpConfig::NAME,
459 log,
460 self.host_key.path.as_ref().map(LegacyKey::Overwrite),
461 path!("host"),
462 socket_addr_to_ip_string(addr),
463 );
464 }
465 }
466 _ => {
467 continue;
468 }
469 }
470 }
471
472 add_headers(
473 events,
474 &self.headers,
475 headers,
476 self.log_namespace,
477 SimpleHttpConfig::NAME,
478 );
479
480 add_query_parameters(
481 events,
482 &self.query_parameters,
483 query_parameters,
484 self.log_namespace,
485 SimpleHttpConfig::NAME,
486 );
487 }
488
489 fn build_events(
490 &self,
491 body: Bytes,
492 _header_map: &HeaderMap,
493 _query_parameters: &HashMap<String, String>,
494 _request_path: &str,
495 ) -> Result<Vec<Event>, ErrorMessage> {
496 let mut decoder = self.decoder.clone();
497 let mut events = Vec::new();
498 let mut bytes = BytesMut::new();
499 bytes.extend_from_slice(&body);
500
501 loop {
502 match decoder.decode_eof(&mut bytes) {
503 Ok(Some((next, _))) => {
504 events.extend(next);
505 }
506 Ok(None) => break,
507 Err(error) => {
508 return Err(ErrorMessage::new(
511 StatusCode::BAD_REQUEST,
512 format!("Failed decoding body: {error}"),
513 ));
514 }
515 }
516 }
517
518 Ok(events)
519 }
520
521 fn enable_source_ip(&self) -> bool {
522 self.host_key.path.is_some()
523 }
524}
525
526#[cfg(test)]
527mod tests {
528 use std::{io::Write, net::SocketAddr, str::FromStr};
529
530 use flate2::{
531 Compression,
532 write::{GzEncoder, ZlibEncoder},
533 };
534 use futures::Stream;
535 use headers::{Authorization, authorization::Credentials};
536 use http::{HeaderMap, Method, StatusCode, Uri, header::AUTHORIZATION};
537 use similar_asserts::assert_eq;
538 use vector_lib::{
539 codecs::{
540 BytesDecoderConfig, JsonDeserializerConfig,
541 decoding::{DeserializerConfig, FramingConfig},
542 },
543 config::LogNamespace,
544 event::LogEvent,
545 lookup::{
546 OwnedTargetPath, PathPrefix, event_path, lookup_v2::OptionalValuePath, owned_value_path,
547 },
548 schema::Definition,
549 };
550 use vrl::value::{Kind, ObjectMap, kind::Collection};
551
552 use super::{SimpleHttpConfig, remove_duplicates};
553 use crate::{
554 SourceSender,
555 common::http::server_auth::HttpServerAuthConfig,
556 components::validation::prelude::*,
557 config::{SourceConfig, SourceContext, log_schema},
558 event::{Event, EventStatus, Value},
559 sources::http_server::HttpMethod,
560 test_util::{
561 components::{self, HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
562 next_addr, spawn_collect_n, wait_for_tcp,
563 },
564 };
565
    /// Runs the shared generated-configuration check against `SimpleHttpConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<SimpleHttpConfig>();
    }
570
571 #[allow(clippy::too_many_arguments)]
572 async fn source<'a>(
573 headers: Vec<String>,
574 query_parameters: Vec<String>,
575 path_key: &'a str,
576 host_key: &'a str,
577 path: &'a str,
578 method: &'a str,
579 response_code: StatusCode,
580 auth: Option<HttpServerAuthConfig>,
581 strict_path: bool,
582 status: EventStatus,
583 acknowledgements: bool,
584 framing: Option<FramingConfig>,
585 decoding: Option<DeserializerConfig>,
586 ) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
587 let (sender, recv) = SourceSender::new_test_finalize(status);
588 let address = next_addr();
589 let path = path.to_owned();
590 let host_key = OptionalValuePath::from(owned_value_path!(host_key));
591 let path_key = OptionalValuePath::from(owned_value_path!(path_key));
592 let context = SourceContext::new_test(sender, None);
593 let method = match Method::from_str(method).unwrap() {
594 Method::GET => HttpMethod::Get,
595 Method::POST => HttpMethod::Post,
596 _ => HttpMethod::Post,
597 };
598
599 tokio::spawn(async move {
600 SimpleHttpConfig {
601 address,
602 headers,
603 encoding: None,
604 query_parameters,
605 response_code,
606 tls: None,
607 auth,
608 strict_path,
609 path_key,
610 host_key,
611 path,
612 method,
613 framing,
614 decoding,
615 acknowledgements: acknowledgements.into(),
616 log_namespace: None,
617 keepalive: Default::default(),
618 }
619 .build(context)
620 .await
621 .unwrap()
622 .await
623 .unwrap();
624 });
625 wait_for_tcp(address).await;
626 (recv, address)
627 }
628
629 async fn send(address: SocketAddr, body: &str) -> u16 {
630 reqwest::Client::new()
631 .post(format!("http://{address}/"))
632 .body(body.to_owned())
633 .send()
634 .await
635 .unwrap()
636 .status()
637 .as_u16()
638 }
639
640 async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
641 reqwest::Client::new()
642 .post(format!("http://{address}/"))
643 .headers(headers)
644 .body(body.to_owned())
645 .send()
646 .await
647 .unwrap()
648 .status()
649 .as_u16()
650 }
651
652 async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
653 reqwest::Client::new()
654 .post(format!("http://{address}?{query}"))
655 .body(body.to_owned())
656 .send()
657 .await
658 .unwrap()
659 .status()
660 .as_u16()
661 }
662
663 async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
664 reqwest::Client::new()
665 .post(format!("http://{address}{path}"))
666 .body(body.to_owned())
667 .send()
668 .await
669 .unwrap()
670 .status()
671 .as_u16()
672 }
673
674 async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
675 let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
676 reqwest::Client::new()
677 .request(method, format!("http://{address}{path}"))
678 .body(body.to_owned())
679 .send()
680 .await
681 .unwrap()
682 .status()
683 .as_u16()
684 }
685
686 async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
687 reqwest::Client::new()
688 .post(format!("http://{address}/"))
689 .headers(headers)
690 .body(body)
691 .send()
692 .await
693 .unwrap()
694 .status()
695 .as_u16()
696 }
697
698 async fn spawn_ok_collect_n(
699 send: impl std::future::Future<Output = u16> + Send + 'static,
700 rx: impl Stream<Item = Event> + Unpin,
701 n: usize,
702 ) -> Vec<Event> {
703 spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
704 }
705
706 #[tokio::test]
707 async fn http_multiline_text() {
708 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
709 let body = "test body\ntest body 2";
710
711 let (rx, addr) = source(
712 vec![],
713 vec![],
714 "http_path",
715 "remote_ip",
716 "/",
717 "POST",
718 StatusCode::OK,
719 None,
720 true,
721 EventStatus::Delivered,
722 true,
723 None,
724 None,
725 )
726 .await;
727
728 spawn_ok_collect_n(send(addr, body), rx, 2).await
729 })
730 .await;
731
732 {
733 let event = events.remove(0);
734 let log = event.as_log();
735 assert_eq!(*log.get_message().unwrap(), "test body".into());
736 assert!(log.get_timestamp().is_some());
737 assert_eq!(
738 *log.get_source_type().unwrap(),
739 SimpleHttpConfig::NAME.into()
740 );
741 assert_eq!(log["http_path"], "/".into());
742 assert_event_metadata(log).await;
743 }
744 {
745 let event = events.remove(0);
746 let log = event.as_log();
747 assert_eq!(*log.get_message().unwrap(), "test body 2".into());
748 assert_event_metadata(log).await;
749 }
750 }
751
752 #[tokio::test]
753 async fn http_multiline_text2() {
754 let body = "test body\ntest body 2\n";
756
757 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
758 let (rx, addr) = source(
759 vec![],
760 vec![],
761 "http_path",
762 "remote_ip",
763 "/",
764 "POST",
765 StatusCode::OK,
766 None,
767 true,
768 EventStatus::Delivered,
769 true,
770 None,
771 None,
772 )
773 .await;
774
775 spawn_ok_collect_n(send(addr, body), rx, 2).await
776 })
777 .await;
778
779 {
780 let event = events.remove(0);
781 let log = event.as_log();
782 assert_eq!(*log.get_message().unwrap(), "test body".into());
783 assert_event_metadata(log).await;
784 }
785 {
786 let event = events.remove(0);
787 let log = event.as_log();
788 assert_eq!(*log.get_message().unwrap(), "test body 2".into());
789 assert_event_metadata(log).await;
790 }
791 }
792
793 #[tokio::test]
794 async fn http_bytes_codec_preserves_newlines() {
795 let body = "foo\nbar";
796
797 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
798 let (rx, addr) = source(
799 vec![],
800 vec![],
801 "http_path",
802 "remote_ip",
803 "/",
804 "POST",
805 StatusCode::OK,
806 None,
807 true,
808 EventStatus::Delivered,
809 true,
810 Some(BytesDecoderConfig::new().into()),
811 None,
812 )
813 .await;
814
815 spawn_ok_collect_n(send(addr, body), rx, 1).await
816 })
817 .await;
818
819 assert_eq!(events.len(), 1);
820
821 {
822 let event = events.remove(0);
823 let log = event.as_log();
824 assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
825 assert_event_metadata(log).await;
826 }
827 }
828
829 #[tokio::test]
830 async fn http_json_parsing() {
831 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
832 let (rx, addr) = source(
833 vec![],
834 vec![],
835 "http_path",
836 "remote_ip",
837 "/",
838 "POST",
839 StatusCode::OK,
840 None,
841 true,
842 EventStatus::Delivered,
843 true,
844 None,
845 Some(JsonDeserializerConfig::default().into()),
846 )
847 .await;
848
849 spawn_collect_n(
850 async move {
851 assert_eq!(400, send(addr, "{").await); assert_eq!(400, send(addr, r#"{"key"}"#).await); assert_eq!(200, send(addr, "{}").await); assert_eq!(200, send(addr, "[{},{},{}]").await);
856 },
857 rx,
858 2,
859 )
860 .await
861 })
862 .await;
863
864 assert!(events.remove(1).as_log().get_timestamp().is_some());
865 assert!(events.remove(0).as_log().get_timestamp().is_some());
866 }
867
868 #[tokio::test]
869 async fn http_json_values() {
870 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
871 let (rx, addr) = source(
872 vec![],
873 vec![],
874 "http_path",
875 "remote_ip",
876 "/",
877 "POST",
878 StatusCode::OK,
879 None,
880 true,
881 EventStatus::Delivered,
882 true,
883 None,
884 Some(JsonDeserializerConfig::default().into()),
885 )
886 .await;
887
888 spawn_collect_n(
889 async move {
890 assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
891 assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
892 },
893 rx,
894 2,
895 )
896 .await
897 })
898 .await;
899
900 {
901 let event = events.remove(0);
902 let log = event.as_log();
903 assert_eq!(log["key"], "value".into());
904 assert_event_metadata(log).await;
905 }
906 {
907 let event = events.remove(0);
908 let log = event.as_log();
909 assert_eq!(log["key2"], "value2".into());
910 assert_event_metadata(log).await;
911 }
912 }
913
914 #[tokio::test]
915 async fn http_json_dotted_keys() {
916 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
917 let (rx, addr) = source(
918 vec![],
919 vec![],
920 "http_path",
921 "remote_ip",
922 "/",
923 "POST",
924 StatusCode::OK,
925 None,
926 true,
927 EventStatus::Delivered,
928 true,
929 None,
930 Some(JsonDeserializerConfig::default().into()),
931 )
932 .await;
933
934 spawn_collect_n(
935 async move {
936 assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
937 assert_eq!(
938 200,
939 send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
940 );
941 },
942 rx,
943 2,
944 )
945 .await
946 })
947 .await;
948
949 {
950 let event = events.remove(0);
951 let log = event.as_log();
952 assert_eq!(
953 log.get(event_path!("dotted.key")).unwrap(),
954 &Value::from("value")
955 );
956 }
957 {
958 let event = events.remove(0);
959 let log = event.as_log();
960 let mut map = ObjectMap::new();
961 map.insert("dotted.key2".into(), Value::from("value2"));
962 assert_eq!(log["nested"], map.into());
963 }
964 }
965
966 #[tokio::test]
967 async fn http_ndjson() {
968 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
969 let (rx, addr) = source(
970 vec![],
971 vec![],
972 "http_path",
973 "remote_ip",
974 "/",
975 "POST",
976 StatusCode::OK,
977 None,
978 true,
979 EventStatus::Delivered,
980 true,
981 None,
982 Some(JsonDeserializerConfig::default().into()),
983 )
984 .await;
985
986 spawn_collect_n(
987 async move {
988 assert_eq!(
989 200,
990 send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
991 );
992
993 assert_eq!(
994 200,
995 send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
996 );
997 },
998 rx,
999 4,
1000 )
1001 .await
1002 })
1003 .await;
1004
1005 {
1006 let event = events.remove(0);
1007 let log = event.as_log();
1008 assert_eq!(log["key1"], "value1".into());
1009 assert_event_metadata(log).await;
1010 }
1011 {
1012 let event = events.remove(0);
1013 let log = event.as_log();
1014 assert_eq!(log["key2"], "value2".into());
1015 assert_event_metadata(log).await;
1016 }
1017 {
1018 let event = events.remove(0);
1019 let log = event.as_log();
1020 assert_eq!(log["key1"], "value1".into());
1021 assert_event_metadata(log).await;
1022 }
1023 {
1024 let event = events.remove(0);
1025 let log = event.as_log();
1026 assert_eq!(log["key2"], "value2".into());
1027 assert_event_metadata(log).await;
1028 }
1029 }
1030
1031 async fn assert_event_metadata(log: &LogEvent) {
1032 assert!(log.get_timestamp().is_some());
1033
1034 let source_type_key_value = log
1035 .get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
1036 .unwrap()
1037 .as_str()
1038 .unwrap();
1039 assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
1040 assert_eq!(log["http_path"], "/".into());
1041 }
1042
1043 #[tokio::test]
1044 async fn http_headers() {
1045 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1046 let mut headers = HeaderMap::new();
1047 headers.insert("User-Agent", "test_client".parse().unwrap());
1048 headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
1049 headers.insert("X-Test-Header", "true".parse().unwrap());
1050
1051 let (rx, addr) = source(
1052 vec![
1053 "User-Agent".to_string(),
1054 "Upgrade-Insecure-Requests".to_string(),
1055 "X-*".to_string(),
1056 "AbsentHeader".to_string(),
1057 ],
1058 vec![],
1059 "http_path",
1060 "remote_ip",
1061 "/",
1062 "POST",
1063 StatusCode::OK,
1064 None,
1065 true,
1066 EventStatus::Delivered,
1067 true,
1068 None,
1069 Some(JsonDeserializerConfig::default().into()),
1070 )
1071 .await;
1072
1073 spawn_ok_collect_n(
1074 send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1075 rx,
1076 1,
1077 )
1078 .await
1079 })
1080 .await;
1081
1082 {
1083 let event = events.remove(0);
1084 let log = event.as_log();
1085 assert_eq!(log["key1"], "value1".into());
1086 assert_eq!(log["\"User-Agent\""], "test_client".into());
1087 assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
1088 assert_eq!(log["\"x-test-header\""], "true".into());
1089 assert_eq!(log["AbsentHeader"], Value::Null);
1090 assert_event_metadata(log).await;
1091 }
1092 }
1093
1094 #[tokio::test]
1095 async fn http_headers_wildcard() {
1096 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1097 let mut headers = HeaderMap::new();
1098 headers.insert("User-Agent", "test_client".parse().unwrap());
1099 headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
1100 headers.insert("key1", "value_from_header".parse().unwrap());
1102
1103 let (rx, addr) = source(
1104 vec!["*".to_string()],
1105 vec![],
1106 "http_path",
1107 "remote_ip",
1108 "/",
1109 "POST",
1110 StatusCode::OK,
1111 None,
1112 true,
1113 EventStatus::Delivered,
1114 true,
1115 None,
1116 Some(JsonDeserializerConfig::default().into()),
1117 )
1118 .await;
1119
1120 spawn_ok_collect_n(
1121 send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
1122 rx,
1123 1,
1124 )
1125 .await
1126 })
1127 .await;
1128
1129 {
1130 let event = events.remove(0);
1131 let log = event.as_log();
1132 assert_eq!(log["key1"], "value1".into());
1133 assert_eq!(log["\"user-agent\""], "test_client".into());
1134 assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
1135 assert_event_metadata(log).await;
1136 }
1137 }
1138
1139 #[tokio::test]
1140 async fn http_query() {
1141 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1142 let (rx, addr) = source(
1143 vec![],
1144 vec![
1145 "source".to_string(),
1146 "region".to_string(),
1147 "absent".to_string(),
1148 ],
1149 "http_path",
1150 "remote_ip",
1151 "/",
1152 "POST",
1153 StatusCode::OK,
1154 None,
1155 true,
1156 EventStatus::Delivered,
1157 true,
1158 None,
1159 Some(JsonDeserializerConfig::default().into()),
1160 )
1161 .await;
1162
1163 spawn_ok_collect_n(
1164 send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging®ion=gb"),
1165 rx,
1166 1,
1167 )
1168 .await
1169 })
1170 .await;
1171
1172 {
1173 let event = events.remove(0);
1174 let log = event.as_log();
1175 assert_eq!(log["key1"], "value1".into());
1176 assert_eq!(log["source"], "staging".into());
1177 assert_eq!(log["region"], "gb".into());
1178 assert_eq!(log["absent"], Value::Null);
1179 assert_event_metadata(log).await;
1180 }
1181 }
1182
1183 #[tokio::test]
1184 async fn http_query_wildcard() {
1185 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1186 let (rx, addr) = source(
1187 vec![],
1188 vec!["*".to_string()],
1189 "http_path",
1190 "remote_ip",
1191 "/",
1192 "POST",
1193 StatusCode::OK,
1194 None,
1195 true,
1196 EventStatus::Delivered,
1197 true,
1198 None,
1199 Some(JsonDeserializerConfig::default().into()),
1200 )
1201 .await;
1202
1203 spawn_ok_collect_n(
1204 send_with_query(
1205 addr,
1206 "{\"key1\":\"value1\",\"key2\":\"value2\"}",
1207 "source=staging®ion=gb&key1=value_from_query",
1208 ),
1209 rx,
1210 1,
1211 )
1212 .await
1213 })
1214 .await;
1215
1216 {
1217 let event = events.remove(0);
1218 let log = event.as_log();
1219 assert_eq!(log["key1"], "value_from_query".into());
1220 assert_eq!(log["key2"], "value2".into());
1221 assert_eq!(log["source"], "staging".into());
1222 assert_eq!(log["region"], "gb".into());
1223 assert_event_metadata(log).await;
1224 }
1225 }
1226
1227 #[tokio::test]
1228 async fn http_gzip_deflate() {
1229 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1230 let body = "test body";
1231
1232 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
1233 encoder.write_all(body.as_bytes()).unwrap();
1234 let body = encoder.finish().unwrap();
1235
1236 let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
1237 encoder.write_all(body.as_slice()).unwrap();
1238 let body = encoder.finish().unwrap();
1239
1240 let mut headers = HeaderMap::new();
1241 headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());
1242
1243 let (rx, addr) = source(
1244 vec![],
1245 vec![],
1246 "http_path",
1247 "remote_ip",
1248 "/",
1249 "POST",
1250 StatusCode::OK,
1251 None,
1252 true,
1253 EventStatus::Delivered,
1254 true,
1255 None,
1256 None,
1257 )
1258 .await;
1259
1260 spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
1261 })
1262 .await;
1263
1264 {
1265 let event = events.remove(0);
1266 let log = event.as_log();
1267 assert_eq!(*log.get_message().unwrap(), "test body".into());
1268 assert_event_metadata(log).await;
1269 }
1270 }
1271
1272 #[tokio::test]
1273 async fn http_path() {
1274 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1275 let (rx, addr) = source(
1276 vec![],
1277 vec![],
1278 "vector_http_path",
1279 "vector_remote_ip",
1280 "/event/path",
1281 "POST",
1282 StatusCode::OK,
1283 None,
1284 true,
1285 EventStatus::Delivered,
1286 true,
1287 None,
1288 Some(JsonDeserializerConfig::default().into()),
1289 )
1290 .await;
1291
1292 spawn_ok_collect_n(
1293 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
1294 rx,
1295 1,
1296 )
1297 .await
1298 })
1299 .await;
1300
1301 {
1302 let event = events.remove(0);
1303 let log = event.as_log();
1304 assert_eq!(log["key1"], "value1".into());
1305 assert_eq!(log["vector_http_path"], "/event/path".into());
1306 assert!(log.get_timestamp().is_some());
1307 assert_eq!(
1308 *log.get_source_type().unwrap(),
1309 SimpleHttpConfig::NAME.into()
1310 );
1311 }
1312 }
1313
1314 #[tokio::test]
1315 async fn http_path_no_restriction() {
1316 let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1317 let (rx, addr) = source(
1318 vec![],
1319 vec![],
1320 "vector_http_path",
1321 "vector_remote_ip",
1322 "/event",
1323 "POST",
1324 StatusCode::OK,
1325 None,
1326 false,
1327 EventStatus::Delivered,
1328 true,
1329 None,
1330 Some(JsonDeserializerConfig::default().into()),
1331 )
1332 .await;
1333
1334 spawn_collect_n(
1335 async move {
1336 assert_eq!(
1337 200,
1338 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
1339 );
1340 assert_eq!(
1341 200,
1342 send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
1343 );
1344 },
1345 rx,
1346 2,
1347 )
1348 .await
1349 })
1350 .await;
1351
1352 {
1353 let event = events.remove(0);
1354 let log = event.as_log();
1355 assert_eq!(log["key1"], "value1".into());
1356 assert_eq!(log["vector_http_path"], "/event/path1".into());
1357 assert!(log.get_timestamp().is_some());
1358 assert_eq!(
1359 *log.get_source_type().unwrap(),
1360 SimpleHttpConfig::NAME.into()
1361 );
1362 }
1363 {
1364 let event = events.remove(0);
1365 let log = event.as_log();
1366 assert_eq!(log["key2"], "value2".into());
1367 assert_eq!(log["vector_http_path"], "/event/path2".into());
1368 assert!(log.get_timestamp().is_some());
1369 assert_eq!(
1370 *log.get_source_type().unwrap(),
1371 SimpleHttpConfig::NAME.into()
1372 );
1373 }
1374 }
1375
1376 #[tokio::test]
1377 async fn http_wrong_path() {
1378 components::init_test();
1379 let (_rx, addr) = source(
1380 vec![],
1381 vec![],
1382 "vector_http_path",
1383 "vector_remote_ip",
1384 "/",
1385 "POST",
1386 StatusCode::OK,
1387 None,
1388 true,
1389 EventStatus::Delivered,
1390 true,
1391 None,
1392 Some(JsonDeserializerConfig::default().into()),
1393 )
1394 .await;
1395
1396 assert_eq!(
1397 404,
1398 send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
1399 );
1400 }
1401
1402 #[tokio::test]
1403 async fn http_status_code() {
1404 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
1405 let (rx, addr) = source(
1406 vec![],
1407 vec![],
1408 "http_path",
1409 "remote_ip",
1410 "/",
1411 "POST",
1412 StatusCode::ACCEPTED,
1413 None,
1414 true,
1415 EventStatus::Delivered,
1416 true,
1417 None,
1418 None,
1419 )
1420 .await;
1421
1422 spawn_collect_n(
1423 async move {
1424 assert_eq!(
1425 StatusCode::ACCEPTED,
1426 send(addr, "{\"key1\":\"value1\"}").await
1427 );
1428 },
1429 rx,
1430 1,
1431 )
1432 .await;
1433 })
1434 .await;
1435 }
1436
1437 #[tokio::test]
1438 async fn http_delivery_failure() {
1439 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1440 let (rx, addr) = source(
1441 vec![],
1442 vec![],
1443 "http_path",
1444 "remote_ip",
1445 "/",
1446 "POST",
1447 StatusCode::OK,
1448 None,
1449 true,
1450 EventStatus::Rejected,
1451 true,
1452 None,
1453 None,
1454 )
1455 .await;
1456
1457 spawn_collect_n(
1458 async move {
1459 assert_eq!(400, send(addr, "test body\n").await);
1460 },
1461 rx,
1462 1,
1463 )
1464 .await;
1465 })
1466 .await;
1467 }
1468
1469 #[tokio::test]
1470 async fn ignores_disabled_acknowledgements() {
1471 let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
1472 let (rx, addr) = source(
1473 vec![],
1474 vec![],
1475 "http_path",
1476 "remote_ip",
1477 "/",
1478 "POST",
1479 StatusCode::OK,
1480 None,
1481 true,
1482 EventStatus::Rejected,
1483 false,
1484 None,
1485 None,
1486 )
1487 .await;
1488
1489 spawn_collect_n(
1490 async move {
1491 assert_eq!(200, send(addr, "test body\n").await);
1492 },
1493 rx,
1494 1,
1495 )
1496 .await
1497 })
1498 .await;
1499
1500 assert_eq!(events.len(), 1);
1501 }
1502
1503 #[tokio::test]
1504 async fn http_get_method() {
1505 components::init_test();
1506 let (_rx, addr) = source(
1507 vec![],
1508 vec![],
1509 "http_path",
1510 "remote_ip",
1511 "/",
1512 "GET",
1513 StatusCode::OK,
1514 None,
1515 true,
1516 EventStatus::Delivered,
1517 true,
1518 None,
1519 None,
1520 )
1521 .await;
1522
1523 assert_eq!(200, send_request(addr, "GET", "", "/").await);
1524 }
1525
1526 #[tokio::test]
1527 async fn returns_401_when_required_auth_is_missing() {
1528 components::init_test();
1529 let (_rx, addr) = source(
1530 vec![],
1531 vec![],
1532 "http_path",
1533 "remote_ip",
1534 "/",
1535 "GET",
1536 StatusCode::OK,
1537 Some(HttpServerAuthConfig::Basic {
1538 username: "test".to_string(),
1539 password: "test".to_string().into(),
1540 }),
1541 true,
1542 EventStatus::Delivered,
1543 true,
1544 None,
1545 None,
1546 )
1547 .await;
1548
1549 assert_eq!(401, send_request(addr, "GET", "", "/").await);
1550 }
1551
1552 #[tokio::test]
1553 async fn returns_401_when_required_auth_is_wrong() {
1554 components::init_test();
1555 let (_rx, addr) = source(
1556 vec![],
1557 vec![],
1558 "http_path",
1559 "remote_ip",
1560 "/",
1561 "POST",
1562 StatusCode::OK,
1563 Some(HttpServerAuthConfig::Basic {
1564 username: "test".to_string(),
1565 password: "test".to_string().into(),
1566 }),
1567 true,
1568 EventStatus::Delivered,
1569 true,
1570 None,
1571 None,
1572 )
1573 .await;
1574
1575 let mut headers = HeaderMap::new();
1576 headers.insert(
1577 AUTHORIZATION,
1578 Authorization::basic("wrong", "test").0.encode(),
1579 );
1580 assert_eq!(401, send_with_headers(addr, "", headers).await);
1581 }
1582
1583 #[tokio::test]
1584 async fn http_get_with_correct_auth() {
1585 components::init_test();
1586 let (_rx, addr) = source(
1587 vec![],
1588 vec![],
1589 "http_path",
1590 "remote_ip",
1591 "/",
1592 "POST",
1593 StatusCode::OK,
1594 Some(HttpServerAuthConfig::Basic {
1595 username: "test".to_string(),
1596 password: "test".to_string().into(),
1597 }),
1598 true,
1599 EventStatus::Delivered,
1600 true,
1601 None,
1602 None,
1603 )
1604 .await;
1605
1606 let mut headers = HeaderMap::new();
1607 headers.insert(
1608 AUTHORIZATION,
1609 Authorization::basic("test", "test").0.encode(),
1610 );
1611 assert_eq!(200, send_with_headers(addr, "", headers).await);
1612 }
1613
1614 #[test]
1615 fn output_schema_definition_vector_namespace() {
1616 let config = SimpleHttpConfig {
1617 log_namespace: Some(true),
1618 ..Default::default()
1619 };
1620
1621 let definitions = config
1622 .outputs(LogNamespace::Vector)
1623 .remove(0)
1624 .schema_definition(true);
1625
1626 let expected_definition =
1627 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1628 .with_meaning(OwnedTargetPath::event_root(), "message")
1629 .with_metadata_field(
1630 &owned_value_path!("vector", "source_type"),
1631 Kind::bytes(),
1632 None,
1633 )
1634 .with_metadata_field(
1635 &owned_value_path!(SimpleHttpConfig::NAME, "path"),
1636 Kind::bytes(),
1637 None,
1638 )
1639 .with_metadata_field(
1640 &owned_value_path!(SimpleHttpConfig::NAME, "headers"),
1641 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1642 None,
1643 )
1644 .with_metadata_field(
1645 &owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
1646 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1647 None,
1648 )
1649 .with_metadata_field(
1650 &owned_value_path!(SimpleHttpConfig::NAME, "host"),
1651 Kind::bytes().or_undefined(),
1652 None,
1653 )
1654 .with_metadata_field(
1655 &owned_value_path!("vector", "ingest_timestamp"),
1656 Kind::timestamp(),
1657 None,
1658 );
1659
1660 assert_eq!(definitions, Some(expected_definition))
1661 }
1662
1663 #[test]
1664 fn output_schema_definition_legacy_namespace() {
1665 let config = SimpleHttpConfig::default();
1666
1667 let definitions = config
1668 .outputs(LogNamespace::Legacy)
1669 .remove(0)
1670 .schema_definition(true);
1671
1672 let expected_definition = Definition::new_with_default_metadata(
1673 Kind::object(Collection::empty()),
1674 [LogNamespace::Legacy],
1675 )
1676 .with_event_field(
1677 &owned_value_path!("message"),
1678 Kind::bytes(),
1679 Some("message"),
1680 )
1681 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1682 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1683 .with_event_field(&owned_value_path!("path"), Kind::bytes(), None)
1684 .with_event_field(
1685 &owned_value_path!("host"),
1686 Kind::bytes().or_undefined(),
1687 None,
1688 )
1689 .unknown_fields(Kind::bytes());
1690
1691 assert_eq!(definitions, Some(expected_definition))
1692 }
1693
1694 #[test]
1695 fn validate_remove_duplicates() {
1696 let mut list = vec![
1697 "a".to_owned(),
1698 "b".to_owned(),
1699 "c".to_owned(),
1700 "d".to_owned(),
1701 ];
1702
1703 {
1705 let list_dedup = remove_duplicates(list.clone(), "foo");
1706
1707 assert_eq!(list, list_dedup);
1708 }
1709
1710 list.push("b".to_owned());
1711
1712 {
1714 let list_dedup = remove_duplicates(list.clone(), "foo");
1715 assert_eq!(
1716 vec![
1717 "a".to_owned(),
1718 "b".to_owned(),
1719 "c".to_owned(),
1720 "d".to_owned()
1721 ],
1722 list_dedup
1723 );
1724 }
1725 }
1726
1727 impl ValidatableComponent for SimpleHttpConfig {
1728 fn validation_configuration() -> ValidationConfiguration {
1729 let config = Self {
1730 decoding: Some(DeserializerConfig::Json(Default::default())),
1731 ..Default::default()
1732 };
1733
1734 let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
1735
1736 let listen_addr_http = format!("http://{}/", config.address);
1737 let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
1738
1739 let external_resource = ExternalResource::new(
1740 ResourceDirection::Push,
1741 HttpResourceConfig::from_parts(uri, Some(config.method.into())),
1742 config
1743 .get_decoding_config()
1744 .expect("should not fail to get decoding config"),
1745 );
1746
1747 ValidationConfiguration::from_source(
1748 Self::NAME,
1749 log_namespace,
1750 vec![ComponentTestCaseConfig::from_source(
1751 config,
1752 None,
1753 Some(external_resource),
1754 )],
1755 )
1756 }
1757 }
1758
1759 register_validatable_component!(SimpleHttpConfig);
1760}