vector/sinks/statsd/
sink.rs

1use std::{fmt, future::ready};
2
3use async_trait::async_trait;
4use futures_util::{
5    StreamExt,
6    stream::{self, BoxStream},
7};
8use tower::Service;
9use vector_lib::{
10    event::Event,
11    internal_event::Protocol,
12    sink::StreamSink,
13    stream::{BatcherSettings, DriverResponse},
14};
15
16use super::{
17    batch::StatsdBatchSizer, normalizer::StatsdNormalizer, request_builder::StatsdRequestBuilder,
18    service::StatsdRequest,
19};
20use crate::sinks::util::SinkBuilderExt;
21
22pub(crate) struct StatsdSink<S> {
23    service: S,
24    batch_settings: BatcherSettings,
25    request_builder: StatsdRequestBuilder,
26    protocol: Protocol,
27}
28
29impl<S> StatsdSink<S>
30where
31    S: Service<StatsdRequest> + Send,
32    S::Error: fmt::Debug + Send + 'static,
33    S::Future: Send + 'static,
34    S::Response: DriverResponse,
35{
36    /// Creates a new `StatsdSink`.
37    pub const fn new(
38        service: S,
39        batch_settings: BatcherSettings,
40        request_builder: StatsdRequestBuilder,
41        protocol: Protocol,
42    ) -> Self {
43        Self {
44            service,
45            batch_settings,
46            request_builder,
47            protocol,
48        }
49    }
50
51    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
52        input
53            // Convert `Event` to `Metric` so we don't have to deal with constant conversions.
54            .filter_map(|event| ready(event.try_into_metric()))
55            // Converts absolute counters into incremental counters, but otherwise leaves everything
56            // else alone. The encoder will handle the difference in absolute vs incremental for
57            // other metric types in type-specific ways i.e. incremental gauge updates use a
58            // different syntax, etc.
59            .normalized_with_default::<StatsdNormalizer>()
60            .batched(self.batch_settings.as_item_size_config(StatsdBatchSizer))
61            // We build our requests "incrementally", which means that for a single batch of
62            // metrics, we might generate N requests to represent all of the metrics in the batch.
63            //
64            // We do this as for different socket modes, there are optimal request sizes to use to
65            // ensure the highest rate of delivery, such as staying within the MTU for UDP, etc.
66            .incremental_request_builder(self.request_builder)
67            // This unrolls the vector of request results that our request builder generates.
68            .flat_map(stream::iter)
69            // Generating requests _cannot_ fail, so we just unwrap our built requests.
70            .unwrap_infallible()
71            // Finally, we generate the driver which will take our requests, send them off, and appropriately handle
72            // finalization of the events, and logging/metrics, as the requests are responded to.
73            .into_driver(self.service)
74            .protocol(self.protocol)
75            .run()
76            .await
77    }
78}
79
80#[async_trait]
81impl<S> StreamSink<Event> for StatsdSink<S>
82where
83    S: Service<StatsdRequest> + Send,
84    S::Error: fmt::Debug + Send + 'static,
85    S::Future: Send + 'static,
86    S::Response: DriverResponse,
87{
88    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
89        // Rust has issues with lifetimes and generics, which `async_trait` exacerbates, so we write
90        // a normal async fn in `StatsdSink` itself, and then call out to it from this trait
91        // implementation, which makes the compiler happy.
92        self.run_inner(input).await
93    }
94}