1#![deny(missing_docs)]
2
3use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc, time::Instant};
4
5use derivative::Derivative;
6use lookup::OwnedTargetPath;
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9use vector_common::{EventDataEq, byte_size_of::ByteSizeOf, config::ComponentKey};
10use vrl::{
11 compiler::SecretTarget,
12 value::{KeyString, Kind, Value},
13};
14
15use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, ObjectMap};
16use crate::{
17 config::{LogNamespace, OutputId},
18 schema,
19};
20
/// Key under which the Datadog API key is stored in an event's [`Secrets`].
const DATADOG_API_KEY: &str = "datadog_api_key";
/// Key under which the Splunk HEC token is stored in an event's [`Secrets`].
const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
23
24#[derive(Clone, Debug, Derivative, Deserialize, Serialize)]
27#[derivative(PartialEq)]
28pub struct EventMetadata {
29 #[serde(flatten)]
30 pub(super) inner: Arc<Inner>,
31
32 #[derivative(PartialEq = "ignore")]
34 #[serde(default, skip)]
35 pub(crate) last_transform_timestamp: Option<Instant>,
36}
37
38#[derive(Clone, Debug, Derivative, Deserialize, Serialize)]
41#[derivative(PartialEq)]
42pub(super) struct Inner {
43 #[serde(default = "default_metadata_value")]
45 pub(crate) value: Value,
46
47 #[serde(default)]
49 pub(crate) secrets: Secrets,
50
51 #[serde(default, skip)]
52 pub(crate) finalizers: EventFinalizers,
53
54 pub(crate) source_id: Option<Arc<ComponentKey>>,
56
57 pub(crate) source_type: Option<Cow<'static, str>>,
59
60 pub(crate) upstream_id: Option<Arc<OutputId>>,
65
66 #[serde(default = "default_schema_definition", skip)]
72 pub(crate) schema_definition: Arc<schema::Definition>,
73
74 pub(crate) dropped_fields: ObjectMap,
81
82 #[serde(default)]
85 pub(crate) datadog_origin_metadata: Option<DatadogMetricOriginMetadata>,
86
87 #[derivative(PartialEq = "ignore")]
89 pub(crate) source_event_id: Option<Uuid>,
90}
91
92#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize)]
94pub struct DatadogMetricOriginMetadata {
95 product: Option<u32>,
97 category: Option<u32>,
99 service: Option<u32>,
101}
102
103impl DatadogMetricOriginMetadata {
104 #[must_use]
110 pub fn new(product: Option<u32>, category: Option<u32>, service: Option<u32>) -> Self {
111 Self {
112 product,
113 category,
114 service,
115 }
116 }
117
118 pub fn product(&self) -> Option<u32> {
120 self.product
121 }
122
123 pub fn category(&self) -> Option<u32> {
125 self.category
126 }
127
128 pub fn service(&self) -> Option<u32> {
130 self.service
131 }
132}
133
134fn default_metadata_value() -> Value {
135 Value::Object(ObjectMap::new())
136}
137
138impl EventMetadata {
139 pub fn default_with_value(value: Value) -> Self {
141 Self {
142 inner: Arc::new(Inner {
143 value,
144 ..Default::default()
145 }),
146 last_transform_timestamp: None,
147 }
148 }
149
150 fn get_mut(&mut self) -> &mut Inner {
151 Arc::make_mut(&mut self.inner)
152 }
153
154 pub(super) fn into_owned(self) -> Inner {
155 Arc::unwrap_or_clone(self.inner)
156 }
157
158 pub fn value(&self) -> &Value {
160 &self.inner.value
161 }
162
163 pub fn value_mut(&mut self) -> &mut Value {
165 &mut self.get_mut().value
166 }
167
168 pub fn secrets(&self) -> &Secrets {
170 &self.inner.secrets
171 }
172
173 pub fn secrets_mut(&mut self) -> &mut Secrets {
175 &mut self.get_mut().secrets
176 }
177
178 #[must_use]
180 pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
181 self.inner.source_id.as_ref()
182 }
183
184 #[must_use]
186 pub fn source_type(&self) -> Option<&str> {
187 self.inner.source_type.as_deref()
188 }
189
190 #[must_use]
193 pub fn upstream_id(&self) -> Option<&OutputId> {
194 self.inner.upstream_id.as_deref()
195 }
196
197 pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
199 self.get_mut().source_id = Some(source_id);
200 }
201
202 pub fn set_source_type<S: Into<Cow<'static, str>>>(&mut self, source_type: S) {
204 self.get_mut().source_type = Some(source_type.into());
205 }
206
207 pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
209 self.get_mut().upstream_id = Some(upstream_id);
210 }
211
212 pub fn datadog_api_key(&self) -> Option<Arc<str>> {
214 self.inner.secrets.get(DATADOG_API_KEY).cloned()
215 }
216
217 pub fn set_datadog_api_key(&mut self, secret: Arc<str>) {
219 self.get_mut().secrets.insert(DATADOG_API_KEY, secret);
220 }
221
222 pub fn splunk_hec_token(&self) -> Option<Arc<str>> {
224 self.inner.secrets.get(SPLUNK_HEC_TOKEN).cloned()
225 }
226
227 pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
229 self.get_mut().secrets.insert(SPLUNK_HEC_TOKEN, secret);
230 }
231
232 pub fn add_dropped_field(&mut self, meaning: KeyString, value: Value) {
237 self.get_mut().dropped_fields.insert(meaning, value);
238 }
239
240 pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
242 self.inner.dropped_fields.get(meaning.as_ref())
243 }
244
245 pub fn datadog_origin_metadata(&self) -> Option<&DatadogMetricOriginMetadata> {
247 self.inner.datadog_origin_metadata.as_ref()
248 }
249
250 pub fn source_event_id(&self) -> Option<Uuid> {
252 self.inner.source_event_id
253 }
254
255 #[must_use]
257 pub fn last_transform_timestamp(&self) -> Option<Instant> {
258 self.last_transform_timestamp
259 }
260
261 pub fn set_last_transform_timestamp(&mut self, timestamp: Instant) {
263 self.last_transform_timestamp = Some(timestamp);
264 }
265}
266
267impl Default for Inner {
268 fn default() -> Self {
269 Self {
270 value: Value::Object(ObjectMap::new()),
271 secrets: Secrets::new(),
272 finalizers: Default::default(),
273 schema_definition: default_schema_definition(),
274 source_id: None,
275 source_type: None,
276 upstream_id: None,
277 dropped_fields: ObjectMap::new(),
278 datadog_origin_metadata: None,
279 source_event_id: Some(Uuid::new_v4()),
280 }
281 }
282}
283
284impl Default for EventMetadata {
285 fn default() -> Self {
286 Self {
287 inner: Arc::new(Inner::default()),
288 last_transform_timestamp: None,
289 }
290 }
291}
292
293pub(super) fn default_schema_definition() -> Arc<schema::Definition> {
294 Arc::new(schema::Definition::new_with_default_metadata(
295 Kind::any(),
296 [LogNamespace::Legacy, LogNamespace::Vector],
297 ))
298}
299
300impl ByteSizeOf for EventMetadata {
301 fn allocated_bytes(&self) -> usize {
302 self.inner.finalizers.allocated_bytes()
306 }
307}
308
309impl EventMetadata {
310 #[must_use]
312 pub fn with_finalizer(mut self, finalizer: EventFinalizer) -> Self {
313 self.get_mut().finalizers = EventFinalizers::new(finalizer);
314 self
315 }
316
317 #[must_use]
319 pub fn with_finalizers(mut self, finalizers: EventFinalizers) -> Self {
320 self.get_mut().finalizers = finalizers;
321 self
322 }
323
324 #[must_use]
326 pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
327 self.with_finalizer(EventFinalizer::new(batch.clone()))
328 }
329
330 #[must_use]
332 pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
333 match batch {
334 Some(batch) => self.with_finalizer(EventFinalizer::new(batch.clone())),
335 None => self,
336 }
337 }
338
339 #[must_use]
341 pub fn with_schema_definition(mut self, schema_definition: &Arc<schema::Definition>) -> Self {
342 self.get_mut().schema_definition = Arc::clone(schema_definition);
343 self
344 }
345
346 #[must_use]
348 pub fn with_source_type<S: Into<Cow<'static, str>>>(mut self, source_type: S) -> Self {
349 self.get_mut().source_type = Some(source_type.into());
350 self
351 }
352
353 #[must_use]
355 pub fn with_origin_metadata(mut self, origin_metadata: DatadogMetricOriginMetadata) -> Self {
356 self.get_mut().datadog_origin_metadata = Some(origin_metadata);
357 self
358 }
359
360 #[must_use]
362 pub fn with_source_event_id(mut self, source_event_id: Option<Uuid>) -> Self {
363 self.get_mut().source_event_id = source_event_id;
364 self
365 }
366
367 pub fn merge(&mut self, other: Self) {
371 let other_timestamp = other.last_transform_timestamp;
372 let inner = self.get_mut();
373 let other = other.into_owned();
374 inner.finalizers.merge(other.finalizers);
375 inner.secrets.merge(other.secrets);
376
377 if inner.source_event_id.is_none() {
379 inner.source_event_id = other.source_event_id;
380 }
381
382 match (self.last_transform_timestamp, other_timestamp) {
384 (Some(self_ts), Some(other_ts)) => {
385 if other_ts < self_ts {
386 self.last_transform_timestamp = Some(other_ts);
387 }
388 }
389 (None, Some(other_ts)) => {
390 self.last_transform_timestamp = Some(other_ts);
391 }
392 _ => {}
393 }
394 }
395
396 pub fn update_status(&self, status: EventStatus) {
398 self.inner.finalizers.update_status(status);
399 }
400
401 pub fn update_sources(&mut self) {
403 self.get_mut().finalizers.update_sources();
404 }
405
406 pub fn finalizers(&self) -> &EventFinalizers {
408 &self.inner.finalizers
409 }
410
411 pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
413 self.get_mut().finalizers.add(finalizer);
414 }
415
416 pub fn take_finalizers(&mut self) -> EventFinalizers {
418 std::mem::take(&mut self.get_mut().finalizers)
419 }
420
421 pub fn merge_finalizers(&mut self, finalizers: EventFinalizers) {
423 self.get_mut().finalizers.merge(finalizers);
424 }
425
426 pub fn schema_definition(&self) -> &Arc<schema::Definition> {
428 &self.inner.schema_definition
429 }
430
431 pub fn set_schema_definition(&mut self, definition: &Arc<schema::Definition>) {
433 self.get_mut().schema_definition = Arc::clone(definition);
434 }
435
436 pub fn add_schema_meaning(&mut self, target_path: OwnedTargetPath, meaning: &str) {
452 let schema = Arc::make_mut(&mut self.get_mut().schema_definition);
453 schema.add_meaning(target_path, meaning);
454 }
455}
456
457impl EventDataEq for EventMetadata {
458 fn event_data_eq(&self, _other: &Self) -> bool {
459 true
461 }
462}
463
464pub struct WithMetadata<T> {
468 pub data: T,
470 pub metadata: EventMetadata,
472}
473
474impl<T> WithMetadata<T> {
475 pub fn into<T1: From<T>>(self) -> WithMetadata<T1> {
480 WithMetadata {
481 data: T1::from(self.data),
482 metadata: self.metadata,
483 }
484 }
485}
486
487#[derive(Clone, Default, Deserialize, Eq, PartialEq, PartialOrd, Serialize)]
489pub struct Secrets(BTreeMap<String, Arc<str>>);
490
491impl fmt::Debug for Secrets {
492 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
493 let mut map = f.debug_map();
494 for key in self.0.keys() {
495 map.entry(key, &"<redacted secret>");
496 }
497 map.finish()
498 }
499}
500
501impl Secrets {
502 #[must_use]
504 pub fn new() -> Self {
505 Self(BTreeMap::new())
506 }
507
508 pub fn is_empty(&self) -> bool {
510 self.0.is_empty()
511 }
512
513 #[must_use]
515 pub fn get(&self, key: &str) -> Option<&Arc<str>> {
516 self.0.get(key)
517 }
518
519 pub fn insert(&mut self, key: impl Into<String>, value: impl Into<Arc<str>>) {
521 self.0.insert(key.into(), value.into());
522 }
523
524 pub fn remove(&mut self, key: &str) {
526 self.0.remove(key);
527 }
528
529 pub fn merge(&mut self, other: Self) {
531 for (key, value) in other.0 {
532 self.0.entry(key).or_insert(value);
533 }
534 }
535}
536
537impl SecretTarget for Secrets {
538 fn get_secret(&self, key: &str) -> Option<&str> {
539 self.get(key).map(AsRef::as_ref)
540 }
541
542 fn insert_secret(&mut self, key: &str, value: &str) {
543 self.insert(key, value);
544 }
545
546 fn remove_secret(&mut self, key: &str) {
547 self.remove(key);
548 }
549}
550
551impl IntoIterator for Secrets {
552 type Item = (String, Arc<str>);
553 type IntoIter = std::collections::btree_map::IntoIter<String, Arc<str>>;
554
555 fn into_iter(self) -> Self::IntoIter {
556 self.0.into_iter()
557 }
558}
559
#[cfg(test)]
mod test {
    use super::*;

