vector/sinks/util/
request_builder.rs1use std::{io, num::NonZeroUsize};
2
3use bytes::Bytes;
4use vector_lib::request_metadata::{GroupedCountByteSize, RequestMetadata};
5
6use super::{encoding::Encoder, metadata::RequestMetadataBuilder, Compression, Compressor};
7
8pub fn default_request_builder_concurrency_limit() -> NonZeroUsize {
9 if let Some(limit) = std::env::var("VECTOR_EXPERIMENTAL_REQUEST_BUILDER_CONCURRENCY")
10 .map(|value| value.parse::<NonZeroUsize>().ok())
11 .ok()
12 .flatten()
13 {
14 return limit;
15 }
16
17 crate::app::worker_threads().unwrap_or_else(|| NonZeroUsize::new(8).expect("static"))
18}
19
20pub struct EncodeResult<P> {
21 pub payload: P,
22 pub uncompressed_byte_size: usize,
23 pub transformed_json_size: GroupedCountByteSize,
24 pub compressed_byte_size: Option<usize>,
25}
26
27impl<P> EncodeResult<P>
28where
29 P: AsRef<[u8]>,
30{
31 pub fn uncompressed(payload: P, transformed_json_size: GroupedCountByteSize) -> Self {
32 let uncompressed_byte_size = payload.as_ref().len();
33 Self {
34 payload,
35 uncompressed_byte_size,
36 transformed_json_size,
37 compressed_byte_size: None,
38 }
39 }
40
41 pub fn compressed(
42 payload: P,
43 uncompressed_byte_size: usize,
44 transformed_json_size: GroupedCountByteSize,
45 ) -> Self {
46 let compressed_byte_size = payload.as_ref().len();
47 Self {
48 payload,
49 uncompressed_byte_size,
50 transformed_json_size,
51 compressed_byte_size: Some(compressed_byte_size),
52 }
53 }
54}
55
56impl<P> EncodeResult<P> {
57 #[allow(clippy::missing_const_for_fn)]
60 pub fn into_payload(self) -> P {
61 self.payload
62 }
63}
64
65pub trait RequestBuilder<Input> {
67 type Metadata;
68 type Events;
69 type Encoder: Encoder<Self::Events>;
70 type Payload: From<Bytes> + AsRef<[u8]>;
71 type Request;
72 type Error: From<io::Error>;
73
74 fn compression(&self) -> Compression;
76
77 fn encoder(&self) -> &Self::Encoder;
79
80 fn split_input(&self, input: Input) -> (Self::Metadata, RequestMetadataBuilder, Self::Events);
85
86 fn encode_events(
87 &self,
88 events: Self::Events,
89 ) -> Result<EncodeResult<Self::Payload>, Self::Error> {
90 let mut compressor = Compressor::from(self.compression());
95 let is_compressed = compressor.is_compressed();
96 let (_, json_size) = self.encoder().encode_input(events, &mut compressor)?;
97
98 let payload = compressor.into_inner().freeze();
99 let result = if is_compressed {
100 let compressed_byte_size = payload.len();
101 EncodeResult::compressed(payload.into(), compressed_byte_size, json_size)
102 } else {
103 EncodeResult::uncompressed(payload.into(), json_size)
104 };
105
106 Ok(result)
107 }
108
109 fn build_request(
111 &self,
112 metadata: Self::Metadata,
113 request_metadata: RequestMetadata,
114 payload: EncodeResult<Self::Payload>,
115 ) -> Self::Request;
116}
117
118pub trait IncrementalRequestBuilder<Input> {
129 type Metadata;
130 type Payload;
131 type Request;
132 type Error;
133
134 fn encode_events_incremental(
136 &mut self,
137 input: Input,
138 ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>>;
139
140 fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request;
142}