vector/sinks/splunk_hec/logs/
sink.rs

1use std::{fmt, sync::Arc};
2
3use super::request_builder::HecLogsRequestBuilder;
4use crate::{
5    internal_events::SplunkEventTimestampInvalidType,
6    internal_events::SplunkEventTimestampMissing,
7    sinks::{
8        prelude::*,
9        splunk_hec::common::{
10            render_template_string, request::HecRequest, EndpointTarget, INDEX_FIELD,
11            SOURCETYPE_FIELD, SOURCE_FIELD,
12        },
13        util::processed_event::ProcessedEvent,
14    },
15};
16use vector_lib::{
17    config::{log_schema, LogNamespace},
18    lookup::{event_path, lookup_v2::OptionalTargetPath, OwnedValuePath, PathPrefix},
19    schema::meaning,
20};
21use vrl::path::OwnedTargetPath;
22
23// NOTE: The `OptionalTargetPath`s are wrapped in an `Option` in order to distinguish between a true
24//       `None` type and an empty string. This is necessary because `OptionalTargetPath` deserializes an
25//       empty string to a `None` path internally.
26pub struct HecLogsSink<S> {
27    pub service: S,
28    pub request_builder: HecLogsRequestBuilder,
29    pub batch_settings: BatcherSettings,
30    pub sourcetype: Option<Template>,
31    pub source: Option<Template>,
32    pub index: Option<Template>,
33    pub indexed_fields: Vec<OwnedValuePath>,
34    pub host_key: Option<OptionalTargetPath>,
35    pub timestamp_nanos_key: Option<String>,
36    pub timestamp_key: Option<OptionalTargetPath>,
37    pub endpoint_target: EndpointTarget,
38    pub auto_extract_timestamp: bool,
39}
40
41pub struct HecLogData<'a> {
42    pub sourcetype: Option<&'a Template>,
43    pub source: Option<&'a Template>,
44    pub index: Option<&'a Template>,
45    pub indexed_fields: &'a [OwnedValuePath],
46    pub host_key: Option<OptionalTargetPath>,
47    pub timestamp_nanos_key: Option<&'a String>,
48    pub timestamp_key: Option<OptionalTargetPath>,
49    pub endpoint_target: EndpointTarget,
50    pub auto_extract_timestamp: bool,
51}
52
53impl<S> HecLogsSink<S>
54where
55    S: Service<HecRequest> + Send + 'static,
56    S::Future: Send + 'static,
57    S::Response: DriverResponse + Send + 'static,
58    S::Error: fmt::Debug + Into<crate::Error> + Send,
59{
60    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
61        let data = HecLogData {
62            sourcetype: self.sourcetype.as_ref(),
63            source: self.source.as_ref(),
64            index: self.index.as_ref(),
65            indexed_fields: self.indexed_fields.as_slice(),
66            host_key: self.host_key.clone(),
67            timestamp_nanos_key: self.timestamp_nanos_key.as_ref(),
68            timestamp_key: self.timestamp_key.clone(),
69            endpoint_target: self.endpoint_target,
70            auto_extract_timestamp: self.auto_extract_timestamp,
71        };
72        let batch_settings = self.batch_settings;
73
74        input
75            .map(move |event| process_log(event, &data))
76            .batched_partitioned(
77                if self.endpoint_target == EndpointTarget::Raw {
78                    // We only need to partition by the metadata fields for the raw endpoint since those fields
79                    // are sent via query parameters in the request.
80                    EventPartitioner::new(
81                        self.sourcetype.clone(),
82                        self.source.clone(),
83                        self.index.clone(),
84                        self.host_key.clone(),
85                    )
86                } else {
87                    EventPartitioner::new(None, None, None, None)
88                },
89                || batch_settings.as_byte_size_config(),
90            )
91            .request_builder(
92                default_request_builder_concurrency_limit(),
93                self.request_builder,
94            )
95            .filter_map(|request| async move {
96                match request {
97                    Err(e) => {
98                        error!("Failed to build HEC Logs request: {:?}.", e);
99                        None
100                    }
101                    Ok(req) => Some(req),
102                }
103            })
104            .into_driver(self.service)
105            .run()
106            .await
107    }
108}
109
110#[async_trait]
111impl<S> StreamSink<Event> for HecLogsSink<S>
112where
113    S: Service<HecRequest> + Send + 'static,
114    S::Future: Send + 'static,
115    S::Response: DriverResponse + Send + 'static,
116    S::Error: fmt::Debug + Into<crate::Error> + Send,
117{
118    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
119        self.run_inner(input).await
120    }
121}
122
123#[derive(Clone, Debug, PartialEq, Hash, Eq)]
124pub(super) struct Partitioned {
125    pub(super) token: Option<Arc<str>>,
126    pub(super) source: Option<String>,
127    pub(super) sourcetype: Option<String>,
128    pub(super) index: Option<String>,
129    pub(super) host: Option<String>,
130}
131
132#[derive(Default)]
133struct EventPartitioner {
134    pub sourcetype: Option<Template>,
135    pub source: Option<Template>,
136    pub index: Option<Template>,
137    pub host_key: Option<OptionalTargetPath>,
138}
139
140impl EventPartitioner {
141    const fn new(
142        sourcetype: Option<Template>,
143        source: Option<Template>,
144        index: Option<Template>,
145        host_key: Option<OptionalTargetPath>,
146    ) -> Self {
147        Self {
148            sourcetype,
149            source,
150            index,
151            host_key,
152        }
153    }
154}
155
156impl Partitioner for EventPartitioner {
157    type Item = HecProcessedEvent;
158    type Key = Option<Partitioned>;
159
160    fn partition(&self, item: &Self::Item) -> Self::Key {
161        let emit_err = |error, field| {
162            emit!(TemplateRenderingError {
163                error,
164                field: Some(field),
165                drop_event: false,
166            })
167        };
168
169        let source = self.source.as_ref().and_then(|source| {
170            source
171                .render_string(&item.event)
172                .map_err(|error| emit_err(error, SOURCE_FIELD))
173                .ok()
174        });
175
176        let sourcetype = self.sourcetype.as_ref().and_then(|sourcetype| {
177            sourcetype
178                .render_string(&item.event)
179                .map_err(|error| emit_err(error, SOURCETYPE_FIELD))
180                .ok()
181        });
182
183        let index = self.index.as_ref().and_then(|index| {
184            index
185                .render_string(&item.event)
186                .map_err(|error| emit_err(error, INDEX_FIELD))
187                .ok()
188        });
189
190        let host = user_or_namespaced_path(
191            &item.event,
192            self.host_key.as_ref(),
193            meaning::HOST,
194            log_schema().host_key_target_path(),
195        )
196        .and_then(|path| item.event.get(&path))
197        .and_then(|value| value.as_str().map(|s| s.to_string()));
198
199        Some(Partitioned {
200            token: item.event.metadata().splunk_hec_token(),
201            source,
202            sourcetype,
203            index,
204            host,
205        })
206    }
207}
208
209#[derive(PartialEq, Default, Clone, Debug)]
210pub struct HecLogsProcessedEventMetadata {
211    pub sourcetype: Option<String>,
212    pub source: Option<String>,
213    pub index: Option<String>,
214    pub host: Option<Value>,
215    pub timestamp: Option<f64>,
216    pub fields: LogEvent,
217    pub endpoint_target: EndpointTarget,
218}
219
220impl ByteSizeOf for HecLogsProcessedEventMetadata {
221    fn allocated_bytes(&self) -> usize {
222        self.sourcetype.allocated_bytes()
223            + self.source.allocated_bytes()
224            + self.index.allocated_bytes()
225            + self.host.allocated_bytes()
226            + self.fields.allocated_bytes()
227    }
228}
229
230pub type HecProcessedEvent = ProcessedEvent<LogEvent, HecLogsProcessedEventMetadata>;
231
232// determine the path for a field from one of the following use cases:
233// 1. user provided a path in the config settings
234//     a. If the path provided was an empty string, None is returned
235// 2. namespaced path ("default")
236//     a. if Legacy namespace, use the provided path from the global log schema
237//     b. if Vector namespace, use the semantically defined path
238fn user_or_namespaced_path(
239    log: &LogEvent,
240    user_key: Option<&OptionalTargetPath>,
241    semantic: &str,
242    legacy_path: Option<&OwnedTargetPath>,
243) -> Option<OwnedTargetPath> {
244    match user_key {
245        Some(maybe_key) => maybe_key.path.clone(),
246        None => match log.namespace() {
247            LogNamespace::Vector => log.find_key_by_meaning(semantic).cloned(),
248            LogNamespace::Legacy => legacy_path.cloned(),
249        },
250    }
251}
252
253pub fn process_log(event: Event, data: &HecLogData) -> HecProcessedEvent {
254    let mut log = event.into_log();
255
256    let sourcetype = data
257        .sourcetype
258        .and_then(|sourcetype| render_template_string(sourcetype, &log, SOURCETYPE_FIELD));
259
260    let source = data
261        .source
262        .and_then(|source| render_template_string(source, &log, SOURCE_FIELD));
263
264    let index = data
265        .index
266        .and_then(|index| render_template_string(index, &log, INDEX_FIELD));
267
268    let host = user_or_namespaced_path(
269        &log,
270        data.host_key.as_ref(),
271        meaning::HOST,
272        log_schema().host_key_target_path(),
273    )
274    .and_then(|path| log.get(&path))
275    .cloned();
276
277    // only extract the timestamp if this is the Event endpoint, and if the setting
278    // `auto_extract_timestamp` is false (because that indicates that we should leave
279    // the timestamp in the event as-is, and let Splunk do the extraction).
280    let timestamp = if EndpointTarget::Event == data.endpoint_target && !data.auto_extract_timestamp
281    {
282        user_or_namespaced_path(
283            &log,
284            data.timestamp_key.as_ref(),
285            meaning::TIMESTAMP,
286            log_schema().timestamp_key_target_path(),
287        )
288        .and_then(|timestamp_path| {
289            match log.remove(&timestamp_path) {
290                Some(Value::Timestamp(ts)) => {
291                    // set nanos in log if valid timestamp in event and timestamp_nanos_key is configured
292                    if let Some(key) = data.timestamp_nanos_key {
293                        log.try_insert(event_path!(key), ts.timestamp_subsec_nanos() % 1_000_000);
294                    }
295                    Some((ts.timestamp_millis() as f64) / 1000f64)
296                }
297                Some(value) => {
298                    emit!(SplunkEventTimestampInvalidType {
299                        r#type: value.kind_str()
300                    });
301                    None
302                }
303                None => {
304                    emit!(SplunkEventTimestampMissing {});
305                    None
306                }
307            }
308        })
309    } else {
310        None
311    };
312
313    let fields = data
314        .indexed_fields
315        .iter()
316        .filter_map(|field| {
317            log.get((PathPrefix::Event, field))
318                .map(|value| (field.to_string(), value.clone()))
319        })
320        .collect::<LogEvent>();
321
322    let metadata = HecLogsProcessedEventMetadata {
323        sourcetype,
324        source,
325        index,
326        host,
327        timestamp,
328        fields,
329        endpoint_target: data.endpoint_target,
330    };
331
332    ProcessedEvent {
333        event: log,
334        metadata,
335    }
336}
337
338impl EventCount for HecProcessedEvent {
339    fn event_count(&self) -> usize {
340        // A HecProcessedEvent is mapped one-to-one with an event.
341        1
342    }
343}