vector/sinks/util/
builder.rs

1use std::{
2    convert::Infallible,
3    fmt,
4    future::Future,
5    hash::Hash,
6    num::NonZeroUsize,
7    pin::Pin,
8    sync::Arc,
9    task::{Context, Poll},
10    time::Duration,
11};
12
13use futures_util::{Stream, StreamExt, stream::Map};
14use pin_project::pin_project;
15use tower::Service;
16use tracing::Span;
17use vector_lib::{
18    ByteSizeOf,
19    event::{Finalizable, Metric},
20    partition::Partitioner,
21    stream::{
22        ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher,
23        batcher::{Batcher, config::BatchConfig},
24    },
25};
26
27use super::{
28    IncrementalRequestBuilder, Normalizer, RequestBuilder, buffer::metrics::MetricNormalize,
29};
30
31impl<T: ?Sized> SinkBuilderExt for T where T: Stream {}
32
/// Extension trait adding sink-building combinators to any [`Stream`].
///
/// These methods chain together the typical stages of a stream-based sink:
/// batching, concurrent request building, metric normalization, and finally
/// driving the resulting requests into a [`Service`] via [`Driver`].
pub trait SinkBuilderExt: Stream {
    /// Converts a stream of infallible results by unwrapping them.
    ///
    /// For a stream of `Result<T, Infallible>` items, this turns it into a stream of `T` items.
    fn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
    where
        Self: Stream<Item = Result<T, Infallible>> + Sized,
    {
        UnwrapInfallible { st: self }
    }

    /// Batches the stream based on the given partitioner and batch settings.
    ///
    /// The stream will yield batches of events, with their partition key, when either a batch fills
    /// up or times out. [`Partitioner`] operates on a per-event basis, and has access to the event
    /// itself, and so can access any and all fields of an event.
    ///
    /// The `settings` closure receives the partition key for each new partition, allowing callers
    /// to vary the batch configuration (e.g. byte size limit) per partition.
    fn batched_partitioned<P, C, F, B>(
        self,
        partitioner: P,
        timeout: Duration,
        settings: F,
    ) -> PartitionedBatcher<Self, P, ExpirationQueue<P::Key>, C, F, B>
    where
        Self: Stream<Item = P::Item> + Sized,
        P: Partitioner + Unpin,
        P::Key: Eq + Hash + Clone,
        P::Item: ByteSizeOf,
        C: BatchConfig<P::Item>,
        F: Fn(&P::Key) -> C + Send,
    {
        PartitionedBatcher::new(self, partitioner, timeout, settings)
    }

    /// Batches the stream based on the given batch settings and item size calculator.
    ///
    /// The stream will yield batches of events, when either a batch fills
    /// up or times out. The `item_size_calculator` determines the "size" of each input
    /// in a batch. The units of "size" are intentionally not defined, so you can choose
    /// whatever is needed.
    fn batched<C>(self, config: C) -> Batcher<Self, C>
    where
        C: BatchConfig<Self::Item>,
        Self: Sized,
    {
        Batcher::new(self, config)
    }

    /// Maps the items in the stream concurrently, up to the configured limit.
    ///
    /// For every item, the given mapper is invoked, and the future that is returned is spawned
    /// and awaited concurrently.  At most `limit` mapping operations will be in flight at any
    /// given time.
    ///
    /// If the spawned future panics, the panic will be carried through and resumed on the task
    /// calling the stream.
    fn concurrent_map<F, T>(self, limit: NonZeroUsize, f: F) -> ConcurrentMap<Self, T>
    where
        Self: Sized,
        F: Fn(Self::Item) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> + Send + 'static,
        T: Send + 'static,
    {
        ConcurrentMap::new(self, Some(limit), f)
    }

    /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to
    /// downstream services.
    ///
    /// Each input is transformed concurrently, up to the given limit.  A limit of `n` limits
    /// this stage to `n` concurrent operations at any given time.
    ///
    /// Encoding and compression are handled internally, deferring to the builder at the necessary
    /// checkpoints for adjusting the event before encoding/compression, as well as generating the
    /// correct request object with the result of encoding/compressing the events.
    fn request_builder<B>(
        self,
        limit: NonZeroUsize,
        builder: B,
    ) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
    where
        Self: Sized,
        Self::Item: Send + 'static,
        B: RequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
        B::Error: Send,
        B::Request: Send,
    {
        // The builder is shared across all concurrently-running mapper futures.
        let builder = Arc::new(builder);

        // The future passed into the concurrent map is spawned in a tokio thread so we must preserve
        // the span context in order to propagate the sink's automatic tags.
        let span = Arc::new(Span::current());

        self.concurrent_map(limit, move |input| {
            let builder = Arc::clone(&builder);
            let span = Arc::clone(&span);

            Box::pin(async move {
                // Safe to hold across the whole block: there are no `.await` points below,
                // so the entered guard never spans a suspension.
                let _entered = span.enter();

                // Split the input into metadata and events.
                let (metadata, request_metadata_builder, events) = builder.split_input(input);

                // Encode the events.
                let payload = builder.encode_events(events)?;

                // Note: it would be nice for the RequestMetadataBuilder to be created from the
                // events here, and not need to be required by split_input(). But this then requires
                // each Event type to implement Serialize, and that causes conflicts with the Serialize
                // implementation for EstimatedJsonEncodedSizeOf.

                // Build the request metadata.
                let request_metadata = request_metadata_builder.build(&payload);

                // Now build the actual request.
                Ok(builder.build_request(metadata, request_metadata, payload))
            })
        })
    }

