vector/sinks/util/builder.rs
1use futures_util::{stream::Map, Stream, StreamExt};
2use pin_project::pin_project;
3use std::time::Duration;
4use std::{
5 convert::Infallible,
6 fmt,
7 future::Future,
8 hash::Hash,
9 num::NonZeroUsize,
10 pin::Pin,
11 sync::Arc,
12 task::{Context, Poll},
13};
14use tower::Service;
15use tracing::Span;
16use vector_lib::stream::{
17 batcher::{config::BatchConfig, Batcher},
18 ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher,
19};
20use vector_lib::{
21 event::{Finalizable, Metric},
22 partition::Partitioner,
23 ByteSizeOf,
24};
25
26use super::{
27 buffer::metrics::MetricNormalize, IncrementalRequestBuilder, Normalizer, RequestBuilder,
28};
29
30impl<T: ?Sized> SinkBuilderExt for T where T: Stream {}
31
32pub trait SinkBuilderExt: Stream {
33 /// Converts a stream of infallible results by unwrapping them.
34 ///
35 /// For a stream of `Result<T, Infallible>` items, this turns it into a stream of `T` items.
36 fn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
37 where
38 Self: Stream<Item = Result<T, Infallible>> + Sized,
39 {
40 UnwrapInfallible { st: self }
41 }
42
43 /// Batches the stream based on the given partitioner and batch settings.
44 ///
45 /// The stream will yield batches of events, with their partition key, when either a batch fills
46 /// up or times out. [`Partitioner`] operates on a per-event basis, and has access to the event
47 /// itself, and so can access any and all fields of an event.
48 fn batched_partitioned<P, C, F, B>(
49 self,
50 partitioner: P,
51 settings: F,
52 ) -> PartitionedBatcher<Self, P, ExpirationQueue<P::Key>, C, F, B>
53 where
54 Self: Stream<Item = P::Item> + Sized,
55 P: Partitioner + Unpin,
56 P::Key: Eq + Hash + Clone,
57 P::Item: ByteSizeOf,
58 C: BatchConfig<P::Item>,
59 F: Fn() -> C + Send,
60 {
61 PartitionedBatcher::new(self, partitioner, settings)
62 }
63
64 /// Batches the stream based on the given batch settings and item size calculator.
65 ///
66 /// The stream will yield batches of events, when either a batch fills
67 /// up or times out. The `item_size_calculator` determines the "size" of each input
68 /// in a batch. The units of "size" are intentionally not defined, so you can choose
69 /// whatever is needed.
70 fn batched<C>(self, config: C) -> Batcher<Self, C>
71 where
72 C: BatchConfig<Self::Item>,
73 Self: Sized,
74 {
75 Batcher::new(self, config)
76 }
77
78 /// Maps the items in the stream concurrently, up to the configured limit.
79 ///
80 /// For every item, the given mapper is invoked, and the future that is returned is spawned
81 /// and awaited concurrently. A limit can be passed: `None` is self-describing, as it imposes
82 /// no concurrency limit, and `Some(n)` limits this stage to `n` concurrent operations at any
83 /// given time.
84 ///
85 /// If the spawned future panics, the panic will be carried through and resumed on the task
86 /// calling the stream.
87 fn concurrent_map<F, T>(self, limit: NonZeroUsize, f: F) -> ConcurrentMap<Self, T>
88 where
89 Self: Sized,
90 F: Fn(Self::Item) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> + Send + 'static,
91 T: Send + 'static,
92 {
93 ConcurrentMap::new(self, Some(limit), f)
94 }
95
96 /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to
97 /// downstream services.
98 ///
99 /// Each input is transformed concurrently, up to the given limit. A limit of `n` limits
100 /// this stage to `n` concurrent operations at any given time.
101 ///
102 /// Encoding and compression are handled internally, deferring to the builder at the necessary
103 /// checkpoints for adjusting the event before encoding/compression, as well as generating the
104 /// correct request object with the result of encoding/compressing the events.
105 fn request_builder<B>(
106 self,
107 limit: NonZeroUsize,
108 builder: B,
109 ) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
110 where
111 Self: Sized,
112 Self::Item: Send + 'static,
113 B: RequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
114 B::Error: Send,
115 B::Request: Send,
116 {
117 let builder = Arc::new(builder);
118
119 // The future passed into the concurrent map is spawned in a tokio thread so we must preserve
120 // the span context in order to propagate the sink's automatic tags.
121 let span = Arc::new(Span::current());
122
123 self.concurrent_map(limit, move |input| {
124 let builder = Arc::clone(&builder);
125 let span = Arc::clone(&span);
126
127 Box::pin(async move {
128 let _entered = span.enter();
129
130 // Split the input into metadata and events.
131 let (metadata, request_metadata_builder, events) = builder.split_input(input);
132
133 // Encode the events.
134 let payload = builder.encode_events(events)?;
135
136 // Note: it would be nice for the RequestMetadataBuilder to build be created from the
137 // events here, and not need to be required by split_input(). But this then requires
138 // each Event type to implement Serialize, and that causes conflicts with the Serialize
139 // implementation for EstimatedJsonEncodedSizeOf.
140
141 // Build the request metadata.
142 let request_metadata = request_metadata_builder.build(&payload);
143
144 // Now build the actual request.
145 Ok(builder.build_request(metadata, request_metadata, payload))
146 })
147 })
148 }
149
150 /// Constructs a [`Stream`] which transforms the input into a number of requests suitable for
151 /// sending to downstream services.
152 ///
153 /// Unlike `request_builder`, which depends on the `RequestBuilder` trait,
154 /// `incremental_request_builder` depends on the `IncrementalRequestBuilder` trait, which is
155 /// designed specifically for sinks that have more stringent requirements around the generated
156 /// requests.
157 ///
158 /// As an example, the normal `request_builder` doesn't allow for a batch of input events to be
159 /// split up: all events must be split at the beginning, encoded separately (and all together),
160 /// and then reassembled into the request. If the encoding of these events caused a payload to
161 /// be generated that was, say, too large, you would have to back out the operation entirely by
162 /// failing the batch.
163 ///
164 /// With `incremental_request_builder`, the builder is given all of the events in a single shot,
165 /// and can generate multiple payloads. This is the maximally flexible approach to encoding,
166 /// but means that the trait doesn't provide any default methods like `RequestBuilder` does.
167 ///
168 /// Each input is transformed serially.
169 ///
170 /// Encoding and compression are handled internally, deferring to the builder at the necessary
171 /// checkpoints for adjusting the event before encoding/compression, as well as generating the
172 /// correct request object with the result of encoding/compressing the events.
173 fn incremental_request_builder<B>(
174 self,
175 mut builder: B,
176 ) -> Map<Self, Box<dyn FnMut(Self::Item) -> Vec<Result<B::Request, B::Error>> + Send + Sync>>
177 where
178 Self: Sized,
179 Self::Item: Send + 'static,
180 B: IncrementalRequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
181 B::Error: Send,
182 B::Request: Send,
183 {
184 self.map(Box::new(move |input| {
185 builder
186 .encode_events_incremental(input)
187 .into_iter()
188 .map(|result| {
189 result.map(|(metadata, payload)| builder.build_request(metadata, payload))
190 })
191 .collect()
192 }))
193 }
194
195 /// Normalizes a stream of [`Metric`] events with the provided normalizer.
196 ///
197 /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
198 /// supported by the sink, or to modify them. Such modifications typically include converting
199 /// absolute metrics to incremental metrics by tracking the change over time for a particular
200 /// series, or emitting absolute metrics based on incremental updates.
201 fn normalized<N>(self, normalizer: N) -> Normalizer<Self, N>
202 where
203 Self: Stream<Item = Metric> + Unpin + Sized,
204 N: MetricNormalize,
205 {
206 Normalizer::new(self, normalizer)
207 }
208
209 /// Normalizes a stream of [`Metric`] events with a default normalizer.
210 ///
211 /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
212 /// supported by the sink, or to modify them. Such modifications typically include converting
213 /// absolute metrics to incremental metrics by tracking the change over time for a particular
214 /// series, or emitting absolute metrics based on incremental updates.
215 fn normalized_with_default<N>(self) -> Normalizer<Self, N>
216 where
217 Self: Stream<Item = Metric> + Unpin + Sized,
218 N: MetricNormalize + Default,
219 {
220 Normalizer::new(self, N::default())
221 }
222
223 /// Normalizes a stream of [`Metric`] events with a normalizer and an optional TTL.
224 fn normalized_with_ttl<N>(self, maybe_ttl_secs: Option<f64>) -> Normalizer<Self, N>
225 where
226 Self: Stream<Item = Metric> + Unpin + Sized,
227 N: MetricNormalize + Default,
228 {
229 match maybe_ttl_secs {
230 None => Normalizer::new(self, N::default()),
231 Some(ttl) => {
232 Normalizer::new_with_ttl(self, N::default(), Duration::from_secs(ttl as u64))
233 }
234 }
235 }
236
237 /// Creates a [`Driver`] that uses the configured event stream as the input to the given
238 /// service.
239 ///
240 /// This is typically a terminal step in building a sink, bridging the gap from the processing
241 /// that must be performed by Vector (in the stream) to the underlying sink itself (the
242 /// service).
243 fn into_driver<Svc>(self, service: Svc) -> Driver<Self, Svc>
244 where
245 Self: Sized,
246 Self::Item: Finalizable,
247 Svc: Service<Self::Item>,
248 Svc::Error: fmt::Debug + 'static,
249 Svc::Future: Send + 'static,
250 Svc::Response: DriverResponse,
251 {
252 Driver::new(self, service)
253 }
254}
255
256#[pin_project]
257pub struct UnwrapInfallible<St> {
258 #[pin]
259 st: St,
260}
261
262impl<St, T> Stream for UnwrapInfallible<St>
263where
264 St: Stream<Item = Result<T, Infallible>>,
265{
266 type Item = T;
267
268 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
269 let this = self.project();
270 this.st
271 .poll_next(cx)
272 .map(|maybe| maybe.map(|result| result.unwrap()))
273 }
274}