vector/components/validation/resources/
http.rs1use std::{
2 collections::{HashMap, VecDeque},
3 future::Future,
4 net::{IpAddr, SocketAddr},
5 str::FromStr,
6 sync::Arc,
7};
8
9use axum::{
10 response::IntoResponse,
11 routing::{MethodFilter, MethodRouter},
12 Router,
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use hyper::{Body, Client, Server};
17use tokio::{
18 select,
19 sync::{mpsc, oneshot, Mutex, Notify},
20};
21use tokio_util::codec::Decoder;
22
23use crate::components::validation::{
24 sync::{Configuring, TaskCoordinator},
25 RunnerMetrics,
26};
27use vector_lib::{
28 codecs::encoding::Framer, codecs::encoding::Serializer::Json,
29 codecs::CharacterDelimitedEncoder, config::LogNamespace, event::Event,
30 EstimatedJsonEncodedSizeOf,
31};
32
33use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent};
34
35#[derive(Clone)]
37pub struct HttpResourceConfig {
38 uri: Uri,
39 method: Option<Method>,
40 headers: Option<HashMap<String, String>>,
41}
42
43impl HttpResourceConfig {
44 pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
45 Self {
46 uri,
47 method,
48 headers: None,
49 }
50 }
51
52 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
53 self.headers = Some(headers);
54 self
55 }
56
57 pub fn spawn_as_input(
58 self,
59 direction: ResourceDirection,
60 codec: ResourceCodec,
61 input_rx: mpsc::Receiver<TestEvent>,
62 task_coordinator: &TaskCoordinator<Configuring>,
63 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
64 ) {
65 match direction {
66 ResourceDirection::Pull => {
68 spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
69 }
70 ResourceDirection::Push => {
72 spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
73 }
74 }
75 }
76
77 pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
78 match ctx.direction {
79 ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
81 ResourceDirection::Push => ctx.spawn_output_http_server(self),
83 }
84 }
85}
86
87#[allow(clippy::missing_const_for_fn)]
89fn spawn_input_http_server(
90 config: HttpResourceConfig,
91 codec: ResourceCodec,
92 mut input_rx: mpsc::Receiver<TestEvent>,
93 task_coordinator: &TaskCoordinator<Configuring>,
94 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
95) {
96 let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
101
102 let encoder = codec.into_encoder();
104 let sendable_events = Arc::clone(&outstanding_events);
105
106 let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
107 task_coordinator,
108 &config,
109 runner_metrics,
110 move |_request, _runner_metrics| {
111 let sendable_events = Arc::clone(&sendable_events);
112 let mut encoder = encoder.clone();
113
114 async move {
115 let mut sendable_events = sendable_events.lock().await;
116 match sendable_events.pop_front() {
117 Some(event) => {
118 let mut buffer = BytesMut::new();
119 encode_test_event(&mut encoder, &mut buffer, event);
120
121 buffer.into_response()
122 }
123 _ => {
124 StatusCode::OK.into_response()
128 }
129 }
130 }
131 },
132 );
133
134 let resource_started = task_coordinator.track_started();
137 let resource_completed = task_coordinator.track_completed();
138 let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
139
140 tokio::spawn(async move {
141 resource_started.mark_as_done();
142 info!("HTTP server external input resource started.");
143
144 let mut input_finished = false;
145
146 loop {
147 select! {
148 maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
154 Some(event) => {
155 let mut outstanding_events = outstanding_events.lock().await;
156 outstanding_events.push_back(event);
157 },
158 None => {
159 info!("HTTP server external input resource input is finished.");
160 input_finished = true;
161 },
162 },
163
164 _ = resource_notifier.notified() => {
165 if input_finished {
172 let outstanding_events = outstanding_events.lock().await;
173 if outstanding_events.is_empty() {
174 break
175 }
176 }
177 },
178 }
179 }
180 info!("HTTP server external input resource signalling ready for shutdown.");
183
184 resource_shutdown_rx.wait().await;
186
187 _ = http_server_shutdown_tx.send(());
189
190 info!("HTTP server external input resource marking as done.");
191 resource_completed.mark_as_done();
192
193 info!("HTTP server external input resource completed.");
194 });
195}
196
197fn spawn_input_http_client(
199 config: HttpResourceConfig,
200 codec: ResourceCodec,
201 mut input_rx: mpsc::Receiver<TestEvent>,
202 task_coordinator: &TaskCoordinator<Configuring>,
203 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
204) {
205 let started = task_coordinator.track_started();
208 let completed = task_coordinator.track_completed();
209 let mut encoder = codec.into_encoder();
210 let runner_metrics = Arc::clone(runner_metrics);
211
212 tokio::spawn(async move {
213 started.mark_as_done();
216 info!("HTTP client external input resource started.");
217
218 let client = Client::builder().build_http::<Body>();
219 let request_uri = config.uri;
220 let request_method = config.method.unwrap_or(Method::POST);
221 let headers = config.headers.unwrap_or_default();
222
223 while let Some(event) = input_rx.recv().await {
224 debug!("Got event to send from runner.");
225
226 let mut buffer = BytesMut::new();
227
228 let is_json = matches!(encoder.serializer(), Json(_))
229 && matches!(
230 encoder.framer(),
231 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
232 );
233
234 if is_json {
235 buffer.put_u8(b'[');
236 }
237
238 encode_test_event(&mut encoder, &mut buffer, event);
239
240 if is_json {
241 if !buffer.is_empty() {
242 buffer.truncate(buffer.len() - 1);
244 }
245 buffer.put_u8(b']');
246
247 let mut runner_metrics = runner_metrics.lock().await;
250 runner_metrics.sent_bytes_total += 1;
251 }
252
253 let mut request_builder = Request::builder()
254 .uri(request_uri.clone())
255 .method(request_method.clone());
256
257 for (key, value) in &headers {
258 request_builder = request_builder.header(key, value);
259 }
260
261 let request = request_builder
262 .body(buffer.freeze().into())
263 .expect("should not fail to build request");
264
265 match client.request(request).await {
266 Ok(_response) => {
267 debug!("Got response from server.");
269 }
270 Err(e) => {
271 error!("Failed to send request: {}", e);
273 }
274 }
275 }
276
277 completed.mark_as_done();
279
280 info!("HTTP client external input resource completed.");
281 });
282}
283
284pub struct HttpResourceOutputContext<'a> {
286 pub direction: ResourceDirection,
287 pub codec: ResourceCodec,
288 pub output_tx: mpsc::Sender<Vec<Event>>,
289 pub task_coordinator: &'a TaskCoordinator<Configuring>,
290 pub input_events: Vec<TestEvent>,
291 pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
292 pub log_namespace: LogNamespace,
293}
294
295impl HttpResourceOutputContext<'_> {
296 #[allow(clippy::missing_const_for_fn)]
298 fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
299 let decoder = self.codec.into_decoder(self.log_namespace)?;
304
305 let should_reject = self
312 .input_events
313 .iter()
314 .filter(|te| te.should_reject())
315 .count()
316 > 0;
317
318 let output_tx = self.output_tx.clone();
319 let (_, http_server_shutdown_tx) = spawn_http_server(
320 self.task_coordinator,
321 &config,
322 self.runner_metrics,
323 move |request, output_runner_metrics| {
324 let output_tx = output_tx.clone();
325 let mut decoder = decoder.clone();
326
327 async move {
328 match hyper::body::to_bytes(request.into_body()).await {
329 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
330 Ok(body) => {
331 let byte_size = body.len();
332 let mut body = BytesMut::from(&body[..]);
333 loop {
334 match decoder.decode_eof(&mut body) {
335 Ok(Some((events, decoded_byte_size))) => {
338 if should_reject {
339 info!(internal_log_rate_limit = true, "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.", );
340 } else {
341 let mut output_runner_metrics =
342 output_runner_metrics.lock().await;
343 info!(internal_log_rate_limit = true, "HTTP server external output resource decoded {decoded_byte_size:?} bytes.");
344
345 output_runner_metrics.received_bytes_total +=
348 byte_size as u64;
349
350 output_runner_metrics.received_events_total +=
351 events.len() as u64;
352
353 events.iter().for_each(|event| {
354 output_runner_metrics.received_event_bytes_total +=
355 event.estimated_json_encoded_size_of().get()
356 as u64;
357 });
358
359 output_tx
360 .send(events.to_vec())
361 .await
362 .expect("should not fail to send output event");
363 }
364 }
365 Ok(None) => {
366 if should_reject {
367 return StatusCode::BAD_REQUEST.into_response();
370 } else {
371 return StatusCode::OK.into_response();
372 }
373 }
374 Err(_) => {
375 error!(
376 "HTTP server failed to decode {:?}",
377 String::from_utf8_lossy(&body)
378 );
379 return StatusCode::INTERNAL_SERVER_ERROR.into_response();
380 }
381 }
382 }
383 }
384 }
385 }
386 },
387 );
388
389 let resource_started = self.task_coordinator.track_started();
392 let resource_completed = self.task_coordinator.track_completed();
393 let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
394
395 tokio::spawn(async move {
396 resource_started.mark_as_done();
397 info!("HTTP server external output resource started.");
398
399 resource_shutdown_rx.wait().await;
401
402 let _ = http_server_shutdown_tx.send(());
404
405 resource_completed.mark_as_done();
407
408 info!("HTTP server external output resource completed.");
409 });
410
411 Ok(())
412 }
413
414 #[allow(clippy::missing_const_for_fn)]
416 fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
417 todo!()
421 }
422}
423
424fn spawn_http_server<H, F, R>(
425 task_coordinator: &TaskCoordinator<Configuring>,
426 config: &HttpResourceConfig,
427 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
428 handler: H,
429) -> (Arc<Notify>, oneshot::Sender<()>)
430where
431 H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
432 F: Future<Output = R> + Send,
433 R: IntoResponse,
434{
435 let http_server_started = task_coordinator.track_started();
436 let http_server_completed = task_coordinator.track_completed();
437
438 let listen_addr = socketaddr_from_uri(&config.uri);
439 let request_path = config
440 .uri
441 .path_and_query()
442 .map(|pq| pq.as_str().to_string())
443 .unwrap_or_else(|| "/".to_string());
444 let request_method = config.method.clone().unwrap_or(Method::POST);
445
446 let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
453 let resource_notifier = Arc::new(Notify::new());
454 let server_notifier = Arc::clone(&resource_notifier);
455
456 let output_runner_metrics = Arc::clone(runner_metrics);
457
458 tokio::spawn(async move {
459 let server_builder =
462 Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
463
464 let method_filter = MethodFilter::try_from(request_method)
472 .expect("should not fail to convert method to method filter");
473 let method_router = MethodRouter::new()
474 .fallback(|req: Request<Body>| async move {
475 error!(
476 path = req.uri().path(),
477 method = req.method().as_str(),
478 "Component sent request to a different path/method than expected."
479 );
480
481 StatusCode::METHOD_NOT_ALLOWED
482 })
483 .on(method_filter, move |request: Request<Body>| {
484 let request_handler = handler(request, output_runner_metrics);
485 let notifier = Arc::clone(&server_notifier);
486
487 async move {
488 let response = request_handler.await;
489 notifier.notify_one();
490 response
491 }
492 });
493
494 let router = Router::new().route(&request_path, method_router).fallback(
495 |req: Request<Body>| async move {
496 error!(?req, "Component sent request the server could not route.");
497 StatusCode::NOT_FOUND
498 },
499 );
500
501 http_server_started.mark_as_done();
503
504 let server = server_builder
505 .serve(router.into_make_service())
506 .with_graceful_shutdown(async {
507 http_server_shutdown_rx.await.ok();
508 });
509
510 if let Err(e) = server.await {
511 error!(error = ?e, "HTTP server encountered an error.");
512 }
513
514 http_server_completed.mark_as_done();
515 });
516
517 (resource_notifier, http_server_shutdown_tx)
518}
519
520fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
521 let uri_port = uri.port_u16().unwrap_or(80);
522 let uri_host = uri
523 .host()
524 .ok_or_else(|| "host must be present in URI".to_string())
525 .and_then(|host| {
526 IpAddr::from_str(host)
527 .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
528 })
529 .expect("HTTP URI not valid");
530
531 SocketAddr::from((uri_host, uri_port))
532}