// vector/sinks/splunk_hec/logs/sink.rs

1use std::{fmt, sync::Arc};
2
3use vector_lib::{
4    config::{LogNamespace, log_schema},
5    lookup::{OwnedValuePath, PathPrefix, event_path, lookup_v2::OptionalTargetPath},
6    schema::meaning,
7};
8use vrl::path::OwnedTargetPath;
9
10use super::request_builder::HecLogsRequestBuilder;
11use crate::{
12    internal_events::{SplunkEventTimestampInvalidType, SplunkEventTimestampMissing},
13    sinks::{
14        prelude::*,
15        splunk_hec::common::{
16            EndpointTarget, INDEX_FIELD, SOURCE_FIELD, SOURCETYPE_FIELD, render_template_string,
17            request::HecRequest,
18        },
19        util::processed_event::ProcessedEvent,
20    },
21};
22
23// NOTE: The `OptionalTargetPath`s are wrapped in an `Option` in order to distinguish between a true
24//       `None` type and an empty string. This is necessary because `OptionalTargetPath` deserializes an
25//       empty string to a `None` path internally.
26pub struct HecLogsSink<S> {
27    pub service: S,
28    pub request_builder: HecLogsRequestBuilder,
29    pub batch_settings: BatcherSettings,
30    pub sourcetype: Option<Template>,
31    pub source: Option<Template>,
32    pub index: Option<Template>,
33    pub indexed_fields: Vec<OwnedValuePath>,
34    pub host_key: Option<OptionalTargetPath>,
35    pub timestamp_nanos_key: Option<String>,
36    pub timestamp_key: Option<OptionalTargetPath>,
37    pub endpoint_target: EndpointTarget,
38    pub auto_extract_timestamp: bool,
39}
40
41pub struct HecLogData<'a> {
42    pub sourcetype: Option<&'a Template>,
43    pub source: Option<&'a Template>,
44    pub index: Option<&'a Template>,
45    pub indexed_fields: &'a [OwnedValuePath],
46    pub host_key: Option<OptionalTargetPath>,
47    pub timestamp_nanos_key: Option<&'a String>,
48    pub timestamp_key: Option<OptionalTargetPath>,
49    pub endpoint_target: EndpointTarget,
50    pub auto_extract_timestamp: bool,
51}
52
53impl<S> HecLogsSink<S>
54where
55    S: Service<HecRequest> + Send + 'static,
56    S::Future: Send + 'static,
57    S::Response: DriverResponse + Send + 'static,
58    S::Error: fmt::Debug + Into<crate::Error> + Send,
59{
60    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
61        let data = HecLogData {
62            sourcetype: self.sourcetype.as_ref(),
63            source: self.source.as_ref(),
64            index: self.index.as_ref(),
65            indexed_fields: self.indexed_fields.as_slice(),
66            host_key: self.host_key.clone(),
67            timestamp_nanos_key: self.timestamp_nanos_key.as_ref(),
68            timestamp_key: self.timestamp_key.clone(),
69            endpoint_target: self.endpoint_target,
70            auto_extract_timestamp: self.auto_extract_timestamp,
71        };
72        let batch_settings = self.batch_settings;
73
74        input
75            .map(move |event| process_log(event, &data))
76            .batched_partitioned(
77                if self.endpoint_target == EndpointTarget::Raw {
78                    // We only need to partition by the metadata fields for the raw endpoint since those fields
79                    // are sent via query parameters in the request.
80                    EventPartitioner::new(
81                        self.sourcetype.clone(),
82                        self.source.clone(),
83                        self.index.clone(),
84                        self.host_key.clone(),
85                    )
86                } else {
87                    EventPartitioner::new(None, None, None, None)
88                },
89                batch_settings.timeout,
90                |_| batch_settings.as_byte_size_config(),
91            )
92            .request_builder(
93                default_request_builder_concurrency_limit(),
94                self.request_builder,
95            )
96            .filter_map(|request| async move {
97                match request {
98                    Err(e) => {
99                        error!("Failed to build HEC Logs request: {:?}.", e);
100                        None
101                    }
102                    Ok(req) => Some(req),
103                }
104            })
105            .into_driver(self.service)
106            .run()
107            .await
108    }
109}
110
111#[async_trait]
112impl<S> StreamSink<Event> for HecLogsSink<S>
113where
114    S: Service<HecRequest> + Send + 'static,
115    S::Future: Send + 'static,
116    S::Response: DriverResponse + Send + 'static,
117    S::Error: fmt::Debug + Into<crate::Error> + Send,
118{
119    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
120        self.run_inner(input).await
121    }
122}
123
124#[derive(Clone, Debug, PartialEq, Hash, Eq)]
125pub(super) struct Partitioned {
126    pub(super) token: Option<Arc<str>>,
127    pub(super) source: Option<String>,
128    pub(super) sourcetype: Option<String>,
129    pub(super) index: Option<String>,
130    pub(super) host: Option<String>,
131}
132
133#[derive(Default)]
134struct EventPartitioner {
135    pub sourcetype: Option<Template>,
136    pub source: Option<Template>,
137    pub index: Option<Template>,
138    pub host_key: Option<OptionalTargetPath>,
139}
140
141impl EventPartitioner {
142    const fn new(
143        sourcetype: Option<Template>,
144        source: Option<Template>,
145        index: Option<Template>,
146        host_key: Option<OptionalTargetPath>,
147    ) -> Self {
148        Self {
149            sourcetype,
150            source,
151            index,
152            host_key,
153        }
154    }
155}
156
157impl Partitioner for EventPartitioner {
158    type Item = HecProcessedEvent;
159    type Key = Option<Partitioned>;
160
161    fn partition(&self, item: &Self::Item) -> Self::Key {
162        let emit_err = |error, field| {
163            emit!(TemplateRenderingError {
164                error,
165                field: Some(field),
166                drop_event: false,
167            })
168        };
169
170        let source = self.source.as_ref().and_then(|source| {
171            source
172                .render_string(&item.event)
173                .map_err(|error| emit_err(error, SOURCE_FIELD))
174                .ok()
175        });
176
177        let sourcetype = self.sourcetype.as_ref().and_then(|sourcetype| {
178            sourcetype
179                .render_string(&item.event)
180                .map_err(|error| emit_err(error, SOURCETYPE_FIELD))
181                .ok()
182        });
183
184        let index = self.index.as_ref().and_then(|index| {
185            index
186                .render_string(&item.event)
187                .map_err(|error| emit_err(error, INDEX_FIELD))
188                .ok()
189        });
190
191        let host = user_or_namespaced_path(
192            &item.event,
193            self.host_key.as_ref(),
194            meaning::HOST,
195            log_schema().host_key_target_path(),
196        )
197        .and_then(|path| item.event.get(&path))
198        .and_then(|value| value.as_str().map(|s| s.to_string()));
199
200        Some(Partitioned {
201            token: item.event.metadata().splunk_hec_token(),
202            source,
203            sourcetype,
204            index,
205            host,
206        })
207    }
208}
209
210#[derive(PartialEq, Default, Clone, Debug)]
211pub struct HecLogsProcessedEventMetadata {
212    pub sourcetype: Option<String>,
213    pub source: Option<String>,
214    pub index: Option<String>,
215    pub host: Option<Value>,
216    pub timestamp: Option<f64>,
217    pub fields: LogEvent,
218    pub endpoint_target: EndpointTarget,
219}
220
221impl ByteSizeOf for HecLogsProcessedEventMetadata {
222    fn allocated_bytes(&self) -> usize {
223        self.sourcetype.allocated_bytes()
224            + self.source.allocated_bytes()
225            + self.index.allocated_bytes()
226            + self.host.allocated_bytes()
227            + self.fields.allocated_bytes()
228    }
229}
230
231pub type HecProcessedEvent = ProcessedEvent<LogEvent, HecLogsProcessedEventMetadata>;
232
233// determine the path for a field from one of the following use cases:
234// 1. user provided a path in the config settings
235//     a. If the path provided was an empty string, None is returned
236// 2. namespaced path ("default")
237//     a. if Legacy namespace, use the provided path from the global log schema
238//     b. if Vector namespace, use the semantically defined path
239fn user_or_namespaced_path(
240    log: &LogEvent,
241    user_key: Option<&OptionalTargetPath>,
242    semantic: &str,
243    legacy_path: Option<&OwnedTargetPath>,
244) -> Option<OwnedTargetPath> {
245    match user_key {
246        Some(maybe_key) => maybe_key.path.clone(),
247        None => match log.namespace() {
248            LogNamespace::Vector => log.find_key_by_meaning(semantic).cloned(),
249            LogNamespace::Legacy => legacy_path.cloned(),
250        },
251    }
252}
253
254pub fn process_log(event: Event, data: &HecLogData) -> HecProcessedEvent {
255    let mut log = event.into_log();
256
257    let sourcetype = data
258        .sourcetype
259        .and_then(|sourcetype| render_template_string(sourcetype, &log, SOURCETYPE_FIELD));
260
261    let source = data
262        .source
263        .and_then(|source| render_template_string(source, &log, SOURCE_FIELD));
264
265    let index = data
266        .index
267        .and_then(|index| render_template_string(index, &log, INDEX_FIELD));
268
269    let host = user_or_namespaced_path(
270        &log,
271        data.host_key.as_ref(),
272        meaning::HOST,
273        log_schema().host_key_target_path(),
274    )
275    .and_then(|path| log.get(&path))
276    .cloned();
277
278    // only extract the timestamp if this is the Event endpoint, and if the setting
279    // `auto_extract_timestamp` is false (because that indicates that we should leave
280    // the timestamp in the event as-is, and let Splunk do the extraction).
281    let timestamp = if EndpointTarget::Event == data.endpoint_target && !data.auto_extract_timestamp
282    {
283        user_or_namespaced_path(
284            &log,
285            data.timestamp_key.as_ref(),
286            meaning::TIMESTAMP,
287            log_schema().timestamp_key_target_path(),
288        )
289        .and_then(|timestamp_path| {
290            match log.remove(&timestamp_path) {
291                Some(Value::Timestamp(ts)) => {
292                    // set nanos in log if valid timestamp in event and timestamp_nanos_key is configured
293                    if let Some(key) = data.timestamp_nanos_key {
294                        log.try_insert(event_path!(key), ts.timestamp_subsec_nanos() % 1_000_000);
295                    }
296                    Some((ts.timestamp_millis() as f64) / 1000f64)
297                }
298                Some(value) => {
299                    emit!(SplunkEventTimestampInvalidType {
300                        r#type: value.kind_str()
301                    });
302                    None
303                }
304                None => {
305                    emit!(SplunkEventTimestampMissing {});
306                    None
307                }
308            }
309        })
310    } else {
311        None
312    };
313
314    let fields = data
315        .indexed_fields
316        .iter()
317        .filter_map(|field| {
318            log.get((PathPrefix::Event, field))
319                .map(|value| (field.to_string(), value.clone()))
320        })
321        .collect::<LogEvent>();
322
323    let metadata = HecLogsProcessedEventMetadata {
324        sourcetype,
325        source,
326        index,
327        host,
328        timestamp,
329        fields,
330        endpoint_target: data.endpoint_target,
331    };
332
333    ProcessedEvent {
334        event: log,
335        metadata,
336    }
337}
338
339impl EventCount for HecProcessedEvent {
340    fn event_count(&self) -> usize {
341        // A HecProcessedEvent is mapped one-to-one with an event.
342        1
343    }
344}