1use std::net::SocketAddr;
2
3use futures::FutureExt;
4use futures_util::{TryFutureExt, future::join};
5use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder};
6use vector_lib::{
7 codecs::decoding::ProtobufDeserializer,
8 config::{LegacyKey, LogNamespace, log_schema},
9 configurable::configurable_component,
10 internal_event::{BytesReceived, EventsReceived, Protocol},
11 lookup::{OwnedTargetPath, owned_value_path},
12 opentelemetry::{
13 logs::{
14 ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
15 RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
16 },
17 proto::collector::{
18 logs::v1::logs_service_server::LogsServiceServer,
19 metrics::v1::metrics_service_server::MetricsServiceServer,
20 trace::v1::trace_service_server::TraceServiceServer,
21 },
22 },
23 schema::Definition,
24 tls::{MaybeTlsSettings, TlsEnableableConfig},
25};
26use vrl::{
27 protobuf::parse::Options,
28 value::{Kind, kind::Collection},
29};
30
31use crate::{
32 config::{
33 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
34 SourceContext, SourceOutput,
35 },
36 http::KeepaliveConfig,
37 serde::bool_or_struct,
38 sources::{
39 Source,
40 http_server::{build_param_matcher, remove_duplicates},
41 opentelemetry::{
42 grpc::Service,
43 http::{build_warp_filter, run_http_server},
44 },
45 util::grpc::run_grpc_server_with_routes,
46 },
47};
48
49pub const LOGS: &str = "logs";
50pub const METRICS: &str = "metrics";
51pub const TRACES: &str = "traces";
52
53pub const OTEL_PROTO_LOGS_REQUEST: &str =
54 "opentelemetry.proto.collector.logs.v1.ExportLogsServiceRequest";
55pub const OTEL_PROTO_TRACES_REQUEST: &str =
56 "opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest";
57pub const OTEL_PROTO_METRICS_REQUEST: &str =
58 "opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest";
59
60#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
62#[derive(Clone, Debug)]
63#[serde(deny_unknown_fields)]
64pub struct OpentelemetryConfig {
65 #[configurable(derived)]
66 pub grpc: GrpcConfig,
67
68 #[configurable(derived)]
69 pub http: HttpConfig,
70
71 #[configurable(derived)]
72 #[serde(default, deserialize_with = "bool_or_struct")]
73 pub acknowledgements: SourceAcknowledgementsConfig,
74
75 #[configurable(metadata(docs::hidden))]
77 #[serde(default)]
78 pub log_namespace: Option<bool>,
79
80 #[configurable(derived)]
86 #[serde(default)]
87 pub use_otlp_decoding: bool,
88}
89
90#[configurable_component]
92#[configurable(metadata(docs::examples = "example_grpc_config()"))]
93#[derive(Clone, Debug)]
94#[serde(deny_unknown_fields)]
95pub struct GrpcConfig {
96 #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
100 pub address: SocketAddr,
101
102 #[configurable(derived)]
103 #[serde(default, skip_serializing_if = "Option::is_none")]
104 pub tls: Option<TlsEnableableConfig>,
105}
106
107fn example_grpc_config() -> GrpcConfig {
108 GrpcConfig {
109 address: "0.0.0.0:4317".parse().unwrap(),
110 tls: None,
111 }
112}
113
114#[configurable_component]
116#[configurable(metadata(docs::examples = "example_http_config()"))]
117#[derive(Clone, Debug)]
118#[serde(deny_unknown_fields)]
119pub struct HttpConfig {
120 #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
124 pub address: SocketAddr,
125
126 #[configurable(derived)]
127 #[serde(default, skip_serializing_if = "Option::is_none")]
128 pub tls: Option<TlsEnableableConfig>,
129
130 #[configurable(derived)]
131 #[serde(default)]
132 pub keepalive: KeepaliveConfig,
133
134 #[serde(default)]
142 #[configurable(metadata(docs::examples = "User-Agent"))]
143 #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
144 #[configurable(metadata(docs::examples = "X-*"))]
145 #[configurable(metadata(docs::examples = "*"))]
146 pub headers: Vec<String>,
147}
148
149fn example_http_config() -> HttpConfig {
150 HttpConfig {
151 address: "0.0.0.0:4318".parse().unwrap(),
152 tls: None,
153 keepalive: KeepaliveConfig::default(),
154 headers: vec![],
155 }
156}
157
158impl GenerateConfig for OpentelemetryConfig {
159 fn generate_config() -> toml::Value {
160 toml::Value::try_from(Self {
161 grpc: example_grpc_config(),
162 http: example_http_config(),
163 acknowledgements: Default::default(),
164 log_namespace: None,
165 use_otlp_decoding: false,
166 })
167 .unwrap()
168 }
169}
170
171impl OpentelemetryConfig {
172 fn get_deserializer(
173 &self,
174 message_type: &str,
175 ) -> vector_common::Result<Option<ProtobufDeserializer>> {
176 if self.use_otlp_decoding {
177 let deserializer = ProtobufDeserializer::new_from_bytes(
178 vector_lib::opentelemetry::proto::DESCRIPTOR_BYTES,
179 message_type,
180 Options {
181 use_json_names: true,
182 },
183 )?;
184 Ok(Some(deserializer))
185 } else {
186 Ok(None)
187 }
188 }
189}
190
191#[async_trait::async_trait]
192#[typetag::serde(name = "opentelemetry")]
193impl SourceConfig for OpentelemetryConfig {
194 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
195 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
196 let events_received = register!(EventsReceived);
197 let log_namespace = cx.log_namespace(self.log_namespace);
198
199 let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
200
201 let log_deserializer = self.get_deserializer(OTEL_PROTO_LOGS_REQUEST)?;
202 let log_service = LogsServiceServer::new(Service {
203 pipeline: cx.out.clone(),
204 acknowledgements,
205 log_namespace,
206 events_received: events_received.clone(),
207 deserializer: log_deserializer.clone(),
208 })
209 .accept_compressed(CompressionEncoding::Gzip)
210 .max_decoding_message_size(usize::MAX);
211
212 let metric_deserializer = self.get_deserializer(OTEL_PROTO_METRICS_REQUEST)?;
213 let metrics_service = MetricsServiceServer::new(Service {
214 pipeline: cx.out.clone(),
215 acknowledgements,
216 log_namespace,
217 events_received: events_received.clone(),
218 deserializer: metric_deserializer,
219 })
220 .accept_compressed(CompressionEncoding::Gzip)
221 .max_decoding_message_size(usize::MAX);
222
223 let trace_deserializer = self.get_deserializer(OTEL_PROTO_TRACES_REQUEST)?;
224 let trace_service = TraceServiceServer::new(Service {
225 pipeline: cx.out.clone(),
226 acknowledgements,
227 log_namespace,
228 events_received: events_received.clone(),
229 deserializer: trace_deserializer,
230 })
231 .accept_compressed(CompressionEncoding::Gzip)
232 .max_decoding_message_size(usize::MAX);
233
234 let mut builder = RoutesBuilder::default();
235 builder
236 .add_service(log_service)
237 .add_service(metrics_service)
238 .add_service(trace_service);
239
240 let grpc_source = run_grpc_server_with_routes(
241 self.grpc.address,
242 grpc_tls_settings,
243 builder.routes(),
244 cx.shutdown.clone(),
245 )
246 .map_err(|error| {
247 error!(message = "Source future failed.", %error);
248 });
249
250 let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
251 let protocol = http_tls_settings.http_protocol_name();
252 let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
253 let headers =
254 build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
255
256 let filters = build_warp_filter(
257 acknowledgements,
258 log_namespace,
259 cx.out,
260 bytes_received,
261 events_received,
262 headers,
263 log_deserializer,
264 );
265
266 let http_source = run_http_server(
267 self.http.address,
268 http_tls_settings,
269 filters,
270 cx.shutdown,
271 self.http.keepalive.clone(),
272 );
273
274 Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
275 }
276
277 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
280 let log_namespace = global_log_namespace.merge(self.log_namespace);
281 let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
282 .with_source_metadata(
283 Self::NAME,
284 Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
285 &owned_value_path!(RESOURCE_KEY),
286 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
287 None,
288 )
289 .with_source_metadata(
290 Self::NAME,
291 Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
292 &owned_value_path!(ATTRIBUTES_KEY),
293 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
294 None,
295 )
296 .with_source_metadata(
297 Self::NAME,
298 Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
299 &owned_value_path!(TRACE_ID_KEY),
300 Kind::bytes().or_undefined(),
301 None,
302 )
303 .with_source_metadata(
304 Self::NAME,
305 Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
306 &owned_value_path!(SPAN_ID_KEY),
307 Kind::bytes().or_undefined(),
308 None,
309 )
310 .with_source_metadata(
311 Self::NAME,
312 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
313 &owned_value_path!(SEVERITY_TEXT_KEY),
314 Kind::bytes().or_undefined(),
315 Some("severity"),
316 )
317 .with_source_metadata(
318 Self::NAME,
319 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
320 &owned_value_path!(SEVERITY_NUMBER_KEY),
321 Kind::integer().or_undefined(),
322 None,
323 )
324 .with_source_metadata(
325 Self::NAME,
326 Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
327 &owned_value_path!(FLAGS_KEY),
328 Kind::integer().or_undefined(),
329 None,
330 )
331 .with_source_metadata(
332 Self::NAME,
333 Some(LegacyKey::Overwrite(owned_value_path!(
334 DROPPED_ATTRIBUTES_COUNT_KEY
335 ))),
336 &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
337 Kind::integer(),
338 None,
339 )
340 .with_source_metadata(
341 Self::NAME,
342 Some(LegacyKey::Overwrite(owned_value_path!(
343 OBSERVED_TIMESTAMP_KEY
344 ))),
345 &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
346 Kind::timestamp(),
347 None,
348 )
349 .with_source_metadata(
350 Self::NAME,
351 None,
352 &owned_value_path!("timestamp"),
353 Kind::timestamp(),
354 Some("timestamp"),
355 )
356 .with_standard_vector_source_metadata();
357
358 let schema_definition = match log_namespace {
359 LogNamespace::Vector => {
360 schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
361 }
362 LogNamespace::Legacy => {
363 schema_definition.with_meaning(log_schema().owned_message_path(), "message")
364 }
365 };
366
367 let metrics_output = if self.use_otlp_decoding {
368 SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
369 } else {
370 SourceOutput::new_metrics().with_port(METRICS)
371 };
372 vec![
373 SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
374 metrics_output,
375 SourceOutput::new_traces().with_port(TRACES),
376 ]
377 }
378
379 fn resources(&self) -> Vec<Resource> {
380 vec![
381 Resource::tcp(self.grpc.address),
382 Resource::tcp(self.http.address),
383 ]
384 }
385
386 fn can_acknowledge(&self) -> bool {
387 true
388 }
389}