vector/sinks/pulsar/
util.rs

1use std::collections::HashMap;
2
3use bytes::Bytes;
4use vector_lib::{event::Event, lookup::lookup_v2::OptionalTargetPath};
5use vrl::value::{KeyString, Value};
6
7use crate::{
8    internal_events::PulsarPropertyExtractionError,
9    sinks::pulsar::{config::PulsarSinkConfig, sink::PulsarEvent},
10    template::Template,
11};
12
13/// Transforms an event into a Pulsar event by rendering the required template fields.
14/// Returns None if there is an error whilst rendering.
15pub(super) fn make_pulsar_event(
16    topic: &Template,
17    config: &PulsarSinkConfig,
18    event: Event,
19) -> Option<PulsarEvent> {
20    let topic = topic.render_string(&event).ok()?;
21    let key = get_key(&event, &config.partition_key_field);
22    let timestamp_millis = get_timestamp_millis(&event);
23    let properties = get_properties(&event, &config.properties_key);
24    Some(PulsarEvent {
25        event,
26        topic,
27        key,
28        timestamp_millis,
29        properties,
30    })
31}
32
33fn get_key(event: &Event, partition_key_field: &Option<OptionalTargetPath>) -> Option<Bytes> {
34    partition_key_field
35        .as_ref()
36        .and_then(|partition_key_field| match event {
37            Event::Log(log) => partition_key_field
38                .path
39                .as_ref()
40                .and_then(|path| log.get(path).map(|value| value.coerce_to_bytes())),
41            Event::Metric(metric) => partition_key_field
42                .path
43                .as_ref()
44                .and_then(|path| metric.tags().and_then(|tags| tags.get(&path.to_string())))
45                .map(|value| value.to_owned().into()),
46            _ => None,
47        })
48}
49
50fn get_timestamp_millis(event: &Event) -> Option<i64> {
51    match &event {
52        Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
53        Event::Metric(metric) => metric.timestamp(),
54        _ => None,
55    }
56    .map(|ts| ts.timestamp_millis())
57}
58
59pub(super) fn get_properties(
60    event: &Event,
61    properties_key: &Option<OptionalTargetPath>,
62) -> Option<HashMap<KeyString, Bytes>> {
63    properties_key.as_ref().and_then(|properties_key| {
64        properties_key.path.as_ref().and_then(|path| {
65            event.maybe_as_log().and_then(|log| {
66                log.get(path).and_then(|properties| match properties {
67                    Value::Object(headers_map) => {
68                        let mut property_map = HashMap::new();
69                        for (key, value) in headers_map {
70                            if let Value::Bytes(value_bytes) = value {
71                                property_map.insert(key.clone(), value_bytes.clone());
72                            } else {
73                                emit!(PulsarPropertyExtractionError {
74                                    property_field: path
75                                });
76                            }
77                        }
78                        Some(property_map)
79                    }
80                    _ => {
81                        emit!(PulsarPropertyExtractionError {
82                            property_field: path
83                        });
84                        None
85                    }
86                })
87            })
88        })
89    })
90}