1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
use std::{
    convert::Infallible,
    fmt,
    future::Future,
    hash::Hash,
    num::NonZeroUsize,
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
};

use futures_util::{stream::Map, Stream, StreamExt};
use pin_project::pin_project;
use tower::Service;
use tracing::Span;
use vector_lib::stream::{
    batcher::{config::BatchConfig, Batcher},
    ConcurrentMap, Driver, DriverResponse, ExpirationQueue, PartitionedBatcher,
};
use vector_lib::{
    event::{Finalizable, Metric},
    partition::Partitioner,
    ByteSizeOf,
};

use super::{
    buffer::metrics::MetricNormalize, IncrementalRequestBuilder, Normalizer, RequestBuilder,
};

impl<T: ?Sized> SinkBuilderExt for T where T: Stream {}

pub trait SinkBuilderExt: Stream {
    /// Converts a stream of infallible results by unwrapping them.
    ///
    /// For a stream of `Result<T, Infallible>` items, this turns it into a stream of `T` items.
    fn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
    where
        Self: Stream<Item = Result<T, Infallible>> + Sized,
    {
        UnwrapInfallible { st: self }
    }

    /// Batches the stream based on the given partitioner and batch settings.
    ///
    /// The stream will yield batches of events, with their partition key, when either a batch fills
    /// up or times out. [`Partitioner`] operates on a per-event basis, and has access to the event
    /// itself, and so can access any and all fields of an event.
    fn batched_partitioned<P, C, F, B>(
        self,
        partitioner: P,
        settings: F,
    ) -> PartitionedBatcher<Self, P, ExpirationQueue<P::Key>, C, F, B>
    where
        Self: Stream<Item = P::Item> + Sized,
        P: Partitioner + Unpin,
        P::Key: Eq + Hash + Clone,
        P::Item: ByteSizeOf,
        C: BatchConfig<P::Item>,
        F: Fn() -> C + Send,
    {
        PartitionedBatcher::new(self, partitioner, settings)
    }

    /// Batches the stream based on the given batch settings and item size calculator.
    ///
    /// The stream will yield batches of events, when either a batch fills
    /// up or times out. The `item_size_calculator` determines the "size" of each input
    /// in a batch. The units of "size" are intentionally not defined, so you can choose
    /// whatever is needed.
    fn batched<C>(self, config: C) -> Batcher<Self, C>
    where
        C: BatchConfig<Self::Item>,
        Self: Sized,
    {
        Batcher::new(self, config)
    }

    /// Maps the items in the stream concurrently, up to the configured limit.
    ///
    /// For every item, the given mapper is invoked, and the future that is returned is spawned
    /// and awaited concurrently.  A limit can be passed: `None` is self-describing, as it imposes
    /// no concurrency limit, and `Some(n)` limits this stage to `n` concurrent operations at any
    /// given time.
    ///
    /// If the spawned future panics, the panic will be carried through and resumed on the task
    /// calling the stream.
    fn concurrent_map<F, T>(self, limit: NonZeroUsize, f: F) -> ConcurrentMap<Self, T>
    where
        Self: Sized,
        F: Fn(Self::Item) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> + Send + 'static,
        T: Send + 'static,
    {
        ConcurrentMap::new(self, Some(limit), f)
    }

    /// Constructs a [`Stream`] which transforms the input into a request suitable for sending to
    /// downstream services.
    ///
    /// Each input is transformed concurrently, up to the given limit.  A limit of `n` limits
    /// this stage to `n` concurrent operations at any given time.
    ///
    /// Encoding and compression are handled internally, deferring to the builder at the necessary
    /// checkpoints for adjusting the event before encoding/compression, as well as generating the
    /// correct request object with the result of encoding/compressing the events.
    fn request_builder<B>(
        self,
        limit: NonZeroUsize,
        builder: B,
    ) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
    where
        Self: Sized,
        Self::Item: Send + 'static,
        B: RequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
        B::Error: Send,
        B::Request: Send,
    {
        let builder = Arc::new(builder);

        // The future passed into the concurrent map is spawned in a tokio thread so we must preserve
        // the span context in order to propagate the sink's automatic tags.
        let span = Arc::new(Span::current());

        self.concurrent_map(limit, move |input| {
            let builder = Arc::clone(&builder);
            let span = Arc::clone(&span);

            Box::pin(async move {
                let _entered = span.enter();

                // Split the input into metadata and events.
                let (metadata, request_metadata_builder, events) = builder.split_input(input);

                // Encode the events.
                let payload = builder.encode_events(events)?;

                // Note: it would be nice for the RequestMetadataBuilder to build be created from the
                // events here, and not need to be required by split_input(). But this then requires
                // each Event type to implement Serialize, and that causes conflicts with the Serialize
                // implementation for EstimatedJsonEncodedSizeOf.

                // Build the request metadata.
                let request_metadata = request_metadata_builder.build(&payload);

                // Now build the actual request.
                Ok(builder.build_request(metadata, request_metadata, payload))
            })
        })
    }

