vector/components/validation/resources/http.rs

1use std::{
2    collections::{HashMap, VecDeque},
3    future::Future,
4    net::{IpAddr, SocketAddr},
5    str::FromStr,
6    sync::Arc,
7};
8
9use axum::{
10    response::IntoResponse,
11    routing::{MethodFilter, MethodRouter},
12    Router,
13};
14use bytes::{BufMut as _, BytesMut};
15use http::{Method, Request, StatusCode, Uri};
16use hyper::{Body, Client, Server};
17use tokio::{
18    select,
19    sync::{mpsc, oneshot, Mutex, Notify},
20};
21use tokio_util::codec::Decoder;
22
23use crate::components::validation::{
24    sync::{Configuring, TaskCoordinator},
25    RunnerMetrics,
26};
27use vector_lib::{
28    codecs::encoding::Framer, codecs::encoding::Serializer::Json,
29    codecs::CharacterDelimitedEncoder, config::LogNamespace, event::Event,
30    EstimatedJsonEncodedSizeOf,
31};
32
33use super::{encode_test_event, ResourceCodec, ResourceDirection, TestEvent};
34
35/// An HTTP resource.
36#[derive(Clone)]
37pub struct HttpResourceConfig {
38    uri: Uri,
39    method: Option<Method>,
40    headers: Option<HashMap<String, String>>,
41}
42
43impl HttpResourceConfig {
44    pub const fn from_parts(uri: Uri, method: Option<Method>) -> Self {
45        Self {
46            uri,
47            method,
48            headers: None,
49        }
50    }
51
52    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
53        self.headers = Some(headers);
54        self
55    }
56
57    pub fn spawn_as_input(
58        self,
59        direction: ResourceDirection,
60        codec: ResourceCodec,
61        input_rx: mpsc::Receiver<TestEvent>,
62        task_coordinator: &TaskCoordinator<Configuring>,
63        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
64    ) {
65        match direction {
66            // The source will pull data from us.
67            ResourceDirection::Pull => {
68                spawn_input_http_server(self, codec, input_rx, task_coordinator, runner_metrics)
69            }
70            // We'll push data to the source.
71            ResourceDirection::Push => {
72                spawn_input_http_client(self, codec, input_rx, task_coordinator, runner_metrics)
73            }
74        }
75    }
76
77    pub fn spawn_as_output(self, ctx: HttpResourceOutputContext) -> vector_lib::Result<()> {
78        match ctx.direction {
79            // We'll pull data from the sink.
80            ResourceDirection::Pull => Ok(ctx.spawn_output_http_client(self)),
81            // The sink will push data to us.
82            ResourceDirection::Push => ctx.spawn_output_http_server(self),
83        }
84    }
85}
86
87/// Spawns an HTTP server that a source will make requests to in order to get events.
88#[allow(clippy::missing_const_for_fn)]
89fn spawn_input_http_server(
90    config: HttpResourceConfig,
91    codec: ResourceCodec,
92    mut input_rx: mpsc::Receiver<TestEvent>,
93    task_coordinator: &TaskCoordinator<Configuring>,
94    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
95) {
96    // This HTTP server will poll the input receiver for input events and buffer them. When a
97    // request comes in on the right path/method, one buffered input event will be sent back. If no
98    // buffered events are available when the request arrives, an empty response (204 No Content) is
99    // returned to the caller.
100    let outstanding_events = Arc::new(Mutex::new(VecDeque::new()));
101
102    // First, we'll build and spawn our HTTP server.
103    let encoder = codec.into_encoder();
104    let sendable_events = Arc::clone(&outstanding_events);
105
106    let (resource_notifier, http_server_shutdown_tx) = spawn_http_server(
107        task_coordinator,
108        &config,
109        runner_metrics,
110        move |_request, _runner_metrics| {
111            let sendable_events = Arc::clone(&sendable_events);
112            let mut encoder = encoder.clone();
113
114            async move {
115                let mut sendable_events = sendable_events.lock().await;
116                match sendable_events.pop_front() {
117                    Some(event) => {
118                        let mut buffer = BytesMut::new();
119                        encode_test_event(&mut encoder, &mut buffer, event);
120
121                        buffer.into_response()
122                    }
123                    _ => {
124                        // We'll send an empty 200 in the response since some
125                        // sources throw errors for anything other than a valid
126                        // response.
127                        StatusCode::OK.into_response()
128                    }
129                }
130            }
131        },
132    );
133
134    // Now we'll create and spawn the resource's core logic loop which drives the buffering of input
135    // events and working with the HTTP server as they're consumed.
136    let resource_started = task_coordinator.track_started();
137    let resource_completed = task_coordinator.track_completed();
138    let mut resource_shutdown_rx = task_coordinator.register_for_shutdown();
139
140    tokio::spawn(async move {
141        resource_started.mark_as_done();
142        info!("HTTP server external input resource started.");
143
144        let mut input_finished = false;
145
146        loop {
147            select! {
148                // Handle input events being sent to us from the runner.
149                //
150                // When the channel closes, we'll mark the input as being finished so that we know
151                // to close the external resource itself once the HTTP server has consumed/sent all
152                // outstanding events.
153                maybe_event = input_rx.recv(), if !input_finished => match maybe_event {
154                    Some(event) => {
155                        let mut outstanding_events = outstanding_events.lock().await;
156                        outstanding_events.push_back(event);
157                    },
158                    None => {
159                        info!("HTTP server external input resource input is finished.");
160                        input_finished = true;
161                    },
162                },
163
164                _ = resource_notifier.notified() => {
165                    // The HTTP server notified us that it made progress with a send, which is
166                    // specifically that it serviced a request which returned a non-zero number of
167                    // events.
168                    //
169                    // This indicates that we need to check and see if our input is completed --
170                    // channel closed, no outstanding events left -- and thus if it's time to close.
171                    if input_finished {
172                        let outstanding_events = outstanding_events.lock().await;
173                        if outstanding_events.is_empty() {
174                            break
175                        }
176                    }
177                },
178            }
179        }
180        // Mark ourselves as completed now that we've sent all inputs to the source, and
181        // additionally signal the HTTP server to also gracefully shutdown.
182        info!("HTTP server external input resource signalling ready for shutdown.");
183
184        // Wait for the runner to signal us to shutdown
185        resource_shutdown_rx.wait().await;
186
187        // Shutdown the server
188        _ = http_server_shutdown_tx.send(());
189
190        info!("HTTP server external input resource marking as done.");
191        resource_completed.mark_as_done();
192
193        info!("HTTP server external input resource completed.");
194    });
195}
196
197/// Spawns an HTTP client that pushes events to a source which is accepting events over HTTP.
198fn spawn_input_http_client(
199    config: HttpResourceConfig,
200    codec: ResourceCodec,
201    mut input_rx: mpsc::Receiver<TestEvent>,
202    task_coordinator: &TaskCoordinator<Configuring>,
203    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
204) {
205    // Spin up an HTTP client that will push the input data to the source on a
206    // request-per-input-item basis. This runs serially and has no parallelism.
207    let started = task_coordinator.track_started();
208    let completed = task_coordinator.track_completed();
209    let mut encoder = codec.into_encoder();
210    let runner_metrics = Arc::clone(runner_metrics);
211
212    tokio::spawn(async move {
213        // Mark ourselves as started. We don't actually do anything until we get our first input
214        // message, though.
215        started.mark_as_done();
216        info!("HTTP client external input resource started.");
217
218        let client = Client::builder().build_http::<Body>();
219        let request_uri = config.uri;
220        let request_method = config.method.unwrap_or(Method::POST);
221        let headers = config.headers.unwrap_or_default();
222
223        while let Some(event) = input_rx.recv().await {
224            debug!("Got event to send from runner.");
225
226            let mut buffer = BytesMut::new();
227
228            let is_json = matches!(encoder.serializer(), Json(_))
229                && matches!(
230                    encoder.framer(),
231                    Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })
232                );
233
234            if is_json {
235                buffer.put_u8(b'[');
236            }
237
238            encode_test_event(&mut encoder, &mut buffer, event);
239
240            if is_json {
241                if !buffer.is_empty() {
242                    // remove trailing comma from last record
243                    buffer.truncate(buffer.len() - 1);
244                }
245                buffer.put_u8(b']');
246
247                // in this edge case we have removed the trailing comma (one byte) and added
248                // opening and closing braces (2 bytes) for a net add of one byte.
249                let mut runner_metrics = runner_metrics.lock().await;
250                runner_metrics.sent_bytes_total += 1;
251            }
252
253            let mut request_builder = Request::builder()
254                .uri(request_uri.clone())
255                .method(request_method.clone());
256
257            for (key, value) in &headers {
258                request_builder = request_builder.header(key, value);
259            }
260
261            let request = request_builder
262                .body(buffer.freeze().into())
263                .expect("should not fail to build request");
264
265            match client.request(request).await {
266                Ok(_response) => {
267                    // TODO: Emit metric that tracks a successful response from the HTTP server.
268                    debug!("Got response from server.");
269                }
270                Err(e) => {
271                    // TODO: Emit metric that tracks a failed response from the HTTP server.
272                    error!("Failed to send request: {}", e);
273                }
274            }
275        }
276
277        // Mark ourselves as completed now that we've sent all inputs to the source.
278        completed.mark_as_done();
279
280        info!("HTTP client external input resource completed.");
281    });
282}
283
284/// Anything that the output side HTTP external resource needs
285pub struct HttpResourceOutputContext<'a> {
286    pub direction: ResourceDirection,
287    pub codec: ResourceCodec,
288    pub output_tx: mpsc::Sender<Vec<Event>>,
289    pub task_coordinator: &'a TaskCoordinator<Configuring>,
290    pub input_events: Vec<TestEvent>,
291    pub runner_metrics: &'a Arc<Mutex<RunnerMetrics>>,
292    pub log_namespace: LogNamespace,
293}
294
295impl HttpResourceOutputContext<'_> {
296    /// Spawns an HTTP server that accepts events sent by a sink.
297    #[allow(clippy::missing_const_for_fn)]
298    fn spawn_output_http_server(&self, config: HttpResourceConfig) -> vector_lib::Result<()> {
299        // This HTTP server will wait for events to be sent by a sink, and collect them and send them on
300        // via an output sender. We accept/collect events until we're told to shutdown.
301
302        // First, we'll build and spawn our HTTP server.
303        let decoder = self.codec.into_decoder(self.log_namespace)?;
304
305        // Note that we currently don't differentiate which events should and shouldn't be rejected-
306        // we reject all events in this server if any are marked for rejection.
307        // In the future it might be useful to be able to select which to reject. That will involve
308        // adding logic to the test case which is passed down here, and to the event itself. Since
309        // we can't guarantee the order of events, we'd need a way to flag which ones need to be
310        // rejected.
311        let should_reject = self
312            .input_events
313            .iter()
314            .filter(|te| te.should_reject())
315            .count()
316            > 0;
317
318        let output_tx = self.output_tx.clone();
319        let (_, http_server_shutdown_tx) = spawn_http_server(
320            self.task_coordinator,
321            &config,
322            self.runner_metrics,
323            move |request, output_runner_metrics| {
324                let output_tx = output_tx.clone();
325                let mut decoder = decoder.clone();
326
327                async move {
328                    match hyper::body::to_bytes(request.into_body()).await {
329                        Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
330                        Ok(body) => {
331                            let byte_size = body.len();
332                            let mut body = BytesMut::from(&body[..]);
333                            loop {
334                                match decoder.decode_eof(&mut body) {
335                                    // `decoded_byte_size` is the decoded size of an individual frame. `byte_size` represents the size of the
336                                    // entire payload which may contain multiple frames and their delimiters.
337                                    Ok(Some((events, decoded_byte_size))) => {
338                                        if should_reject {
339                                            info!(internal_log_rate_limit = true, "HTTP server external output resource decoded {decoded_byte_size:?} bytes but test case configured to reject.", );
340                                        } else {
341                                            let mut output_runner_metrics =
342                                                output_runner_metrics.lock().await;
343                                            info!(internal_log_rate_limit = true, "HTTP server external output resource decoded {decoded_byte_size:?} bytes.");
344
345                                            // Update the runner metrics for the received events. This will later
346                                            // be used in the Validators, as the "expected" case.
347                                            output_runner_metrics.received_bytes_total +=
348                                                byte_size as u64;
349
350                                            output_runner_metrics.received_events_total +=
351                                                events.len() as u64;
352
353                                            events.iter().for_each(|event| {
354                                                output_runner_metrics.received_event_bytes_total +=
355                                                    event.estimated_json_encoded_size_of().get()
356                                                        as u64;
357                                            });
358
359                                            output_tx
360                                                .send(events.to_vec())
361                                                .await
362                                                .expect("should not fail to send output event");
363                                        }
364                                    }
365                                    Ok(None) => {
366                                        if should_reject {
367                                            // This status code is not retried and should result in the component under test
368                                            // emitting error events
369                                            return StatusCode::BAD_REQUEST.into_response();
370                                        } else {
371                                            return StatusCode::OK.into_response();
372                                        }
373                                    }
374                                    Err(_) => {
375                                        error!(
376                                            "HTTP server failed to decode {:?}",
377                                            String::from_utf8_lossy(&body)
378                                        );
379                                        return StatusCode::INTERNAL_SERVER_ERROR.into_response();
380                                    }
381                                }
382                            }
383                        }
384                    }
385                }
386            },
387        );
388
389        // Now we'll create and spawn the resource's core logic loop which simply waits for the runner
390        // to instruct us to shutdown, and when that happens, cascades to shutting down the HTTP server.
391        let resource_started = self.task_coordinator.track_started();
392        let resource_completed = self.task_coordinator.track_completed();
393        let mut resource_shutdown_rx = self.task_coordinator.register_for_shutdown();
394
395        tokio::spawn(async move {
396            resource_started.mark_as_done();
397            info!("HTTP server external output resource started.");
398
399            // Wait for the runner to tell us to shutdown
400            resource_shutdown_rx.wait().await;
401
402            // signal the server to shutdown
403            let _ = http_server_shutdown_tx.send(());
404
405            // mark ourselves as done
406            resource_completed.mark_as_done();
407
408            info!("HTTP server external output resource completed.");
409        });
410
411        Ok(())
412    }
413
414    /// Spawns an HTTP client that pulls events by making requests to an HTTP server driven by a sink.
415    #[allow(clippy::missing_const_for_fn)]
416    fn spawn_output_http_client(&self, _config: HttpResourceConfig) {
417        // TODO: The `prometheus_exporter` sink is the only sink that exposes an HTTP server which must be
418        // scraped... but since we need special logic to aggregate/deduplicate scraped metrics, we can't
419        // use this generically for that purpose.
420        todo!()
421    }
422}
423
424fn spawn_http_server<H, F, R>(
425    task_coordinator: &TaskCoordinator<Configuring>,
426    config: &HttpResourceConfig,
427    runner_metrics: &Arc<Mutex<RunnerMetrics>>,
428    handler: H,
429) -> (Arc<Notify>, oneshot::Sender<()>)
430where
431    H: Fn(Request<Body>, Arc<Mutex<RunnerMetrics>>) -> F + Clone + Send + 'static,
432    F: Future<Output = R> + Send,
433    R: IntoResponse,
434{
435    let http_server_started = task_coordinator.track_started();
436    let http_server_completed = task_coordinator.track_completed();
437
438    let listen_addr = socketaddr_from_uri(&config.uri);
439    let request_path = config
440        .uri
441        .path_and_query()
442        .map(|pq| pq.as_str().to_string())
443        .unwrap_or_else(|| "/".to_string());
444    let request_method = config.method.clone().unwrap_or(Method::POST);
445
446    // Create our synchronization primitives that are shared between the HTTP server and the
447    // resource's core logic loop.
448    //
449    // This will let the resource be able to trigger the HTTP server to gracefully shutdown, as well
450    // as be notified when the HTTP server has served a request, so that it can check if all input
451    // events have been sent yet.
452    let (http_server_shutdown_tx, http_server_shutdown_rx) = oneshot::channel();
453    let resource_notifier = Arc::new(Notify::new());
454    let server_notifier = Arc::clone(&resource_notifier);
455
456    let output_runner_metrics = Arc::clone(runner_metrics);
457
458    tokio::spawn(async move {
459        // Create our HTTP server by binding as early as possible to return an error if we can't
460        // actually bind.
461        let server_builder =
462            Server::try_bind(&listen_addr).expect("Failed to bind to listen address.");
463
464        // Create our router, which is a bit boilerplate-y because we take the HTTP method
465        // parametrically. We generate a handler that calls the given `handler` and then triggers
466        // the notifier shared by the HTTP server and the resource's core logic loop.
467        //
468        // Every time a request is processed, we notify the core logic loop so it can continue
469        // checking to see if it's time to fully close once all input events have been consumed and
470        // the input receiver is closed.
471        let method_filter = MethodFilter::try_from(request_method)
472            .expect("should not fail to convert method to method filter");
473        let method_router = MethodRouter::new()
474            .fallback(|req: Request<Body>| async move {
475                error!(
476                    path = req.uri().path(),
477                    method = req.method().as_str(),
478                    "Component sent request to a different path/method than expected."
479                );
480
481                StatusCode::METHOD_NOT_ALLOWED
482            })
483            .on(method_filter, move |request: Request<Body>| {
484                let request_handler = handler(request, output_runner_metrics);
485                let notifier = Arc::clone(&server_notifier);
486
487                async move {
488                    let response = request_handler.await;
489                    notifier.notify_one();
490                    response
491                }
492            });
493
494        let router = Router::new().route(&request_path, method_router).fallback(
495            |req: Request<Body>| async move {
496                error!(?req, "Component sent request the server could not route.");
497                StatusCode::NOT_FOUND
498            },
499        );
500
501        // Now actually run/drive the HTTP server and process requests until we're told to shutdown.
502        http_server_started.mark_as_done();
503
504        let server = server_builder
505            .serve(router.into_make_service())
506            .with_graceful_shutdown(async {
507                http_server_shutdown_rx.await.ok();
508            });
509
510        if let Err(e) = server.await {
511            error!(error = ?e, "HTTP server encountered an error.");
512        }
513
514        http_server_completed.mark_as_done();
515    });
516
517    (resource_notifier, http_server_shutdown_tx)
518}
519
520fn socketaddr_from_uri(uri: &Uri) -> SocketAddr {
521    let uri_port = uri.port_u16().unwrap_or(80);
522    let uri_host = uri
523        .host()
524        .ok_or_else(|| "host must be present in URI".to_string())
525        .and_then(|host| {
526            IpAddr::from_str(host)
527                .map_err(|_| "URI host must be valid IPv4/IPv6 address".to_string())
528        })
529        .expect("HTTP URI not valid");
530
531    SocketAddr::from((uri_host, uri_port))
532}