    const SECRET: &str = "secret";
    const SECRET2: &str = "secret2";

    /// The dedicated Datadog / Splunk accessors return what their setters stored.
    #[test]
    fn metadata_hardcoded_secrets_get_set() {
        let mut metadata = EventMetadata::default();
        metadata.set_datadog_api_key(SECRET.into());
        metadata.set_splunk_hec_token(SECRET2.into());

        assert_eq!(metadata.datadog_api_key().as_deref(), Some(SECRET));
        assert_eq!(metadata.splunk_hec_token().as_deref(), Some(SECRET2));
    }

    /// Merging keeps the receiver's value on key collisions and adopts keys it lacks.
    #[test]
    fn secrets_merge() {
        let mut a = Secrets::new();
        for (key, value) in [("key-a", "value-a1"), ("key-b", "value-b1")] {
            a.insert(key, value);
        }

        let mut b = Secrets::new();
        for (key, value) in [("key-b", "value-b2"), ("key-c", "value-c2")] {
            b.insert(key, value);
        }

        a.merge(b);

        let expected = [
            ("key-a", "value-a1"),
            ("key-b", "value-b1"),
            ("key-c", "value-c2"),
        ];
        for (key, value) in expected {
            assert_eq!(a.get(key).map(|secret| &**secret), Some(value));
        }
    }

    /// Merging never replaces a `source_event_id` that the receiver already has.
    #[test]
    fn metadata_source_event_id_merging() {
        let m1 = EventMetadata::default();
        let m2 = EventMetadata::default();

        // Merges a clone of `incoming` into `target` and reports the resulting id.
        let id_after_merge = |mut target: EventMetadata, incoming: &EventMetadata| {
            target.merge(incoming.clone());
            target.source_event_id()
        };

        // The receiver's id wins regardless of merge direction.
        assert_eq!(id_after_merge(m1.clone(), &m2), m1.source_event_id());
        assert_eq!(id_after_merge(m2.clone(), &m1), m2.source_event_id());
    }
}