vector/sources/opentelemetry/
http.rs

1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures_util::FutureExt;
5use http::StatusCode;
6use hyper::{Server, service::make_service_fn};
7use prost::Message;
8use snafu::Snafu;
9use tokio::net::TcpStream;
10use tower::ServiceBuilder;
11use tracing::Span;
12use vector_lib::{
13    EstimatedJsonEncodedSizeOf,
14    codecs::decoding::{OtlpDeserializer, format::Deserializer},
15    config::LogNamespace,
16    event::{BatchNotifier, BatchStatus},
17    internal_event::{
18        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
19    },
20    opentelemetry::proto::collector::{
21        logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
22        metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
23        trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
24    },
25    tls::MaybeTlsIncomingStream,
26};
27use warp::{
28    Filter, Reply, filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response,
29};
30
31use super::{reply::protobuf, status::Status};
32use crate::{
33    SourceSender,
34    common::http::ErrorMessage,
35    event::Event,
36    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
37    internal_events::{EventsReceived, HttpBadRequest, StreamClosedError},
38    shutdown::ShutdownSignal,
39    sources::{
40        http_server::HttpConfigParamKind,
41        opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES},
42        util::{add_headers, decompress_body},
43    },
44    tls::MaybeTlsSettings,
45};
46
// Rejection reasons raised by the HTTP handlers in this file.
//
// NOTE(review): plain `//` comments are used here on purpose. SNAFU may pick up
// `///` doc comments on variants as the `Display` text — confirm before converting
// these to doc comments.
#[derive(Clone, Copy, Debug, Snafu)]
pub(crate) enum ApiError {
    // Used by `handle_request` when sending the event batch to the output fails.
    ServerShutdown,
}

