1#![deny(missing_docs)]
2
3use std::{borrow::Cow, collections::BTreeMap, fmt, sync::Arc};
4
5use derivative::Derivative;
6use lookup::OwnedTargetPath;
7use serde::{Deserialize, Serialize};
8use uuid::Uuid;
9use vector_common::{EventDataEq, byte_size_of::ByteSizeOf, config::ComponentKey};
10use vrl::{
11 compiler::SecretTarget,
12 value::{KeyString, Kind, Value},
13};
14
15use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus, ObjectMap};
16use crate::{
17 config::{LogNamespace, OutputId},
18 schema,
19};
20
/// Key under which the Datadog API key is stored in the event's [`Secrets`].
const DATADOG_API_KEY: &str = "datadog_api_key";
/// Key under which the Splunk HEC token is stored in the event's [`Secrets`].
const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
23
/// Metadata attached to an event.
///
/// This is a thin wrapper around an `Arc<Inner>`: cloning an `EventMetadata` only clones the
/// `Arc`, and the underlying data is copied lazily (through `Arc::make_mut`) the first time a
/// shared instance is mutated.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub struct EventMetadata(pub(super) Arc<Inner>);
28
/// The actual metadata payload wrapped by [`EventMetadata`].
///
/// Equality is derived through `derivative`, which lets individual fields opt out of the
/// comparison (see `source_event_id`).
#[derive(Clone, Debug, Derivative, Deserialize, Serialize)]
#[derivative(PartialEq)]
pub(super) struct Inner {
    /// Arbitrary value stored with the event. When missing from serialized input it is
    /// filled in by `default_metadata_value` (an empty object).
    #[serde(default = "default_metadata_value")]
    pub(crate) value: Value,

    /// Named secrets carried with the event (for example the Datadog API key and the Splunk
    /// HEC token). Defaults to an empty store when missing from serialized input.
    #[serde(default)]
    pub(crate) secrets: Secrets,

    /// Finalizers attached to the event. Never serialized; deserialization yields the default
    /// value.
    #[serde(default, skip)]
    pub(crate) finalizers: EventFinalizers,

    /// Component key of the source, if one has been set.
    pub(crate) source_id: Option<Arc<ComponentKey>>,

    /// Type of the source, if one has been set.
    pub(crate) source_type: Option<Cow<'static, str>>,

    /// Output id of the upstream component, if one has been set.
    // NOTE(review): presumably the component output this event was last emitted from; confirm
    // against the callers of `set_upstream_id`.
    pub(crate) upstream_id: Option<Arc<OutputId>>,

    /// Schema definition associated with the event. Never serialized; deserialization
    /// recreates it with `default_schema_definition`.
    #[serde(default = "default_schema_definition", skip)]
    pub(crate) schema_definition: Arc<schema::Definition>,

    /// Values recorded through `EventMetadata::add_dropped_field`, keyed by their meaning.
    pub(crate) dropped_fields: ObjectMap,

    /// Datadog origin metadata, if any. Defaults to `None` when missing from serialized input.
    #[serde(default)]
    pub(crate) datadog_origin_metadata: Option<DatadogMetricOriginMetadata>,

