// vector/sources/opentelemetry/config.rs

1use std::net::SocketAddr;
2
3use futures::FutureExt;
4use futures_util::{future::join, TryFutureExt};
5
6use tonic::{codec::CompressionEncoding, transport::server::RoutesBuilder};
7
8use vector_lib::{
9    config::{log_schema, LegacyKey, LogNamespace},
10    configurable::configurable_component,
11    internal_event::{BytesReceived, EventsReceived, Protocol},
12    lookup::{owned_value_path, OwnedTargetPath},
13    opentelemetry::{
14        logs::{
15            ATTRIBUTES_KEY, DROPPED_ATTRIBUTES_COUNT_KEY, FLAGS_KEY, OBSERVED_TIMESTAMP_KEY,
16            RESOURCE_KEY, SEVERITY_NUMBER_KEY, SEVERITY_TEXT_KEY, SPAN_ID_KEY, TRACE_ID_KEY,
17        },
18        proto::collector::{
19            logs::v1::logs_service_server::LogsServiceServer,
20            metrics::v1::metrics_service_server::MetricsServiceServer,
21            trace::v1::trace_service_server::TraceServiceServer,
22        },
23    },
24    schema::Definition,
25    tls::{MaybeTlsSettings, TlsEnableableConfig},
26};
27
28use crate::{
29    config::{
30        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
31        SourceContext, SourceOutput,
32    },
33    http::KeepaliveConfig,
34    serde::bool_or_struct,
35    sources::{
36        http_server::{build_param_matcher, remove_duplicates},
37        opentelemetry::{
38            grpc::Service,
39            http::{build_warp_filter, run_http_server},
40        },
41        util::grpc::run_grpc_server_with_routes,
42        Source,
43    },
44};
45
46use vrl::value::{kind::Collection, Kind};
47
/// Name of the output port that carries log events.
pub const LOGS: &str = "logs";

/// Name of the output port that carries metric events.
pub const METRICS: &str = "metrics";

/// Name of the output port that carries trace events.
pub const TRACES: &str = "traces";
51
/// Configuration for the `opentelemetry` source.
// Both `grpc` and `http` are required: `build` starts a gRPC server and an HTTP server for
// every instance of this source and drives them concurrently.
#[configurable_component(source("opentelemetry", "Receive OTLP data through gRPC or HTTP."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct OpentelemetryConfig {
    // Listener settings for the gRPC server (OTLP logs, metrics and traces services).
    #[configurable(derived)]
    pub grpc: GrpcConfig,

    // Listener settings for the HTTP server.
    #[configurable(derived)]
    pub http: HttpConfig,

    // Defaults when absent. Deserialized through `bool_or_struct` — presumably accepting
    // either a bare boolean or the full struct form; confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    pub acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    // Passed to `cx.log_namespace` in `build` and merged with the global namespace via
    // `LogNamespace::merge` in `outputs`; presumably `None` (the default) means "use the
    // global setting" — confirm against `LogNamespace::merge`.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
