vector/components/validation/resources/
http.rs1use std::{
2 collections::{HashMap, VecDeque},
3 future::Future,
4 net::{IpAddr, SocketAddr},
5 str::FromStr,
6 sync::Arc,
7};
8
9use axum::{
10 Router,
11 response::IntoResponse,
12 routing::{MethodFilter, MethodRouter},
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use http_body::{Body as _, Collected};
17use hyper::{Body, Client, Server};
18use tokio::{
19 select,
20 sync::{Mutex, Notify, mpsc, oneshot},
21};
22use tokio_util::codec::Decoder;
23use vector_lib::{
24 EstimatedJsonEncodedSizeOf,
25 codecs::{
26 CharacterDelimitedEncoder,
27 encoding::{Framer, Serializer::Json},
28 },
29 config::LogNamespace,
30 event::Event,
31};
32
33use super::{ResourceCodec, ResourceDirection, TestEvent, encode_test_event};
34use crate::components::validation::{
35 RunnerMetrics,
36 sync::{Configuring, TaskCoordinator},
37};
38
39#[derive(Clone)]
41pub struct HttpResourceConfig {
42 uri: Uri,
43 method: Option<Method>,
44 headers: Option<HashMap<String, String>>,
45}
46
47impl HttpResourceConfig {
48 pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
49 Self {
50 uri,
51 method,
52 headers: None,
53 }
54 }
55
56 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
57 self.headers = Some(headers);
58 self
59 }
60
61 pub fn spawn_as_input(
62 self,
63 direction: ResourceDirection,
64 codec: ResourceCodec,
65 input_rx: mpsc::Receiver<TestEvent>,
66 task_coordinator: &TaskCoordinator<Configuring>,
67 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
68 ) {
69 match direction {
70 ResourceDirection::Pull => {
72 spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
73 }
74 ResourceDirection::Push => {
76 spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
77 }
78 }
79 }
80
81 pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
82 match ctx.direction {
83 ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
85 ResourceDirection::Push => ctx.spawn_output_http_server(self),
87 }
88 }
89}
90
91#[allow(clippy::missing_const_for_fn)]
93fn spawn_input_http_server(
94 config: HttpResourceConfig,
95 codec: ResourceCodec,
96 mut input_rx: mpsc::Receiver<TestEvent>,
97 task_coordinator: &TaskCoordinator<Configuring>,
98 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
99) {
100 let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
105
106 let encoder = codec.into_encoder();
108 let sendable_events = Arc::clone(&outstanding_events);
109
110 let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
111 task_coordinator,
112 &config,
113 runner_metrics,
114 move |_request, _runner_metrics| {
115 let sendable_events = Arc::clone(&sendable_events);
116 let mut encoder = encoder.clone();
117
118 async move {
119 let mut sendable_events = sendable_events.lock().await;
120 match sendable_events.pop_front() {
121 Some(event) => {
122 let mut buffer = BytesMut::new();
123 encode_test_event(&mut encoder, &mut buffer, event);
124
125 buffer.into_response()
126 }
127 _ => {
128 StatusCode::OK.into_response()
132 }
133 }
134 }
135 },
136 );
137
138 let resource_started = task_coordinator.track_started();
141 let resource_completed = task_coordinator.track_completed();
142 let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
143
144 tokio::spawn(async move {
145 resource_started.mark_as_done();
146 info!("HTTP server external input resource started.");
147
148 let mut input_finished = false;
149
150 loop {
151 select! {
152 maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
158 Some(event) => {
159 let mut outstanding_events = outstanding_events.lock().await;
160 outstanding_events.push_back(event);
161 },
162 None => {
163 info!("HTTP server external input resource input is finished.");
164 input_finished = true;
165 },
166 },
167
168 _ = resource_notifier.notified() => {
169 if input_finished {
176 let outstanding_events = outstanding_events.lock().await;
177 if outstanding_events.is_empty() {
178 break
179 }
180 }
181 },
182 }
183 }
184 info!("HTTP server external input resource signalling ready for shutdown.");
187
188 resource_shutdown_rx.wait().await;
190
191 _ = http_server_shutdown_tx.send(());
193
194 info!("HTTP server external input resource marking as done.");
195 resource_completed.mark_as_done();
196
197 info!("HTTP server external input resource completed.");
198 });
199}
200
201fn spawn_input_http_client(
203 config: HttpResourceConfig,
204 codec: ResourceCodec,
205 mut input_rx: mpsc::Receiver<TestEvent>,
206 task_coordinator: &TaskCoordinator<Configuring>,
207 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
208) {
209 let started = task_coordinator.track_started();
212 let completed = task_coordinator.track_completed();
213 let mut encoder = codec.into_encoder();
214 let runner_metrics = Arc::clone(runner_metrics);
215
216 tokio::spawn(async move {
217 started.mark_as_done();
220 info!("HTTP client external input resource started.");
221
222 let client = Client::builder().build_http::<Body>();
223 let request_uri = config.uri;
224 let request_method = config.method.unwrap_or(Method::POST);
225 let headers = config.headers.unwrap_or_default();
226
227 while let Some(event) = input_rx.recv().await {
228 debug!("Got event to send from runner.");
229
230 let mut buffer = BytesMut::new();
231
232 let is_json = matches!(encoder.serializer(), Json(_))
233 && matches!(
234 encoder.framer(),
235 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
236 );
237
238 if is_json {
239 buffer.put_u8(b'[');
240 }
241
242 encode_test_event(&mut encoder, &mut buffer, event);
243
244 if is_json {
245 if !buffer.is_empty() {
246 buffer.truncate(buffer.len() - 1);
248 }
249 buffer.put_u8(b']');
250
251 let mut runner_metrics = runner_metrics.lock().await;
254 runner_metrics.sent_bytes_total += 1;
255 }
256
257 let mut request_builder = Request::builder()
258 .uri(request_uri.clone())
259 .method(request_method.clone());
260
261 for (key, value) in &headers {
262 request_builder = request_builder.header(key, value);
263 }
264
265 let request = request_builder
266 .body(buffer.freeze().into())
267 .expect("should not fail to build request");
268
269 match client.request(request).await {
270 Ok(_response) => {
271 debug!("Got response from server.");
273 }
274 Err(e) => {
275 error!("Failed to send request: {}", e);
277 }
278 }
279 }
280
281 completed.mark_as_done();
283
284 info!("HTTP client external input resource completed.");
285 });
286}
287
288pub struct HttpResourceOutputContext<'a> {
290 pub direction: ResourceDirection,
291 pub codec: ResourceCodec,
292 pub output_tx: mpsc::Sender<Vec<Event>>,
293 pub task_coordinator: &'a TaskCoordinator<Configuring>,
294 pub input_events: Vec<TestEvent>,
295 pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
296 pub log_namespace: LogNamespace,
297}
298
299impl HttpResourceOutputContext<'_> {
300 #[allow(clippy::missing_const_for_fn)]
302 fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
303 let decoder = self.codec.into_decoder(self.log_namespace)?;
308
309 let should_reject = self
316 .input_events
317 .iter()
318 .filter(|te| te.should_reject())
319 .count()
320 > 0;
321
322 let output_tx = self.output_tx.clone();
323 let (_, http_server_shutdown_tx) = spawn_http_server(
324 self.task_coordinator,
325 &config,
326 self.runner_metrics,
327 move |request, output_runner_metrics| {
328 let output_tx = output_tx.clone();
329 let mut decoder = decoder.clone();
330
331 async move {
332 let content_encoding = request
334 .headers()
335 .get("content-encoding")
336 .and_then(|v| v.to_str().ok())
337 .map(|s| s.to_string());
338
339 match request.into_body().collect().await.map(Collected::to_bytes) {
340 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
341 Ok(body) => {
342 let byte_size = body.len();
343
344 if let Some(encoding) = &content_encoding
346 && encoding != "identity"
347 {
348 error!(
349 "Received compressed data (Content-Encoding: {encoding}). \
350 Validation tests assert on bytes sizes and compressed size might not be deterministic."
351 );
352 return StatusCode::BAD_REQUEST.into_response();
353 }
354
355 let mut body = BytesMut::from(&body[..]);
356 loop {
357 match decoder.decode_eof(&mut body) {
358 Ok(Some((events, decoded_byte_size))) => {
361 if should_reject {
362 info!(
363 "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.",
364 );
365 } else {
366 let mut output_runner_metrics =
367 output_runner_metrics.lock().await;
368 info!(
369 "HTTP server external output resource decoded {decoded_byte_size:?} bytes."
370 );
371
372 output_runner_metrics.received_bytes_total +=
375 byte_size as u64;
376
377 output_runner_metrics.received_events_total +=
378 events.len() as u64;
379
380 events.iter().for_each(|event| {
381 output_runner_metrics.received_event_bytes_total +=
382 event.estimated_json_encoded_size_of().get()
383 as u64;
384 });
385
386 output_tx
387 .send(events.to_vec())
388 .await
389 .expect("should not fail to send output event");
390 }
391 }
392 Ok(None) => {
393 if should_reject {
394 return StatusCode::BAD_REQUEST.into_response();
397 } else {
398 return StatusCode::OK.into_response();
399 }
400 }
401 Err(_) => {
402 error!(
403 "HTTP server failed to decode body: {:?}",
404 String::from_utf8_lossy(&body)
405 );
406 return StatusCode::INTERNAL_SERVER_ERROR.into_response();
407 }
408 }
409 }
410 }
411 }
412 }
413 },
414 );
415
416 let resource_started = self.task_coordinator.track_started();
419 let resource_completed = self.task_coordinator.track_completed();
420 let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
421
422 tokio::spawn(async move {
423 resource_started.mark_as_done();
424 info!("HTTP server external output resource started.");
425
426 resource_shutdown_rx.wait().await;
428
429 let _ = http_server_shutdown_tx.send(());
431
432 resource_completed.mark_as_done();
434
435 info!("HTTP server external output resource completed.");
436 });
437
438 Ok(())
439 }
440
441 #[allow(clippy::missing_const_for_fn)]
443 fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
444 todo!()
448 }
449}
450
451fn spawn_http_server<H, F, R>(
452 task_coordinator: &TaskCoordinator<Configuring>,
453 config: &HttpResourceConfig,
454 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
455 handler: H,
456) -> (Arc<Notify>, oneshot::Sender<()>)
457where
458 H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
459 F: Future<Output = R> + Send,
460 R: IntoResponse,
461{
462 let http_server_started = task_coordinator.track_started();
463 let http_server_completed = task_coordinator.track_completed();
464
465 let listen_addr = socketaddr_from_uri(&config.uri);
466 let request_path = config
467 .uri
468 .path_and_query()
469 .map(|pq| pq.as_str().to_string())
470 .unwrap_or_else(|| "/".to_string());
471 let request_method = config.method.clone().unwrap_or(Method::POST);
472
473 let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
480 let resource_notifier = Arc::new(Notify::new());
481 let server_notifier = Arc::clone(&resource_notifier);
482
483 let output_runner_metrics = Arc::clone(runner_metrics);
484
485 tokio::spawn(async move {
486 let server_builder =
489 Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
490
491 let method_filter = MethodFilter::try_from(request_method)
499 .expect("should not fail to convert method to method filter");
500 let method_router = MethodRouter::new()
501 .fallback(|req: Request<Body>| async move {
502 error!(
503 path = req.uri().path(),
504 method = req.method().as_str(),
505 "Component sent request to a different path/method than expected."
506 );
507
508 StatusCode::METHOD_NOT_ALLOWED
509 })
510 .on(method_filter, move |request: Request<Body>| {
511 let request_handler = handler(request, output_runner_metrics);
512 let notifier = Arc::clone(&server_notifier);
513
514 async move {
515 let response = request_handler.await;
516 notifier.notify_one();
517 response
518 }
519 });
520
521 let router = Router::new().route(&request_path, method_router).fallback(
522 |req: Request<Body>| async move {
523 error!(?req, "Component sent request the server could not route.");
524 StatusCode::NOT_FOUND
525 },
526 );
527
528 http_server_started.mark_as_done();
530
531 let server = server_builder
532 .serve(router.into_make_service())
533 .with_graceful_shutdown(async {
534 http_server_shutdown_rx.await.ok();
535 });
536
537 if let Err(e) = server.await {
538 error!(error = ?e, "HTTP server encountered an error.");
539 }
540
541 http_server_completed.mark_as_done();
542 });
543
544 (resource_notifier, http_server_shutdown_tx)
545}
546
547fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
548 let uri_port = uri.port_u16().unwrap_or(80);
549 let uri_host = uri
550 .host()
551 .ok_or_else(|| "host must be present in URI".to_string())
552 .and_then(|host| {
553 IpAddr::from_str(host)
554 .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
555 })
556 .expect("HTTP URI not valid");
557
558 SocketAddr::from((uri_host, uri_port))
559}