    /// Identifier of the source event. Excluded from `PartialEq`, so two instances that differ
    /// only in this id still compare equal.
    #[derivative(PartialEq = "ignore")]
    pub(crate) source_event_id: Option<Uuid>,
}
82
/// Origin information for a Datadog metric, expressed as three optional numeric identifiers.
#[derive(Clone, Default, Debug, Deserialize, PartialEq, Serialize)]
pub struct DatadogMetricOriginMetadata {
    /// Identifier of the originating product, if known.
    product: Option<u32>,
    /// Identifier of the originating category, if known.
    category: Option<u32>,
    /// Identifier of the originating service, if known.
    service: Option<u32>,
}
93
94impl DatadogMetricOriginMetadata {
95 #[must_use]
101 pub fn new(product: Option<u32>, category: Option<u32>, service: Option<u32>) -> Self {
102 Self {
103 product,
104 category,
105 service,
106 }
107 }
108
109 pub fn product(&self) -> Option<u32> {
111 self.product
112 }
113
114 pub fn category(&self) -> Option<u32> {
116 self.category
117 }
118
119 pub fn service(&self) -> Option<u32> {
121 self.service
122 }
123}
124
125fn default_metadata_value() -> Value {
126 Value::Object(ObjectMap::new())
127}
128
129impl EventMetadata {
130 pub fn default_with_value(value: Value) -> Self {
132 Self(Arc::new(Inner {
133 value,
134 ..Default::default()
135 }))
136 }
137
138 fn get_mut(&mut self) -> &mut Inner {
139 Arc::make_mut(&mut self.0)
140 }
141
142 pub(super) fn into_owned(self) -> Inner {
143 Arc::unwrap_or_clone(self.0)
144 }
145
146 pub fn value(&self) -> &Value {
148 &self.0.value
149 }
150
151 pub fn value_mut(&mut self) -> &mut Value {
153 &mut self.get_mut().value
154 }
155
156 pub fn secrets(&self) -> &Secrets {
158 &self.0.secrets
159 }
160
161 pub fn secrets_mut(&mut self) -> &mut Secrets {
163 &mut self.get_mut().secrets
164 }
165
166 #[must_use]
168 pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
169 self.0.source_id.as_ref()
170 }
171
172 #[must_use]
174 pub fn source_type(&self) -> Option<&str> {
175 self.0.source_type.as_deref()
176 }
177
178 #[must_use]
181 pub fn upstream_id(&self) -> Option<&OutputId> {
182 self.0.upstream_id.as_deref()
183 }
184
185 pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
187 self.get_mut().source_id = Some(source_id);
188 }
189
190 pub fn set_source_type<S: Into<Cow<'static, str>>>(&mut self, source_type: S) {
192 self.get_mut().source_type = Some(source_type.into());
193 }
194
195 pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
197 self.get_mut().upstream_id = Some(upstream_id);
198 }
199
200 pub fn datadog_api_key(&self) -> Option<Arc<str>> {
202 self.0.secrets.get(DATADOG_API_KEY).cloned()
203 }
204
205 pub fn set_datadog_api_key(&mut self, secret: Arc<str>) {
207 self.get_mut().secrets.insert(DATADOG_API_KEY, secret);
208 }
209
210 pub fn splunk_hec_token(&self) -> Option<Arc<str>> {
212 self.0.secrets.get(SPLUNK_HEC_TOKEN).cloned()
213 }
214
215 pub fn set_splunk_hec_token(&mut self, secret: Arc<str>) {
217 self.get_mut().secrets.insert(SPLUNK_HEC_TOKEN, secret);
218 }
219
220 pub fn add_dropped_field(&mut self, meaning: KeyString, value: Value) {
225 self.get_mut().dropped_fields.insert(meaning, value);
226 }
227
228 pub fn dropped_field(&self, meaning: impl AsRef<str>) -> Option<&Value> {
230 self.0.dropped_fields.get(meaning.as_ref())
231 }
232
233 pub fn datadog_origin_metadata(&self) -> Option<&DatadogMetricOriginMetadata> {
235 self.0.datadog_origin_metadata.as_ref()
236 }
237
238 pub fn source_event_id(&self) -> Option<Uuid> {
240 self.0.source_event_id
241 }
242}
243
244impl Default for Inner {
245 fn default() -> Self {
246 Self {
247 value: Value::Object(ObjectMap::new()),
248 secrets: Secrets::new(),
249 finalizers: Default::default(),
250 schema_definition: default_schema_definition(),
251 source_id: None,
252 source_type: None,
253 upstream_id: None,
254 dropped_fields: ObjectMap::new(),
255 datadog_origin_metadata: None,
256 source_event_id: Some(Uuid::new_v4()),
257 }
258 }
259}
260
261impl Default for EventMetadata {
262 fn default() -> Self {
263 Self(Arc::new(Inner::default()))
264 }
265}
266
267pub(super) fn default_schema_definition() -> Arc<schema::Definition> {
268 Arc::new(schema::Definition::new_with_default_metadata(
269 Kind::any(),
270 [LogNamespace::Legacy, LogNamespace::Vector],
271 ))
272}
273
274impl ByteSizeOf for EventMetadata {
275 fn allocated_bytes(&self) -> usize {
276 self.0.finalizers.allocated_bytes()
280 }
281}
282
283impl EventMetadata {
284 #[must_use]
286 pub fn with_finalizer(mut self, finalizer: EventFinalizer) -> Self {
287 self.get_mut().finalizers = EventFinalizers::new(finalizer);
288 self
289 }
290
291 #[must_use]
293 pub fn with_finalizers(mut self, finalizers: EventFinalizers) -> Self {
294 self.get_mut().finalizers = finalizers;
295 self
296 }
297
298 #[must_use]
300 pub fn with_batch_notifier(self, batch: &BatchNotifier) -> Self {
301 self.with_finalizer(EventFinalizer::new(batch.clone()))
302 }
303
304 #[must_use]
306 pub fn with_batch_notifier_option(self, batch: &Option<BatchNotifier>) -> Self {
307 match batch {
308 Some(batch) => self.with_finalizer(EventFinalizer::new(batch.clone())),
309 None => self,
310 }
311 }
312
313 #[must_use]
315 pub fn with_schema_definition(mut self, schema_definition: &Arc<schema::Definition>) -> Self {
316 self.get_mut().schema_definition = Arc::clone(schema_definition);
317 self
318 }
319
320 #[must_use]
322 pub fn with_source_type<S: Into<Cow<'static, str>>>(mut self, source_type: S) -> Self {
323 self.get_mut().source_type = Some(source_type.into());
324 self
325 }
326
327 #[must_use]
329 pub fn with_origin_metadata(mut self, origin_metadata: DatadogMetricOriginMetadata) -> Self {
330 self.get_mut().datadog_origin_metadata = Some(origin_metadata);
331 self
332 }
333
334 #[must_use]
336 pub fn with_source_event_id(mut self, source_event_id: Option<Uuid>) -> Self {
337 self.get_mut().source_event_id = source_event_id;
338 self
339 }
340
341 pub fn merge(&mut self, other: Self) {
345 let inner = self.get_mut();
346 let other = other.into_owned();
347 inner.finalizers.merge(other.finalizers);
348 inner.secrets.merge(other.secrets);
349
350 if inner.source_event_id.is_none() {
352 inner.source_event_id = other.source_event_id;
353 }
354 }
355
356 pub fn update_status(&self, status: EventStatus) {
358 self.0.finalizers.update_status(status);
359 }
360
361 pub fn update_sources(&mut self) {
363 self.get_mut().finalizers.update_sources();
364 }
365
366 pub fn finalizers(&self) -> &EventFinalizers {
368 &self.0.finalizers
369 }
370
371 pub fn add_finalizer(&mut self, finalizer: EventFinalizer) {
373 self.get_mut().finalizers.add(finalizer);
374 }
375
376 pub fn take_finalizers(&mut self) -> EventFinalizers {
378 std::mem::take(&mut self.get_mut().finalizers)
379 }
380
381 pub fn merge_finalizers(&mut self, finalizers: EventFinalizers) {
383 self.get_mut().finalizers.merge(finalizers);
384 }
385
386 pub fn schema_definition(&self) -> &Arc<schema::Definition> {
388 &self.0.schema_definition
389 }
390
391 pub fn set_schema_definition(&mut self, definition: &Arc<schema::Definition>) {
393 self.get_mut().schema_definition = Arc::clone(definition);
394 }
395
396 pub fn add_schema_meaning(&mut self, target_path: OwnedTargetPath, meaning: &str) {
412 let schema = Arc::make_mut(&mut self.get_mut().schema_definition);
413 schema.add_meaning(target_path, meaning);
414 }
415}
416
impl EventDataEq for EventMetadata {
    /// Always returns `true`: the other metadata is never inspected, so metadata cannot make
    /// two events differ under this comparison.
    fn event_data_eq(&self, _other: &Self) -> bool {
        true
    }
}
423
/// A piece of data bundled together with its event metadata.
pub struct WithMetadata<T> {
    /// The data item.
    pub data: T,
    /// The metadata that accompanies `data`.
    pub metadata: EventMetadata,
}
433
434impl<T> WithMetadata<T> {
435 pub fn into<T1: From<T>>(self) -> WithMetadata<T1> {
440 WithMetadata {
441 data: T1::from(self.data),
442 metadata: self.metadata,
443 }
444 }
445}
446
/// A store of named secrets, kept in a map ordered by key.
///
/// `Debug` is implemented by hand rather than derived so that the secret values are never
/// printed.
#[derive(Clone, Default, Deserialize, Eq, PartialEq, PartialOrd, Serialize)]
pub struct Secrets(BTreeMap<String, Arc<str>>);
450
451impl fmt::Debug for Secrets {
452 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
453 let mut map = f.debug_map();
454 for key in self.0.keys() {
455 map.entry(key, &"<redacted secret>");
456 }
457 map.finish()
458 }
459}
460
461impl Secrets {
462 #[must_use]
464 pub fn new() -> Self {
465 Self(BTreeMap::new())
466 }
467
468 pub fn is_empty(&self) -> bool {
470 self.0.is_empty()
471 }
472
473 #[must_use]
475 pub fn get(&self, key: &str) -> Option<&Arc<str>> {
476 self.0.get(key)
477 }
478
479 pub fn insert(&mut self, key: impl Into<String>, value: impl Into<Arc<str>>) {
481 self.0.insert(key.into(), value.into());
482 }
483
484 pub fn remove(&mut self, key: &str) {
486 self.0.remove(key);
487 }
488
489 pub fn merge(&mut self, other: Self) {
491 for (key, value) in other.0 {
492 self.0.entry(key).or_insert(value);
493 }
494 }
495}
496
497impl SecretTarget for Secrets {
498 fn get_secret(&self, key: &str) -> Option<&str> {
499 self.get(key).map(AsRef::as_ref)
500 }
501
502 fn insert_secret(&mut self, key: &str, value: &str) {
503 self.insert(key, value);
504 }
505
506 fn remove_secret(&mut self, key: &str) {
507 self.remove(key);
508 }
509}
510
511impl IntoIterator for Secrets {
512 type Item = (String, Arc<str>);
513 type IntoIter = std::collections::btree_map::IntoIter<String, Arc<str>>;
514
515 fn into_iter(self) -> Self::IntoIter {
516 self.0.into_iter()
517 }
518}
519
#[cfg(test)]
mod test {
    use super::*;

