vector/sinks/util/builder.rs

1use futures_util::{stream::Map, Stream, StreamExt};
2use pin_project::pin_project;
3use std::time::Duration;
4use std::{
5    convert::Infallible,
6    fmt,
7    future::Future,
8    hash::Hash,
9    num::NonZeroUsize,
10    pin::Pin,
11    sync::Arc,
12    task::{Context, Poll},
13};
14use tower::Service;
15use tracing::Span;
16use vector_lib::stream::{
17    batcher::{config::BatchConfig, Batcher},
18    ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher,
19};
20use vector_lib::{
21    event::{Finalizable, Metric},
22    partition::Partitioner,
23    ByteSizeOf,
24};
25
26use super::{
27    buffer::metrics::MetricNormalize, IncrementalRequestBuilder, Normalizer, RequestBuilder,
28};
29
30impl<T: ?Sized> SinkBuilderExt for T where T: Stream {}
31
32pub trait SinkBuilderExt: Stream {
33    /// Converts a stream of infallible results by unwrapping them.
34    ///
35    /// For a stream of `Result<T, Infallible>` items, this turns it into a stream of `T` items.
36    fn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
37    where
38        Self: Stream<Item = Result<T, Infallible>> + Sized,
39    {
40        UnwrapInfallible { st: self }
41    }
42
43    /// Batches the stream based on the given partitioner and batch settings.
44    ///
45    /// The stream will yield batches of events, with their partition key, when either a batch fills
46    /// up or times out. [`Partitioner`] operates on a per-event basis, and has access to the event
47    /// itself, and so can access any and all fields of an event.
48    fn batched_partitioned<P, C, F, B>(
49        self,
50        partitioner: P,
51        settings: F,
52    ) -> PartitionedBatcher<Self, P, ExpirationQueue<P::Key>, C, F, B>
53    where
54        Self: Stream<Item = P::Item> + Sized,
55        P: Partitioner + Unpin,
56        P::Key: Eq + Hash + Clone,
57        P::Item: ByteSizeOf,
58        C: BatchConfig<P::Item>,
59        F: Fn() -> C + Send,
60    {
61        PartitionedBatcher::new(self, partitioner, settings)
62    }
63
64    /// Batches the stream based on the given batch settings and item size calculator.
65    ///
66    /// The stream will yield batches of events, when either a batch fills
67    /// up or times out. The `item_size_calculator` determines the "size" of each input
68    /// in a batch. The units of "size" are intentionally not defined, so you can choose
69    /// whatever is needed.
70    fn batched<C>(self, config: C) -> Batcher<Self, C>
71    where
72        C: BatchConfig<Self::Item>,
73        Self: Sized,
74    {
75        Batcher::new(self, config)
76    }
77
78    /// Maps the items in the stream concurrently, up to the configured limit.
79    ///
80    /// For every item, the given mapper is invoked, and the future that is returned is spawned
81    /// and awaited concurrently.  A limit can be passed: `None` is self-describing, as it imposes
82    /// no concurrency limit, and `Some(n)` limits this stage to `n` concurrent operations at any
83    /// given time.
84    ///
85    /// If the spawned future panics, the panic will be carried through and resumed on the task
86    /// calling the stream.
87    fn concurrent_map<F, T>(self, limit: NonZeroUsize, f: F) -> ConcurrentMap<Self, T>
88    where
89        Self: Sized,
90        F: Fn(Self::Item) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> + Send + 'static,
91        T: Send + 'static,
92    {
93        ConcurrentMap::new(self, Some(limit), f)
94    }
95
96    /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to
97    /// downstream services.
98    ///
99    /// Each input is transformed concurrently, up to the given limit.  A limit of `n` limits
100    /// this stage to `n` concurrent operations at any given time.
101    ///
102    /// Encoding and compression are handled internally, deferring to the builder at the necessary
103    /// checkpoints for adjusting the event before encoding/compression, as well as generating the
104    /// correct request object with the result of encoding/compressing the events.
105    fn request_builder<B>(
106        self,
107        limit: NonZeroUsize,
108        builder: B,
109    ) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
110    where
111        Self: Sized,
112        Self::Item: Send + 'static,
113        B: RequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
114        B::Error: Send,
115        B::Request: Send,
116    {
117        let builder = Arc::new(builder);
118
119        // The future passed into the concurrent map is spawned in a tokio thread so we must preserve
120        // the span context in order to propagate the sink's automatic tags.
121        let span = Arc::new(Span::current());
122
123        self.concurrent_map(limit, move |input| {
124            let builder = Arc::clone(&builder);
125            let span = Arc::clone(&span);
126
127            Box::pin(async move {
128                let _entered = span.enter();
129
130                // Split the input into metadata and events.
131                let (metadata, request_metadata_builder, events) = builder.split_input(input);
132
133                // Encode the events.
134                let payload = builder.encode_events(events)?;
135
136                // Note: it would be nice for the RequestMetadataBuilder to build be created from the
137                // events here, and not need to be required by split_input(). But this then requires
138                // each Event type to implement Serialize, and that causes conflicts with the Serialize
139                // implementation for EstimatedJsonEncodedSizeOf.
140
141                // Build the request metadata.
142                let request_metadata = request_metadata_builder.build(&payload);
143
144                // Now build the actual request.
145                Ok(builder.build_request(metadata, request_metadata, payload))
146            })
147        })
148    }
149
150    /// Constructs a [`Stream`] which transforms the input into a number of requests suitable for
151    /// sending to downstream services.
152    ///
153    /// Unlike `request_builder`, which depends on the `RequestBuilder` trait,
154    /// `incremental_request_builder` depends on the `IncrementalRequestBuilder` trait, which is
155    /// designed specifically for sinks that have more stringent requirements around the generated
156    /// requests.
157    ///
158    /// As an example, the normal `request_builder` doesn't allow for a batch of input events to be
159    /// split up: all events must be split at the beginning, encoded separately (and all together),
160    /// and then reassembled into the request.  If the encoding of these events caused a payload to
161    /// be generated that was, say, too large, you would have to back out the operation entirely by
162    /// failing the batch.
163    ///
164    /// With `incremental_request_builder`, the builder is given all of the events in a single shot,
165    /// and can generate multiple payloads.  This is the maximally flexible approach to encoding,
166    /// but means that the trait doesn't provide any default methods like `RequestBuilder` does.
167    ///
168    /// Each input is transformed serially.
169    ///
170    /// Encoding and compression are handled internally, deferring to the builder at the necessary
171    /// checkpoints for adjusting the event before encoding/compression, as well as generating the
172    /// correct request object with the result of encoding/compressing the events.
173    fn incremental_request_builder<B>(
174        self,
175        mut builder: B,
176    ) -> Map<Self, Box<dyn FnMut(Self::Item) -> Vec<Result<B::Request, B::Error>> + Send + Sync>>
177    where
178        Self: Sized,
179        Self::Item: Send + 'static,
180        B: IncrementalRequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
181        B::Error: Send,
182        B::Request: Send,
183    {
184        self.map(Box::new(move |input| {
185            builder
186                .encode_events_incremental(input)
187                .into_iter()
188                .map(|result| {
189                    result.map(|(metadata, payload)| builder.build_request(metadata, payload))
190                })
191                .collect()
192        }))
193    }
194
195    /// Normalizes a stream of [`Metric`] events with the provided normalizer.
196    ///
197    /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
198    /// supported by the sink, or to modify them.  Such modifications typically include converting
199    /// absolute metrics to incremental metrics by tracking the change over time for a particular
200    /// series, or emitting absolute metrics based on incremental updates.
201    fn normalized<N>(self, normalizer: N) -> Normalizer<Self, N>
202    where
203        Self: Stream<Item = Metric> + Unpin + Sized,
204        N: MetricNormalize,
205    {
206        Normalizer::new(self, normalizer)
207    }
208
209    /// Normalizes a stream of [`Metric`] events with a default normalizer.
210    ///
211    /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
212    /// supported by the sink, or to modify them.  Such modifications typically include converting
213    /// absolute metrics to incremental metrics by tracking the change over time for a particular
214    /// series, or emitting absolute metrics based on incremental updates.
215    fn normalized_with_default<N>(self) -> Normalizer<Self, N>
216    where
217        Self: Stream<Item = Metric> + Unpin + Sized,
218        N: MetricNormalize + Default,
219    {
220        Normalizer::new(self, N::default())
221    }
222
223    /// Normalizes a stream of [`Metric`] events with a normalizer and an optional TTL.
224    fn normalized_with_ttl<N>(self, maybe_ttl_secs: Option<f64>) -> Normalizer<Self, N>
225    where
226        Self: Stream<Item = Metric> + Unpin + Sized,
227        N: MetricNormalize + Default,
228    {
229        match maybe_ttl_secs {
230            None => Normalizer::new(self, N::default()),
231            Some(ttl) => {
232                Normalizer::new_with_ttl(self, N::default(), Duration::from_secs(ttl as u64))
233            }
234        }
235    }
236
237    /// Creates a [`Driver`] that uses the configured event stream as the input to the given
238    /// service.
239    ///
240    /// This is typically a terminal step in building a sink, bridging the gap from the processing
241    /// that must be performed by Vector (in the stream) to the underlying sink itself (the
242    /// service).
243    fn into_driver<Svc>(self, service: Svc) -> Driver<Self, Svc>
244    where
245        Self: Sized,
246        Self::Item: Finalizable,
247        Svc: Service<Self::Item>,
248        Svc::Error: fmt::Debug + 'static,
249        Svc::Future: Send + 'static,
250        Svc::Response: DriverResponse,
251    {
252        Driver::new(self, service)
253    }
254}
255
256#[pin_project]
257pub struct UnwrapInfallible<St> {
258    #[pin]
259    st: St,
260}
261
262impl<St, T> Stream for UnwrapInfallible<St>
263where
264    St: Stream<Item = Result<T, Infallible>>,
265{
266    type Item = T;
267
268    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
269        let this = self.project();
270        this.st
271            .poll_next(cx)
272            .map(|maybe| maybe.map(|result| result.unwrap()))
273    }
274}