vector/sinks/splunk_hec/logs/
sink.rs1use std::{fmt, sync::Arc};
2
3use super::request_builder::HecLogsRequestBuilder;
4use crate::{
5 internal_events::SplunkEventTimestampInvalidType,
6 internal_events::SplunkEventTimestampMissing,
7 sinks::{
8 prelude::*,
9 splunk_hec::common::{
10 render_template_string, request::HecRequest, EndpointTarget, INDEX_FIELD,
11 SOURCETYPE_FIELD, SOURCE_FIELD,
12 },
13 util::processed_event::ProcessedEvent,
14 },
15};
16use vector_lib::{
17 config::{log_schema, LogNamespace},
18 lookup::{event_path, lookup_v2::OptionalTargetPath, OwnedValuePath, PathPrefix},
19 schema::meaning,
20};
21use vrl::path::OwnedTargetPath;
22
/// Stream sink that processes incoming events, batches them, builds HEC
/// requests from the batches and drives those requests through `service`.
///
/// For the two `Option<OptionalTargetPath>` fields the outer `Option` is
/// significant: `None` makes the lookup fall back to the event's log
/// namespace, while `Some(key)` uses `key.path` as-is, even when that inner
/// path is itself `None` (see `user_or_namespaced_path`).
pub struct HecLogsSink<S> {
    /// Service handed to the driver; receives every successfully built request.
    pub service: S,
    /// Builder applied to each batch of processed events.
    pub request_builder: HecLogsRequestBuilder,
    /// Batch limits; converted with `as_byte_size_config()` when batching.
    pub batch_settings: BatcherSettings,
    /// Template rendered per event to produce the `sourcetype` value.
    pub sourcetype: Option<Template>,
    /// Template rendered per event to produce the `source` value.
    pub source: Option<Template>,
    /// Template rendered per event to produce the `index` value.
    pub index: Option<Template>,
    /// Event paths whose values are copied into the per-event `fields` metadata.
    pub indexed_fields: Vec<OwnedValuePath>,
    /// User-configured location of the host value.
    pub host_key: Option<OptionalTargetPath>,
    /// Event key that receives the sub-millisecond nanoseconds of the
    /// extracted timestamp, when set.
    pub timestamp_nanos_key: Option<String>,
    /// User-configured location of the timestamp value.
    pub timestamp_key: Option<OptionalTargetPath>,
    /// HEC endpoint variant in use; the code here distinguishes `Raw` and `Event`.
    pub endpoint_target: EndpointTarget,
    /// When `true`, `process_log` does not extract a timestamp from the event.
    pub auto_extract_timestamp: bool,
}
40
/// Per-sink settings consumed by `process_log`.
///
/// `run_inner` builds one instance from a `HecLogsSink`: templates, indexed
/// fields and the nanos key are borrowed from the sink, the key paths are
/// cloned, and the remaining fields are copied.
pub struct HecLogData<'a> {
    /// Template for the `sourcetype` value, if configured.
    pub sourcetype: Option<&'a Template>,
    /// Template for the `source` value, if configured.
    pub source: Option<&'a Template>,
    /// Template for the `index` value, if configured.
    pub index: Option<&'a Template>,
    /// Event paths whose values are copied into the `fields` metadata.
    pub indexed_fields: &'a [OwnedValuePath],
    /// User-configured location of the host value.
    pub host_key: Option<OptionalTargetPath>,
    /// Event key that receives the timestamp's sub-millisecond nanoseconds.
    pub timestamp_nanos_key: Option<&'a String>,
    /// User-configured location of the timestamp value.
    pub timestamp_key: Option<OptionalTargetPath>,
    /// HEC endpoint variant in use (`Raw` or `Event` are the ones tested here).
    pub endpoint_target: EndpointTarget,
    /// When `true`, no timestamp is extracted from the event.
    pub auto_extract_timestamp: bool,
}
52
53impl<S> HecLogsSink<S>
54where
55 S: Service<HecRequest> + Send + 'static,
56 S::Future: Send + 'static,
57 S::Response: DriverResponse + Send + 'static,
58 S::Error: fmt::Debug + Into<crate::Error> + Send,
59{
60 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
61 let data = HecLogData {
62 sourcetype: self.sourcetype.as_ref(),
63 source: self.source.as_ref(),
64 index: self.index.as_ref(),
65 indexed_fields: self.indexed_fields.as_slice(),
66 host_key: self.host_key.clone(),
67 timestamp_nanos_key: self.timestamp_nanos_key.as_ref(),
68 timestamp_key: self.timestamp_key.clone(),
69 endpoint_target: self.endpoint_target,
70 auto_extract_timestamp: self.auto_extract_timestamp,
71 };
72 let batch_settings = self.batch_settings;
73
74 input
75 .map(move |event| process_log(event, &data))
76 .batched_partitioned(
77 if self.endpoint_target == EndpointTarget::Raw {
78 EventPartitioner::new(
81 self.sourcetype.clone(),
82 self.source.clone(),
83 self.index.clone(),
84 self.host_key.clone(),
85 )
86 } else {
87 EventPartitioner::new(None, None, None, None)
88 },
89 || batch_settings.as_byte_size_config(),
90 )
91 .request_builder(
92 default_request_builder_concurrency_limit(),
93 self.request_builder,
94 )
95 .filter_map(|request| async move {
96 match request {
97 Err(e) => {
98 error!("Failed to build HEC Logs request: {:?}.", e);
99 None
100 }
101 Ok(req) => Some(req),
102 }
103 })
104 .into_driver(self.service)
105 .run()
106 .await
107 }
108}
109
#[async_trait]
impl<S> StreamSink<Event> for HecLogsSink<S>
where
    S: Service<HecRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: fmt::Debug + Into<crate::Error> + Send,
{
    /// Runs the sink over `input` by delegating to `run_inner`.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}
122
/// Key value produced by `EventPartitioner::partition` for one processed event.
#[derive(Clone, Debug, PartialEq, Hash, Eq)]
pub(super) struct Partitioned {
    /// Token read from the event metadata via `splunk_hec_token()`.
    pub(super) token: Option<Arc<str>>,
    /// Rendered `source` template; `None` if unset or rendering failed.
    pub(super) source: Option<String>,
    /// Rendered `sourcetype` template; `None` if unset or rendering failed.
    pub(super) sourcetype: Option<String>,
    /// Rendered `index` template; `None` if unset or rendering failed.
    pub(super) index: Option<String>,
    /// Host value of the event as a string; `None` if it is absent or
    /// `as_str()` yields nothing for it.
    pub(super) host: Option<String>,
}
131
/// Computes the `Partitioned` key for processed events from optional
/// templates and an optional host key.
#[derive(Default)]
struct EventPartitioner {
    /// Template rendered into the key's `sourcetype`.
    pub sourcetype: Option<Template>,
    /// Template rendered into the key's `source`.
    pub source: Option<Template>,
    /// Template rendered into the key's `index`.
    pub index: Option<Template>,
    /// User-configured location of the host value.
    pub host_key: Option<OptionalTargetPath>,
}
139
impl EventPartitioner {
    /// Creates a partitioner that stores the given templates and host key
    /// unchanged.
    const fn new(
        sourcetype: Option<Template>,
        source: Option<Template>,
        index: Option<Template>,
        host_key: Option<OptionalTargetPath>,
    ) -> Self {
        Self {
            sourcetype,
            source,
            index,
            host_key,
        }
    }
}
155
156impl Partitioner for EventPartitioner {
157 type Item = HecProcessedEvent;
158 type Key = Option<Partitioned>;
159
160 fn partition(&self, item: &Self::Item) -> Self::Key {
161 let emit_err = |error, field| {
162 emit!(TemplateRenderingError {
163 error,
164 field: Some(field),
165 drop_event: false,
166 })
167 };
168
169 let source = self.source.as_ref().and_then(|source| {
170 source
171 .render_string(&item.event)
172 .map_err(|error| emit_err(error, SOURCE_FIELD))
173 .ok()
174 });
175
176 let sourcetype = self.sourcetype.as_ref().and_then(|sourcetype| {
177 sourcetype
178 .render_string(&item.event)
179 .map_err(|error| emit_err(error, SOURCETYPE_FIELD))
180 .ok()
181 });
182
183 let index = self.index.as_ref().and_then(|index| {
184 index
185 .render_string(&item.event)
186 .map_err(|error| emit_err(error, INDEX_FIELD))
187 .ok()
188 });
189
190 let host = user_or_namespaced_path(
191 &item.event,
192 self.host_key.as_ref(),
193 meaning::HOST,
194 log_schema().host_key_target_path(),
195 )
196 .and_then(|path| item.event.get(&path))
197 .and_then(|value| value.as_str().map(|s| s.to_string()));
198
199 Some(Partitioned {
200 token: item.event.metadata().splunk_hec_token(),
201 source,
202 sourcetype,
203 index,
204 host,
205 })
206 }
207}
208
/// Metadata that `process_log` resolves for a single log event.
#[derive(PartialEq, Default, Clone, Debug)]
pub struct HecLogsProcessedEventMetadata {
    /// Rendered `sourcetype` template, if one is configured and it rendered.
    pub sourcetype: Option<String>,
    /// Rendered `source` template, if one is configured and it rendered.
    pub source: Option<String>,
    /// Rendered `index` template, if one is configured and it rendered.
    pub index: Option<String>,
    /// Clone of the event's host value, if one was found.
    pub host: Option<Value>,
    /// Event timestamp in seconds, computed from whole milliseconds; `None`
    /// when no timestamp was extracted.
    pub timestamp: Option<f64>,
    /// Copies of the configured indexed fields that exist in the event.
    pub fields: LogEvent,
    /// Endpoint variant copied from the sink settings.
    pub endpoint_target: EndpointTarget,
}
219
220impl ByteSizeOf for HecLogsProcessedEventMetadata {
221 fn allocated_bytes(&self) -> usize {
222 self.sourcetype.allocated_bytes()
223 + self.source.allocated_bytes()
224 + self.index.allocated_bytes()
225 + self.host.allocated_bytes()
226 + self.fields.allocated_bytes()
227 }
228}
229
/// A log event paired with the HEC metadata resolved for it by `process_log`.
pub type HecProcessedEvent = ProcessedEvent<LogEvent, HecLogsProcessedEventMetadata>;
231
232fn user_or_namespaced_path(
239 log: &LogEvent,
240 user_key: Option<&OptionalTargetPath>,
241 semantic: &str,
242 legacy_path: Option<&OwnedTargetPath>,
243) -> Option<OwnedTargetPath> {
244 match user_key {
245 Some(maybe_key) => maybe_key.path.clone(),
246 None => match log.namespace() {
247 LogNamespace::Vector => log.find_key_by_meaning(semantic).cloned(),
248 LogNamespace::Legacy => legacy_path.cloned(),
249 },
250 }
251}
252
253pub fn process_log(event: Event, data: &HecLogData) -> HecProcessedEvent {
254 let mut log = event.into_log();
255
256 let sourcetype = data
257 .sourcetype
258 .and_then(|sourcetype| render_template_string(sourcetype, &log, SOURCETYPE_FIELD));
259
260 let source = data
261 .source
262 .and_then(|source| render_template_string(source, &log, SOURCE_FIELD));
263
264 let index = data
265 .index
266 .and_then(|index| render_template_string(index, &log, INDEX_FIELD));
267
268 let host = user_or_namespaced_path(
269 &log,
270 data.host_key.as_ref(),
271 meaning::HOST,
272 log_schema().host_key_target_path(),
273 )
274 .and_then(|path| log.get(&path))
275 .cloned();
276
277 let timestamp = if EndpointTarget::Event == data.endpoint_target && !data.auto_extract_timestamp
281 {
282 user_or_namespaced_path(
283 &log,
284 data.timestamp_key.as_ref(),
285 meaning::TIMESTAMP,
286 log_schema().timestamp_key_target_path(),
287 )
288 .and_then(|timestamp_path| {
289 match log.remove(×tamp_path) {
290 Some(Value::Timestamp(ts)) => {
291 if let Some(key) = data.timestamp_nanos_key {
293 log.try_insert(event_path!(key), ts.timestamp_subsec_nanos() % 1_000_000);
294 }
295 Some((ts.timestamp_millis() as f64) / 1000f64)
296 }
297 Some(value) => {
298 emit!(SplunkEventTimestampInvalidType {
299 r#type: value.kind_str()
300 });
301 None
302 }
303 None => {
304 emit!(SplunkEventTimestampMissing {});
305 None
306 }
307 }
308 })
309 } else {
310 None
311 };
312
313 let fields = data
314 .indexed_fields
315 .iter()
316 .filter_map(|field| {
317 log.get((PathPrefix::Event, field))
318 .map(|value| (field.to_string(), value.clone()))
319 })
320 .collect::<LogEvent>();
321
322 let metadata = HecLogsProcessedEventMetadata {
323 sourcetype,
324 source,
325 index,
326 host,
327 timestamp,
328 fields,
329 endpoint_target: data.endpoint_target,
330 };
331
332 ProcessedEvent {
333 event: log,
334 metadata,
335 }
336}
337
impl EventCount for HecProcessedEvent {
    /// Always returns 1.
    fn event_count(&self) -> usize {
        1
    }
}