1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use std::convert::Infallible;

use bytes::BytesMut;
use tokio_util::codec::Encoder;
use vector_lib::request_metadata::RequestMetadata;
use vector_lib::{
    config::telemetry,
    event::{EventFinalizers, Finalizable, Metric},
    EstimatedJsonEncodedSizeOf,
};

use super::{encoder::StatsdEncoder, service::StatsdRequest};
use crate::{
    internal_events::SocketMode,
    sinks::util::{
        metadata::RequestMetadataBuilder, request_builder::EncodeResult, IncrementalRequestBuilder,
    },
};

/// Incremental request builder specific to StatsD.
///
/// Metrics are encoded one at a time and packed into request payloads whose size is bounded by
/// `request_max_size`.
pub struct StatsdRequestBuilder {
    /// Encoder used to write each metric into `encode_buf`.
    encoder: StatsdEncoder,
    /// Size limit, in bytes, for a single request payload. The limit is only enforced once a
    /// payload already holds data, so a lone metric that encodes to more than this many bytes
    /// still gets a request of its own.
    request_max_size: usize,
    /// Scratch buffer holding the encoded form of the metric currently being processed. It is
    /// cleared and reused for every metric rather than reallocated.
    encode_buf: BytesMut,
}

impl StatsdRequestBuilder {
    /// Creates a request builder whose maximum payload size is chosen from `socket_mode`.
    ///
    /// `default_namespace` is handed to the underlying [`StatsdEncoder`] unchanged.
    pub fn new(default_namespace: Option<String>, socket_mode: SocketMode) -> Self {
        let max_payload_len = match socket_mode {
            // Stream-oriented transports (TCP, Unix domain sockets) can carry much larger
            // messages, so allow bigger requests for the sake of I/O efficiency.
            SocketMode::Tcp | SocketMode::Unix => 8192,

            // For UDP, stick to the recommended datagram size [1], which should reasonably fit
            // within the MTU of the places Vector commonly runs: virtual cloud networks, plain
            // Ethernet networks, and so on.
            //
            // [1]: https://github.com/statsd/statsd/blob/0de340f864/docs/metric_types.md?plain=1#L121
            SocketMode::Udp => 1432,
        };

        Self::from_encoder_and_max_size(StatsdEncoder::new(default_namespace), max_payload_len)
    }

    /// Assembles a builder from an existing encoder and an explicit payload size limit.
    ///
    /// The per-metric scratch buffer always starts out empty, with 8192 bytes of capacity.
    fn from_encoder_and_max_size(encoder: StatsdEncoder, request_max_size: usize) -> Self {
        let encode_buf = BytesMut::with_capacity(8192);

        Self {
            encoder,
            request_max_size,
            encode_buf,
        }
    }
}

impl Clone for StatsdRequestBuilder {
    /// Clones the encoder and the size limit only.
    ///
    /// The scratch buffer is transient per-metric state, so the clone gets a fresh, empty buffer
    /// instead of a copy of this builder's buffer contents.
    fn clone(&self) -> Self {
        let Self {
            encoder,
            request_max_size,
            ..
        } = self;

        Self::from_encoder_and_max_size(encoder.clone(), *request_max_size)
    }
}

impl IncrementalRequestBuilder<Vec<Metric>> for StatsdRequestBuilder {
    type Metadata = (EventFinalizers, RequestMetadata);
    type Payload = Vec<u8>;
    type Request = StatsdRequest;
    type Error = Infallible;

    /// Encodes `input` into one or more request payloads, each bounded by the configured
    /// maximum request size.
    ///
    /// Metrics are encoded in order and appended to the current payload until the next encoded
    /// metric would push it past the limit; that metric then starts the next payload. Every
    /// payload contains at least one metric, so a single metric that encodes to more than the
    /// limit is still emitted, alone, in an oversized payload.
    ///
    /// Each returned element is `Ok` and pairs the payload with the finalizers and request
    /// metadata of exactly the metrics that were merged into it.
    ///
    /// # Panics
    ///
    /// Panics if the encoder reports an error, which is treated as an invariant violation.
    fn encode_events_incremental(
        &mut self,
        mut input: Vec<Metric>,
    ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
        let mut results = Vec::new();
        // A metric that was already encoded into `self.encode_buf` but did not fit into the
        // request being built at the time. It becomes the first metric of the next request.
        let mut pending = None;

        let mut metrics = input.drain(..);
        while metrics.len() != 0 || pending.is_some() {
            let mut byte_size = telemetry().create_request_count_byte_size();
            let mut n = 0;

            let mut request_buf = Vec::new();
            let mut finalizers = EventFinalizers::default();
            let mut request_metadata_builder = RequestMetadataBuilder::default();

            loop {
                // Grab the previously pending metric, or the next metric from the drain.
                let (mut metric, was_encoded) = match pending.take() {
                    Some(metric) => (metric, true),
                    None => match metrics.next() {
                        Some(metric) => (metric, false),
                        None => break,
                    },
                };

                // Encode the metric. Once we've done that, see if it can fit into the request
                // buffer without exceeding the maximum request size limit.
                //
                // If it doesn't fit, we'll store this metric off to the side and break out of this
                // loop, which will finalize the current request payload and store it in the vector of
                // all generated requests. Otherwise, we'll merge it in and continue encoding.
                //
                // Crucially, we only break out if the current request payload already has data in
                // it, as we need to be able to stick at least one encoded metric into each request.
                //
                // A pending metric is not re-encoded: its encoded form is still sitting in
                // `self.encode_buf` from the pass in which it failed to fit.
                if !was_encoded {
                    self.encode_buf.clear();
                    self.encoder
                        .encode(&metric, &mut self.encode_buf)
                        .expect("encoding is infallible");
                }

                let request_buf_len = request_buf.len();
                if request_buf_len != 0
                    && (request_buf_len + self.encode_buf.len() > self.request_max_size)
                {
                    // The metric, as encoded, would cause us to exceed our maximum request size, so
                    // store it off to the side and finalize the current request.
                    pending = Some(metric);
                    break;
                }

                // Only now do we know the metric is part of this request, so only now do we
                // account for it. Recording it before the fit check would attribute it to a
                // request that does not contain it, and then count it a second time when it is
                // retried as the pending metric of the next request.
                byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());

                // Merge the encoded metric into the request buffer and take over its event
                // finalizers, etc.
                request_buf.extend(&self.encode_buf[..]);
                finalizers.merge(metric.take_finalizers());
                request_metadata_builder.track_event(metric);
                n += 1;
            }

            // If we encoded one or more metrics this pass, finalize the request.
            if n > 0 {
                let encode_result = EncodeResult::uncompressed(request_buf, byte_size);
                let request_metadata = request_metadata_builder.build(&encode_result);

                results.push(Ok((
                    (finalizers, request_metadata),
                    encode_result.into_payload(),
                )));
            }
        }

        results
    }

    /// Combines a payload with its finalizers and request metadata into a [`StatsdRequest`].
    fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
        let (finalizers, metadata) = metadata;
        StatsdRequest {
            payload,
            finalizers,
            metadata,
        }
    }
}