vector/components/validation/resources/
http.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    future::Future,
4    net::{IpAddr, SocketAddr},
5    str::FromStr,
6    sync::Arc,
7};
8
9use axum::{
10    Router,
11    response::IntoResponse,
12    routing::{MethodFilter, MethodRouter},
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use http_body::{Body as _, Collected};
17use hyper::{Body, Client, Server};
18use tokio::{
19    select,
20    sync::{Mutex, Notify, mpsc, oneshot},
21};
22use tokio_util::codec::Decoder;
23use vector_lib::{
24    EstimatedJsonEncodedSizeOf,
25    codecs::{
26        CharacterDelimitedEncoder,
27        encoding::{Framer, Serializer::Json},
28    },
29    config::LogNamespace,
30    event::Event,
31};
32
33use super::{ResourceCodec, ResourceDirection, TestEvent, encode_test_event};
34use crate::components::validation::{
35    RunnerMetrics,
36    sync::{Configuring, TaskCoordinator},
37};
38
39/// An HTTP resource.
40#[derive(Clone)]
41pub struct HttpResourceConfig {
42    uri: Uri,
43    method: Option<Method>,
44    headers: Option<HashMap<String, String>>,
45}
46
47impl HttpResourceConfig {
48    pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
49        Self {
50            uri,
51            method,
52            headers: None,
53        }
54    }
55
56    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
57        self.headers = Some(headers);
58        self
59    }
60
61    pub fn spawn_as_input(
62        self,
63        direction: ResourceDirection,
64        codec: ResourceCodec,
65        input_rx: mpsc::Receiver<TestEvent>,
66        task_coordinator: &TaskCoordinator<Configuring>,
67        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
68    ) {
69        match direction {
70            // The source will pull data from us.
71            ResourceDirection::Pull => {
72                spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
73            }
74            // We'll push data to the source.
75            ResourceDirection::Push => {
76                spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
77            }
78        }
79    }
80
81    pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
82        match ctx.direction {
83            // We'll pull data from the sink.
84            ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
85            // The sink will push data to us.
86            ResourceDirection::Push => ctx.spawn_output_http_server(self),
87        }
88    }
89}
90
91/// Spawns an HTTP server that a source will make requests to in order to get events.
92#[allow(clippy::missing_const_for_fn)]
93fn spawn_input_http_server(
94    config: HttpResourceConfig,
95    codec: ResourceCodec,
96    mut input_rx: mpsc::Receiver<TestEvent>,
97    task_coordinator: &TaskCoordinator<Configuring>,
98    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
99) {
100    // This HTTP server will poll the input receiver for input events and buffer them. When a
101    // request comes in on the right path/method, one buffered input event will be sent back. If no
102    // buffered events are available when the request arrives, an empty response (204 No Content) is
103    // returned to the caller.
104    let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
105
106    // First, we'll build and spawn our HTTP server.
107    let encoder = codec.into_encoder();
108    let sendable_events = Arc::clone(&outstanding_events);
109
110    let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
111        task_coordinator,
112        &config,
113        runner_metrics,
114        move |_request, _runner_metrics| {
115            let sendable_events = Arc::clone(&sendable_events);
116            let mut encoder = encoder.clone();
117
118            async move {
119                let mut sendable_events = sendable_events.lock().await;
120                match sendable_events.pop_front() {
121                    Some(event) => {
122                        let mut buffer = BytesMut::new();
123                        encode_test_event(&mut encoder, &mut buffer, event);
124
125                        buffer.into_response()
126                    }
127                    _ => {
128                        // We'll send an empty 200 in the response since some
129                        // sources throw errors for anything other than a valid
130                        // response.
131                        StatusCode::OK.into_response()
132                    }
133                }
134            }
135        },
136    );
137
138    // Now we'll create and spawn the resource's core logic loop which drives the buffering of input
139    // events and working with the HTTP server as they're consumed.
140    let resource_started = task_coordinator.track_started();
141    let resource_completed = task_coordinator.track_completed();
142    let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
143
144    tokio::spawn(async move {
145        resource_started.mark_as_done();
146        info!("HTTP server external input resource started.");
147
148        let mut input_finished = false;
149
150        loop {
151            select! {
152                // Handle input events being sent to us from the runner.
153                //
154                // When the channel closes, we'll mark the input as being finished so that we know
155                // to close the external resource itself once the HTTP server has consumed/sent all
156                // outstanding events.
157                maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
158                    Some(event) => {
159                        let mut outstanding_events = outstanding_events.lock().await;
160                        outstanding_events.push_back(event);
161                    },
162                    None => {
163                        info!("HTTP server external input resource input is finished.");
164                        input_finished = true;
165                    },
166                },
167
168                _ = resource_notifier.notified() => {
169                    // The HTTP server notified us that it made progress with a send, which is
170                    // specifically that it serviced a request which returned a non-zero number of
171                    // events.
172                    //
173                    // This indicates that we need to check and see if our input is completed --
174                    // channel closed, no outstanding events left -- and thus if it's time to close.
175                    if input_finished {
176                        let outstanding_events = outstanding_events.lock().await;
177                        if outstanding_events.is_empty() {
178                            break
179                        }
180                    }
181                },
182            }
183        }
184        // Mark ourselves as completed now that we've sent all inputs to the source, and
185        // additionally signal the HTTP server to also gracefully shutdown.
186        info!("HTTP server external input resource signalling ready for shutdown.");
187
188        // Wait for the runner to signal us to shutdown
189        resource_shutdown_rx.wait().await;
190
191        // Shutdown the server
192        _ = http_server_shutdown_tx.send(());
193
194        info!("HTTP server external input resource marking as done.");
195        resource_completed.mark_as_done();
196
197        info!("HTTP server external input resource completed.");
198    });
199}
200
201/// Spawns an HTTP client that pushes events to a source which is accepting events over HTTP.
202fn spawn_input_http_client(
203    config: HttpResourceConfig,
204    codec: ResourceCodec,
205    mut input_rx: mpsc::Receiver<TestEvent>,
206    task_coordinator: &TaskCoordinator<Configuring>,
207    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
208) {
209    // Spin up an HTTP client that will push the input data to the source on a
210    // request-per-input-item basis. This runs serially and has no parallelism.
211    let started = task_coordinator.track_started();
212    let completed = task_coordinator.track_completed();
213    let mut encoder = codec.into_encoder();
214    let runner_metrics = Arc::clone(runner_metrics);
215
216    tokio::spawn(async move {
217        // Mark ourselves as started. We don't actually do anything until we get our first input
218        // message, though.
219        started.mark_as_done();
220        info!("HTTP client external input resource started.");
221
222        let client = Client::builder().build_http::<Body>();
223        let request_uri = config.uri;
224        let request_method = config.method.unwrap_or(Method::POST);
225        let headers = config.headers.unwrap_or_default();
226
227        while let Some(event) = input_rx.recv().await {
228            debug!("Got event to send from runner.");
229
230            let mut buffer = BytesMut::new();
231
232            let is_json = matches!(encoder.serializer(), Json(_))
233                && matches!(
234                    encoder.framer(),
235                    Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
236                );
237
238            if is_json {
239                buffer.put_u8(b'[');
240            }
241
242            encode_test_event(&mut encoder, &mut buffer, event);
243
244            if is_json {
245                if !buffer.is_empty() {
246                    // remove trailing comma from last record
247                    buffer.truncate(buffer.len() - 1);
248                }
249                buffer.put_u8(b']');
250
251                // in this edge case we have removed the trailing comma (one byte) and added
252                // opening and closing braces (2 bytes) for a net add of one byte.
253                let mut runner_metrics = runner_metrics.lock().await;
254                runner_metrics.sent_bytes_total += 1;
255            }
256
257            let mut request_builder = Request::builder()
258                .uri(request_uri.clone())
259                .method(request_method.clone());
260
261            for (key, value) in &headers {
262                request_builder = request_builder.header(key, value);
263            }
264
265            let request = request_builder
266                .body(buffer.freeze().into())
267                .expect("should not fail to build request");
268
269            match client.request(request).await {
270                Ok(_response) => {
271                    // TODO: Emit metric that tracks a successful response from the HTTP server.
272                    debug!("Got response from server.");
273                }
274                Err(e) => {
275                    // TODO: Emit metric that tracks a failed response from the HTTP server.
276                    error!("Failed to send request: {}", e);
277                }
278            }
279        }
280
281        // Mark ourselves as completed now that we've sent all inputs to the source.
282        completed.mark_as_done();
283
284        info!("HTTP client external input resource completed.");
285    });
286}
287
288/// Anything that the output side HTTP external resource needs
289pub struct HttpResourceOutputContext<'a> {
290    pub direction: ResourceDirection,
291    pub codec: ResourceCodec,
292    pub output_tx: mpsc::Sender<Vec<Event>>,
293    pub task_coordinator: &'a TaskCoordinator<Configuring>,
294    pub input_events: Vec<TestEvent>,
295    pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
296    pub log_namespace: LogNamespace,
297}
298
299impl HttpResourceOutputContext<'_> {
300    /// Spawns an HTTP server that accepts events sent by a sink.
301    #[allow(clippy::missing_const_for_fn)]
302    fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
303        // This HTTP server will wait for events to be sent by a sink, and collect them and send them on
304        // via an output sender. We accept/collect events until we're told to shutdown.
305
306        // First, we'll build and spawn our HTTP server.
307        let decoder = self.codec.into_decoder(self.log_namespace)?;
308
309        // Note that we currently don't differentiate which events should and shouldn't be rejected-
310        // we reject all events in this server if any are marked for rejection.
311        // In the future it might be useful to be able to select which to reject. That will involve
312        // adding logic to the test case which is passed down here, and to the event itself. Since
313        // we can't guarantee the order of events, we'd need a way to flag which ones need to be
314        // rejected.
315        let should_reject = self
316            .input_events
317            .iter()
318            .filter(|te| te.should_reject())
319            .count()
320            > 0;
321
322        let output_tx = self.output_tx.clone();
323        let (_, http_server_shutdown_tx) = spawn_http_server(
324            self.task_coordinator,
325            &config,
326            self.runner_metrics,
327            move |request, output_runner_metrics| {
328                let output_tx = output_tx.clone();
329                let mut decoder = decoder.clone();
330
331                async move {
332                    // Extract the Content-Encoding header before consuming the request
333                    let content_encoding = request
334                        .headers()
335                        .get("content-encoding")
336                        .and_then(|v| v.to_str().ok())
337                        .map(|s| s.to_string());
338
339                    match request.into_body().collect().await.map(Collected::to_bytes) {
340                        Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
341                        Ok(body) => {
342                            let byte_size = body.len();
343
344                            // Validation tests should not use compression - error if we receive compressed data
345                            if let Some(encoding) = &content_encoding
346                                && encoding != "identity"
347                            {
348                                error!(
349                                    "Received compressed data (Content-Encoding: {encoding}). \
350                                        Validation tests assert on bytes sizes and compressed size might not be deterministic."
351                                );
352                                return StatusCode::BAD_REQUEST.into_response();
353                            }
354
355                            let mut body = BytesMut::from(&body[..]);
356                            loop {
357                                match decoder.decode_eof(&mut body) {
358                                    // `decoded_byte_size` is the decoded size of an individual frame. `byte_size` represents the size of the
359                                    // entire payload which may contain multiple frames and their delimiters.
360                                    Ok(Some((events, decoded_byte_size))) => {
361                                        if should_reject {
362                                            info!(
363                                                "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.",
364                                            );
365                                        } else {
366                                            let mut output_runner_metrics =
367                                                output_runner_metrics.lock().await;
368                                            info!(
369                                                "HTTP server external output resource decoded {decoded_byte_size:?} bytes."
370                                            );
371
372                                            // Update the runner metrics for the received events. This will later
373                                            // be used in the Validators, as the "expected" case.
374                                            output_runner_metrics.received_bytes_total +=
375                                                byte_size as u64;
376
377                                            output_runner_metrics.received_events_total +=
378                                                events.len() as u64;
379
380                                            events.iter().for_each(|event| {
381                                                output_runner_metrics.received_event_bytes_total +=
382                                                    event.estimated_json_encoded_size_of().get()
383                                                        as u64;
384                                            });
385
386                                            output_tx
387                                                .send(events.to_vec())
388                                                .await
389                                                .expect("should not fail to send output event");
390                                        }
391                                    }
392                                    Ok(None) => {
393                                        if should_reject {
394                                            // This status code is not retried and should result in the component under test
395                                            // emitting error events
396                                            return StatusCode::BAD_REQUEST.into_response();
397                                        } else {
398                                            return StatusCode::OK.into_response();
399                                        }
400                                    }
401                                    Err(_) => {
402                                        error!(
403                                            "HTTP server failed to decode body: {:?}",
404                                            String::from_utf8_lossy(&body)
405                                        );
406                                        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
407                                    }
408                                }
409                            }
410                        }
411                    }
412                }
413            },
414        );
415
416        // Now we'll create and spawn the resource's core logic loop which simply waits for the runner
417        // to instruct us to shutdown, and when that happens, cascades to shutting down the HTTP server.
418        let resource_started = self.task_coordinator.track_started();
419        let resource_completed = self.task_coordinator.track_completed();
420        let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
421
422        tokio::spawn(async move {
423            resource_started.mark_as_done();
424            info!("HTTP server external output resource started.");
425
426            // Wait for the runner to tell us to shutdown
427            resource_shutdown_rx.wait().await;
428
429            // signal the server to shutdown
430            let _ = http_server_shutdown_tx.send(());
431
432            // mark ourselves as done
433            resource_completed.mark_as_done();
434
435            info!("HTTP server external output resource completed.");
436        });
437
438        Ok(())
439    }
440
441    /// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink.
442    #[allow(clippy::missing_const_for_fn)]
443    fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
444        // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be
445        // scraped... but since we need special logic to aggregate/deduplicate scraped metrics, we can't
446        // use this generically for that purpose.
447        todo!()
448    }
449}
450
451fn spawn_http_server<H, F, R>(
452    task_coordinator: &TaskCoordinator<Configuring>,
453    config: &HttpResourceConfig,
454    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
455    handler: H,
456) -> (Arc<Notify>, oneshot::Sender<()>)
457where
458    H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
459    F: Future<Output = R> + Send,
460    R: IntoResponse,
461{
462    let http_server_started = task_coordinator.track_started();
463    let http_server_completed = task_coordinator.track_completed();
464
465    let listen_addr = socketaddr_from_uri(&config.uri);
466    let request_path = config
467        .uri
468        .path_and_query()
469        .map(|pq| pq.as_str().to_string())
470        .unwrap_or_else(|| "/".to_string());
471    let request_method = config.method.clone().unwrap_or(Method::POST);
472
473    // Create our synchronization primitives that are shared between the HTTP server and the
474    // resource's core logic loop.
475    //
476    // This will let the resource be able to trigger the HTTP server to gracefully shutdown, as well
477    // as be notified when the HTTP server has served a request, so that it can check if all input
478    // events have been sent yet.
479    let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
480    let resource_notifier = Arc::new(Notify::new());
481    let server_notifier = Arc::clone(&resource_notifier);
482
483    let output_runner_metrics = Arc::clone(runner_metrics);
484
485    tokio::spawn(async move {
486        // Create our HTTP server by binding as early as possible to return an error if we can't
487        // actually bind.
488        let server_builder =
489            Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
490
491        // Create our router, which is a bit boilerplate-y because we take the HTTP method
492        // parametrically. We generate a handler that calls the given `handler` and then triggers
493        // the notifier shared by the HTTP server and the resource's core logic loop.
494        //
495        // Every time a request is processed, we notify the core logic loop so it can continue
496        // checking to see if it's time to fully close once all input events have been consumed and
497        // the input receiver is closed.
498        let method_filter = MethodFilter::try_from(request_method)
499            .expect("should not fail to convert method to method filter");
500        let method_router = MethodRouter::new()
501            .fallback(|req: Request<Body>| async move {
502                error!(
503                    path = req.uri().path(),
504                    method = req.method().as_str(),
505                    "Component sent request to a different path/method than expected."
506                );
507
508                StatusCode::METHOD_NOT_ALLOWED
509            })
510            .on(method_filter, move |request: Request<Body>| {
511                let request_handler = handler(request, output_runner_metrics);
512                let notifier = Arc::clone(&server_notifier);
513
514                async move {
515                    let response = request_handler.await;
516                    notifier.notify_one();
517                    response
518                }
519            });
520
521        let router = Router::new().route(&request_path, method_router).fallback(
522            |req: Request<Body>| async move {
523                error!(?req, "Component sent request the server could not route.");
524                StatusCode::NOT_FOUND
525            },
526        );
527
528        // Now actually run/drive the HTTP server and process requests until we're told to shutdown.
529        http_server_started.mark_as_done();
530
531        let server = server_builder
532            .serve(router.into_make_service())
533            .with_graceful_shutdown(async {
534                http_server_shutdown_rx.await.ok();
535            });
536
537        if let Err(e) = server.await {
538            error!(error = ?e, "HTTP server encountered an error.");
539        }
540
541        http_server_completed.mark_as_done();
542    });
543
544    (resource_notifier, http_server_shutdown_tx)
545}
546
547fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
548    let uri_port = uri.port_u16().unwrap_or(80);
549    let uri_host = uri
550        .host()
551        .ok_or_else(|| "host must be present in URI".to_string())
552        .and_then(|host| {
553            IpAddr::from_str(host)
554                .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
555        })
556        .expect("HTTP URI not valid");
557
558    SocketAddr::from((uri_host, uri_port))
559}