vector/sinks/honeycomb/
encoder.rs1use std::io;
4
5use bytes::Bytes;
6use chrono::{SecondsFormat, Utc};
7use serde_json::{json, to_vec};
8
9use crate::sinks::{
10 prelude::*,
11 util::encoding::{Encoder as SinkEncoder, write_all},
12};
13
14pub(super) struct HoneycombEncoder {
15 pub(super) transformer: Transformer,
16}
17
18impl SinkEncoder<Vec<Event>> for HoneycombEncoder {
19 fn encode_input(
20 &self,
21 events: Vec<Event>,
22 writer: &mut dyn io::Write,
23 ) -> io::Result<(usize, GroupedCountByteSize)> {
24 let mut byte_size = telemetry().create_request_count_byte_size();
25 let n_events = events.len();
26 let mut json_events: Vec<serde_json::Value> = Vec::with_capacity(n_events);
27
28 for mut event in events {
29 self.transformer.transform(&mut event);
30
31 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
32
33 let log = event.as_mut_log();
34
35 let timestamp = match log.remove_timestamp() {
36 Some(Value::Timestamp(ts)) => ts,
37 _ => Utc::now(),
38 };
39
40 let data = json!({
41 "time": timestamp.to_rfc3339_opts(SecondsFormat::Nanos, true),
42 "data": log.convert_to_fields(),
43 });
44
45 json_events.push(data);
46 }
47
48 let body = Bytes::from(to_vec(&serde_json::Value::Array(json_events))?);
49
50 write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size))
51 }
52}