1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures_util::FutureExt;
5use http::StatusCode;
6use hyper::{Server, service::make_service_fn};
7use prost::Message;
8use snafu::Snafu;
9use tokio::net::TcpStream;
10use tower::ServiceBuilder;
11use tracing::Span;
12use vector_lib::{
13 EstimatedJsonEncodedSizeOf,
14 codecs::decoding::{OtlpDeserializer, format::Deserializer},
15 config::LogNamespace,
16 event::{BatchNotifier, BatchStatus},
17 internal_event::{
18 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
19 },
20 opentelemetry::proto::collector::{
21 logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
22 metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
23 trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
24 },
25 tls::MaybeTlsIncomingStream,
26};
27use warp::{
28 Filter, Reply, filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response,
29};
30
31use super::{reply::protobuf, status::Status};
32use crate::{
33 SourceSender,
34 common::http::ErrorMessage,
35 event::Event,
36 http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
37 internal_events::{EventsReceived, HttpBadRequest, StreamClosedError},
38 shutdown::ShutdownSignal,
39 sources::{
40 http_server::HttpConfigParamKind,
41 opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES},
42 util::{add_headers, decompress_body},
43 },
44 tls::MaybeTlsSettings,
45};
46
/// Errors raised by the HTTP ingest path that are surfaced to warp as custom rejections.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    // Returned by `handle_request` when forwarding a batch to the source output fails.
    // NOTE(review): kept as a plain `//` comment on purpose; the Snafu derive may pick up
    // `///` doc comments on variants as the `Display` text — confirm before converting.
    ServerShutdown,
}

