vector/sinks/splunk_hec/logs/
sink.rs1use std::{fmt, sync::Arc};
2
3use vector_lib::{
4 config::{LogNamespace, log_schema},
5 lookup::{OwnedValuePath, PathPrefix, event_path, lookup_v2::OptionalTargetPath},
6 schema::meaning,
7};
8use vrl::path::OwnedTargetPath;
9
10use super::request_builder::HecLogsRequestBuilder;
11use crate::{
12 internal_events::{SplunkEventTimestampInvalidType, SplunkEventTimestampMissing},
13 sinks::{
14 prelude::*,
15 splunk_hec::common::{
16 EndpointTarget, INDEX_FIELD, SOURCE_FIELD, SOURCETYPE_FIELD, render_template_string,
17 request::HecRequest,
18 },
19 util::processed_event::ProcessedEvent,
20 },
21};
22
23pub struct HecLogsSink<S> {
27 pub service: S,
28 pub request_builder: HecLogsRequestBuilder,
29 pub batch_settings: BatcherSettings,
30 pub sourcetype: Option<Template>,
31 pub source: Option<Template>,
32 pub index: Option<Template>,
33 pub indexed_fields: Vec<OwnedValuePath>,
34 pub host_key: Option<OptionalTargetPath>,
35 pub timestamp_nanos_key: Option<String>,
36 pub timestamp_key: Option<OptionalTargetPath>,
37 pub endpoint_target: EndpointTarget,
38 pub auto_extract_timestamp: bool,
39}
40
41pub struct HecLogData<'a> {
42 pub sourcetype: Option<&'a Template>,
43 pub source: Option<&'a Template>,
44 pub index: Option<&'a Template>,
45 pub indexed_fields: &'a [OwnedValuePath],
46 pub host_key: Option<OptionalTargetPath>,
47 pub timestamp_nanos_key: Option<&'a String>,
48 pub timestamp_key: Option<OptionalTargetPath>,
49 pub endpoint_target: EndpointTarget,
50 pub auto_extract_timestamp: bool,
51}
52
53impl<S> HecLogsSink<S>
54where
55 S: Service<HecRequest> + Send + 'static,
56 S::Future: Send + 'static,
57 S::Response: DriverResponse + Send + 'static,
58 S::Error: fmt::Debug + Into<crate::Error> + Send,
59{
60 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
61 let data = HecLogData {
62 sourcetype: self.sourcetype.as_ref(),
63 source: self.source.as_ref(),
64 index: self.index.as_ref(),
65 indexed_fields: self.indexed_fields.as_slice(),
66 host_key: self.host_key.clone(),
67 timestamp_nanos_key: self.timestamp_nanos_key.as_ref(),
68 timestamp_key: self.timestamp_key.clone(),
69 endpoint_target: self.endpoint_target,
70 auto_extract_timestamp: self.auto_extract_timestamp,
71 };
72 let batch_settings = self.batch_settings;
73
74 input
75 .map(move |event| process_log(event, &data))
76 .batched_partitioned(
77 if self.endpoint_target == EndpointTarget::Raw {
78 EventPartitioner::new(
81 self.sourcetype.clone(),
82 self.source.clone(),
83 self.index.clone(),
84 self.host_key.clone(),
85 )
86 } else {
87 EventPartitioner::new(None, None, None, None)
88 },
89 batch_settings.timeout,
90 |_| batch_settings.as_byte_size_config(),
91 )
92 .request_builder(
93 default_request_builder_concurrency_limit(),
94 self.request_builder,
95 )
96 .filter_map(|request| async move {
97 match request {
98 Err(e) => {
99 error!("Failed to build HEC Logs request: {:?}.", e);
100 None
101 }
102 Ok(req) => Some(req),
103 }
104 })
105 .into_driver(self.service)
106 .run()
107 .await
108 }
109}
110
111#[async_trait]
112impl<S> StreamSink<Event> for HecLogsSink<S>
113where
114 S: Service<HecRequest> + Send + 'static,
115 S::Future: Send + 'static,
116 S::Response: DriverResponse + Send + 'static,
117 S::Error: fmt::Debug + Into<crate::Error> + Send,
118{
119 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
120 self.run_inner(input).await
121 }
122}
123
124#[derive(Clone, Debug, PartialEq, Hash, Eq)]
125pub(super) struct Partitioned {
126 pub(super) token: Option<Arc<str>>,
127 pub(super) source: Option<String>,
128 pub(super) sourcetype: Option<String>,
129 pub(super) index: Option<String>,
130 pub(super) host: Option<String>,
131}
132
133#[derive(Default)]
134struct EventPartitioner {
135 pub sourcetype: Option<Template>,
136 pub source: Option<Template>,
137 pub index: Option<Template>,
138 pub host_key: Option<OptionalTargetPath>,
139}
140
141impl EventPartitioner {
142 const fn new(
143 sourcetype: Option<Template>,
144 source: Option<Template>,
145 index: Option<Template>,
146 host_key: Option<OptionalTargetPath>,
147 ) -> Self {
148 Self {
149 sourcetype,
150 source,
151 index,
152 host_key,
153 }
154 }
155}
156
157impl Partitioner for EventPartitioner {
158 type Item = HecProcessedEvent;
159 type Key = Option<Partitioned>;
160
161 fn partition(&self, item: &Self::Item) -> Self::Key {
162 let emit_err = |error, field| {
163 emit!(TemplateRenderingError {
164 error,
165 field: Some(field),
166 drop_event: false,
167 })
168 };
169
170 let source = self.source.as_ref().and_then(|source| {
171 source
172 .render_string(&item.event)
173 .map_err(|error| emit_err(error, SOURCE_FIELD))
174 .ok()
175 });
176
177 let sourcetype = self.sourcetype.as_ref().and_then(|sourcetype| {
178 sourcetype
179 .render_string(&item.event)
180 .map_err(|error| emit_err(error, SOURCETYPE_FIELD))
181 .ok()
182 });
183
184 let index = self.index.as_ref().and_then(|index| {
185 index
186 .render_string(&item.event)
187 .map_err(|error| emit_err(error, INDEX_FIELD))
188 .ok()
189 });
190
191 let host = user_or_namespaced_path(
192 &item.event,
193 self.host_key.as_ref(),
194 meaning::HOST,
195 log_schema().host_key_target_path(),
196 )
197 .and_then(|path| item.event.get(&path))
198 .and_then(|value| value.as_str().map(|s| s.to_string()));
199
200 Some(Partitioned {
201 token: item.event.metadata().splunk_hec_token(),
202 source,
203 sourcetype,
204 index,
205 host,
206 })
207 }
208}
209
210#[derive(PartialEq, Default, Clone, Debug)]
211pub struct HecLogsProcessedEventMetadata {
212 pub sourcetype: Option<String>,
213 pub source: Option<String>,
214 pub index: Option<String>,
215 pub host: Option<Value>,
216 pub timestamp: Option<f64>,
217 pub fields: LogEvent,
218 pub endpoint_target: EndpointTarget,
219}
220
221impl ByteSizeOf for HecLogsProcessedEventMetadata {
222 fn allocated_bytes(&self) -> usize {
223 self.sourcetype.allocated_bytes()
224 + self.source.allocated_bytes()
225 + self.index.allocated_bytes()
226 + self.host.allocated_bytes()
227 + self.fields.allocated_bytes()
228 }
229}
230
231pub type HecProcessedEvent = ProcessedEvent<LogEvent, HecLogsProcessedEventMetadata>;
232
233fn user_or_namespaced_path(
240 log: &LogEvent,
241 user_key: Option<&OptionalTargetPath>,
242 semantic: &str,
243 legacy_path: Option<&OwnedTargetPath>,
244) -> Option<OwnedTargetPath> {
245 match user_key {
246 Some(maybe_key) => maybe_key.path.clone(),
247 None => match log.namespace() {
248 LogNamespace::Vector => log.find_key_by_meaning(semantic).cloned(),
249 LogNamespace::Legacy => legacy_path.cloned(),
250 },
251 }
252}
253
254pub fn process_log(event: Event, data: &HecLogData) -> HecProcessedEvent {
255 let mut log = event.into_log();
256
257 let sourcetype = data
258 .sourcetype
259 .and_then(|sourcetype| render_template_string(sourcetype, &log, SOURCETYPE_FIELD));
260
261 let source = data
262 .source
263 .and_then(|source| render_template_string(source, &log, SOURCE_FIELD));
264
265 let index = data
266 .index
267 .and_then(|index| render_template_string(index, &log, INDEX_FIELD));
268
269 let host = user_or_namespaced_path(
270 &log,
271 data.host_key.as_ref(),
272 meaning::HOST,
273 log_schema().host_key_target_path(),
274 )
275 .and_then(|path| log.get(&path))
276 .cloned();
277
278 let timestamp = if EndpointTarget::Event == data.endpoint_target && !data.auto_extract_timestamp
282 {
283 user_or_namespaced_path(
284 &log,
285 data.timestamp_key.as_ref(),
286 meaning::TIMESTAMP,
287 log_schema().timestamp_key_target_path(),
288 )
289 .and_then(|timestamp_path| {
290 match log.remove(×tamp_path) {
291 Some(Value::Timestamp(ts)) => {
292 if let Some(key) = data.timestamp_nanos_key {
294 log.try_insert(event_path!(key), ts.timestamp_subsec_nanos() % 1_000_000);
295 }
296 Some((ts.timestamp_millis() as f64) / 1000f64)
297 }
298 Some(value) => {
299 emit!(SplunkEventTimestampInvalidType {
300 r#type: value.kind_str()
301 });
302 None
303 }
304 None => {
305 emit!(SplunkEventTimestampMissing {});
306 None
307 }
308 }
309 })
310 } else {
311 None
312 };
313
314 let fields = data
315 .indexed_fields
316 .iter()
317 .filter_map(|field| {
318 log.get((PathPrefix::Event, field))
319 .map(|value| (field.to_string(), value.clone()))
320 })
321 .collect::<LogEvent>();
322
323 let metadata = HecLogsProcessedEventMetadata {
324 sourcetype,
325 source,
326 index,
327 host,
328 timestamp,
329 fields,
330 endpoint_target: data.endpoint_target,
331 };
332
333 ProcessedEvent {
334 event: log,
335 metadata,
336 }
337}
338
339impl EventCount for HecProcessedEvent {
340 fn event_count(&self) -> usize {
341 1
343 }
344}