vector_core/event/
metadata.rs

1#![deny(missing_docs)]
2
3use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc};
4
5use derivative::Derivative;
6use lookup::OwnedTargetPath;
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9use vector_common::{byte_size_of::ByteSizeOf, config::ComponentKey, EventDataEq};
10use vrl::{
11    compiler::SecretTarget,
12    value::{KeyString, Kind, Value},
13};
14
15use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, ObjectMap};
16use crate::{
17    config::{LogNamespace, OutputId},
18    schema,
19};
20
/// Name under which the Datadog API key is stored in an event's [`Secrets`].
const DATADOG_API_KEY: &str = "datadog_api_key";
/// Name under which the Splunk HEC token is stored in an event's [`Secrets`].
const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
23
/// Metadata carried alongside an event.
///
/// The event metadata structure is an `Arc` wrapper around the actual metadata (`Inner`).
/// Cloning it only bumps a reference count. The underlying data is copied (via
/// `Arc::make_mut`) only when a mutable reference is requested while the `Arc` is shared.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct EventMetadata(pub(super) Arc<Inner>);
28
29/// The actual metadata structure contained by both `struct Metric`
30/// and `struct LogEvent` types.
31#[derive(Clone, Debug, Derivative, Deserialize, Serialize)]
32#[derivative(PartialEq)]
33pub(super) struct Inner {
34    /// Arbitrary data stored with an event
35    #[serde(default = "default_metadata_value")]
36    pub(crate) value: Value,
37
38    /// Storage for secrets
39    #[serde(default)]
40    pub(crate) secrets: Secrets,
41
42    #[serde(default, skip)]
43    finalizers: EventFinalizers,
44
45    /// The id of the source
46    pub(crate) source_id: Option<Arc<ComponentKey>>,
47
48    /// The type of the source
49    pub(crate) source_type: Option<Cow<'static, str>>,
50
51    /// The id of the component this event originated from. This is used to
52    /// determine which schema definition to attach to an event in transforms.
53    /// This should always have a value set for events in transforms. It will always be `None`
54    /// in a source, and there is currently no use-case for reading the value in a sink.
55    pub(crate) upstream_id: Option<Arc<OutputId>>,
56
57    /// An identifier for a globally registered schema definition which provides information about
58    /// the event shape (type information, and semantic meaning of fields).
59    /// This definition is only currently valid for logs, and shouldn't be used for other event types.
60    ///
61    /// TODO(Jean): must not skip serialization to track schemas across restarts.
62    #[serde(default = "default_schema_definition", skip)]
63    schema_definition: Arc<schema::Definition>,
64
65    /// A store of values that may be dropped during the encoding process but may be needed
66    /// later on. The map is indexed by meaning.
67    /// Currently this is just used for the `service`. If the service field is dropped by `only_fields`
68    /// we need to ensure it is still available later on for emitting metrics tagged by the service.
69    /// This field could almost be keyed by `&'static str`, but because it needs to be deserializable
70    /// we have to use `String`.
71    dropped_fields: ObjectMap,
72
73    /// Metadata to track the origin of metrics. This is always `None` for log and trace events.
74    /// Only a small set of Vector sources and transforms explicitly set this field.
75    #[serde(default)]
76    pub(crate) datadog_origin_metadata: Option<DatadogMetricOriginMetadata>,
77
78    /// An internal vector id that can be used to identify this event across all components.
79    #[derivative(PartialEq = "ignore")]
80    pub(crate) source_event_id: Option<Uuid>,
81}
82
/// Metric Origin metadata for submission to Datadog.
///
/// Each field is an optional numeric identifier. The fields are private and are read through
/// the accessor methods.
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize)]
pub struct DatadogMetricOriginMetadata {
    /// `OriginProduct`
    product: Option<u32>,
    /// `OriginCategory`
    category: Option<u32>,
    /// `OriginService`
    service: Option<u32>,
}
93
94impl DatadogMetricOriginMetadata {
95    /// Creates a new `DatadogMetricOriginMetadata`.
96    /// When Vector sends out metrics containing the Origin metadata, it should do so with
97    /// all of the fields defined.
98    /// The edge case where the Origin metadata is created within a component and does not
99    /// initially contain all of the metadata fields, is in the `log_to_metric` transform.
100    #[must_use]
101    pub fn new(product: Option<u32>, category: Option<u32>, service: Option<u32>) -> Self {
102        Self {
103            product,
104            category,
105            service,
106        }
107    }
108
109    /// Returns a reference to the `OriginProduct`.
110    pub fn product(&self) -> Option<u32> {
111        self.product
112    }
113
114    /// Returns a reference to the `OriginCategory`.
115    pub fn category(&self) -> Option<u32> {
116        self.category
117    }
118
119    /// Returns a reference to the `OriginService`.
120    pub fn service(&self) -> Option<u32> {
121        self.service
122    }
123}
124
125fn default_metadata_value() -> Value {
126    Value::Object(ObjectMap::new())
127}
128
129impl EventMetadata {
130    /// Creates `EventMetadata` with the given `Value`, and the rest of the fields with default values
131    pub fn default_with_value(value: Value) -> Self {
132        Self(Arc::new(Inner {
133            value,
134            ..Default::default()
135        }))
136    }
137
138    fn get_mut(&mut self) -> &mut Inner {
139        Arc::make_mut(&mut self.0)
140    }
141
142    pub(super) fn into_owned(self) -> Inner {
143        Arc::unwrap_or_clone(self.0)
144    }
145
146    /// Returns a reference to the metadata value
147    pub fn value(&self) -> &Value {
148        &self.0.value
149    }
150
151    /// Returns a mutable reference to the metadata value
152    pub fn value_mut(&mut self) -> &mut Value {
153        &mut self.get_mut().value
154    }
155
156    /// Returns a reference to the secrets
157    pub fn secrets(&self) -> &Secrets {
158        &self.0.secrets
159    }
160
161    /// Returns a mutable reference to the secrets
162    pub fn secrets_mut(&mut self) -> &mut Secrets {
163        &mut self.get_mut().secrets
164    }
165
166    /// Returns a reference to the metadata source id.
167    #[must_use]
168    pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
169        self.0.source_id.as_ref()
170    }
171
172    /// Returns a reference to the metadata source type.
173    #[must_use]
174    pub fn source_type(&self) -> Option<&str> {
175        self.0.source_type.as_deref()
176    }
177
178    /// Returns a reference to the metadata parent id. This is the `OutputId`
179    /// of the previous component the event was sent through (if any).
180    #[must_use]
181    pub fn upstream_id(&self) -> Option<&OutputId> {
182        self.0.upstream_id.as_deref()
183    }
184
185    /// Sets the `source_id` in the metadata to the provided value.
186    pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
187        self.get_mut().source_id = Some(source_id);
188    }
189
190    /// Sets the `source_type` in the metadata to the provided value.
191    pub fn set_source_type<S: Into<Cow<'static, str>>>(&mut self, source_type: S) {
192        self.get_mut().source_type = Some(source_type.into());
193    }
194
195    /// Sets the `upstream_id` in the metadata to the provided value.
196    pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
197        self.get_mut().upstream_id = Some(upstream_id);
198    }
199
200    /// Return the datadog API key, if it exists
201    pub fn datadog_api_key(&self) -> Option<Arc<str>> {
202        self.0.secrets.get(DATADOG_API_KEY).cloned()
203    }
204
205    /// Set the datadog API key to passed value
206    pub fn set_datadog_api_key(&mut self, secret: Arc<str>) {
207        self.get_mut().secrets.insert(DATADOG_API_KEY, secret);
208    }
209
210    /// Return the splunk hec token, if it exists
211    pub fn splunk_hec_token(&self) -> Option<Arc<str>> {
212        self.0.secrets.get(SPLUNK_HEC_TOKEN).cloned()
213    }
214
215    /// Set the splunk hec token to passed value
216    pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
217        self.get_mut().secrets.insert(SPLUNK_HEC_TOKEN, secret);
218    }
219
220    /// Adds the value to the dropped fields list.
221    /// There is currently no way to remove a field from this list, so if a field is dropped
222    /// and then the field is re-added with a new value - the dropped value will still be
223    /// retrieved.
224    pub fn add_dropped_field(&mut self, meaning: KeyString, value: Value) {
225        self.get_mut().dropped_fields.insert(meaning, value);
226    }
227
228    /// Fetches the dropped field by meaning.
229    pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
230        self.0.dropped_fields.get(meaning.as_ref())
231    }
232
233    /// Returns a reference to the `DatadogMetricOriginMetadata`.
234    pub fn datadog_origin_metadata(&self) -> Option<&DatadogMetricOriginMetadata> {
235        self.0.datadog_origin_metadata.as_ref()
236    }
237
238    /// Returns a reference to the event id.
239    pub fn source_event_id(&self) -> Option<Uuid> {
240        self.0.source_event_id
241    }
242}
243
244impl Default for Inner {
245    fn default() -> Self {
246        Self {
247            value: Value::Object(ObjectMap::new()),
248            secrets: Secrets::new(),
249            finalizers: Default::default(),
250            schema_definition: default_schema_definition(),
251            source_id: None,
252            source_type: None,
253            upstream_id: None,
254            dropped_fields: ObjectMap::new(),
255            datadog_origin_metadata: None,
256            source_event_id: Some(Uuid::now_v7()),
257        }
258    }
259}
260
261impl Default for EventMetadata {
262    fn default() -> Self {
263        Self(Arc::new(Inner::default()))
264    }
265}
266
267fn default_schema_definition() -> Arc<schema::Definition> {
268    Arc::new(schema::Definition::new_with_default_metadata(
269        Kind::any(),
270        [LogNamespace::Legacy, LogNamespace::Vector],
271    ))
272}
273
274impl ByteSizeOf for EventMetadata {
275    fn allocated_bytes(&self) -> usize {
276        // NOTE we don't count the `str` here because it's allocated somewhere
277        // else. We're just moving around the pointer, which is already captured
278        // by `ByteSizeOf::size_of`.
279        self.0.finalizers.allocated_bytes()
280    }
281}
282
283impl EventMetadata {
284    /// Replaces the existing event finalizers with the given one.
285    #[must_use]
286    pub fn with_finalizer(mut self, finalizer: EventFinalizer) -> Self {
287        self.get_mut().finalizers = EventFinalizers::new(finalizer);
288        self
289    }
290
291    /// Replaces the existing event finalizers with the given ones.
292    #[must_use]
293    pub fn with_finalizers(mut self, finalizers: EventFinalizers) -> Self {
294        self.get_mut().finalizers = finalizers;
295        self
296    }
297
298    /// Replace the finalizer with a new one created from the given batch notifier.
299    #[must_use]
300    pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
301        self.with_finalizer(EventFinalizer::new(batch.clone()))
302    }
303
304    /// Replace the finalizer with a new one created from the given optional batch notifier.
305    #[must_use]
306    pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
307        match batch {
308            Some(batch) => self.with_finalizer(EventFinalizer::new(batch.clone())),
309            None => self,
310        }
311    }
312
313    /// Replace the schema definition with the given one.
314    #[must_use]
315    pub fn with_schema_definition(mut self, schema_definition: &Arc<schema::Definition>) -> Self {
316        self.get_mut().schema_definition = Arc::clone(schema_definition);
317        self
318    }
319
320    /// Replaces the existing `source_type` with the given one.
321    #[must_use]
322    pub fn with_source_type<S: Into<Cow<'static, str>>>(mut self, source_type: S) -> Self {
323        self.get_mut().source_type = Some(source_type.into());
324        self
325    }
326
327    /// Replaces the existing `DatadogMetricOriginMetadata` with the given one.
328    #[must_use]
329    pub fn with_origin_metadata(mut self, origin_metadata: DatadogMetricOriginMetadata) -> Self {
330        self.get_mut().datadog_origin_metadata = Some(origin_metadata);
331        self
332    }
333
334    /// Replaces the existing `source_event_id` with the given one.
335    #[must_use]
336    pub fn with_source_event_id(mut self, source_event_id: Option<Uuid>) -> Self {
337        self.get_mut().source_event_id = source_event_id;
338        self
339    }
340
341    /// Merge the other `EventMetadata` into this.
342    /// If a Datadog API key is not set in `self`, the one from `other` will be used.
343    /// If a Splunk HEC token is not set in `self`, the one from `other` will be used.
344    pub fn merge(&mut self, other: Self) {
345        let inner = self.get_mut();
346        let other = other.into_owned();
347        inner.finalizers.merge(other.finalizers);
348        inner.secrets.merge(other.secrets);
349
350        // Update `source_event_id` if necessary.
351        match (inner.source_event_id, other.source_event_id) {
352            (None, Some(id)) => {
353                inner.source_event_id = Some(id);
354            }
355            (Some(uuid1), Some(uuid2)) if uuid2 < uuid1 => {
356                inner.source_event_id = Some(uuid2);
357            }
358            _ => {} // Keep the existing value.
359        }
360    }
361
362    /// Update the finalizer(s) status.
363    pub fn update_status(&self, status: EventStatus) {
364        self.0.finalizers.update_status(status);
365    }
366
367    /// Update the finalizers' sources.
368    pub fn update_sources(&mut self) {
369        self.get_mut().finalizers.update_sources();
370    }
371
372    /// Gets a reference to the event finalizers.
373    pub fn finalizers(&self) -> &EventFinalizers {
374        &self.0.finalizers
375    }
376
377    /// Add a new event finalizer to the existing list of event finalizers.
378    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
379        self.get_mut().finalizers.add(finalizer);
380    }
381
382    /// Consumes all event finalizers and returns them, leaving the list of event finalizers empty.
383    pub fn take_finalizers(&mut self) -> EventFinalizers {
384        std::mem::take(&mut self.get_mut().finalizers)
385    }
386
387    /// Merges the given event finalizers into the existing list of event finalizers.
388    pub fn merge_finalizers(&mut self, finalizers: EventFinalizers) {
389        self.get_mut().finalizers.merge(finalizers);
390    }
391
392    /// Get the schema definition.
393    pub fn schema_definition(&self) -> &Arc<schema::Definition> {
394        &self.0.schema_definition
395    }
396
397    /// Set the schema definition.
398    pub fn set_schema_definition(&mut self, definition: &Arc<schema::Definition>) {
399        self.get_mut().schema_definition = Arc::clone(definition);
400    }
401
402    /// Helper function to add a semantic meaning to the schema definition.
403    ///
404    /// This replaces the common code sequence of:
405    ///
406    /// ```ignore
407    /// let new_schema = log_event
408    ///     .metadata()
409    ///     .schema_definition()
410    ///     .as_ref()
411    ///     .clone()
412    ///     .with_meaning(target_path, meaning);
413    /// log_event
414    ///     .metadata_mut()
415    ///     .set_schema_definition(new_schema);
416    /// ````
417    pub fn add_schema_meaning(&mut self, target_path: OwnedTargetPath, meaning: &str) {
418        let schema = Arc::make_mut(&mut self.get_mut().schema_definition);
419        schema.add_meaning(target_path, meaning);
420    }
421}
422
423impl EventDataEq for EventMetadata {
424    fn event_data_eq(&self, _other: &Self) -> bool {
425        // Don't compare the metadata, it is not "event data".
426        true
427    }
428}
429
/// This is a simple wrapper to allow attaching `EventMetadata` to any
/// other type. This is primarily used in conversion functions, such as
/// `impl From<X> for WithMetadata<Y>`.
///
/// Use [`WithMetadata::into`] to convert the wrapped data to another type while keeping the
/// metadata.
pub struct WithMetadata<T> {
    /// The data item being wrapped.
    pub data: T,
    /// The additional metadata sidecar.
    pub metadata: EventMetadata,
}
439
440impl<T> WithMetadata<T> {
441    /// Convert from one wrapped type to another, where the underlying
442    /// type allows direct conversion.
443    // We would like to `impl From` instead, but this fails due to
444    // conflicting implementations of `impl<T> From<T> for T`.
445    pub fn into<T1: From<T>>(self) -> WithMetadata<T1> {
446        WithMetadata {
447            data: T1::from(self.data),
448            metadata: self.metadata,
449        }
450    }
451}
452
/// A container that holds secrets, keyed by name.
///
/// The `Debug` output lists only the secret names; every value is shown as
/// `<redacted secret>`.
#[derive(Clone, Default, Deserialize, Eq, PartialEq, PartialOrd, Serialize)]
pub struct Secrets(BTreeMap<String, Arc<str>>);
456
457impl fmt::Debug for Secrets {
458    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459        let mut map = f.debug_map();
460        for key in self.0.keys() {
461            map.entry(key, &"<redacted secret>");
462        }
463        map.finish()
464    }
465}
466
467impl Secrets {
468    /// Creates a new, empty container.
469    #[must_use]
470    pub fn new() -> Self {
471        Self(BTreeMap::new())
472    }
473
474    /// Returns `true` if the container contains no secrets.
475    pub fn is_empty(&self) -> bool {
476        self.0.is_empty()
477    }
478
479    /// Gets a secret by its name.
480    #[must_use]
481    pub fn get(&self, key: &str) -> Option<&Arc<str>> {
482        self.0.get(key)
483    }
484
485    /// Inserts a new secret into the container.
486    pub fn insert(&mut self, key: impl Into<String>, value: impl Into<Arc<str>>) {
487        self.0.insert(key.into(), value.into());
488    }
489
490    /// Removes a secret
491    pub fn remove(&mut self, key: &str) {
492        self.0.remove(key);
493    }
494
495    /// Merged both together. If there are collisions, the value from `self` is kept.
496    pub fn merge(&mut self, other: Self) {
497        for (key, value) in other.0 {
498            self.0.entry(key).or_insert(value);
499        }
500    }
501}
502
503impl SecretTarget for Secrets {
504    fn get_secret(&self, key: &str) -> Option<&str> {
505        self.get(key).map(AsRef::as_ref)
506    }
507
508    fn insert_secret(&mut self, key: &str, value: &str) {
509        self.insert(key, value);
510    }
511
512    fn remove_secret(&mut self, key: &str) {
513        self.remove(key);
514    }
515}
516
517impl IntoIterator for Secrets {
518    type Item = (String, Arc<str>);
519    type IntoIter = std::collections::btree_map::IntoIter<String, Arc<str>>;
520
521    fn into_iter(self) -> Self::IntoIter {
522        self.0.into_iter()
523    }
524}
525
#[cfg(test)]
mod test {
    use super::*;

