vector/sinks/util/builder.rs
1use std::{
2 convert::Infallible,
3 fmt,
4 future::Future,
5 hash::Hash,
6 num::NonZeroUsize,
7 pin::Pin,
8 sync::Arc,
9 task::{Context, Poll},
10 time::Duration,
11};
12
13use futures_util::{Stream, StreamExt, stream::Map};
14use pin_project::pin_project;
15use tower::Service;
16use tracing::Span;
17use vector_lib::{
18 ByteSizeOf,
19 event::{Finalizable, Metric},
20 partition::Partitioner,
21 stream::{
22 ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher,
23 batcher::{Batcher, config::BatchConfig},
24 },
25};
26
27use super::{
28 IncrementalRequestBuilder, Normalizer, RequestBuilder, buffer::metrics::MetricNormalize,
29};
30
31impl<T: ?Sized> SinkBuilderExt for T where T: Stream {}
32
33pub trait SinkBuilderExt: Stream {
34 /// Converts a stream of infallible results by unwrapping them.
35 ///
36 /// For a stream of `Result<T, Infallible>` items, this turns it into a stream of `T` items.
37 fn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
38 where
39 Self: Stream<Item = Result<T, Infallible>> + Sized,
40 {
41 UnwrapInfallible { st: self }
42 }
43
44 /// Batches the stream based on the given partitioner and batch settings.
45 ///
46 /// The stream will yield batches of events, with their partition key, when either a batch fills
47 /// up or times out. [`Partitioner`] operates on a per-event basis, and has access to the event
48 /// itself, and so can access any and all fields of an event.
49 fn batched_partitioned<P, C, F, B>(
50 self,
51 partitioner: P,
52 settings: F,
53 ) -> PartitionedBatcher<Self, P, ExpirationQueue<P::Key>, C, F, B>
54 where
55 Self: Stream<Item = P::Item> + Sized,
56 P: Partitioner + Unpin,
57 P::Key: Eq + Hash + Clone,
58 P::Item: ByteSizeOf,
59 C: BatchConfig<P::Item>,
60 F: Fn() -> C + Send,
61 {
62 PartitionedBatcher::new(self, partitioner, settings)
63 }
64
65 /// Batches the stream based on the given batch settings and item size calculator.
66 ///
67 /// The stream will yield batches of events, when either a batch fills
68 /// up or times out. The `item_size_calculator` determines the "size" of each input
69 /// in a batch. The units of "size" are intentionally not defined, so you can choose
70 /// whatever is needed.
71 fn batched<C>(self, config: C) -> Batcher<Self, C>
72 where
73 C: BatchConfig<Self::Item>,
74 Self: Sized,
75 {
76 Batcher::new(self, config)
77 }
78
79 /// Maps the items in the stream concurrently, up to the configured limit.
80 ///
81 /// For every item, the given mapper is invoked, and the future that is returned is spawned
82 /// and awaited concurrently. A limit can be passed: `None` is self-describing, as it imposes
83 /// no concurrency limit, and `Some(n)` limits this stage to `n` concurrent operations at any
84 /// given time.
85 ///
86 /// If the spawned future panics, the panic will be carried through and resumed on the task
87 /// calling the stream.
88 fn concurrent_map<F, T>(self, limit: NonZeroUsize, f: F) -> ConcurrentMap<Self, T>
89 where
90 Self: Sized,
91 F: Fn(Self::Item) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> + Send + 'static,
92 T: Send + 'static,
93 {
94 ConcurrentMap::new(self, Some(limit), f)
95 }
96
97 /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to
98 /// downstream services.
99 ///
100 /// Each input is transformed concurrently, up to the given limit. A limit of `n` limits
101 /// this stage to `n` concurrent operations at any given time.
102 ///
103 /// Encoding and compression are handled internally, deferring to the builder at the necessary
104 /// checkpoints for adjusting the event before encoding/compression, as well as generating the
105 /// correct request object with the result of encoding/compressing the events.
106 fn request_builder<B>(
107 self,
108 limit: NonZeroUsize,
109 builder: B,
110 ) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
111 where
112 Self: Sized,
113 Self::Item: Send + 'static,
114 B: RequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
115 B::Error: Send,
116 B::Request: Send,
117 {
118 let builder = Arc::new(builder);
119
120 // The future passed into the concurrent map is spawned in a tokio thread so we must preserve
121 // the span context in order to propagate the sink's automatic tags.
122 let span = Arc::new(Span::current());
123
124 self.concurrent_map(limit, move |input| {
125 let builder = Arc::clone(&builder);
126 let span = Arc::clone(&span);
127
128 Box::pin(async move {
129 let _entered = span.enter();
130
131 // Split the input into metadata and events.
132 let (metadata, request_metadata_builder, events) = builder.split_input(input);
133
134 // Encode the events.
135 let payload = builder.encode_events(events)?;
136
137 // Note: it would be nice for the RequestMetadataBuilder to build be created from the
138 // events here, and not need to be required by split_input(). But this then requires
139 // each Event type to implement Serialize, and that causes conflicts with the Serialize
140 // implementation for EstimatedJsonEncodedSizeOf.
141
142 // Build the request metadata.
143 let request_metadata = request_metadata_builder.build(&payload);
144
145 // Now build the actual request.
146 Ok(builder.build_request(metadata, request_metadata, payload))
147 })
148 })
149 }
150
151 /// Constructs a [`Stream`] which transforms the input into a number of requests suitable for
152 /// sending to downstream services.
153 ///
154 /// Unlike `request_builder`, which depends on the `RequestBuilder` trait,
155 /// `incremental_request_builder` depends on the `IncrementalRequestBuilder` trait, which is
156 /// designed specifically for sinks that have more stringent requirements around the generated
157 /// requests.
158 ///
159 /// As an example, the normal `request_builder` doesn't allow for a batch of input events to be
160 /// split up: all events must be split at the beginning, encoded separately (and all together),
161 /// and then reassembled into the request. If the encoding of these events caused a payload to
162 /// be generated that was, say, too large, you would have to back out the operation entirely by
163 /// failing the batch.
164 ///
165 /// With `incremental_request_builder`, the builder is given all of the events in a single shot,
166 /// and can generate multiple payloads. This is the maximally flexible approach to encoding,
167 /// but means that the trait doesn't provide any default methods like `RequestBuilder` does.
168 ///
169 /// Each input is transformed serially.
170 ///
171 /// Encoding and compression are handled internally, deferring to the builder at the necessary
172 /// checkpoints for adjusting the event before encoding/compression, as well as generating the
173 /// correct request object with the result of encoding/compressing the events.
174 fn incremental_request_builder<B>(
175 self,
176 mut builder: B,
177 ) -> Map<Self, Box<dyn FnMut(Self::Item) -> Vec<Result<B::Request, B::Error>> + Send + Sync>>
178 where
179 Self: Sized,
180 Self::Item: Send + 'static,
181 B: IncrementalRequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
182 B::Error: Send,
183 B::Request: Send,
184 {
185 self.map(Box::new(move |input| {
186 builder
187 .encode_events_incremental(input)
188 .into_iter()
189 .map(|result| {
190 result.map(|(metadata, payload)| builder.build_request(metadata, payload))
191 })
192 .collect()
193 }))
194 }
195
196 /// Normalizes a stream of [`Metric`] events with the provided normalizer.
197 ///
198 /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
199 /// supported by the sink, or to modify them. Such modifications typically include converting
200 /// absolute metrics to incremental metrics by tracking the change over time for a particular
201 /// series, or emitting absolute metrics based on incremental updates.
202 fn normalized<N>(self, normalizer: N) -> Normalizer<Self, N>
203 where
204 Self: Stream<Item = Metric> + Unpin + Sized,
205 N: MetricNormalize,
206 {
207 Normalizer::new(self, normalizer)
208 }
209
210 /// Normalizes a stream of [`Metric`] events with a default normalizer.
211 ///
212 /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
213 /// supported by the sink, or to modify them. Such modifications typically include converting
214 /// absolute metrics to incremental metrics by tracking the change over time for a particular
215 /// series, or emitting absolute metrics based on incremental updates.
216 fn normalized_with_default<N>(self) -> Normalizer<Self, N>
217 where
218 Self: Stream<Item = Metric> + Unpin + Sized,
219 N: MetricNormalize + Default,
220 {
221 Normalizer::new(self, N::default())
222 }
223
224 /// Normalizes a stream of [`Metric`] events with a normalizer and an optional TTL.
225 fn normalized_with_ttl<N>(self, maybe_ttl_secs: Option<f64>) -> Normalizer<Self, N>
226 where
227 Self: Stream<Item = Metric> + Unpin + Sized,
228 N: MetricNormalize + Default,
229 {
230 match maybe_ttl_secs {
231 None => Normalizer::new(self, N::default()),
232 Some(ttl) => {
233 Normalizer::new_with_ttl(self, N::default(), Duration::from_secs(ttl as u64))
234 }
235 }
236 }
237
238 /// Creates a [`Driver`] that uses the configured event stream as the input to the given
239 /// service.
240 ///
241 /// This is typically a terminal step in building a sink, bridging the gap from the processing
242 /// that must be performed by Vector (in the stream) to the underlying sink itself (the
243 /// service).
244 fn into_driver<Svc>(self, service: Svc) -> Driver<Self, Svc>
245 where
246 Self: Sized,
247 Self::Item: Finalizable,
248 Svc: Service<Self::Item>,
249 Svc::Error: fmt::Debug + 'static,
250 Svc::Future: Send + 'static,
251 Svc::Response: DriverResponse,
252 {
253 Driver::new(self, service)
254 }
255}
256
/// Stream adapter that wraps a stream of `Result<T, Infallible>` items and yields the inner
/// `T` values directly.
///
/// Constructed by `SinkBuilderExt::unwrap_infallible`.
#[pin_project]
pub struct UnwrapInfallible<St> {
    /// The wrapped stream; pinned structurally so it can be polled through the projection.
    #[pin]
    st: St,
}
262
263impl<St, T> Stream for UnwrapInfallible<St>
264where
265 St: Stream<Item = Result<T, Infallible>>,
266{
267 type Item = T;
268
269 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
270 let this = self.project();
271 this.st
272 .poll_next(cx)
273 .map(|maybe| maybe.map(|result| result.unwrap()))
274 }
275}