vector/sinks/statsd/
service.rs

1use std::task::{Context, Poll};
2
3use futures_util::future::BoxFuture;
4use tower::Service;
5use vector_lib::stream::DriverResponse;
6use vector_lib::{
7    finalization::{EventFinalizers, EventStatus, Finalizable},
8    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
9};
10
11/// Generalized request for sending metrics to a StatsD endpoint.
12#[derive(Clone, Debug)]
13pub struct StatsdRequest {
14    pub payload: Vec<u8>,
15    pub finalizers: EventFinalizers,
16    pub metadata: RequestMetadata,
17}
18
19impl Finalizable for StatsdRequest {
20    fn take_finalizers(&mut self) -> EventFinalizers {
21        std::mem::take(&mut self.finalizers)
22    }
23}
24
25impl MetaDescriptive for StatsdRequest {
26    fn get_metadata(&self) -> &RequestMetadata {
27        &self.metadata
28    }
29
30    fn metadata_mut(&mut self) -> &mut RequestMetadata {
31        &mut self.metadata
32    }
33}
34
35// Placeholder response to shuttle request metadata for StatsD requests.
36//
37// As StatsD sends no response back to a caller, there's no success/failure to report except for raw
38// I/O errors when sending the request. Primarily, this type shuttles the metadata around the
39// request -- events sent, bytes sent, etc -- that is required by `Driver`.
40#[derive(Debug)]
41pub struct StatsdResponse {
42    metadata: RequestMetadata,
43}
44
45impl DriverResponse for StatsdResponse {
46    fn event_status(&self) -> EventStatus {
47        // If we generated a response, that implies our send concluded without any I/O errors, so we
48        // assume things were delivered.
49        EventStatus::Delivered
50    }
51
52    fn events_sent(&self) -> &GroupedCountByteSize {
53        self.metadata.events_estimated_json_encoded_byte_size()
54    }
55
56    fn bytes_sent(&self) -> Option<usize> {
57        Some(self.metadata.request_encoded_size())
58    }
59}
60
61#[derive(Clone)]
62pub struct StatsdService<T> {
63    transport: T,
64}
65
66impl<T> StatsdService<T> {
67    /// Creates a new `StatsdService` with the given `transport` service.
68    ///
69    /// The `transport` service is responsible for sending the actual encoded requests to the downstream
70    /// endpoint.
71    pub const fn from_transport(transport: T) -> Self {
72        Self { transport }
73    }
74}
75
76impl<T> Service<StatsdRequest> for StatsdService<T>
77where
78    T: Service<Vec<u8>>,
79    T::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
80    T::Future: Send + 'static,
81{
82    type Response = StatsdResponse;
83    type Error = Box<dyn std::error::Error + Send + Sync>;
84    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
85
86    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
87        self.transport.poll_ready(cx).map_err(Into::into)
88    }
89
90    fn call(&mut self, request: StatsdRequest) -> Self::Future {
91        let StatsdRequest {
92            payload,
93            finalizers: _,
94            metadata,
95        } = request;
96
97        let send_future = self.transport.call(payload);
98
99        Box::pin(async move {
100            send_future
101                .await
102                .map(|_| StatsdResponse { metadata })
103                .map_err(Into::into)
104        })
105    }
106}