vector/sinks/statsd/
sink.rs1use std::{fmt, future::ready};
2
3use async_trait::async_trait;
4use futures_util::{
5 StreamExt,
6 stream::{self, BoxStream},
7};
8use tower::Service;
9use vector_lib::{
10 event::Event,
11 internal_event::Protocol,
12 sink::StreamSink,
13 stream::{BatcherSettings, DriverResponse},
14};
15
16use super::{
17 batch::StatsdBatchSizer, normalizer::StatsdNormalizer, request_builder::StatsdRequestBuilder,
18 service::StatsdRequest,
19};
20use crate::sinks::util::SinkBuilderExt;
21
22pub(crate) struct StatsdSink<S> {
23 service: S,
24 batch_settings: BatcherSettings,
25 request_builder: StatsdRequestBuilder,
26 protocol: Protocol,
27}
28
29impl<S> StatsdSink<S>
30where
31 S: Service<StatsdRequest> + Send,
32 S::Error: fmt::Debug + Send + 'static,
33 S::Future: Send + 'static,
34 S::Response: DriverResponse,
35{
36 pub const fn new(
38 service: S,
39 batch_settings: BatcherSettings,
40 request_builder: StatsdRequestBuilder,
41 protocol: Protocol,
42 ) -> Self {
43 Self {
44 service,
45 batch_settings,
46 request_builder,
47 protocol,
48 }
49 }
50
51 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
52 input
53 .filter_map(|event| ready(event.try_into_metric()))
55 .normalized_with_default::<StatsdNormalizer>()
60 .batched(self.batch_settings.as_item_size_config(StatsdBatchSizer))
61 .incremental_request_builder(self.request_builder)
67 .flat_map(stream::iter)
69 .unwrap_infallible()
71 .into_driver(self.service)
74 .protocol(self.protocol)
75 .run()
76 .await
77 }
78}
79
80#[async_trait]
81impl<S> StreamSink<Event> for StatsdSink<S>
82where
83 S: Service<StatsdRequest> + Send,
84 S::Error: fmt::Debug + Send + 'static,
85 S::Future: Send + 'static,
86 S::Response: DriverResponse,
87{
88 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
89 self.run_inner(input).await
93 }
94}