vector/sources/opentelemetry/config.rs

1use std::net::SocketAddr;
2
3use futures::FutureExt;
4use futures_util::{TryFutureExt, future::join};
5use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder};
6use vector_lib::{
7    codecs::decoding::ProtobufDeserializer,
8    config::{LegacyKey, LogNamespace, log_schema},
9    configurable::configurable_component,
10    internal_event::{BytesReceived, EventsReceived, Protocol},
11    lookup::{OwnedTargetPath, owned_value_path},
12    opentelemetry::{
13        logs::{
14            ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
15            RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
16        },
17        proto::collector::{
18            logs::v1::logs_service_server::LogsServiceServer,
19            metrics::v1::metrics_service_server::MetricsServiceServer,
20            trace::v1::trace_service_server::TraceServiceServer,
21        },
22    },
23    schema::Definition,
24    tls::{MaybeTlsSettings, TlsEnableableConfig},
25};
26use vrl::{
27    protobuf::parse::Options,
28    value::{Kind, kind::Collection},
29};
30
31use crate::{
32    config::{
33        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
34        SourceContext, SourceOutput,
35    },
36    http::KeepaliveConfig,
37    serde::bool_or_struct,
38    sources::{
39        Source,
40        http_server::{build_param_matcher, remove_duplicates},
41        opentelemetry::{
42            grpc::Service,
43            http::{build_warp_filter, run_http_server},
44        },
45        util::grpc::run_grpc_server_with_routes,
46    },
47};
48
// Output port names, one per OTLP signal type.

/// Name of the output port that carries log events.
pub const LOGS: &str = "logs";
/// Name of the output port that carries metrics.
///
/// When `use_otlp_decoding` is enabled, this port emits log events instead of metric events.
pub const METRICS: &str = "metrics";
/// Name of the output port that carries traces.
pub const TRACES: &str = "traces";

// Fully-qualified protobuf message names of the OTLP collector export requests. Each one selects
// the message type a deserializer decodes when `use_otlp_decoding` is enabled.

/// Protobuf message name of the OTLP logs export request.
pub const OTEL_PROTO_LOGS_REQUEST: &str =
    "opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest";
/// Protobuf message name of the OTLP trace export request.
pub const OTEL_PROTO_TRACES_REQUEST: &str =
    "opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest";
/// Protobuf message name of the OTLP metrics export request.
pub const OTEL_PROTO_METRICS_REQUEST: &str =
    "opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest";
