// vector/sinks/statsd/sink.rs
use std::{fmt, future::ready};
2
3use async_trait::async_trait;
4use futures_util::{
5 stream::{self, BoxStream},
6 StreamExt,
7};
8use tower::Service;
9use vector_lib::internal_event::Protocol;
10use vector_lib::stream::{BatcherSettings, DriverResponse};
11use vector_lib::{event::Event, sink::StreamSink};
12
13use crate::sinks::util::SinkBuilderExt;
14
15use super::{
16 batch::StatsdBatchSizer, normalizer::StatsdNormalizer, request_builder::StatsdRequestBuilder,
17 service::StatsdRequest,
18};
19
20pub(crate) struct StatsdSink<S> {
21 service: S,
22 batch_settings: BatcherSettings,
23 request_builder: StatsdRequestBuilder,
24 protocol: Protocol,
25}
26
27impl<S> StatsdSink<S>
28where
29 S: Service<StatsdRequest> + Send,
30 S::Error: fmt::Debug + Send + 'static,
31 S::Future: Send + 'static,
32 S::Response: DriverResponse,
33{
34 pub const fn new(
36 service: S,
37 batch_settings: BatcherSettings,
38 request_builder: StatsdRequestBuilder,
39 protocol: Protocol,
40 ) -> Self {
41 Self {
42 service,
43 batch_settings,
44 request_builder,
45 protocol,
46 }
47 }
48
49 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
50 input
51 .filter_map(|event| ready(event.try_into_metric()))
53 .normalized_with_default::<StatsdNormalizer>()
58 .batched(self.batch_settings.as_item_size_config(StatsdBatchSizer))
59 .incremental_request_builder(self.request_builder)
65 .flat_map(stream::iter)
67 .unwrap_infallible()
69 .into_driver(self.service)
72 .protocol(self.protocol)
73 .run()
74 .await
75 }
76}
77
78#[async_trait]
79impl<S> StreamSink<Event> for StatsdSink<S>
80where
81 S: Service<StatsdRequest> + Send,
82 S::Error: fmt::Debug + Send + 'static,
83 S::Future: Send + 'static,
84 S::Response: DriverResponse,
85{
86 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
87 self.run_inner(input).await
91 }
92}