vector/sinks/util/
normalizer.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll, ready},
4    time::Duration,
5};
6
7use futures_util::{Stream, StreamExt, stream::Fuse};
8use pin_project::pin_project;
9use vector_lib::event::Metric;
10
11use crate::sinks::util::buffer::metrics::{
12    DefaultNormalizerSettings, MetricNormalize, MetricNormalizer, NormalizerConfig,
13};
14
15#[pin_project]
16pub struct Normalizer<St, N>
17where
18    St: Stream,
19{
20    #[pin]
21    stream: Fuse<St>,
22    normalizer: MetricNormalizer<N>,
23}
24
25impl<St, N> Normalizer<St, N>
26where
27    St: Stream,
28{
29    pub fn new(stream: St, normalizer: N) -> Self {
30        Self {
31            stream: stream.fuse(),
32            normalizer: MetricNormalizer::from(normalizer),
33        }
34    }
35
36    pub fn new_with_ttl(stream: St, normalizer: N, ttl: Duration) -> Self {
37        // Create a new MetricSetConfig with the proper settings type
38        let config = NormalizerConfig::<DefaultNormalizerSettings> {
39            time_to_live: Some(ttl.as_secs()),
40            max_bytes: None,
41            max_events: None,
42            _d: std::marker::PhantomData,
43        };
44        Self {
45            stream: stream.fuse(),
46            normalizer: MetricNormalizer::with_config(normalizer, config),
47        }
48    }
49}
50
51impl<St, N> Stream for Normalizer<St, N>
52where
53    St: Stream<Item = Metric>,
54    N: MetricNormalize,
55{
56    type Item = Metric;
57
58    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59        let mut this = self.project();
60        loop {
61            match ready!(this.stream.as_mut().poll_next(cx)) {
62                Some(metric) => {
63                    if let Some(normalized) = this.normalizer.normalize(metric) {
64                        return Poll::Ready(Some(normalized));
65                    }
66                }
67                None => return Poll::Ready(None),
68            }
69        }
70    }
71}