vector/sinks/statsd/
service.rs1use std::task::{Context, Poll};
2
3use futures_util::future::BoxFuture;
4use tower::Service;
5use vector_lib::stream::DriverResponse;
6use vector_lib::{
7 finalization::{EventFinalizers, EventStatus, Finalizable},
8 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
9};
10
11#[derive(Clone, Debug)]
13pub struct StatsdRequest {
14 pub payload: Vec<u8>,
15 pub finalizers: EventFinalizers,
16 pub metadata: RequestMetadata,
17}
18
19impl Finalizable for StatsdRequest {
20 fn take_finalizers(&mut self) -> EventFinalizers {
21 std::mem::take(&mut self.finalizers)
22 }
23}
24
25impl MetaDescriptive for StatsdRequest {
26 fn get_metadata(&self) -> &RequestMetadata {
27 &self.metadata
28 }
29
30 fn metadata_mut(&mut self) -> &mut RequestMetadata {
31 &mut self.metadata
32 }
33}
34
35#[derive(Debug)]
41pub struct StatsdResponse {
42 metadata: RequestMetadata,
43}
44
45impl DriverResponse for StatsdResponse {
46 fn event_status(&self) -> EventStatus {
47 EventStatus::Delivered
50 }
51
52 fn events_sent(&self) -> &GroupedCountByteSize {
53 self.metadata.events_estimated_json_encoded_byte_size()
54 }
55
56 fn bytes_sent(&self) -> Option<usize> {
57 Some(self.metadata.request_encoded_size())
58 }
59}
60
61#[derive(Clone)]
62pub struct StatsdService<T> {
63 transport: T,
64}
65
66impl<T> StatsdService<T> {
67 pub const fn from_transport(transport: T) -> Self {
72 Self { transport }
73 }
74}
75
76impl<T> Service<StatsdRequest> for StatsdService<T>
77where
78 T: Service<Vec<u8>>,
79 T::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
80 T::Future: Send + 'static,
81{
82 type Response = StatsdResponse;
83 type Error = Box<dyn std::error::Error + Send + Sync>;
84 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
85
86 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
87 self.transport.poll_ready(cx).map_err(Into::into)
88 }
89
90 fn call(&mut self, request: StatsdRequest) -> Self::Future {
91 let StatsdRequest {
92 payload,
93 finalizers: _,
94 metadata,
95 } = request;
96
97 let send_future = self.transport.call(payload);
98
99 Box::pin(async move {
100 send_future
101 .await
102 .map(|_| StatsdResponse { metadata })
103 .map_err(Into::into)
104 })
105 }
106}