1use std::net::SocketAddr;
2
3use crate::{
4 config::{
5 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
6 SourceContext, SourceOutput,
7 },
8 http::KeepaliveConfig,
9 serde::bool_or_struct,
10 sources::{
11 Source,
12 http_server::{build_param_matcher, remove_duplicates},
13 opentelemetry::{
14 grpc::Service,
15 http::{build_warp_filter, run_http_server},
16 },
17 util::grpc::run_grpc_server_with_routes,
18 },
19};
20use futures::FutureExt;
21use futures_util::{TryFutureExt, future::join};
22use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder};
23use vector_config::indexmap::IndexSet;
24use vector_lib::{
25 codecs::decoding::{OtlpDeserializer, OtlpSignalType},
26 config::{LegacyKey, LogNamespace, log_schema},
27 configurable::configurable_component,
28 internal_event::{BytesReceived, EventsReceived, Protocol},
29 lookup::{OwnedTargetPath, owned_value_path},
30 opentelemetry::{
31 logs::{
32 ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
33 RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
34 },
35 proto::collector::{
36 logs::v1::logs_service_server::LogsServiceServer,
37 metrics::v1::metrics_service_server::MetricsServiceServer,
38 trace::v1::trace_service_server::TraceServiceServer,
39 },
40 },
41 schema::Definition,
42 tls::{MaybeTlsSettings, TlsEnableableConfig},
43};
44use vrl::value::{Kind, kind::Collection};
45
46pub const LOGS: &str = "logs";
47pub const METRICS: &str = "metrics";
48pub const TRACES: &str = "traces";
49
50#[configurable_component]
52#[derive(Clone, Debug, Default, PartialEq, Eq)]
53#[serde(deny_unknown_fields)]
54pub struct OtlpDecodingConfig {
55 #[serde(default)]
60 pub logs: bool,
61
62 #[serde(default)]
67 pub metrics: bool,
68
69 #[serde(default)]
74 pub traces: bool,
75}
76
77impl From<bool> for OtlpDecodingConfig {
78 fn from(value: bool) -> Self {
84 Self {
85 logs: value,
86 metrics: value,
87 traces: value,
88 }
89 }
90}
91
92impl OtlpDecodingConfig {
93 pub const fn any_enabled(&self) -> bool {
95 self.logs || self.metrics || self.traces
96 }
97
98 pub const fn all_enabled(&self) -> bool {
100 self.logs && self.metrics && self.traces
101 }
102
103 pub const fn is_mixed(&self) -> bool {
105 self.any_enabled() && !self.all_enabled()
106 }
107}
108
109#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
111#[derive(Clone, Debug)]
112#[serde(deny_unknown_fields)]
113pub struct OpentelemetryConfig {
114 #[configurable(derived)]
115 pub grpc: GrpcConfig,
116
117 #[configurable(derived)]
118 pub http: HttpConfig,
119
120 #[configurable(derived)]
121 #[serde(default, deserialize_with = "bool_or_struct")]
122 pub acknowledgements: SourceAcknowledgementsConfig,
123
124 #[configurable(metadata(docs::hidden))]
126 #[serde(default)]
127 pub log_namespace: Option<bool>,
128
129 #[serde(default, deserialize_with = "bool_or_struct")]
158 pub use_otlp_decoding: OtlpDecodingConfig,
159}
160
161#[configurable_component]
163#[configurable(metadata(docs::examples = "example_grpc_config()"))]
164#[derive(Clone, Debug)]
165#[serde(deny_unknown_fields)]
166pub struct GrpcConfig {
167 #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
171 pub address: SocketAddr,
172
173 #[configurable(derived)]
174 #[serde(default, skip_serializing_if = "Option::is_none")]
175 pub tls: Option<TlsEnableableConfig>,
176}
177
178fn example_grpc_config() -> GrpcConfig {
179 GrpcConfig {
180 address: "0.0.0.0:4317".parse().unwrap(),
181 tls: None,
182 }
183}
184
185#[configurable_component]
187#[configurable(metadata(docs::examples = "example_http_config()"))]
188#[derive(Clone, Debug)]
189#[serde(deny_unknown_fields)]
190pub struct HttpConfig {
191 #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
195 pub address: SocketAddr,
196
197 #[configurable(derived)]
198 #[serde(default, skip_serializing_if = "Option::is_none")]
199 pub tls: Option<TlsEnableableConfig>,
200
201 #[configurable(derived)]
202 #[serde(default)]
203 pub keepalive: KeepaliveConfig,
204
205 #[serde(default)]
214 #[configurable(metadata(docs::examples = "User-Agent"))]
215 #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
216 #[configurable(metadata(docs::examples = "X-*"))]
217 #[configurable(metadata(docs::examples = "*"))]
218 pub headers: Vec<String>,
219}
220
221fn example_http_config() -> HttpConfig {
222 HttpConfig {
223 address: "0.0.0.0:4318".parse().unwrap(),
224 tls: None,
225 keepalive: KeepaliveConfig::default(),
226 headers: vec![],
227 }
228}
229
230impl GenerateConfig for OpentelemetryConfig {
231 fn generate_config() -> toml::Value {
232 toml::Value::try_from(Self {
233 grpc: example_grpc_config(),
234 http: example_http_config(),
235 acknowledgements: Default::default(),
236 log_namespace: None,
237 use_otlp_decoding: OtlpDecodingConfig::default(),
238 })
239 .unwrap()
240 }
241}
242
243impl OpentelemetryConfig {
244 pub(crate) fn get_signal_deserializer(
245 &self,
246 signal_type: OtlpSignalType,
247 ) -> vector_common::Result<Option<OtlpDeserializer>> {
248 let should_use_otlp = match signal_type {
249 OtlpSignalType::Logs => self.use_otlp_decoding.logs,
250 OtlpSignalType::Metrics => self.use_otlp_decoding.metrics,
251 OtlpSignalType::Traces => self.use_otlp_decoding.traces,
252 };
253
254 if should_use_otlp {
255 Ok(Some(OtlpDeserializer::new_with_signals(IndexSet::from([
256 signal_type,
257 ]))))
258 } else {
259 Ok(None)
260 }
261 }
262}
263
264#[async_trait::async_trait]
265#[typetag::serde(name = "opentelemetry")]
266impl SourceConfig for OpentelemetryConfig {
267 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
268 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
269 let events_received = register!(EventsReceived);
270 let log_namespace = cx.log_namespace(self.log_namespace);
271
272 let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
273
274 if self.use_otlp_decoding.is_mixed() {
276 info!(
277 message = "Signals with OTLP decoding enabled will preserve raw format; others will use Vector native format.",
278 logs_otlp = self.use_otlp_decoding.logs,
279 metrics_otlp = self.use_otlp_decoding.metrics,
280 traces_otlp = self.use_otlp_decoding.traces,
281 );
282 }
283
284 let logs_deserializer = self.get_signal_deserializer(OtlpSignalType::Logs)?;
285 let metrics_deserializer = self.get_signal_deserializer(OtlpSignalType::Metrics)?;
286 let traces_deserializer = self.get_signal_deserializer(OtlpSignalType::Traces)?;
287
288 let log_service = LogsServiceServer::new(Service {
289 pipeline: cx.out.clone(),
290 acknowledgements,
291 log_namespace,
292 events_received: events_received.clone(),
293 deserializer: logs_deserializer.clone(),
294 })
295 .accept_compressed(CompressionEncoding::Gzip)
296 .max_decoding_message_size(usize::MAX);
297
298 let metrics_service = MetricsServiceServer::new(Service {
299 pipeline: cx.out.clone(),
300 acknowledgements,
301 log_namespace,
302 events_received: events_received.clone(),
303 deserializer: metrics_deserializer.clone(),
304 })
305 .accept_compressed(CompressionEncoding::Gzip)
306 .max_decoding_message_size(usize::MAX);
307
308 let trace_service = TraceServiceServer::new(Service {
309 pipeline: cx.out.clone(),
310 acknowledgements,
311 log_namespace,
312 events_received: events_received.clone(),
313 deserializer: traces_deserializer.clone(),
314 })
315 .accept_compressed(CompressionEncoding::Gzip)
316 .max_decoding_message_size(usize::MAX);
317
318 let mut builder = RoutesBuilder::default();
319 builder
320 .add_service(log_service)
321 .add_service(metrics_service)
322 .add_service(trace_service);
323
324 let grpc_source = run_grpc_server_with_routes(
325 self.grpc.address,
326 grpc_tls_settings,
327 builder.routes(),
328 cx.shutdown.clone(),
329 )
330 .map_err(|error| {
331 error!(message = "OpenTelemetry source gRPC server failed.", %error);
332 });
333
334 let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
335 let protocol = http_tls_settings.http_protocol_name();
336 let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
337 let headers =
338 build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
339
340 let filters = build_warp_filter(
341 acknowledgements,
342 log_namespace,
343 cx.out,
344 bytes_received,
345 events_received,
346 headers,
347 logs_deserializer,
348 metrics_deserializer,
349 traces_deserializer,
350 );
351
352 let http_source = run_http_server(
353 self.http.address,
354 http_tls_settings,
355 filters,
356 cx.shutdown,
357 self.http.keepalive.clone(),
358 )
359 .map_err(|error| {
360 error!(message = "OpenTelemetry source HTTP server failed.", %error);
361 });
362
363 Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
364 }
365
366 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
369 let log_namespace = global_log_namespace.merge(self.log_namespace);
370 let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
371 .with_source_metadata(
372 Self::NAME,
373 Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
374 &owned_value_path!(RESOURCE_KEY),
375 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
376 None,
377 )
378 .with_source_metadata(
379 Self::NAME,
380 Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
381 &owned_value_path!(ATTRIBUTES_KEY),
382 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
383 None,
384 )
385 .with_source_metadata(
386 Self::NAME,
387 Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
388 &owned_value_path!(TRACE_ID_KEY),
389 Kind::bytes().or_undefined(),
390 None,
391 )
392 .with_source_metadata(
393 Self::NAME,
394 Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
395 &owned_value_path!(SPAN_ID_KEY),
396 Kind::bytes().or_undefined(),
397 None,
398 )
399 .with_source_metadata(
400 Self::NAME,
401 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
402 &owned_value_path!(SEVERITY_TEXT_KEY),
403 Kind::bytes().or_undefined(),
404 Some("severity"),
405 )
406 .with_source_metadata(
407 Self::NAME,
408 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
409 &owned_value_path!(SEVERITY_NUMBER_KEY),
410 Kind::integer().or_undefined(),
411 None,
412 )
413 .with_source_metadata(
414 Self::NAME,
415 Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
416 &owned_value_path!(FLAGS_KEY),
417 Kind::integer().or_undefined(),
418 None,
419 )
420 .with_source_metadata(
421 Self::NAME,
422 Some(LegacyKey::Overwrite(owned_value_path!(
423 DROPPED_ATTRIBUTES_COUNT_KEY
424 ))),
425 &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
426 Kind::integer(),
427 None,
428 )
429 .with_source_metadata(
430 Self::NAME,
431 Some(LegacyKey::Overwrite(owned_value_path!(
432 OBSERVED_TIMESTAMP_KEY
433 ))),
434 &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
435 Kind::timestamp(),
436 None,
437 )
438 .with_source_metadata(
439 Self::NAME,
440 None,
441 &owned_value_path!("timestamp"),
442 Kind::timestamp(),
443 Some("timestamp"),
444 )
445 .with_standard_vector_source_metadata();
446
447 let schema_definition = match log_namespace {
448 LogNamespace::Vector => {
449 schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
450 }
451 LogNamespace::Legacy => {
452 schema_definition.with_meaning(log_schema().owned_message_path(), "message")
453 }
454 };
455
456 let logs_output = if self.use_otlp_decoding.logs {
457 SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(LOGS)
458 } else {
459 SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS)
460 };
461
462 let metrics_output = if self.use_otlp_decoding.metrics {
463 SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
464 } else {
465 SourceOutput::new_metrics().with_port(METRICS)
466 };
467
468 vec![
469 logs_output,
470 metrics_output,
471 SourceOutput::new_traces().with_port(TRACES),
472 ]
473 }
474
475 fn resources(&self) -> Vec<Resource> {
476 vec![
477 Resource::tcp(self.grpc.address),
478 Resource::tcp(self.http.address),
479 ]
480 }
481
482 fn can_acknowledge(&self) -> bool {
483 true
484 }
485}