vector/sources/util/http/
prelude.rs

1use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, TryFutureExt};
5use hyper::{Server, service::make_service_fn};
6use tokio::net::TcpStream;
7use tower::ServiceBuilder;
8use tracing::Span;
9use vector_lib::{
10    EstimatedJsonEncodedSizeOf,
11    config::SourceAcknowledgementsConfig,
12    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
13};
14use warp::{
15    Filter,
16    filters::{
17        BoxedFilter,
18        path::{FullPath, Tail},
19    },
20    http::{HeaderMap, StatusCode},
21    reject::Rejection,
22};
23
24use super::encoding::decode;
25use crate::{
26    SourceSender,
27    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
28    config::SourceContext,
29    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
30    internal_events::{
31        HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError,
32    },
33    sources::util::http::HttpMethod,
34    tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig},
35};
36
37pub trait HttpSource: Clone + Send + Sync + 'static {
38    // This function can be defined to enrich events with additional HTTP
39    // metadata. This function should be used rather than internal enrichment so
40    // that accurate byte count metrics can be emitted.
41    fn enrich_events(
42        &self,
43        _events: &mut [Event],
44        _request_path: &str,
45        _headers_config: &HeaderMap,
46        _query_parameters: &HashMap<String, String>,
47        _source_ip: Option<&SocketAddr>,
48    ) {
49    }
50
51    fn build_events(
52        &self,
53        body: Bytes,
54        header_map: &HeaderMap,
55        query_parameters: &HashMap<String, String>,
56        path: &str,
57    ) -> Result<Vec<Event>, ErrorMessage>;
58
59    fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
60        decode(encoding_header, body)
61    }
62
63    #[allow(clippy::too_many_arguments)]
64    fn run(
65        self,
66        address: SocketAddr,
67        path: &str,
68        method: HttpMethod,
69        response_code: StatusCode,
70        strict_path: bool,
71        tls: Option<&TlsEnableableConfig>,
72        auth: Option<&HttpServerAuthConfig>,
73        cx: SourceContext,
74        acknowledgements: SourceAcknowledgementsConfig,
75        keepalive_settings: KeepaliveConfig,
76    ) -> crate::Result<crate::sources::Source> {
77        let tls = MaybeTlsSettings::from_config(tls, true)?;
78        let protocol = tls.http_protocol_name();
79        let auth_matcher = auth.map(|a| a.build(&cx.enrichment_tables)).transpose()?;
80        let path = path.to_owned();
81        let acknowledgements = cx.do_acknowledgements(acknowledgements);
82        let enable_source_ip = self.enable_source_ip();
83
84        Ok(Box::pin(async move {
85            let mut filter: BoxedFilter<()> = match method {
86                HttpMethod::Head => warp::head().boxed(),
87                HttpMethod::Get => warp::get().boxed(),
88                HttpMethod::Put => warp::put().boxed(),
89                HttpMethod::Post => warp::post().boxed(),
90                HttpMethod::Patch => warp::patch().boxed(),
91                HttpMethod::Delete => warp::delete().boxed(),
92                HttpMethod::Options => warp::options().boxed(),
93            };
94
95            // https://github.com/rust-lang/rust-clippy/issues/8148
96            #[allow(clippy::unnecessary_to_owned)]
97            for s in path.split('/').filter(|&x| !x.is_empty()) {
98                filter = filter.and(warp::path(s.to_string())).boxed()
99            }
100            let svc = filter
101                .and(warp::path::tail())
102                .and_then(move |tail: Tail| async move {
103                    if !strict_path || tail.as_str().is_empty() {
104                        Ok(())
105                    } else {
106                        emit!(HttpInternalError {
107                            message: "Path not found."
108                        });
109                        Err(warp::reject::custom(ErrorMessage::new(
110                            StatusCode::NOT_FOUND,
111                            "Not found".to_string(),
112                        )))
113                    }
114                })
115                .untuple_one()
116                .and(warp::path::full())
117                .and(warp::header::optional::<String>("content-encoding"))
118                .and(warp::header::headers_cloned())
119                .and(warp::body::bytes())
120                .and(warp::query::<HashMap<String, String>>())
121                .and(warp::filters::ext::optional())
122                .and_then(
123                    move |path: FullPath,
124                          encoding_header: Option<String>,
125                          headers: HeaderMap,
126                          body: Bytes,
127                          query_parameters: HashMap<String, String>,
128                          addr: Option<PeerAddr>| {
129                        debug!(message = "Handling HTTP request.", headers = ?headers);
130                        let http_path = path.as_str();
131                        let events = auth_matcher
132                            .as_ref()
133                            .map_or(Ok(()), |a| {
134                                a.handle_auth(
135                                    addr.as_ref().map(|a| a.0).as_ref(),
136                                    &headers,
137                                    path.as_str(),
138                                )
139                            })
140                            .and_then(|()| self.decode(encoding_header.as_deref(), body))
141                            .and_then(|body| {
142                                emit!(HttpBytesReceived {
143                                    byte_size: body.len(),
144                                    http_path,
145                                    protocol,
146                                });
147                                self.build_events(body, &headers, &query_parameters, path.as_str())
148                            })
149                            .map(|mut events| {
150                                emit!(HttpEventsReceived {
151                                    count: events.len(),
152                                    byte_size: events.estimated_json_encoded_size_of(),
153                                    http_path,
154                                    protocol,
155                                });
156
157                                self.enrich_events(
158                                    &mut events,
159                                    path.as_str(),
160                                    &headers,
161                                    &query_parameters,
162                                    addr.and_then(|a| enable_source_ip.then_some(a))
163                                        .map(|PeerAddr(inner_addr)| inner_addr)
164                                        .as_ref(),
165                                );
166
167                                events
168                            });
169
170                        handle_request(events, acknowledgements, response_code, cx.out.clone())
171                    },
172                );
173
174            let ping = warp::get().and(warp::path("ping")).map(|| "pong");
175            let routes = svc.or(ping).recover(|r: Rejection| async move {
176                if let Some(e_msg) = r.find::<ErrorMessage>() {
177                    let json = warp::reply::json(e_msg);
178                    Ok(warp::reply::with_status(json, e_msg.status_code()))
179                } else {
180                    //other internal error - will return 500 internal server error
181                    emit!(HttpInternalError {
182                        message: &format!("Internal error: {r:?}")
183                    });
184                    Err(r)
185                }
186            });
187
188            let span = Span::current();
189            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
190                let remote_addr = conn.peer_addr();
191                let svc = ServiceBuilder::new()
192                    .layer(build_http_trace_layer(span.clone()))
193                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
194                        MaxConnectionAgeLayer::new(
195                            Duration::from_secs(secs),
196                            keepalive_settings.max_connection_age_jitter_factor,
197                            remote_addr,
198                        )
199                    }))
200                    .map_request(move |mut request: hyper::Request<_>| {
201                        request.extensions_mut().insert(PeerAddr::new(remote_addr));
202
203                        request
204                    })
205                    .service(warp::service(routes.clone()));
206                futures_util::future::ok::<_, Infallible>(svc)
207            });
208
209            info!(message = "Building HTTP server.", address = %address);
210
211            let listener = tls.bind(&address).await.map_err(|err| {
212                error!("An error occurred: {:?}.", err);
213            })?;
214
215            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
216                .serve(make_svc)
217                .with_graceful_shutdown(cx.shutdown.map(|_| ()))
218                .await
219                .map_err(|err| {
220                    error!("An error occurred: {:?}.", err);
221                })?;
222
223            Ok(())
224        }))
225    }
226
227    fn enable_source_ip(&self) -> bool {
228        false
229    }
230}
231
/// Remote socket address of the connection a request arrived on.
///
/// Stored in the request extensions so that route handlers can retrieve it.
#[derive(Clone, Copy, Debug)]
#[repr(transparent)]
struct PeerAddr(SocketAddr);

