1#![deny(missing_docs)]
2
3use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc};
4
5use derivative::Derivative;
6use lookup::OwnedTargetPath;
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9use vector_common::{byte_size_of::ByteSizeOf, config::ComponentKey, EventDataEq};
10use vrl::{
11 compiler::SecretTarget,
12 value::{KeyString, Kind, Value},
13};
14
15use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, ObjectMap};
16use crate::{
17 config::{LogNamespace, OutputId},
18 schema,
19};
20
21const DATADOG_API_KEY: &str = "datadog_api_key";
22const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
23
24#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
27pub struct EventMetadata(pub(super) Arc<Inner>);
28
29#[derive(Clone, Debug, Derivative, Deserialize, Serialize)]
32#[derivative(PartialEq)]
33pub(super) struct Inner {
34 #[serde(default = "default_metadata_value")]
36 pub(crate) value: Value,
37
38 #[serde(default)]
40 pub(crate) secrets: Secrets,
41
42 #[serde(default, skip)]
43 finalizers: EventFinalizers,
44
45 pub(crate) source_id: Option<Arc<ComponentKey>>,
47
48 pub(crate) source_type: Option<Cow<'static, str>>,
50
51 pub(crate) upstream_id: Option<Arc<OutputId>>,
56
57 #[serde(default = "default_schema_definition", skip)]
63 schema_definition: Arc<schema::Definition>,
64
65 dropped_fields: ObjectMap,
72
73 #[serde(default)]
76 pub(crate) datadog_origin_metadata: Option<DatadogMetricOriginMetadata>,
77
78 #[derivative(PartialEq = "ignore")]
80 pub(crate) source_event_id: Option<Uuid>,
81}
82
83#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize)]
85pub struct DatadogMetricOriginMetadata {
86 product: Option<u32>,
88 category: Option<u32>,
90 service: Option<u32>,
92}
93
94impl DatadogMetricOriginMetadata {
95 #[must_use]
101 pub fn new(product: Option<u32>, category: Option<u32>, service: Option<u32>) -> Self {
102 Self {
103 product,
104 category,
105 service,
106 }
107 }
108
109 pub fn product(&self) -> Option<u32> {
111 self.product
112 }
113
114 pub fn category(&self) -> Option<u32> {
116 self.category
117 }
118
119 pub fn service(&self) -> Option<u32> {
121 self.service
122 }
123}
124
125fn default_metadata_value() -> Value {
126 Value::Object(ObjectMap::new())
127}
128
129impl EventMetadata {
130 pub fn default_with_value(value: Value) -> Self {
132 Self(Arc::new(Inner {
133 value,
134 ..Default::default()
135 }))
136 }
137
138 fn get_mut(&mut self) -> &mut Inner {
139 Arc::make_mut(&mut self.0)
140 }
141
142 pub(super) fn into_owned(self) -> Inner {
143 Arc::unwrap_or_clone(self.0)
144 }
145
146 pub fn value(&self) -> &Value {
148 &self.0.value
149 }
150
151 pub fn value_mut(&mut self) -> &mut Value {
153 &mut self.get_mut().value
154 }
155
156 pub fn secrets(&self) -> &Secrets {
158 &self.0.secrets
159 }
160
161 pub fn secrets_mut(&mut self) -> &mut Secrets {
163 &mut self.get_mut().secrets
164 }
165
166 #[must_use]
168 pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
169 self.0.source_id.as_ref()
170 }
171
172 #[must_use]
174 pub fn source_type(&self) -> Option<&str> {
175 self.0.source_type.as_deref()
176 }
177
178 #[must_use]
181 pub fn upstream_id(&self) -> Option<&OutputId> {
182 self.0.upstream_id.as_deref()
183 }
184
185 pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
187 self.get_mut().source_id = Some(source_id);
188 }
189
190 pub fn set_source_type<S: Into<Cow<'static, str>>>(&mut self, source_type: S) {
192 self.get_mut().source_type = Some(source_type.into());
193 }
194
195 pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
197 self.get_mut().upstream_id = Some(upstream_id);
198 }
199
200 pub fn datadog_api_key(&self) -> Option<Arc<str>> {
202 self.0.secrets.get(DATADOG_API_KEY).cloned()
203 }
204
205 pub fn set_datadog_api_key(&mut self, secret: Arc<str>) {
207 self.get_mut().secrets.insert(DATADOG_API_KEY, secret);
208 }
209
210 pub fn splunk_hec_token(&self) -> Option<Arc<str>> {
212 self.0.secrets.get(SPLUNK_HEC_TOKEN).cloned()
213 }
214
215 pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
217 self.get_mut().secrets.insert(SPLUNK_HEC_TOKEN, secret);
218 }
219
220 pub fn add_dropped_field(&mut self, meaning: KeyString, value: Value) {
225 self.get_mut().dropped_fields.insert(meaning, value);
226 }
227
228 pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
230 self.0.dropped_fields.get(meaning.as_ref())
231 }
232
233 pub fn datadog_origin_metadata(&self) -> Option<&DatadogMetricOriginMetadata> {
235 self.0.datadog_origin_metadata.as_ref()
236 }
237
238 pub fn source_event_id(&self) -> Option<Uuid> {
240 self.0.source_event_id
241 }
242}
243
244impl Default for Inner {
245 fn default() -> Self {
246 Self {
247 value: Value::Object(ObjectMap::new()),
248 secrets: Secrets::new(),
249 finalizers: Default::default(),
250 schema_definition: default_schema_definition(),
251 source_id: None,
252 source_type: None,
253 upstream_id: None,
254 dropped_fields: ObjectMap::new(),
255 datadog_origin_metadata: None,
256 source_event_id: Some(Uuid::now_v7()),
257 }
258 }
259}
260
261impl Default for EventMetadata {
262 fn default() -> Self {
263 Self(Arc::new(Inner::default()))
264 }
265}
266
267fn default_schema_definition() -> Arc<schema::Definition> {
268 Arc::new(schema::Definition::new_with_default_metadata(
269 Kind::any(),
270 [LogNamespace::Legacy, LogNamespace::Vector],
271 ))
272}
273
274impl ByteSizeOf for EventMetadata {
275 fn allocated_bytes(&self) -> usize {
276 self.0.finalizers.allocated_bytes()
280 }
281}
282
283impl EventMetadata {
284 #[must_use]
286 pub fn with_finalizer(mut self, finalizer: EventFinalizer) -> Self {
287 self.get_mut().finalizers = EventFinalizers::new(finalizer);
288 self
289 }
290
291 #[must_use]
293 pub fn with_finalizers(mut self, finalizers: EventFinalizers) -> Self {
294 self.get_mut().finalizers = finalizers;
295 self
296 }
297
298 #[must_use]
300 pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
301 self.with_finalizer(EventFinalizer::new(batch.clone()))
302 }
303
304 #[must_use]
306 pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
307 match batch {
308 Some(batch) => self.with_finalizer(EventFinalizer::new(batch.clone())),
309 None => self,
310 }
311 }
312
313 #[must_use]
315 pub fn with_schema_definition(mut self, schema_definition: &Arc<schema::Definition>) -> Self {
316 self.get_mut().schema_definition = Arc::clone(schema_definition);
317 self
318 }
319
320 #[must_use]
322 pub fn with_source_type<S: Into<Cow<'static, str>>>(mut self, source_type: S) -> Self {
323 self.get_mut().source_type = Some(source_type.into());
324 self
325 }
326
327 #[must_use]
329 pub fn with_origin_metadata(mut self, origin_metadata: DatadogMetricOriginMetadata) -> Self {
330 self.get_mut().datadog_origin_metadata = Some(origin_metadata);
331 self
332 }
333
334 #[must_use]
336 pub fn with_source_event_id(mut self, source_event_id: Option<Uuid>) -> Self {
337 self.get_mut().source_event_id = source_event_id;
338 self
339 }
340
341 pub fn merge(&mut self, other: Self) {
345 let inner = self.get_mut();
346 let other = other.into_owned();
347 inner.finalizers.merge(other.finalizers);
348 inner.secrets.merge(other.secrets);
349
350 match (inner.source_event_id, other.source_event_id) {
352 (None, Some(id)) => {
353 inner.source_event_id = Some(id);
354 }
355 (Some(uuid1), Some(uuid2)) if uuid2 < uuid1 => {
356 inner.source_event_id = Some(uuid2);
357 }
358 _ => {} }
360 }
361
362 pub fn update_status(&self, status: EventStatus) {
364 self.0.finalizers.update_status(status);
365 }
366
367 pub fn update_sources(&mut self) {
369 self.get_mut().finalizers.update_sources();
370 }
371
372 pub fn finalizers(&self) -> &EventFinalizers {
374 &self.0.finalizers
375 }
376
377 pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
379 self.get_mut().finalizers.add(finalizer);
380 }
381
382 pub fn take_finalizers(&mut self) -> EventFinalizers {
384 std::mem::take(&mut self.get_mut().finalizers)
385 }
386
387 pub fn merge_finalizers(&mut self, finalizers: EventFinalizers) {
389 self.get_mut().finalizers.merge(finalizers);
390 }
391
392 pub fn schema_definition(&self) -> &Arc<schema::Definition> {
394 &self.0.schema_definition
395 }
396
397 pub fn set_schema_definition(&mut self, definition: &Arc<schema::Definition>) {
399 self.get_mut().schema_definition = Arc::clone(definition);
400 }
401
402 pub fn add_schema_meaning(&mut self, target_path: OwnedTargetPath, meaning: &str) {
418 let schema = Arc::make_mut(&mut self.get_mut().schema_definition);
419 schema.add_meaning(target_path, meaning);
420 }
421}
422
423impl EventDataEq for EventMetadata {
424 fn event_data_eq(&self, _other: &Self) -> bool {
425 true
427 }
428}
429
430pub struct WithMetadata<T> {
434 pub data: T,
436 pub metadata: EventMetadata,
438}
439
440impl<T> WithMetadata<T> {
441 pub fn into<T1: From<T>>(self) -> WithMetadata<T1> {
446 WithMetadata {
447 data: T1::from(self.data),
448 metadata: self.metadata,
449 }
450 }
451}
452
453#[derive(Clone, Default, Deserialize, Eq, PartialEq, PartialOrd, Serialize)]
455pub struct Secrets(BTreeMap<String, Arc<str>>);
456
457impl fmt::Debug for Secrets {
458 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
459 let mut map = f.debug_map();
460 for key in self.0.keys() {
461 map.entry(key, &"<redacted secret>");
462 }
463 map.finish()
464 }
465}
466
467impl Secrets {
468 #[must_use]
470 pub fn new() -> Self {
471 Self(BTreeMap::new())
472 }
473
474 pub fn is_empty(&self) -> bool {
476 self.0.is_empty()
477 }
478
479 #[must_use]
481 pub fn get(&self, key: &str) -> Option<&Arc<str>> {
482 self.0.get(key)
483 }
484
485 pub fn insert(&mut self, key: impl Into<String>, value: impl Into<Arc<str>>) {
487 self.0.insert(key.into(), value.into());
488 }
489
490 pub fn remove(&mut self, key: &str) {
492 self.0.remove(key);
493 }
494
495 pub fn merge(&mut self, other: Self) {
497 for (key, value) in other.0 {
498 self.0.entry(key).or_insert(value);
499 }
500 }
501}
502
503impl SecretTarget for Secrets {
504 fn get_secret(&self, key: &str) -> Option<&str> {
505 self.get(key).map(AsRef::as_ref)
506 }
507
508 fn insert_secret(&mut self, key: &str, value: &str) {
509 self.insert(key, value);
510 }
511
512 fn remove_secret(&mut self, key: &str) {
513 self.remove(key);
514 }
515}
516
517impl IntoIterator for Secrets {
518 type Item = (String, Arc<str>);
519 type IntoIter = std::collections::btree_map::IntoIter<String, Arc<str>>;
520
521 fn into_iter(self) -> Self::IntoIter {
522 self.0.into_iter()
523 }
524}
525
526#[cfg(test)]
527mod test {
528 use super::*;
529
530 const SECRET: &str = "secret";
531 const SECRET2: &str = "secret2";
532
533 #[test]
534 fn metadata_hardcoded_secrets_get_set() {
535 let mut metadata = EventMetadata::default();
536 metadata.set_datadog_api_key(Arc::from(SECRET));
537 metadata.set_splunk_hec_token(Arc::from(SECRET2));
538 assert_eq!(metadata.datadog_api_key().unwrap().as_ref(), SECRET);
539 assert_eq!(metadata.splunk_hec_token().unwrap().as_ref(), SECRET2);
540 }
541
542 #[test]
543 fn secrets_merge() {
544 let mut a = Secrets::new();
545 a.insert("key-a", "value-a1");
546 a.insert("key-b", "value-b1");
547
548 let mut b = Secrets::new();
549 b.insert("key-b", "value-b2");
550 b.insert("key-c", "value-c2");
551
552 a.merge(b);
553
554 assert_eq!(a.get("key-a").unwrap().as_ref(), "value-a1");
555 assert_eq!(a.get("key-b").unwrap().as_ref(), "value-b1");
556 assert_eq!(a.get("key-c").unwrap().as_ref(), "value-c2");
557 }
558
559 #[test]
560 fn metadata_source_event_id_merging() {
561 let m1 = EventMetadata::default();
562 let m2 = EventMetadata::default();
563
564 {
565 let mut merged = m1.clone();
566 merged.merge(m2.clone());
567 assert_eq!(merged.source_event_id(), m1.source_event_id());
568 }
569
570 {
571 let mut merged = m2.clone();
572 merged.merge(m1.clone());
573 assert_eq!(merged.source_event_id(), m1.source_event_id());
574 }
575 }
576}