vector_core/event/metadata.rs

// Turn a missing doc comment on any public item in this module into a compile error.
#![deny(missing_docs)]
2
3use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc, time::Instant};
4
5use derivative::Derivative;
6use lookup::OwnedTargetPath;
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9use vector_common::{EventDataEq, byte_size_of::ByteSizeOf, config::ComponentKey};
10use vrl::{
11    compiler::SecretTarget,
12    value::{KeyString, Kind, Value},
13};
14
15use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, ObjectMap};
16use crate::{
17    config::{LogNamespace, OutputId},
18    schema,
19};
20
/// Name of the entry in [`Secrets`] that holds the Datadog API key.
const DATADOG_API_KEY: &str = "datadog_api_key";
/// Name of the entry in [`Secrets`] that holds the Splunk HEC token.
const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
23
/// The event metadata structure is an `Arc` wrapper around the actual metadata to avoid cloning
/// the underlying data until it becomes necessary to provide a `mut` copy (mutation goes
/// through `Arc::make_mut`).
#[derive(Clone, Debug, Derivative, Deserialize, Serialize)]
#[derivative(PartialEq)]
pub struct EventMetadata {
    /// The shared metadata contents. Flattened into the enclosing map when serialized.
    #[serde(flatten)]
    pub(super) inner: Arc<Inner>,

    /// The timestamp when the event last entered a transform buffer.
    ///
    /// Kept outside the shared `Inner` so updating it never forces a clone. It is not
    /// serialized (it is `None` after deserialization) and is ignored by `PartialEq`.
    #[derivative(PartialEq = "ignore")]
    #[serde(default, skip)]
    pub(crate) last_transform_timestamp: Option<Instant>,
}
37
/// The actual metadata structure contained by both `struct Metric`
/// and `struct LogEvent` types.
///
/// `EventMetadata` stores this behind an `Arc`, so several events may share one `Inner`
/// until one of them needs to mutate it.
#[derive(Clone, Debug, Derivative, Deserialize, Serialize)]
#[derivative(PartialEq)]
pub(super) struct Inner {
    /// Arbitrary data stored with an event. When absent from serialized input it defaults
    /// to an empty object (see `default_metadata_value`).
    #[serde(default = "default_metadata_value")]
    pub(crate) value: Value,

    /// Storage for secrets, such as the Datadog API key and the Splunk HEC token.
    #[serde(default)]
    pub(crate) secrets: Secrets,

    /// Finalizers attached to this event; their status is updated through
    /// `EventMetadata::update_status`. Never serialized, so empty after deserialization.
    #[serde(default, skip)]
    pub(crate) finalizers: EventFinalizers,

    /// The id of the source
    pub(crate) source_id: Option<Arc<ComponentKey>>,

    /// The type of the source
    pub(crate) source_type: Option<Cow<'static, str>>,

    /// The id of the component this event originated from. This is used to
    /// determine which schema definition to attach to an event in transforms.
    /// This should always have a value set for events in transforms. It will always be `None`
    /// in a source, and there is currently no use-case for reading the value in a sink.
    pub(crate) upstream_id: Option<Arc<OutputId>>,

    /// An identifier for a globally registered schema definition which provides information about
    /// the event shape (type information, and semantic meaning of fields).
    /// This definition is only currently valid for logs, and shouldn't be used for other event types.
    /// It is not serialized; deserialized metadata gets `default_schema_definition()`.
    ///
    /// TODO(Jean): must not skip serialization to track schemas across restarts.
    #[serde(default = "default_schema_definition", skip)]
    pub(crate) schema_definition: Arc<schema::Definition>,

    /// A store of values that may be dropped during the encoding process but may be needed
    /// later on. The map is indexed by meaning.
    /// Currently this is just used for the `service`. If the service field is dropped by `only_fields`
    /// we need to ensure it is still available later on for emitting metrics tagged by the service.
    /// This field could almost be keyed by `&'static str`, but because it needs to be deserializable
    /// it uses owned keys (`KeyString`, the key type of `ObjectMap`).
    pub(crate) dropped_fields: ObjectMap,

