// vector/sinks/splunk_hec/logs/encoder.rs
use std::borrow::Cow;

use bytes::BytesMut;
use serde::Serialize;
use tokio_util::codec::Encoder as _;
use vector_lib::request_metadata::GroupedCountByteSize;
use vector_lib::{config::telemetry, EstimatedJsonEncodedSizeOf};

use super::sink::HecProcessedEvent;
use crate::{
    codecs::Transformer,
    event::{Event, LogEvent},
    internal_events::SplunkEventEncodeError,
    sinks::{splunk_hec::common::EndpointTarget, util::encoding::Encoder},
};
16
/// The payload of a Splunk HEC event: either structured JSON or plain text.
///
/// Both variants serialize under the key `"event"`, so a `HecEvent` always
/// produces `{"event": …}` regardless of which form the configured
/// serializer supports.
#[derive(Serialize, Debug)]
pub enum HecEvent<'a> {
    /// Structured payload produced by a JSON-capable serializer.
    #[serde(rename = "event")]
    Json(serde_json::Value),
    /// Textual payload; borrows the encoder's output buffer when the bytes
    /// are valid UTF-8 (lossy conversion otherwise allocates).
    #[serde(rename = "event")]
    Text(Cow<'a, str>),
}
24
/// Serializable envelope for a single event sent to the Splunk HEC `event`
/// endpoint.
///
/// The inner `event` is flattened so its `"event"` key appears at the top
/// level of the JSON object; all optional metadata fields are omitted from
/// the output when `None`.
#[derive(Serialize, Debug)]
pub struct HecData<'a> {
    /// The event payload, flattened into the top-level object.
    #[serde(flatten)]
    pub event: HecEvent<'a>,
    /// Indexed fields attached alongside the event payload.
    pub fields: LogEvent,
    /// Event time; presumably Unix epoch seconds (fractional) per the Splunk
    /// HEC convention — TODO confirm against the metadata producer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub time: Option<f64>,
    /// Optional Splunk `host` metadata override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub host: Option<String>,
    /// Optional Splunk `index` override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
    /// Optional Splunk `source` override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Optional Splunk `sourcetype` override.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sourcetype: Option<String>,
}
41
42impl<'a> HecData<'a> {
43 pub const fn new(event: HecEvent<'a>, fields: LogEvent, time: Option<f64>) -> Self {
44 Self {
45 event,
46 fields,
47 time,
48 host: None,
49 index: None,
50 source: None,
51 sourcetype: None,
52 }
53 }
54}
55
/// Encoder that turns processed Splunk HEC log events into request bytes.
#[derive(Debug, Clone)]
pub struct HecLogsEncoder {
    /// Transformer applied to each event before it is encoded.
    pub transformer: Transformer,
    /// Framing/serialization codec; cloned per batch because encoding
    /// requires `&mut` access while `encode_input` takes `&self`.
    pub encoder: crate::codecs::Encoder<()>,
    /// When true, the `time` field is left unset — presumably so Splunk
    /// extracts the timestamp from the event itself; verify against the
    /// sink configuration docs.
    pub auto_extract_timestamp: bool,
}
62
impl Encoder<Vec<HecProcessedEvent>> for HecLogsEncoder {
    /// Encodes a batch of processed events into `writer`, returning the total
    /// number of bytes written together with the accumulated per-event
    /// estimated JSON byte sizes.
    ///
    /// Events that fail to encode are dropped individually (emitting a
    /// `SplunkEventEncodeError` where applicable) rather than failing the
    /// whole batch; only the final `write_all` can return an `Err`.
    fn encode_input(
        &self,
        input: Vec<HecProcessedEvent>,
        writer: &mut dyn std::io::Write,
    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
        // The codec needs `&mut self` to encode, so work on a clone and keep
        // this method `&self`.
        let mut encoder = self.encoder.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let encoded_input: Vec<u8> = input
            .into_iter()
            .filter_map(|processed_event| {
                let mut event = Event::from(processed_event.event);
                let metadata = processed_event.metadata;
                self.transformer.transform(&mut event);

                // NOTE(review): the event's size is recorded here even if
                // encoding below fails and the event is dropped — confirm
                // this telemetry behavior is intended.
                byte_size.add_event(&event, event.estimated_json_encoded_size_of());

                let mut bytes = BytesMut::new();

                match metadata.endpoint_target {
                    // Raw endpoint: emit the codec output verbatim, no HEC
                    // JSON envelope. `.ok()?` drops the event on failure.
                    EndpointTarget::Raw => {
                        encoder.encode(event, &mut bytes).ok()?;
                        Some(bytes.to_vec())
                    }
                    // Event endpoint: wrap the payload in the HEC envelope
                    // (`HecData`), preferring structured JSON when the
                    // serializer supports it.
                    EndpointTarget::Event => {
                        let serializer = encoder.serializer();
                        let hec_event = if serializer.supports_json() {
                            HecEvent::Json(
                                serializer
                                    .to_json_value(event)
                                    .map_err(|error| emit!(SplunkEventEncodeError { error }))
                                    .ok()?,
                            )
                        } else {
                            // Non-JSON serializer: encode to bytes and carry
                            // the (lossily UTF-8-decoded) text payload.
                            encoder.encode(event, &mut bytes).ok()?;
                            HecEvent::Text(String::from_utf8_lossy(&bytes))
                        };

                        let mut hec_data = HecData::new(
                            hec_event,
                            metadata.fields,
                            // Leave `time` unset when Splunk is configured to
                            // extract the timestamp from the event itself.
                            if self.auto_extract_timestamp {
                                None
                            } else {
                                metadata.timestamp
                            },
                        );
                        hec_data.host = metadata
                            .host
                            .map(|host| host.to_string_lossy().into_owned());
                        hec_data.index = metadata.index;
                        hec_data.source = metadata.source;
                        hec_data.sourcetype = metadata.sourcetype;

                        // Final JSON serialization of the envelope; on error
                        // emit and drop this event only.
                        match serde_json::to_vec(&hec_data) {
                            Ok(value) => Some(value),
                            Err(error) => {
                                emit!(SplunkEventEncodeError {
                                    error: error.into()
                                });
                                None
                            }
                        }
                    }
                }
            })
            .flatten()
            .collect();

        let encoded_size = encoded_input.len();
        writer.write_all(encoded_input.as_slice())?;
        Ok((encoded_size, byte_size))
    }
}