    const SECRET: &str = "secret";
    const SECRET2: &str = "secret2";

    // The Datadog and Splunk convenience setters store values that the matching getters
    // return, independently of each other.
    #[test]
    fn metadata_hardcoded_secrets_get_set() {
        let mut metadata = EventMetadata::default();
        metadata.set_datadog_api_key(SECRET.into());
        metadata.set_splunk_hec_token(SECRET2.into());

        assert_eq!(metadata.datadog_api_key().as_deref(), Some(SECRET));
        assert_eq!(metadata.splunk_hec_token().as_deref(), Some(SECRET2));
    }

    // Merging keeps the receiver's value for keys present on both sides and adds the keys
    // that only the other side has.
    #[test]
    fn secrets_merge() {
        let mut a = Secrets::new();
        for (key, value) in [("key-a", "value-a1"), ("key-b", "value-b1")] {
            a.insert(key, value);
        }

        let mut b = Secrets::new();
        for (key, value) in [("key-b", "value-b2"), ("key-c", "value-c2")] {
            b.insert(key, value);
        }

        a.merge(b);

        let expected = [
            ("key-a", "value-a1"),
            ("key-b", "value-b1"),
            ("key-c", "value-c2"),
        ];
        for (key, value) in expected {
            assert_eq!(a.get(key).map(|secret| &**secret), Some(value));
        }
    }

    // Whichever metadata receives the merge keeps its own source event id.
    #[test]
    fn metadata_source_event_id_merging() {
        let m1 = EventMetadata::default();
        let m2 = EventMetadata::default();

        for (receiver, incoming) in [(&m1, &m2), (&m2, &m1)] {
            let mut merged = receiver.clone();
            merged.merge(incoming.clone());
            assert_eq!(merged.source_event_id(), receiver.source_event_id());
        }
    }
}