vector/sources/opentelemetry/
http.rs

1use std::time::Duration;
2use std::{convert::Infallible, net::SocketAddr};
3
4use bytes::Bytes;
5use futures_util::FutureExt;
6use http::StatusCode;
7use hyper::{service::make_service_fn, Server};
8use prost::Message;
9use snafu::Snafu;
10use tokio::net::TcpStream;
11use tower::ServiceBuilder;
12use tracing::Span;
13use vector_lib::internal_event::{
14    ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Registered,
15};
16use vector_lib::opentelemetry::proto::collector::{
17    logs::v1::{ExportLogsServiceRequest, ExportLogsServiceResponse},
18    metrics::v1::{ExportMetricsServiceRequest, ExportMetricsServiceResponse},
19    trace::v1::{ExportTraceServiceRequest, ExportTraceServiceResponse},
20};
21use vector_lib::tls::MaybeTlsIncomingStream;
22use vector_lib::{
23    config::LogNamespace,
24    event::{BatchNotifier, BatchStatus},
25    EstimatedJsonEncodedSizeOf,
26};
27use warp::{
28    filters::BoxedFilter, http::HeaderMap, reject::Rejection, reply::Response, Filter, Reply,
29};
30
31use crate::common::http::ErrorMessage;
32use crate::http::{KeepaliveConfig, MaxConnectionAgeLayer};
33use crate::sources::http_server::HttpConfigParamKind;
34use crate::sources::util::add_headers;
35use crate::{
36    event::Event,
37    http::build_http_trace_layer,
38    internal_events::{EventsReceived, StreamClosedError},
39    shutdown::ShutdownSignal,
40    sources::util::decode,
41    tls::MaybeTlsSettings,
42    SourceSender,
43};
44
45use super::{reply::protobuf, status::Status};
46use crate::sources::opentelemetry::config::{OpentelemetryConfig, LOGS, METRICS, TRACES};
47
48#[derive(Clone, Copy, Debug, Snafu)]
49pub(crate) enum ApiError {
50    ServerShutdown,
51}
52
53impl warp::reject::Reject for ApiError {}
54
55pub(crate) async fn run_http_server(
56    address: SocketAddr,
57    tls_settings: MaybeTlsSettings,
58    filters: BoxedFilter<(Response,)>,
59    shutdown: ShutdownSignal,
60    keepalive_settings: KeepaliveConfig,
61) -> crate::Result<()> {
62    let listener = tls_settings.bind(&address).await?;
63    let routes = filters.recover(handle_rejection);
64
65    info!(message = "Building HTTP server.", address = %address);
66
67    let span = Span::current();
68    let make_svc = make_service_fn(move |conn: &MaybeTlsIncomingStream<TcpStream>| {
69        let svc = ServiceBuilder::new()
70            .layer(build_http_trace_layer(span.clone()))
71            .option_layer(keepalive_settings.max_connection_age_secs.map(|secs| {
72                MaxConnectionAgeLayer::new(
73                    Duration::from_secs(secs),
74                    keepalive_settings.max_connection_age_jitter_factor,
75                    conn.peer_addr(),
76                )
77            }))
78            .service(warp::service(routes.clone()));
79        futures_util::future::ok::<_, Infallible>(svc)
80    });
81
82    Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
83        .serve(make_svc)
84        .with_graceful_shutdown(shutdown.map(|_| ()))
85        .await?;
86
87    Ok(())
88}
89
90pub(crate) fn build_warp_filter(
91    acknowledgements: bool,
92    log_namespace: LogNamespace,
93    out: SourceSender,
94    bytes_received: Registered<BytesReceived>,
95    events_received: Registered<EventsReceived>,
96    headers: Vec<HttpConfigParamKind>,
97) -> BoxedFilter<(Response,)> {
98    let log_filters = build_warp_log_filter(
99        acknowledgements,
100        log_namespace,
101        out.clone(),
102        bytes_received.clone(),
103        events_received.clone(),
104        headers.clone(),
105    );
106    let metrics_filters = build_warp_metrics_filter(
107        acknowledgements,
108        out.clone(),
109        bytes_received.clone(),
110        events_received.clone(),
111    );
112    let trace_filters = build_warp_trace_filter(
113        acknowledgements,
114        out.clone(),
115        bytes_received,
116        events_received,
117    );
118    log_filters
119        .or(trace_filters)
120        .unify()
121        .or(metrics_filters)
122        .unify()
123        .boxed()
124}
125
126fn enrich_events(
127    events: &mut [Event],
128    headers_config: &[HttpConfigParamKind],
129    headers: &HeaderMap,
130    log_namespace: LogNamespace,
131) {
132    add_headers(
133        events,
134        headers_config,
135        headers,
136        log_namespace,
137        OpentelemetryConfig::NAME,
138    );
139}
140
141fn build_warp_log_filter(
142    acknowledgements: bool,
143    log_namespace: LogNamespace,
144    out: SourceSender,
145    bytes_received: Registered<BytesReceived>,
146    events_received: Registered<EventsReceived>,
147    headers: Vec<HttpConfigParamKind>,
148) -> BoxedFilter<(Response,)> {
149    warp::post()
150        .and(warp::path!("v1" / "logs"))
151        .and(warp::header::exact_ignore_case(
152            "content-type",
153            "application/x-protobuf",
154        ))
155        .and(warp::header::optional::<String>("content-encoding"))
156        .and(warp::header::headers_cloned())
157        .and(warp::body::bytes())
158        .and_then(
159            move |encoding_header: Option<String>, headers_config: HeaderMap, body: Bytes| {
160                let events = decode(encoding_header.as_deref(), body)
161                    .and_then(|body| {
162                        bytes_received.emit(ByteSize(body.len()));
163                        decode_log_body(body, log_namespace, &events_received)
164                    })
165                    .map(|mut events| {
166                        enrich_events(&mut events, &headers, &headers_config, log_namespace);
167                        events
168                    });
169
170                handle_request(
171                    events,
172                    acknowledgements,
173                    out.clone(),
174                    LOGS,
175                    ExportLogsServiceResponse::default(),
176                )
177            },
178        )
179        .boxed()
180}
181
182fn build_warp_metrics_filter(
183    acknowledgements: bool,
184    out: SourceSender,
185    bytes_received: Registered<BytesReceived>,
186    events_received: Registered<EventsReceived>,
187) -> BoxedFilter<(Response,)> {
188    warp::post()
189        .and(warp::path!("v1" / "metrics"))
190        .and(warp::header::exact_ignore_case(
191            "content-type",
192            "application/x-protobuf",
193        ))
194        .and(warp::header::optional::<String>("content-encoding"))
195        .and(warp::body::bytes())
196        .and_then(move |encoding_header: Option<String>, body: Bytes| {
197            let events = decode(encoding_header.as_deref(), body).and_then(|body| {
198                bytes_received.emit(ByteSize(body.len()));
199                decode_metrics_body(body, &events_received)
200            });
201
202            handle_request(
203                events,
204                acknowledgements,
205                out.clone(),
206                METRICS,
207                ExportMetricsServiceResponse::default(),
208            )
209        })
210        .boxed()
211}
212
213fn build_warp_trace_filter(
214    acknowledgements: bool,
215    out: SourceSender,
216    bytes_received: Registered<BytesReceived>,
217    events_received: Registered<EventsReceived>,
218) -> BoxedFilter<(Response,)> {
219    warp::post()
220        .and(warp::path!("v1" / "traces"))
221        .and(warp::header::exact_ignore_case(
222            "content-type",
223            "application/x-protobuf",
224        ))
225        .and(warp::header::optional::<String>("content-encoding"))
226        .and(warp::body::bytes())
227        .and_then(move |encoding_header: Option<String>, body: Bytes| {
228            let events = decode(encoding_header.as_deref(), body).and_then(|body| {
229                bytes_received.emit(ByteSize(body.len()));
230                decode_trace_body(body, &events_received)
231            });
232
233            handle_request(
234                events,
235                acknowledgements,
236                out.clone(),
237                TRACES,
238                ExportTraceServiceResponse::default(),
239            )
240        })
241        .boxed()
242}
243
244fn decode_trace_body(
245    body: Bytes,
246    events_received: &Registered<EventsReceived>,
247) -> Result<Vec<Event>, ErrorMessage> {
248    let request = ExportTraceServiceRequest::decode(body).map_err(|error| {
249        ErrorMessage::new(
250            StatusCode::BAD_REQUEST,
251            format!("Could not decode request: {error}"),
252        )
253    })?;
254
255    let events: Vec<Event> = request
256        .resource_spans
257        .into_iter()
258        .flat_map(|v| v.into_event_iter())
259        .collect();
260
261    events_received.emit(CountByteSize(
262        events.len(),
263        events.estimated_json_encoded_size_of(),
264    ));
265
266    Ok(events)
267}
268
269fn decode_log_body(
270    body: Bytes,
271    log_namespace: LogNamespace,
272    events_received: &Registered<EventsReceived>,
273) -> Result<Vec<Event>, ErrorMessage> {
274    let request = ExportLogsServiceRequest::decode(body).map_err(|error| {
275        ErrorMessage::new(
276            StatusCode::BAD_REQUEST,
277            format!("Could not decode request: {error}"),
278        )
279    })?;
280
281    let events: Vec<Event> = request
282        .resource_logs
283        .into_iter()
284        .flat_map(|v| v.into_event_iter(log_namespace))
285        .collect();
286
287    events_received.emit(CountByteSize(
288        events.len(),
289        events.estimated_json_encoded_size_of(),
290    ));
291
292    Ok(events)
293}
294
295fn decode_metrics_body(
296    body: Bytes,
297    events_received: &Registered<EventsReceived>,
298) -> Result<Vec<Event>, ErrorMessage> {
299    let request = ExportMetricsServiceRequest::decode(body).map_err(|error| {
300        ErrorMessage::new(
301            StatusCode::BAD_REQUEST,
302            format!("Could not decode request: {error}"),
303        )
304    })?;
305
306    let events: Vec<Event> = request
307        .resource_metrics
308        .into_iter()
309        .flat_map(|v| v.into_event_iter())
310        .collect();
311
312    events_received.emit(CountByteSize(
313        events.len(),
314        events.estimated_json_encoded_size_of(),
315    ));
316
317    Ok(events)
318}
319
320async fn handle_request(
321    events: Result<Vec<Event>, ErrorMessage>,
322    acknowledgements: bool,
323    mut out: SourceSender,
324    output: &str,
325    resp: impl Message,
326) -> Result<Response, Rejection> {
327    match events {
328        Ok(mut events) => {
329            let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
330            let count = events.len();
331
332            out.send_batch_named(output, events).await.map_err(|_| {
333                emit!(StreamClosedError { count });
334                warp::reject::custom(ApiError::ServerShutdown)
335            })?;
336
337            match receiver {
338                None => Ok(protobuf(resp).into_response()),
339                Some(receiver) => match receiver.await {
340                    BatchStatus::Delivered => Ok(protobuf(resp).into_response()),
341                    BatchStatus::Errored => Err(warp::reject::custom(Status {
342                        code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
343                        message: "Error delivering contents to sink".into(),
344                        ..Default::default()
345                    })),
346                    BatchStatus::Rejected => Err(warp::reject::custom(Status {
347                        code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
348                        message: "Contents failed to deliver to sink".into(),
349                        ..Default::default()
350                    })),
351                },
352            }
353        }
354        Err(err) => Err(warp::reject::custom(err)),
355    }
356}
357
358async fn handle_rejection(err: Rejection) -> Result<impl Reply, std::convert::Infallible> {
359    if let Some(err_msg) = err.find::<ErrorMessage>() {
360        let reply = protobuf(Status {
361            code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
362            message: err_msg.message().into(),
363            ..Default::default()
364        });
365
366        Ok(warp::reply::with_status(reply, err_msg.status_code()))
367    } else {
368        let reply = protobuf(Status {
369            code: 2, // UNKNOWN - OTLP doesn't require use of status.code, but we can't encode a None here
370            message: format!("{err:?}"),
371            ..Default::default()
372        });
373
374        Ok(warp::reply::with_status(
375            reply,
376            StatusCode::INTERNAL_SERVER_ERROR,
377        ))
378    }
379}