vector_core/event/
metadata.rs

1#![deny(missing_docs)]
2
3use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc};
4
5use derivative::Derivative;
6use lookup::OwnedTargetPath;
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9use vector_common::{EventDataEq, byte_size_of::ByteSizeOf, config::ComponentKey};
10use vrl::{
11    compiler::SecretTarget,
12    value::{KeyString, Kind, Value},
13};
14
15use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, ObjectMap};
16use crate::{
17    config::{LogNamespace, OutputId},
18    schema,
19};
20
/// Name under which the Datadog API key is kept in an event's [`Secrets`].
const DATADOG_API_KEY: &str = "datadog_api_key";
/// Name under which the Splunk HEC token is kept in an event's [`Secrets`].
const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
23
24/// The event metadata structure is a `Arc` wrapper around the actual metadata to avoid cloning the
25/// underlying data until it becomes necessary to provide a `mut` copy.
26#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
27pub struct EventMetadata(pub(super) Arc<Inner>);
28
29/// The actual metadata structure contained by both `struct Metric`
30/// and `struct LogEvent` types.
31#[derive(Clone, Debug, Derivative, Deserialize, Serialize)]
32#[derivative(PartialEq)]
33pub(super) struct Inner {
34    /// Arbitrary data stored with an event
35    #[serde(default = "default_metadata_value")]
36    pub(crate) value: Value,
37
38    /// Storage for secrets
39    #[serde(default)]
40    pub(crate) secrets: Secrets,
41
42    #[serde(default, skip)]
43    pub(crate) finalizers: EventFinalizers,
44
45    /// The id of the source
46    pub(crate) source_id: Option<Arc<ComponentKey>>,
47
48    /// The type of the source
49    pub(crate) source_type: Option<Cow<'static, str>>,
50
51    /// The id of the component this event originated from. This is used to
52    /// determine which schema definition to attach to an event in transforms.
53    /// This should always have a value set for events in transforms. It will always be `None`
54    /// in a source, and there is currently no use-case for reading the value in a sink.
55    pub(crate) upstream_id: Option<Arc<OutputId>>,
56
57    /// An identifier for a globally registered schema definition which provides information about
58    /// the event shape (type information, and semantic meaning of fields).
59    /// This definition is only currently valid for logs, and shouldn't be used for other event types.
60    ///
61    /// TODO(Jean): must not skip serialization to track schemas across restarts.
62    #[serde(default = "default_schema_definition", skip)]
63    pub(crate) schema_definition: Arc<schema::Definition>,
64
65    /// A store of values that may be dropped during the encoding process but may be needed
66    /// later on. The map is indexed by meaning.
67    /// Currently this is just used for the `service`. If the service field is dropped by `only_fields`
68    /// we need to ensure it is still available later on for emitting metrics tagged by the service.
69    /// This field could almost be keyed by `&'static str`, but because it needs to be deserializable
70    /// we have to use `String`.
71    pub(crate) dropped_fields: ObjectMap,
72
73    /// Metadata to track the origin of metrics. This is always `None` for log and trace events.
74    /// Only a small set of Vector sources and transforms explicitly set this field.
75    #[serde(default)]
76    pub(crate) datadog_origin_metadata: Option<DatadogMetricOriginMetadata>,
77
78    /// An internal vector id that can be used to identify this event across all components.
79    #[derivative(PartialEq = "ignore")]
80    pub(crate) source_event_id: Option<Uuid>,
81}
82
83/// Metric Origin metadata for submission to Datadog.
84#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize)]
85pub struct DatadogMetricOriginMetadata {
86    /// `OriginProduct`
87    product: Option<u32>,
88    /// `OriginCategory`
89    category: Option<u32>,
90    /// `OriginService`
91    service: Option<u32>,
92}
93
94impl DatadogMetricOriginMetadata {
95    /// Creates a new `DatadogMetricOriginMetadata`.
96    /// When Vector sends out metrics containing the Origin metadata, it should do so with
97    /// all of the fields defined.
98    /// The edge case where the Origin metadata is created within a component and does not
99    /// initially contain all of the metadata fields, is in the `log_to_metric` transform.
100    #[must_use]
101    pub fn new(product: Option<u32>, category: Option<u32>, service: Option<u32>) -> Self {
102        Self {
103            product,
104            category,
105            service,
106        }
107    }
108
109    /// Returns a reference to the `OriginProduct`.
110    pub fn product(&self) -> Option<u32> {
111        self.product
112    }
113
114    /// Returns a reference to the `OriginCategory`.
115    pub fn category(&self) -> Option<u32> {
116        self.category
117    }
118
119    /// Returns a reference to the `OriginService`.
120    pub fn service(&self) -> Option<u32> {
121        self.service
122    }
123}
124
125fn default_metadata_value() -> Value {
126    Value::Object(ObjectMap::new())
127}
128
129impl EventMetadata {
130    /// Creates `EventMetadata` with the given `Value`, and the rest of the fields with default values
131    pub fn default_with_value(value: Value) -> Self {
132        Self(Arc::new(Inner {
133            value,
134            ..Default::default()
135        }))
136    }
137
138    fn get_mut(&mut self) -> &mut Inner {
139        Arc::make_mut(&mut self.0)
140    }
141
142    pub(super) fn into_owned(self) -> Inner {
143        Arc::unwrap_or_clone(self.0)
144    }
145
146    /// Returns a reference to the metadata value
147    pub fn value(&self) -> &Value {
148        &self.0.value
149    }
150
151    /// Returns a mutable reference to the metadata value
152    pub fn value_mut(&mut self) -> &mut Value {
153        &mut self.get_mut().value
154    }
155
156    /// Returns a reference to the secrets
157    pub fn secrets(&self) -> &Secrets {
158        &self.0.secrets
159    }
160
161    /// Returns a mutable reference to the secrets
162    pub fn secrets_mut(&mut self) -> &mut Secrets {
163        &mut self.get_mut().secrets
164    }
165
166    /// Returns a reference to the metadata source id.
167    #[must_use]
168    pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
169        self.0.source_id.as_ref()
170    }
171
172    /// Returns a reference to the metadata source type.
173    #[must_use]
174    pub fn source_type(&self) -> Option<&str> {
175        self.0.source_type.as_deref()
176    }
177
178    /// Returns a reference to the metadata parent id. This is the `OutputId`
179    /// of the previous component the event was sent through (if any).
180    #[must_use]
181    pub fn upstream_id(&self) -> Option<&OutputId> {
182        self.0.upstream_id.as_deref()
183    }
184
185    /// Sets the `source_id` in the metadata to the provided value.
186    pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
187        self.get_mut().source_id = Some(source_id);
188    }
189
190    /// Sets the `source_type` in the metadata to the provided value.
191    pub fn set_source_type<S: Into<Cow<'static, str>>>(&mut self, source_type: S) {
192        self.get_mut().source_type = Some(source_type.into());
193    }
194
195    /// Sets the `upstream_id` in the metadata to the provided value.
196    pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
197        self.get_mut().upstream_id = Some(upstream_id);
198    }
199
200    /// Return the datadog API key, if it exists
201    pub fn datadog_api_key(&self) -> Option<Arc<str>> {
202        self.0.secrets.get(DATADOG_API_KEY).cloned()
203    }
204
205    /// Set the datadog API key to passed value
206    pub fn set_datadog_api_key(&mut self, secret: Arc<str>) {
207        self.get_mut().secrets.insert(DATADOG_API_KEY, secret);
208    }
209
210    /// Return the splunk hec token, if it exists
211    pub fn splunk_hec_token(&self) -> Option<Arc<str>> {
212        self.0.secrets.get(SPLUNK_HEC_TOKEN).cloned()
213    }
214
215    /// Set the splunk hec token to passed value
216    pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
217        self.get_mut().secrets.insert(SPLUNK_HEC_TOKEN, secret);
218    }
219
220    /// Adds the value to the dropped fields list.
221    /// There is currently no way to remove a field from this list, so if a field is dropped
222    /// and then the field is re-added with a new value - the dropped value will still be
223    /// retrieved.
224    pub fn add_dropped_field(&mut self, meaning: KeyString, value: Value) {
225        self.get_mut().dropped_fields.insert(meaning, value);
226    }
227
228    /// Fetches the dropped field by meaning.
229    pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
230        self.0.dropped_fields.get(meaning.as_ref())
231    }
232
233    /// Returns a reference to the `DatadogMetricOriginMetadata`.
234    pub fn datadog_origin_metadata(&self) -> Option<&DatadogMetricOriginMetadata> {
235        self.0.datadog_origin_metadata.as_ref()
236    }
237
238    /// Returns a reference to the event id.
239    pub fn source_event_id(&self) -> Option<Uuid> {
240        self.0.source_event_id
241    }
242}
243
244impl Default for Inner {
245    fn default() -> Self {
246        Self {
247            value: Value::Object(ObjectMap::new()),
248            secrets: Secrets::new(),
249            finalizers: Default::default(),
250            schema_definition: default_schema_definition(),
251            source_id: None,
252            source_type: None,
253            upstream_id: None,
254            dropped_fields: ObjectMap::new(),
255            datadog_origin_metadata: None,
256            source_event_id: Some(Uuid::new_v4()),
257        }
258    }
259}
260
261impl Default for EventMetadata {
262    fn default() -> Self {
263        Self(Arc::new(Inner::default()))
264    }
265}
266
267pub(super) fn default_schema_definition() -> Arc<schema::Definition> {
268    Arc::new(schema::Definition::new_with_default_metadata(
269        Kind::any(),
270        [LogNamespace::Legacy, LogNamespace::Vector],
271    ))
272}
273
274impl ByteSizeOf for EventMetadata {
275    fn allocated_bytes(&self) -> usize {
276        // NOTE we don't count the `str` here because it's allocated somewhere
277        // else. We're just moving around the pointer, which is already captured
278        // by `ByteSizeOf::size_of`.
279        self.0.finalizers.allocated_bytes()
280    }
281}
282
283impl EventMetadata {
284    /// Replaces the existing event finalizers with the given one.
285    #[must_use]
286    pub fn with_finalizer(mut self, finalizer: EventFinalizer) -> Self {
287        self.get_mut().finalizers = EventFinalizers::new(finalizer);
288        self
289    }
290
291    /// Replaces the existing event finalizers with the given ones.
292    #[must_use]
293    pub fn with_finalizers(mut self, finalizers: EventFinalizers) -> Self {
294        self.get_mut().finalizers = finalizers;
295        self
296    }
297
298    /// Replace the finalizer with a new one created from the given batch notifier.
299    #[must_use]
300    pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
301        self.with_finalizer(EventFinalizer::new(batch.clone()))
302    }
303
304    /// Replace the finalizer with a new one created from the given optional batch notifier.
305    #[must_use]
306    pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
307        match batch {
308            Some(batch) => self.with_finalizer(EventFinalizer::new(batch.clone())),
309            None => self,
310        }
311    }
312
313    /// Replace the schema definition with the given one.
314    #[must_use]
315    pub fn with_schema_definition(mut self, schema_definition: &Arc<schema::Definition>) -> Self {
316        self.get_mut().schema_definition = Arc::clone(schema_definition);
317        self
318    }
319
320    /// Replaces the existing `source_type` with the given one.
321    #[must_use]
322    pub fn with_source_type<S: Into<Cow<'static, str>>>(mut self, source_type: S) -> Self {
323        self.get_mut().source_type = Some(source_type.into());
324        self
325    }
326
327    /// Replaces the existing `DatadogMetricOriginMetadata` with the given one.
328    #[must_use]
329    pub fn with_origin_metadata(mut self, origin_metadata: DatadogMetricOriginMetadata) -> Self {
330        self.get_mut().datadog_origin_metadata = Some(origin_metadata);
331        self
332    }
333
334    /// Replaces the existing `source_event_id` with the given one.
335    #[must_use]
336    pub fn with_source_event_id(mut self, source_event_id: Option<Uuid>) -> Self {
337        self.get_mut().source_event_id = source_event_id;
338        self
339    }
340
341    /// Merge the other `EventMetadata` into this.
342    /// If a Datadog API key is not set in `self`, the one from `other` will be used.
343    /// If a Splunk HEC token is not set in `self`, the one from `other` will be used.
344    pub fn merge(&mut self, other: Self) {
345        let inner = self.get_mut();
346        let other = other.into_owned();
347        inner.finalizers.merge(other.finalizers);
348        inner.secrets.merge(other.secrets);
349
350        // Update `source_event_id` if necessary.
351        if inner.source_event_id.is_none() {
352            inner.source_event_id = other.source_event_id;
353        }
354    }
355
356    /// Update the finalizer(s) status.
357    pub fn update_status(&self, status: EventStatus) {
358        self.0.finalizers.update_status(status);
359    }
360
361    /// Update the finalizers' sources.
362    pub fn update_sources(&mut self) {
363        self.get_mut().finalizers.update_sources();
364    }
365
366    /// Gets a reference to the event finalizers.
367    pub fn finalizers(&self) -> &EventFinalizers {
368        &self.0.finalizers
369    }
370
371    /// Add a new event finalizer to the existing list of event finalizers.
372    pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
373        self.get_mut().finalizers.add(finalizer);
374    }
375
376    /// Consumes all event finalizers and returns them, leaving the list of event finalizers empty.
377    pub fn take_finalizers(&mut self) -> EventFinalizers {
378        std::mem::take(&mut self.get_mut().finalizers)
379    }
380
381    /// Merges the given event finalizers into the existing list of event finalizers.
382    pub fn merge_finalizers(&mut self, finalizers: EventFinalizers) {
383        self.get_mut().finalizers.merge(finalizers);
384    }
385
386    /// Get the schema definition.
387    pub fn schema_definition(&self) -> &Arc<schema::Definition> {
388        &self.0.schema_definition
389    }
390
391    /// Set the schema definition.
392    pub fn set_schema_definition(&mut self, definition: &Arc<schema::Definition>) {
393        self.get_mut().schema_definition = Arc::clone(definition);
394    }
395
396    /// Helper function to add a semantic meaning to the schema definition.
397    ///
398    /// This replaces the common code sequence of:
399    ///
400    /// ```ignore
401    /// let new_schema = log_event
402    ///     .metadata()
403    ///     .schema_definition()
404    ///     .as_ref()
405    ///     .clone()
406    ///     .with_meaning(target_path, meaning);
407    /// log_event
408    ///     .metadata_mut()
409    ///     .set_schema_definition(new_schema);
410    /// ````
411    pub fn add_schema_meaning(&mut self, target_path: OwnedTargetPath, meaning: &str) {
412        let schema = Arc::make_mut(&mut self.get_mut().schema_definition);
413        schema.add_meaning(target_path, meaning);
414    }
415}
416
417impl EventDataEq for EventMetadata {
418    fn event_data_eq(&self, _other: &Self) -> bool {
419        // Don't compare the metadata, it is not "event data".
420        true
421    }
422}
423
424/// This is a simple wrapper to allow attaching `EventMetadata` to any
425/// other type. This is primarily used in conversion functions, such as
426/// `impl From<X> for WithMetadata<Y>`.
427pub struct WithMetadata<T> {
428    /// The data item being wrapped.
429    pub data: T,
430    /// The additional metadata sidecar.
431    pub metadata: EventMetadata,
432}
433
434impl<T> WithMetadata<T> {
435    /// Convert from one wrapped type to another, where the underlying
436    /// type allows direct conversion.
437    // We would like to `impl From` instead, but this fails due to
438    // conflicting implementations of `impl<T> From<T> for T`.
439    pub fn into<T1: From<T>>(self) -> WithMetadata<T1> {
440        WithMetadata {
441            data: T1::from(self.data),
442            metadata: self.metadata,
443        }
444    }
445}
446
447/// A container that holds secrets.
448#[derive(Clone, Default, Deserialize, Eq, PartialEq, PartialOrd, Serialize)]
449pub struct Secrets(BTreeMap<String, Arc<str>>);
450
451impl fmt::Debug for Secrets {
452    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453        let mut map = f.debug_map();
454        for key in self.0.keys() {
455            map.entry(key, &"<redacted secret>");
456        }
457        map.finish()
458    }
459}
460
461impl Secrets {
462    /// Creates a new, empty container.
463    #[must_use]
464    pub fn new() -> Self {
465        Self(BTreeMap::new())
466    }
467
468    /// Returns `true` if the container contains no secrets.
469    pub fn is_empty(&self) -> bool {
470        self.0.is_empty()
471    }
472
473    /// Gets a secret by its name.
474    #[must_use]
475    pub fn get(&self, key: &str) -> Option<&Arc<str>> {
476        self.0.get(key)
477    }
478
479    /// Inserts a new secret into the container.
480    pub fn insert(&mut self, key: impl Into<String>, value: impl Into<Arc<str>>) {
481        self.0.insert(key.into(), value.into());
482    }
483
484    /// Removes a secret
485    pub fn remove(&mut self, key: &str) {
486        self.0.remove(key);
487    }
488
489    /// Merged both together. If there are collisions, the value from `self` is kept.
490    pub fn merge(&mut self, other: Self) {
491        for (key, value) in other.0 {
492            self.0.entry(key).or_insert(value);
493        }
494    }
495}
496
497impl SecretTarget for Secrets {
498    fn get_secret(&self, key: &str) -> Option<&str> {
499        self.get(key).map(AsRef::as_ref)
500    }
501
502    fn insert_secret(&mut self, key: &str, value: &str) {
503        self.insert(key, value);
504    }
505
506    fn remove_secret(&mut self, key: &str) {
507        self.remove(key);
508    }
509}
510
511impl IntoIterator for Secrets {
512    type Item = (String, Arc<str>);
513    type IntoIter = std::collections::btree_map::IntoIter<String, Arc<str>>;
514
515    fn into_iter(self) -> Self::IntoIter {
516        self.0.into_iter()
517    }
518}
519
#[cfg(test)]
mod test {
    use super::*;

