1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
use bytes::Bytes;
use rdkafka::message::{Header, OwnedHeaders};
use vector_lib::lookup::OwnedTargetPath;

use crate::{
    internal_events::KafkaHeaderExtractionError,
    sinks::{
        kafka::service::{KafkaRequest, KafkaRequestMetadata},
        prelude::*,
    },
};

pub struct KafkaRequestBuilder {
    pub key_field: Option<OwnedTargetPath>,
    pub headers_key: Option<OwnedTargetPath>,
    pub encoder: (Transformer, Encoder<()>),
}

impl RequestBuilder<(String, Event)> for KafkaRequestBuilder {
    type Metadata = KafkaRequestMetadata;
    type Events = Event;
    type Encoder = (Transformer, Encoder<()>);
    type Payload = Bytes;
    type Request = KafkaRequest;
    type Error = std::io::Error;

    fn compression(&self) -> Compression {
        Compression::None
    }

    fn encoder(&self) -> &Self::Encoder {
        &self.encoder
    }

    fn split_input(
        &self,
        input: (String, Event),
    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
        let (topic, mut event) = input;
        let builder = RequestMetadataBuilder::from_event(&event);

        let metadata = KafkaRequestMetadata {
            finalizers: event.take_finalizers(),
            key: get_key(&event, self.key_field.as_ref()),
            timestamp_millis: get_timestamp_millis(&event),
            headers: get_headers(&event, self.headers_key.as_ref()),
            topic,
        };

        (metadata, builder, event)
    }

    fn build_request(
        &self,
        metadata: Self::Metadata,
        request_metadata: RequestMetadata,
        payload: EncodeResult<Self::Payload>,
    ) -> Self::Request {
        KafkaRequest {
            body: payload.into_payload(),
            metadata,
            request_metadata,
        }
    }
}

fn get_key(event: &Event, key_field: Option<&OwnedTargetPath>) -> Option<Bytes> {
    key_field.and_then(|key_field| match event {
        Event::Log(log) => log.get(key_field).map(|value| value.coerce_to_bytes()),
        Event::Metric(metric) => metric
            .tags()
            .and_then(|tags| tags.get(key_field.to_string().as_str()))
            .map(|value| value.to_owned().into()),
        _ => None,
    })
}

fn get_timestamp_millis(event: &Event) -> Option<i64> {
    match &event {
        Event::Log(log) => log.get_timestamp().and_then(|v| v.as_timestamp()).copied(),
        Event::Metric(metric) => metric.timestamp(),
        _ => None,
    }
    .map(|ts| ts.timestamp_millis())
}

fn get_headers(event: &Event, headers_key: Option<&OwnedTargetPath>) -> Option<OwnedHeaders> {
    headers_key.and_then(|headers_key| {
        if let Event::Log(log) = event {
            if let Some(headers) = log.get(headers_key) {
                match headers {
                    Value::Object(headers_map) => {
                        let mut owned_headers = OwnedHeaders::new_with_capacity(headers_map.len());
                        for (key, value) in headers_map {
                            if let Value::Bytes(value_bytes) = value {
                                owned_headers = owned_headers.insert(Header {
                                    key,
                                    value: Some(value_bytes.as_ref()),
                                });
                            } else {
                                emit!(KafkaHeaderExtractionError {
                                    header_field: headers_key
                                });
                            }
                        }
                        return Some(owned_headers);
                    }
                    _ => {
                        emit!(KafkaHeaderExtractionError {
                            header_field: headers_key
                        });
                    }
                }
            }
        }
        None
    })
}

#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use rdkafka::message::Headers;

    use super::*;
    use crate::event::{LogEvent, ObjectMap};

    #[test]
    fn kafka_get_headers() {
        let headers_key = OwnedTargetPath::try_from("headers".to_string()).unwrap();
        let mut header_values = ObjectMap::new();
        header_values.insert("a-key".into(), Value::Bytes(Bytes::from("a-value")));
        header_values.insert("b-key".into(), Value::Bytes(Bytes::from("b-value")));

        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert(&headers_key, header_values);

        let headers = get_headers(&event, Some(&headers_key)).unwrap();
        assert_eq!(headers.get(0).key, "a-key");
        assert_eq!(headers.get(0).value.unwrap(), "a-value".as_bytes());
        assert_eq!(headers.get(1).key, "b-key");
        assert_eq!(headers.get(1).value.unwrap(), "b-value".as_bytes());
    }
}