    /// Constructs a [`Stream`] which transforms the input into a number of requests suitable for
    /// sending to downstream services.
    ///
    /// Unlike `request_builder`, which depends on the `RequestBuilder` trait,
    /// `incremental_request_builder` depends on the `IncrementalRequestBuilder` trait, which is
    /// designed specifically for sinks that have more stringent requirements around the generated
    /// requests.
    ///
    /// As an example, the normal `request_builder` doesn't allow for a batch of input events to be
    /// split up: all events must be split at the beginning, encoded separately (and all together),
    /// and then reassembled into the request.  If the encoding of these events caused a payload to
    /// be generated that was, say, too large, you would have to back out the operation entirely by
    /// failing the batch.
    ///
    /// With `incremental_request_builder`, the builder is given all of the events in a single shot,
    /// and can generate multiple payloads.  This is the maximally flexible approach to encoding,
    /// but means that the trait doesn't provide any default methods like `RequestBuilder` does.
    ///
    /// Each input is transformed serially.
    ///
    /// Encoding and compression are handled internally, deferring to the builder at the necessary
    /// checkpoints for adjusting the event before encoding/compression, as well as generating the
    /// correct request object with the result of encoding/compressing the events.
    fn incremental_request_builder<B>(
        self,
        mut builder: B,
    ) -> Map<Self, Box<dyn FnMut(Self::Item) -> Vec<Result<B::Request, B::Error>> + Send + Sync>>
    where
        Self: Sized,
        Self::Item: Send + 'static,
        B: IncrementalRequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
        B::Error: Send,
        B::Request: Send,
    {
        self.map(Box::new(move |input| {
            // One input may yield several payloads; each successful payload becomes a request.
            builder
                .encode_events_incremental(input)
                .into_iter()
                .map(|result| {
                    result.map(|(metadata, payload)| builder.build_request(metadata, payload))
                })
                .collect()
        }))
    }

    /// Normalizes a stream of [`Metric`] events with the provided normalizer.
    ///
    /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
    /// supported by the sink, or to modify them.  Such modifications typically include converting
    /// absolute metrics to incremental metrics by tracking the change over time for a particular
    /// series, or emitting absolute metrics based on incremental updates.
    fn normalized<N>(self, normalizer: N) -> Normalizer<Self, N>
    where
        Self: Stream<Item = Metric> + Unpin + Sized,
        N: MetricNormalize,
    {
        Normalizer::new(self, normalizer)
    }

    /// Normalizes a stream of [`Metric`] events with a default normalizer.
    ///
    /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
    /// supported by the sink, or to modify them.  Such modifications typically include converting
    /// absolute metrics to incremental metrics by tracking the change over time for a particular
    /// series, or emitting absolute metrics based on incremental updates.
    fn normalized_with_default<N>(self) -> Normalizer<Self, N>
    where
        Self: Stream<Item = Metric> + Unpin + Sized,
        N: MetricNormalize + Default,
    {
        Normalizer::new(self, N::default())
    }

    /// Normalizes a stream of [`Metric`] events with a normalizer and an optional TTL.
    ///
    /// When `maybe_ttl_secs` is `None`, normalizer state never expires.
    fn normalized_with_ttl<N>(self, maybe_ttl_secs: Option<f64>) -> Normalizer<Self, N>
    where
        Self: Stream<Item = Metric> + Unpin + Sized,
        N: MetricNormalize + Default,
    {
        match maybe_ttl_secs {
            None => Normalizer::new(self, N::default()),
            Some(ttl) => {
                // NOTE(review): the `as u64` cast truncates fractional seconds (and saturates
                // negative/NaN values to 0) — confirm callers only pass whole-second TTLs.
                Normalizer::new_with_ttl(self, N::default(), Duration::from_secs(ttl as u64))
            }
        }
    }

    /// Creates a [`Driver`] that uses the configured event stream as the input to the given
    /// service.
    ///
    /// This is typically a terminal step in building a sink, bridging the gap from the processing
    /// that must be performed by Vector (in the stream) to the underlying sink itself (the
    /// service).
    fn into_driver<Svc>(self, service: Svc) -> Driver<Self, Svc>
    where
        Self: Sized,
        Self::Item: Finalizable,
        Svc: Service<Self::Item>,
        Svc::Error: fmt::Debug + 'static,
        Svc::Future: Send + 'static,
        Svc::Response: DriverResponse,
    {
        Driver::new(self, service)
    }
}
260
/// Stream returned by [`SinkBuilderExt::unwrap_infallible`].
///
/// Wraps a stream of `Result<T, Infallible>` items and yields the unwrapped `T` values.
#[pin_project]
pub struct UnwrapInfallible<St> {
    /// The wrapped inner stream.
    #[pin]
    st: St,
}
266
267impl<St, T> Stream for UnwrapInfallible<St>
268where
269    St: Stream<Item = Result<T, Infallible>>,
270{
271    type Item = T;
272
273    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
274        let this = self.project();
275        this.st
276            .poll_next(cx)
277            .map(|maybe| maybe.map(|result| result.unwrap()))
278    }
279}