vector/sources/util/http/
prelude.rs1use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures::{FutureExt, TryFutureExt};
5use hyper::{Server, service::make_service_fn};
6use tokio::net::TcpStream;
7use tower::ServiceBuilder;
8use tracing::Span;
9use vector_lib::{
10 EstimatedJsonEncodedSizeOf,
11 config::SourceAcknowledgementsConfig,
12 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
13};
14use warp::{
15 Filter,
16 filters::{
17 BoxedFilter,
18 path::{FullPath, Tail},
19 },
20 http::{HeaderMap, StatusCode},
21 reject::Rejection,
22};
23
24use super::encoding::decode;
25use crate::{
26 SourceSender,
27 common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
28 config::SourceContext,
29 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
30 internal_events::{
31 HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError,
32 },
33 sources::util::http::HttpMethod,
34 tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig},
35};
36
37pub trait HttpSource: Clone + Send + Sync + 'static {
38 fn enrich_events(
42 &self,
43 _events: &mut [Event],
44 _request_path: &str,
45 _headers_config: &HeaderMap,
46 _query_parameters: &HashMap<String, String>,
47 _source_ip: Option<&SocketAddr>,
48 ) {
49 }
50
51 fn build_events(
52 &self,
53 body: Bytes,
54 header_map: &HeaderMap,
55 query_parameters: &HashMap<String, String>,
56 path: &str,
57 ) -> Result<Vec<Event>, ErrorMessage>;
58
59 fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
60 decode(encoding_header, body)
61 }
62
63 #[allow(clippy::too_many_arguments)]
64 fn run(
65 self,
66 address: SocketAddr,
67 path: &str,
68 method: HttpMethod,
69 response_code: StatusCode,
70 strict_path: bool,
71 tls: Option<&TlsEnableableConfig>,
72 auth: Option<&HttpServerAuthConfig>,
73 cx: SourceContext,
74 acknowledgements: SourceAcknowledgementsConfig,
75 keepalive_settings: KeepaliveConfig,
76 ) -> crate::Result<crate::sources::Source> {
77 let tls = MaybeTlsSettings::from_config(tls, true)?;
78 let protocol = tls.http_protocol_name();
79 let auth_matcher = auth.map(|a| a.build(&cx.enrichment_tables)).transpose()?;
80 let path = path.to_owned();
81 let acknowledgements = cx.do_acknowledgements(acknowledgements);
82 let enable_source_ip = self.enable_source_ip();
83
84 Ok(Box::pin(async move {
85 let mut filter: BoxedFilter<()> = match method {
86 HttpMethod::Head => warp::head().boxed(),
87 HttpMethod::Get => warp::get().boxed(),
88 HttpMethod::Put => warp::put().boxed(),
89 HttpMethod::Post => warp::post().boxed(),
90 HttpMethod::Patch => warp::patch().boxed(),
91 HttpMethod::Delete => warp::delete().boxed(),
92 HttpMethod::Options => warp::options().boxed(),
93 };
94
95 #[allow(clippy::unnecessary_to_owned)]
97 for s in path.split('/').filter(|&x| !x.is_empty()) {
98 filter = filter.and(warp::path(s.to_string())).boxed()
99 }
100 let svc = filter
101 .and(warp::path::tail())
102 .and_then(move |tail: Tail| async move {
103 if !strict_path || tail.as_str().is_empty() {
104 Ok(())
105 } else {
106 emit!(HttpInternalError {
107 message: "Path not found."
108 });
109 Err(warp::reject::custom(ErrorMessage::new(
110 StatusCode::NOT_FOUND,
111 "Not found".to_string(),
112 )))
113 }
114 })
115 .untuple_one()
116 .and(warp::path::full())
117 .and(warp::header::optional::<String>("content-encoding"))
118 .and(warp::header::headers_cloned())
119 .and(warp::body::bytes())
120 .and(warp::query::<HashMap<String, String>>())
121 .and(warp::filters::ext::optional())
122 .and_then(
123 move |path: FullPath,
124 encoding_header: Option<String>,
125 headers: HeaderMap,
126 body: Bytes,
127 query_parameters: HashMap<String, String>,
128 addr: Option<PeerAddr>| {
129 debug!(message = "Handling HTTP request.", headers = ?headers);
130 let http_path = path.as_str();
131 let events = auth_matcher
132 .as_ref()
133 .map_or(Ok(()), |a| {
134 a.handle_auth(
135 addr.as_ref().map(|a| a.0).as_ref(),
136 &headers,
137 path.as_str(),
138 )
139 })
140 .and_then(|()| self.decode(encoding_header.as_deref(), body))
141 .and_then(|body| {
142 emit!(HttpBytesReceived {
143 byte_size: body.len(),
144 http_path,
145 protocol,
146 });
147 self.build_events(body, &headers, &query_parameters, path.as_str())
148 })
149 .map(|mut events| {
150 emit!(HttpEventsReceived {
151 count: events.len(),
152 byte_size: events.estimated_json_encoded_size_of(),
153 http_path,
154 protocol,
155 });
156
157 self.enrich_events(
158 &mut events,
159 path.as_str(),
160 &headers,
161 &query_parameters,
162 addr.and_then(|a| enable_source_ip.then_some(a))
163 .map(|PeerAddr(inner_addr)| inner_addr)
164 .as_ref(),
165 );
166
167 events
168 });
169
170 handle_request(events, acknowledgements, response_code, cx.out.clone())
171 },
172 );
173
174 let ping = warp::get().and(warp::path("ping")).map(|| "pong");
175 let routes = svc.or(ping).recover(|r: Rejection| async move {
176 if let Some(e_msg) = r.find::<ErrorMessage>() {
177 let json = warp::reply::json(e_msg);
178 Ok(warp::reply::with_status(json, e_msg.status_code()))
179 } else {
180 emit!(HttpInternalError {
182 message: &format!("Internal error: {r:?}")
183 });
184 Err(r)
185 }
186 });
187
188 let span = Span::current();
189 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
190 let remote_addr = conn.peer_addr();
191 let svc = ServiceBuilder::new()
192 .layer(build_http_trace_layer(span.clone()))
193 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
194 MaxConnectionAgeLayer::new(
195 Duration::from_secs(secs),
196 keepalive_settings.max_connection_age_jitter_factor,
197 remote_addr,
198 )
199 }))
200 .map_request(move |mut request: hyper::Request<_>| {
201 request.extensions_mut().insert(PeerAddr::new(remote_addr));
202
203 request
204 })
205 .service(warp::service(routes.clone()));
206 futures_util::future::ok::<_, Infallible>(svc)
207 });
208
209 info!(message = "Building HTTP server.", address = %address);
210
211 let listener = tls.bind(&address).await.map_err(|err| {
212 error!("An error occurred: {:?}.", err);
213 })?;
214
215 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
216 .serve(make_svc)
217 .with_graceful_shutdown(cx.shutdown.map(|_| ()))
218 .await
219 .map_err(|err| {
220 error!("An error occurred: {:?}.", err);
221 })?;
222
223 Ok(())
224 }))
225 }
226
227 fn enable_source_ip(&self) -> bool {
228 false
229 }
230}
231
232#[derive(Clone)]
233#[repr(transparent)]
234struct PeerAddr(SocketAddr);
235
236impl PeerAddr {
237 const fn new(addr: SocketAddr) -> Self {
238 Self(addr)
239 }
240}
241
242struct RejectShuttingDown;
243
244impl fmt::Debug for RejectShuttingDown {
245 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
246 f.write_str("shutting down")
247 }
248}
249
250impl warp::reject::Reject for RejectShuttingDown {}
251
252async fn handle_request(
253 events: Result<Vec<Event>, ErrorMessage>,
254 acknowledgements: bool,
255 response_code: StatusCode,
256 mut out: SourceSender,
257) -> Result<impl warp::Reply, Rejection> {
258 match events {
259 Ok(mut events) => {
260 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
261
262 let count = events.len();
263 out.send_batch(events)
264 .map_err(|_| {
265 emit!(StreamClosedError { count });
268 warp::reject::custom(RejectShuttingDown)
269 })
270 .and_then(|_| handle_batch_status(response_code, receiver))
271 .await
272 }
273 Err(error) => {
274 emit!(HttpBadRequest::new(error.code(), error.message()));
275 Err(warp::reject::custom(error))
276 }
277 }
278}
279
280async fn handle_batch_status(
281 success_response_code: StatusCode,
282 receiver: Option<BatchStatusReceiver>,
283) -> Result<impl warp::Reply, Rejection> {
284 match receiver {
285 None => Ok(success_response_code),
286 Some(receiver) => match receiver.await {
287 BatchStatus::Delivered => Ok(success_response_code),
288 BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
289 StatusCode::INTERNAL_SERVER_ERROR,
290 "Error delivering contents to sink".into(),
291 ))),
292 BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
293 StatusCode::BAD_REQUEST,
294 "Contents failed to deliver to sink".into(),
295 ))),
296 },
297 }
298}