59
/// Configuration for the `opentelemetry` source.
#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct OpentelemetryConfig {
    // Settings for the gRPC listener, which serves the OTLP logs, metrics, and trace services.
    #[configurable(derived)]
    pub grpc: GrpcConfig,

    // Settings for the HTTP listener.
    #[configurable(derived)]
    pub http: HttpConfig,

    // Accepts either a plain boolean or a full acknowledgements struct (`bool_or_struct`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,

    /// Setting this field overrides the legacy mapping of OpenTelemetry protos to Vector events and uses the proto directly.
    ///
    /// One major caveat is that incoming metrics are parsed as logs, but they preserve the OTLP format.
    /// This means that components that operate on metrics are not compatible with this output.
    /// However, these events can be forwarded directly to a downstream OTEL collector.
    #[configurable(derived)]
    #[serde(default)]
    pub use_otlp_decoding: bool,
}
89
90/// Configuration for the `opentelemetry` gRPC server.
91#[configurable_component]
92#[configurable(metadata(docs::examples = "example_grpc_config()"))]
93#[derive(Clone, Debug)]
94#[serde(deny_unknown_fields)]
95pub struct GrpcConfig {
96    /// The socket address to listen for connections on.
97    ///
98    /// It _must_ include a port.
99    #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
100    pub address: SocketAddr,
101
102    #[configurable(derived)]
103    #[serde(default, skip_serializing_if = "Option::is_none")]
104    pub tls: Option<TlsEnableableConfig>,
105}
106
107fn example_grpc_config() -> GrpcConfig {
108    GrpcConfig {
109        address: "0.0.0.0:4317".parse().unwrap(),
110        tls: None,
111    }
112}
113
114/// Configuration for the `opentelemetry` HTTP server.
115#[configurable_component]
116#[configurable(metadata(docs::examples = "example_http_config()"))]
117#[derive(Clone, Debug)]
118#[serde(deny_unknown_fields)]
119pub struct HttpConfig {
120    /// The socket address to listen for connections on.
121    ///
122    /// It _must_ include a port.
123    #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
124    pub address: SocketAddr,
125
126    #[configurable(derived)]
127    #[serde(default, skip_serializing_if = "Option::is_none")]
128    pub tls: Option<TlsEnableableConfig>,
129
130    #[configurable(derived)]
131    #[serde(default)]
132    pub keepalive: KeepaliveConfig,
133
134    /// A list of HTTP headers to include in the log event.
135    ///
136    /// Accepts the wildcard (`*`) character for headers matching a specified pattern.
137    ///
138    /// Specifying "*" results in all headers included in the log event.
139    ///
140    /// These headers are not included in the JSON payload if a field with a conflicting name exists.
141    #[serde(default)]
142    #[configurable(metadata(docs::examples = "User-Agent"))]
143    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
144    #[configurable(metadata(docs::examples = "X-*"))]
145    #[configurable(metadata(docs::examples = "*"))]
146    pub headers: Vec<String>,
147}
148
149fn example_http_config() -> HttpConfig {
150    HttpConfig {
151        address: "0.0.0.0:4318".parse().unwrap(),
152        tls: None,
153        keepalive: KeepaliveConfig::default(),
154        headers: vec![],
155    }
156}
157
158impl GenerateConfig for OpentelemetryConfig {
159    fn generate_config() -> toml::Value {
160        toml::Value::try_from(Self {
161            grpc: example_grpc_config(),
162            http: example_http_config(),
163            acknowledgements: Default::default(),
164            log_namespace: None,
165            use_otlp_decoding: false,
166        })
167        .unwrap()
168    }
169}
170
171impl OpentelemetryConfig {
172    fn get_deserializer(
173        &self,
174        message_type: &str,
175    ) -> vector_common::Result<Option<ProtobufDeserializer>> {
176        if self.use_otlp_decoding {
177            let deserializer = ProtobufDeserializer::new_from_bytes(
178                vector_lib::opentelemetry::proto::DESCRIPTOR_BYTES,
179                message_type,
180                Options {
181                    use_json_names: true,
182                },
183            )?;
184            Ok(Some(deserializer))
185        } else {
186            Ok(None)
187        }
188    }
189}
190
191#[async_trait::async_trait]
192#[typetag::serde(name = "opentelemetry")]
193impl SourceConfig for OpentelemetryConfig {
194    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
195        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
196        let events_received = register!(EventsReceived);
197        let log_namespace = cx.log_namespace(self.log_namespace);
198
199        let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
200
201        let log_deserializer = self.get_deserializer(OTEL_PROTO_LOGS_REQUEST)?;
202        let log_service = LogsServiceServer::new(Service {
203            pipeline: cx.out.clone(),
204            acknowledgements,
205            log_namespace,
206            events_received: events_received.clone(),
207            deserializer: log_deserializer.clone(),
208        })
209        .accept_compressed(CompressionEncoding::Gzip)
210        .max_decoding_message_size(usize::MAX);
211
212        let metric_deserializer = self.get_deserializer(OTEL_PROTO_METRICS_REQUEST)?;
213        let metrics_service = MetricsServiceServer::new(Service {
214            pipeline: cx.out.clone(),
215            acknowledgements,
216            log_namespace,
217            events_received: events_received.clone(),
218            deserializer: metric_deserializer,
219        })
220        .accept_compressed(CompressionEncoding::Gzip)
221        .max_decoding_message_size(usize::MAX);
222
223        let trace_deserializer = self.get_deserializer(OTEL_PROTO_TRACES_REQUEST)?;
224        let trace_service = TraceServiceServer::new(Service {
225            pipeline: cx.out.clone(),
226            acknowledgements,
227            log_namespace,
228            events_received: events_received.clone(),
229            deserializer: trace_deserializer,
230        })
231        .accept_compressed(CompressionEncoding::Gzip)
232        .max_decoding_message_size(usize::MAX);
233
234        let mut builder = RoutesBuilder::default();
235        builder
236            .add_service(log_service)
237            .add_service(metrics_service)
238            .add_service(trace_service);
239
240        let grpc_source = run_grpc_server_with_routes(
241            self.grpc.address,
242            grpc_tls_settings,
243            builder.routes(),
244            cx.shutdown.clone(),
245        )
246        .map_err(|error| {
247            error!(message = "Source future failed.", %error);
248        });
249
250        let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
251        let protocol = http_tls_settings.http_protocol_name();
252        let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
253        let headers =
254            build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
255
256        let filters = build_warp_filter(
257            acknowledgements,
258            log_namespace,
259            cx.out,
260            bytes_received,
261            events_received,
262            headers,
263            log_deserializer,
264        );
265
266        let http_source = run_http_server(
267            self.http.address,
268            http_tls_settings,
269            filters,
270            cx.shutdown,
271            self.http.keepalive.clone(),
272        );
273
274        Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
275    }
276
277    // TODO: appropriately handle "severity" meaning across both "severity_text" and "severity_number",
278    // as both are optional and can be converted to/from.
279    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
280        let log_namespace = global_log_namespace.merge(self.log_namespace);
281        let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
282            .with_source_metadata(
283                Self::NAME,
284                Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
285                &owned_value_path!(RESOURCE_KEY),
286                Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
287                None,
288            )
289            .with_source_metadata(
290                Self::NAME,
291                Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
292                &owned_value_path!(ATTRIBUTES_KEY),
293                Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
294                None,
295            )
296            .with_source_metadata(
297                Self::NAME,
298                Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
299                &owned_value_path!(TRACE_ID_KEY),
300                Kind::bytes().or_undefined(),
301                None,
302            )
303            .with_source_metadata(
304                Self::NAME,
305                Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
306                &owned_value_path!(SPAN_ID_KEY),
307                Kind::bytes().or_undefined(),
308                None,
309            )
310            .with_source_metadata(
311                Self::NAME,
312                Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
313                &owned_value_path!(SEVERITY_TEXT_KEY),
314                Kind::bytes().or_undefined(),
315                Some("severity"),
316            )
317            .with_source_metadata(
318                Self::NAME,
319                Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
320                &owned_value_path!(SEVERITY_NUMBER_KEY),
321                Kind::integer().or_undefined(),
322                None,
323            )
324            .with_source_metadata(
325                Self::NAME,
326                Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
327                &owned_value_path!(FLAGS_KEY),
328                Kind::integer().or_undefined(),
329                None,
330            )
331            .with_source_metadata(
332                Self::NAME,
333                Some(LegacyKey::Overwrite(owned_value_path!(
334                    DROPPED_ATTRIBUTES_COUNT_KEY
335                ))),
336                &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
337                Kind::integer(),
338                None,
339            )
340            .with_source_metadata(
341                Self::NAME,
342                Some(LegacyKey::Overwrite(owned_value_path!(
343                    OBSERVED_TIMESTAMP_KEY
344                ))),
345                &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
346                Kind::timestamp(),
347                None,
348            )
349            .with_source_metadata(
350                Self::NAME,
351                None,
352                &owned_value_path!("timestamp"),
353                Kind::timestamp(),
354                Some("timestamp"),
355            )
356            .with_standard_vector_source_metadata();
357
358        let schema_definition = match log_namespace {
359            LogNamespace::Vector => {
360                schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
361            }
362            LogNamespace::Legacy => {
363                schema_definition.with_meaning(log_schema().owned_message_path(), "message")
364            }
365        };
366
367        let metrics_output = if self.use_otlp_decoding {
368            SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
369        } else {
370            SourceOutput::new_metrics().with_port(METRICS)
371        };
372        vec![
373            SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
374            metrics_output,
375            SourceOutput::new_traces().with_port(TRACES),
376        ]
377    }
378
379    fn resources(&self) -> Vec<Resource> {
380        vec![
381            Resource::tcp(self.grpc.address),
382            Resource::tcp(self.http.address),
383        ]
384    }
385
386    fn can_acknowledge(&self) -> bool {
387        true
388    }
389}