vector/sinks/statsd/request_builder.rs

1use std::convert::Infallible;
2
3use bytes::BytesMut;
4use tokio_util::codec::Encoder;
5use vector_lib::request_metadata::RequestMetadata;
6use vector_lib::{
7    config::telemetry,
8    event::{EventFinalizers, Finalizable, Metric},
9    EstimatedJsonEncodedSizeOf,
10};
11
12use super::{encoder::StatsdEncoder, service::StatsdRequest};
13use crate::{
14    internal_events::SocketMode,
15    sinks::util::{
16        metadata::RequestMetadataBuilder, request_builder::EncodeResult, IncrementalRequestBuilder,
17    },
18};
19
20/// Incremental request builder specific to StatsD.
21pub struct StatsdRequestBuilder {
22    encoder: StatsdEncoder,
23    request_max_size: usize,
24    encode_buf: BytesMut,
25}
26
27impl StatsdRequestBuilder {
28    pub fn new(default_namespace: Option<String>, socket_mode: SocketMode) -> Self {
29        let encoder = StatsdEncoder::new(default_namespace);
30        let request_max_size = match socket_mode {
31            // Following the recommended advice [1], we use a datagram size that should reasonably
32            // fit within the MTU of the common places that Vector will run: virtual cloud networks,
33            // regular ol' Ethernet networks, and so on.
34            //
35            // [1]: https://github.com/statsd/statsd/blob/0de340f864/docs/metric_types.md?plain=1#L121
36            SocketMode::Udp => 1432,
37
38            // Since messages can be much bigger with TCP and Unix domain sockets, we'll give
39            // ourselves the chance to build bigger requests which should increase I/O efficiency.
40            SocketMode::Tcp | SocketMode::Unix => 8192,
41        };
42
43        Self::from_encoder_and_max_size(encoder, request_max_size)
44    }
45
46    fn from_encoder_and_max_size(encoder: StatsdEncoder, request_max_size: usize) -> Self {
47        Self {
48            encoder,
49            request_max_size,
50            encode_buf: BytesMut::with_capacity(8192),
51        }
52    }
53}
54
55impl Clone for StatsdRequestBuilder {
56    fn clone(&self) -> Self {
57        Self::from_encoder_and_max_size(self.encoder.clone(), self.request_max_size)
58    }
59}
60
61impl IncrementalRequestBuilder<Vec<Metric>> for StatsdRequestBuilder {
62    type Metadata = (EventFinalizers, RequestMetadata);
63    type Payload = Vec<u8>;
64    type Request = StatsdRequest;
65    type Error = Infallible;
66
67    fn encode_events_incremental(
68        &mut self,
69        mut input: Vec<Metric>,
70    ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
71        let mut results = Vec::new();
72        let mut pending = None;
73
74        let mut metrics = input.drain(..);
75        while metrics.len() != 0 || pending.is_some() {
76            let mut byte_size = telemetry().create_request_count_byte_size();
77            let mut n = 0;
78
79            let mut request_buf = Vec::new();
80            let mut finalizers = EventFinalizers::default();
81            let mut request_metadata_builder = RequestMetadataBuilder::default();
82
83            loop {
84                // Grab the previously pending metric, or the next metric from the drain.
85                let (mut metric, was_encoded) = match pending.take() {
86                    Some(metric) => (metric, true),
87                    None => match metrics.next() {
88                        Some(metric) => (metric, false),
89                        None => break,
90                    },
91                };
92
93                byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());
94
95                // Encode the metric. Once we've done that, see if it can fit into the request
96                // buffer without exceeding the maximum request size limit.
97                //
98                // If it doesn't fit, we'll store this metric off to the side and break out of this
99                // loop, which will finalize the current request payload and store it in the vector of
100                // all generated requests. Otherwise, we'll merge it in and continue encoding.
101                //
102                // Crucially, we only break out if the current request payload already has data in
103                // it, as we need to be able to stick at least one encoded metric into each request.
104                if !was_encoded {
105                    self.encode_buf.clear();
106                    self.encoder
107                        .encode(&metric, &mut self.encode_buf)
108                        .expect("encoding is infallible");
109                }
110
111                let request_buf_len = request_buf.len();
112                if request_buf_len != 0
113                    && (request_buf_len + self.encode_buf.len() > self.request_max_size)
114                {
115                    // The metric, as encoded, would cause us to exceed our maximum request size, so
116                    // store it off to the side and finalize the current request.
117                    pending = Some(metric);
118                    break;
119                }
120
121                // Merge the encoded metric into the request buffer and take over its event
122                // finalizers, etc.
123                request_buf.extend(&self.encode_buf[..]);
124                finalizers.merge(metric.take_finalizers());
125                request_metadata_builder.track_event(metric);
126                n += 1;
127            }
128
129            // If we encoded one or more metrics this pass, finalize the request.
130            if n > 0 {
131                let encode_result = EncodeResult::uncompressed(request_buf, byte_size);
132                let request_metadata = request_metadata_builder.build(&encode_result);
133
134                results.push(Ok((
135                    (finalizers, request_metadata),
136                    encode_result.into_payload(),
137                )));
138            }
139        }
140
141        results
142    }
143
144    fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
145        let (finalizers, metadata) = metadata;
146        StatsdRequest {
147            payload,
148            finalizers,
149            metadata,
150        }
151    }
152}