    /// Metadata to track the origin of metrics. This is always `None` for log and trace events.
    /// Only a small set of Vector sources and transforms explicitly set this field.
    #[serde(default)]
    pub(crate) datadog_origin_metadata: Option<DatadogMetricOriginMetadata>,

    /// An internal vector id that can be used to identify this event across all components.
    /// `Inner::default` assigns a freshly generated v4 UUID. Excluded from `PartialEq`.
    #[derivative(PartialEq = "ignore")]
    pub(crate) source_event_id: Option<Uuid>,
}
91
/// Metric Origin metadata for submission to Datadog.
///
/// Every field is an optional `u32`. Construct it with [`DatadogMetricOriginMetadata::new`]
/// and read it back through the same-named getter methods.
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize)]
pub struct DatadogMetricOriginMetadata {
    /// `OriginProduct`, if set.
    product: Option<u32>,
    /// `OriginCategory`, if set.
    category: Option<u32>,
    /// `OriginService`, if set.
    service: Option<u32>,
}
102
103impl DatadogMetricOriginMetadata {
104    /// Creates a new `DatadogMetricOriginMetadata`.
105    /// When Vector sends out metrics containing the Origin metadata, it should do so with
106    /// all of the fields defined.
107    /// The edge case where the Origin metadata is created within a component and does not
108    /// initially contain all of the metadata fields, is in the `log_to_metric` transform.
109    #[must_use]
110    pub fn new(product: Option<u32>, category: Option<u32>, service: Option<u32>) -> Self {
111        Self {
112            product,
113            category,
114            service,
115        }
116    }
117
118    /// Returns a reference to the `OriginProduct`.
119    pub fn product(&self) -> Option<u32> {
120        self.product
121    }
122
123    /// Returns a reference to the `OriginCategory`.
124    pub fn category(&self) -> Option<u32> {
125        self.category
126    }
127
128    /// Returns a reference to the `OriginService`.
129    pub fn service(&self) -> Option<u32> {
130        self.service
131    }
132}
133
134fn default_metadata_value() -> Value {
135    Value::Object(ObjectMap::new())
136}
137
138impl EventMetadata {
139    /// Creates `EventMetadata` with the given `Value`, and the rest of the fields with default values
140    pub fn default_with_value(value: Value) -> Self {
141        Self {
142            inner: Arc::new(Inner {
143                value,
144                ..Default::default()
145            }),
146            last_transform_timestamp: None,
147        }
148    }
149
150    fn get_mut(&mut self) -> &mut Inner {
151        Arc::make_mut(&mut self.inner)
152    }
153
154    pub(super) fn into_owned(self) -> Inner {
155        Arc::unwrap_or_clone(self.inner)
156    }
157
158    /// Returns a reference to the metadata value
159    pub fn value(&self) -> &Value {
160        &self.inner.value
161    }
162
163    /// Returns a mutable reference to the metadata value
164    pub fn value_mut(&mut self) -> &mut Value {
165        &mut self.get_mut().value
166    }
167
168    /// Returns a reference to the secrets
169    pub fn secrets(&self) -> &Secrets {
170        &self.inner.secrets
171    }
172
173    /// Returns a mutable reference to the secrets
174    pub fn secrets_mut(&mut self) -> &mut Secrets {
175        &mut self.get_mut().secrets
176    }
177
178    /// Returns a reference to the metadata source id.
179    #[must_use]
180    pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
181        self.inner.source_id.as_ref()
182    }
183
184    /// Returns a reference to the metadata source type.
185    #[must_use]
186    pub fn source_type(&self) -> Option<&str> {
187        self.inner.source_type.as_deref()
188    }
189
190    /// Returns a reference to the metadata parent id. This is the `OutputId`
191    /// of the previous component the event was sent through (if any).
192    #[must_use]
193    pub fn upstream_id(&self) -> Option<&OutputId> {
194        self.inner.upstream_id.as_deref()
195    }
196
197    /// Sets the `source_id` in the metadata to the provided value.
198    pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
199        self.get_mut().source_id = Some(source_id);
200    }
201
202    /// Sets the `source_type` in the metadata to the provided value.
203    pub fn set_source_type<S: Into<Cow<'static, str>>>(&mut self, source_type: S) {
204        self.get_mut().source_type = Some(source_type.into());
205    }
206
207    /// Sets the `upstream_id` in the metadata to the provided value.
208    pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
209        self.get_mut().upstream_id = Some(upstream_id);
210    }
211
212    /// Return the datadog API key, if it exists
213    pub fn datadog_api_key(&self) -> Option<Arc<str>> {
214        self.inner.secrets.get(DATADOG_API_KEY).cloned()
215    }
216
217    /// Set the datadog API key to passed value
218    pub fn set_datadog_api_key(&mut self, secret: Arc<str>) {
219        self.get_mut().secrets.insert(DATADOG_API_KEY, secret);
220    }
221
222    /// Return the splunk hec token, if it exists
223    pub fn splunk_hec_token(&self) -> Option<Arc<str>> {
224        self.inner.secrets.get(SPLUNK_HEC_TOKEN).cloned()
225    }
226
227    /// Set the splunk hec token to passed value
228    pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
229        self.get_mut().secrets.insert(SPLUNK_HEC_TOKEN, secret);
230    }
231
232    /// Adds the value to the dropped fields list.
233    /// There is currently no way to remove a field from this list, so if a field is dropped
234    /// and then the field is re-added with a new value - the dropped value will still be
235    /// retrieved.
236    pub fn add_dropped_field(&mut self, meaning: KeyString, value: Value) {
237        self.get_mut().dropped_fields.insert(meaning, value);
238    }
239
240    /// Fetches the dropped field by meaning.
241    pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
242        self.inner.dropped_fields.get(meaning.as_ref())
243    }
244
245    /// Returns a reference to the `DatadogMetricOriginMetadata`.
246    pub fn datadog_origin_metadata(&self) -> Option<&DatadogMetricOriginMetadata> {
247        self.inner.datadog_origin_metadata.as_ref()
248    }
249
250    /// Returns a reference to the event id.
251    pub fn source_event_id(&self) -> Option<Uuid> {
252        self.inner.source_event_id
253    }
254
255    /// Returns the timestamp of the last transform buffer enqueue operation, if it exists.
256    #[must_use]
257    pub fn last_transform_timestamp(&self) -> Option<Instant> {
258        self.last_transform_timestamp
259    }
260
261    /// Sets the transform enqueue timestamp to the provided value.
262    pub fn set_last_transform_timestamp(&mut self, timestamp: Instant) {
263        self.last_transform_timestamp = Some(timestamp);
264    }
265}
266
267impl Default for Inner {
268    fn default() -> Self {
269        Self {
270            value: Value::Object(ObjectMap::new()),
271            secrets: Secrets::new(),
272            finalizers: Default::default(),
273            schema_definition: default_schema_definition(),
274            source_id: None,
275            source_type: None,
276            upstream_id: None,
277            dropped_fields: ObjectMap::new(),
278            datadog_origin_metadata: None,
279            source_event_id: Some(Uuid::new_v4()),
280        }
281    }
282}
283
284impl Default for EventMetadata {
285    fn default() -> Self {
286        Self {
287            inner: Arc::new(Inner::default()),
288            last_transform_timestamp: None,
289        }
290    }
291}
292
293pub(super) fn default_schema_definition() -> Arc<schema::Definition> {
294    Arc::new(schema::Definition::new_with_default_metadata(
295        Kind::any(),
296        [LogNamespace::Legacy, LogNamespace::Vector],
297    ))
298}
299
300impl ByteSizeOf for EventMetadata {
301    fn allocated_bytes(&self) -> usize {
302        // NOTE we don't count the `str` here because it's allocated somewhere
303        // else. We're just moving around the pointer, which is already captured
304        // by `ByteSizeOf::size_of`.
305        self.inner.finalizers.allocated_bytes()
306    }
307}
308
309impl EventMetadata {
310    /// Replaces the existing event finalizers with the given one.
311    #[must_use]
312    pub fn with_finalizer(mut self, finalizer: EventFinalizer) -> Self {
313        self.get_mut().finalizers = EventFinalizers::new(finalizer);
314        self
315    }
316
317    /// Replaces the existing event finalizers with the given ones.
318    #[must_use]
319    pub fn with_finalizers(mut self, finalizers: EventFinalizers) -> Self {
320        self.get_mut().finalizers = finalizers;
321        self
322    }
323
324    /// Replace the finalizer with a new one created from the given batch notifier.
325    #[must_use]
326    pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
327        self.with_finalizer(EventFinalizer::new(batch.clone()))
328    }
329
330    /// Replace the finalizer with a new one created from the given optional batch notifier.
331    #[must_use]
332    pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
333        match batch {
334            Some(batch) => self.with_finalizer(EventFinalizer::new(batch.clone())),
335            None => self,
336        }
337    }
338
339    /// Replace the schema definition with the given one.
340    #[must_use]
341    pub fn with_schema_definition(mut self, schema_definition: &Arc<schema::Definition>) -> Self {
342        self.get_mut().schema_definition = Arc::clone(schema_definition);
343        self
344    }
345
346    /// Replaces the existing `source_type` with the given one.
347    #[must_use]
348    pub fn with_source_type<S: Into<Cow<'static, str>>>(mut self, source_type: S) -> Self {
349        self.get_mut().source_type = Some(source_type.into());
350        self
351    }
352
353    /// Replaces the existing `DatadogMetricOriginMetadata` with the given one.
354    #[must_use]
355    pub fn with_origin_metadata(mut self, origin_metadata: DatadogMetricOriginMetadata) -> Self {
356        self.get_mut().datadog_origin_metadata = Some(origin_metadata);
357        self
358    }
359
360    /// Replaces the existing `source_event_id` with the given one.
361    #[must_use]
362    pub fn with_source_event_id(mut self, source_event_id: Option<Uuid>) -> Self {
363        self.get_mut().source_event_id = source_event_id;
364        self
365    }
366
367    /// Merge the other `EventMetadata` into this.
368    /// If a Datadog API key is not set in `self`, the one from `other` will be used.
369    /// If a Splunk HEC token is not set in `self`, the one from `other` will be used.
370    pub fn merge(&mut self, other: Self) {
371        let other_timestamp = other.last_transform_timestamp;
372        let inner = self.get_mut();
373        let other = other.into_owned();
374        inner.finalizers.merge(other.finalizers);
375        inner.secrets.merge(other.secrets);
376
377        // Update `source_event_id` if necessary.
378        if inner.source_event_id.is_none() {
379            inner.source_event_id = other.source_event_id;
380        }
381
382        // Keep the earliest `last_transform_timestamp` for accurate latency measurement.
383        match (self.last_transform_timestamp, other_timestamp) {
384            (Some(self_ts), Some(other_ts)) => {
385                if other_ts < self_ts {
386                    self.last_transform_timestamp = Some(other_ts);
387                }
388            }
389            (None, Some(other_ts)) => {
390                self.last_transform_timestamp = Some(other_ts);
391            }
392            _ => {}
393        }
394    }
395
396    /// Update the finalizer(s) status.
397    pub fn update_status(&self, status: EventStatus) {
398        self.inner.finalizers.update_status(status);
399    }
400
401    /// Update the finalizers' sources.
402    pub fn update_sources(&mut self) {
403        self.get_mut().finalizers.update_sources();
404    }
405
406    /// Gets a reference to the event finalizers.
407    pub fn finalizers(&self) -> &EventFinalizers {
408        &self.inner.finalizers
409    }
410
411    /// Add a new event finalizer to the existing list of event finalizers.
412    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
413        self.get_mut().finalizers.add(finalizer);
414    }
415
416    /// Consumes all event finalizers and returns them, leaving the list of event finalizers empty.
417    pub fn take_finalizers(&mut self) -> EventFinalizers {
418        std::mem::take(&mut self.get_mut().finalizers)
419    }
420
421    /// Merges the given event finalizers into the existing list of event finalizers.
422    pub fn merge_finalizers(&mut self, finalizers: EventFinalizers) {
423        self.get_mut().finalizers.merge(finalizers);
424    }
425
426    /// Get the schema definition.
427    pub fn schema_definition(&self) -> &Arc<schema::Definition> {
428        &self.inner.schema_definition
429    }
430
431    /// Set the schema definition.
432    pub fn set_schema_definition(&mut self, definition: &Arc<schema::Definition>) {
433        self.get_mut().schema_definition = Arc::clone(definition);
434    }
435
436    /// Helper function to add a semantic meaning to the schema definition.
437    ///
438    /// This replaces the common code sequence of:
439    ///
440    /// ```ignore
441    /// let new_schema = log_event
442    ///     .metadata()
443    ///     .schema_definition()
444    ///     .as_ref()
445    ///     .clone()
446    ///     .with_meaning(target_path, meaning);
447    /// log_event
448    ///     .metadata_mut()
449    ///     .set_schema_definition(new_schema);
450    /// ````
451    pub fn add_schema_meaning(&mut self, target_path: OwnedTargetPath, meaning: &str) {
452        let schema = Arc::make_mut(&mut self.get_mut().schema_definition);
453        schema.add_meaning(target_path, meaning);
454    }
455}
456
457impl EventDataEq for EventMetadata {
458    fn event_data_eq(&self, _other: &Self) -> bool {
459        // Don't compare the metadata, it is not "event data".
460        true
461    }
462}
463
/// This is a simple wrapper to allow attaching `EventMetadata` to any
/// other type. This is primarily used in conversion functions, such as
/// `impl From<X> for WithMetadata<Y>`. Use `WithMetadata::into` to change the
/// wrapped data's type while keeping the metadata.
pub struct WithMetadata<T> {
    /// The data item being wrapped.
    pub data: T,
    /// The additional metadata sidecar.
    pub metadata: EventMetadata,
}
473
474impl<T> WithMetadata<T> {
475    /// Convert from one wrapped type to another, where the underlying
476    /// type allows direct conversion.
477    // We would like to `impl From` instead, but this fails due to
478    // conflicting implementations of `impl<T> From<T> for T`.
479    pub fn into<T1: From<T>>(self) -> WithMetadata<T1> {
480        WithMetadata {
481            data: T1::from(self.data),
482            metadata: self.metadata,
483        }
484    }
485}
486
/// A container that holds secrets, keyed by name.
///
/// Its `Debug` implementation prints the names only and redacts every value.
// NOTE(review): the derived `Serialize` writes secret values in clear text; confirm that
// every serialization path of `Secrets` is meant to carry them.
#[derive(Clone, Default, Deserialize, Eq, PartialEq, PartialOrd, Serialize)]
pub struct Secrets(BTreeMap<String, Arc<str>>);
490
491impl fmt::Debug for Secrets {
492    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493        let mut map = f.debug_map();
494        for key in self.0.keys() {
495            map.entry(key, &"<redacted secret>");
496        }
497        map.finish()
498    }
499}
500
501impl Secrets {
502    /// Creates a new, empty container.
503    #[must_use]
504    pub fn new() -> Self {
505        Self(BTreeMap::new())
506    }
507
508    /// Returns `true` if the container contains no secrets.
509    pub fn is_empty(&self) -> bool {
510        self.0.is_empty()
511    }
512
513    /// Gets a secret by its name.
514    #[must_use]
515    pub fn get(&self, key: &str) -> Option<&Arc<str>> {
516        self.0.get(key)
517    }
518
519    /// Inserts a new secret into the container.
520    pub fn insert(&mut self, key: impl Into<String>, value: impl Into<Arc<str>>) {
521        self.0.insert(key.into(), value.into());
522    }
523
524    /// Removes a secret
525    pub fn remove(&mut self, key: &str) {
526        self.0.remove(key);
527    }
528
529    /// Merged both together. If there are collisions, the value from `self` is kept.
530    pub fn merge(&mut self, other: Self) {
531        for (key, value) in other.0 {
532            self.0.entry(key).or_insert(value);
533        }
534    }
535}
536
537impl SecretTarget for Secrets {
538    fn get_secret(&self, key: &str) -> Option<&str> {
539        self.get(key).map(AsRef::as_ref)
540    }
541
542    fn insert_secret(&mut self, key: &str, value: &str) {
543        self.insert(key, value);
544    }
545
546    fn remove_secret(&mut self, key: &str) {
547        self.remove(key);
548    }
549}
550
551impl IntoIterator for Secrets {
552    type Item = (String, Arc<str>);
553    type IntoIter = std::collections::btree_map::IntoIter<String, Arc<str>>;
554
555    fn into_iter(self) -> Self::IntoIter {
556        self.0.into_iter()
557    }
558}
559
#[cfg(test)]
mod test {
    use super::*;

