vector/components/validation/resources/
http.rs1use std::{
2 collections::{HashMap, VecDeque},
3 future::Future,
4 net::{IpAddr, SocketAddr},
5 str::FromStr,
6 sync::Arc,
7};
8
9use axum::{
10 Router,
11 response::IntoResponse,
12 routing::{MethodFilter, MethodRouter},
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use http_body::{Body as _, Collected};
17use hyper::{Body, Client, Server};
18use tokio::{
19 select,
20 sync::{Mutex, Notify, mpsc, oneshot},
21};
22use tokio_util::codec::Decoder;
23use vector_lib::{
24 EstimatedJsonEncodedSizeOf,
25 codecs::{
26 CharacterDelimitedEncoder,
27 encoding::{Framer, Serializer::Json},
28 },
29 config::LogNamespace,
30 event::Event,
31};
32
33use super::{ResourceCodec, ResourceDirection, TestEvent, encode_test_event};
34use crate::components::validation::{
35 RunnerMetrics,
36 sync::{Configuring, TaskCoordinator},
37};
38
39#[derive(Clone)]
41pub struct HttpResourceConfig {
42 uri: Uri,
43 method: Option<Method>,
44 headers: Option<HashMap<String, String>>,
45}
46
47impl HttpResourceConfig {
48 pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
49 Self {
50 uri,
51 method,
52 headers: None,
53 }
54 }
55
56 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
57 self.headers = Some(headers);
58 self
59 }
60
61 pub fn spawn_as_input(
62 self,
63 direction: ResourceDirection,
64 codec: ResourceCodec,
65 input_rx: mpsc::Receiver<TestEvent>,
66 task_coordinator: &TaskCoordinator<Configuring>,
67 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
68 ) {
69 match direction {
70 ResourceDirection::Pull => {
72 spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
73 }
74 ResourceDirection::Push => {
76 spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
77 }
78 }
79 }
80
81 pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
82 match ctx.direction {
83 ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
85 ResourceDirection::Push => ctx.spawn_output_http_server(self),
87 }
88 }
89}
90
91#[allow(clippy::missing_const_for_fn)]
93fn spawn_input_http_server(
94 config: HttpResourceConfig,
95 codec: ResourceCodec,
96 mut input_rx: mpsc::Receiver<TestEvent>,
97 task_coordinator: &TaskCoordinator<Configuring>,
98 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
99) {
100 let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
105
106 let encoder = codec.into_encoder();
108 let sendable_events = Arc::clone(&outstanding_events);
109
110 let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
111 task_coordinator,
112 &config,
113 runner_metrics,
114 move |_request, _runner_metrics| {
115 let sendable_events = Arc::clone(&sendable_events);
116 let mut encoder = encoder.clone();
117
118 async move {
119 let mut sendable_events = sendable_events.lock().await;
120 match sendable_events.pop_front() {
121 Some(event) => {
122 let mut buffer = BytesMut::new();
123 encode_test_event(&mut encoder, &mut buffer, event);
124
125 buffer.into_response()
126 }
127 _ => {
128 StatusCode::OK.into_response()
132 }
133 }
134 }
135 },
136 );
137
138 let resource_started = task_coordinator.track_started();
141 let resource_completed = task_coordinator.track_completed();
142 let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
143
144 tokio::spawn(async move {
145 resource_started.mark_as_done();
146 info!("HTTP server external input resource started.");
147
148 let mut input_finished = false;
149
150 loop {
151 select! {
152 maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
158 Some(event) => {
159 let mut outstanding_events = outstanding_events.lock().await;
160 outstanding_events.push_back(event);
161 },
162 None => {
163 info!("HTTP server external input resource input is finished.");
164 input_finished = true;
165 },
166 },
167
168 _ = resource_notifier.notified() => {
169 if input_finished {
176 let outstanding_events = outstanding_events.lock().await;
177 if outstanding_events.is_empty() {
178 break
179 }
180 }
181 },
182 }
183 }
184 info!("HTTP server external input resource signalling ready for shutdown.");
187
188 resource_shutdown_rx.wait().await;
190
191 _ = http_server_shutdown_tx.send(());
193
194 info!("HTTP server external input resource marking as done.");
195 resource_completed.mark_as_done();
196
197 info!("HTTP server external input resource completed.");
198 });
199}
200
201fn spawn_input_http_client(
203 config: HttpResourceConfig,
204 codec: ResourceCodec,
205 mut input_rx: mpsc::Receiver<TestEvent>,
206 task_coordinator: &TaskCoordinator<Configuring>,
207 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
208) {
209 let started = task_coordinator.track_started();
212 let completed = task_coordinator.track_completed();
213 let mut encoder = codec.into_encoder();
214 let runner_metrics = Arc::clone(runner_metrics);
215
216 tokio::spawn(async move {
217 started.mark_as_done();
220 info!("HTTP client external input resource started.");
221
222 let client = Client::builder().build_http::<Body>();
223 let request_uri = config.uri;
224 let request_method = config.method.unwrap_or(Method::POST);
225 let headers = config.headers.unwrap_or_default();
226
227 while let Some(event) = input_rx.recv().await {
228 debug!("Got event to send from runner.");
229
230 let mut buffer = BytesMut::new();
231
232 let is_json = matches!(encoder.serializer(), Json(_))
233 && matches!(
234 encoder.framer(),
235 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
236 );
237
238 if is_json {
239 buffer.put_u8(b'[');
240 }
241
242 encode_test_event(&mut encoder, &mut buffer, event);
243
244 if is_json {
245 if !buffer.is_empty() {
246 buffer.truncate(buffer.len() - 1);
248 }
249 buffer.put_u8(b']');
250
251 let mut runner_metrics = runner_metrics.lock().await;
254 runner_metrics.sent_bytes_total += 1;
255 }
256
257 let mut request_builder = Request::builder()
258 .uri(request_uri.clone())
259 .method(request_method.clone());
260
261 for (key, value) in &headers {
262 request_builder = request_builder.header(key, value);
263 }
264
265 let request = request_builder
266 .body(buffer.freeze().into())
267 .expect("should not fail to build request");
268
269 match client.request(request).await {
270 Ok(_response) => {
271 debug!("Got response from server.");
273 }
274 Err(e) => {
275 error!("Failed to send request: {}", e);
277 }
278 }
279 }
280
281 completed.mark_as_done();
283
284 info!("HTTP client external input resource completed.");
285 });
286}
287
288pub struct HttpResourceOutputContext<'a> {
290 pub direction: ResourceDirection,
291 pub codec: ResourceCodec,
292 pub output_tx: mpsc::Sender<Vec<Event>>,
293 pub task_coordinator: &'a TaskCoordinator<Configuring>,
294 pub input_events: Vec<TestEvent>,
295 pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
296 pub log_namespace: LogNamespace,
297}
298
299impl HttpResourceOutputContext<'_> {
300 #[allow(clippy::missing_const_for_fn)]
302 fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
303 let decoder = self.codec.into_decoder(self.log_namespace)?;
308
309 let should_reject = self
316 .input_events
317 .iter()
318 .filter(|te| te.should_reject())
319 .count()
320 > 0;
321
322 let output_tx = self.output_tx.clone();
323 let (_, http_server_shutdown_tx) = spawn_http_server(
324 self.task_coordinator,
325 &config,
326 self.runner_metrics,
327 move |request, output_runner_metrics| {
328 let output_tx = output_tx.clone();
329 let mut decoder = decoder.clone();
330
331 async move {
332 match request.into_body().collect().await.map(Collected::to_bytes) {
333 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
334 Ok(body) => {
335 let byte_size = body.len();
336 let mut body = BytesMut::from(&body[..]);
337 loop {
338 match decoder.decode_eof(&mut body) {
339 Ok(Some((events, decoded_byte_size))) => {
342 if should_reject {
343 info!(
344 "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.",
345 );
346 } else {
347 let mut output_runner_metrics =
348 output_runner_metrics.lock().await;
349 info!(
350 "HTTP server external output resource decoded {decoded_byte_size:?} bytes."
351 );
352
353 output_runner_metrics.received_bytes_total +=
356 byte_size as u64;
357
358 output_runner_metrics.received_events_total +=
359 events.len() as u64;
360
361 events.iter().for_each(|event| {
362 output_runner_metrics.received_event_bytes_total +=
363 event.estimated_json_encoded_size_of().get()
364 as u64;
365 });
366
367 output_tx
368 .send(events.to_vec())
369 .await
370 .expect("should not fail to send output event");
371 }
372 }
373 Ok(None) => {
374 if should_reject {
375 return StatusCode::BAD_REQUEST.into_response();
378 } else {
379 return StatusCode::OK.into_response();
380 }
381 }
382 Err(_) => {
383 error!(
384 "HTTP server failed to decode {:?}",
385 String::from_utf8_lossy(&body)
386 );
387 return StatusCode::INTERNAL_SERVER_ERROR.into_response();
388 }
389 }
390 }
391 }
392 }
393 }
394 },
395 );
396
397 let resource_started = self.task_coordinator.track_started();
400 let resource_completed = self.task_coordinator.track_completed();
401 let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
402
403 tokio::spawn(async move {
404 resource_started.mark_as_done();
405 info!("HTTP server external output resource started.");
406
407 resource_shutdown_rx.wait().await;
409
410 let _ = http_server_shutdown_tx.send(());
412
413 resource_completed.mark_as_done();
415
416 info!("HTTP server external output resource completed.");
417 });
418
419 Ok(())
420 }
421
422 #[allow(clippy::missing_const_for_fn)]
424 fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
425 todo!()
429 }
430}
431
432fn spawn_http_server<H, F, R>(
433 task_coordinator: &TaskCoordinator<Configuring>,
434 config: &HttpResourceConfig,
435 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
436 handler: H,
437) -> (Arc<Notify>, oneshot::Sender<()>)
438where
439 H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
440 F: Future<Output = R> + Send,
441 R: IntoResponse,
442{
443 let http_server_started = task_coordinator.track_started();
444 let http_server_completed = task_coordinator.track_completed();
445
446 let listen_addr = socketaddr_from_uri(&config.uri);
447 let request_path = config
448 .uri
449 .path_and_query()
450 .map(|pq| pq.as_str().to_string())
451 .unwrap_or_else(|| "/".to_string());
452 let request_method = config.method.clone().unwrap_or(Method::POST);
453
454 let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
461 let resource_notifier = Arc::new(Notify::new());
462 let server_notifier = Arc::clone(&resource_notifier);
463
464 let output_runner_metrics = Arc::clone(runner_metrics);
465
466 tokio::spawn(async move {
467 let server_builder =
470 Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
471
472 let method_filter = MethodFilter::try_from(request_method)
480 .expect("should not fail to convert method to method filter");
481 let method_router = MethodRouter::new()
482 .fallback(|req: Request<Body>| async move {
483 error!(
484 path = req.uri().path(),
485 method = req.method().as_str(),
486 "Component sent request to a different path/method than expected."
487 );
488
489 StatusCode::METHOD_NOT_ALLOWED
490 })
491 .on(method_filter, move |request: Request<Body>| {
492 let request_handler = handler(request, output_runner_metrics);
493 let notifier = Arc::clone(&server_notifier);
494
495 async move {
496 let response = request_handler.await;
497 notifier.notify_one();
498 response
499 }
500 });
501
502 let router = Router::new().route(&request_path, method_router).fallback(
503 |req: Request<Body>| async move {
504 error!(?req, "Component sent request the server could not route.");
505 StatusCode::NOT_FOUND
506 },
507 );
508
509 http_server_started.mark_as_done();
511
512 let server = server_builder
513 .serve(router.into_make_service())
514 .with_graceful_shutdown(async {
515 http_server_shutdown_rx.await.ok();
516 });
517
518 if let Err(e) = server.await {
519 error!(error = ?e, "HTTP server encountered an error.");
520 }
521
522 http_server_completed.mark_as_done();
523 });
524
525 (resource_notifier, http_server_shutdown_tx)
526}
527
528fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
529 let uri_port = uri.port_u16().unwrap_or(80);
530 let uri_host = uri
531 .host()
532 .ok_or_else(|| "host must be present in URI".to_string())
533 .and_then(|host| {
534 IpAddr::from_str(host)
535 .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
536 })
537 .expect("HTTP URI not valid");
538
539 SocketAddr::from((uri_host, uri_port))
540}