opentelemetry_proto/
logs.rs

1use super::common::{kv_list_into_value, to_hex};
2use crate::proto::{
3    common::v1::{any_value::Value as PBValue, InstrumentationScope},
4    logs::v1::{LogRecord, ResourceLogs, SeverityNumber},
5    resource::v1::Resource,
6};
7use bytes::Bytes;
8use chrono::{DateTime, TimeZone, Utc};
9use vector_core::{
10    config::{log_schema, LegacyKey, LogNamespace},
11    event::{Event, LogEvent},
12};
13use vrl::core::Value;
14use vrl::path;
15
16const SOURCE_NAME: &str = "opentelemetry";
17pub const RESOURCE_KEY: &str = "resources";
18pub const ATTRIBUTES_KEY: &str = "attributes";
19pub const SCOPE_KEY: &str = "scope";
20pub const NAME_KEY: &str = "name";
21pub const VERSION_KEY: &str = "version";
22pub const TRACE_ID_KEY: &str = "trace_id";
23pub const SPAN_ID_KEY: &str = "span_id";
24pub const SEVERITY_TEXT_KEY: &str = "severity_text";
25pub const SEVERITY_NUMBER_KEY: &str = "severity_number";
26pub const OBSERVED_TIMESTAMP_KEY: &str = "observed_timestamp";
27pub const DROPPED_ATTRIBUTES_COUNT_KEY: &str = "dropped_attributes_count";
28pub const FLAGS_KEY: &str = "flags";
29
30impl ResourceLogs {
31    pub fn into_event_iter(self, log_namespace: LogNamespace) -> impl Iterator<Item = Event> {
32        let now = Utc::now();
33
34        self.scope_logs.into_iter().flat_map(move |scope_log| {
35            let scope = scope_log.scope;
36            let resource = self.resource.clone();
37            scope_log.log_records.into_iter().map(move |log_record| {
38                ResourceLog {
39                    resource: resource.clone(),
40                    scope: scope.clone(),
41                    log_record,
42                }
43                .into_event(log_namespace, now)
44            })
45        })
46    }
47}
48
49struct ResourceLog {
50    resource: Option<Resource>,
51    scope: Option<InstrumentationScope>,
52    log_record: LogRecord,
53}
54
55// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.15.0/specification/logs/data-model.md
56impl ResourceLog {
57    fn into_event(self, log_namespace: LogNamespace, now: DateTime<Utc>) -> Event {
58        let mut log = match log_namespace {
59            LogNamespace::Vector => {
60                if let Some(v) = self.log_record.body.and_then(|av| av.value) {
61                    LogEvent::from(<PBValue as Into<Value>>::into(v))
62                } else {
63                    LogEvent::from(Value::Null)
64                }
65            }
66            LogNamespace::Legacy => {
67                let mut log = LogEvent::default();
68                if let Some(v) = self.log_record.body.and_then(|av| av.value) {
69                    log.maybe_insert(log_schema().message_key_target_path(), v);
70                }
71                log
72            }
73        };
74
75        // Insert instrumentation scope (scope name, version, and attributes)
76        if let Some(scope) = self.scope {
77            if !scope.name.is_empty() {
78                log_namespace.insert_source_metadata(
79                    SOURCE_NAME,
80                    &mut log,
81                    Some(LegacyKey::Overwrite(path!(SCOPE_KEY, NAME_KEY))),
82                    path!(SCOPE_KEY, NAME_KEY),
83                    scope.name,
84                );
85            }
86            if !scope.version.is_empty() {
87                log_namespace.insert_source_metadata(
88                    SOURCE_NAME,
89                    &mut log,
90                    Some(LegacyKey::Overwrite(path!(SCOPE_KEY, VERSION_KEY))),
91                    path!(SCOPE_KEY, VERSION_KEY),
92                    scope.version,
93                );
94            }
95            if !scope.attributes.is_empty() {
96                log_namespace.insert_source_metadata(
97                    SOURCE_NAME,
98                    &mut log,
99                    Some(LegacyKey::Overwrite(path!(SCOPE_KEY, ATTRIBUTES_KEY))),
100                    path!(SCOPE_KEY, ATTRIBUTES_KEY),
101                    kv_list_into_value(scope.attributes),
102                );
103            }
104            if scope.dropped_attributes_count > 0 {
105                log_namespace.insert_source_metadata(
106                    SOURCE_NAME,
107                    &mut log,
108                    Some(LegacyKey::Overwrite(path!(
109                        SCOPE_KEY,
110                        DROPPED_ATTRIBUTES_COUNT_KEY
111                    ))),
112                    path!(SCOPE_KEY, DROPPED_ATTRIBUTES_COUNT_KEY),
113                    scope.dropped_attributes_count,
114                );
115            }
116        }
117
118        // Optional fields
119        if let Some(resource) = self.resource {
120            if !resource.attributes.is_empty() {
121                log_namespace.insert_source_metadata(
122                    SOURCE_NAME,
123                    &mut log,
124                    Some(LegacyKey::Overwrite(path!(RESOURCE_KEY))),
125                    path!(RESOURCE_KEY),
126                    kv_list_into_value(resource.attributes),
127                );
128            }
129        }
130        if !self.log_record.attributes.is_empty() {
131            log_namespace.insert_source_metadata(
132                SOURCE_NAME,
133                &mut log,
134                Some(LegacyKey::Overwrite(path!(ATTRIBUTES_KEY))),
135                path!(ATTRIBUTES_KEY),
136                kv_list_into_value(self.log_record.attributes),
137            );
138        }
139        if !self.log_record.trace_id.is_empty() {
140            log_namespace.insert_source_metadata(
141                SOURCE_NAME,
142                &mut log,
143                Some(LegacyKey::Overwrite(path!(TRACE_ID_KEY))),
144                path!(TRACE_ID_KEY),
145                Bytes::from(to_hex(&self.log_record.trace_id)),
146            );
147        }
148        if !self.log_record.span_id.is_empty() {
149            log_namespace.insert_source_metadata(
150                SOURCE_NAME,
151                &mut log,
152                Some(LegacyKey::Overwrite(path!(SPAN_ID_KEY))),
153                path!(SPAN_ID_KEY),
154                Bytes::from(to_hex(&self.log_record.span_id)),
155            );
156        }
157        if !self.log_record.severity_text.is_empty() {
158            log_namespace.insert_source_metadata(
159                SOURCE_NAME,
160                &mut log,
161                Some(LegacyKey::Overwrite(path!(SEVERITY_TEXT_KEY))),
162                path!(SEVERITY_TEXT_KEY),
163                self.log_record.severity_text,
164            );
165        }
166        if self.log_record.severity_number != SeverityNumber::Unspecified as i32 {
167            log_namespace.insert_source_metadata(
168                SOURCE_NAME,
169                &mut log,
170                Some(LegacyKey::Overwrite(path!(SEVERITY_NUMBER_KEY))),
171                path!(SEVERITY_NUMBER_KEY),
172                self.log_record.severity_number,
173            );
174        }
175        if self.log_record.flags > 0 {
176            log_namespace.insert_source_metadata(
177                SOURCE_NAME,
178                &mut log,
179                Some(LegacyKey::Overwrite(path!(FLAGS_KEY))),
180                path!(FLAGS_KEY),
181                self.log_record.flags,
182            );
183        }
184
185        log_namespace.insert_source_metadata(
186            SOURCE_NAME,
187            &mut log,
188            Some(LegacyKey::Overwrite(path!(DROPPED_ATTRIBUTES_COUNT_KEY))),
189            path!(DROPPED_ATTRIBUTES_COUNT_KEY),
190            self.log_record.dropped_attributes_count,
191        );
192
193        // According to log data model spec, if observed_time_unix_nano is missing, the collector
194        // should set it to the current time.
195        let observed_timestamp = if self.log_record.observed_time_unix_nano > 0 {
196            Utc.timestamp_nanos(self.log_record.observed_time_unix_nano as i64)
197                .into()
198        } else {
199            Value::Timestamp(now)
200        };
201        log_namespace.insert_source_metadata(
202            SOURCE_NAME,
203            &mut log,
204            Some(LegacyKey::Overwrite(path!(OBSERVED_TIMESTAMP_KEY))),
205            path!(OBSERVED_TIMESTAMP_KEY),
206            observed_timestamp.clone(),
207        );
208
209        // If time_unix_nano is not present (0 represents missing or unknown timestamp) use observed time
210        let timestamp = if self.log_record.time_unix_nano > 0 {
211            Utc.timestamp_nanos(self.log_record.time_unix_nano as i64)
212                .into()
213        } else {
214            observed_timestamp
215        };
216        log_namespace.insert_source_metadata(
217            SOURCE_NAME,
218            &mut log,
219            log_schema().timestamp_key().map(LegacyKey::Overwrite),
220            path!("timestamp"),
221            timestamp,
222        );
223
224        log_namespace.insert_vector_metadata(
225            &mut log,
226            log_schema().source_type_key(),
227            path!("source_type"),
228            Bytes::from_static(SOURCE_NAME.as_bytes()),
229        );
230        if log_namespace == LogNamespace::Vector {
231            log.metadata_mut()
232                .value_mut()
233                .insert(path!("vector", "ingest_timestamp"), now);
234        }
235
236        log.into()
237    }
238}