vector/sinks/splunk_hec/logs/
request_builder.rs

1use std::sync::Arc;
2
3use bytes::Bytes;
4use vector_lib::{
5    event::{EventFinalizers, Finalizable},
6    request_metadata::RequestMetadata,
7};
8
9use super::{
10    encoder::HecLogsEncoder,
11    sink::{HecProcessedEvent, Partitioned},
12};
13use crate::sinks::{
14    splunk_hec::common::request::HecRequest,
15    util::{
16        Compression, RequestBuilder, metadata::RequestMetadataBuilder,
17        request_builder::EncodeResult,
18    },
19};
20
21pub struct HecLogsRequestBuilder {
22    pub encoder: HecLogsEncoder,
23    pub compression: Compression,
24}
25
26#[derive(Debug, Clone)]
27pub struct HecRequestMetadata {
28    finalizers: EventFinalizers,
29    partition: Option<Arc<str>>,
30    source: Option<String>,
31    sourcetype: Option<String>,
32    index: Option<String>,
33    host: Option<String>,
34}
35
36impl RequestBuilder<(Option<Partitioned>, Vec<HecProcessedEvent>)> for HecLogsRequestBuilder {
37    type Metadata = HecRequestMetadata;
38    type Events = Vec<HecProcessedEvent>;
39    type Encoder = HecLogsEncoder;
40    type Payload = Bytes;
41    type Request = HecRequest;
42    type Error = std::io::Error;
43
44    fn compression(&self) -> Compression {
45        self.compression
46    }
47
48    fn encoder(&self) -> &Self::Encoder {
49        &self.encoder
50    }
51
52    fn split_input(
53        &self,
54        input: (Option<Partitioned>, Vec<HecProcessedEvent>),
55    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
56        let (mut partition, mut events) = input;
57
58        let finalizers = events.take_finalizers();
59
60        let builder = RequestMetadataBuilder::from_events(&events);
61
62        (
63            HecRequestMetadata {
64                finalizers,
65                partition: partition.as_ref().and_then(|p| p.token.clone()),
66                source: partition.as_mut().and_then(|p| p.source.take()),
67                sourcetype: partition.as_mut().and_then(|p| p.sourcetype.take()),
68                index: partition.as_mut().and_then(|p| p.index.take()),
69                host: partition.as_mut().and_then(|p| p.host.take()),
70            },
71            builder,
72            events,
73        )
74    }
75
76    fn build_request(
77        &self,
78        hec_metadata: Self::Metadata,
79        metadata: RequestMetadata,
80        payload: EncodeResult<Self::Payload>,
81    ) -> Self::Request {
82        HecRequest {
83            body: payload.into_payload(),
84            finalizers: hec_metadata.finalizers,
85            passthrough_token: hec_metadata.partition,
86            source: hec_metadata.source,
87            sourcetype: hec_metadata.sourcetype,
88            index: hec_metadata.index,
89            host: hec_metadata.host,
90            metadata,
91        }
92    }
93}