impl PeerAddr {
    /// Wraps `addr` without modifying it.
    const fn new(addr: SocketAddr) -> Self {
        Self(addr)
    }
}
241
/// Rejection cause used when a batch cannot be sent downstream, which only
/// happens once the receiving end has disconnected.
struct RejectShuttingDown;

impl fmt::Debug for RejectShuttingDown {
    /// Always renders as the fixed text `shutting down`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("shutting down")
    }
}
249
250impl warp::reject::Reject for RejectShuttingDown {}
251
252async fn handle_request(
253    events: Result<Vec<Event>, ErrorMessage>,
254    acknowledgements: bool,
255    response_code: StatusCode,
256    mut out: SourceSender,
257) -> Result<impl warp::Reply, Rejection> {
258    match events {
259        Ok(mut events) => {
260            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
261
262            let count = events.len();
263            out.send_batch(events)
264                .map_err(|_| {
265                    // can only fail if receiving end disconnected, so we are shutting down,
266                    // probably not gracefully.
267                    emit!(StreamClosedError { count });
268                    warp::reject::custom(RejectShuttingDown)
269                })
270                .and_then(|_| handle_batch_status(response_code, receiver))
271                .await
272        }
273        Err(error) => {
274            emit!(HttpBadRequest::new(error.code(), error.message()));
275            Err(warp::reject::custom(error))
276        }
277    }
278}
279
280async fn handle_batch_status(
281    success_response_code: StatusCode,
282    receiver: Option<BatchStatusReceiver>,
283) -> Result<impl warp::Reply, Rejection> {
284    match receiver {
285        None => Ok(success_response_code),
286        Some(receiver) => match receiver.await {
287            BatchStatus::Delivered => Ok(success_response_code),
288            BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
289                StatusCode::INTERNAL_SERVER_ERROR,
290                "Error delivering contents to sink".into(),
291            ))),
292            BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
293                StatusCode::BAD_REQUEST,
294                "Contents failed to deliver to sink".into(),
295            ))),
296        },
297    }
298}