1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures_util::FutureExt;
5use http::StatusCode;
6use hyper::{Server, service::make_service_fn};
7use prost::Message;
8use snafu::Snafu;
9use tokio::net::TcpStream;
10use tower::ServiceBuilder;
11use tracing::Span;
12use vector_lib::{
13    EstimatedJsonEncodedSizeOf,
14    codecs::decoding::{ProtobufDeserializer, format::Deserializer},
15    config::LogNamespace,
16    event::{BatchNotifier, BatchStatus},
17    internal_event::{
18        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
19    },
20    opentelemetry::proto::collector::{
21        logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
22        metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
23        trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
24    },
25    tls::MaybeTlsIncomingStream,
26};
27use warp::{
28    Filter, Reply, filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response,
29};
30
31use super::{reply::protobuf, status::Status};
32use crate::{
33    SourceSender,
34    common::http::ErrorMessage,
35    event::Event,
36    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
37    internal_events::{EventsReceived, StreamClosedError},
38    shutdown::ShutdownSignal,
39    sources::{
40        http_server::HttpConfigParamKind,
41        opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES},
42        util::{add_headers, decode},
43    },
44    tls::MaybeTlsSettings,
45};
46
/// Rejection reasons raised by this module's request handling.
///
/// Implements [`warp::reject::Reject`] so values can be wrapped with
/// `warp::reject::custom` and surfaced through the rejection handler.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    // Raised by `handle_request` when forwarding a batch to the `SourceSender` fails.
    // (Plain `//` comment on purpose: a `///` doc comment on a Snafu variant may be
    // picked up as its `Display` message.)
    ServerShutdown,
}

