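//! Stream sink implementation for the StatsD sink: turns a stream of events into batched
//! StatsD requests and drives them through the configured service.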
use std::{fmt, future::ready};

use async_trait::async_trait;
use futures_util::{
    stream::{self, BoxStream},
    StreamExt,
};
use tower::Service;
use vector_lib::internal_event::Protocol;
use vector_lib::stream::{BatcherSettings, DriverResponse};
use vector_lib::{event::Event, sink::StreamSink};

use crate::sinks::util::SinkBuilderExt;

use super::{
    batch::StatsdBatchSizer, normalizer::StatsdNormalizer, request_builder::StatsdRequestBuilder,
    service::StatsdRequest,
};
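
/// A [`StreamSink`] implementation that normalizes and batches metrics, encodes them into
/// StatsD requests, and sends those requests via the underlying service.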
pub(crate) struct StatsdSink<S> {
    /// The service that built requests are sent to.
    service: S,
    /// Settings controlling how metrics are batched before being turned into requests.
    batch_settings: BatcherSettings,
    /// Builds StatsD requests from batches of metrics.
    request_builder: StatsdRequestBuilder,
    /// The protocol in use, reported as part of the driver's internal telemetry.
    protocol: Protocol,
}

impl<S> StatsdSink<S>
where
    S: Service<StatsdRequest> + Send,
    S::Error: fmt::Debug + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse,
{
    /// Creates a new `StatsdSink`.
    pub const fn new(
        service: S,
        batch_settings: BatcherSettings,
        request_builder: StatsdRequestBuilder,
        protocol: Protocol,
    ) -> Self {
        Self {
            service,
            batch_settings,
            request_builder,
            protocol,
        }
    }
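
    /// Runs the core processing loop: converts incoming events to metrics, normalizes and
    /// batches them, builds StatsD requests, and drives those requests through the service.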
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        input
            // Convert `Event` to `Metric` so we don't have to deal with constant conversions.
            .filter_map(|event| ready(event.try_into_metric()))
            // Convert absolute counters into incremental counters, but otherwise leave everything
            // else alone. The encoder handles the difference between absolute and incremental for
            // other metric types in type-specific ways, e.g. incremental gauge updates use a
            // different syntax.
            .normalized_with_default::<StatsdNormalizer>()
            .batched(self.batch_settings.as_item_size_config(StatsdBatchSizer))
            // We build our requests "incrementally", which means that for a single batch of
            // metrics, we might generate N requests to represent all of the metrics in the batch.
            //
            // We do this because different socket modes have different optimal request sizes for
            // ensuring the highest rate of delivery, such as staying within the MTU for UDP.
            .incremental_request_builder(self.request_builder)
            // This unrolls the vector of request results that our request builder generates.
            .flat_map(stream::iter)
            // Building requests _cannot_ fail, so we just unwrap our built requests.
            .unwrap_infallible()
            // Finally, we generate the driver, which takes our requests, sends them off, and
            // handles finalization of the events, as well as logging/metrics, as responses come
            // back.
            .into_driver(self.service)
            .protocol(self.protocol)
            .run()
            .await
    }
}

#[async_trait]
impl<S> StreamSink<Event> for StatsdSink<S>
where
    S: Service<StatsdRequest> + Send,
    S::Error: fmt::Debug + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse,
{
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        // Rust has issues with lifetimes and generics, which `async_trait` exacerbates, so we
        // write a normal async fn in `StatsdSink` itself and then call out to it from this trait
        // implementation, which makes the compiler happy.
        self.run_inner(input).await
    }
}