vector/components/validation/resources/
http.rs1use std::{
2 collections::{HashMap, VecDeque},
3 future::Future,
4 net::{IpAddr, SocketAddr},
5 str::FromStr,
6 sync::Arc,
7};
8
9use axum::{
10 Router,
11 response::IntoResponse,
12 routing::{MethodFilter, MethodRouter},
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use hyper::{Body, Client, Server};
17use tokio::{
18 select,
19 sync::{Mutex, Notify, mpsc, oneshot},
20};
21use tokio_util::codec::Decoder;
22use vector_lib::{
23 EstimatedJsonEncodedSizeOf,
24 codecs::{
25 CharacterDelimitedEncoder,
26 encoding::{Framer, Serializer::Json},
27 },
28 config::LogNamespace,
29 event::Event,
30};
31
32use super::{ResourceCodec, ResourceDirection, TestEvent, encode_test_event};
33use crate::components::validation::{
34 RunnerMetrics,
35 sync::{Configuring, TaskCoordinator},
36};
37
38#[derive(Clone)]
40pub struct HttpResourceConfig {
41 uri: Uri,
42 method: Option<Method>,
43 headers: Option<HashMap<String, String>>,
44}
45
46impl HttpResourceConfig {
47 pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
48 Self {
49 uri,
50 method,
51 headers: None,
52 }
53 }
54
55 pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
56 self.headers = Some(headers);
57 self
58 }
59
60 pub fn spawn_as_input(
61 self,
62 direction: ResourceDirection,
63 codec: ResourceCodec,
64 input_rx: mpsc::Receiver<TestEvent>,
65 task_coordinator: &TaskCoordinator<Configuring>,
66 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
67 ) {
68 match direction {
69 ResourceDirection::Pull => {
71 spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
72 }
73 ResourceDirection::Push => {
75 spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
76 }
77 }
78 }
79
80 pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
81 match ctx.direction {
82 ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
84 ResourceDirection::Push => ctx.spawn_output_http_server(self),
86 }
87 }
88}
89
90#[allow(clippy::missing_const_for_fn)]
92fn spawn_input_http_server(
93 config: HttpResourceConfig,
94 codec: ResourceCodec,
95 mut input_rx: mpsc::Receiver<TestEvent>,
96 task_coordinator: &TaskCoordinator<Configuring>,
97 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
98) {
99 let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
104
105 let encoder = codec.into_encoder();
107 let sendable_events = Arc::clone(&outstanding_events);
108
109 let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
110 task_coordinator,
111 &config,
112 runner_metrics,
113 move |_request, _runner_metrics| {
114 let sendable_events = Arc::clone(&sendable_events);
115 let mut encoder = encoder.clone();
116
117 async move {
118 let mut sendable_events = sendable_events.lock().await;
119 match sendable_events.pop_front() {
120 Some(event) => {
121 let mut buffer = BytesMut::new();
122 encode_test_event(&mut encoder, &mut buffer, event);
123
124 buffer.into_response()
125 }
126 _ => {
127 StatusCode::OK.into_response()
131 }
132 }
133 }
134 },
135 );
136
137 let resource_started = task_coordinator.track_started();
140 let resource_completed = task_coordinator.track_completed();
141 let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
142
143 tokio::spawn(async move {
144 resource_started.mark_as_done();
145 info!("HTTP server external input resource started.");
146
147 let mut input_finished = false;
148
149 loop {
150 select! {
151 maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
157 Some(event) => {
158 let mut outstanding_events = outstanding_events.lock().await;
159 outstanding_events.push_back(event);
160 },
161 None => {
162 info!("HTTP server external input resource input is finished.");
163 input_finished = true;
164 },
165 },
166
167 _ = resource_notifier.notified() => {
168 if input_finished {
175 let outstanding_events = outstanding_events.lock().await;
176 if outstanding_events.is_empty() {
177 break
178 }
179 }
180 },
181 }
182 }
183 info!("HTTP server external input resource signalling ready for shutdown.");
186
187 resource_shutdown_rx.wait().await;
189
190 _ = http_server_shutdown_tx.send(());
192
193 info!("HTTP server external input resource marking as done.");
194 resource_completed.mark_as_done();
195
196 info!("HTTP server external input resource completed.");
197 });
198}
199
200fn spawn_input_http_client(
202 config: HttpResourceConfig,
203 codec: ResourceCodec,
204 mut input_rx: mpsc::Receiver<TestEvent>,
205 task_coordinator: &TaskCoordinator<Configuring>,
206 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
207) {
208 let started = task_coordinator.track_started();
211 let completed = task_coordinator.track_completed();
212 let mut encoder = codec.into_encoder();
213 let runner_metrics = Arc::clone(runner_metrics);
214
215 tokio::spawn(async move {
216 started.mark_as_done();
219 info!("HTTP client external input resource started.");
220
221 let client = Client::builder().build_http::<Body>();
222 let request_uri = config.uri;
223 let request_method = config.method.unwrap_or(Method::POST);
224 let headers = config.headers.unwrap_or_default();
225
226 while let Some(event) = input_rx.recv().await {
227 debug!("Got event to send from runner.");
228
229 let mut buffer = BytesMut::new();
230
231 let is_json = matches!(encoder.serializer(), Json(_))
232 && matches!(
233 encoder.framer(),
234 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
235 );
236
237 if is_json {
238 buffer.put_u8(b'[');
239 }
240
241 encode_test_event(&mut encoder, &mut buffer, event);
242
243 if is_json {
244 if !buffer.is_empty() {
245 buffer.truncate(buffer.len() - 1);
247 }
248 buffer.put_u8(b']');
249
250 let mut runner_metrics = runner_metrics.lock().await;
253 runner_metrics.sent_bytes_total += 1;
254 }
255
256 let mut request_builder = Request::builder()
257 .uri(request_uri.clone())
258 .method(request_method.clone());
259
260 for (key, value) in &headers {
261 request_builder = request_builder.header(key, value);
262 }
263
264 let request = request_builder
265 .body(buffer.freeze().into())
266 .expect("should not fail to build request");
267
268 match client.request(request).await {
269 Ok(_response) => {
270 debug!("Got response from server.");
272 }
273 Err(e) => {
274 error!("Failed to send request: {}", e);
276 }
277 }
278 }
279
280 completed.mark_as_done();
282
283 info!("HTTP client external input resource completed.");
284 });
285}
286
287pub struct HttpResourceOutputContext<'a> {
289 pub direction: ResourceDirection,
290 pub codec: ResourceCodec,
291 pub output_tx: mpsc::Sender<Vec<Event>>,
292 pub task_coordinator: &'a TaskCoordinator<Configuring>,
293 pub input_events: Vec<TestEvent>,
294 pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
295 pub log_namespace: LogNamespace,
296}
297
298impl HttpResourceOutputContext<'_> {
299 #[allow(clippy::missing_const_for_fn)]
301 fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
302 let decoder = self.codec.into_decoder(self.log_namespace)?;
307
308 let should_reject = self
315 .input_events
316 .iter()
317 .filter(|te| te.should_reject())
318 .count()
319 > 0;
320
321 let output_tx = self.output_tx.clone();
322 let (_, http_server_shutdown_tx) = spawn_http_server(
323 self.task_coordinator,
324 &config,
325 self.runner_metrics,
326 move |request, output_runner_metrics| {
327 let output_tx = output_tx.clone();
328 let mut decoder = decoder.clone();
329
330 async move {
331 match hyper::body::to_bytes(request.into_body()).await {
332 Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
333 Ok(body) => {
334 let byte_size = body.len();
335 let mut body = BytesMut::from(&body[..]);
336 loop {
337 match decoder.decode_eof(&mut body) {
338 Ok(Some((events, decoded_byte_size))) => {
341 if should_reject {
342 info!(
343 internal_log_rate_limit = true,
344 "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.",
345 );
346 } else {
347 let mut output_runner_metrics =
348 output_runner_metrics.lock().await;
349 info!(
350 internal_log_rate_limit = true,
351 "HTTP server external output resource decoded {decoded_byte_size:?} bytes."
352 );
353
354 output_runner_metrics.received_bytes_total +=
357 byte_size as u64;
358
359 output_runner_metrics.received_events_total +=
360 events.len() as u64;
361
362 events.iter().for_each(|event| {
363 output_runner_metrics.received_event_bytes_total +=
364 event.estimated_json_encoded_size_of().get()
365 as u64;
366 });
367
368 output_tx
369 .send(events.to_vec())
370 .await
371 .expect("should not fail to send output event");
372 }
373 }
374 Ok(None) => {
375 if should_reject {
376 return StatusCode::BAD_REQUEST.into_response();
379 } else {
380 return StatusCode::OK.into_response();
381 }
382 }
383 Err(_) => {
384 error!(
385 "HTTP server failed to decode {:?}",
386 String::from_utf8_lossy(&body)
387 );
388 return StatusCode::INTERNAL_SERVER_ERROR.into_response();
389 }
390 }
391 }
392 }
393 }
394 }
395 },
396 );
397
398 let resource_started = self.task_coordinator.track_started();
401 let resource_completed = self.task_coordinator.track_completed();
402 let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
403
404 tokio::spawn(async move {
405 resource_started.mark_as_done();
406 info!("HTTP server external output resource started.");
407
408 resource_shutdown_rx.wait().await;
410
411 let _ = http_server_shutdown_tx.send(());
413
414 resource_completed.mark_as_done();
416
417 info!("HTTP server external output resource completed.");
418 });
419
420 Ok(())
421 }
422
423 #[allow(clippy::missing_const_for_fn)]
425 fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
426 todo!()
430 }
431}
432
433fn spawn_http_server<H, F, R>(
434 task_coordinator: &TaskCoordinator<Configuring>,
435 config: &HttpResourceConfig,
436 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
437 handler: H,
438) -> (Arc<Notify>, oneshot::Sender<()>)
439where
440 H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
441 F: Future<Output = R> + Send,
442 R: IntoResponse,
443{
444 let http_server_started = task_coordinator.track_started();
445 let http_server_completed = task_coordinator.track_completed();
446
447 let listen_addr = socketaddr_from_uri(&config.uri);
448 let request_path = config
449 .uri
450 .path_and_query()
451 .map(|pq| pq.as_str().to_string())
452 .unwrap_or_else(|| "/".to_string());
453 let request_method = config.method.clone().unwrap_or(Method::POST);
454
455 let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
462 let resource_notifier = Arc::new(Notify::new());
463 let server_notifier = Arc::clone(&resource_notifier);
464
465 let output_runner_metrics = Arc::clone(runner_metrics);
466
467 tokio::spawn(async move {
468 let server_builder =
471 Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
472
473 let method_filter = MethodFilter::try_from(request_method)
481 .expect("should not fail to convert method to method filter");
482 let method_router = MethodRouter::new()
483 .fallback(|req: Request<Body>| async move {
484 error!(
485 path = req.uri().path(),
486 method = req.method().as_str(),
487 "Component sent request to a different path/method than expected."
488 );
489
490 StatusCode::METHOD_NOT_ALLOWED
491 })
492 .on(method_filter, move |request: Request<Body>| {
493 let request_handler = handler(request, output_runner_metrics);
494 let notifier = Arc::clone(&server_notifier);
495
496 async move {
497 let response = request_handler.await;
498 notifier.notify_one();
499 response
500 }
501 });
502
503 let router = Router::new().route(&request_path, method_router).fallback(
504 |req: Request<Body>| async move {
505 error!(?req, "Component sent request the server could not route.");
506 StatusCode::NOT_FOUND
507 },
508 );
509
510 http_server_started.mark_as_done();
512
513 let server = server_builder
514 .serve(router.into_make_service())
515 .with_graceful_shutdown(async {
516 http_server_shutdown_rx.await.ok();
517 });
518
519 if let Err(e) = server.await {
520 error!(error = ?e, "HTTP server encountered an error.");
521 }
522
523 http_server_completed.mark_as_done();
524 });
525
526 (resource_notifier, http_server_shutdown_tx)
527}
528
529fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
530 let uri_port = uri.port_u16().unwrap_or(80);
531 let uri_host = uri
532 .host()
533 .ok_or_else(|| "host must be present in URI".to_string())
534 .and_then(|host| {
535 IpAddr::from_str(host)
536 .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
537 })
538 .expect("HTTP URI not valid");
539
540 SocketAddr::from((uri_host, uri_port))
541}