// Marker impl: lets `ApiError` be passed to `warp::reject::custom`.
impl warp::reject::Reject for ApiError {}
53
54pub(crate) async fn run_http_server(
55    address: SocketAddr,
56    tls_settings: MaybeTlsSettings,
57    filters: BoxedFilter<(Response,)>,
58    shutdown: ShutdownSignal,
59    keepalive_settings: KeepaliveConfig,
60) -> crate::Result<()> {
61    let listener = tls_settings.bind(&address).await?;
62    let routes = filters.recover(handle_rejection);
63
64    info!(message = "Building HTTP server.", address = %address);
65
66    let span = Span::current();
67    let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
68        let svc = ServiceBuilder::new()
69            .layer(build_http_trace_layer(span.clone()))
70            .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
71                MaxConnectionAgeLayer::new(
72                    Duration::from_secs(secs),
73                    keepalive_settings.max_connection_age_jitter_factor,
74                    conn.peer_addr(),
75                )
76            }))
77            .service(warp::service(routes.clone()));
78        futures_util::future::ok::<_, Infallible>(svc)
79    });
80
81    Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
82        .serve(make_svc)
83        .with_graceful_shutdown(shutdown.map(|_| ()))
84        .await?;
85
86    Ok(())
87}
88
89pub(crate) fn build_warp_filter(
90    acknowledgements: bool,
91    log_namespace: LogNamespace,
92    out: SourceSender,
93    bytes_received: Registered<BytesReceived>,
94    events_received: Registered<EventsReceived>,
95    headers: Vec<HttpConfigParamKind>,
96    deserializer: Option<ProtobufDeserializer>,
97) -> BoxedFilter<(Response,)> {
98    let log_filters = build_warp_log_filter(
99        acknowledgements,
100        log_namespace,
101        out.clone(),
102        bytes_received.clone(),
103        events_received.clone(),
104        headers.clone(),
105        deserializer.clone(),
106    );
107    let metrics_filters = build_warp_metrics_filter(
108        acknowledgements,
109        out.clone(),
110        bytes_received.clone(),
111        events_received.clone(),
112        deserializer.clone(),
113    );
114    let trace_filters = build_warp_trace_filter(
115        acknowledgements,
116        out.clone(),
117        bytes_received,
118        events_received,
119        deserializer,
120    );
121    log_filters
122        .or(trace_filters)
123        .unify()
124        .or(metrics_filters)
125        .unify()
126        .boxed()
127}
128
129fn enrich_events(
130    events: &mut [Event],
131    headers_config: &[HttpConfigParamKind],
132    headers: &HeaderMap,
133    log_namespace: LogNamespace,
134) {
135    add_headers(
136        events,
137        headers_config,
138        headers,
139        log_namespace,
140        OpentelemetryConfig::NAME,
141    );
142}
143
144fn build_ingest_filter<Resp, F>(
145    telemetry_type: &'static str,
146    acknowledgements: bool,
147    out: SourceSender,
148    make_events: F,
149) -> BoxedFilter<(Response,)>
150where
151    Resp: prost::Message + Default + Send + 'static,
152    F: Clone
153        + Send
154        + Sync
155        + 'static
156        + Fn(Option<String>, HeaderMap, Bytes) -> Result<Vec<Event>, ErrorMessage>,
157{
158    warp::post()
159        .and(warp::path("v1"))
160        .and(warp::path(telemetry_type))
161        .and(warp::path::end())
162        .and(warp::header::exact_ignore_case(
163            "content-type",
164            "application/x-protobuf",
165        ))
166        .and(warp::header::optional::<String>("content-encoding"))
167        .and(warp::header::headers_cloned())
168        .and(warp::body::bytes())
169        .and_then(
170            move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
171                let events = make_events(encoding_header, headers, body);
172                handle_request(
173                    events,
174                    acknowledgements,
175                    out.clone(),
176                    telemetry_type,
177                    Resp::default(),
178                )
179            },
180        )
181        .boxed()
182}
183
184fn build_warp_log_filter(
185    acknowledgements: bool,
186    log_namespace: LogNamespace,
187    source_sender: SourceSender,
188    bytes_received: Registered<BytesReceived>,
189    events_received: Registered<EventsReceived>,
190    headers_cfg: Vec<HttpConfigParamKind>,
191    deserializer: Option<ProtobufDeserializer>,
192) -> BoxedFilter<(Response,)> {
193    let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
194        if let Some(d) = deserializer.as_ref() {
195            d.parse(body, log_namespace)
196                .map(|r| r.into_vec())
197                .map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string()))
198        } else {
199            decode(encoding_header.as_deref(), body)
200                .and_then(|body| {
201                    bytes_received.emit(ByteSize(body.len()));
202                    decode_log_body(body, log_namespace, &events_received)
203                })
204                .map(|mut events| {
205                    enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
206                    events
207                })
208        }
209    };
210
211    build_ingest_filter::<ExportLogsServiceResponse, _>(
212        LOGS,
213        acknowledgements,
214        source_sender,
215        make_events,
216    )
217}
218fn build_warp_metrics_filter(
219    acknowledgements: bool,
220    source_sender: SourceSender,
221    bytes_received: Registered<BytesReceived>,
222    events_received: Registered<EventsReceived>,
223    deserializer: Option<ProtobufDeserializer>,
224) -> BoxedFilter<(Response,)> {
225    let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
226        if let Some(d) = deserializer.as_ref() {
227            d.parse(body, LogNamespace::default())
228                .map(|r| r.into_vec())
229                .map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string()))
230        } else {
231            decode(encoding_header.as_deref(), body).and_then(|body| {
232                bytes_received.emit(ByteSize(body.len()));
233                decode_metrics_body(body, &events_received)
234            })
235        }
236    };
237
238    build_ingest_filter::<ExportMetricsServiceResponse, _>(
239        METRICS,
240        acknowledgements,
241        source_sender,
242        make_events,
243    )
244}
245
246fn build_warp_trace_filter(
247    acknowledgements: bool,
248    source_sender: SourceSender,
249    bytes_received: Registered<BytesReceived>,
250    events_received: Registered<EventsReceived>,
251    deserializer: Option<ProtobufDeserializer>,
252) -> BoxedFilter<(Response,)> {
253    let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
254        if let Some(d) = deserializer.as_ref() {
255            d.parse_traces(body)
256                .map(|r| r.into_vec())
257                .map_err(|e| ErrorMessage::new(StatusCode::BAD_REQUEST, e.to_string()))
258        } else {
259            decode(encoding_header.as_deref(), body).and_then(|body| {
260                bytes_received.emit(ByteSize(body.len()));
261                decode_trace_body(body, &events_received)
262            })
263        }
264    };
265
266    build_ingest_filter::<ExportTraceServiceResponse, _>(
267        TRACES,
268        acknowledgements,
269        source_sender,
270        make_events,
271    )
272}
273
274fn decode_trace_body(
275    body: Bytes,
276    events_received: &Registered<EventsReceived>,
277) -> Result<Vec<Event>, ErrorMessage> {
278    let request = ExportTraceServiceRequest::decode(body).map_err(|error| {
279        ErrorMessage::new(
280            StatusCode::BAD_REQUEST,
281            format!("Could not decode request: {error}"),
282        )
283    })?;
284
285    let events: Vec<Event> = request
286        .resource_spans
287        .into_iter()
288        .flat_map(|v| v.into_event_iter())
289        .collect();
290
291    events_received.emit(CountByteSize(
292        events.len(),
293        events.estimated_json_encoded_size_of(),
294    ));
295
296    Ok(events)
297}
298
299fn decode_log_body(
300    body: Bytes,
301    log_namespace: LogNamespace,
302    events_received: &Registered<EventsReceived>,
303) -> Result<Vec<Event>, ErrorMessage> {
304    let request = ExportLogsServiceRequest::decode(body).map_err(|error| {
305        ErrorMessage::new(
306            StatusCode::BAD_REQUEST,
307            format!("Could not decode request: {error}"),
308        )
309    })?;
310
311    let events: Vec<Event> = request
312        .resource_logs
313        .into_iter()
314        .flat_map(|v| v.into_event_iter(log_namespace))
315        .collect();
316
317    events_received.emit(CountByteSize(
318        events.len(),
319        events.estimated_json_encoded_size_of(),
320    ));
321
322    Ok(events)
323}
324
325fn decode_metrics_body(
326    body: Bytes,
327    events_received: &Registered<EventsReceived>,
328) -> Result<Vec<Event>, ErrorMessage> {
329    let request = ExportMetricsServiceRequest::decode(body).map_err(|error| {
330        ErrorMessage::new(
331            StatusCode::BAD_REQUEST,
332            format!("Could not decode request: {error}"),
333        )
334    })?;
335
336    let events: Vec<Event> = request
337        .resource_metrics
338        .into_iter()
339        .flat_map(|v| v.into_event_iter())
340        .collect();
341
342    events_received.emit(CountByteSize(
343        events.len(),
344        events.estimated_json_encoded_size_of(),
345    ));
346
347    Ok(events)
348}
349
350async fn handle_request(
351    events: Result<Vec<Event>, ErrorMessage>,
352    acknowledgements: bool,
353    mut out: SourceSender,
354    output: &str,
355    resp: impl Message,
356) -> Result<Response, Rejection> {
357    match events {
358        Ok(mut events) => {
359            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
360            let count = events.len();
361
362            out.send_batch_named(output, events).await.map_err(|_| {
363                emit!(StreamClosedError { count });
364                warp::reject::custom(ApiError::ServerShutdown)
365            })?;
366
367            match receiver {
368                None => Ok(protobuf(resp).into_response()),
369                Some(receiver) => match receiver.await {
370                    BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
371                    BatchStatus::Errored => Err(warp::reject::custom(Status {
372                        code: 2, message: "Error delivering contents to sink".into(),
374                        ..Default::default()
375                    })),
376                    BatchStatus::Rejected => Err(warp::reject::custom(Status {
377                        code: 2, message: "Contents failed to deliver to sink".into(),
379                        ..Default::default()
380                    })),
381                },
382            }
383        }
384        Err(err) => Err(warp::reject::custom(err)),
385    }
386}
387
388async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
389    if let Some(err_msg) = err.find::<ErrorMessage>() {
390        let reply = protobuf(Status {
391            code: 2, message: err_msg.message().into(),
393            ..Default::default()
394        });
395
396        Ok(warp::reply::with_status(reply, err_msg.status_code()))
397    } else {
398        let reply = protobuf(Status {
399            code: 2, message: format!("{err:?}"),
401            ..Default::default()
402        });
403
404        Ok(warp::reply::with_status(
405            reply,
406            StatusCode::INTERNAL_SERVER_ERROR,
407        ))
408    }
409}