vector/sources/util/http/
prelude.rs1use crate::common::http::{server_auth::HttpServerAuthConfig, ErrorMessage};
2use std::{collections::HashMap, convert::Infallible, fmt, net::SocketAddr, time::Duration};
3
4use bytes::Bytes;
5use futures::{FutureExt, TryFutureExt};
6use hyper::{service::make_service_fn, Server};
7use tokio::net::TcpStream;
8use tower::ServiceBuilder;
9use tracing::Span;
10use vector_lib::{
11 config::SourceAcknowledgementsConfig,
12 event::{BatchNotifier, BatchStatus, BatchStatusReceiver, Event},
13 EstimatedJsonEncodedSizeOf,
14};
15use warp::{
16 filters::{
17 path::{FullPath, Tail},
18 BoxedFilter,
19 },
20 http::{HeaderMap, StatusCode},
21 reject::Rejection,
22 Filter,
23};
24
25use crate::{
26 config::SourceContext,
27 http::{build_http_trace_layer, KeepaliveConfig, MaxConnectionAgeLayer},
28 internal_events::{
29 HttpBadRequest, HttpBytesReceived, HttpEventsReceived, HttpInternalError, StreamClosedError,
30 },
31 sources::util::http::HttpMethod,
32 tls::{MaybeTlsIncomingStream, MaybeTlsSettings, TlsEnableableConfig},
33 SourceSender,
34};
35
36use super::encoding::decode;
37
38pub trait HttpSource: Clone + Send + Sync + 'static {
39 fn enrich_events(
43 &self,
44 _events: &mut [Event],
45 _request_path: &str,
46 _headers_config: &HeaderMap,
47 _query_parameters: &HashMap<String, String>,
48 _source_ip: Option<&SocketAddr>,
49 ) {
50 }
51
52 fn build_events(
53 &self,
54 body: Bytes,
55 header_map: &HeaderMap,
56 query_parameters: &HashMap<String, String>,
57 path: &str,
58 ) -> Result<Vec<Event>, ErrorMessage>;
59
60 fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
61 decode(encoding_header, body)
62 }
63
64 #[allow(clippy::too_many_arguments)]
65 fn run(
66 self,
67 address: SocketAddr,
68 path: &str,
69 method: HttpMethod,
70 response_code: StatusCode,
71 strict_path: bool,
72 tls: Option<&TlsEnableableConfig>,
73 auth: Option<&HttpServerAuthConfig>,
74 cx: SourceContext,
75 acknowledgements: SourceAcknowledgementsConfig,
76 keepalive_settings: KeepaliveConfig,
77 ) -> crate::Result<crate::sources::Source> {
78 let tls = MaybeTlsSettings::from_config(tls, true)?;
79 let protocol = tls.http_protocol_name();
80 let auth_matcher = auth.map(|a| a.build(&cx.enrichment_tables)).transpose()?;
81 let path = path.to_owned();
82 let acknowledgements = cx.do_acknowledgements(acknowledgements);
83 let enable_source_ip = self.enable_source_ip();
84
85 Ok(Box::pin(async move {
86 let mut filter: BoxedFilter<()> = match method {
87 HttpMethod::Head => warp::head().boxed(),
88 HttpMethod::Get => warp::get().boxed(),
89 HttpMethod::Put => warp::put().boxed(),
90 HttpMethod::Post => warp::post().boxed(),
91 HttpMethod::Patch => warp::patch().boxed(),
92 HttpMethod::Delete => warp::delete().boxed(),
93 HttpMethod::Options => warp::options().boxed(),
94 };
95
96 #[allow(clippy::unnecessary_to_owned)]
98 for s in path.split('/').filter(|&x| !x.is_empty()) {
99 filter = filter.and(warp::path(s.to_string())).boxed()
100 }
101 let svc = filter
102 .and(warp::path::tail())
103 .and_then(move |tail: Tail| async move {
104 if !strict_path || tail.as_str().is_empty() {
105 Ok(())
106 } else {
107 emit!(HttpInternalError {
108 message: "Path not found."
109 });
110 Err(warp::reject::custom(ErrorMessage::new(
111 StatusCode::NOT_FOUND,
112 "Not found".to_string(),
113 )))
114 }
115 })
116 .untuple_one()
117 .and(warp::path::full())
118 .and(warp::header::optional::<String>("content-encoding"))
119 .and(warp::header::headers_cloned())
120 .and(warp::body::bytes())
121 .and(warp::query::<HashMap<String, String>>())
122 .and(warp::filters::ext::optional())
123 .and_then(
124 move |path: FullPath,
125 encoding_header: Option<String>,
126 headers: HeaderMap,
127 body: Bytes,
128 query_parameters: HashMap<String, String>,
129 addr: Option<PeerAddr>| {
130 debug!(message = "Handling HTTP request.", headers = ?headers);
131 let http_path = path.as_str();
132
133 let events = auth_matcher
134 .as_ref()
135 .map_or(Ok(()), |a| {
136 a.handle_auth(
137 addr.as_ref().map(|a| a.0).as_ref(),
138 &headers,
139 path.as_str(),
140 )
141 })
142 .and_then(|()| self.decode(encoding_header.as_deref(), body))
143 .and_then(|body| {
144 emit!(HttpBytesReceived {
145 byte_size: body.len(),
146 http_path,
147 protocol,
148 });
149 self.build_events(body, &headers, &query_parameters, path.as_str())
150 })
151 .map(|mut events| {
152 emit!(HttpEventsReceived {
153 count: events.len(),
154 byte_size: events.estimated_json_encoded_size_of(),
155 http_path,
156 protocol,
157 });
158
159 self.enrich_events(
160 &mut events,
161 path.as_str(),
162 &headers,
163 &query_parameters,
164 addr.and_then(|a| enable_source_ip.then_some(a))
165 .map(|PeerAddr(inner_addr)| inner_addr)
166 .as_ref(),
167 );
168
169 events
170 });
171
172 handle_request(events, acknowledgements, response_code, cx.out.clone())
173 },
174 );
175
176 let ping = warp::get().and(warp::path("ping")).map(|| "pong");
177 let routes = svc.or(ping).recover(|r: Rejection| async move {
178 if let Some(e_msg) = r.find::<ErrorMessage>() {
179 let json = warp::reply::json(e_msg);
180 Ok(warp::reply::with_status(json, e_msg.status_code()))
181 } else {
182 emit!(HttpInternalError {
184 message: &format!("Internal error: {r:?}")
185 });
186 Err(r)
187 }
188 });
189
190 let span = Span::current();
191 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
192 let remote_addr = conn.peer_addr();
193 let svc = ServiceBuilder::new()
194 .layer(build_http_trace_layer(span.clone()))
195 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
196 MaxConnectionAgeLayer::new(
197 Duration::from_secs(secs),
198 keepalive_settings.max_connection_age_jitter_factor,
199 remote_addr,
200 )
201 }))
202 .map_request(move |mut request: hyper::Request<_>| {
203 request.extensions_mut().insert(PeerAddr::new(remote_addr));
204
205 request
206 })
207 .service(warp::service(routes.clone()));
208 futures_util::future::ok::<_, Infallible>(svc)
209 });
210
211 info!(message = "Building HTTP server.", address = %address);
212
213 let listener = tls.bind(&address).await.map_err(|err| {
214 error!("An error occurred: {:?}.", err);
215 })?;
216
217 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
218 .serve(make_svc)
219 .with_graceful_shutdown(cx.shutdown.map(|_| ()))
220 .await
221 .map_err(|err| {
222 error!("An error occurred: {:?}.", err);
223 })?;
224
225 Ok(())
226 }))
227 }
228
229 fn enable_source_ip(&self) -> bool {
230 false
231 }
232}
233
234#[derive(Clone)]
235#[repr(transparent)]
236struct PeerAddr(SocketAddr);
237
238impl PeerAddr {
239 const fn new(addr: SocketAddr) -> Self {
240 Self(addr)
241 }
242}
243
244struct RejectShuttingDown;
245
246impl fmt::Debug for RejectShuttingDown {
247 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
248 f.write_str("shutting down")
249 }
250}
251
252impl warp::reject::Reject for RejectShuttingDown {}
253
254async fn handle_request(
255 events: Result<Vec<Event>, ErrorMessage>,
256 acknowledgements: bool,
257 response_code: StatusCode,
258 mut out: SourceSender,
259) -> Result<impl warp::Reply, Rejection> {
260 match events {
261 Ok(mut events) => {
262 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
263
264 let count = events.len();
265 out.send_batch(events)
266 .map_err(|_| {
267 emit!(StreamClosedError { count });
270 warp::reject::custom(RejectShuttingDown)
271 })
272 .and_then(|_| handle_batch_status(response_code, receiver))
273 .await
274 }
275 Err(error) => {
276 emit!(HttpBadRequest::new(error.code(), error.message()));
277 Err(warp::reject::custom(error))
278 }
279 }
280}
281
282async fn handle_batch_status(
283 success_response_code: StatusCode,
284 receiver: Option<BatchStatusReceiver>,
285) -> Result<impl warp::Reply, Rejection> {
286 match receiver {
287 None => Ok(success_response_code),
288 Some(receiver) => match receiver.await {
289 BatchStatus::Delivered => Ok(success_response_code),
290 BatchStatus::Errored => Err(warp::reject::custom(ErrorMessage::new(
291 StatusCode::INTERNAL_SERVER_ERROR,
292 "Error delivering contents to sink".into(),
293 ))),
294 BatchStatus::Rejected => Err(warp::reject::custom(ErrorMessage::new(
295 StatusCode::BAD_REQUEST,
296 "Contents failed to deliver to sink".into(),
297 ))),
298 },
299 }
300}