// Marker impl so `ApiError` values can be wrapped with `warp::reject::custom`.
impl warp::reject::Reject for ApiError {}
53
54pub(crate) async fn run_http_server(
55 address: SocketAddr,
56 tls_settings: MaybeTlsSettings,
57 filters: BoxedFilter<(Response,)>,
58 shutdown: ShutdownSignal,
59 keepalive_settings: KeepaliveConfig,
60) -> crate::Result<()> {
61 let listener = tls_settings.bind(&address).await?;
62 let routes = filters.recover(handle_rejection);
63
64 info!(message = "Building HTTP server.", address = %address);
65
66 let span = Span::current();
67 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
68 let svc = ServiceBuilder::new()
69 .layer(build_http_trace_layer(span.clone()))
70 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
71 MaxConnectionAgeLayer::new(
72 Duration::from_secs(secs),
73 keepalive_settings.max_connection_age_jitter_factor,
74 conn.peer_addr(),
75 )
76 }))
77 .service(warp::service(routes.clone()));
78 futures_util::future::ok::<_, Infallible>(svc)
79 });
80
81 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
82 .serve(make_svc)
83 .with_graceful_shutdown(shutdown.map(|_| ()))
84 .await?;
85
86 Ok(())
87}
88
89#[allow(clippy::too_many_arguments)] pub(crate) fn build_warp_filter(
91 acknowledgements: bool,
92 log_namespace: LogNamespace,
93 out: SourceSender,
94 bytes_received: Registered<BytesReceived>,
95 events_received: Registered<EventsReceived>,
96 headers: Vec<HttpConfigParamKind>,
97 logs_deserializer: Option<OtlpDeserializer>,
98 metrics_deserializer: Option<OtlpDeserializer>,
99 traces_deserializer: Option<OtlpDeserializer>,
100) -> BoxedFilter<(Response,)> {
101 let log_filters = build_warp_log_filter(
102 acknowledgements,
103 log_namespace,
104 out.clone(),
105 bytes_received.clone(),
106 events_received.clone(),
107 headers.clone(),
108 logs_deserializer,
109 );
110 let metrics_filters = build_warp_metrics_filter(
111 acknowledgements,
112 out.clone(),
113 bytes_received.clone(),
114 events_received.clone(),
115 metrics_deserializer,
116 );
117 let trace_filters = build_warp_trace_filter(
118 acknowledgements,
119 out.clone(),
120 bytes_received,
121 events_received,
122 traces_deserializer,
123 );
124 log_filters
125 .or(trace_filters)
126 .unify()
127 .or(metrics_filters)
128 .unify()
129 .boxed()
130}
131
/// Enriches `events` in place with request header data.
///
/// Thin wrapper around `add_headers` that fixes the source name to
/// `OpentelemetryConfig::NAME`.
///
/// * `events` - events to enrich; mutated in place.
/// * `headers_config` - configured header selection passed through to `add_headers`.
/// * `headers` - headers of the incoming HTTP request.
/// * `log_namespace` - namespace passed through to `add_headers`.
fn enrich_events(
    events: &mut [Event],
    headers_config: &[HttpConfigParamKind],
    headers: &HeaderMap,
    log_namespace: LogNamespace,
) {
    add_headers(
        events,
        headers_config,
        headers,
        log_namespace,
        OpentelemetryConfig::NAME,
    );
}
146
147fn emit_decode_error(error: impl std::fmt::Display) -> ErrorMessage {
148 let message = format!("Could not decode request: {error}");
149 emit!(HttpBadRequest::new(
150 StatusCode::BAD_REQUEST.as_u16(),
151 &message
152 ));
153 ErrorMessage::new(StatusCode::BAD_REQUEST, message)
154}
155
156fn parse_with_deserializer(
157 deserializer: &OtlpDeserializer,
158 body: Bytes,
159 log_namespace: LogNamespace,
160 events_received: &Registered<EventsReceived>,
161) -> Result<Vec<Event>, ErrorMessage> {
162 let events = deserializer
163 .parse(body, log_namespace)
164 .map(|r| r.into_vec())
165 .map_err(emit_decode_error)?;
166
167 let count = super::count_otlp_items(&events);
169 events_received.emit(CountByteSize(
170 count,
171 events.estimated_json_encoded_size_of(),
172 ));
173
174 Ok(events)
175}
176
177fn build_ingest_filter<Resp, F>(
178 telemetry_type: &'static str,
179 acknowledgements: bool,
180 out: SourceSender,
181 make_events: F,
182) -> BoxedFilter<(Response,)>
183where
184 Resp: prost::Message + Default + Send + 'static,
185 F: Clone
186 + Send
187 + Sync
188 + 'static
189 + Fn(Option<String>, HeaderMap, Bytes) -> Result<Vec<Event>, ErrorMessage>,
190{
191 warp::post()
192 .and(warp::path("v1"))
193 .and(warp::path(telemetry_type))
194 .and(warp::path::end())
195 .and(warp::header::exact_ignore_case(
196 "content-type",
197 "application/x-protobuf",
198 ))
199 .and(warp::header::optional::<String>("content-encoding"))
200 .and(warp::header::headers_cloned())
201 .and(warp::body::bytes())
202 .and_then(
203 move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
204 let events = make_events(encoding_header, headers, body);
205 handle_request(
206 events,
207 acknowledgements,
208 out.clone(),
209 telemetry_type,
210 Resp::default(),
211 )
212 },
213 )
214 .boxed()
215}
216
217fn build_warp_log_filter(
218 acknowledgements: bool,
219 log_namespace: LogNamespace,
220 source_sender: SourceSender,
221 bytes_received: Registered<BytesReceived>,
222 events_received: Registered<EventsReceived>,
223 headers_cfg: Vec<HttpConfigParamKind>,
224 deserializer: Option<OtlpDeserializer>,
225) -> BoxedFilter<(Response,)> {
226 let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
227 decompress_body(encoding_header.as_deref(), body)
228 .inspect_err(|err| {
229 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
231 emit!(HttpBadRequest::new(
232 err.status_code().as_u16(),
233 err.message()
234 ));
235 }
236 })
237 .and_then(|decoded_body| {
238 bytes_received.emit(ByteSize(decoded_body.len()));
239 if let Some(d) = deserializer.as_ref() {
240 parse_with_deserializer(d, decoded_body, log_namespace, &events_received)
241 } else {
242 decode_log_body(decoded_body, log_namespace, &events_received)
243 }
244 .map(|mut events| {
245 enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
246 events
247 })
248 })
249 };
250
251 build_ingest_filter::<ExportLogsServiceResponse, _>(
252 LOGS,
253 acknowledgements,
254 source_sender,
255 make_events,
256 )
257}
258fn build_warp_metrics_filter(
259 acknowledgements: bool,
260 source_sender: SourceSender,
261 bytes_received: Registered<BytesReceived>,
262 events_received: Registered<EventsReceived>,
263 deserializer: Option<OtlpDeserializer>,
264) -> BoxedFilter<(Response,)> {
265 let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
266 decompress_body(encoding_header.as_deref(), body)
267 .inspect_err(|err| {
268 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
270 emit!(HttpBadRequest::new(
271 err.status_code().as_u16(),
272 err.message()
273 ));
274 }
275 })
276 .and_then(|decoded_body| {
277 bytes_received.emit(ByteSize(decoded_body.len()));
278 if let Some(d) = deserializer.as_ref() {
279 parse_with_deserializer(
280 d,
281 decoded_body,
282 LogNamespace::default(),
283 &events_received,
284 )
285 } else {
286 decode_metrics_body(decoded_body, &events_received)
287 }
288 })
289 };
290
291 build_ingest_filter::<ExportMetricsServiceResponse, _>(
292 METRICS,
293 acknowledgements,
294 source_sender,
295 make_events,
296 )
297}
298
299fn build_warp_trace_filter(
300 acknowledgements: bool,
301 source_sender: SourceSender,
302 bytes_received: Registered<BytesReceived>,
303 events_received: Registered<EventsReceived>,
304 deserializer: Option<OtlpDeserializer>,
305) -> BoxedFilter<(Response,)> {
306 let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
307 decompress_body(encoding_header.as_deref(), body)
308 .inspect_err(|err| {
309 if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
311 emit!(HttpBadRequest::new(
312 err.status_code().as_u16(),
313 err.message()
314 ));
315 }
316 })
317 .and_then(|decoded_body| {
318 bytes_received.emit(ByteSize(decoded_body.len()));
319 if let Some(d) = deserializer.as_ref() {
320 parse_with_deserializer(
321 d,
322 decoded_body,
323 LogNamespace::default(),
324 &events_received,
325 )
326 } else {
327 decode_trace_body(decoded_body, &events_received)
328 }
329 })
330 };
331
332 build_ingest_filter::<ExportTraceServiceResponse, _>(
333 TRACES,
334 acknowledgements,
335 source_sender,
336 make_events,
337 )
338}
339
340fn decode_trace_body(
341 body: Bytes,
342 events_received: &Registered<EventsReceived>,
343) -> Result<Vec<Event>, ErrorMessage> {
344 let request = ExportTraceServiceRequest::decode(body).map_err(emit_decode_error)?;
345
346 let events: Vec<Event> = request
347 .resource_spans
348 .into_iter()
349 .flat_map(|v| v.into_event_iter())
350 .collect();
351
352 events_received.emit(CountByteSize(
353 events.len(),
354 events.estimated_json_encoded_size_of(),
355 ));
356
357 Ok(events)
358}
359
360fn decode_log_body(
361 body: Bytes,
362 log_namespace: LogNamespace,
363 events_received: &Registered<EventsReceived>,
364) -> Result<Vec<Event>, ErrorMessage> {
365 let request = ExportLogsServiceRequest::decode(body).map_err(emit_decode_error)?;
366
367 let events: Vec<Event> = request
368 .resource_logs
369 .into_iter()
370 .flat_map(|v| v.into_event_iter(log_namespace))
371 .collect();
372
373 events_received.emit(CountByteSize(
374 events.len(),
375 events.estimated_json_encoded_size_of(),
376 ));
377
378 Ok(events)
379}
380
381fn decode_metrics_body(
382 body: Bytes,
383 events_received: &Registered<EventsReceived>,
384) -> Result<Vec<Event>, ErrorMessage> {
385 let request = ExportMetricsServiceRequest::decode(body).map_err(emit_decode_error)?;
386
387 let events: Vec<Event> = request
388 .resource_metrics
389 .into_iter()
390 .flat_map(|v| v.into_event_iter())
391 .collect();
392
393 events_received.emit(CountByteSize(
394 events.len(),
395 events.estimated_json_encoded_size_of(),
396 ));
397
398 Ok(events)
399}
400
401async fn handle_request(
402 events: Result<Vec<Event>, ErrorMessage>,
403 acknowledgements: bool,
404 mut out: SourceSender,
405 output: &str,
406 resp: impl Message,
407) -> Result<Response, Rejection> {
408 match events {
409 Ok(mut events) => {
410 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
411 let count = events.len();
412
413 out.send_batch_named(output, events).await.map_err(|_| {
414 emit!(StreamClosedError { count });
415 warp::reject::custom(ApiError::ServerShutdown)
416 })?;
417
418 match receiver {
419 None => Ok(protobuf(resp).into_response()),
420 Some(receiver) => match receiver.await {
421 BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
422 BatchStatus::Errored => Err(warp::reject::custom(Status {
423 code: 2, message: "Error delivering contents to sink".into(),
425 ..Default::default()
426 })),
427 BatchStatus::Rejected => Err(warp::reject::custom(Status {
428 code: 2, message: "Contents failed to deliver to sink".into(),
430 ..Default::default()
431 })),
432 },
433 }
434 }
435 Err(err) => Err(warp::reject::custom(err)),
436 }
437}
438
439async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
440 if let Some(err_msg) = err.find::<ErrorMessage>() {
441 let reply = protobuf(Status {
442 code: 2, message: err_msg.message().into(),
444 ..Default::default()
445 });
446
447 Ok(warp::reply::with_status(reply, err_msg.status_code()))
448 } else {
449 let reply = protobuf(Status {
450 code: 2, message: format!("{err:?}"),
452 ..Default::default()
453 });
454
455 Ok(warp::reply::with_status(
456 reply,
457 StatusCode::INTERNAL_SERVER_ERROR,
458 ))
459 }
460}