1use std::net::SocketAddr;
2
3use futures::FutureExt;
4use futures_util::{future::join, TryFutureExt};
5
6use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder};
7
8use vector_lib::{
9 config::{log_schema, LegacyKey, LogNamespace},
10 configurable::configurable_component,
11 internal_event::{BytesReceived, EventsReceived, Protocol},
12 lookup::{owned_value_path, OwnedTargetPath},
13 opentelemetry::{
14 logs::{
15 ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
16 RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
17 },
18 proto::collector::{
19 logs::v1::logs_service_server::LogsServiceServer,
20 metrics::v1::metrics_service_server::MetricsServiceServer,
21 trace::v1::trace_service_server::TraceServiceServer,
22 },
23 },
24 schema::Definition,
25 tls::{MaybeTlsSettings, TlsEnableableConfig},
26};
27
28use crate::{
29 config::{
30 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
31 SourceContext, SourceOutput,
32 },
33 http::KeepaliveConfig,
34 serde::bool_or_struct,
35 sources::{
36 http_server::{build_param_matcher, remove_duplicates},
37 opentelemetry::{
38 grpc::Service,
39 http::{build_warp_filter, run_http_server},
40 },
41 util::grpc::run_grpc_server_with_routes,
42 Source,
43 },
44};
45
46use vrl::value::{kind::Collection, Kind};
47
/// Name of the output port used for the log output of this source.
pub const LOGS: &str = "logs";
/// Name of the output port used for the metrics output of this source.
pub const METRICS: &str = "metrics";
/// Name of the output port used for the traces output of this source.
pub const TRACES: &str = "traces";
51
52#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
54#[derive(Clone, Debug)]
55#[serde(deny_unknown_fields)]
56pub struct OpentelemetryConfig {
57 #[configurable(derived)]
58 pub grpc: GrpcConfig,
59
60 #[configurable(derived)]
61 pub http: HttpConfig,
62
63 #[configurable(derived)]
64 #[serde(default, deserialize_with = "bool_or_struct")]
65 pub acknowledgements: SourceAcknowledgementsConfig,
66
67 #[configurable(metadata(docs::hidden))]
69 #[serde(default)]
70 pub log_namespace: Option<bool>,
71}
72
73#[configurable_component]
75#[configurable(metadata(docs::examples = "example_grpc_config()"))]
76#[derive(Clone, Debug)]
77#[serde(deny_unknown_fields)]
78pub struct GrpcConfig {
79 #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
83 pub address: SocketAddr,
84
85 #[configurable(derived)]
86 #[serde(default, skip_serializing_if = "Option::is_none")]
87 pub tls: Option<TlsEnableableConfig>,
88}
89
90fn example_grpc_config() -> GrpcConfig {
91 GrpcConfig {
92 address: "0.0.0.0:4317".parse().unwrap(),
93 tls: None,
94 }
95}
96
97#[configurable_component]
99#[configurable(metadata(docs::examples = "example_http_config()"))]
100#[derive(Clone, Debug)]
101#[serde(deny_unknown_fields)]
102pub struct HttpConfig {
103 #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
107 pub address: SocketAddr,
108
109 #[configurable(derived)]
110 #[serde(default, skip_serializing_if = "Option::is_none")]
111 pub tls: Option<TlsEnableableConfig>,
112
113 #[configurable(derived)]
114 #[serde(default)]
115 pub keepalive: KeepaliveConfig,
116
117 #[serde(default)]
125 #[configurable(metadata(docs::examples = "User-Agent"))]
126 #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
127 #[configurable(metadata(docs::examples = "X-*"))]
128 #[configurable(metadata(docs::examples = "*"))]
129 pub headers: Vec<String>,
130}
131
132fn example_http_config() -> HttpConfig {
133 HttpConfig {
134 address: "0.0.0.0:4318".parse().unwrap(),
135 tls: None,
136 keepalive: KeepaliveConfig::default(),
137 headers: vec![],
138 }
139}
140
141impl GenerateConfig for OpentelemetryConfig {
142 fn generate_config() -> toml::Value {
143 toml::Value::try_from(Self {
144 grpc: example_grpc_config(),
145 http: example_http_config(),
146 acknowledgements: Default::default(),
147 log_namespace: None,
148 })
149 .unwrap()
150 }
151}
152
153#[async_trait::async_trait]
154#[typetag::serde(name = "opentelemetry")]
155impl SourceConfig for OpentelemetryConfig {
156 async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
157 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
158 let events_received = register!(EventsReceived);
159 let log_namespace = cx.log_namespace(self.log_namespace);
160
161 let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
162
163 let log_service = LogsServiceServer::new(Service {
164 pipeline: cx.out.clone(),
165 acknowledgements,
166 log_namespace,
167 events_received: events_received.clone(),
168 })
169 .accept_compressed(CompressionEncoding::Gzip)
170 .max_decoding_message_size(usize::MAX);
171
172 let trace_service = TraceServiceServer::new(Service {
173 pipeline: cx.out.clone(),
174 acknowledgements,
175 log_namespace,
176 events_received: events_received.clone(),
177 })
178 .accept_compressed(CompressionEncoding::Gzip)
179 .max_decoding_message_size(usize::MAX);
180
181 let metrics_service = MetricsServiceServer::new(Service {
182 pipeline: cx.out.clone(),
183 acknowledgements,
184 log_namespace,
185 events_received: events_received.clone(),
186 })
187 .accept_compressed(CompressionEncoding::Gzip)
188 .max_decoding_message_size(usize::MAX);
189
190 let mut builder = RoutesBuilder::default();
191 builder
192 .add_service(log_service)
193 .add_service(metrics_service)
194 .add_service(trace_service);
195 let grpc_source = run_grpc_server_with_routes(
196 self.grpc.address,
197 grpc_tls_settings,
198 builder.routes(),
199 cx.shutdown.clone(),
200 )
201 .map_err(|error| {
202 error!(message = "Source future failed.", %error);
203 });
204
205 let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
206 let protocol = http_tls_settings.http_protocol_name();
207 let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
208 let headers =
209 build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
210 let filters = build_warp_filter(
211 acknowledgements,
212 log_namespace,
213 cx.out,
214 bytes_received,
215 events_received,
216 headers,
217 );
218 let http_source = run_http_server(
219 self.http.address,
220 http_tls_settings,
221 filters,
222 cx.shutdown,
223 self.http.keepalive.clone(),
224 );
225
226 Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
227 }
228
229 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
232 let log_namespace = global_log_namespace.merge(self.log_namespace);
233 let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
234 .with_source_metadata(
235 Self::NAME,
236 Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
237 &owned_value_path!(RESOURCE_KEY),
238 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
239 None,
240 )
241 .with_source_metadata(
242 Self::NAME,
243 Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
244 &owned_value_path!(ATTRIBUTES_KEY),
245 Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
246 None,
247 )
248 .with_source_metadata(
249 Self::NAME,
250 Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
251 &owned_value_path!(TRACE_ID_KEY),
252 Kind::bytes().or_undefined(),
253 None,
254 )
255 .with_source_metadata(
256 Self::NAME,
257 Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
258 &owned_value_path!(SPAN_ID_KEY),
259 Kind::bytes().or_undefined(),
260 None,
261 )
262 .with_source_metadata(
263 Self::NAME,
264 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
265 &owned_value_path!(SEVERITY_TEXT_KEY),
266 Kind::bytes().or_undefined(),
267 Some("severity"),
268 )
269 .with_source_metadata(
270 Self::NAME,
271 Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
272 &owned_value_path!(SEVERITY_NUMBER_KEY),
273 Kind::integer().or_undefined(),
274 None,
275 )
276 .with_source_metadata(
277 Self::NAME,
278 Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
279 &owned_value_path!(FLAGS_KEY),
280 Kind::integer().or_undefined(),
281 None,
282 )
283 .with_source_metadata(
284 Self::NAME,
285 Some(LegacyKey::Overwrite(owned_value_path!(
286 DROPPED_ATTRIBUTES_COUNT_KEY
287 ))),
288 &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
289 Kind::integer(),
290 None,
291 )
292 .with_source_metadata(
293 Self::NAME,
294 Some(LegacyKey::Overwrite(owned_value_path!(
295 OBSERVED_TIMESTAMP_KEY
296 ))),
297 &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
298 Kind::timestamp(),
299 None,
300 )
301 .with_source_metadata(
302 Self::NAME,
303 None,
304 &owned_value_path!("timestamp"),
305 Kind::timestamp(),
306 Some("timestamp"),
307 )
308 .with_standard_vector_source_metadata();
309
310 let schema_definition = match log_namespace {
311 LogNamespace::Vector => {
312 schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
313 }
314 LogNamespace::Legacy => {
315 schema_definition.with_meaning(log_schema().owned_message_path(), "message")
316 }
317 };
318
319 vec![
320 SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
321 SourceOutput::new_metrics().with_port(METRICS),
322 SourceOutput::new_traces().with_port(TRACES),
323 ]
324 }
325
326 fn resources(&self) -> Vec<Resource> {
327 vec![
328 Resource::tcp(self.grpc.address),
329 Resource::tcp(self.http.address),
330 ]
331 }
332
333 fn can_acknowledge(&self) -> bool {
334 true
335 }
336}