    const SECRET: &str = "secret";
    const SECRET2: &str = "secret2";

    #[test]
    fn metadata_hardcoded_secrets_get_set() {
        let mut metadata = EventMetadata::default();
        metadata.set_datadog_api_key(Arc::from(SECRET));
        metadata.set_splunk_hec_token(Arc::from(SECRET2));
        assert_eq!(metadata.datadog_api_key().unwrap().as_ref(), SECRET);
        assert_eq!(metadata.splunk_hec_token().unwrap().as_ref(), SECRET2);
    }

    #[test]
    fn secrets_merge() {
        let mut a = Secrets::new();
        a.insert("key-a", "value-a1");
        a.insert("key-b", "value-b1");

        let mut b = Secrets::new();
        b.insert("key-b", "value-b2");
        b.insert("key-c", "value-c2");

        a.merge(b);

        assert_eq!(a.get("key-a").unwrap().as_ref(), "value-a1");
        assert_eq!(a.get("key-b").unwrap().as_ref(), "value-b1");
        assert_eq!(a.get("key-c").unwrap().as_ref(), "value-c2");
    }

    #[test]
    fn metadata_source_event_id_merging() {
        let m1 = EventMetadata::default();
        let m2 = EventMetadata::default();

        // Always maintain the original source event id when merging, similar to how we handle other metadata.
        {
            let mut merged = m1.clone();
            merged.merge(m2.clone());
            assert_eq!(merged.source_event_id(), m1.source_event_id());
        }

        {
            let mut merged = m2.clone();
            merged.merge(m1.clone());
            assert_eq!(merged.source_event_id(), m2.source_event_id());
        }
    }