    const SECRET: &str = "secret";
    const SECRET2: &str = "secret2";

    #[test]
    fn metadata_hardcoded_secrets_get_set() {
        let mut metadata = EventMetadata::default();
        metadata.set_datadog_api_key(Arc::from(SECRET));
        metadata.set_splunk_hec_token(Arc::from(SECRET2));
        assert_eq!(metadata.datadog_api_key().unwrap().as_ref(), SECRET);
        assert_eq!(metadata.splunk_hec_token().unwrap().as_ref(), SECRET2);
    }

    #[test]
    fn secrets_merge() {
        let mut a = Secrets::new();
        a.insert("key-a", "value-a1");
        a.insert("key-b", "value-b1");

        let mut b = Secrets::new();
        b.insert("key-b", "value-b2");
        b.insert("key-c", "value-c2");

        a.merge(b);

        assert_eq!(a.get("key-a").unwrap().as_ref(), "value-a1");
        assert_eq!(a.get("key-b").unwrap().as_ref(), "value-b1");
        assert_eq!(a.get("key-c").unwrap().as_ref(), "value-c2");
    }

    /// `Debug` output must list secret names but never their values.
    #[test]
    fn secrets_debug_redacts_values() {
        let mut secrets = Secrets::new();
        secrets.insert("key", "hunter2");
        let rendered = format!("{secrets:?}");
        assert!(!rendered.contains("hunter2"));
        assert_eq!(rendered, r#"{"key": "<redacted secret>"}"#);
    }

    #[test]
    fn metadata_source_event_id_merging() {
        let m1 = EventMetadata::default();
        let m2 = EventMetadata::default();

        // Always maintain the original source event id when merging, similar to how we handle other metadata.
        {
            let mut merged = m1.clone();
            merged.merge(m2.clone());
            assert_eq!(merged.source_event_id(), m1.source_event_id());
        }

        {
            let mut merged = m2.clone();
            merged.merge(m1.clone());
            assert_eq!(merged.source_event_id(), m2.source_event_id());
        }
    }

    /// `merge` must keep the earliest transform timestamp, whichever side holds it, and
    /// fill a missing timestamp from the other side.
    #[test]
    fn metadata_merge_keeps_earliest_transform_timestamp() {
        let earlier = Instant::now();
        let later = earlier + std::time::Duration::from_millis(10);

        let mut with_later = EventMetadata::default();
        with_later.set_last_transform_timestamp(later);
        let mut with_earlier = EventMetadata::default();
        with_earlier.set_last_transform_timestamp(earlier);

        // The earlier timestamp arrives with `other`.
        let mut merged = with_later.clone();
        merged.merge(with_earlier.clone());
        assert_eq!(merged.last_transform_timestamp(), Some(earlier));

        // The earlier timestamp is already in `self`.
        let mut merged = with_earlier.clone();
        merged.merge(with_later.clone());
        assert_eq!(merged.last_transform_timestamp(), Some(earlier));

        // Only `other` has a timestamp.
        let mut merged = EventMetadata::default();
        merged.merge(with_later);
        assert_eq!(merged.last_transform_timestamp(), Some(later));

        // Neither side has one.
        let mut merged = EventMetadata::default();
        merged.merge(EventMetadata::default());
        assert_eq!(merged.last_transform_timestamp(), None);
    }
}