// Marker impl that lets `ApiError` be wrapped with `warp::reject::custom`.
impl warp::reject::Reject for ApiError {}
53
54pub(crate) async fn run_http_server(
55    address: SocketAddr,
56    tls_settings: MaybeTlsSettings,
57    filters: BoxedFilter<(Response,)>,
58    shutdown: ShutdownSignal,
59    keepalive_settings: KeepaliveConfig,
60) -> crate::Result<()> {
61    let listener = tls_settings.bind(&address).await?;
62    let routes = filters.recover(handle_rejection);
63
64    info!(message = "Building HTTP server.", address = %address);
65
66    let span = Span::current();
67    let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
68        let svc = ServiceBuilder::new()
69            .layer(build_http_trace_layer(span.clone()))
70            .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
71                MaxConnectionAgeLayer::new(
72                    Duration::from_secs(secs),
73                    keepalive_settings.max_connection_age_jitter_factor,
74                    conn.peer_addr(),
75                )
76            }))
77            .service(warp::service(routes.clone()));
78        futures_util::future::ok::<_, Infallible>(svc)
79    });
80
81    Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
82        .serve(make_svc)
83        .with_graceful_shutdown(shutdown.map(|_| ()))
84        .await?;
85
86    Ok(())
87}
88
89#[allow(clippy::too_many_arguments)] // TODO change to a builder struct
90pub(crate) fn build_warp_filter(
91    acknowledgements: bool,
92    log_namespace: LogNamespace,
93    out: SourceSender,
94    bytes_received: Registered<BytesReceived>,
95    events_received: Registered<EventsReceived>,
96    headers: Vec<HttpConfigParamKind>,
97    logs_deserializer: Option<OtlpDeserializer>,
98    metrics_deserializer: Option<OtlpDeserializer>,
99    traces_deserializer: Option<OtlpDeserializer>,
100) -> BoxedFilter<(Response,)> {
101    let log_filters = build_warp_log_filter(
102        acknowledgements,
103        log_namespace,
104        out.clone(),
105        bytes_received.clone(),
106        events_received.clone(),
107        headers.clone(),
108        logs_deserializer,
109    );
110    let metrics_filters = build_warp_metrics_filter(
111        acknowledgements,
112        out.clone(),
113        bytes_received.clone(),
114        events_received.clone(),
115        metrics_deserializer,
116    );
117    let trace_filters = build_warp_trace_filter(
118        acknowledgements,
119        out.clone(),
120        bytes_received,
121        events_received,
122        traces_deserializer,
123    );
124    log_filters
125        .or(trace_filters)
126        .unify()
127        .or(metrics_filters)
128        .unify()
129        .boxed()
130}
131
132fn enrich_events(
133    events: &mut [Event],
134    headers_config: &[HttpConfigParamKind],
135    headers: &HeaderMap,
136    log_namespace: LogNamespace,
137) {
138    add_headers(
139        events,
140        headers_config,
141        headers,
142        log_namespace,
143        OpentelemetryConfig::NAME,
144    );
145}
146
147fn emit_decode_error(error: impl std::fmt::Display) -> ErrorMessage {
148    let message = format!("Could not decode request: {error}");
149    emit!(HttpBadRequest::new(
150        StatusCode::BAD_REQUEST.as_u16(),
151        &message
152    ));
153    ErrorMessage::new(StatusCode::BAD_REQUEST, message)
154}
155
156fn parse_with_deserializer(
157    deserializer: &OtlpDeserializer,
158    body: Bytes,
159    log_namespace: LogNamespace,
160) -> Result<Vec<Event>, ErrorMessage> {
161    deserializer
162        .parse(body, log_namespace)
163        .map(|r| r.into_vec())
164        .map_err(emit_decode_error)
165}
166
167fn build_ingest_filter<Resp, F>(
168    telemetry_type: &'static str,
169    acknowledgements: bool,
170    out: SourceSender,
171    make_events: F,
172) -> BoxedFilter<(Response,)>
173where
174    Resp: prost::Message + Default + Send + 'static,
175    F: Clone
176        + Send
177        + Sync
178        + 'static
179        + Fn(Option<String>, HeaderMap, Bytes) -> Result<Vec<Event>, ErrorMessage>,
180{
181    warp::post()
182        .and(warp::path("v1"))
183        .and(warp::path(telemetry_type))
184        .and(warp::path::end())
185        .and(warp::header::exact_ignore_case(
186            "content-type",
187            "application/x-protobuf",
188        ))
189        .and(warp::header::optional::<String>("content-encoding"))
190        .and(warp::header::headers_cloned())
191        .and(warp::body::bytes())
192        .and_then(
193            move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
194                let events = make_events(encoding_header, headers, body);
195                handle_request(
196                    events,
197                    acknowledgements,
198                    out.clone(),
199                    telemetry_type,
200                    Resp::default(),
201                )
202            },
203        )
204        .boxed()
205}
206
207fn build_warp_log_filter(
208    acknowledgements: bool,
209    log_namespace: LogNamespace,
210    source_sender: SourceSender,
211    bytes_received: Registered<BytesReceived>,
212    events_received: Registered<EventsReceived>,
213    headers_cfg: Vec<HttpConfigParamKind>,
214    deserializer: Option<OtlpDeserializer>,
215) -> BoxedFilter<(Response,)> {
216    let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
217        decompress_body(encoding_header.as_deref(), body)
218            .inspect_err(|err| {
219                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
220                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
221                    emit!(HttpBadRequest::new(
222                        err.status_code().as_u16(),
223                        err.message()
224                    ));
225                }
226            })
227            .and_then(|decoded_body| {
228                bytes_received.emit(ByteSize(decoded_body.len()));
229                if let Some(d) = deserializer.as_ref() {
230                    parse_with_deserializer(d, decoded_body, log_namespace)
231                } else {
232                    decode_log_body(decoded_body, log_namespace, &events_received).map(
233                        |mut events| {
234                            enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
235                            events
236                        },
237                    )
238                }
239            })
240    };
241
242    build_ingest_filter::<ExportLogsServiceResponse, _>(
243        LOGS,
244        acknowledgements,
245        source_sender,
246        make_events,
247    )
248}
249fn build_warp_metrics_filter(
250    acknowledgements: bool,
251    source_sender: SourceSender,
252    bytes_received: Registered<BytesReceived>,
253    events_received: Registered<EventsReceived>,
254    deserializer: Option<OtlpDeserializer>,
255) -> BoxedFilter<(Response,)> {
256    let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
257        decompress_body(encoding_header.as_deref(), body)
258            .inspect_err(|err| {
259                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
260                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
261                    emit!(HttpBadRequest::new(
262                        err.status_code().as_u16(),
263                        err.message()
264                    ));
265                }
266            })
267            .and_then(|decoded_body| {
268                bytes_received.emit(ByteSize(decoded_body.len()));
269                if let Some(d) = deserializer.as_ref() {
270                    parse_with_deserializer(d, decoded_body, LogNamespace::default())
271                } else {
272                    decode_metrics_body(decoded_body, &events_received)
273                }
274            })
275    };
276
277    build_ingest_filter::<ExportMetricsServiceResponse, _>(
278        METRICS,
279        acknowledgements,
280        source_sender,
281        make_events,
282    )
283}
284
285fn build_warp_trace_filter(
286    acknowledgements: bool,
287    source_sender: SourceSender,
288    bytes_received: Registered<BytesReceived>,
289    events_received: Registered<EventsReceived>,
290    deserializer: Option<OtlpDeserializer>,
291) -> BoxedFilter<(Response,)> {
292    let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
293        decompress_body(encoding_header.as_deref(), body)
294            .inspect_err(|err| {
295                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
296                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
297                    emit!(HttpBadRequest::new(
298                        err.status_code().as_u16(),
299                        err.message()
300                    ));
301                }
302            })
303            .and_then(|decoded_body| {
304                bytes_received.emit(ByteSize(decoded_body.len()));
305                if let Some(d) = deserializer.as_ref() {
306                    parse_with_deserializer(d, decoded_body, LogNamespace::default())
307                } else {
308                    decode_trace_body(decoded_body, &events_received)
309                }
310            })
311    };
312
313    build_ingest_filter::<ExportTraceServiceResponse, _>(
314        TRACES,
315        acknowledgements,
316        source_sender,
317        make_events,
318    )
319}
320
321fn decode_trace_body(
322    body: Bytes,
323    events_received: &Registered<EventsReceived>,
324) -> Result<Vec<Event>, ErrorMessage> {
325    let request = ExportTraceServiceRequest::decode(body).map_err(emit_decode_error)?;
326
327    let events: Vec<Event> = request
328        .resource_spans
329        .into_iter()
330        .flat_map(|v| v.into_event_iter())
331        .collect();
332
333    events_received.emit(CountByteSize(
334        events.len(),
335        events.estimated_json_encoded_size_of(),
336    ));
337
338    Ok(events)
339}
340
341fn decode_log_body(
342    body: Bytes,
343    log_namespace: LogNamespace,
344    events_received: &Registered<EventsReceived>,
345) -> Result<Vec<Event>, ErrorMessage> {
346    let request = ExportLogsServiceRequest::decode(body).map_err(emit_decode_error)?;
347
348    let events: Vec<Event> = request
349        .resource_logs
350        .into_iter()
351        .flat_map(|v| v.into_event_iter(log_namespace))
352        .collect();
353
354    events_received.emit(CountByteSize(
355        events.len(),
356        events.estimated_json_encoded_size_of(),
357    ));
358
359    Ok(events)
360}
361
362fn decode_metrics_body(
363    body: Bytes,
364    events_received: &Registered<EventsReceived>,
365) -> Result<Vec<Event>, ErrorMessage> {
366    let request = ExportMetricsServiceRequest::decode(body).map_err(emit_decode_error)?;
367
368    let events: Vec<Event> = request
369        .resource_metrics
370        .into_iter()
371        .flat_map(|v| v.into_event_iter())
372        .collect();
373
374    events_received.emit(CountByteSize(
375        events.len(),
376        events.estimated_json_encoded_size_of(),
377    ));
378
379    Ok(events)
380}
381
382async fn handle_request(
383    events: Result<Vec<Event>, ErrorMessage>,
384    acknowledgements: bool,
385    mut out: SourceSender,
386    output: &str,
387    resp: impl Message,
388) -> Result<Response, Rejection> {
389    match events {
390        Ok(mut events) => {
391            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
392            let count = events.len();
393
394            out.send_batch_named(output, events).await.map_err(|_| {
395                emit!(StreamClosedError { count });
396                warp::reject::custom(ApiError::ServerShutdown)
397            })?;
398
399            match receiver {
400                None => Ok(protobuf(resp).into_response()),
401                Some(receiver) => match receiver.await {
402                    BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
403                    BatchStatus::Errored => Err(warp::reject::custom(Status {
404                        code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
405                        message: "Error delivering contents to sink".into(),
406                        ..Default::default()
407                    })),
408                    BatchStatus::Rejected => Err(warp::reject::custom(Status {
409                        code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
410                        message: "Contents failed to deliver to sink".into(),
411                        ..Default::default()
412                    })),
413                },
414            }
415        }
416        Err(err) => Err(warp::reject::custom(err)),
417    }
418}
419
420async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
421    if let Some(err_msg) = err.find::<ErrorMessage>() {
422        let reply = protobuf(Status {
423            code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
424            message: err_msg.message().into(),
425            ..Default::default()
426        });
427
428        Ok(warp::reply::with_status(reply, err_msg.status_code()))
429    } else {
430        let reply = protobuf(Status {
431            code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
432            message: format!("{err:?}"),
433            ..Default::default()
434        });
435
436        Ok(warp::reply::with_status(
437            reply,
438            StatusCode::INTERNAL_SERVER_ERROR,
439        ))
440    }
441}