    /// Constructs a [`Stream`] which transforms the input into a number of requests suitable for
    /// sending to downstream services.
    ///
    /// Unlike `request_builder`, which depends on the `RequestBuilder` trait,
    /// `incremental_request_builder` depends on the `IncrementalRequestBuilder` trait, which is
    /// designed specifically for sinks that have more stringent requirements around the generated
    /// requests.
    ///
    /// As an example, the normal `request_builder` doesn't allow for a batch of input events to be
    /// split up: all events must be split at the beginning, encoded separately (and all together),
    /// and then reassembled into the request.  If the encoding of these events caused a payload to
    /// be generated that was, say, too large, you would have to back out the operation entirely by
    /// failing the batch.
    ///
    /// With `incremental_request_builder`, the builder is given all of the events in a single shot,
    /// and can generate multiple payloads.  This is the maximally flexible approach to encoding,
    /// but means that the trait doesn't provide any default methods like `RequestBuilder` does.
    ///
    /// Each input is transformed serially.
    ///
    /// Encoding and compression are handled internally, deferring to the builder at the necessary
    /// checkpoints for adjusting the event before encoding/compression, as well as generating the
    /// correct request object with the result of encoding/compressing the events.
    fn incremental_request_builder<B>(
        self,
        mut builder: B,
    ) -> Map<Self, Box<dyn FnMut(Self::Item) -> Vec<Result<B::Request, B::Error>> + Send + Sync>>
    where
        Self: Sized,
        Self::Item: Send + 'static,
        B: IncrementalRequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
        B::Error: Send,
        B::Request: Send,
    {
        self.map(Box::new(move |input| {
            builder
                .encode_events_incremental(input)
                .into_iter()
                .map(|result| {
                    result.map(|(metadata, payload)| builder.build_request(metadata, payload))
                })
                .collect()
        }))
    }

    /// Normalizes a stream of [`Metric`] events with the provided normalizer.
    ///
    /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
    /// supported by the sink, or to modify them.  Such modifications typically include converting
    /// absolute metrics to incremental metrics by tracking the change over time for a particular
    /// series, or emitting absolute metrics based on incremental updates.
    fn normalized<N>(self, normalizer: N) -> Normalizer<Self, N>
    where
        Self: Stream<Item = Metric> + Unpin + Sized,
        N: MetricNormalize,
    {
        Normalizer::new(self, normalizer)
    }

    /// Normalizes a stream of [`Metric`] events with a default normalizer.
    ///
    /// An implementation of [`MetricNormalize`] is used to either drop metrics which cannot be
    /// supported by the sink, or to modify them.  Such modifications typically include converting
    /// absolute metrics to incremental metrics by tracking the change over time for a particular
    /// series, or emitting absolute metrics based on incremental updates.
    fn normalized_with_default<N>(self) -> Normalizer<Self, N>
    where
        Self: Stream<Item = Metric> + Unpin + Sized,
        N: MetricNormalize + Default,
    {
        Normalizer::new(self, N::default())
    }

    /// Creates a [`Driver`] that uses the configured event stream as the input to the given
    /// service.
    ///
    /// This is typically a terminal step in building a sink, bridging the gap from the processing
    /// that must be performed by Vector (in the stream) to the underlying sink itself (the
    /// service).
    fn into_driver<Svc>(self, service: Svc) -> Driver<Self, Svc>
    where
        Self: Sized,
        Self::Item: Finalizable,
        Svc: Service<Self::Item>,
        Svc::Error: fmt::Debug + 'static,
        Svc::Future: Send + 'static,
        Svc::Response: DriverResponse,
    {
        Driver::new(self, service)
    }
}

#[pin_project]
pub struct UnwrapInfallible<St> {
    #[pin]
    st: St,
}

impl<St, T> Stream for UnwrapInfallible<St>
where
    St: Stream<Item = Result<T, Infallible>>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        this.st
            .poll_next(cx)
            .map(|maybe| maybe.map(|result| result.unwrap()))
    }
}