vector/sinks/statsd/
sink.rs

1use std::{fmt, future::ready};
2
3use async_trait::async_trait;
4use futures_util::{
5    stream::{self, BoxStream},
6    StreamExt,
7};
8use tower::Service;
9use vector_lib::internal_event::Protocol;
10use vector_lib::stream::{BatcherSettings, DriverResponse};
11use vector_lib::{event::Event, sink::StreamSink};
12
13use crate::sinks::util::SinkBuilderExt;
14
15use super::{
16    batch::StatsdBatchSizer, normalizer::StatsdNormalizer, request_builder::StatsdRequestBuilder,
17    service::StatsdRequest,
18};
19
20pub(crate) struct StatsdSink<S> {
21    service: S,
22    batch_settings: BatcherSettings,
23    request_builder: StatsdRequestBuilder,
24    protocol: Protocol,
25}
26
27impl<S> StatsdSink<S>
28where
29    S: Service<StatsdRequest> + Send,
30    S::Error: fmt::Debug + Send + 'static,
31    S::Future: Send + 'static,
32    S::Response: DriverResponse,
33{
34    /// Creates a new `StatsdSink`.
35    pub const fn new(
36        service: S,
37        batch_settings: BatcherSettings,
38        request_builder: StatsdRequestBuilder,
39        protocol: Protocol,
40    ) -> Self {
41        Self {
42            service,
43            batch_settings,
44            request_builder,
45            protocol,
46        }
47    }
48
49    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
50        input
51            // Convert `Event` to `Metric` so we don't have to deal with constant conversions.
52            .filter_map(|event| ready(event.try_into_metric()))
53            // Converts absolute counters into incremental counters, but otherwise leaves everything
54            // else alone. The encoder will handle the difference in absolute vs incremental for
55            // other metric types in type-specific ways i.e. incremental gauge updates use a
56            // different syntax, etc.
57            .normalized_with_default::<StatsdNormalizer>()
58            .batched(self.batch_settings.as_item_size_config(StatsdBatchSizer))
59            // We build our requests "incrementally", which means that for a single batch of
60            // metrics, we might generate N requests to represent all of the metrics in the batch.
61            //
62            // We do this as for different socket modes, there are optimal request sizes to use to
63            // ensure the highest rate of delivery, such as staying within the MTU for UDP, etc.
64            .incremental_request_builder(self.request_builder)
65            // This unrolls the vector of request results that our request builder generates.
66            .flat_map(stream::iter)
67            // Generating requests _cannot_ fail, so we just unwrap our built requests.
68            .unwrap_infallible()
69            // Finally, we generate the driver which will take our requests, send them off, and appropriately handle
70            // finalization of the events, and logging/metrics, as the requests are responded to.
71            .into_driver(self.service)
72            .protocol(self.protocol)
73            .run()
74            .await
75    }
76}
77
78#[async_trait]
79impl<S> StreamSink<Event> for StatsdSink<S>
80where
81    S: Service<StatsdRequest> + Send,
82    S::Error: fmt::Debug + Send + 'static,
83    S::Future: Send + 'static,
84    S::Response: DriverResponse,
85{
86    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
87        // Rust has issues with lifetimes and generics, which `async_trait` exacerbates, so we write
88        // a normal async fn in `StatsdSink` itself, and then call out to it from this trait
89        // implementation, which makes the compiler happy.
90        self.run_inner(input).await
91    }
92}