vector/components/validation/resources/
http.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    future::Future,
4    net::{IpAddr, SocketAddr},
5    str::FromStr,
6    sync::Arc,
7};
8
9use axum::{
10    Router,
11    response::IntoResponse,
12    routing::{MethodFilter, MethodRouter},
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use http_body::{Body as _, Collected};
17use hyper::{Body, Client, Server};
18use tokio::{
19    select,
20    sync::{Mutex, Notify, mpsc, oneshot},
21};
22use tokio_util::codec::Decoder;
23use vector_lib::{
24    EstimatedJsonEncodedSizeOf,
25    codecs::{
26        CharacterDelimitedEncoder,
27        encoding::{Framer, Serializer::Json},
28    },
29    config::LogNamespace,
30    event::Event,
31};
32
33use super::{ResourceCodec, ResourceDirection, TestEvent, encode_test_event};
34use crate::components::validation::{
35    RunnerMetrics,
36    sync::{Configuring, TaskCoordinator},
37};
38
39/// An HTTP resource.
40#[derive(Clone)]
41pub struct HttpResourceConfig {
42    uri: Uri,
43    method: Option<Method>,
44    headers: Option<HashMap<String, String>>,
45}
46
47impl HttpResourceConfig {
48    pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
49        Self {
50            uri,
51            method,
52            headers: None,
53        }
54    }
55
56    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
57        self.headers = Some(headers);
58        self
59    }
60
61    pub fn spawn_as_input(
62        self,
63        direction: ResourceDirection,
64        codec: ResourceCodec,
65        input_rx: mpsc::Receiver<TestEvent>,
66        task_coordinator: &TaskCoordinator<Configuring>,
67        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
68    ) {
69        match direction {
70            // The source will pull data from us.
71            ResourceDirection::Pull => {
72                spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
73            }
74            // We'll push data to the source.
75            ResourceDirection::Push => {
76                spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
77            }
78        }
79    }
80
81    pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
82        match ctx.direction {
83            // We'll pull data from the sink.
84            ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
85            // The sink will push data to us.
86            ResourceDirection::Push => ctx.spawn_output_http_server(self),
87        }
88    }
89}
90
91/// Spawns an HTTP server that a source will make requests to in order to get events.
92#[allow(clippy::missing_const_for_fn)]
93fn spawn_input_http_server(
94    config: HttpResourceConfig,
95    codec: ResourceCodec,
96    mut input_rx: mpsc::Receiver<TestEvent>,
97    task_coordinator: &TaskCoordinator<Configuring>,
98    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
99) {
100    // This HTTP server will poll the input receiver for input events and buffer them. When a
101    // request comes in on the right path/method, one buffered input event will be sent back. If no
102    // buffered events are available when the request arrives, an empty response (204 No Content) is
103    // returned to the caller.
104    let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
105
106    // First, we'll build and spawn our HTTP server.
107    let encoder = codec.into_encoder();
108    let sendable_events = Arc::clone(&outstanding_events);
109
110    let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
111        task_coordinator,
112        &config,
113        runner_metrics,
114        move |_request, _runner_metrics| {
115            let sendable_events = Arc::clone(&sendable_events);
116            let mut encoder = encoder.clone();
117
118            async move {
119                let mut sendable_events = sendable_events.lock().await;
120                match sendable_events.pop_front() {
121                    Some(event) => {
122                        let mut buffer = BytesMut::new();
123                        encode_test_event(&mut encoder, &mut buffer, event);
124
125                        buffer.into_response()
126                    }
127                    _ => {
128                        // We'll send an empty 200 in the response since some
129                        // sources throw errors for anything other than a valid
130                        // response.
131                        StatusCode::OK.into_response()
132                    }
133                }
134            }
135        },
136    );
137
138    // Now we'll create and spawn the resource's core logic loop which drives the buffering of input
139    // events and working with the HTTP server as they're consumed.
140    let resource_started = task_coordinator.track_started();
141    let resource_completed = task_coordinator.track_completed();
142    let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
143
144    tokio::spawn(async move {
145        resource_started.mark_as_done();
146        info!("HTTP server external input resource started.");
147
148        let mut input_finished = false;
149
150        loop {
151            select! {
152                // Handle input events being sent to us from the runner.
153                //
154                // When the channel closes, we'll mark the input as being finished so that we know
155                // to close the external resource itself once the HTTP server has consumed/sent all
156                // outstanding events.
157                maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
158                    Some(event) => {
159                        let mut outstanding_events = outstanding_events.lock().await;
160                        outstanding_events.push_back(event);
161                    },
162                    None => {
163                        info!("HTTP server external input resource input is finished.");
164                        input_finished = true;
165                    },
166                },
167
168                _ = resource_notifier.notified() => {
169                    // The HTTP server notified us that it made progress with a send, which is
170                    // specifically that it serviced a request which returned a non-zero number of
171                    // events.
172                    //
173                    // This indicates that we need to check and see if our input is completed --
174                    // channel closed, no outstanding events left -- and thus if it's time to close.
175                    if input_finished {
176                        let outstanding_events = outstanding_events.lock().await;
177                        if outstanding_events.is_empty() {
178                            break
179                        }
180                    }
181                },
182            }
183        }
184        // Mark ourselves as completed now that we've sent all inputs to the source, and
185        // additionally signal the HTTP server to also gracefully shutdown.
186        info!("HTTP server external input resource signalling ready for shutdown.");
187
188        // Wait for the runner to signal us to shutdown
189        resource_shutdown_rx.wait().await;
190
191        // Shutdown the server
192        _ = http_server_shutdown_tx.send(());
193
194        info!("HTTP server external input resource marking as done.");
195        resource_completed.mark_as_done();
196
197        info!("HTTP server external input resource completed.");
198    });
199}
200
201/// Spawns an HTTP client that pushes events to a source which is accepting events over HTTP.
202fn spawn_input_http_client(
203    config: HttpResourceConfig,
204    codec: ResourceCodec,
205    mut input_rx: mpsc::Receiver<TestEvent>,
206    task_coordinator: &TaskCoordinator<Configuring>,
207    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
208) {
209    // Spin up an HTTP client that will push the input data to the source on a
210    // request-per-input-item basis. This runs serially and has no parallelism.
211    let started = task_coordinator.track_started();
212    let completed = task_coordinator.track_completed();
213    let mut encoder = codec.into_encoder();
214    let runner_metrics = Arc::clone(runner_metrics);
215
216    tokio::spawn(async move {
217        // Mark ourselves as started. We don't actually do anything until we get our first input
218        // message, though.
219        started.mark_as_done();
220        info!("HTTP client external input resource started.");
221
222        let client = Client::builder().build_http::<Body>();
223        let request_uri = config.uri;
224        let request_method = config.method.unwrap_or(Method::POST);
225        let headers = config.headers.unwrap_or_default();
226
227        while let Some(event) = input_rx.recv().await {
228            debug!("Got event to send from runner.");
229
230            let mut buffer = BytesMut::new();
231
232            let is_json = matches!(encoder.serializer(), Json(_))
233                && matches!(
234                    encoder.framer(),
235                    Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
236                );
237
238            if is_json {
239                buffer.put_u8(b'[');
240            }
241
242            encode_test_event(&mut encoder, &mut buffer, event);
243
244            if is_json {
245                if !buffer.is_empty() {
246                    // remove trailing comma from last record
247                    buffer.truncate(buffer.len() - 1);
248                }
249                buffer.put_u8(b']');
250
251                // in this edge case we have removed the trailing comma (one byte) and added
252                // opening and closing braces (2 bytes) for a net add of one byte.
253                let mut runner_metrics = runner_metrics.lock().await;
254                runner_metrics.sent_bytes_total += 1;
255            }
256
257            let mut request_builder = Request::builder()
258                .uri(request_uri.clone())
259                .method(request_method.clone());
260
261            for (key, value) in &headers {
262                request_builder = request_builder.header(key, value);
263            }
264
265            let request = request_builder
266                .body(buffer.freeze().into())
267                .expect("should not fail to build request");
268
269            match client.request(request).await {
270                Ok(_response) => {
271                    // TODO: Emit metric that tracks a successful response from the HTTP server.
272                    debug!("Got response from server.");
273                }
274                Err(e) => {
275                    // TODO: Emit metric that tracks a failed response from the HTTP server.
276                    error!("Failed to send request: {}", e);
277                }
278            }
279        }
280
281        // Mark ourselves as completed now that we've sent all inputs to the source.
282        completed.mark_as_done();
283
284        info!("HTTP client external input resource completed.");
285    });
286}
287
288/// Anything that the output side HTTP external resource needs
289pub struct HttpResourceOutputContext<'a> {
290    pub direction: ResourceDirection,
291    pub codec: ResourceCodec,
292    pub output_tx: mpsc::Sender<Vec<Event>>,
293    pub task_coordinator: &'a TaskCoordinator<Configuring>,
294    pub input_events: Vec<TestEvent>,
295    pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
296    pub log_namespace: LogNamespace,
297}
298
299impl HttpResourceOutputContext<'_> {
300    /// Spawns an HTTP server that accepts events sent by a sink.
301    #[allow(clippy::missing_const_for_fn)]
302    fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
303        // This HTTP server will wait for events to be sent by a sink, and collect them and send them on
304        // via an output sender. We accept/collect events until we're told to shutdown.
305
306        // First, we'll build and spawn our HTTP server.
307        let decoder = self.codec.into_decoder(self.log_namespace)?;
308
309        // Note that we currently don't differentiate which events should and shouldn't be rejected-
310        // we reject all events in this server if any are marked for rejection.
311        // In the future it might be useful to be able to select which to reject. That will involve
312        // adding logic to the test case which is passed down here, and to the event itself. Since
313        // we can't guarantee the order of events, we'd need a way to flag which ones need to be
314        // rejected.
315        let should_reject = self
316            .input_events
317            .iter()
318            .filter(|te| te.should_reject())
319            .count()
320            > 0;
321
322        let output_tx = self.output_tx.clone();
323        let (_, http_server_shutdown_tx) = spawn_http_server(
324            self.task_coordinator,
325            &config,
326            self.runner_metrics,
327            move |request, output_runner_metrics| {
328                let output_tx = output_tx.clone();
329                let mut decoder = decoder.clone();
330
331                async move {
332                    match request.into_body().collect().await.map(Collected::to_bytes) {
333                        Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
334                        Ok(body) => {
335                            let byte_size = body.len();
336                            let mut body = BytesMut::from(&body[..]);
337                            loop {
338                                match decoder.decode_eof(&mut body) {
339                                    // `decoded_byte_size` is the decoded size of an individual frame. `byte_size` represents the size of the
340                                    // entire payload which may contain multiple frames and their delimiters.
341                                    Ok(Some((events, decoded_byte_size))) => {
342                                        if should_reject {
343                                            info!(
344                                                "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.",
345                                            );
346                                        } else {
347                                            let mut output_runner_metrics =
348                                                output_runner_metrics.lock().await;
349                                            info!(
350                                                "HTTP server external output resource decoded {decoded_byte_size:?} bytes."
351                                            );
352
353                                            // Update the runner metrics for the received events. This will later
354                                            // be used in the Validators, as the "expected" case.
355                                            output_runner_metrics.received_bytes_total +=
356                                                byte_size as u64;
357
358                                            output_runner_metrics.received_events_total +=
359                                                events.len() as u64;
360
361                                            events.iter().for_each(|event| {
362                                                output_runner_metrics.received_event_bytes_total +=
363                                                    event.estimated_json_encoded_size_of().get()
364                                                        as u64;
365                                            });
366
367                                            output_tx
368                                                .send(events.to_vec())
369                                                .await
370                                                .expect("should not fail to send output event");
371                                        }
372                                    }
373                                    Ok(None) => {
374                                        if should_reject {
375                                            // This status code is not retried and should result in the component under test
376                                            // emitting error events
377                                            return StatusCode::BAD_REQUEST.into_response();
378                                        } else {
379                                            return StatusCode::OK.into_response();
380                                        }
381                                    }
382                                    Err(_) => {
383                                        error!(
384                                            "HTTP server failed to decode {:?}",
385                                            String::from_utf8_lossy(&body)
386                                        );
387                                        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
388                                    }
389                                }
390                            }
391                        }
392                    }
393                }
394            },
395        );
396
397        // Now we'll create and spawn the resource's core logic loop which simply waits for the runner
398        // to instruct us to shutdown, and when that happens, cascades to shutting down the HTTP server.
399        let resource_started = self.task_coordinator.track_started();
400        let resource_completed = self.task_coordinator.track_completed();
401        let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
402
403        tokio::spawn(async move {
404            resource_started.mark_as_done();
405            info!("HTTP server external output resource started.");
406
407            // Wait for the runner to tell us to shutdown
408            resource_shutdown_rx.wait().await;
409
410            // signal the server to shutdown
411            let _ = http_server_shutdown_tx.send(());
412
413            // mark ourselves as done
414            resource_completed.mark_as_done();
415
416            info!("HTTP server external output resource completed.");
417        });
418
419        Ok(())
420    }
421
422    /// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink.
423    #[allow(clippy::missing_const_for_fn)]
424    fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
425        // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be
426        // scraped... but since we need special logic to aggregate/deduplicate scraped metrics, we can't
427        // use this generically for that purpose.
428        todo!()
429    }
430}
431
432fn spawn_http_server<H, F, R>(
433    task_coordinator: &TaskCoordinator<Configuring>,
434    config: &HttpResourceConfig,
435    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
436    handler: H,
437) -> (Arc<Notify>, oneshot::Sender<()>)
438where
439    H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
440    F: Future<Output = R> + Send,
441    R: IntoResponse,
442{
443    let http_server_started = task_coordinator.track_started();
444    let http_server_completed = task_coordinator.track_completed();
445
446    let listen_addr = socketaddr_from_uri(&config.uri);
447    let request_path = config
448        .uri
449        .path_and_query()
450        .map(|pq| pq.as_str().to_string())
451        .unwrap_or_else(|| "/".to_string());
452    let request_method = config.method.clone().unwrap_or(Method::POST);
453
454    // Create our synchronization primitives that are shared between the HTTP server and the
455    // resource's core logic loop.
456    //
457    // This will let the resource be able to trigger the HTTP server to gracefully shutdown, as well
458    // as be notified when the HTTP server has served a request, so that it can check if all input
459    // events have been sent yet.
460    let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
461    let resource_notifier = Arc::new(Notify::new());
462    let server_notifier = Arc::clone(&resource_notifier);
463
464    let output_runner_metrics = Arc::clone(runner_metrics);
465
466    tokio::spawn(async move {
467        // Create our HTTP server by binding as early as possible to return an error if we can't
468        // actually bind.
469        let server_builder =
470            Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
471
472        // Create our router, which is a bit boilerplate-y because we take the HTTP method
473        // parametrically. We generate a handler that calls the given `handler` and then triggers
474        // the notifier shared by the HTTP server and the resource's core logic loop.
475        //
476        // Every time a request is processed, we notify the core logic loop so it can continue
477        // checking to see if it's time to fully close once all input events have been consumed and
478        // the input receiver is closed.
479        let method_filter = MethodFilter::try_from(request_method)
480            .expect("should not fail to convert method to method filter");
481        let method_router = MethodRouter::new()
482            .fallback(|req: Request<Body>| async move {
483                error!(
484                    path = req.uri().path(),
485                    method = req.method().as_str(),
486                    "Component sent request to a different path/method than expected."
487                );
488
489                StatusCode::METHOD_NOT_ALLOWED
490            })
491            .on(method_filter, move |request: Request<Body>| {
492                let request_handler = handler(request, output_runner_metrics);
493                let notifier = Arc::clone(&server_notifier);
494
495                async move {
496                    let response = request_handler.await;
497                    notifier.notify_one();
498                    response
499                }
500            });
501
502        let router = Router::new().route(&request_path, method_router).fallback(
503            |req: Request<Body>| async move {
504                error!(?req, "Component sent request the server could not route.");
505                StatusCode::NOT_FOUND
506            },
507        );
508
509        // Now actually run/drive the HTTP server and process requests until we're told to shutdown.
510        http_server_started.mark_as_done();
511
512        let server = server_builder
513            .serve(router.into_make_service())
514            .with_graceful_shutdown(async {
515                http_server_shutdown_rx.await.ok();
516            });
517
518        if let Err(e) = server.await {
519            error!(error = ?e, "HTTP server encountered an error.");
520        }
521
522        http_server_completed.mark_as_done();
523    });
524
525    (resource_notifier, http_server_shutdown_tx)
526}
527
528fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
529    let uri_port = uri.port_u16().unwrap_or(80);
530    let uri_host = uri
531        .host()
532        .ok_or_else(|| "host must be present in URI".to_string())
533        .and_then(|host| {
534            IpAddr::from_str(host)
535                .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
536        })
537        .expect("HTTP URI not valid");
538
539    SocketAddr::from((uri_host, uri_port))
540}