vector/sources/opentelemetry/
http.rs

1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures_util::FutureExt;
5use http::StatusCode;
6use hyper::{Server, service::make_service_fn};
7use prost::Message;
8use snafu::Snafu;
9use tokio::net::TcpStream;
10use tower::ServiceBuilder;
11use tracing::Span;
12use vector_lib::{
13    EstimatedJsonEncodedSizeOf,
14    codecs::decoding::{OtlpDeserializer, format::Deserializer},
15    config::LogNamespace,
16    event::{BatchNotifier, BatchStatus},
17    internal_event::{
18        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
19    },
20    opentelemetry::proto::collector::{
21        logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
22        metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
23        trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
24    },
25    tls::MaybeTlsIncomingStream,
26};
27use warp::{
28    Filter, Reply, filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response,
29};
30
31use super::{reply::protobuf, status::Status};
32use crate::{
33    SourceSender,
34    common::http::ErrorMessage,
35    event::Event,
36    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
37    internal_events::{EventsReceived, HttpBadRequest, StreamClosedError},
38    shutdown::ShutdownSignal,
39    sources::{
40        http_server::HttpConfigParamKind,
41        opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES},
42        util::{add_headers, decompress_body},
43    },
44    tls::MaybeTlsSettings,
45};
46
47#[derive(Clone, Copy, Debug, Snafu)]
48pub(crate) enum ApiError {
49    ServerShutdown,
50}
51
52impl warp::reject::Reject for ApiError {}
53
54pub(crate) async fn run_http_server(
55    address: SocketAddr,
56    tls_settings: MaybeTlsSettings,
57    filters: BoxedFilter<(Response,)>,
58    shutdown: ShutdownSignal,
59    keepalive_settings: KeepaliveConfig,
60) -> crate::Result<()> {
61    let listener = tls_settings.bind(&address).await?;
62    let routes = filters.recover(handle_rejection);
63
64    info!(message = "Building HTTP server.", address = %address);
65
66    let span = Span::current();
67    let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
68        let svc = ServiceBuilder::new()
69            .layer(build_http_trace_layer(span.clone()))
70            .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
71                MaxConnectionAgeLayer::new(
72                    Duration::from_secs(secs),
73                    keepalive_settings.max_connection_age_jitter_factor,
74                    conn.peer_addr(),
75                )
76            }))
77            .service(warp::service(routes.clone()));
78        futures_util::future::ok::<_, Infallible>(svc)
79    });
80
81    Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
82        .serve(make_svc)
83        .with_graceful_shutdown(shutdown.map(|_| ()))
84        .await?;
85
86    Ok(())
87}
88
89#[allow(clippy::too_many_arguments)] // TODO change to a builder struct
90pub(crate) fn build_warp_filter(
91    acknowledgements: bool,
92    log_namespace: LogNamespace,
93    out: SourceSender,
94    bytes_received: Registered<BytesReceived>,
95    events_received: Registered<EventsReceived>,
96    headers: Vec<HttpConfigParamKind>,
97    logs_deserializer: Option<OtlpDeserializer>,
98    metrics_deserializer: Option<OtlpDeserializer>,
99    traces_deserializer: Option<OtlpDeserializer>,
100) -> BoxedFilter<(Response,)> {
101    let log_filters = build_warp_log_filter(
102        acknowledgements,
103        log_namespace,
104        out.clone(),
105        bytes_received.clone(),
106        events_received.clone(),
107        headers.clone(),
108        logs_deserializer,
109    );
110    let metrics_filters = build_warp_metrics_filter(
111        acknowledgements,
112        log_namespace,
113        out.clone(),
114        bytes_received.clone(),
115        events_received.clone(),
116        headers.clone(),
117        metrics_deserializer,
118    );
119    let trace_filters = build_warp_trace_filter(
120        acknowledgements,
121        out.clone(),
122        bytes_received,
123        events_received,
124        headers.clone(),
125        traces_deserializer,
126    );
127    log_filters
128        .or(trace_filters)
129        .unify()
130        .or(metrics_filters)
131        .unify()
132        .boxed()
133}
134
135fn enrich_events(
136    events: &mut [Event],
137    headers_config: &[HttpConfigParamKind],
138    headers: &HeaderMap,
139    log_namespace: LogNamespace,
140) {
141    add_headers(
142        events,
143        headers_config,
144        headers,
145        log_namespace,
146        OpentelemetryConfig::NAME,
147    );
148}
149
150fn emit_decode_error(error: impl std::fmt::Display) -> ErrorMessage {
151    let message = format!("Could not decode request: {error}");
152    emit!(HttpBadRequest::new(
153        StatusCode::BAD_REQUEST.as_u16(),
154        &message
155    ));
156    ErrorMessage::new(StatusCode::BAD_REQUEST, message)
157}
158
159fn parse_with_deserializer(
160    deserializer: &OtlpDeserializer,
161    body: Bytes,
162    log_namespace: LogNamespace,
163    events_received: &Registered<EventsReceived>,
164) -> Result<Vec<Event>, ErrorMessage> {
165    let events = deserializer
166        .parse(body, log_namespace)
167        .map(|r| r.into_vec())
168        .map_err(emit_decode_error)?;
169
170    // Count individual items within OTLP batches for consistency with other sources
171    let count = super::count_otlp_items(&events);
172    events_received.emit(CountByteSize(
173        count,
174        events.estimated_json_encoded_size_of(),
175    ));
176
177    Ok(events)
178}
179
180fn build_ingest_filter<Resp, F>(
181    telemetry_type: &'static str,
182    acknowledgements: bool,
183    out: SourceSender,
184    make_events: F,
185) -> BoxedFilter<(Response,)>
186where
187    Resp: prost::Message + Default + Send + 'static,
188    F: Clone
189        + Send
190        + Sync
191        + 'static
192        + Fn(Option<String>, HeaderMap, Bytes) -> Result<Vec<Event>, ErrorMessage>,
193{
194    warp::post()
195        .and(warp::path("v1"))
196        .and(warp::path(telemetry_type))
197        .and(warp::path::end())
198        .and(warp::header::exact_ignore_case(
199            "content-type",
200            "application/x-protobuf",
201        ))
202        .and(warp::header::optional::<String>("content-encoding"))
203        .and(warp::header::headers_cloned())
204        .and(warp::body::bytes())
205        .and_then(
206            move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
207                let events = make_events(encoding_header, headers, body);
208                handle_request(
209                    events,
210                    acknowledgements,
211                    out.clone(),
212                    telemetry_type,
213                    Resp::default(),
214                )
215            },
216        )
217        .boxed()
218}
219
220fn build_warp_log_filter(
221    acknowledgements: bool,
222    log_namespace: LogNamespace,
223    source_sender: SourceSender,
224    bytes_received: Registered<BytesReceived>,
225    events_received: Registered<EventsReceived>,
226    headers_cfg: Vec<HttpConfigParamKind>,
227    deserializer: Option<OtlpDeserializer>,
228) -> BoxedFilter<(Response,)> {
229    let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
230        decompress_body(encoding_header.as_deref(), body)
231            .inspect_err(|err| {
232                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
233                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
234                    emit!(HttpBadRequest::new(
235                        err.status_code().as_u16(),
236                        err.message()
237                    ));
238                }
239            })
240            .and_then(|decoded_body| {
241                bytes_received.emit(ByteSize(decoded_body.len()));
242                if let Some(d) = deserializer.as_ref() {
243                    parse_with_deserializer(d, decoded_body, log_namespace, &events_received)
244                } else {
245                    decode_log_body(decoded_body, log_namespace, &events_received)
246                }
247                .map(|mut events| {
248                    enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
249                    events
250                })
251            })
252    };
253
254    build_ingest_filter::<ExportLogsServiceResponse, _>(
255        LOGS,
256        acknowledgements,
257        source_sender,
258        make_events,
259    )
260}
261fn build_warp_metrics_filter(
262    acknowledgements: bool,
263    log_namespace: LogNamespace,
264    source_sender: SourceSender,
265    bytes_received: Registered<BytesReceived>,
266    events_received: Registered<EventsReceived>,
267    headers_cfg: Vec<HttpConfigParamKind>,
268    deserializer: Option<OtlpDeserializer>,
269) -> BoxedFilter<(Response,)> {
270    let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
271        decompress_body(encoding_header.as_deref(), body)
272            .inspect_err(|err| {
273                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
274                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
275                    emit!(HttpBadRequest::new(
276                        err.status_code().as_u16(),
277                        err.message()
278                    ));
279                }
280            })
281            .and_then(|decoded_body| {
282                bytes_received.emit(ByteSize(decoded_body.len()));
283                if let Some(d) = deserializer.as_ref() {
284                    parse_with_deserializer(d, decoded_body, log_namespace, &events_received)
285                } else {
286                    decode_metrics_body(decoded_body, &events_received)
287                }
288                .map(|mut events| {
289                    enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
290                    events
291                })
292            })
293    };
294
295    build_ingest_filter::<ExportMetricsServiceResponse, _>(
296        METRICS,
297        acknowledgements,
298        source_sender,
299        make_events,
300    )
301}
302
303fn build_warp_trace_filter(
304    acknowledgements: bool,
305    source_sender: SourceSender,
306    bytes_received: Registered<BytesReceived>,
307    events_received: Registered<EventsReceived>,
308    headers_cfg: Vec<HttpConfigParamKind>,
309    deserializer: Option<OtlpDeserializer>,
310) -> BoxedFilter<(Response,)> {
311    let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
312        decompress_body(encoding_header.as_deref(), body)
313            .inspect_err(|err| {
314                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
315                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
316                    emit!(HttpBadRequest::new(
317                        err.status_code().as_u16(),
318                        err.message()
319                    ));
320                }
321            })
322            .and_then(|decoded_body| {
323                bytes_received.emit(ByteSize(decoded_body.len()));
324                if let Some(d) = deserializer.as_ref() {
325                    parse_with_deserializer(
326                        d,
327                        decoded_body,
328                        LogNamespace::default(),
329                        &events_received,
330                    )
331                } else {
332                    decode_trace_body(decoded_body, &events_received)
333                }
334                .map(|mut events| {
335                    enrich_events(&mut events, &headers_cfg, &headers, LogNamespace::default());
336                    events
337                })
338            })
339    };
340
341    build_ingest_filter::<ExportTraceServiceResponse, _>(
342        TRACES,
343        acknowledgements,
344        source_sender,
345        make_events,
346    )
347}
348
349fn decode_trace_body(
350    body: Bytes,
351    events_received: &Registered<EventsReceived>,
352) -> Result<Vec<Event>, ErrorMessage> {
353    let request = ExportTraceServiceRequest::decode(body).map_err(emit_decode_error)?;
354
355    let events: Vec<Event> = request
356        .resource_spans
357        .into_iter()
358        .flat_map(|v| v.into_event_iter())
359        .collect();
360
361    events_received.emit(CountByteSize(
362        events.len(),
363        events.estimated_json_encoded_size_of(),
364    ));
365
366    Ok(events)
367}
368
369fn decode_log_body(
370    body: Bytes,
371    log_namespace: LogNamespace,
372    events_received: &Registered<EventsReceived>,
373) -> Result<Vec<Event>, ErrorMessage> {
374    let request = ExportLogsServiceRequest::decode(body).map_err(emit_decode_error)?;
375
376    let events: Vec<Event> = request
377        .resource_logs
378        .into_iter()
379        .flat_map(|v| v.into_event_iter(log_namespace))
380        .collect();
381
382    events_received.emit(CountByteSize(
383        events.len(),
384        events.estimated_json_encoded_size_of(),
385    ));
386
387    Ok(events)
388}
389
390fn decode_metrics_body(
391    body: Bytes,
392    events_received: &Registered<EventsReceived>,
393) -> Result<Vec<Event>, ErrorMessage> {
394    let request = ExportMetricsServiceRequest::decode(body).map_err(emit_decode_error)?;
395
396    let events: Vec<Event> = request
397        .resource_metrics
398        .into_iter()
399        .flat_map(|v| v.into_event_iter())
400        .collect();
401
402    events_received.emit(CountByteSize(
403        events.len(),
404        events.estimated_json_encoded_size_of(),
405    ));
406
407    Ok(events)
408}
409
410async fn handle_request(
411    events: Result<Vec<Event>, ErrorMessage>,
412    acknowledgements: bool,
413    mut out: SourceSender,
414    output: &str,
415    resp: impl Message,
416) -> Result<Response, Rejection> {
417    match events {
418        Ok(mut events) => {
419            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
420            let count = events.len();
421
422            out.send_batch_named(output, events).await.map_err(|_| {
423                emit!(StreamClosedError { count });
424                warp::reject::custom(ApiError::ServerShutdown)
425            })?;
426
427            match receiver {
428                None => Ok(protobuf(resp).into_response()),
429                Some(receiver) => match receiver.await {
430                    BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
431                    BatchStatus::Errored => Err(warp::reject::custom(Status {
432                        code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
433                        message: "Error delivering contents to sink".into(),
434                        ..Default::default()
435                    })),
436                    BatchStatus::Rejected => Err(warp::reject::custom(Status {
437                        code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
438                        message: "Contents failed to deliver to sink".into(),
439                        ..Default::default()
440                    })),
441                },
442            }
443        }
444        Err(err) => Err(warp::reject::custom(err)),
445    }
446}
447
448async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
449    if let Some(err_msg) = err.find::<ErrorMessage>() {
450        let reply = protobuf(Status {
451            code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
452            message: err_msg.message().into(),
453            ..Default::default()
454        });
455
456        Ok(warp::reply::with_status(reply, err_msg.status_code()))
457    } else {
458        let reply = protobuf(Status {
459            code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
460            message: format!("{err:?}"),
461            ..Default::default()
462        });
463
464        Ok(warp::reply::with_status(
465            reply,
466            StatusCode::INTERNAL_SERVER_ERROR,
467        ))
468    }
469}