vector/sinks/honeycomb/
encoder.rs

1//! Encoding for the `honeycomb` sink.
2
3use bytes::Bytes;
4use chrono::{SecondsFormat, Utc};
5use serde_json::{json, to_vec};
6use std::io;
7
8use crate::sinks::{
9    prelude::*,
10    util::encoding::{write_all, Encoder as SinkEncoder},
11};
12
13pub(super) struct HoneycombEncoder {
14    pub(super) transformer: Transformer,
15}
16
17impl SinkEncoder<Vec<Event>> for HoneycombEncoder {
18    fn encode_input(
19        &self,
20        events: Vec<Event>,
21        writer: &mut dyn io::Write,
22    ) -> io::Result<(usize, GroupedCountByteSize)> {
23        let mut byte_size = telemetry().create_request_count_byte_size();
24        let n_events = events.len();
25        let mut json_events: Vec<serde_json::Value> = Vec::with_capacity(n_events);
26
27        for mut event in events {
28            self.transformer.transform(&mut event);
29
30            byte_size.add_event(&event, event.estimated_json_encoded_size_of());
31
32            let log = event.as_mut_log();
33
34            let timestamp = match log.remove_timestamp() {
35                Some(Value::Timestamp(ts)) => ts,
36                _ => Utc::now(),
37            };
38
39            let data = json!({
40                "time": timestamp.to_rfc3339_opts(SecondsFormat::Nanos, true),
41                "data": log.convert_to_fields(),
42            });
43
44            json_events.push(data);
45        }
46
47        let body = Bytes::from(to_vec(&serde_json::Value::Array(json_events))?);
48
49        write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size))
50    }
51}