vector/sinks/splunk_hec/logs/
request_builder.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4use vector_lib::event::{EventFinalizers, Finalizable};
5use vector_lib::request_metadata::RequestMetadata;
6
7use super::{
8    encoder::HecLogsEncoder,
9    sink::{HecProcessedEvent, Partitioned},
10};
11use crate::sinks::{
12    splunk_hec::common::request::HecRequest,
13    util::{
14        metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
15        RequestBuilder,
16    },
17};
18
19pub struct HecLogsRequestBuilder {
20    pub encoder: HecLogsEncoder,
21    pub compression: Compression,
22}
23
24#[derive(Debug, Clone)]
25pub struct HecRequestMetadata {
26    finalizers: EventFinalizers,
27    partition: Option<Arc<str>>,
28    source: Option<String>,
29    sourcetype: Option<String>,
30    index: Option<String>,
31    host: Option<String>,
32}
33
34impl RequestBuilder<(Option<Partitioned>, Vec<HecProcessedEvent>)> for HecLogsRequestBuilder {
35    type Metadata = HecRequestMetadata;
36    type Events = Vec<HecProcessedEvent>;
37    type Encoder = HecLogsEncoder;
38    type Payload = Bytes;
39    type Request = HecRequest;
40    type Error = std::io::Error;
41
42    fn compression(&self) -> Compression {
43        self.compression
44    }
45
46    fn encoder(&self) -> &Self::Encoder {
47        &self.encoder
48    }
49
50    fn split_input(
51        &self,
52        input: (Option<Partitioned>, Vec<HecProcessedEvent>),
53    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
54        let (mut partition, mut events) = input;
55
56        let finalizers = events.take_finalizers();
57
58        let builder = RequestMetadataBuilder::from_events(&events);
59
60        (
61            HecRequestMetadata {
62                finalizers,
63                partition: partition.as_ref().and_then(|p| p.token.clone()),
64                source: partition.as_mut().and_then(|p| p.source.take()),
65                sourcetype: partition.as_mut().and_then(|p| p.sourcetype.take()),
66                index: partition.as_mut().and_then(|p| p.index.take()),
67                host: partition.as_mut().and_then(|p| p.host.take()),
68            },
69            builder,
70            events,
71        )
72    }
73
74    fn build_request(
75        &self,
76        hec_metadata: Self::Metadata,
77        metadata: RequestMetadata,
78        payload: EncodeResult<Self::Payload>,
79    ) -> Self::Request {
80        HecRequest {
81            body: payload.into_payload(),
82            finalizers: hec_metadata.finalizers,
83            passthrough_token: hec_metadata.partition,
84            source: hec_metadata.source,
85            sourcetype: hec_metadata.sourcetype,
86            index: hec_metadata.index,
87            host: hec_metadata.host,
88            metadata,
89        }
90    }
91}