// vector/sinks/splunk_hec/logs/encoder.rs
use std::borrow::Cow;
2
3use bytes::BytesMut;
4use serde::Serialize;
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf, config::telemetry, request_metadata::GroupedCountByteSize,
8};
9
10use super::sink::HecProcessedEvent;
11use crate::{
12 codecs::Transformer,
13 event::{Event, LogEvent},
14 internal_events::SplunkEventEncodeError,
15 sinks::{splunk_hec::common::EndpointTarget, util::encoding::Encoder},
16};
17
/// The body of a Splunk HEC event payload. Both variants serialize under the
/// same `event` JSON key; which variant is built depends on whether the
/// configured serializer can emit JSON directly (see `encode_input`).
#[derive(Serialize, Debug)]
pub enum HecEvent<'a> {
    /// Structured body, produced when the serializer supports JSON.
    #[serde(rename = "event")]
    Json(serde_json::Value),
    /// Textual body from the codec output; borrowed (`Cow::Borrowed`) when
    /// the encoded bytes are valid UTF-8, owned otherwise (lossy decoding).
    #[serde(rename = "event")]
    Text(Cow<'a, str>),
}
25
/// One event envelope sent to the Splunk HEC event endpoint. Optional
/// metadata fields are omitted from the serialized JSON when unset, so the
/// payload only carries what was explicitly provided.
#[derive(Serialize, Debug)]
pub struct HecData<'a> {
    /// The event body; flattened so it serializes as the top-level `event` key.
    #[serde(flatten)]
    pub event: HecEvent<'a>,
    /// Additional indexed fields attached alongside the event.
    pub fields: LogEvent,
    /// Event timestamp; presumably epoch seconds per the HEC protocol —
    /// NOTE(review): confirm against the metadata producer. `None` lets
    /// Splunk assign/extract the time itself.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub time: Option<f64>,
    /// Originating host, if known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub host: Option<String>,
    /// Target Splunk index override, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub index: Option<String>,
    /// Splunk `source` metadata, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Splunk `sourcetype` metadata, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sourcetype: Option<String>,
}
42
43impl<'a> HecData<'a> {
44 pub const fn new(event: HecEvent<'a>, fields: LogEvent, time: Option<f64>) -> Self {
45 Self {
46 event,
47 fields,
48 time,
49 host: None,
50 index: None,
51 source: None,
52 sourcetype: None,
53 }
54 }
55}
56
/// Encodes processed log events into Splunk HEC request payloads.
#[derive(Debug, Clone)]
pub struct HecLogsEncoder {
    /// Field-level transformations applied to each event before encoding.
    pub transformer: Transformer,
    /// Codec used to serialize the event body.
    pub encoder: crate::codecs::Encoder<()>,
    /// When true, no `time` field is emitted so Splunk extracts the
    /// timestamp from the event data itself (see `encode_input`).
    pub auto_extract_timestamp: bool,
}
63
64impl Encoder<Vec<HecProcessedEvent>> for HecLogsEncoder {
65 fn encode_input(
66 &self,
67 input: Vec<HecProcessedEvent>,
68 writer: &mut dyn std::io::Write,
69 ) -> std::io::Result<(usize, GroupedCountByteSize)> {
70 let mut encoder = self.encoder.clone();
71 let mut byte_size = telemetry().create_request_count_byte_size();
72 let encoded_input: Vec<u8> = input
73 .into_iter()
74 .filter_map(|processed_event| {
75 let mut event = Event::from(processed_event.event);
76 let metadata = processed_event.metadata;
77 self.transformer.transform(&mut event);
78
79 byte_size.add_event(&event, event.estimated_json_encoded_size_of());
80
81 let mut bytes = BytesMut::new();
82
83 match metadata.endpoint_target {
84 EndpointTarget::Raw => {
85 encoder.encode(event, &mut bytes).ok()?;
86 Some(bytes.to_vec())
87 }
88 EndpointTarget::Event => {
89 let serializer = encoder.serializer();
90 let hec_event = if serializer.supports_json() {
91 HecEvent::Json(
92 serializer
93 .to_json_value(event)
94 .map_err(|error| emit!(SplunkEventEncodeError { error }))
95 .ok()?,
96 )
97 } else {
98 encoder.encode(event, &mut bytes).ok()?;
99 HecEvent::Text(String::from_utf8_lossy(&bytes))
100 };
101
102 let mut hec_data = HecData::new(
103 hec_event,
104 metadata.fields,
105 if self.auto_extract_timestamp {
108 None
109 } else {
110 metadata.timestamp
111 },
112 );
113 hec_data.host = metadata
114 .host
115 .map(|host| host.to_string_lossy().into_owned());
116 hec_data.index = metadata.index;
117 hec_data.source = metadata.source;
118 hec_data.sourcetype = metadata.sourcetype;
119
120 match serde_json::to_vec(&hec_data) {
121 Ok(value) => Some(value),
122 Err(error) => {
123 emit!(SplunkEventEncodeError {
124 error: error.into()
125 });
126 None
127 }
128 }
129 }
130 }
131 })
132 .flatten()
133 .collect();
134
135 let encoded_size = encoded_input.len();
136 writer.write_all(encoded_input.as_slice())?;
137 Ok((encoded_size, byte_size))
138 }
139}