vector/sinks/splunk_hec/logs/
encoder.rs

1use std::borrow::Cow;
2
3use bytes::BytesMut;
4use serde::Serialize;
5use tokio_util::codec::Encoder as _;
6use vector_lib::request_metadata::GroupedCountByteSize;
7use vector_lib::{config::telemetry, EstimatedJsonEncodedSizeOf};
8
9use super::sink::HecProcessedEvent;
10use crate::{
11    codecs::Transformer,
12    event::{Event, LogEvent},
13    internal_events::SplunkEventEncodeError,
14    sinks::{splunk_hec::common::EndpointTarget, util::encoding::Encoder},
15};
16
17#[derive(Serialize, Debug)]
18pub enum HecEvent<'a> {
19    #[serde(rename = "event")]
20    Json(serde_json::Value),
21    #[serde(rename = "event")]
22    Text(Cow<'a, str>),
23}
24
25#[derive(Serialize, Debug)]
26pub struct HecData<'a> {
27    #[serde(flatten)]
28    pub event: HecEvent<'a>,
29    pub fields: LogEvent,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub time: Option<f64>,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub host: Option<String>,
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub index: Option<String>,
36    #[serde(skip_serializing_if = "Option::is_none")]
37    pub source: Option<String>,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub sourcetype: Option<String>,
40}
41
42impl<'a> HecData<'a> {
43    pub const fn new(event: HecEvent<'a>, fields: LogEvent, time: Option<f64>) -> Self {
44        Self {
45            event,
46            fields,
47            time,
48            host: None,
49            index: None,
50            source: None,
51            sourcetype: None,
52        }
53    }
54}
55
56#[derive(Debug, Clone)]
57pub struct HecLogsEncoder {
58    pub transformer: Transformer,
59    pub encoder: crate::codecs::Encoder<()>,
60    pub auto_extract_timestamp: bool,
61}
62
63impl Encoder<Vec<HecProcessedEvent>> for HecLogsEncoder {
64    fn encode_input(
65        &self,
66        input: Vec<HecProcessedEvent>,
67        writer: &mut dyn std::io::Write,
68    ) -> std::io::Result<(usize, GroupedCountByteSize)> {
69        let mut encoder = self.encoder.clone();
70        let mut byte_size = telemetry().create_request_count_byte_size();
71        let encoded_input: Vec<u8> = input
72            .into_iter()
73            .filter_map(|processed_event| {
74                let mut event = Event::from(processed_event.event);
75                let metadata = processed_event.metadata;
76                self.transformer.transform(&mut event);
77
78                byte_size.add_event(&event, event.estimated_json_encoded_size_of());
79
80                let mut bytes = BytesMut::new();
81
82                match metadata.endpoint_target {
83                    EndpointTarget::Raw => {
84                        encoder.encode(event, &mut bytes).ok()?;
85                        Some(bytes.to_vec())
86                    }
87                    EndpointTarget::Event => {
88                        let serializer = encoder.serializer();
89                        let hec_event = if serializer.supports_json() {
90                            HecEvent::Json(
91                                serializer
92                                    .to_json_value(event)
93                                    .map_err(|error| emit!(SplunkEventEncodeError { error }))
94                                    .ok()?,
95                            )
96                        } else {
97                            encoder.encode(event, &mut bytes).ok()?;
98                            HecEvent::Text(String::from_utf8_lossy(&bytes))
99                        };
100
101                        let mut hec_data = HecData::new(
102                            hec_event,
103                            metadata.fields,
104                            // If auto_extract_timestamp is set we don't want to pass the timestamp in the
105                            // event since we want Splunk to extract it from the message.
106                            if self.auto_extract_timestamp {
107                                None
108                            } else {
109                                metadata.timestamp
110                            },
111                        );
112                        hec_data.host = metadata
113                            .host
114                            .map(|host| host.to_string_lossy().into_owned());
115                        hec_data.index = metadata.index;
116                        hec_data.source = metadata.source;
117                        hec_data.sourcetype = metadata.sourcetype;
118
119                        match serde_json::to_vec(&hec_data) {
120                            Ok(value) => Some(value),
121                            Err(error) => {
122                                emit!(SplunkEventEncodeError {
123                                    error: error.into()
124                                });
125                                None
126                            }
127                        }
128                    }
129                }
130            })
131            .flatten()
132            .collect();
133
134        let encoded_size = encoded_input.len();
135        writer.write_all(encoded_input.as_slice())?;
136        Ok((encoded_size, byte_size))
137    }
138}