1use std::net::SocketAddr;
2
3use crate::{
4 config::{
5 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
6 SourceContext, SourceOutput,
7 },
8 http::KeepaliveConfig,
9 serde::bool_or_struct,
10 sources::{
11 Source,
12 http_server::{build_param_matcher, remove_duplicates},
13 opentelemetry::{
14 grpc::Service,
15 http::{build_warp_filter, run_http_server},
16 },
17 util::grpc::run_grpc_server_with_routes,
18 },
19};
20use futures::FutureExt;
21use futures_util::{TryFutureExt, future::join};
22use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder};
23use vector_config::indexmap::IndexSet;
24use vector_lib::{
25 codecs::decoding::{OtlpDeserializer, OtlpSignalType},
26 config::{LegacyKey, LogNamespace, log_schema},
27 configurable::configurable_component,
28 internal_event::{BytesReceived, EventsReceived, Protocol},
29 lookup::{OwnedTargetPath, owned_value_path},
30 opentelemetry::{
31 logs::{
32 ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
33 RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
34 },
35 proto::collector::{
36 logs::v1::logs_service_server::LogsServiceServer,
37 metrics::v1::metrics_service_server::MetricsServiceServer,
38 trace::v1::trace_service_server::TraceServiceServer,
39 },
40 },
41 schema::Definition,
42 tls::{MaybeTlsSettings, TlsEnableableConfig},
43};
44use vrl::value::{Kind, kind::Collection};
45
46pub const LOGS: &str = "logs";
47pub const METRICS: &str = "metrics";
48pub const TRACES: &str = "traces";
49
50#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
52#[derive(Clone, Debug)]
53#[serde(deny_unknown_fields)]
54pub struct OpentelemetryConfig {
55 #[configurable(derived)]
56 pub grpc: GrpcConfig,
57
58 #[configurable(derived)]
59 pub http: HttpConfig,
60
61 #[configurable(derived)]
62 #[serde(default, deserialize_with = "bool_or_struct")]
63 pub acknowledgements: SourceAcknowledgementsConfig,
64
65 #[configurable(metadata(docs::hidden))]
67 #[serde(default)]
68 pub log_namespace: Option<bool>,
69
70 #[configurable(derived)]
76 #[serde(default)]
77 pub use_otlp_decoding: bool,
78}
79
80#[configurable_component]
82#[configurable(metadata(docs::examples = "example_grpc_config()"))]
83#[derive(Clone, Debug)]
84#[serde(deny_unknown_fields)]
85pub struct GrpcConfig {
86 #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
90 pub address: SocketAddr,
91
92 #[configurable(derived)]
93 #[serde(default, skip_serializing_if = "Option::is_none")]
94 pub tls: Option<TlsEnableableConfig>,
95}
96
97fn example_grpc_config() -> GrpcConfig {
98 GrpcConfig {
99 address: "0.0.0.0:4317".parse().unwrap(),
100 tls: None,
101 }
102}
103
104#[configurable_component]
106#[configurable(metadata(docs::examples = "example_http_config()"))]
107#[derive(Clone, Debug)]
108#[serde(deny_unknown_fields)]
109pub struct HttpConfig {
110 #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
114 pub address: SocketAddr,
115
116 #[configurable(derived)]
117 #[serde(default, skip_serializing_if = "Option::is_none")]
118 pub tls: Option<TlsEnableableConfig>,
119
120 #[configurable(derived)]
121 #[serde(default)]
122 pub keepalive: KeepaliveConfig,
123
124 #[serde(default)]
132 #[configurable(metadata(docs::examples = "User-Agent"))]
133 #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
134 #[configurable(metadata(docs::examples = "X-*"))]
135 #[configurable(metadata(docs::examples = "*"))]
136 pub headers: Vec<String>,
137}
138
139fn example_http_config() -> HttpConfig {
140 HttpConfig {
141 address: "0.0.0.0:4318".parse().unwrap(),
142 tls: None,
143 keepalive: KeepaliveConfig::default(),
144 headers: vec![],
145 }
146}
147
148impl GenerateConfig for OpentelemetryConfig {
149 fn generate_config() -> toml::Value {
150 toml::Value::try_from(Self {
151 grpc: example_grpc_config(),
152 http: example_http_config(),
153 acknowledgements: Default::default(),
154 log_namespace: None,
155 use_otlp_decoding: false,
156 })
157 .unwrap()
158 }
159}
160
161impl OpentelemetryConfig {
162 fn get_signal_deserializer(
163 &self,
164 signal_type: OtlpSignalType,
165 ) -> vector_common::Result<Option<OtlpDeserializer>> {
166 if self.use_otlp_decoding {
167 Ok(Some(OtlpDeserializer::new_with_signals(IndexSet::from([
168 signal_type,
169 ]))))
170 } else {
171 Ok(None)
172 }
173 }
174}
175
176#[async_trait::async_trait]
177#[typetag::serde(name = "opentelemetry")]
178impl SourceConfig for OpentelemetryConfig {
179 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
180 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
181 let events_received = register!(EventsReceived);
182 let log_namespace = cx.log_namespace(self.log_namespace);
183
184 let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
185
186 let logs_deserializer = self.get_signal_deserializer(OtlpSignalType::Logs)?;
187 let metrics_deserializer = self.get_signal_deserializer(OtlpSignalType::Metrics)?;
188 let traces_deserializer = self.get_signal_deserializer(OtlpSignalType::Traces)?;
189
190 let log_service = LogsServiceServer::new(Service {
191 pipeline: cx.out.clone(),
192 acknowledgements,
193 log_namespace,
194 events_received: events_received.clone(),
195 deserializer: logs_deserializer.clone(),
196 })
197 .accept_compressed(CompressionEncoding::Gzip)
198 .max_decoding_message_size(usize::MAX);
199
200 let metrics_service = MetricsServiceServer::new(Service {
201 pipeline: cx.out.clone(),
202 acknowledgements,
203 log_namespace,
204 events_received: events_received.clone(),
205 deserializer: metrics_deserializer.clone(),
206 })
207 .accept_compressed(CompressionEncoding::Gzip)
208 .max_decoding_message_size(usize::MAX);
209
210 let trace_service = TraceServiceServer::new(Service {
211 pipeline: cx.out.clone(),
212 acknowledgements,
213 log_namespace,
214 events_received: events_received.clone(),
215 deserializer: traces_deserializer.clone(),
216 })
217 .accept_compressed(CompressionEncoding::Gzip)
218 .max_decoding_message_size(usize::MAX);
219
220 let mut builder = RoutesBuilder::default();
221 builder
222 .add_service(log_service)
223 .add_service(metrics_service)
224 .add_service(trace_service);
225
226 let grpc_source = run_grpc_server_with_routes(
227 self.grpc.address,
228 grpc_tls_settings,
229 builder.routes(),
230 cx.shutdown.clone(),
231 )
232 .map_err(|error| {
233 error!(message = "Source future failed.", %error);
234 });
235
236 let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
237 let protocol = http_tls_settings.http_protocol_name();
238 let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
239 let headers =
240 build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
241
242 let filters = build_warp_filter(
243 acknowledgements,
244 log_namespace,
245 cx.out,
246 bytes_received,
247 events_received,
248 headers,
249 logs_deserializer,
250 metrics_deserializer,
251 traces_deserializer,
252 );
253
254 let http_source = run_http_server(
255 self.http.address,
256 http_tls_settings,
257 filters,
258 cx.shutdown,
259 self.http.keepalive.clone(),
260 );
261
262 Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
263 }
264
265 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
268 let log_namespace = global_log_namespace.merge(self.log_namespace);
269 let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
270 .with_source_metadata(
271 Self::NAME,
272 Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
273 &owned_value_path!(RESOURCE_KEY),
274 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
275 None,
276 )
277 .with_source_metadata(
278 Self::NAME,
279 Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
280 &owned_value_path!(ATTRIBUTES_KEY),
281 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
282 None,
283 )
284 .with_source_metadata(
285 Self::NAME,
286 Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
287 &owned_value_path!(TRACE_ID_KEY),
288 Kind::bytes().or_undefined(),
289 None,
290 )
291 .with_source_metadata(
292 Self::NAME,
293 Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
294 &owned_value_path!(SPAN_ID_KEY),
295 Kind::bytes().or_undefined(),
296 None,
297 )
298 .with_source_metadata(
299 Self::NAME,
300 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
301 &owned_value_path!(SEVERITY_TEXT_KEY),
302 Kind::bytes().or_undefined(),
303 Some("severity"),
304 )
305 .with_source_metadata(
306 Self::NAME,
307 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
308 &owned_value_path!(SEVERITY_NUMBER_KEY),
309 Kind::integer().or_undefined(),
310 None,
311 )
312 .with_source_metadata(
313 Self::NAME,
314 Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
315 &owned_value_path!(FLAGS_KEY),
316 Kind::integer().or_undefined(),
317 None,
318 )
319 .with_source_metadata(
320 Self::NAME,
321 Some(LegacyKey::Overwrite(owned_value_path!(
322 DROPPED_ATTRIBUTES_COUNT_KEY
323 ))),
324 &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
325 Kind::integer(),
326 None,
327 )
328 .with_source_metadata(
329 Self::NAME,
330 Some(LegacyKey::Overwrite(owned_value_path!(
331 OBSERVED_TIMESTAMP_KEY
332 ))),
333 &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
334 Kind::timestamp(),
335 None,
336 )
337 .with_source_metadata(
338 Self::NAME,
339 None,
340 &owned_value_path!("timestamp"),
341 Kind::timestamp(),
342 Some("timestamp"),
343 )
344 .with_standard_vector_source_metadata();
345
346 let schema_definition = match log_namespace {
347 LogNamespace::Vector => {
348 schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
349 }
350 LogNamespace::Legacy => {
351 schema_definition.with_meaning(log_schema().owned_message_path(), "message")
352 }
353 };
354
355 let metrics_output = if self.use_otlp_decoding {
356 SourceOutput::new_maybe_logs(DataType::Log, Definition::any()).with_port(METRICS)
357 } else {
358 SourceOutput::new_metrics().with_port(METRICS)
359 };
360 vec![
361 SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
362 metrics_output,
363 SourceOutput::new_traces().with_port(TRACES),
364 ]
365 }
366
367 fn resources(&self) -> Vec<Resource> {
368 vec![
369 Resource::tcp(self.grpc.address),
370 Resource::tcp(self.http.address),
371 ]
372 }
373
374 fn can_acknowledge(&self) -> bool {
375 true
376 }
377}