vector/components/validation/resources/
http.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    future::Future,
4    net::{IpAddr, SocketAddr},
5    str::FromStr,
6    sync::Arc,
7};
8
9use axum::{
10    Router,
11    response::IntoResponse,
12    routing::{MethodFilter, MethodRouter},
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use hyper::{Body, Client, Server};
17use tokio::{
18    select,
19    sync::{Mutex, Notify, mpsc, oneshot},
20};
21use tokio_util::codec::Decoder;
22use vector_lib::{
23    EstimatedJsonEncodedSizeOf,
24    codecs::{
25        CharacterDelimitedEncoder,
26        encoding::{Framer, Serializer::Json},
27    },
28    config::LogNamespace,
29    event::Event,
30};
31
32use super::{ResourceCodec, ResourceDirection, TestEvent, encode_test_event};
33use crate::components::validation::{
34    RunnerMetrics,
35    sync::{Configuring, TaskCoordinator},
36};
37
38/// An HTTP resource.
39#[derive(Clone)]
40pub struct HttpResourceConfig {
41    uri: Uri,
42    method: Option<Method>,
43    headers: Option<HashMap<String, String>>,
44}
45
46impl HttpResourceConfig {
47    pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
48        Self {
49            uri,
50            method,
51            headers: None,
52        }
53    }
54
55    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
56        self.headers = Some(headers);
57        self
58    }
59
60    pub fn spawn_as_input(
61        self,
62        direction: ResourceDirection,
63        codec: ResourceCodec,
64        input_rx: mpsc::Receiver<TestEvent>,
65        task_coordinator: &TaskCoordinator<Configuring>,
66        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
67    ) {
68        match direction {
69            // The source will pull data from us.
70            ResourceDirection::Pull => {
71                spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
72            }
73            // We'll push data to the source.
74            ResourceDirection::Push => {
75                spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
76            }
77        }
78    }
79
80    pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
81        match ctx.direction {
82            // We'll pull data from the sink.
83            ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
84            // The sink will push data to us.
85            ResourceDirection::Push => ctx.spawn_output_http_server(self),
86        }
87    }
88}
89
90/// Spawns an HTTP server that a source will make requests to in order to get events.
91#[allow(clippy::missing_const_for_fn)]
92fn spawn_input_http_server(
93    config: HttpResourceConfig,
94    codec: ResourceCodec,
95    mut input_rx: mpsc::Receiver<TestEvent>,
96    task_coordinator: &TaskCoordinator<Configuring>,
97    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
98) {
99    // This HTTP server will poll the input receiver for input events and buffer them. When a
100    // request comes in on the right path/method, one buffered input event will be sent back. If no
101    // buffered events are available when the request arrives, an empty response (204 No Content) is
102    // returned to the caller.
103    let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
104
105    // First, we'll build and spawn our HTTP server.
106    let encoder = codec.into_encoder();
107    let sendable_events = Arc::clone(&outstanding_events);
108
109    let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
110        task_coordinator,
111        &config,
112        runner_metrics,
113        move |_request, _runner_metrics| {
114            let sendable_events = Arc::clone(&sendable_events);
115            let mut encoder = encoder.clone();
116
117            async move {
118                let mut sendable_events = sendable_events.lock().await;
119                match sendable_events.pop_front() {
120                    Some(event) => {
121                        let mut buffer = BytesMut::new();
122                        encode_test_event(&mut encoder, &mut buffer, event);
123
124                        buffer.into_response()
125                    }
126                    _ => {
127                        // We'll send an empty 200 in the response since some
128                        // sources throw errors for anything other than a valid
129                        // response.
130                        StatusCode::OK.into_response()
131                    }
132                }
133            }
134        },
135    );
136
137    // Now we'll create and spawn the resource's core logic loop which drives the buffering of input
138    // events and working with the HTTP server as they're consumed.
139    let resource_started = task_coordinator.track_started();
140    let resource_completed = task_coordinator.track_completed();
141    let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
142
143    tokio::spawn(async move {
144        resource_started.mark_as_done();
145        info!("HTTP server external input resource started.");
146
147        let mut input_finished = false;
148
149        loop {
150            select! {
151                // Handle input events being sent to us from the runner.
152                //
153                // When the channel closes, we'll mark the input as being finished so that we know
154                // to close the external resource itself once the HTTP server has consumed/sent all
155                // outstanding events.
156                maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
157                    Some(event) => {
158                        let mut outstanding_events = outstanding_events.lock().await;
159                        outstanding_events.push_back(event);
160                    },
161                    None => {
162                        info!("HTTP server external input resource input is finished.");
163                        input_finished = true;
164                    },
165                },
166
167                _ = resource_notifier.notified() => {
168                    // The HTTP server notified us that it made progress with a send, which is
169                    // specifically that it serviced a request which returned a non-zero number of
170                    // events.
171                    //
172                    // This indicates that we need to check and see if our input is completed --
173                    // channel closed, no outstanding events left -- and thus if it's time to close.
174                    if input_finished {
175                        let outstanding_events = outstanding_events.lock().await;
176                        if outstanding_events.is_empty() {
177                            break
178                        }
179                    }
180                },
181            }
182        }
183        // Mark ourselves as completed now that we've sent all inputs to the source, and
184        // additionally signal the HTTP server to also gracefully shutdown.
185        info!("HTTP server external input resource signalling ready for shutdown.");
186
187        // Wait for the runner to signal us to shutdown
188        resource_shutdown_rx.wait().await;
189
190        // Shutdown the server
191        _ = http_server_shutdown_tx.send(());
192
193        info!("HTTP server external input resource marking as done.");
194        resource_completed.mark_as_done();
195
196        info!("HTTP server external input resource completed.");
197    });
198}
199
200/// Spawns an HTTP client that pushes events to a source which is accepting events over HTTP.
201fn spawn_input_http_client(
202    config: HttpResourceConfig,
203    codec: ResourceCodec,
204    mut input_rx: mpsc::Receiver<TestEvent>,
205    task_coordinator: &TaskCoordinator<Configuring>,
206    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
207) {
208    // Spin up an HTTP client that will push the input data to the source on a
209    // request-per-input-item basis. This runs serially and has no parallelism.
210    let started = task_coordinator.track_started();
211    let completed = task_coordinator.track_completed();
212    let mut encoder = codec.into_encoder();
213    let runner_metrics = Arc::clone(runner_metrics);
214
215    tokio::spawn(async move {
216        // Mark ourselves as started. We don't actually do anything until we get our first input
217        // message, though.
218        started.mark_as_done();
219        info!("HTTP client external input resource started.");
220
221        let client = Client::builder().build_http::<Body>();
222        let request_uri = config.uri;
223        let request_method = config.method.unwrap_or(Method::POST);
224        let headers = config.headers.unwrap_or_default();
225
226        while let Some(event) = input_rx.recv().await {
227            debug!("Got event to send from runner.");
228
229            let mut buffer = BytesMut::new();
230
231            let is_json = matches!(encoder.serializer(), Json(_))
232                && matches!(
233                    encoder.framer(),
234                    Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
235                );
236
237            if is_json {
238                buffer.put_u8(b'[');
239            }
240
241            encode_test_event(&mut encoder, &mut buffer, event);
242
243            if is_json {
244                if !buffer.is_empty() {
245                    // remove trailing comma from last record
246                    buffer.truncate(buffer.len() - 1);
247                }
248                buffer.put_u8(b']');
249
250                // in this edge case we have removed the trailing comma (one byte) and added
251                // opening and closing braces (2 bytes) for a net add of one byte.
252                let mut runner_metrics = runner_metrics.lock().await;
253                runner_metrics.sent_bytes_total += 1;
254            }
255
256            let mut request_builder = Request::builder()
257                .uri(request_uri.clone())
258                .method(request_method.clone());
259
260            for (key, value) in &headers {
261                request_builder = request_builder.header(key, value);
262            }
263
264            let request = request_builder
265                .body(buffer.freeze().into())
266                .expect("should not fail to build request");
267
268            match client.request(request).await {
269                Ok(_response) => {
270                    // TODO: Emit metric that tracks a successful response from the HTTP server.
271                    debug!("Got response from server.");
272                }
273                Err(e) => {
274                    // TODO: Emit metric that tracks a failed response from the HTTP server.
275                    error!("Failed to send request: {}", e);
276                }
277            }
278        }
279
280        // Mark ourselves as completed now that we've sent all inputs to the source.
281        completed.mark_as_done();
282
283        info!("HTTP client external input resource completed.");
284    });
285}
286
287/// Anything that the output side HTTP external resource needs
288pub struct HttpResourceOutputContext<'a> {
289    pub direction: ResourceDirection,
290    pub codec: ResourceCodec,
291    pub output_tx: mpsc::Sender<Vec<Event>>,
292    pub task_coordinator: &'a TaskCoordinator<Configuring>,
293    pub input_events: Vec<TestEvent>,
294    pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
295    pub log_namespace: LogNamespace,
296}
297
298impl HttpResourceOutputContext<'_> {
299    /// Spawns an HTTP server that accepts events sent by a sink.
300    #[allow(clippy::missing_const_for_fn)]
301    fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
302        // This HTTP server will wait for events to be sent by a sink, and collect them and send them on
303        // via an output sender. We accept/collect events until we're told to shutdown.
304
305        // First, we'll build and spawn our HTTP server.
306        let decoder = self.codec.into_decoder(self.log_namespace)?;
307
308        // Note that we currently don't differentiate which events should and shouldn't be rejected-
309        // we reject all events in this server if any are marked for rejection.
310        // In the future it might be useful to be able to select which to reject. That will involve
311        // adding logic to the test case which is passed down here, and to the event itself. Since
312        // we can't guarantee the order of events, we'd need a way to flag which ones need to be
313        // rejected.
314        let should_reject = self
315            .input_events
316            .iter()
317            .filter(|te| te.should_reject())
318            .count()
319            > 0;
320
321        let output_tx = self.output_tx.clone();
322        let (_, http_server_shutdown_tx) = spawn_http_server(
323            self.task_coordinator,
324            &config,
325            self.runner_metrics,
326            move |request, output_runner_metrics| {
327                let output_tx = output_tx.clone();
328                let mut decoder = decoder.clone();
329
330                async move {
331                    match hyper::body::to_bytes(request.into_body()).await {
332                        Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
333                        Ok(body) => {
334                            let byte_size = body.len();
335                            let mut body = BytesMut::from(&body[..]);
336                            loop {
337                                match decoder.decode_eof(&mut body) {
338                                    // `decoded_byte_size` is the decoded size of an individual frame. `byte_size` represents the size of the
339                                    // entire payload which may contain multiple frames and their delimiters.
340                                    Ok(Some((events, decoded_byte_size))) => {
341                                        if should_reject {
342                                            info!(
343                                                "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.",
344                                            );
345                                        } else {
346                                            let mut output_runner_metrics =
347                                                output_runner_metrics.lock().await;
348                                            info!(
349                                                "HTTP server external output resource decoded {decoded_byte_size:?} bytes."
350                                            );
351
352                                            // Update the runner metrics for the received events. This will later
353                                            // be used in the Validators, as the "expected" case.
354                                            output_runner_metrics.received_bytes_total +=
355                                                byte_size as u64;
356
357                                            output_runner_metrics.received_events_total +=
358                                                events.len() as u64;
359
360                                            events.iter().for_each(|event| {
361                                                output_runner_metrics.received_event_bytes_total +=
362                                                    event.estimated_json_encoded_size_of().get()
363                                                        as u64;
364                                            });
365
366                                            output_tx
367                                                .send(events.to_vec())
368                                                .await
369                                                .expect("should not fail to send output event");
370                                        }
371                                    }
372                                    Ok(None) => {
373                                        if should_reject {
374                                            // This status code is not retried and should result in the component under test
375                                            // emitting error events
376                                            return StatusCode::BAD_REQUEST.into_response();
377                                        } else {
378                                            return StatusCode::OK.into_response();
379                                        }
380                                    }
381                                    Err(_) => {
382                                        error!(
383                                            "HTTP server failed to decode {:?}",
384                                            String::from_utf8_lossy(&body)
385                                        );
386                                        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
387                                    }
388                                }
389                            }
390                        }
391                    }
392                }
393            },
394        );
395
396        // Now we'll create and spawn the resource's core logic loop which simply waits for the runner
397        // to instruct us to shutdown, and when that happens, cascades to shutting down the HTTP server.
398        let resource_started = self.task_coordinator.track_started();
399        let resource_completed = self.task_coordinator.track_completed();
400        let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
401
402        tokio::spawn(async move {
403            resource_started.mark_as_done();
404            info!("HTTP server external output resource started.");
405
406            // Wait for the runner to tell us to shutdown
407            resource_shutdown_rx.wait().await;
408
409            // signal the server to shutdown
410            let _ = http_server_shutdown_tx.send(());
411
412            // mark ourselves as done
413            resource_completed.mark_as_done();
414
415            info!("HTTP server external output resource completed.");
416        });
417
418        Ok(())
419    }
420
421    /// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink.
422    #[allow(clippy::missing_const_for_fn)]
423    fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
424        // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be
425        // scraped... but since we need special logic to aggregate/deduplicate scraped metrics, we can't
426        // use this generically for that purpose.
427        todo!()
428    }
429}
430
431fn spawn_http_server<H, F, R>(
432    task_coordinator: &TaskCoordinator<Configuring>,
433    config: &HttpResourceConfig,
434    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
435    handler: H,
436) -> (Arc<Notify>, oneshot::Sender<()>)
437where
438    H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
439    F: Future<Output = R> + Send,
440    R: IntoResponse,
441{
442    let http_server_started = task_coordinator.track_started();
443    let http_server_completed = task_coordinator.track_completed();
444
445    let listen_addr = socketaddr_from_uri(&config.uri);
446    let request_path = config
447        .uri
448        .path_and_query()
449        .map(|pq| pq.as_str().to_string())
450        .unwrap_or_else(|| "/".to_string());
451    let request_method = config.method.clone().unwrap_or(Method::POST);
452
453    // Create our synchronization primitives that are shared between the HTTP server and the
454    // resource's core logic loop.
455    //
456    // This will let the resource be able to trigger the HTTP server to gracefully shutdown, as well
457    // as be notified when the HTTP server has served a request, so that it can check if all input
458    // events have been sent yet.
459    let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
460    let resource_notifier = Arc::new(Notify::new());
461    let server_notifier = Arc::clone(&resource_notifier);
462
463    let output_runner_metrics = Arc::clone(runner_metrics);
464
465    tokio::spawn(async move {
466        // Create our HTTP server by binding as early as possible to return an error if we can't
467        // actually bind.
468        let server_builder =
469            Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
470
471        // Create our router, which is a bit boilerplate-y because we take the HTTP method
472        // parametrically. We generate a handler that calls the given `handler` and then triggers
473        // the notifier shared by the HTTP server and the resource's core logic loop.
474        //
475        // Every time a request is processed, we notify the core logic loop so it can continue
476        // checking to see if it's time to fully close once all input events have been consumed and
477        // the input receiver is closed.
478        let method_filter = MethodFilter::try_from(request_method)
479            .expect("should not fail to convert method to method filter");
480        let method_router = MethodRouter::new()
481            .fallback(|req: Request<Body>| async move {
482                error!(
483                    path = req.uri().path(),
484                    method = req.method().as_str(),
485                    "Component sent request to a different path/method than expected."
486                );
487
488                StatusCode::METHOD_NOT_ALLOWED
489            })
490            .on(method_filter, move |request: Request<Body>| {
491                let request_handler = handler(request, output_runner_metrics);
492                let notifier = Arc::clone(&server_notifier);
493
494                async move {
495                    let response = request_handler.await;
496                    notifier.notify_one();
497                    response
498                }
499            });
500
501        let router = Router::new().route(&request_path, method_router).fallback(
502            |req: Request<Body>| async move {
503                error!(?req, "Component sent request the server could not route.");
504                StatusCode::NOT_FOUND
505            },
506        );
507
508        // Now actually run/drive the HTTP server and process requests until we're told to shutdown.
509        http_server_started.mark_as_done();
510
511        let server = server_builder
512            .serve(router.into_make_service())
513            .with_graceful_shutdown(async {
514                http_server_shutdown_rx.await.ok();
515            });
516
517        if let Err(e) = server.await {
518            error!(error = ?e, "HTTP server encountered an error.");
519        }
520
521        http_server_completed.mark_as_done();
522    });
523
524    (resource_notifier, http_server_shutdown_tx)
525}
526
527fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
528    let uri_port = uri.port_u16().unwrap_or(80);
529    let uri_host = uri
530        .host()
531        .ok_or_else(|| "host must be present in URI".to_string())
532        .and_then(|host| {
533            IpAddr::from_str(host)
534                .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
535        })
536        .expect("HTTP URI not valid");
537
538    SocketAddr::from((uri_host, uri_port))
539}