vector/sinks/util/
normalizer.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll, ready},
4 time::Duration,
5};
6
7use futures_util::{Stream, StreamExt, stream::Fuse};
8use pin_project::pin_project;
9use vector_lib::event::Metric;
10
11use crate::sinks::util::buffer::metrics::{
12 DefaultNormalizerSettings, MetricNormalize, MetricNormalizer, NormalizerConfig,
13};
14
15#[pin_project]
16pub struct Normalizer<St, N>
17where
18 St: Stream,
19{
20 #[pin]
21 stream: Fuse<St>,
22 normalizer: MetricNormalizer<N>,
23}
24
25impl<St, N> Normalizer<St, N>
26where
27 St: Stream,
28{
29 pub fn new(stream: St, normalizer: N) -> Self {
30 Self {
31 stream: stream.fuse(),
32 normalizer: MetricNormalizer::from(normalizer),
33 }
34 }
35
36 pub fn new_with_ttl(stream: St, normalizer: N, ttl: Duration) -> Self {
37 let config = NormalizerConfig::<DefaultNormalizerSettings> {
39 time_to_live: Some(ttl.as_secs()),
40 max_bytes: None,
41 max_events: None,
42 _d: std::marker::PhantomData,
43 };
44 Self {
45 stream: stream.fuse(),
46 normalizer: MetricNormalizer::with_config(normalizer, config),
47 }
48 }
49}
50
51impl<St, N> Stream for Normalizer<St, N>
52where
53 St: Stream<Item = Metric>,
54 N: MetricNormalize,
55{
56 type Item = Metric;
57
58 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59 let mut this = self.project();
60 loop {
61 match ready!(this.stream.as_mut().poll_next(cx)) {
62 Some(metric) => {
63 if let Some(normalized) = this.normalizer.normalize(metric) {
64 return Poll::Ready(Some(normalized));
65 }
66 }
67 None => return Poll::Ready(None),
68 }
69 }
70 }
71}