vector/sources/opentelemetry/
config.rs

1use std::net::SocketAddr;
2
3use crate::{
4    config::{
5        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
6        SourceContext, SourceOutput,
7    },
8    http::KeepaliveConfig,
9    serde::bool_or_struct,
10    sources::{
11        Source,
12        http_server::{build_param_matcher, remove_duplicates},
13        opentelemetry::{
14            grpc::Service,
15            http::{build_warp_filter, run_http_server},
16        },
17        util::grpc::run_grpc_server_with_routes,
18    },
19};
20use futures::FutureExt;
21use futures_util::{TryFutureExt, future::join};
22use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder};
23use vector_config::indexmap::IndexSet;
24use vector_lib::{
25    codecs::decoding::{OtlpDeserializer, OtlpSignalType},
26    config::{LegacyKey, LogNamespace, log_schema},
27    configurable::configurable_component,
28    internal_event::{BytesReceived, EventsReceived, Protocol},
29    lookup::{OwnedTargetPath, owned_value_path},
30    opentelemetry::{
31        logs::{
32            ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
33            RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
34        },
35        proto::collector::{
36            logs::v1::logs_service_server::LogsServiceServer,
37            metrics::v1::metrics_service_server::MetricsServiceServer,
38            trace::v1::trace_service_server::TraceServiceServer,
39        },
40    },
41    schema::Definition,
42    tls::{MaybeTlsSettings, TlsEnableableConfig},
43};
44use vrl::value::{Kind, kind::Collection};
45
/// Name of the output port that carries log events.
pub const LOGS: &str = "logs";
/// Name of the output port that carries metric events (or OTLP-shaped log events when
/// `use_otlp_decoding` is enabled).
pub const METRICS: &str = "metrics";
/// Name of the output port that carries trace events.
pub const TRACES: &str = "traces";
49
// Both `grpc` and `http` are required (neither has a serde default): `build` always starts
// both servers and joins them, and `resources` claims both addresses as TCP resources.
/// Configuration for the `opentelemetry` source.
#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct OpentelemetryConfig {
    #[configurable(derived)]
    pub grpc: GrpcConfig,

    #[configurable(derived)]
    pub http: HttpConfig,

    // Resolved against the global setting with `cx.do_acknowledgements` in `build`.
    // `bool_or_struct` presumably lets users write a plain boolean instead of the full
    // struct — verify against `crate::serde::bool_or_struct`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub acknowledgements: SourceAcknowledgementsConfig,

    // `None` defers to the global namespace: `build` resolves it via `cx.log_namespace`,
    // `outputs` via `LogNamespace::merge`.
    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,

    // When `true`, `get_signal_deserializer` hands an `OtlpDeserializer` to every gRPC
    // service and to the HTTP filters, and `outputs` exposes the `metrics` port as logs.
    // Defaults to `false` (legacy mapping).
    /// Setting this field will override the legacy mapping of OTEL protos to Vector events and use the proto directly.
    ///
    /// One major caveat here is that the incoming metrics will be parsed as logs but they will preserve the OTLP format.
    /// This means that components that work on metrics, will not be compatible with this output.
    /// However, these events can be forwarded directly to a downstream OTEL collector.
    #[configurable(derived)]
    #[serde(default)]
    pub use_otlp_decoding: bool,
}
76    #[serde(default)]
77    pub use_otlp_decoding: bool,
78}
79
80/// Configuration for the `opentelemetry` gRPC server.
81#[configurable_component]
82#[configurable(metadata(docs::examples = "example_grpc_config()"))]
83#[derive(Clone, Debug)]
84#[serde(deny_unknown_fields)]
85pub struct GrpcConfig {
86    /// The socket address to listen for connections on.
87    ///
88    /// It _must_ include a port.
89    #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
90    pub address: SocketAddr,
91
92    #[configurable(derived)]
93    #[serde(default, skip_serializing_if = "Option::is_none")]
94    pub tls: Option<TlsEnableableConfig>,
95}
96
97fn example_grpc_config() -> GrpcConfig {
98    GrpcConfig {
99        address: "0.0.0.0:4317".parse().unwrap(),
100        tls: None,
101    }
102}
103
104/// Configuration for the `opentelemetry` HTTP server.
105#[configurable_component]
106#[configurable(metadata(docs::examples = "example_http_config()"))]
107#[derive(Clone, Debug)]
108#[serde(deny_unknown_fields)]
109pub struct HttpConfig {
110    /// The socket address to listen for connections on.
111    ///
112    /// It _must_ include a port.
113    #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
114    pub address: SocketAddr,
115
116    #[configurable(derived)]
117    #[serde(default, skip_serializing_if = "Option::is_none")]
118    pub tls: Option<TlsEnableableConfig>,
119
120    #[configurable(derived)]
121    #[serde(default)]
122    pub keepalive: KeepaliveConfig,
123
124    /// A list of HTTP headers to include in the log event.
125    ///
126    /// Accepts the wildcard (`*`) character for headers matching a specified pattern.
127    ///
128    /// Specifying "*" results in all headers included in the log event.
129    ///
130    /// These headers are not included in the JSON payload if a field with a conflicting name exists.
131    #[serde(default)]
132    #[configurable(metadata(docs::examples = "User-Agent"))]
133    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
134    #[configurable(metadata(docs::examples = "X-*"))]
135    #[configurable(metadata(docs::examples = "*"))]
136    pub headers: Vec<String>,
137}
138
139fn example_http_config() -> HttpConfig {
140    HttpConfig {
141        address: "0.0.0.0:4318".parse().unwrap(),
142        tls: None,
143        keepalive: KeepaliveConfig::default(),
144        headers: vec![],
145    }
146}
147
148impl GenerateConfig for OpentelemetryConfig {
149    fn generate_config() -> toml::Value {
150        toml::Value::try_from(Self {
151            grpc: example_grpc_config(),
152            http: example_http_config(),
153            acknowledgements: Default::default(),
154            log_namespace: None,
155            use_otlp_decoding: false,
156        })
157        .unwrap()
158    }
159}
160
161impl OpentelemetryConfig {
162    fn get_signal_deserializer(
163        &self,
164        signal_type: OtlpSignalType,
165    ) -> vector_common::Result<Option<OtlpDeserializer>> {
166        if self.use_otlp_decoding {
167            Ok(Some(OtlpDeserializer::new_with_signals(IndexSet::from([
168                signal_type,
169            ]))))
170        } else {
171            Ok(None)
172        }
173    }
174}
175
176#[async_trait::async_trait]
177#[typetag::serde(name = "opentelemetry")]
178impl SourceConfig for OpentelemetryConfig {
179    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
180        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
181        let events_received = register!(EventsReceived);
182        let log_namespace = cx.log_namespace(self.log_namespace);
183
184        let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
185
186        let logs_deserializer = self.get_signal_deserializer(OtlpSignalType::Logs)?;
187        let metrics_deserializer = self.get_signal_deserializer(OtlpSignalType::Metrics)?;
188        let traces_deserializer = self.get_signal_deserializer(OtlpSignalType::Traces)?;
189
190        let log_service = LogsServiceServer::new(Service {
191            pipeline: cx.out.clone(),
192            acknowledgements,
193            log_namespace,
194            events_received: events_received.clone(),
195            deserializer: logs_deserializer.clone(),
196        })
197        .accept_compressed(CompressionEncoding::Gzip)
198        .max_decoding_message_size(usize::MAX);
199
200        let metrics_service = MetricsServiceServer::new(Service {
201            pipeline: cx.out.clone(),
202            acknowledgements,
203            log_namespace,
204            events_received: events_received.clone(),
205            deserializer: metrics_deserializer.clone(),
206        })
207        .accept_compressed(CompressionEncoding::Gzip)
208        .max_decoding_message_size(usize::MAX);
209
210        let trace_service = TraceServiceServer::new(Service {
211            pipeline: cx.out.clone(),
212            acknowledgements,
213            log_namespace,
214            events_received: events_received.clone(),
215            deserializer: traces_deserializer.clone(),
216        })
217        .accept_compressed(CompressionEncoding::Gzip)
218        .max_decoding_message_size(usize::MAX);
219
220        let mut builder = RoutesBuilder::default();
221        builder
222            .add_service(log_service)
223            .add_service(metrics_service)
224            .add_service(trace_service);
225
226        let grpc_source = run_grpc_server_with_routes(
227            self.grpc.address,
228            grpc_tls_settings,
229            builder.routes(),
230            cx.shutdown.clone(),
231        )
232        .map_err(|error| {
233            error!(message = "Source future failed.", %error);
234        });
235
236        let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
237        let protocol = http_tls_settings.http_protocol_name();
238        let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
239        let headers =
240            build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
241
242        let filters = build_warp_filter(
243            acknowledgements,
244            log_namespace,
245            cx.out,
246            bytes_received,
247            events_received,
248            headers,
249            logs_deserializer,
250            metrics_deserializer,
251            traces_deserializer,
252        );
253
254        let http_source = run_http_server(
255            self.http.address,
256            http_tls_settings,
257            filters,
258            cx.shutdown,
259            self.http.keepalive.clone(),
260        );
261
262        Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
263    }
264
265    // TODO: appropriately handle "severity" meaning across both "severity_text" and "severity_number",
266    // as both are optional and can be converted to/from.
267    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
268        let log_namespace = global_log_namespace.merge(self.log_namespace);
269        let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
270            .with_source_metadata(
271                Self::NAME,
272                Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
273                &owned_value_path!(RESOURCE_KEY),
274                Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
275                None,
276            )
277            .with_source_metadata(
278                Self::NAME,
279                Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
280                &owned_value_path!(ATTRIBUTES_KEY),
281                Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
282                None,
283            )
284            .with_source_metadata(
285                Self::NAME,
286                Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
287                &owned_value_path!(TRACE_ID_KEY),
288                Kind::bytes().or_undefined(),
289                None,
290            )
291            .with_source_metadata(
292                Self::NAME,
293                Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
294                &owned_value_path!(SPAN_ID_KEY),
295                Kind::bytes().or_undefined(),
296                None,
297            )
298            .with_source_metadata(
299                Self::NAME,
300                Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
301                &owned_value_path!(SEVERITY_TEXT_KEY),
302                Kind::bytes().or_undefined(),
303                Some("severity"),
304            )
305            .with_source_metadata(
306                Self::NAME,
307                Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
308                &owned_value_path!(SEVERITY_NUMBER_KEY),
309                Kind::integer().or_undefined(),
310                None,
311            )
312            .with_source_metadata(
313                Self::NAME,
314                Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
315                &owned_value_path!(FLAGS_KEY),
316                Kind::integer().or_undefined(),
317                None,
318            )
319            .with_source_metadata(
320                Self::NAME,
321                Some(LegacyKey::Overwrite(owned_value_path!(
322                    DROPPED_ATTRIBUTES_COUNT_KEY
323                ))),
324                &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
325                Kind::integer(),
326                None,
327            )
328            .with_source_metadata(
329                Self::NAME,
330                Some(LegacyKey::Overwrite(owned_value_path!(
331                    OBSERVED_TIMESTAMP_KEY
332                ))),
333                &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
334                Kind::timestamp(),
335                None,
336            )
337            .with_source_metadata(
338                Self::NAME,
339                None,
340                &owned_value_path!("timestamp"),
341                Kind::timestamp(),
342                Some("timestamp"),
343            )
344            .with_standard_vector_source_metadata();
345
346        let schema_definition = match log_namespace {
347            LogNamespace::Vector => {
348                schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
349            }
350            LogNamespace::Legacy => {
351                schema_definition.with_meaning(log_schema().owned_message_path(), "message")
352            }
353        };
354
355        let metrics_output = if self.use_otlp_decoding {
356            SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
357        } else {
358            SourceOutput::new_metrics().with_port(METRICS)
359        };
360        vec![
361            SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
362            metrics_output,
363            SourceOutput::new_traces().with_port(TRACES),
364        ]
365    }
366
367    fn resources(&self) -> Vec<Resource> {
368        vec![
369            Resource::tcp(self.grpc.address),
370            Resource::tcp(self.http.address),
371        ]
372    }
373
374    fn can_acknowledge(&self) -> bool {
375        true
376    }
377}