72
/// Configuration for the `opentelemetry` gRPC server.
// The documentation example is produced by `example_grpc_config`.
#[configurable_component]
#[configurable(metadata(docs::examples = "example_grpc_config()"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GrpcConfig {
    /// The socket address to listen for connections on.
    ///
    /// It _must_ include a port.
    // Bound by `run_grpc_server_with_routes` in `build` and reported as a TCP resource.
    #[configurable(metadata(docs::examples = "0.0.0.0:4317", docs::examples = "localhost:4317"))]
    pub address: SocketAddr,

    // Optional TLS settings, converted with `MaybeTlsSettings::from_config` in `build`.
    // Omitted from serialized output when unset.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tls: Option<TlsEnableableConfig>,
}
89
90fn example_grpc_config() -> GrpcConfig {
91    GrpcConfig {
92        address: "0.0.0.0:4317".parse().unwrap(),
93        tls: None,
94    }
95}
96
/// Configuration for the `opentelemetry` HTTP server.
// The documentation example is produced by `example_http_config`.
#[configurable_component]
#[configurable(metadata(docs::examples = "example_http_config()"))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HttpConfig {
    /// The socket address to listen for connections on.
    ///
    /// It _must_ include a port.
    // Passed to `run_http_server` in `build` and reported as a TCP resource.
    #[configurable(metadata(docs::examples = "0.0.0.0:4318", docs::examples = "localhost:4318"))]
    pub address: SocketAddr,

    // Optional TLS settings, converted with `MaybeTlsSettings::from_config` in `build`.
    // They also determine the protocol name used for the `BytesReceived` metric.
    // Omitted from serialized output when unset.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub tls: Option<TlsEnableableConfig>,

    // Handed to `run_http_server`; falls back to `KeepaliveConfig::default()` when absent.
    #[configurable(derived)]
    #[serde(default)]
    pub keepalive: KeepaliveConfig,

    /// A list of HTTP headers to include in the log event.
    ///
    /// Accepts the wildcard (`*`) character for headers matching a specified pattern.
    ///
    /// Specifying "*" results in all headers included in the log event.
    ///
    /// These headers are not included in the JSON payload if a field with a conflicting name exists.
    // In `build` these patterns are de-duplicated with `remove_duplicates` and then compiled
    // by `build_param_matcher`; a pattern that fails to compile makes `build` return an error.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "User-Agent"))]
    #[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
    #[configurable(metadata(docs::examples = "X-*"))]
    #[configurable(metadata(docs::examples = "*"))]
    pub headers: Vec<String>,
}
131
132fn example_http_config() -> HttpConfig {
133    HttpConfig {
134        address: "0.0.0.0:4318".parse().unwrap(),
135        tls: None,
136        keepalive: KeepaliveConfig::default(),
137        headers: vec![],
138    }
139}
140
141impl GenerateConfig for OpentelemetryConfig {
142    fn generate_config() -> toml::Value {
143        toml::Value::try_from(Self {
144            grpc: example_grpc_config(),
145            http: example_http_config(),
146            acknowledgements: Default::default(),
147            log_namespace: None,
148        })
149        .unwrap()
150    }
151}
152
153#[async_trait::async_trait]
154#[typetag::serde(name = "opentelemetry")]
155impl SourceConfig for OpentelemetryConfig {
156    async fn build(&self, cx: SourceContext) -> crate::Result<Source> {
157        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
158        let events_received = register!(EventsReceived);
159        let log_namespace = cx.log_namespace(self.log_namespace);
160
161        let grpc_tls_settings = MaybeTlsSettings::from_config(self.grpc.tls.as_ref(), true)?;
162
163        let log_service = LogsServiceServer::new(Service {
164            pipeline: cx.out.clone(),
165            acknowledgements,
166            log_namespace,
167            events_received: events_received.clone(),
168        })
169        .accept_compressed(CompressionEncoding::Gzip)
170        .max_decoding_message_size(usize::MAX);
171
172        let trace_service = TraceServiceServer::new(Service {
173            pipeline: cx.out.clone(),
174            acknowledgements,
175            log_namespace,
176            events_received: events_received.clone(),
177        })
178        .accept_compressed(CompressionEncoding::Gzip)
179        .max_decoding_message_size(usize::MAX);
180
181        let metrics_service = MetricsServiceServer::new(Service {
182            pipeline: cx.out.clone(),
183            acknowledgements,
184            log_namespace,
185            events_received: events_received.clone(),
186        })
187        .accept_compressed(CompressionEncoding::Gzip)
188        .max_decoding_message_size(usize::MAX);
189
190        let mut builder = RoutesBuilder::default();
191        builder
192            .add_service(log_service)
193            .add_service(metrics_service)
194            .add_service(trace_service);
195        let grpc_source = run_grpc_server_with_routes(
196            self.grpc.address,
197            grpc_tls_settings,
198            builder.routes(),
199            cx.shutdown.clone(),
200        )
201        .map_err(|error| {
202            error!(message = "Source future failed.", %error);
203        });
204
205        let http_tls_settings = MaybeTlsSettings::from_config(self.http.tls.as_ref(), true)?;
206        let protocol = http_tls_settings.http_protocol_name();
207        let bytes_received = register!(BytesReceived::from(Protocol::from(protocol)));
208        let headers =
209            build_param_matcher(&remove_duplicates(self.http.headers.clone(), "headers"))?;
210        let filters = build_warp_filter(
211            acknowledgements,
212            log_namespace,
213            cx.out,
214            bytes_received,
215            events_received,
216            headers,
217        );
218        let http_source = run_http_server(
219            self.http.address,
220            http_tls_settings,
221            filters,
222            cx.shutdown,
223            self.http.keepalive.clone(),
224        );
225
226        Ok(join(grpc_source, http_source).map(|_| Ok(())).boxed())
227    }
228
229    // TODO: appropriately handle "severity" meaning across both "severity_text" and "severity_number",
230    // as both are optional and can be converted to/from.
231    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
232        let log_namespace = global_log_namespace.merge(self.log_namespace);
233        let schema_definition = Definition::new_with_default_metadata(Kind::any(), [log_namespace])
234            .with_source_metadata(
235                Self::NAME,
236                Some(LegacyKey::Overwrite(owned_value_path!(RESOURCE_KEY))),
237                &owned_value_path!(RESOURCE_KEY),
238                Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
239                None,
240            )
241            .with_source_metadata(
242                Self::NAME,
243                Some(LegacyKey::Overwrite(owned_value_path!(ATTRIBUTES_KEY))),
244                &owned_value_path!(ATTRIBUTES_KEY),
245                Kind::object(Collection::from_unknown(Kind::any())).or_undefined(),
246                None,
247            )
248            .with_source_metadata(
249                Self::NAME,
250                Some(LegacyKey::Overwrite(owned_value_path!(TRACE_ID_KEY))),
251                &owned_value_path!(TRACE_ID_KEY),
252                Kind::bytes().or_undefined(),
253                None,
254            )
255            .with_source_metadata(
256                Self::NAME,
257                Some(LegacyKey::Overwrite(owned_value_path!(SPAN_ID_KEY))),
258                &owned_value_path!(SPAN_ID_KEY),
259                Kind::bytes().or_undefined(),
260                None,
261            )
262            .with_source_metadata(
263                Self::NAME,
264                Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_TEXT_KEY))),
265                &owned_value_path!(SEVERITY_TEXT_KEY),
266                Kind::bytes().or_undefined(),
267                Some("severity"),
268            )
269            .with_source_metadata(
270                Self::NAME,
271                Some(LegacyKey::Overwrite(owned_value_path!(SEVERITY_NUMBER_KEY))),
272                &owned_value_path!(SEVERITY_NUMBER_KEY),
273                Kind::integer().or_undefined(),
274                None,
275            )
276            .with_source_metadata(
277                Self::NAME,
278                Some(LegacyKey::Overwrite(owned_value_path!(FLAGS_KEY))),
279                &owned_value_path!(FLAGS_KEY),
280                Kind::integer().or_undefined(),
281                None,
282            )
283            .with_source_metadata(
284                Self::NAME,
285                Some(LegacyKey::Overwrite(owned_value_path!(
286                    DROPPED_ATTRIBUTES_COUNT_KEY
287                ))),
288                &owned_value_path!(DROPPED_ATTRIBUTES_COUNT_KEY),
289                Kind::integer(),
290                None,
291            )
292            .with_source_metadata(
293                Self::NAME,
294                Some(LegacyKey::Overwrite(owned_value_path!(
295                    OBSERVED_TIMESTAMP_KEY
296                ))),
297                &owned_value_path!(OBSERVED_TIMESTAMP_KEY),
298                Kind::timestamp(),
299                None,
300            )
301            .with_source_metadata(
302                Self::NAME,
303                None,
304                &owned_value_path!("timestamp"),
305                Kind::timestamp(),
306                Some("timestamp"),
307            )
308            .with_standard_vector_source_metadata();
309
310        let schema_definition = match log_namespace {
311            LogNamespace::Vector => {
312                schema_definition.with_meaning(OwnedTargetPath::event_root(), "message")
313            }
314            LogNamespace::Legacy => {
315                schema_definition.with_meaning(log_schema().owned_message_path(), "message")
316            }
317        };
318
319        vec![
320            SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
321            SourceOutput::new_metrics().with_port(METRICS),
322            SourceOutput::new_traces().with_port(TRACES),
323        ]
324    }
325
326    fn resources(&self) -> Vec<Resource> {
327        vec![
328            Resource::tcp(self.grpc.address),
329            Resource::tcp(self.http.address),
330        ]
331    }
332
333    fn can_acknowledge(&self) -> bool {
334        true
335    }
336}