pub trait SinkBuilderExt: Stream {
// Provided methods
fn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
where Self: Stream<Item = Result<T, Infallible>> + Sized { ... }
fn batched_partitioned<P, C, F, B>(
self,
partitioner: P,
settings: F,
) -> PartitionedBatcher<Self, P, ExpirationQueue<P::Key>, C, F, B>
where Self: Stream<Item = P::Item> + Sized,
P: Partitioner + Unpin,
P::Key: Eq + Hash + Clone,
P::Item: ByteSizeOf,
C: BatchConfig<P::Item>,
F: Fn() -> C + Send { ... }
fn batched<C>(self, config: C) -> Batcher<Self, C>
where C: BatchConfig<Self::Item>,
Self: Sized { ... }
fn concurrent_map<F, T>(
self,
limit: NonZeroUsize,
f: F,
) -> ConcurrentMap<Self, T>
where Self: Sized,
F: Fn(Self::Item) -> Pin<Box<dyn Future<Output = T> + Send + 'static>> + Send + 'static,
T: Send + 'static { ... }
fn request_builder<B>(
self,
limit: NonZeroUsize,
builder: B,
) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
where Self: Sized,
Self::Item: Send + 'static,
B: RequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
B::Error: Send,
B::Request: Send { ... }
fn incremental_request_builder<B>(
self,
builder: B,
) -> Map<Self, Box<dyn FnMut(Self::Item) -> Vec<Result<B::Request, B::Error>> + Send + Sync>>
where Self: Sized,
Self::Item: Send + 'static,
B: IncrementalRequestBuilder<<Self as Stream>::Item> + Send + Sync + 'static,
B::Error: Send,
B::Request: Send { ... }
fn normalized<N>(self, normalizer: N) -> Normalizer<Self, N>
where Self: Stream<Item = Metric> + Unpin + Sized,
N: MetricNormalize { ... }
fn normalized_with_default<N>(self) -> Normalizer<Self, N>
where Self: Stream<Item = Metric> + Unpin + Sized,
N: MetricNormalize + Default { ... }
fn into_driver<Svc>(self, service: Svc) -> Driver<Self, Svc>
where Self: Sized,
Self::Item: Finalizable,
Svc: Service<Self::Item>,
Svc::Error: Debug + 'static,
Svc::Future: Send + 'static,
Svc::Response: DriverResponse { ... }
}
Provided Methods§
sourcefn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
fn unwrap_infallible<T>(self) -> UnwrapInfallible<Self>
Converts a stream of infallible results by unwrapping them.
For a stream of Result<T, Infallible>
items, this turns it into a stream of T
items.
sourcefn batched_partitioned<P, C, F, B>(
self,
partitioner: P,
settings: F,
) -> PartitionedBatcher<Self, P, ExpirationQueue<P::Key>, C, F, B>where
Self: Stream<Item = P::Item> + Sized,
P: Partitioner + Unpin,
P::Key: Eq + Hash + Clone,
P::Item: ByteSizeOf,
C: BatchConfig<P::Item>,
F: Fn() -> C + Send,
fn batched_partitioned<P, C, F, B>(
self,
partitioner: P,
settings: F,
) -> PartitionedBatcher<Self, P, ExpirationQueue<P::Key>, C, F, B>where
Self: Stream<Item = P::Item> + Sized,
P: Partitioner + Unpin,
P::Key: Eq + Hash + Clone,
P::Item: ByteSizeOf,
C: BatchConfig<P::Item>,
F: Fn() -> C + Send,
Batches the stream based on the given partitioner and batch settings.
The stream will yield batches of events, with their partition key, when either a batch fills
up or times out. Partitioner
operates on a per-event basis, and has access to the event
itself, and so can access any and all fields of an event.
sourcefn batched<C>(self, config: C) -> Batcher<Self, C>where
C: BatchConfig<Self::Item>,
Self: Sized,
fn batched<C>(self, config: C) -> Batcher<Self, C>where
C: BatchConfig<Self::Item>,
Self: Sized,
Batches the stream based on the given batch settings and item size calculator.
The stream will yield batches of events, when either a batch fills
up or times out. The item_size_calculator
determines the “size” of each input
in a batch. The units of “size” are intentionally not defined, so you can choose
whatever is needed.
sourcefn concurrent_map<F, T>(
self,
limit: NonZeroUsize,
f: F,
) -> ConcurrentMap<Self, T>
fn concurrent_map<F, T>( self, limit: NonZeroUsize, f: F, ) -> ConcurrentMap<Self, T>
Maps the items in the stream concurrently, up to the configured limit.
For every item, the given mapper is invoked, and the future that is returned is spawned
and awaited concurrently. A limit can be passed: None
is self-describing, as it imposes
no concurrency limit, and Some(n)
limits this stage to n
concurrent operations at any
given time.
If the spawned future panics, the panic will be carried through and resumed on the task calling the stream.
sourcefn request_builder<B>(
self,
limit: NonZeroUsize,
builder: B,
) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
fn request_builder<B>( self, limit: NonZeroUsize, builder: B, ) -> ConcurrentMap<Self, Result<B::Request, B::Error>>
Constructs a [Stream
] which transforms the input into a request suitable for sending to
downstream services.
Each input is transformed concurrently, up to the given limit. A limit of n
limits
this stage to n
concurrent operations at any given time.
Encoding and compression are handled internally, deferring to the builder at the necessary checkpoints for adjusting the event before encoding/compression, as well as generating the correct request object with the result of encoding/compressing the events.
sourcefn incremental_request_builder<B>(
self,
builder: B,
) -> Map<Self, Box<dyn FnMut(Self::Item) -> Vec<Result<B::Request, B::Error>> + Send + Sync>>
fn incremental_request_builder<B>( self, builder: B, ) -> Map<Self, Box<dyn FnMut(Self::Item) -> Vec<Result<B::Request, B::Error>> + Send + Sync>>
Constructs a [Stream
] which transforms the input into a number of requests suitable for
sending to downstream services.
Unlike request_builder
, which depends on the RequestBuilder
trait,
incremental_request_builder
depends on the IncrementalRequestBuilder
trait, which is
designed specifically for sinks that have more stringent requirements around the generated
requests.
As an example, the normal request_builder
doesn’t allow for a batch of input events to be
split up: all events must be split at the beginning, encoded separately (and all together),
and then reassembled into the request. If the encoding of these events caused a payload to
be generated that was, say, too large, you would have to back out the operation entirely by
failing the batch.
With incremental_request_builder
, the builder is given all of the events in a single shot,
and can generate multiple payloads. This is the maximally flexible approach to encoding,
but means that the trait doesn’t provide any default methods like RequestBuilder
does.
Each input is transformed serially.
Encoding and compression are handled internally, deferring to the builder at the necessary checkpoints for adjusting the event before encoding/compression, as well as generating the correct request object with the result of encoding/compressing the events.
sourcefn normalized<N>(self, normalizer: N) -> Normalizer<Self, N>
fn normalized<N>(self, normalizer: N) -> Normalizer<Self, N>
Normalizes a stream of Metric
events with the provided normalizer.
An implementation of MetricNormalize
is used to either drop metrics which cannot be
supported by the sink, or to modify them. Such modifications typically include converting
absolute metrics to incremental metrics by tracking the change over time for a particular
series, or emitting absolute metrics based on incremental updates.
sourcefn normalized_with_default<N>(self) -> Normalizer<Self, N>
fn normalized_with_default<N>(self) -> Normalizer<Self, N>
Normalizes a stream of Metric
events with a default normalizer.
An implementation of MetricNormalize
is used to either drop metrics which cannot be
supported by the sink, or to modify them. Such modifications typically include converting
absolute metrics to incremental metrics by tracking the change over time for a particular
series, or emitting absolute metrics based on incremental updates.
sourcefn into_driver<Svc>(self, service: Svc) -> Driver<Self, Svc>where
Self: Sized,
Self::Item: Finalizable,
Svc: Service<Self::Item>,
Svc::Error: Debug + 'static,
Svc::Future: Send + 'static,
Svc::Response: DriverResponse,
fn into_driver<Svc>(self, service: Svc) -> Driver<Self, Svc>where
Self: Sized,
Self::Item: Finalizable,
Svc: Service<Self::Item>,
Svc::Error: Debug + 'static,
Svc::Future: Send + 'static,
Svc::Response: DriverResponse,
Creates a Driver
that uses the configured event stream as the input to the given
service.
This is typically a terminal step in building a sink, bridging the gap from the processing that must be performed by Vector (in the stream) to the underlying sink itself (the service).