vector/sources/util/http/
prelude.rs

1use crate::common::http::{server_auth::HttpServerAuthConfig, ErrorMessage};
2use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration};
3
4use bytes::Bytes;
5use futures::{FutureExt, TryFutureExt};
6use hyper::{service::make_service_fn, Server};
7use tokio::net::TcpStream;
8use tower::ServiceBuilder;
9use tracing::Span;
10use vector_lib::{
11    config::SourceAcknowledgementsConfig,
12    event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
13    EstimatedJsonEncodedSizeOf,
14};
15use warp::{
16    filters::{
17        path::{FullPath, Tail},
18        BoxedFilter,
19    },
20    http::{HeaderMap, StatusCode},
21    reject::Rejection,
22    Filter,
23};
24
25use crate::{
26    config::SourceContext,
27    http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer},
28    internal_events::{
29        HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError,
30    },
31    sources::util::http::HttpMethod,
32    tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig},
33    SourceSender,
34};
35
36use super::encoding::decode;
37
38pub trait HttpSource: Clone + Send + Sync + 'static {
39    // This function can be defined to enrich events with additional HTTP
40    // metadata. This function should be used rather than internal enrichment so
41    // that accurate byte count metrics can be emitted.
42    fn enrich_events(
43        &self,
44        _events: &mut [Event],
45        _request_path: &str,
46        _headers_config: &HeaderMap,
47        _query_parameters: &HashMap<String, String>,
48        _source_ip: Option<&SocketAddr>,
49    ) {
50    }
51
52    fn build_events(
53        &self,
54        body: Bytes,
55        header_map: &HeaderMap,
56        query_parameters: &HashMap<String, String>,
57        path: &str,
58    ) -> Result<Vec<Event>, ErrorMessage>;
59
60    fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
61        decode(encoding_header, body)
62    }
63
64    #[allow(clippy::too_many_arguments)]
65    fn run(
66        self,
67        address: SocketAddr,
68        path: &str,
69        method: HttpMethod,
70        response_code: StatusCode,
71        strict_path: bool,
72        tls: Option<&TlsEnableableConfig>,
73        auth: Option<&HttpServerAuthConfig>,
74        cx: SourceContext,
75        acknowledgements: SourceAcknowledgementsConfig,
76        keepalive_settings: KeepaliveConfig,
77    ) -> crate::Result<crate::sources::Source> {
78        let tls = MaybeTlsSettings::from_config(tls, true)?;
79        let protocol = tls.http_protocol_name();
80        let auth_matcher = auth.map(|a| a.build(&cx.enrichment_tables)).transpose()?;
81        let path = path.to_owned();
82        let acknowledgements = cx.do_acknowledgements(acknowledgements);
83        let enable_source_ip = self.enable_source_ip();
84
85        Ok(Box::pin(async move {
86            let mut filter: BoxedFilter<()> = match method {
87                HttpMethod::Head => warp::head().boxed(),
88                HttpMethod::Get => warp::get().boxed(),
89                HttpMethod::Put => warp::put().boxed(),
90                HttpMethod::Post => warp::post().boxed(),
91                HttpMethod::Patch => warp::patch().boxed(),
92                HttpMethod::Delete => warp::delete().boxed(),
93                HttpMethod::Options => warp::options().boxed(),
94            };
95
96            // https://github.com/rust-lang/rust-clippy/issues/8148
97            #[allow(clippy::unnecessary_to_owned)]
98            for s in path.split('/').filter(|&x| !x.is_empty()) {
99                filter = filter.and(warp::path(s.to_string())).boxed()
100            }
101            let svc = filter
102                .and(warp::path::tail())
103                .and_then(move |tail: Tail| async move {
104                    if !strict_path || tail.as_str().is_empty() {
105                        Ok(())
106                    } else {
107                        emit!(HttpInternalError {
108                            message: "Path not found."
109                        });
110                        Err(warp::reject::custom(ErrorMessage::new(
111                            StatusCode::NOT_FOUND,
112                            "Not found".to_string(),
113                        )))
114                    }
115                })
116                .untuple_one()
117                .and(warp::path::full())
118                .and(warp::header::optional::<String>("content-encoding"))
119                .and(warp::header::headers_cloned())
120                .and(warp::body::bytes())
121                .and(warp::query::<HashMap<String, String>>())
122                .and(warp::filters::ext::optional())
123                .and_then(
124                    move |path: FullPath,
125                          encoding_header: Option<String>,
126                          headers: HeaderMap,
127                          body: Bytes,
128                          query_parameters: HashMap<String, String>,
129                          addr: Option<PeerAddr>| {
130                        debug!(message = "Handling HTTP request.", headers = ?headers);
131                        let http_path = path.as_str();
132
133                        let events = auth_matcher
134                            .as_ref()
135                            .map_or(Ok(()), |a| {
136                                a.handle_auth(
137                                    addr.as_ref().map(|a| a.0).as_ref(),
138                                    &headers,
139                                    path.as_str(),
140                                )
141                            })
142                            .and_then(|()| self.decode(encoding_header.as_deref(), body))
143                            .and_then(|body| {
144                                emit!(HttpBytesReceived {
145                                    byte_size: body.len(),
146                                    http_path,
147                                    protocol,
148                                });
149                                self.build_events(body, &headers, &query_parameters, path.as_str())
150                            })
151                            .map(|mut events| {
152                                emit!(HttpEventsReceived {
153                                    count: events.len(),
154                                    byte_size: events.estimated_json_encoded_size_of(),
155                                    http_path,
156                                    protocol,
157                                });
158
159                                self.enrich_events(
160                                    &mut events,
161                                    path.as_str(),
162                                    &headers,
163                                    &query_parameters,
164                                    addr.and_then(|a| enable_source_ip.then_some(a))
165                                        .map(|PeerAddr(inner_addr)| inner_addr)
166                                        .as_ref(),
167                                );
168
169                                events
170                            });
171
172                        handle_request(events, acknowledgements, response_code, cx.out.clone())
173                    },
174                );
175
176            let ping = warp::get().and(warp::path("ping")).map(|| "pong");
177            let routes = svc.or(ping).recover(|r: Rejection| async move {
178                if let Some(e_msg) = r.find::<ErrorMessage>() {
179                    let json = warp::reply::json(e_msg);
180                    Ok(warp::reply::with_status(json, e_msg.status_code()))
181                } else {
182                    //other internal error - will return 500 internal server error
183                    emit!(HttpInternalError {
184                        message: &format!("Internal error: {r:?}")
185                    });
186                    Err(r)
187                }
188            });
189
190            let span = Span::current();
191            let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
192                let remote_addr = conn.peer_addr();
193                let svc = ServiceBuilder::new()
194                    .layer(build_http_trace_layer(span.clone()))
195                    .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
196                        MaxConnectionAgeLayer::new(
197                            Duration::from_secs(secs),
198                            keepalive_settings.max_connection_age_jitter_factor,
199                            remote_addr,
200                        )
201                    }))
202                    .map_request(move |mut request: hyper::Request<_>| {
203                        request.extensions_mut().insert(PeerAddr::new(remote_addr));
204
205                        request
206                    })
207                    .service(warp::service(routes.clone()));
208                futures_util::future::ok::<_, Infallible>(svc)
209            });
210
211            info!(message = "Building HTTP server.", address = %address);
212
213            let listener = tls.bind(&address).await.map_err(|err| {
214                error!("An error occurred: {:?}.", err);
215            })?;
216
217            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
218                .serve(make_svc)
219                .with_graceful_shutdown(cx.shutdown.map(|_| ()))
220                .await
221                .map_err(|err| {
222                    error!("An error occurred: {:?}.", err);
223                })?;
224
225            Ok(())
226        }))
227    }
228
229    fn enable_source_ip(&self) -> bool {
230        false
231    }
232}
233
234#[derive(Clone)]
235#[repr(transparent)]
236struct PeerAddr(SocketAddr);
237
238impl PeerAddr {
239    const fn new(addr: SocketAddr) -> Self {
240        Self(addr)
241    }
242}
243
244struct RejectShuttingDown;
245
246impl fmt::Debug for RejectShuttingDown {
247    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248        f.write_str("shutting down")
249    }
250}
251
252impl warp::reject::Reject for RejectShuttingDown {}
253
254async fn handle_request(
255    events: Result<Vec<Event>, ErrorMessage>,
256    acknowledgements: bool,
257    response_code: StatusCode,
258    mut out: SourceSender,
259) -> Result<impl warp::Reply, Rejection> {
260    match events {
261        Ok(mut events) => {
262            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
263
264            let count = events.len();
265            out.send_batch(events)
266                .map_err(|_| {
267                    // can only fail if receiving end disconnected, so we are shutting down,
268                    // probably not gracefully.
269                    emit!(StreamClosedError { count });
270                    warp::reject::custom(RejectShuttingDown)
271                })
272                .and_then(|_| handle_batch_status(response_code, receiver))
273                .await
274        }
275        Err(error) => {
276            emit!(HttpBadRequest::new(error.code(), error.message()));
277            Err(warp::reject::custom(error))
278        }
279    }
280}
281
282async fn handle_batch_status(
283    success_response_code: StatusCode,
284    receiver: Option<BatchStatusReceiver>,
285) -> Result<impl warp::Reply, Rejection> {
286    match receiver {
287        None => Ok(success_response_code),
288        Some(receiver) => match receiver.await {
289            BatchStatus::Delivered => Ok(success_response_code),
290            BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
291                StatusCode::INTERNAL_SERVER_ERROR,
292                "Error delivering contents to sink".into(),
293            ))),
294            BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
295                StatusCode::BAD_REQUEST,
296                "Contents failed to deliver to sink".into(),
297            ))),
298        },
299    }
300}