vector/sinks/util/
request_builder.rs

1use std::{io, num::NonZeroUsize};
2
3use bytes::Bytes;
4use vector_lib::request_metadata::{GroupedCountByteSize, RequestMetadata};
5
6use super::{encoding::Encoder, metadata::RequestMetadataBuilder, Compression, Compressor};
7
8pub fn default_request_builder_concurrency_limit() -> NonZeroUsize {
9    if let Some(limit) = std::env::var("VECTOR_EXPERIMENTAL_REQUEST_BUILDER_CONCURRENCY")
10        .map(|value| value.parse::<NonZeroUsize>().ok())
11        .ok()
12        .flatten()
13    {
14        return limit;
15    }
16
17    crate::app::worker_threads().unwrap_or_else(|| NonZeroUsize::new(8).expect("static"))
18}
19
/// The result of encoding (and optionally compressing) a batch of events into a payload.
pub struct EncodeResult<P> {
    /// The encoded, and possibly compressed, payload bytes.
    pub payload: P,
    /// Size of the payload, in bytes, before any compression was applied.
    pub uncompressed_byte_size: usize,
    /// Byte size of the events after transformation, as reported by the encoder
    /// (see `GroupedCountByteSize`).
    pub transformed_json_size: GroupedCountByteSize,
    /// Size of the payload, in bytes, after compression, if compression was used.
    pub compressed_byte_size: Option<usize>,
}
26
27impl<P> EncodeResult<P>
28where
29    P: AsRef<[u8]>,
30{
31    pub fn uncompressed(payload: P, transformed_json_size: GroupedCountByteSize) -> Self {
32        let uncompressed_byte_size = payload.as_ref().len();
33        Self {
34            payload,
35            uncompressed_byte_size,
36            transformed_json_size,
37            compressed_byte_size: None,
38        }
39    }
40
41    pub fn compressed(
42        payload: P,
43        uncompressed_byte_size: usize,
44        transformed_json_size: GroupedCountByteSize,
45    ) -> Self {
46        let compressed_byte_size = payload.as_ref().len();
47        Self {
48            payload,
49            uncompressed_byte_size,
50            transformed_json_size,
51            compressed_byte_size: Some(compressed_byte_size),
52        }
53    }
54}
55
56impl<P> EncodeResult<P> {
57    // Can't be `const` because you can't (yet?) run deconstructors in a const context, which is what this function does
58    // by dropping the (un)compressed sizes.
59    #[allow(clippy::missing_const_for_fn)]
60    pub fn into_payload(self) -> P {
61        self.payload
62    }
63}
64
65/// Generalized interface for defining how a batch of events will be turned into a request.
66pub trait RequestBuilder<Input> {
67    type Metadata;
68    type Events;
69    type Encoder: Encoder<Self::Events>;
70    type Payload: From<Bytes> + AsRef<[u8]>;
71    type Request;
72    type Error: From<io::Error>;
73
74    /// Gets the compression algorithm used by this request builder.
75    fn compression(&self) -> Compression;
76
77    /// Gets the encoder used by this request builder.
78    fn encoder(&self) -> &Self::Encoder;
79
80    /// Splits apart the input into the metadata and event portions.
81    ///
82    /// The metadata should be any information that needs to be passed back to `build_request`
83    /// as-is, such as event finalizers, while the events are the actual events to process.
84    fn split_input(&self, input: Input) -> (Self::Metadata, RequestMetadataBuilder, Self::Events);
85
86    fn encode_events(
87        &self,
88        events: Self::Events,
89    ) -> Result<EncodeResult<Self::Payload>, Self::Error> {
90        // TODO: Should we add enough bounds on `Self::Events` that we could automatically derive event count/event byte
91        // size, and then we could generate `BatchRequestMetadata` and pass it directly to `build_request`? That would
92        // obviate needing to wrap `payload` in `EncodeResult`, although practically speaking.. the name would be kind
93        // of clash-y with `Self::Metadata`.
94        let mut compressor = Compressor::from(self.compression());
95        let is_compressed = compressor.is_compressed();
96        let (_, json_size) = self.encoder().encode_input(events, &mut compressor)?;
97
98        let payload = compressor.into_inner().freeze();
99        let result = if is_compressed {
100            let compressed_byte_size = payload.len();
101            EncodeResult::compressed(payload.into(), compressed_byte_size, json_size)
102        } else {
103            EncodeResult::uncompressed(payload.into(), json_size)
104        };
105
106        Ok(result)
107    }
108
109    /// Builds a request for the given metadata and payload.
110    fn build_request(
111        &self,
112        metadata: Self::Metadata,
113        request_metadata: RequestMetadata,
114        payload: EncodeResult<Self::Payload>,
115    ) -> Self::Request;
116}
117
/// Generalized interface for defining how a batch of events will incrementally be turned into requests.
///
/// As opposed to `RequestBuilder`, this trait provides the means to incrementally build requests
/// from a single batch of events, where all events in the batch may not fit into a single request.
/// This can be important for sinks where the underlying service has limitations on the size of a
/// request, or how many events may be present, necessitating a batch be split up into multiple requests.
///
/// While batches can be limited in size before being handed off to a request builder, we can't
/// always know in advance how large the encoded payload will be, which requires us to be able to
/// potentially split a batch into multiple requests.
pub trait IncrementalRequestBuilder<Input> {
    /// Per-payload metadata carried from encoding through to `build_request` as-is.
    type Metadata;
    /// The encoded payload type.
    type Payload;
    /// The request type produced by this builder.
    type Request;
    /// The error type returned for payloads that fail to encode.
    type Error;

    /// Incrementally encodes the given input, potentially generating multiple payloads.
    ///
    /// Each element of the returned vector is either an encoded payload with its metadata,
    /// or an error for the portion of the input that failed to encode.
    fn encode_events_incremental(
        &mut self,
        input: Input,
    ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>>;

    /// Builds a request for the given metadata and payload.
    fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request;
}