vector/sinks/splunk_hec/logs/
request_builder.rs1use std::sync::Arc;
2
3use bytes::Bytes;
4use vector_lib::{
5 event::{EventFinalizers, Finalizable},
6 request_metadata::RequestMetadata,
7};
8
9use super::{
10 encoder::HecLogsEncoder,
11 sink::{HecProcessedEvent, Partitioned},
12};
13use crate::sinks::{
14 splunk_hec::common::request::HecRequest,
15 util::{
16 Compression, RequestBuilder, metadata::RequestMetadataBuilder,
17 request_builder::EncodeResult,
18 },
19};
20
21pub struct HecLogsRequestBuilder {
22 pub encoder: HecLogsEncoder,
23 pub compression: Compression,
24}
25
26#[derive(Debug, Clone)]
27pub struct HecRequestMetadata {
28 finalizers: EventFinalizers,
29 partition: Option<Arc<str>>,
30 source: Option<String>,
31 sourcetype: Option<String>,
32 index: Option<String>,
33 host: Option<String>,
34}
35
36impl RequestBuilder<(Option<Partitioned>, Vec<HecProcessedEvent>)> for HecLogsRequestBuilder {
37 type Metadata = HecRequestMetadata;
38 type Events = Vec<HecProcessedEvent>;
39 type Encoder = HecLogsEncoder;
40 type Payload = Bytes;
41 type Request = HecRequest;
42 type Error = std::io::Error;
43
44 fn compression(&self) -> Compression {
45 self.compression
46 }
47
48 fn encoder(&self) -> &Self::Encoder {
49 &self.encoder
50 }
51
52 fn split_input(
53 &self,
54 input: (Option<Partitioned>, Vec<HecProcessedEvent>),
55 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
56 let (mut partition, mut events) = input;
57
58 let finalizers = events.take_finalizers();
59
60 let builder = RequestMetadataBuilder::from_events(&events);
61
62 (
63 HecRequestMetadata {
64 finalizers,
65 partition: partition.as_ref().and_then(|p| p.token.clone()),
66 source: partition.as_mut().and_then(|p| p.source.take()),
67 sourcetype: partition.as_mut().and_then(|p| p.sourcetype.take()),
68 index: partition.as_mut().and_then(|p| p.index.take()),
69 host: partition.as_mut().and_then(|p| p.host.take()),
70 },
71 builder,
72 events,
73 )
74 }
75
76 fn build_request(
77 &self,
78 hec_metadata: Self::Metadata,
79 metadata: RequestMetadata,
80 payload: EncodeResult<Self::Payload>,
81 ) -> Self::Request {
82 HecRequest {
83 body: payload.into_payload(),
84 finalizers: hec_metadata.finalizers,
85 passthrough_token: hec_metadata.partition,
86 source: hec_metadata.source,
87 sourcetype: hec_metadata.sourcetype,
88 index: hec_metadata.index,
89 host: hec_metadata.host,
90 metadata,
91 }
92 }
93}