vector/sinks/splunk_hec/logs/
request_builder.rs1use std::sync::Arc;
2
3use bytes::Bytes;
4use vector_lib::event::{EventFinalizers, Finalizable};
5use vector_lib::request_metadata::RequestMetadata;
6
7use super::{
8 encoder::HecLogsEncoder,
9 sink::{HecProcessedEvent, Partitioned},
10};
11use crate::sinks::{
12 splunk_hec::common::request::HecRequest,
13 util::{
14 metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
15 RequestBuilder,
16 },
17};
18
19pub struct HecLogsRequestBuilder {
20 pub encoder: HecLogsEncoder,
21 pub compression: Compression,
22}
23
24#[derive(Debug, Clone)]
25pub struct HecRequestMetadata {
26 finalizers: EventFinalizers,
27 partition: Option<Arc<str>>,
28 source: Option<String>,
29 sourcetype: Option<String>,
30 index: Option<String>,
31 host: Option<String>,
32}
33
34impl RequestBuilder<(Option<Partitioned>, Vec<HecProcessedEvent>)> for HecLogsRequestBuilder {
35 type Metadata = HecRequestMetadata;
36 type Events = Vec<HecProcessedEvent>;
37 type Encoder = HecLogsEncoder;
38 type Payload = Bytes;
39 type Request = HecRequest;
40 type Error = std::io::Error;
41
42 fn compression(&self) -> Compression {
43 self.compression
44 }
45
46 fn encoder(&self) -> &Self::Encoder {
47 &self.encoder
48 }
49
50 fn split_input(
51 &self,
52 input: (Option<Partitioned>, Vec<HecProcessedEvent>),
53 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
54 let (mut partition, mut events) = input;
55
56 let finalizers = events.take_finalizers();
57
58 let builder = RequestMetadataBuilder::from_events(&events);
59
60 (
61 HecRequestMetadata {
62 finalizers,
63 partition: partition.as_ref().and_then(|p| p.token.clone()),
64 source: partition.as_mut().and_then(|p| p.source.take()),
65 sourcetype: partition.as_mut().and_then(|p| p.sourcetype.take()),
66 index: partition.as_mut().and_then(|p| p.index.take()),
67 host: partition.as_mut().and_then(|p| p.host.take()),
68 },
69 builder,
70 events,
71 )
72 }
73
74 fn build_request(
75 &self,
76 hec_metadata: Self::Metadata,
77 metadata: RequestMetadata,
78 payload: EncodeResult<Self::Payload>,
79 ) -> Self::Request {
80 HecRequest {
81 body: payload.into_payload(),
82 finalizers: hec_metadata.finalizers,
83 passthrough_token: hec_metadata.partition,
84 source: hec_metadata.source,
85 sourcetype: hec_metadata.sourcetype,
86 index: hec_metadata.index,
87 host: hec_metadata.host,
88 metadata,
89 }
90 }
91}