1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures_util::FutureExt;
5use http::StatusCode;
6use hyper::{Server, service::make_service_fn};
7use prost::Message;
8use snafu::Snafu;
9use tokio::net::TcpStream;
10use tower::ServiceBuilder;
11use tracing::Span;
12use vector_lib::{
13 EstimatedJsonEncodedSizeOf,
14 codecs::decoding::{OtlpDeserializer, format::Deserializer},
15 config::LogNamespace,
16 event::{BatchNotifier, BatchStatus},
17 internal_event::{
18 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
19 },
20 opentelemetry::proto::collector::{
21 logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
22 metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
23 trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
24 },
25 tls::MaybeTlsIncomingStream,
26};
27use warp::{
28 Filter, Reply, filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response,
29};
30
31use super::{reply::protobuf, status::Status};
32use crate::{
33 SourceSender,
34 common::http::ErrorMessage,
35 event::Event,
36 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
37 internal_events::{EventsReceived, HttpBadRequest, StreamClosedError},
38 shutdown::ShutdownSignal,
39 sources::{
40 http_server::HttpConfigParamKind,
41 opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES},
42 util::{add_headers, decompress_body},
43 },
44 tls::MaybeTlsSettings,
45};
46
47#[derive(Clone, Copy, Debug, Snafu)]
48pub(crate) enum ApiError {
49 ServerShutdown,
50}
51
52impl warp::reject::Reject for ApiError {}
53
54pub(crate) async fn run_http_server(
55 address: SocketAddr,
56 tls_settings: MaybeTlsSettings,
57 filters: BoxedFilter<(Response,)>,
58 shutdown: ShutdownSignal,
59 keepalive_settings: KeepaliveConfig,
60) -> crate::Result<()> {
61 let listener = tls_settings.bind(&address).await?;
62 let routes = filters.recover(handle_rejection);
63
64 info!(message = "Building HTTP server.", address = %address);
65
66 let span = Span::current();
67 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
68 let svc = ServiceBuilder::new()
69 .layer(build_http_trace_layer(span.clone()))
70 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
71 MaxConnectionAgeLayer::new(
72 Duration::from_secs(secs),
73 keepalive_settings.max_connection_age_jitter_factor,
74 conn.peer_addr(),
75 )
76 }))
77 .service(warp::service(routes.clone()));
78 futures_util::future::ok::<_, Infallible>(svc)
79 });
80
81 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
82 .serve(make_svc)
83 .with_graceful_shutdown(shutdown.map(|_| ()))
84 .await?;
85
86 Ok(())
87}
88
89#[allow(clippy::too_many_arguments)] pub(crate) fn build_warp_filter(
91 acknowledgements: bool,
92 log_namespace: LogNamespace,
93 out: SourceSender,
94 bytes_received: Registered<BytesReceived>,
95 events_received: Registered<EventsReceived>,
96 headers: Vec<HttpConfigParamKind>,
97 logs_deserializer: Option<OtlpDeserializer>,
98 metrics_deserializer: Option<OtlpDeserializer>,
99 traces_deserializer: Option<OtlpDeserializer>,
100) -> BoxedFilter<(Response,)> {
101 let log_filters = build_warp_log_filter(
102 acknowledgements,
103 log_namespace,
104 out.clone(),
105 bytes_received.clone(),
106 events_received.clone(),
107 headers.clone(),
108 logs_deserializer,
109 );
110 let metrics_filters = build_warp_metrics_filter(
111 acknowledgements,
112 log_namespace,
113 out.clone(),
114 bytes_received.clone(),
115 events_received.clone(),
116 headers.clone(),
117 metrics_deserializer,
118 );
119 let trace_filters = build_warp_trace_filter(
120 acknowledgements,
121 out.clone(),
122 bytes_received,
123 events_received,
124 headers.clone(),
125 traces_deserializer,
126 );
127 log_filters
128 .or(trace_filters)
129 .unify()
130 .or(metrics_filters)
131 .unify()
132 .boxed()
133}
134
135fn enrich_events(
136 events: &mut [Event],
137 headers_config: &[HttpConfigParamKind],
138 headers: &HeaderMap,
139 log_namespace: LogNamespace,
140) {
141 add_headers(
142 events,
143 headers_config,
144 headers,
145 log_namespace,
146 OpentelemetryConfig::NAME,
147 );
148}
149
150fn emit_decode_error(error: impl std::fmt::Display) -> ErrorMessage {
151 let message = format!("Could not decode request: {error}");
152 emit!(HttpBadRequest::new(
153 StatusCode::BAD_REQUEST.as_u16(),
154 &message
155 ));
156 ErrorMessage::new(StatusCode::BAD_REQUEST, message)
157}
158
159fn parse_with_deserializer(
160 deserializer: &OtlpDeserializer,
161 body: Bytes,
162 log_namespace: LogNamespace,
163 events_received: &Registered<EventsReceived>,
164) -> Result<Vec<Event>, ErrorMessage> {
165 let events = deserializer
166 .parse(body, log_namespace)
167 .map(|r| r.into_vec())
168 .map_err(emit_decode_error)?;
169
170 let count = super::count_otlp_items(&events);
172 events_received.emit(CountByteSize(
173 count,
174 events.estimated_json_encoded_size_of(),
175 ));
176
177 Ok(events)
178}
179
180fn build_ingest_filter<Resp, F>(
181 telemetry_type: &'static str,
182 acknowledgements: bool,
183 out: SourceSender,
184 make_events: F,
185) -> BoxedFilter<(Response,)>
186where
187 Resp: prost::Message + Default + Send + 'static,
188 F: Clone
189 + Send
190 + Sync
191 + 'static
192 + Fn(Option<String>, HeaderMap, Bytes) -> Result<Vec<Event>, ErrorMessage>,
193{
194 warp::post()
195 .and(warp::path("v1"))
196 .and(warp::path(telemetry_type))
197 .and(warp::path::end())
198 .and(warp::header::exact_ignore_case(
199 "content-type",
200 "application/x-protobuf",
201 ))
202 .and(warp::header::optional::<String>("content-encoding"))
203 .and(warp::header::headers_cloned())
204 .and(warp::body::bytes())
205 .and_then(
206 move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
207 let events = make_events(encoding_header, headers, body);
208 handle_request(
209 events,
210 acknowledgements,
211 out.clone(),
212 telemetry_type,
213 Resp::default(),
214 )
215 },
216 )
217 .boxed()
218}
219
220fn build_warp_log_filter(
221 acknowledgements: bool,
222 log_namespace: LogNamespace,
223 source_sender: SourceSender,
224 bytes_received: Registered<BytesReceived>,
225 events_received: Registered<EventsReceived>,
226 headers_cfg: Vec<HttpConfigParamKind>,
227 deserializer: Option<OtlpDeserializer>,
228) -> BoxedFilter<(Response,)> {
229 let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
230 decompress_body(encoding_header.as_deref(), body)
231 .inspect_err(|err| {
232 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
234 emit!(HttpBadRequest::new(
235 err.status_code().as_u16(),
236 err.message()
237 ));
238 }
239 })
240 .and_then(|decoded_body| {
241 bytes_received.emit(ByteSize(decoded_body.len()));
242 if let Some(d) = deserializer.as_ref() {
243 parse_with_deserializer(d, decoded_body, log_namespace, &events_received)
244 } else {
245 decode_log_body(decoded_body, log_namespace, &events_received)
246 }
247 .map(|mut events| {
248 enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
249 events
250 })
251 })
252 };
253
254 build_ingest_filter::<ExportLogsServiceResponse, _>(
255 LOGS,
256 acknowledgements,
257 source_sender,
258 make_events,
259 )
260}
261fn build_warp_metrics_filter(
262 acknowledgements: bool,
263 log_namespace: LogNamespace,
264 source_sender: SourceSender,
265 bytes_received: Registered<BytesReceived>,
266 events_received: Registered<EventsReceived>,
267 headers_cfg: Vec<HttpConfigParamKind>,
268 deserializer: Option<OtlpDeserializer>,
269) -> BoxedFilter<(Response,)> {
270 let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
271 decompress_body(encoding_header.as_deref(), body)
272 .inspect_err(|err| {
273 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
275 emit!(HttpBadRequest::new(
276 err.status_code().as_u16(),
277 err.message()
278 ));
279 }
280 })
281 .and_then(|decoded_body| {
282 bytes_received.emit(ByteSize(decoded_body.len()));
283 if let Some(d) = deserializer.as_ref() {
284 parse_with_deserializer(d, decoded_body, log_namespace, &events_received)
285 } else {
286 decode_metrics_body(decoded_body, &events_received)
287 }
288 .map(|mut events| {
289 enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
290 events
291 })
292 })
293 };
294
295 build_ingest_filter::<ExportMetricsServiceResponse, _>(
296 METRICS,
297 acknowledgements,
298 source_sender,
299 make_events,
300 )
301}
302
303fn build_warp_trace_filter(
304 acknowledgements: bool,
305 source_sender: SourceSender,
306 bytes_received: Registered<BytesReceived>,
307 events_received: Registered<EventsReceived>,
308 headers_cfg: Vec<HttpConfigParamKind>,
309 deserializer: Option<OtlpDeserializer>,
310) -> BoxedFilter<(Response,)> {
311 let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
312 decompress_body(encoding_header.as_deref(), body)
313 .inspect_err(|err| {
314 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
316 emit!(HttpBadRequest::new(
317 err.status_code().as_u16(),
318 err.message()
319 ));
320 }
321 })
322 .and_then(|decoded_body| {
323 bytes_received.emit(ByteSize(decoded_body.len()));
324 if let Some(d) = deserializer.as_ref() {
325 parse_with_deserializer(
326 d,
327 decoded_body,
328 LogNamespace::default(),
329 &events_received,
330 )
331 } else {
332 decode_trace_body(decoded_body, &events_received)
333 }
334 .map(|mut events| {
335 enrich_events(&mut events, &headers_cfg, &headers, LogNamespace::default());
336 events
337 })
338 })
339 };
340
341 build_ingest_filter::<ExportTraceServiceResponse, _>(
342 TRACES,
343 acknowledgements,
344 source_sender,
345 make_events,
346 )
347}
348
349fn decode_trace_body(
350 body: Bytes,
351 events_received: &Registered<EventsReceived>,
352) -> Result<Vec<Event>, ErrorMessage> {
353 let request = ExportTraceServiceRequest::decode(body).map_err(emit_decode_error)?;
354
355 let events: Vec<Event> = request
356 .resource_spans
357 .into_iter()
358 .flat_map(|v| v.into_event_iter())
359 .collect();
360
361 events_received.emit(CountByteSize(
362 events.len(),
363 events.estimated_json_encoded_size_of(),
364 ));
365
366 Ok(events)
367}
368
369fn decode_log_body(
370 body: Bytes,
371 log_namespace: LogNamespace,
372 events_received: &Registered<EventsReceived>,
373) -> Result<Vec<Event>, ErrorMessage> {
374 let request = ExportLogsServiceRequest::decode(body).map_err(emit_decode_error)?;
375
376 let events: Vec<Event> = request
377 .resource_logs
378 .into_iter()
379 .flat_map(|v| v.into_event_iter(log_namespace))
380 .collect();
381
382 events_received.emit(CountByteSize(
383 events.len(),
384 events.estimated_json_encoded_size_of(),
385 ));
386
387 Ok(events)
388}
389
390fn decode_metrics_body(
391 body: Bytes,
392 events_received: &Registered<EventsReceived>,
393) -> Result<Vec<Event>, ErrorMessage> {
394 let request = ExportMetricsServiceRequest::decode(body).map_err(emit_decode_error)?;
395
396 let events: Vec<Event> = request
397 .resource_metrics
398 .into_iter()
399 .flat_map(|v| v.into_event_iter())
400 .collect();
401
402 events_received.emit(CountByteSize(
403 events.len(),
404 events.estimated_json_encoded_size_of(),
405 ));
406
407 Ok(events)
408}
409
410async fn handle_request(
411 events: Result<Vec<Event>, ErrorMessage>,
412 acknowledgements: bool,
413 mut out: SourceSender,
414 output: &str,
415 resp: impl Message,
416) -> Result<Response, Rejection> {
417 match events {
418 Ok(mut events) => {
419 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
420 let count = events.len();
421
422 out.send_batch_named(output, events).await.map_err(|_| {
423 emit!(StreamClosedError { count });
424 warp::reject::custom(ApiError::ServerShutdown)
425 })?;
426
427 match receiver {
428 None => Ok(protobuf(resp).into_response()),
429 Some(receiver) => match receiver.await {
430 BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
431 BatchStatus::Errored => Err(warp::reject::custom(Status {
432 code: 2, message: "Error delivering contents to sink".into(),
434 ..Default::default()
435 })),
436 BatchStatus::Rejected => Err(warp::reject::custom(Status {
437 code: 2, message: "Contents failed to deliver to sink".into(),
439 ..Default::default()
440 })),
441 },
442 }
443 }
444 Err(err) => Err(warp::reject::custom(err)),
445 }
446}
447
448async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
449 if let Some(err_msg) = err.find::<ErrorMessage>() {
450 let reply = protobuf(Status {
451 code: 2, message: err_msg.message().into(),
453 ..Default::default()
454 });
455
456 Ok(warp::reply::with_status(reply, err_msg.status_code()))
457 } else {
458 let reply = protobuf(Status {
459 code: 2, message: format!("{err:?}"),
461 ..Default::default()
462 });
463
464 Ok(warp::reply::with_status(
465 reply,
466 StatusCode::INTERNAL_SERVER_ERROR,
467 ))
468 }
469}