vector/sources/opentelemetry/
mod.rs

1#[cfg(all(test, feature = "opentelemetry-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6pub mod config;
7mod grpc;
8mod http;
9mod reply;
10mod status;
11
12use vector_lib::{
13    event::Event,
14    opentelemetry::proto::{
15        RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
16    },
17};
18use vrl::value::Value;
19
20fn count_items_inner(resource: &Value, array_id: &str, inner_id: &str) -> usize {
21    let Some(resource_array) = resource.as_array() else {
22        return 0;
23    };
24
25    resource_array
26        .iter()
27        .map(|r| {
28            r.get(array_id)
29                .and_then(|s| s.as_array())
30                .map(|scope_array| {
31                    scope_array
32                        .iter()
33                        .map(|sl| {
34                            sl.get(inner_id)
35                                .and_then(|lr| lr.as_array())
36                                .map(|arr| arr.len())
37                                .unwrap_or(0)
38                        })
39                        .sum::<usize>()
40                })
41                .unwrap_or(0)
42        })
43        .sum()
44}
45
46/// Counts individual log records, metrics, or spans within OTLP batch events.
47/// When use_otlp_decoding is enabled, events contain entire OTLP batches, but
48/// we want to count the individual items for metric consistency with other sources.
49/// This iterates through the Value structure, which is less efficient than
50/// counting from the typed protobuf request, but avoids decoding twice.
51pub(crate) fn count_otlp_items(events: &[Event]) -> usize {
52    events
53        .iter()
54        .map(|event| match event {
55            Event::Log(log) => {
56                if let Some(resource_logs) = log.get(RESOURCE_LOGS_JSON_FIELD) {
57                    count_items_inner(resource_logs, "scopeLogs", "logRecords")
58                } else if let Some(resource_metrics) = log.get(RESOURCE_METRICS_JSON_FIELD) {
59                    count_items_inner(resource_metrics, "scopeMetrics", "metrics")
60                } else {
61                    0
62                }
63            }
64            Event::Trace(trace) => {
65                // Count spans in resourceSpans
66                if let Some(resource_spans) = trace.get(RESOURCE_SPANS_JSON_FIELD) {
67                    count_items_inner(resource_spans, "scopeSpans", "spans")
68                } else {
69                    0
70                }
71            }
72            _ => 0, // unreachable
73        })
74        .sum()
75}