vector/sinks/pulsar/
util.rs

1use crate::internal_events::PulsarPropertyExtractionError;
2use crate::sinks::pulsar::config::PulsarSinkConfig;
3use crate::sinks::pulsar::sink::PulsarEvent;
4use crate::template::Template;
5use bytes::Bytes;
6use std::collections::HashMap;
7use vector_lib::event::Event;
8use vector_lib::lookup::lookup_v2::OptionalTargetPath;
9use vrl::value::{KeyString, Value};
10
11/// Transforms an event into a Pulsar event by rendering the required template fields.
12/// Returns None if there is an error whilst rendering.
13pub(super) fn make_pulsar_event(
14    topic: &Template,
15    config: &PulsarSinkConfig,
16    event: Event,
17) -> Option<PulsarEvent> {
18    let topic = topic.render_string(&event).ok()?;
19    let key = get_key(&event, &config.partition_key_field);
20    let timestamp_millis = get_timestamp_millis(&event);
21    let properties = get_properties(&event, &config.properties_key);
22    Some(PulsarEvent {
23        event,
24        topic,
25        key,
26        timestamp_millis,
27        properties,
28    })
29}
30
31fn get_key(event: &Event, partition_key_field: &Option<OptionalTargetPath>) -> Option<Bytes> {
32    partition_key_field
33        .as_ref()
34        .and_then(|partition_key_field| match event {
35            Event::Log(log) => partition_key_field
36                .path
37                .as_ref()
38                .and_then(|path| log.get(path).map(|value| value.coerce_to_bytes())),
39            Event::Metric(metric) => partition_key_field
40                .path
41                .as_ref()
42                .and_then(|path| metric.tags().and_then(|tags| tags.get(&path.to_string())))
43                .map(|value| value.to_owned().into()),
44            _ => None,
45        })
46}
47
48fn get_timestamp_millis(event: &Event) -> Option<i64> {
49    match &event {
50        Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
51        Event::Metric(metric) => metric.timestamp(),
52        _ => None,
53    }
54    .map(|ts| ts.timestamp_millis())
55}
56
57pub(super) fn get_properties(
58    event: &Event,
59    properties_key: &Option<OptionalTargetPath>,
60) -> Option<HashMap<KeyString, Bytes>> {
61    properties_key.as_ref().and_then(|properties_key| {
62        properties_key.path.as_ref().and_then(|path| {
63            event.maybe_as_log().and_then(|log| {
64                log.get(path).and_then(|properties| match properties {
65                    Value::Object(headers_map) => {
66                        let mut property_map = HashMap::new();
67                        for (key, value) in headers_map {
68                            if let Value::Bytes(value_bytes) = value {
69                                property_map.insert(key.clone(), value_bytes.clone());
70                            } else {
71                                emit!(PulsarPropertyExtractionError {
72                                    property_field: path
73                                });
74                            }
75                        }
76                        Some(property_map)
77                    }
78                    _ => {
79                        emit!(PulsarPropertyExtractionError {
80                            property_field: path
81                        });
82                        None
83                    }
84                })
85            })
86        })
87    })
88}