vector/sinks/splunk_hec/logs/encoder.rs

1use std::borrow::Cow;
2
3use bytes::BytesMut;
4use serde::Serialize;
5use tokio_util::codec::Encoder as _;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf, config::telemetry, request_metadata::GroupedCountByteSize,
8};
9
10use super::sink::HecProcessedEvent;
11use crate::{
12    codecs::Transformer,
13    event::{Event, LogEvent},
14    internal_events::SplunkEventEncodeError,
15    sinks::{splunk_hec::common::EndpointTarget, util::encoding::Encoder},
16};
17
18#[derive(Serialize, Debug)]
19pub enum HecEvent<'a> {
20    #[serde(rename = "event")]
21    Json(serde_json::Value),
22    #[serde(rename = "event")]
23    Text(Cow<'a, str>),
24}
25
26#[derive(Serialize, Debug)]
27pub struct HecData<'a> {
28    #[serde(flatten)]
29    pub event: HecEvent<'a>,
30    pub fields: LogEvent,
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub time: Option<f64>,
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub host: Option<String>,
35    #[serde(skip_serializing_if = "Option::is_none")]
36    pub index: Option<String>,
37    #[serde(skip_serializing_if = "Option::is_none")]
38    pub source: Option<String>,
39    #[serde(skip_serializing_if = "Option::is_none")]
40    pub sourcetype: Option<String>,
41}
42
43impl<'a> HecData<'a> {
44    pub const fn new(event: HecEvent<'a>, fields: LogEvent, time: Option<f64>) -> Self {
45        Self {
46            event,
47            fields,
48            time,
49            host: None,
50            index: None,
51            source: None,
52            sourcetype: None,
53        }
54    }
55}
56
57#[derive(Debug, Clone)]
58pub struct HecLogsEncoder {
59    pub transformer: Transformer,
60    pub encoder: crate::codecs::Encoder<()>,
61    pub auto_extract_timestamp: bool,
62}
63
64impl Encoder<Vec<HecProcessedEvent>> for HecLogsEncoder {
65    fn encode_input(
66        &self,
67        input: Vec<HecProcessedEvent>,
68        writer: &mut dyn std::io::Write,
69    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
70        let mut encoder = self.encoder.clone();
71        let mut byte_size = telemetry().create_request_count_byte_size();
72        let encoded_input: Vec<u8> = input
73            .into_iter()
74            .filter_map(|processed_event| {
75                let mut event = Event::from(processed_event.event);
76                let metadata = processed_event.metadata;
77                self.transformer.transform(&mut event);
78
79                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
80
81                let mut bytes = BytesMut::new();
82
83                match metadata.endpoint_target {
84                    EndpointTarget::Raw => {
85                        encoder.encode(event, &mut bytes).ok()?;
86                        Some(bytes.to_vec())
87                    }
88                    EndpointTarget::Event => {
89                        let serializer = encoder.serializer();
90                        let hec_event = if serializer.supports_json() {
91                            HecEvent::Json(
92                                serializer
93                                    .to_json_value(event)
94                                    .map_err(|error| emit!(SplunkEventEncodeError { error }))
95                                    .ok()?,
96                            )
97                        } else {
98                            encoder.encode(event, &mut bytes).ok()?;
99                            HecEvent::Text(String::from_utf8_lossy(&bytes))
100                        };
101
102                        let mut hec_data = HecData::new(
103                            hec_event,
104                            metadata.fields,
105                            // If auto_extract_timestamp is set we don't want to pass the timestamp in the
106                            // event since we want Splunk to extract it from the message.
107                            if self.auto_extract_timestamp {
108                                None
109                            } else {
110                                metadata.timestamp
111                            },
112                        );
113                        hec_data.host = metadata
114                            .host
115                            .map(|host| host.to_string_lossy().into_owned());
116                        hec_data.index = metadata.index;
117                        hec_data.source = metadata.source;
118                        hec_data.sourcetype = metadata.sourcetype;
119
120                        match serde_json::to_vec(&hec_data) {
121                            Ok(value) => Some(value),
122                            Err(error) => {
123                                emit!(SplunkEventEncodeError {
124                                    error: error.into()
125                                });
126                                None
127                            }
128                        }
129                    }
130                }
131            })
132            .flatten()
133            .collect();
134
135        let encoded_size = encoded_input.len();
136        writer.write_all(encoded_input.as_slice())?;
137        Ok((encoded_size, byte_size))
138    }
139}