vector/sources/opentelemetry/
http.rs

1use std::{convert::Infallible, net::SocketAddr, time::Duration};
2
3use bytes::Bytes;
4use futures_util::FutureExt;
5use http::StatusCode;
6use hyper::{Server, service::make_service_fn};
7use prost::Message;
8use snafu::Snafu;
9use tokio::net::TcpStream;
10use tower::ServiceBuilder;
11use tracing::Span;
12use vector_lib::{
13    EstimatedJsonEncodedSizeOf,
14    codecs::decoding::{OtlpDeserializer, format::Deserializer},
15    config::LogNamespace,
16    event::{BatchNotifier, BatchStatus},
17    internal_event::{
18        ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
19    },
20    opentelemetry::proto::collector::{
21        logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
22        metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
23        trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
24    },
25    tls::MaybeTlsIncomingStream,
26};
27use warp::{
28    Filter, Reply, filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response,
29};
30
31use super::{reply::protobuf, status::Status};
32use crate::{
33    SourceSender,
34    common::http::ErrorMessage,
35    event::Event,
36    http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
37    internal_events::{EventsReceived, HttpBadRequest, StreamClosedError},
38    shutdown::ShutdownSignal,
39    sources::{
40        http_server::HttpConfigParamKind,
41        opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES},
42        util::{add_headers, decompress_body},
43    },
44    tls::MaybeTlsSettings,
45};
46
47#[derive(Clone, Copy, Debug, Snafu)]
48pub(crate) enum ApiError {
49    ServerShutdown,
50}
51
52impl warp::reject::Reject for ApiError {}
53
54pub(crate) async fn run_http_server(
55    address: SocketAddr,
56    tls_settings: MaybeTlsSettings,
57    filters: BoxedFilter<(Response,)>,
58    shutdown: ShutdownSignal,
59    keepalive_settings: KeepaliveConfig,
60) -> crate::Result<()> {
61    let listener = tls_settings.bind(&address).await?;
62    let routes = filters.recover(handle_rejection);
63
64    info!(message = "Building HTTP server.", address = %address);
65
66    let span = Span::current();
67    let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
68        let svc = ServiceBuilder::new()
69            .layer(build_http_trace_layer(span.clone()))
70            .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
71                MaxConnectionAgeLayer::new(
72                    Duration::from_secs(secs),
73                    keepalive_settings.max_connection_age_jitter_factor,
74                    conn.peer_addr(),
75                )
76            }))
77            .service(warp::service(routes.clone()));
78        futures_util::future::ok::<_, Infallible>(svc)
79    });
80
81    Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
82        .serve(make_svc)
83        .with_graceful_shutdown(shutdown.map(|_| ()))
84        .await?;
85
86    Ok(())
87}
88
89#[allow(clippy::too_many_arguments)] // TODO change to a builder struct
90pub(crate) fn build_warp_filter(
91    acknowledgements: bool,
92    log_namespace: LogNamespace,
93    out: SourceSender,
94    bytes_received: Registered<BytesReceived>,
95    events_received: Registered<EventsReceived>,
96    headers: Vec<HttpConfigParamKind>,
97    logs_deserializer: Option<OtlpDeserializer>,
98    metrics_deserializer: Option<OtlpDeserializer>,
99    traces_deserializer: Option<OtlpDeserializer>,
100) -> BoxedFilter<(Response,)> {
101    let log_filters = build_warp_log_filter(
102        acknowledgements,
103        log_namespace,
104        out.clone(),
105        bytes_received.clone(),
106        events_received.clone(),
107        headers.clone(),
108        logs_deserializer,
109    );
110    let metrics_filters = build_warp_metrics_filter(
111        acknowledgements,
112        out.clone(),
113        bytes_received.clone(),
114        events_received.clone(),
115        metrics_deserializer,
116    );
117    let trace_filters = build_warp_trace_filter(
118        acknowledgements,
119        out.clone(),
120        bytes_received,
121        events_received,
122        traces_deserializer,
123    );
124    log_filters
125        .or(trace_filters)
126        .unify()
127        .or(metrics_filters)
128        .unify()
129        .boxed()
130}
131
132fn enrich_events(
133    events: &mut [Event],
134    headers_config: &[HttpConfigParamKind],
135    headers: &HeaderMap,
136    log_namespace: LogNamespace,
137) {
138    add_headers(
139        events,
140        headers_config,
141        headers,
142        log_namespace,
143        OpentelemetryConfig::NAME,
144    );
145}
146
147fn emit_decode_error(error: impl std::fmt::Display) -> ErrorMessage {
148    let message = format!("Could not decode request: {error}");
149    emit!(HttpBadRequest::new(
150        StatusCode::BAD_REQUEST.as_u16(),
151        &message
152    ));
153    ErrorMessage::new(StatusCode::BAD_REQUEST, message)
154}
155
156fn parse_with_deserializer(
157    deserializer: &OtlpDeserializer,
158    body: Bytes,
159    log_namespace: LogNamespace,
160    events_received: &Registered<EventsReceived>,
161) -> Result<Vec<Event>, ErrorMessage> {
162    let events = deserializer
163        .parse(body, log_namespace)
164        .map(|r| r.into_vec())
165        .map_err(emit_decode_error)?;
166
167    // Count individual items within OTLP batches for consistency with other sources
168    let count = super::count_otlp_items(&events);
169    events_received.emit(CountByteSize(
170        count,
171        events.estimated_json_encoded_size_of(),
172    ));
173
174    Ok(events)
175}
176
177fn build_ingest_filter<Resp, F>(
178    telemetry_type: &'static str,
179    acknowledgements: bool,
180    out: SourceSender,
181    make_events: F,
182) -> BoxedFilter<(Response,)>
183where
184    Resp: prost::Message + Default + Send + 'static,
185    F: Clone
186        + Send
187        + Sync
188        + 'static
189        + Fn(Option<String>, HeaderMap, Bytes) -> Result<Vec<Event>, ErrorMessage>,
190{
191    warp::post()
192        .and(warp::path("v1"))
193        .and(warp::path(telemetry_type))
194        .and(warp::path::end())
195        .and(warp::header::exact_ignore_case(
196            "content-type",
197            "application/x-protobuf",
198        ))
199        .and(warp::header::optional::<String>("content-encoding"))
200        .and(warp::header::headers_cloned())
201        .and(warp::body::bytes())
202        .and_then(
203            move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
204                let events = make_events(encoding_header, headers, body);
205                handle_request(
206                    events,
207                    acknowledgements,
208                    out.clone(),
209                    telemetry_type,
210                    Resp::default(),
211                )
212            },
213        )
214        .boxed()
215}
216
217fn build_warp_log_filter(
218    acknowledgements: bool,
219    log_namespace: LogNamespace,
220    source_sender: SourceSender,
221    bytes_received: Registered<BytesReceived>,
222    events_received: Registered<EventsReceived>,
223    headers_cfg: Vec<HttpConfigParamKind>,
224    deserializer: Option<OtlpDeserializer>,
225) -> BoxedFilter<(Response,)> {
226    let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
227        decompress_body(encoding_header.as_deref(), body)
228            .inspect_err(|err| {
229                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
230                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
231                    emit!(HttpBadRequest::new(
232                        err.status_code().as_u16(),
233                        err.message()
234                    ));
235                }
236            })
237            .and_then(|decoded_body| {
238                bytes_received.emit(ByteSize(decoded_body.len()));
239                if let Some(d) = deserializer.as_ref() {
240                    parse_with_deserializer(d, decoded_body, log_namespace, &events_received)
241                } else {
242                    decode_log_body(decoded_body, log_namespace, &events_received)
243                }
244                .map(|mut events| {
245                    enrich_events(&mut events, &headers_cfg, &headers, log_namespace);
246                    events
247                })
248            })
249    };
250
251    build_ingest_filter::<ExportLogsServiceResponse, _>(
252        LOGS,
253        acknowledgements,
254        source_sender,
255        make_events,
256    )
257}
258fn build_warp_metrics_filter(
259    acknowledgements: bool,
260    source_sender: SourceSender,
261    bytes_received: Registered<BytesReceived>,
262    events_received: Registered<EventsReceived>,
263    deserializer: Option<OtlpDeserializer>,
264) -> BoxedFilter<(Response,)> {
265    let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
266        decompress_body(encoding_header.as_deref(), body)
267            .inspect_err(|err| {
268                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
269                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
270                    emit!(HttpBadRequest::new(
271                        err.status_code().as_u16(),
272                        err.message()
273                    ));
274                }
275            })
276            .and_then(|decoded_body| {
277                bytes_received.emit(ByteSize(decoded_body.len()));
278                if let Some(d) = deserializer.as_ref() {
279                    parse_with_deserializer(
280                        d,
281                        decoded_body,
282                        LogNamespace::default(),
283                        &events_received,
284                    )
285                } else {
286                    decode_metrics_body(decoded_body, &events_received)
287                }
288            })
289    };
290
291    build_ingest_filter::<ExportMetricsServiceResponse, _>(
292        METRICS,
293        acknowledgements,
294        source_sender,
295        make_events,
296    )
297}
298
299fn build_warp_trace_filter(
300    acknowledgements: bool,
301    source_sender: SourceSender,
302    bytes_received: Registered<BytesReceived>,
303    events_received: Registered<EventsReceived>,
304    deserializer: Option<OtlpDeserializer>,
305) -> BoxedFilter<(Response,)> {
306    let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
307        decompress_body(encoding_header.as_deref(), body)
308            .inspect_err(|err| {
309                // Other status codes are already handled by `sources::util::decompress_body` (tech debt).
310                if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
311                    emit!(HttpBadRequest::new(
312                        err.status_code().as_u16(),
313                        err.message()
314                    ));
315                }
316            })
317            .and_then(|decoded_body| {
318                bytes_received.emit(ByteSize(decoded_body.len()));
319                if let Some(d) = deserializer.as_ref() {
320                    parse_with_deserializer(
321                        d,
322                        decoded_body,
323                        LogNamespace::default(),
324                        &events_received,
325                    )
326                } else {
327                    decode_trace_body(decoded_body, &events_received)
328                }
329            })
330    };
331
332    build_ingest_filter::<ExportTraceServiceResponse, _>(
333        TRACES,
334        acknowledgements,
335        source_sender,
336        make_events,
337    )
338}
339
340fn decode_trace_body(
341    body: Bytes,
342    events_received: &Registered<EventsReceived>,
343) -> Result<Vec<Event>, ErrorMessage> {
344    let request = ExportTraceServiceRequest::decode(body).map_err(emit_decode_error)?;
345
346    let events: Vec<Event> = request
347        .resource_spans
348        .into_iter()
349        .flat_map(|v| v.into_event_iter())
350        .collect();
351
352    events_received.emit(CountByteSize(
353        events.len(),
354        events.estimated_json_encoded_size_of(),
355    ));
356
357    Ok(events)
358}
359
360fn decode_log_body(
361    body: Bytes,
362    log_namespace: LogNamespace,
363    events_received: &Registered<EventsReceived>,
364) -> Result<Vec<Event>, ErrorMessage> {
365    let request = ExportLogsServiceRequest::decode(body).map_err(emit_decode_error)?;
366
367    let events: Vec<Event> = request
368        .resource_logs
369        .into_iter()
370        .flat_map(|v| v.into_event_iter(log_namespace))
371        .collect();
372
373    events_received.emit(CountByteSize(
374        events.len(),
375        events.estimated_json_encoded_size_of(),
376    ));
377
378    Ok(events)
379}
380
381fn decode_metrics_body(
382    body: Bytes,
383    events_received: &Registered<EventsReceived>,
384) -> Result<Vec<Event>, ErrorMessage> {
385    let request = ExportMetricsServiceRequest::decode(body).map_err(emit_decode_error)?;
386
387    let events: Vec<Event> = request
388        .resource_metrics
389        .into_iter()
390        .flat_map(|v| v.into_event_iter())
391        .collect();
392
393    events_received.emit(CountByteSize(
394        events.len(),
395        events.estimated_json_encoded_size_of(),
396    ));
397
398    Ok(events)
399}
400
401async fn handle_request(
402    events: Result<Vec<Event>, ErrorMessage>,
403    acknowledgements: bool,
404    mut out: SourceSender,
405    output: &str,
406    resp: impl Message,
407) -> Result<Response, Rejection> {
408    match events {
409        Ok(mut events) => {
410            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
411            let count = events.len();
412
413            out.send_batch_named(output, events).await.map_err(|_| {
414                emit!(StreamClosedError { count });
415                warp::reject::custom(ApiError::ServerShutdown)
416            })?;
417
418            match receiver {
419                None => Ok(protobuf(resp).into_response()),
420                Some(receiver) => match receiver.await {
421                    BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
422                    BatchStatus::Errored => Err(warp::reject::custom(Status {
423                        code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
424                        message: "Error delivering contents to sink".into(),
425                        ..Default::default()
426                    })),
427                    BatchStatus::Rejected => Err(warp::reject::custom(Status {
428                        code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
429                        message: "Contents failed to deliver to sink".into(),
430                        ..Default::default()
431                    })),
432                },
433            }
434        }
435        Err(err) => Err(warp::reject::custom(err)),
436    }
437}
438
439async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
440    if let Some(err_msg) = err.find::<ErrorMessage>() {
441        let reply = protobuf(Status {
442            code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
443            message: err_msg.message().into(),
444            ..Default::default()
445        });
446
447        Ok(warp::reply::with_status(reply, err_msg.status_code()))
448    } else {
449        let reply = protobuf(Status {
450            code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
451            message: format!("{err:?}"),
452            ..Default::default()
453        });
454
455        Ok(warp::reply::with_status(
456            reply,
457            StatusCode::INTERNAL_SERVER_ERROR,
458        ))
459    }
460}