vector/components/validation/resources/
http.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    future::Future,
4    net::{IpAddr, SocketAddr},
5    str::FromStr,
6    sync::Arc,
7};
8
9use axum::{
10    Router,
11    response::IntoResponse,
12    routing::{MethodFilter, MethodRouter},
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use hyper::{Body, Client, Server};
17use tokio::{
18    select,
19    sync::{Mutex, Notify, mpsc, oneshot},
20};
21use tokio_util::codec::Decoder;
22use vector_lib::{
23    EstimatedJsonEncodedSizeOf,
24    codecs::{
25        CharacterDelimitedEncoder,
26        encoding::{Framer, Serializer::Json},
27    },
28    config::LogNamespace,
29    event::Event,
30};
31
32use super::{ResourceCodec, ResourceDirection, TestEvent, encode_test_event};
33use crate::components::validation::{
34    RunnerMetrics,
35    sync::{Configuring, TaskCoordinator},
36};
37
38/// An HTTP resource.
39#[derive(Clone)]
40pub struct HttpResourceConfig {
41    uri: Uri,
42    method: Option<Method>,
43    headers: Option<HashMap<String, String>>,
44}
45
46impl HttpResourceConfig {
47    pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
48        Self {
49            uri,
50            method,
51            headers: None,
52        }
53    }
54
55    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
56        self.headers = Some(headers);
57        self
58    }
59
60    pub fn spawn_as_input(
61        self,
62        direction: ResourceDirection,
63        codec: ResourceCodec,
64        input_rx: mpsc::Receiver<TestEvent>,
65        task_coordinator: &TaskCoordinator<Configuring>,
66        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
67    ) {
68        match direction {
69            // The source will pull data from us.
70            ResourceDirection::Pull => {
71                spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
72            }
73            // We'll push data to the source.
74            ResourceDirection::Push => {
75                spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
76            }
77        }
78    }
79
80    pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
81        match ctx.direction {
82            // We'll pull data from the sink.
83            ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
84            // The sink will push data to us.
85            ResourceDirection::Push => ctx.spawn_output_http_server(self),
86        }
87    }
88}
89
90/// Spawns an HTTP server that a source will make requests to in order to get events.
91#[allow(clippy::missing_const_for_fn)]
92fn spawn_input_http_server(
93    config: HttpResourceConfig,
94    codec: ResourceCodec,
95    mut input_rx: mpsc::Receiver<TestEvent>,
96    task_coordinator: &TaskCoordinator<Configuring>,
97    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
98) {
99    // This HTTP server will poll the input receiver for input events and buffer them. When a
100    // request comes in on the right path/method, one buffered input event will be sent back. If no
101    // buffered events are available when the request arrives, an empty response (204 No Content) is
102    // returned to the caller.
103    let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
104
105    // First, we'll build and spawn our HTTP server.
106    let encoder = codec.into_encoder();
107    let sendable_events = Arc::clone(&outstanding_events);
108
109    let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
110        task_coordinator,
111        &config,
112        runner_metrics,
113        move |_request, _runner_metrics| {
114            let sendable_events = Arc::clone(&sendable_events);
115            let mut encoder = encoder.clone();
116
117            async move {
118                let mut sendable_events = sendable_events.lock().await;
119                match sendable_events.pop_front() {
120                    Some(event) => {
121                        let mut buffer = BytesMut::new();
122                        encode_test_event(&mut encoder, &mut buffer, event);
123
124                        buffer.into_response()
125                    }
126                    _ => {
127                        // We'll send an empty 200 in the response since some
128                        // sources throw errors for anything other than a valid
129                        // response.
130                        StatusCode::OK.into_response()
131                    }
132                }
133            }
134        },
135    );
136
137    // Now we'll create and spawn the resource's core logic loop which drives the buffering of input
138    // events and working with the HTTP server as they're consumed.
139    let resource_started = task_coordinator.track_started();
140    let resource_completed = task_coordinator.track_completed();
141    let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
142
143    tokio::spawn(async move {
144        resource_started.mark_as_done();
145        info!("HTTP server external input resource started.");
146
147        let mut input_finished = false;
148
149        loop {
150            select! {
151                // Handle input events being sent to us from the runner.
152                //
153                // When the channel closes, we'll mark the input as being finished so that we know
154                // to close the external resource itself once the HTTP server has consumed/sent all
155                // outstanding events.
156                maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
157                    Some(event) => {
158                        let mut outstanding_events = outstanding_events.lock().await;
159                        outstanding_events.push_back(event);
160                    },
161                    None => {
162                        info!("HTTP server external input resource input is finished.");
163                        input_finished = true;
164                    },
165                },
166
167                _ = resource_notifier.notified() => {
168                    // The HTTP server notified us that it made progress with a send, which is
169                    // specifically that it serviced a request which returned a non-zero number of
170                    // events.
171                    //
172                    // This indicates that we need to check and see if our input is completed --
173                    // channel closed, no outstanding events left -- and thus if it's time to close.
174                    if input_finished {
175                        let outstanding_events = outstanding_events.lock().await;
176                        if outstanding_events.is_empty() {
177                            break
178                        }
179                    }
180                },
181            }
182        }
183        // Mark ourselves as completed now that we've sent all inputs to the source, and
184        // additionally signal the HTTP server to also gracefully shutdown.
185        info!("HTTP server external input resource signalling ready for shutdown.");
186
187        // Wait for the runner to signal us to shutdown
188        resource_shutdown_rx.wait().await;
189
190        // Shutdown the server
191        _ = http_server_shutdown_tx.send(());
192
193        info!("HTTP server external input resource marking as done.");
194        resource_completed.mark_as_done();
195
196        info!("HTTP server external input resource completed.");
197    });
198}
199
200/// Spawns an HTTP client that pushes events to a source which is accepting events over HTTP.
201fn spawn_input_http_client(
202    config: HttpResourceConfig,
203    codec: ResourceCodec,
204    mut input_rx: mpsc::Receiver<TestEvent>,
205    task_coordinator: &TaskCoordinator<Configuring>,
206    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
207) {
208    // Spin up an HTTP client that will push the input data to the source on a
209    // request-per-input-item basis. This runs serially and has no parallelism.
210    let started = task_coordinator.track_started();
211    let completed = task_coordinator.track_completed();
212    let mut encoder = codec.into_encoder();
213    let runner_metrics = Arc::clone(runner_metrics);
214
215    tokio::spawn(async move {
216        // Mark ourselves as started. We don't actually do anything until we get our first input
217        // message, though.
218        started.mark_as_done();
219        info!("HTTP client external input resource started.");
220
221        let client = Client::builder().build_http::<Body>();
222        let request_uri = config.uri;
223        let request_method = config.method.unwrap_or(Method::POST);
224        let headers = config.headers.unwrap_or_default();
225
226        while let Some(event) = input_rx.recv().await {
227            debug!("Got event to send from runner.");
228
229            let mut buffer = BytesMut::new();
230
231            let is_json = matches!(encoder.serializer(), Json(_))
232                && matches!(
233                    encoder.framer(),
234                    Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
235                );
236
237            if is_json {
238                buffer.put_u8(b'[');
239            }
240
241            encode_test_event(&mut encoder, &mut buffer, event);
242
243            if is_json {
244                if !buffer.is_empty() {
245                    // remove trailing comma from last record
246                    buffer.truncate(buffer.len() - 1);
247                }
248                buffer.put_u8(b']');
249
250                // in this edge case we have removed the trailing comma (one byte) and added
251                // opening and closing braces (2 bytes) for a net add of one byte.
252                let mut runner_metrics = runner_metrics.lock().await;
253                runner_metrics.sent_bytes_total += 1;
254            }
255
256            let mut request_builder = Request::builder()
257                .uri(request_uri.clone())
258                .method(request_method.clone());
259
260            for (key, value) in &headers {
261                request_builder = request_builder.header(key, value);
262            }
263
264            let request = request_builder
265                .body(buffer.freeze().into())
266                .expect("should not fail to build request");
267
268            match client.request(request).await {
269                Ok(_response) => {
270                    // TODO: Emit metric that tracks a successful response from the HTTP server.
271                    debug!("Got response from server.");
272                }
273                Err(e) => {
274                    // TODO: Emit metric that tracks a failed response from the HTTP server.
275                    error!("Failed to send request: {}", e);
276                }
277            }
278        }
279
280        // Mark ourselves as completed now that we've sent all inputs to the source.
281        completed.mark_as_done();
282
283        info!("HTTP client external input resource completed.");
284    });
285}
286
287/// Anything that the output side HTTP external resource needs
288pub struct HttpResourceOutputContext<'a> {
289    pub direction: ResourceDirection,
290    pub codec: ResourceCodec,
291    pub output_tx: mpsc::Sender<Vec<Event>>,
292    pub task_coordinator: &'a TaskCoordinator<Configuring>,
293    pub input_events: Vec<TestEvent>,
294    pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
295    pub log_namespace: LogNamespace,
296}
297
298impl HttpResourceOutputContext<'_> {
299    /// Spawns an HTTP server that accepts events sent by a sink.
300    #[allow(clippy::missing_const_for_fn)]
301    fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
302        // This HTTP server will wait for events to be sent by a sink, and collect them and send them on
303        // via an output sender. We accept/collect events until we're told to shutdown.
304
305        // First, we'll build and spawn our HTTP server.
306        let decoder = self.codec.into_decoder(self.log_namespace)?;
307
308        // Note that we currently don't differentiate which events should and shouldn't be rejected-
309        // we reject all events in this server if any are marked for rejection.
310        // In the future it might be useful to be able to select which to reject. That will involve
311        // adding logic to the test case which is passed down here, and to the event itself. Since
312        // we can't guarantee the order of events, we'd need a way to flag which ones need to be
313        // rejected.
314        let should_reject = self
315            .input_events
316            .iter()
317            .filter(|te| te.should_reject())
318            .count()
319            > 0;
320
321        let output_tx = self.output_tx.clone();
322        let (_, http_server_shutdown_tx) = spawn_http_server(
323            self.task_coordinator,
324            &config,
325            self.runner_metrics,
326            move |request, output_runner_metrics| {
327                let output_tx = output_tx.clone();
328                let mut decoder = decoder.clone();
329
330                async move {
331                    match hyper::body::to_bytes(request.into_body()).await {
332                        Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
333                        Ok(body) => {
334                            let byte_size = body.len();
335                            let mut body = BytesMut::from(&body[..]);
336                            loop {
337                                match decoder.decode_eof(&mut body) {
338                                    // `decoded_byte_size` is the decoded size of an individual frame. `byte_size` represents the size of the
339                                    // entire payload which may contain multiple frames and their delimiters.
340                                    Ok(Some((events, decoded_byte_size))) => {
341                                        if should_reject {
342                                            info!(
343                                                internal_log_rate_limit = true,
344                                                "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.",
345                                            );
346                                        } else {
347                                            let mut output_runner_metrics =
348                                                output_runner_metrics.lock().await;
349                                            info!(
350                                                internal_log_rate_limit = true,
351                                                "HTTP server external output resource decoded {decoded_byte_size:?} bytes."
352                                            );
353
354                                            // Update the runner metrics for the received events. This will later
355                                            // be used in the Validators, as the "expected" case.
356                                            output_runner_metrics.received_bytes_total +=
357                                                byte_size as u64;
358
359                                            output_runner_metrics.received_events_total +=
360                                                events.len() as u64;
361
362                                            events.iter().for_each(|event| {
363                                                output_runner_metrics.received_event_bytes_total +=
364                                                    event.estimated_json_encoded_size_of().get()
365                                                        as u64;
366                                            });
367
368                                            output_tx
369                                                .send(events.to_vec())
370                                                .await
371                                                .expect("should not fail to send output event");
372                                        }
373                                    }
374                                    Ok(None) => {
375                                        if should_reject {
376                                            // This status code is not retried and should result in the component under test
377                                            // emitting error events
378                                            return StatusCode::BAD_REQUEST.into_response();
379                                        } else {
380                                            return StatusCode::OK.into_response();
381                                        }
382                                    }
383                                    Err(_) => {
384                                        error!(
385                                            "HTTP server failed to decode {:?}",
386                                            String::from_utf8_lossy(&body)
387                                        );
388                                        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
389                                    }
390                                }
391                            }
392                        }
393                    }
394                }
395            },
396        );
397
398        // Now we'll create and spawn the resource's core logic loop which simply waits for the runner
399        // to instruct us to shutdown, and when that happens, cascades to shutting down the HTTP server.
400        let resource_started = self.task_coordinator.track_started();
401        let resource_completed = self.task_coordinator.track_completed();
402        let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
403
404        tokio::spawn(async move {
405            resource_started.mark_as_done();
406            info!("HTTP server external output resource started.");
407
408            // Wait for the runner to tell us to shutdown
409            resource_shutdown_rx.wait().await;
410
411            // signal the server to shutdown
412            let _ = http_server_shutdown_tx.send(());
413
414            // mark ourselves as done
415            resource_completed.mark_as_done();
416
417            info!("HTTP server external output resource completed.");
418        });
419
420        Ok(())
421    }
422
423    /// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink.
424    #[allow(clippy::missing_const_for_fn)]
425    fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
426        // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be
427        // scraped... but since we need special logic to aggregate/deduplicate scraped metrics, we can't
428        // use this generically for that purpose.
429        todo!()
430    }
431}
432
433fn spawn_http_server<H, F, R>(
434    task_coordinator: &TaskCoordinator<Configuring>,
435    config: &HttpResourceConfig,
436    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
437    handler: H,
438) -> (Arc<Notify>, oneshot::Sender<()>)
439where
440    H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
441    F: Future<Output = R> + Send,
442    R: IntoResponse,
443{
444    let http_server_started = task_coordinator.track_started();
445    let http_server_completed = task_coordinator.track_completed();
446
447    let listen_addr = socketaddr_from_uri(&config.uri);
448    let request_path = config
449        .uri
450        .path_and_query()
451        .map(|pq| pq.as_str().to_string())
452        .unwrap_or_else(|| "/".to_string());
453    let request_method = config.method.clone().unwrap_or(Method::POST);
454
455    // Create our synchronization primitives that are shared between the HTTP server and the
456    // resource's core logic loop.
457    //
458    // This will let the resource be able to trigger the HTTP server to gracefully shutdown, as well
459    // as be notified when the HTTP server has served a request, so that it can check if all input
460    // events have been sent yet.
461    let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
462    let resource_notifier = Arc::new(Notify::new());
463    let server_notifier = Arc::clone(&resource_notifier);
464
465    let output_runner_metrics = Arc::clone(runner_metrics);
466
467    tokio::spawn(async move {
468        // Create our HTTP server by binding as early as possible to return an error if we can't
469        // actually bind.
470        let server_builder =
471            Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
472
473        // Create our router, which is a bit boilerplate-y because we take the HTTP method
474        // parametrically. We generate a handler that calls the given `handler` and then triggers
475        // the notifier shared by the HTTP server and the resource's core logic loop.
476        //
477        // Every time a request is processed, we notify the core logic loop so it can continue
478        // checking to see if it's time to fully close once all input events have been consumed and
479        // the input receiver is closed.
480        let method_filter = MethodFilter::try_from(request_method)
481            .expect("should not fail to convert method to method filter");
482        let method_router = MethodRouter::new()
483            .fallback(|req: Request<Body>| async move {
484                error!(
485                    path = req.uri().path(),
486                    method = req.method().as_str(),
487                    "Component sent request to a different path/method than expected."
488                );
489
490                StatusCode::METHOD_NOT_ALLOWED
491            })
492            .on(method_filter, move |request: Request<Body>| {
493                let request_handler = handler(request, output_runner_metrics);
494                let notifier = Arc::clone(&server_notifier);
495
496                async move {
497                    let response = request_handler.await;
498                    notifier.notify_one();
499                    response
500                }
501            });
502
503        let router = Router::new().route(&request_path, method_router).fallback(
504            |req: Request<Body>| async move {
505                error!(?req, "Component sent request the server could not route.");
506                StatusCode::NOT_FOUND
507            },
508        );
509
510        // Now actually run/drive the HTTP server and process requests until we're told to shutdown.
511        http_server_started.mark_as_done();
512
513        let server = server_builder
514            .serve(router.into_make_service())
515            .with_graceful_shutdown(async {
516                http_server_shutdown_rx.await.ok();
517            });
518
519        if let Err(e) = server.await {
520            error!(error = ?e, "HTTP server encountered an error.");
521        }
522
523        http_server_completed.mark_as_done();
524    });
525
526    (resource_notifier, http_server_shutdown_tx)
527}
528
529fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
530    let uri_port = uri.port_u16().unwrap_or(80);
531    let uri_host = uri
532        .host()
533        .ok_or_else(|| "host must be present in URI".to_string())
534        .and_then(|host| {
535            IpAddr::from_str(host)
536                .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
537        })
538        .expect("HTTP URI not valid");
539
540    SocketAddr::from((uri_host, uri_port))
541}