    const SECRET: &str = "secret";
    const SECRET2: &str = "secret2";

    /// The dedicated Datadog / Splunk accessors read back what their setters stored.
    #[test]
    fn metadata_hardcoded_secrets_get_set() {
        let mut metadata = EventMetadata::default();
        metadata.set_datadog_api_key(Arc::from(SECRET));
        metadata.set_splunk_hec_token(Arc::from(SECRET2));

        let api_key = metadata.datadog_api_key().unwrap();
        let hec_token = metadata.splunk_hec_token().unwrap();
        assert_eq!(&*api_key, SECRET);
        assert_eq!(&*hec_token, SECRET2);
    }

    /// On a key collision, the value already present in the receiver wins.
    #[test]
    fn secrets_merge() {
        let mut base = Secrets::new();
        base.insert("key-a", "value-a1");
        base.insert("key-b", "value-b1");

        let mut incoming = Secrets::new();
        incoming.insert("key-b", "value-b2");
        incoming.insert("key-c", "value-c2");

        base.merge(incoming);

        let expectations = [
            ("key-a", "value-a1"),
            ("key-b", "value-b1"),
            ("key-c", "value-c2"),
        ];
        for (key, expected) in expectations {
            assert_eq!(base.get(key).unwrap().as_ref(), expected);
        }
    }

    /// Merging keeps the id of the metadata created first, regardless of merge direction.
    /// Assumes successive `Uuid::now_v7` ids in one process are increasing — TODO confirm
    /// against the `uuid` version in use.
    #[test]
    fn metadata_source_event_id_merging() {
        let first = EventMetadata::default();
        let second = EventMetadata::default();
        let expected = first.source_event_id();

        for (base, incoming) in [(&first, &second), (&second, &first)] {
            let mut merged = base.clone();
            merged.merge(incoming.clone());
            assert_eq!(merged.source_event_id(), expected);
        }
    }
}