1use std::time::Duration;
2use std::{convert::Infallible, net::SocketAddr};
3
4use bytes::Bytes;
5use futures_util::FutureExt;
6use http::StatusCode;
7use hyper::{service::make_service_fn, Server};
8use prost::Message;
9use snafu::Snafu;
10use tokio::net::TcpStream;
11use tower::ServiceBuilder;
12use tracing::Span;
13use vector_lib::internal_event::{
14 ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
15};
16use vector_lib::opentelemetry::proto::collector::{
17 logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
18 metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
19 trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
20};
21use vector_lib::tls::MaybeTlsIncomingStream;
22use vector_lib::{
23 config::LogNamespace,
24 event::{BatchNotifier, BatchStatus},
25 EstimatedJsonEncodedSizeOf,
26};
27use warp::{
28 filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response, Filter, Reply,
29};
30
31use crate::common::http::ErrorMessage;
32use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer};
33use crate::sources::http_server::HttpConfigParamKind;
34use crate::sources::util::add_headers;
35use crate::{
36 event::Event,
37 http::build_http_trace_layer,
38 internal_events::{EventsReceived, StreamClosedError},
39 shutdown::ShutdownSignal,
40 sources::util::decode,
41 tls::MaybeTlsSettings,
42 SourceSender,
43};
44
45use super::{reply::protobuf, status::Status};
46use crate::sources::opentelemetry::config::{OpentelemetryConfig, LOGS, METRICS, TRACES};
47
48#[derive(Clone, Copy, Debug, Snafu)]
49pub(crate) enum ApiError {
50 ServerShutdown,
51}
52
53impl warp::reject::Reject for ApiError {}
54
55pub(crate) async fn run_http_server(
56 address: SocketAddr,
57 tls_settings: MaybeTlsSettings,
58 filters: BoxedFilter<(Response,)>,
59 shutdown: ShutdownSignal,
60 keepalive_settings: KeepaliveConfig,
61) -> crate::Result<()> {
62 let listener = tls_settings.bind(&address).await?;
63 let routes = filters.recover(handle_rejection);
64
65 info!(message = "Building HTTP server.", address = %address);
66
67 let span = Span::current();
68 let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
69 let svc = ServiceBuilder::new()
70 .layer(build_http_trace_layer(span.clone()))
71 .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
72 MaxConnectionAgeLayer::new(
73 Duration::from_secs(secs),
74 keepalive_settings.max_connection_age_jitter_factor,
75 conn.peer_addr(),
76 )
77 }))
78 .service(warp::service(routes.clone()));
79 futures_util::future::ok::<_, Infallible>(svc)
80 });
81
82 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
83 .serve(make_svc)
84 .with_graceful_shutdown(shutdown.map(|_| ()))
85 .await?;
86
87 Ok(())
88}
89
90pub(crate) fn build_warp_filter(
91 acknowledgements: bool,
92 log_namespace: LogNamespace,
93 out: SourceSender,
94 bytes_received: Registered<BytesReceived>,
95 events_received: Registered<EventsReceived>,
96 headers: Vec<HttpConfigParamKind>,
97) -> BoxedFilter<(Response,)> {
98 let log_filters = build_warp_log_filter(
99 acknowledgements,
100 log_namespace,
101 out.clone(),
102 bytes_received.clone(),
103 events_received.clone(),
104 headers.clone(),
105 );
106 let metrics_filters = build_warp_metrics_filter(
107 acknowledgements,
108 out.clone(),
109 bytes_received.clone(),
110 events_received.clone(),
111 );
112 let trace_filters = build_warp_trace_filter(
113 acknowledgements,
114 out.clone(),
115 bytes_received,
116 events_received,
117 );
118 log_filters
119 .or(trace_filters)
120 .unify()
121 .or(metrics_filters)
122 .unify()
123 .boxed()
124}
125
126fn enrich_events(
127 events: &mut [Event],
128 headers_config: &[HttpConfigParamKind],
129 headers: &HeaderMap,
130 log_namespace: LogNamespace,
131) {
132 add_headers(
133 events,
134 headers_config,
135 headers,
136 log_namespace,
137 OpentelemetryConfig::NAME,
138 );
139}
140
141fn build_warp_log_filter(
142 acknowledgements: bool,
143 log_namespace: LogNamespace,
144 out: SourceSender,
145 bytes_received: Registered<BytesReceived>,
146 events_received: Registered<EventsReceived>,
147 headers: Vec<HttpConfigParamKind>,
148) -> BoxedFilter<(Response,)> {
149 warp::post()
150 .and(warp::path!("v1" / "logs"))
151 .and(warp::header::exact_ignore_case(
152 "content-type",
153 "application/x-protobuf",
154 ))
155 .and(warp::header::optional::<String>("content-encoding"))
156 .and(warp::header::headers_cloned())
157 .and(warp::body::bytes())
158 .and_then(
159 move |encoding_header: Option<String>, headers_config: HeaderMap, body: Bytes| {
160 let events = decode(encoding_header.as_deref(), body)
161 .and_then(|body| {
162 bytes_received.emit(ByteSize(body.len()));
163 decode_log_body(body, log_namespace, &events_received)
164 })
165 .map(|mut events| {
166 enrich_events(&mut events, &headers, &headers_config, log_namespace);
167 events
168 });
169
170 handle_request(
171 events,
172 acknowledgements,
173 out.clone(),
174 LOGS,
175 ExportLogsServiceResponse::default(),
176 )
177 },
178 )
179 .boxed()
180}
181
182fn build_warp_metrics_filter(
183 acknowledgements: bool,
184 out: SourceSender,
185 bytes_received: Registered<BytesReceived>,
186 events_received: Registered<EventsReceived>,
187) -> BoxedFilter<(Response,)> {
188 warp::post()
189 .and(warp::path!("v1" / "metrics"))
190 .and(warp::header::exact_ignore_case(
191 "content-type",
192 "application/x-protobuf",
193 ))
194 .and(warp::header::optional::<String>("content-encoding"))
195 .and(warp::body::bytes())
196 .and_then(move |encoding_header: Option<String>, body: Bytes| {
197 let events = decode(encoding_header.as_deref(), body).and_then(|body| {
198 bytes_received.emit(ByteSize(body.len()));
199 decode_metrics_body(body, &events_received)
200 });
201
202 handle_request(
203 events,
204 acknowledgements,
205 out.clone(),
206 METRICS,
207 ExportMetricsServiceResponse::default(),
208 )
209 })
210 .boxed()
211}
212
213fn build_warp_trace_filter(
214 acknowledgements: bool,
215 out: SourceSender,
216 bytes_received: Registered<BytesReceived>,
217 events_received: Registered<EventsReceived>,
218) -> BoxedFilter<(Response,)> {
219 warp::post()
220 .and(warp::path!("v1" / "traces"))
221 .and(warp::header::exact_ignore_case(
222 "content-type",
223 "application/x-protobuf",
224 ))
225 .and(warp::header::optional::<String>("content-encoding"))
226 .and(warp::body::bytes())
227 .and_then(move |encoding_header: Option<String>, body: Bytes| {
228 let events = decode(encoding_header.as_deref(), body).and_then(|body| {
229 bytes_received.emit(ByteSize(body.len()));
230 decode_trace_body(body, &events_received)
231 });
232
233 handle_request(
234 events,
235 acknowledgements,
236 out.clone(),
237 TRACES,
238 ExportTraceServiceResponse::default(),
239 )
240 })
241 .boxed()
242}
243
244fn decode_trace_body(
245 body: Bytes,
246 events_received: &Registered<EventsReceived>,
247) -> Result<Vec<Event>, ErrorMessage> {
248 let request = ExportTraceServiceRequest::decode(body).map_err(|error| {
249 ErrorMessage::new(
250 StatusCode::BAD_REQUEST,
251 format!("Could not decode request: {error}"),
252 )
253 })?;
254
255 let events: Vec<Event> = request
256 .resource_spans
257 .into_iter()
258 .flat_map(|v| v.into_event_iter())
259 .collect();
260
261 events_received.emit(CountByteSize(
262 events.len(),
263 events.estimated_json_encoded_size_of(),
264 ));
265
266 Ok(events)
267}
268
269fn decode_log_body(
270 body: Bytes,
271 log_namespace: LogNamespace,
272 events_received: &Registered<EventsReceived>,
273) -> Result<Vec<Event>, ErrorMessage> {
274 let request = ExportLogsServiceRequest::decode(body).map_err(|error| {
275 ErrorMessage::new(
276 StatusCode::BAD_REQUEST,
277 format!("Could not decode request: {error}"),
278 )
279 })?;
280
281 let events: Vec<Event> = request
282 .resource_logs
283 .into_iter()
284 .flat_map(|v| v.into_event_iter(log_namespace))
285 .collect();
286
287 events_received.emit(CountByteSize(
288 events.len(),
289 events.estimated_json_encoded_size_of(),
290 ));
291
292 Ok(events)
293}
294
295fn decode_metrics_body(
296 body: Bytes,
297 events_received: &Registered<EventsReceived>,
298) -> Result<Vec<Event>, ErrorMessage> {
299 let request = ExportMetricsServiceRequest::decode(body).map_err(|error| {
300 ErrorMessage::new(
301 StatusCode::BAD_REQUEST,
302 format!("Could not decode request: {error}"),
303 )
304 })?;
305
306 let events: Vec<Event> = request
307 .resource_metrics
308 .into_iter()
309 .flat_map(|v| v.into_event_iter())
310 .collect();
311
312 events_received.emit(CountByteSize(
313 events.len(),
314 events.estimated_json_encoded_size_of(),
315 ));
316
317 Ok(events)
318}
319
320async fn handle_request(
321 events: Result<Vec<Event>, ErrorMessage>,
322 acknowledgements: bool,
323 mut out: SourceSender,
324 output: &str,
325 resp: impl Message,
326) -> Result<Response, Rejection> {
327 match events {
328 Ok(mut events) => {
329 let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
330 let count = events.len();
331
332 out.send_batch_named(output, events).await.map_err(|_| {
333 emit!(StreamClosedError { count });
334 warp::reject::custom(ApiError::ServerShutdown)
335 })?;
336
337 match receiver {
338 None => Ok(protobuf(resp).into_response()),
339 Some(receiver) => match receiver.await {
340 BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
341 BatchStatus::Errored => Err(warp::reject::custom(Status {
342 code: 2, message: "Error delivering contents to sink".into(),
344 ..Default::default()
345 })),
346 BatchStatus::Rejected => Err(warp::reject::custom(Status {
347 code: 2, message: "Contents failed to deliver to sink".into(),
349 ..Default::default()
350 })),
351 },
352 }
353 }
354 Err(err) => Err(warp::reject::custom(err)),
355 }
356}
357
358async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
359 if let Some(err_msg) = err.find::<ErrorMessage>() {
360 let reply = protobuf(Status {
361 code: 2, message: err_msg.message().into(),
363 ..Default::default()
364 });
365
366 Ok(warp::reply::with_status(reply, err_msg.status_code()))
367 } else {
368 let reply = protobuf(Status {
369 code: 2, message: format!("{err:?}"),
371 ..Default::default()
372 });
373
374 Ok(warp::reply::with_status(
375 reply,
376 StatusCode::INTERNAL_SERVER_ERROR,
377 ))
378 }
379}