1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures_util::FutureExt;
5use http::StatusCode;
6use hyper::{Server, service::make_service_fn};
7use prost::Message;
8use snafu::Snafu;
9use tokio::net::TcpStream;
10use tower::ServiceBuilder;
11use tracing::Span;
12use vector_lib::{
13 EstimatedJsonEncodedSizeOf,
14 codecs::decoding::{OtlpDeserializer, format::Deserializer},
15 config::LogNamespace,
16 event::{BatchNotifier, BatchStatus},
17 internal_event::{
18 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
19 },
20 opentelemetry::proto::collector::{
21 logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
22 metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
23 trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
24 },
25 tls::MaybeTlsIncomingStream,
26};
27use warp::{
28 Filter, Reply, filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response,
29};
30
31use super::{reply::protobuf, status::Status};
32use crate::{
33 SourceSender,
34 common::http::ErrorMessage,
35 event::Event,
36 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
37 internal_events::{EventsReceived, HttpBadRequest, StreamClosedError},
38 shutdown::ShutdownSignal,
39 sources::{
40 http_server::HttpConfigParamKind,
41 opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES},
42 util::{add_headers, decompress_body},
43 },
44 tls::MaybeTlsSettings,
45};
46
47#[derive(Clone, Copy, Debug, Snafu)]
48pub(crate) enum ApiError {
49 ServerShutdown,
50}
51
52impl warp::reject::Reject for ApiError {}
53
54pub(crate) async fn run_http_server(
55 address: SocketAddr,
56 tls_settings: MaybeTlsSettings,
57 filters: BoxedFilter<(Response,)>,
58 shutdown: ShutdownSignal,
59 keepalive_settings: KeepaliveConfig,
60) -> crate::Result<()> {
61 let listener = tls_settings.bind(&address).await?;
62 let routes = filters.recover(handle_rejection);
63
64 info!(message = "Building HTTP server.", address = %address);
65
66 let span = Span::current();
67 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
68 let svc = ServiceBuilder::new()
69 .layer(build_http_trace_layer(span.clone()))
70 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
71 MaxConnectionAgeLayer::new(
72 Duration::from_secs(secs),
73 keepalive_settings.max_connection_age_jitter_factor,
74 conn.peer_addr(),
75 )
76 }))
77 .service(warp::service(routes.clone()));
78 futures_util::future::ok::<_, Infallible>(svc)
79 });
80
81 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
82 .serve(make_svc)
83 .with_graceful_shutdown(shutdown.map(|_| ()))
84 .await?;
85
86 Ok(())
87}
88
89#[allow(clippy::too_many_arguments)] pub(crate) fn build_warp_filter(
91 acknowledgements: bool,
92 log_namespace: LogNamespace,
93 out: SourceSender,
94 bytes_received: Registered<BytesReceived>,
95 events_received: Registered<EventsReceived>,
96 headers: Vec<HttpConfigParamKind>,
97 logs_deserializer: Option<OtlpDeserializer>,
98 metrics_deserializer: Option<OtlpDeserializer>,
99 traces_deserializer: Option<OtlpDeserializer>,
100) -> BoxedFilter<(Response,)> {
101 let log_filters = build_warp_log_filter(
102 acknowledgements,
103 log_namespace,
104 out.clone(),
105 bytes_received.clone(),
106 events_received.clone(),
107 headers.clone(),
108 logs_deserializer,
109 );
110 let metrics_filters = build_warp_metrics_filter(
111 acknowledgements,
112 out.clone(),
113 bytes_received.clone(),
114 events_received.clone(),
115 metrics_deserializer,
116 );
117 let trace_filters = build_warp_trace_filter(
118 acknowledgements,
119 out.clone(),
120 bytes_received,
121 events_received,
122 traces_deserializer,
123 );
124 log_filters
125 .or(trace_filters)
126 .unify()
127 .or(metrics_filters)
128 .unify()
129 .boxed()
130}
131
132fn enrich_events(
133 events: &mut [Event],
134 headers_config: &[HttpConfigParamKind],
135 headers: &HeaderMap,
136 log_namespace: LogNamespace,
137) {
138 add_headers(
139 events,
140 headers_config,
141 headers,
142 log_namespace,
143 OpentelemetryConfig::NAME,
144 );
145}
146
147fn emit_decode_error(error: impl std::fmt::Display) -> ErrorMessage {
148 let message = format!("Could not decode request: {error}");
149 emit!(HttpBadRequest::new(
150 StatusCode::BAD_REQUEST.as_u16(),
151 &message
152 ));
153 ErrorMessage::new(StatusCode::BAD_REQUEST, message)
154}
155
156fn parse_with_deserializer(
157 deserializer: &OtlpDeserializer,
158 body: Bytes,
159 log_namespace: LogNamespace,
160) -> Result<Vec<Event>, ErrorMessage> {
161 deserializer
162 .parse(body, log_namespace)
163 .map(|r| r.into_vec())
164 .map_err(emit_decode_error)
165}
166
167fn build_ingest_filter<Resp, F>(
168 telemetry_type: &'static str,
169 acknowledgements: bool,
170 out: SourceSender,
171 make_events: F,
172) -> BoxedFilter<(Response,)>
173where
174 Resp: prost::Message + Default + Send + 'static,
175 F: Clone
176 + Send
177 + Sync
178 + 'static
179 + Fn(Option<String>, HeaderMap, Bytes) -> Result<Vec<Event>, ErrorMessage>,
180{
181 warp::post()
182 .and(warp::path("v1"))
183 .and(warp::path(telemetry_type))
184 .and(warp::path::end())
185 .and(warp::header::exact_ignore_case(
186 "content-type",
187 "application/x-protobuf",
188 ))
189 .and(warp::header::optional::<String>("content-encoding"))
190 .and(warp::header::headers_cloned())
191 .and(warp::body::bytes())
192 .and_then(
193 move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
194 let events = make_events(encoding_header, headers, body);
195 handle_request(
196 events,
197 acknowledgements,
198 out.clone(),
199 telemetry_type,
200 Resp::default(),
201 )
202 },
203 )
204 .boxed()
205}
206
207fn build_warp_log_filter(
208 acknowledgements: bool,
209 log_namespace: LogNamespace,
210 source_sender: SourceSender,
211 bytes_received: Registered<BytesReceived>,
212 events_received: Registered<EventsReceived>,
213 headers_cfg: Vec<HttpConfigParamKind>,
214 deserializer: Option<OtlpDeserializer>,
215) -> BoxedFilter<(Response,)> {
216 let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
217 decompress_body(encoding_header.as_deref(), body)
218 .inspect_err(|err| {
219 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
221 emit!(HttpBadRequest::new(
222 err.status_code().as_u16(),
223 err.message()
224 ));
225 }
226 })
227 .and_then(|decoded_body| {
228 bytes_received.emit(ByteSize(decoded_body.len()));
229 if let Some(d) = deserializer.as_ref() {
230 parse_with_deserializer(d, decoded_body, log_namespace)
231 } else {
232 decode_log_body(decoded_body, log_namespace, &events_received).map(
233 |mut events| {
234 enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
235 events
236 },
237 )
238 }
239 })
240 };
241
242 build_ingest_filter::<ExportLogsServiceResponse, _>(
243 LOGS,
244 acknowledgements,
245 source_sender,
246 make_events,
247 )
248}
249fn build_warp_metrics_filter(
250 acknowledgements: bool,
251 source_sender: SourceSender,
252 bytes_received: Registered<BytesReceived>,
253 events_received: Registered<EventsReceived>,
254 deserializer: Option<OtlpDeserializer>,
255) -> BoxedFilter<(Response,)> {
256 let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
257 decompress_body(encoding_header.as_deref(), body)
258 .inspect_err(|err| {
259 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
261 emit!(HttpBadRequest::new(
262 err.status_code().as_u16(),
263 err.message()
264 ));
265 }
266 })
267 .and_then(|decoded_body| {
268 bytes_received.emit(ByteSize(decoded_body.len()));
269 if let Some(d) = deserializer.as_ref() {
270 parse_with_deserializer(d, decoded_body, LogNamespace::default())
271 } else {
272 decode_metrics_body(decoded_body, &events_received)
273 }
274 })
275 };
276
277 build_ingest_filter::<ExportMetricsServiceResponse, _>(
278 METRICS,
279 acknowledgements,
280 source_sender,
281 make_events,
282 )
283}
284
285fn build_warp_trace_filter(
286 acknowledgements: bool,
287 source_sender: SourceSender,
288 bytes_received: Registered<BytesReceived>,
289 events_received: Registered<EventsReceived>,
290 deserializer: Option<OtlpDeserializer>,
291) -> BoxedFilter<(Response,)> {
292 let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
293 decompress_body(encoding_header.as_deref(), body)
294 .inspect_err(|err| {
295 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
297 emit!(HttpBadRequest::new(
298 err.status_code().as_u16(),
299 err.message()
300 ));
301 }
302 })
303 .and_then(|decoded_body| {
304 bytes_received.emit(ByteSize(decoded_body.len()));
305 if let Some(d) = deserializer.as_ref() {
306 parse_with_deserializer(d, decoded_body, LogNamespace::default())
307 } else {
308 decode_trace_body(decoded_body, &events_received)
309 }
310 })
311 };
312
313 build_ingest_filter::<ExportTraceServiceResponse, _>(
314 TRACES,
315 acknowledgements,
316 source_sender,
317 make_events,
318 )
319}
320
321fn decode_trace_body(
322 body: Bytes,
323 events_received: &Registered<EventsReceived>,
324) -> Result<Vec<Event>, ErrorMessage> {
325 let request = ExportTraceServiceRequest::decode(body).map_err(emit_decode_error)?;
326
327 let events: Vec<Event> = request
328 .resource_spans
329 .into_iter()
330 .flat_map(|v| v.into_event_iter())
331 .collect();
332
333 events_received.emit(CountByteSize(
334 events.len(),
335 events.estimated_json_encoded_size_of(),
336 ));
337
338 Ok(events)
339}
340
341fn decode_log_body(
342 body: Bytes,
343 log_namespace: LogNamespace,
344 events_received: &Registered<EventsReceived>,
345) -> Result<Vec<Event>, ErrorMessage> {
346 let request = ExportLogsServiceRequest::decode(body).map_err(emit_decode_error)?;
347
348 let events: Vec<Event> = request
349 .resource_logs
350 .into_iter()
351 .flat_map(|v| v.into_event_iter(log_namespace))
352 .collect();
353
354 events_received.emit(CountByteSize(
355 events.len(),
356 events.estimated_json_encoded_size_of(),
357 ));
358
359 Ok(events)
360}
361
362fn decode_metrics_body(
363 body: Bytes,
364 events_received: &Registered<EventsReceived>,
365) -> Result<Vec<Event>, ErrorMessage> {
366 let request = ExportMetricsServiceRequest::decode(body).map_err(emit_decode_error)?;
367
368 let events: Vec<Event> = request
369 .resource_metrics
370 .into_iter()
371 .flat_map(|v| v.into_event_iter())
372 .collect();
373
374 events_received.emit(CountByteSize(
375 events.len(),
376 events.estimated_json_encoded_size_of(),
377 ));
378
379 Ok(events)
380}
381
382async fn handle_request(
383 events: Result<Vec<Event>, ErrorMessage>,
384 acknowledgements: bool,
385 mut out: SourceSender,
386 output: &str,
387 resp: impl Message,
388) -> Result<Response, Rejection> {
389 match events {
390 Ok(mut events) => {
391 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
392 let count = events.len();
393
394 out.send_batch_named(output, events).await.map_err(|_| {
395 emit!(StreamClosedError { count });
396 warp::reject::custom(ApiError::ServerShutdown)
397 })?;
398
399 match receiver {
400 None => Ok(protobuf(resp).into_response()),
401 Some(receiver) => match receiver.await {
402 BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
403 BatchStatus::Errored => Err(warp::reject::custom(Status {
404 code: 2, message: "Error delivering contents to sink".into(),
406 ..Default::default()
407 })),
408 BatchStatus::Rejected => Err(warp::reject::custom(Status {
409 code: 2, message: "Contents failed to deliver to sink".into(),
411 ..Default::default()
412 })),
413 },
414 }
415 }
416 Err(err) => Err(warp::reject::custom(err)),
417 }
418}
419
420async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
421 if let Some(err_msg) = err.find::<ErrorMessage>() {
422 let reply = protobuf(Status {
423 code: 2, message: err_msg.message().into(),
425 ..Default::default()
426 });
427
428 Ok(warp::reply::with_status(reply, err_msg.status_code()))
429 } else {
430 let reply = protobuf(Status {
431 code: 2, message: format!("{err:?}"),
433 ..Default::default()
434 });
435
436 Ok(warp::reply::with_status(
437 reply,
438 StatusCode::INTERNAL_SERVER_ERROR,
439 ))
440 }
441}