vector/sinks/util/
normalizer.rs1use std::{
2 pin::Pin,
3 task::{ready, Context, Poll},
4 time::Duration,
5};
6
7use futures_util::{stream::Fuse, Stream, StreamExt};
8use pin_project::pin_project;
9use vector_lib::event::Metric;
10
11use super::buffer::metrics::{MetricNormalize, MetricNormalizer, TtlPolicy};
12
13#[pin_project]
14pub struct Normalizer<St, N>
15where
16 St: Stream,
17{
18 #[pin]
19 stream: Fuse<St>,
20 normalizer: MetricNormalizer<N>,
21}
22
23impl<St, N> Normalizer<St, N>
24where
25 St: Stream,
26{
27 pub fn new(stream: St, normalizer: N) -> Self {
28 Self {
29 stream: stream.fuse(),
30 normalizer: MetricNormalizer::from(normalizer),
31 }
32 }
33
34 pub fn new_with_ttl(stream: St, normalizer: N, ttl: Duration) -> Self {
35 Self {
36 stream: stream.fuse(),
37 normalizer: MetricNormalizer::with_ttl(normalizer, TtlPolicy::new(ttl)),
38 }
39 }
40}
41
42impl<St, N> Stream for Normalizer<St, N>
43where
44 St: Stream<Item = Metric>,
45 N: MetricNormalize,
46{
47 type Item = Metric;
48
49 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50 let mut this = self.project();
51 loop {
52 match ready!(this.stream.as_mut().poll_next(cx)) {
53 Some(metric) => {
54 if let Some(normalized) = this.normalizer.normalize(metric) {
55 return Poll::Ready(Some(normalized));
56 }
57 }
58 None => return Poll::Ready(None),
59 }
60 }
61 }
62}