vector/sources/opentelemetry/
mod.rs1#[cfg(all(test, feature = "opentelemetry-integration-tests"))]
2mod integration_tests;
3#[cfg(test)]
4mod tests;
5
6pub mod config;
7mod grpc;
8mod http;
9mod reply;
10mod status;
11
12use vector_lib::{
13 event::Event,
14 opentelemetry::proto::{
15 RESOURCE_LOGS_JSON_FIELD, RESOURCE_METRICS_JSON_FIELD, RESOURCE_SPANS_JSON_FIELD,
16 },
17};
18use vrl::value::Value;
19
20fn count_items_inner(resource: &Value, array_id: &str, inner_id: &str) -> usize {
21 let Some(resource_array) = resource.as_array() else {
22 return 0;
23 };
24
25 resource_array
26 .iter()
27 .map(|r| {
28 r.get(array_id)
29 .and_then(|s| s.as_array())
30 .map(|scope_array| {
31 scope_array
32 .iter()
33 .map(|sl| {
34 sl.get(inner_id)
35 .and_then(|lr| lr.as_array())
36 .map(|arr| arr.len())
37 .unwrap_or(0)
38 })
39 .sum::<usize>()
40 })
41 .unwrap_or(0)
42 })
43 .sum()
44}
45
46pub(crate) fn count_otlp_items(events: &[Event]) -> usize {
52 events
53 .iter()
54 .map(|event| match event {
55 Event::Log(log) => {
56 if let Some(resource_logs) = log.get(RESOURCE_LOGS_JSON_FIELD) {
57 count_items_inner(resource_logs, "scopeLogs", "logRecords")
58 } else if let Some(resource_metrics) = log.get(RESOURCE_METRICS_JSON_FIELD) {
59 count_items_inner(resource_metrics, "scopeMetrics", "metrics")
60 } else {
61 0
62 }
63 }
64 Event::Trace(trace) => {
65 if let Some(resource_spans) = trace.get(RESOURCE_SPANS_JSON_FIELD) {
67 count_items_inner(resource_spans, "scopeSpans", "spans")
68 } else {
69 0
70 }
71 }
72 _ => 0, })
74 .sum()
75}