vector/sinks/pulsar/
util.rs1use crate::internal_events::PulsarPropertyExtractionError;
2use crate::sinks::pulsar::config::PulsarSinkConfig;
3use crate::sinks::pulsar::sink::PulsarEvent;
4use crate::template::Template;
5use bytes::Bytes;
6use std::collections::HashMap;
7use vector_lib::event::Event;
8use vector_lib::lookup::lookup_v2::OptionalTargetPath;
9use vrl::value::{KeyString, Value};
10
11pub(super) fn make_pulsar_event(
14 topic: &Template,
15 config: &PulsarSinkConfig,
16 event: Event,
17) -> Option<PulsarEvent> {
18 let topic = topic.render_string(&event).ok()?;
19 let key = get_key(&event, &config.partition_key_field);
20 let timestamp_millis = get_timestamp_millis(&event);
21 let properties = get_properties(&event, &config.properties_key);
22 Some(PulsarEvent {
23 event,
24 topic,
25 key,
26 timestamp_millis,
27 properties,
28 })
29}
30
31fn get_key(event: &Event, partition_key_field: &Option<OptionalTargetPath>) -> Option<Bytes> {
32 partition_key_field
33 .as_ref()
34 .and_then(|partition_key_field| match event {
35 Event::Log(log) => partition_key_field
36 .path
37 .as_ref()
38 .and_then(|path| log.get(path).map(|value| value.coerce_to_bytes())),
39 Event::Metric(metric) => partition_key_field
40 .path
41 .as_ref()
42 .and_then(|path| metric.tags().and_then(|tags| tags.get(&path.to_string())))
43 .map(|value| value.to_owned().into()),
44 _ => None,
45 })
46}
47
48fn get_timestamp_millis(event: &Event) -> Option<i64> {
49 match &event {
50 Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
51 Event::Metric(metric) => metric.timestamp(),
52 _ => None,
53 }
54 .map(|ts| ts.timestamp_millis())
55}
56
57pub(super) fn get_properties(
58 event: &Event,
59 properties_key: &Option<OptionalTargetPath>,
60) -> Option<HashMap<KeyString, Bytes>> {
61 properties_key.as_ref().and_then(|properties_key| {
62 properties_key.path.as_ref().and_then(|path| {
63 event.maybe_as_log().and_then(|log| {
64 log.get(path).and_then(|properties| match properties {
65 Value::Object(headers_map) => {
66 let mut property_map = HashMap::new();
67 for (key, value) in headers_map {
68 if let Value::Bytes(value_bytes) = value {
69 property_map.insert(key.clone(), value_bytes.clone());
70 } else {
71 emit!(PulsarPropertyExtractionError {
72 property_field: path
73 });
74 }
75 }
76 Some(property_map)
77 }
78 _ => {
79 emit!(PulsarPropertyExtractionError {
80 property_field: path
81 });
82 None
83 }
84 })
85 })
86 })
87 })
88}