vector/sinks/util/
normalizer.rs

1use std::{
2    pin::Pin,
3    task::{ready, Context, Poll},
4    time::Duration,
5};
6
7use futures_util::{stream::Fuse, Stream, StreamExt};
8use pin_project::pin_project;
9use vector_lib::event::Metric;
10
11use super::buffer::metrics::{MetricNormalize, MetricNormalizer, TtlPolicy};
12
13#[pin_project]
14pub struct Normalizer<St, N>
15where
16    St: Stream,
17{
18    #[pin]
19    stream: Fuse<St>,
20    normalizer: MetricNormalizer<N>,
21}
22
23impl<St, N> Normalizer<St, N>
24where
25    St: Stream,
26{
27    pub fn new(stream: St, normalizer: N) -> Self {
28        Self {
29            stream: stream.fuse(),
30            normalizer: MetricNormalizer::from(normalizer),
31        }
32    }
33
34    pub fn new_with_ttl(stream: St, normalizer: N, ttl: Duration) -> Self {
35        Self {
36            stream: stream.fuse(),
37            normalizer: MetricNormalizer::with_ttl(normalizer, TtlPolicy::new(ttl)),
38        }
39    }
40}
41
42impl<St, N> Stream for Normalizer<St, N>
43where
44    St: Stream<Item = Metric>,
45    N: MetricNormalize,
46{
47    type Item = Metric;
48
49    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
50        let mut this = self.project();
51        loop {
52            match ready!(this.stream.as_mut().poll_next(cx)) {
53                Some(metric) => {
54                    if let Some(normalized) = this.normalizer.normalize(metric) {
55                        return Poll::Ready(Some(normalized));
56                    }
57                }
58                None => return Poll::Ready(None),
59            }
60        }
61    }
62}