vector/sinks/pulsar/
util.rs1use std::collections::HashMap;
2
3use bytes::Bytes;
4use vector_lib::{event::Event, lookup::lookup_v2::OptionalTargetPath};
5use vrl::value::{KeyString, Value};
6
7use crate::{
8 internal_events::PulsarPropertyExtractionError,
9 sinks::pulsar::{config::PulsarSinkConfig, sink::PulsarEvent},
10 template::Template,
11};
12
13pub(super) fn make_pulsar_event(
16 topic: &Template,
17 config: &PulsarSinkConfig,
18 event: Event,
19) -> Option<PulsarEvent> {
20 let topic = topic.render_string(&event).ok()?;
21 let key = get_key(&event, &config.partition_key_field);
22 let timestamp_millis = get_timestamp_millis(&event);
23 let properties = get_properties(&event, &config.properties_key);
24 Some(PulsarEvent {
25 event,
26 topic,
27 key,
28 timestamp_millis,
29 properties,
30 })
31}
32
33fn get_key(event: &Event, partition_key_field: &Option<OptionalTargetPath>) -> Option<Bytes> {
34 partition_key_field
35 .as_ref()
36 .and_then(|partition_key_field| match event {
37 Event::Log(log) => partition_key_field
38 .path
39 .as_ref()
40 .and_then(|path| log.get(path).map(|value| value.coerce_to_bytes())),
41 Event::Metric(metric) => partition_key_field
42 .path
43 .as_ref()
44 .and_then(|path| metric.tags().and_then(|tags| tags.get(&path.to_string())))
45 .map(|value| value.to_owned().into()),
46 _ => None,
47 })
48}
49
50fn get_timestamp_millis(event: &Event) -> Option<i64> {
51 match &event {
52 Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
53 Event::Metric(metric) => metric.timestamp(),
54 _ => None,
55 }
56 .map(|ts| ts.timestamp_millis())
57}
58
59pub(super) fn get_properties(
60 event: &Event,
61 properties_key: &Option<OptionalTargetPath>,
62) -> Option<HashMap<KeyString, Bytes>> {
63 properties_key.as_ref().and_then(|properties_key| {
64 properties_key.path.as_ref().and_then(|path| {
65 event.maybe_as_log().and_then(|log| {
66 log.get(path).and_then(|properties| match properties {
67 Value::Object(headers_map) => {
68 let mut property_map = HashMap::new();
69 for (key, value) in headers_map {
70 if let Value::Bytes(value_bytes) = value {
71 property_map.insert(key.clone(), value_bytes.clone());
72 } else {
73 emit!(PulsarPropertyExtractionError {
74 property_field: path
75 });
76 }
77 }
78 Some(property_map)
79 }
80 _ => {
81 emit!(PulsarPropertyExtractionError {
82 property_field: path
83 });
84 None
85 }
86 })
87 })
88 })
89 })
90}