    /// Covers the other branch of `merge`: when `self` has no id, `other`'s id is adopted.
    #[test]
    fn metadata_source_event_id_adopted_when_missing() {
        let donor = EventMetadata::default();
        assert!(donor.source_event_id().is_some());

        let mut merged = EventMetadata::default().with_source_event_id(None);
        merged.merge(donor.clone());
        assert_eq!(merged.source_event_id(), donor.source_event_id());
    }

    /// `EventMetadata::merge` keeps existing secrets and only fills in missing ones.
    #[test]
    fn metadata_merge_keeps_own_secrets() {
        let mut a = EventMetadata::default();
        a.set_datadog_api_key(Arc::from(SECRET));

        let mut b = EventMetadata::default();
        b.set_datadog_api_key(Arc::from(SECRET2));
        b.set_splunk_hec_token(Arc::from(SECRET2));

        a.merge(b);

        assert_eq!(a.datadog_api_key().unwrap().as_ref(), SECRET);
        assert_eq!(a.splunk_hec_token().unwrap().as_ref(), SECRET2);
    }

    /// The `Debug` output shows secret names but never secret values.
    #[test]
    fn secrets_debug_redacts_values() {
        let mut secrets = Secrets::new();
        secrets.insert("api-key", "hunter2");

        let rendered = format!("{secrets:?}");
        assert!(rendered.contains("api-key"));
        assert!(rendered.contains("<redacted secret>"));
        assert!(!rendered.contains("hunter2"));
    }
}