use std::convert::Infallible;
use bytes::BytesMut;
use tokio_util::codec::Encoder;
use vector_lib::request_metadata::RequestMetadata;
use vector_lib::{
config::telemetry,
event::{EventFinalizers, Finalizable, Metric},
EstimatedJsonEncodedSizeOf,
};
use super::{encoder::StatsdEncoder, service::StatsdRequest};
use crate::{
internal_events::SocketMode,
sinks::util::{
metadata::RequestMetadataBuilder, request_builder::EncodeResult, IncrementalRequestBuilder,
},
};
/// Incremental request builder specific to StatsD.
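///
/// Incoming metrics are encoded one at a time and packed into payloads that never exceed the
/// maximum request size for the socket mode in use, so a single input batch may be split
/// across multiple requests.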
pub struct StatsdRequestBuilder {
encoder: StatsdEncoder,
request_max_size: usize,
encode_buf: BytesMut,
}
impl StatsdRequestBuilder {
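/// Creates a new `StatsdRequestBuilder`, choosing a maximum request size appropriate for the
/// given socket mode.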
pub fn new(default_namespace: Option<String>, socket_mode: SocketMode) -> Self {
let encoder = StatsdEncoder::new(default_namespace);
let request_max_size = match socket_mode {
// Following the recommended advice [1], we use a datagram size that should reasonably
// fit within the MTU of the common places that Vector will run: virtual cloud networks,
// regular ol' Ethernet networks, and so on.
//
// [1]: https://github.com/statsd/statsd/blob/0de340f864/docs/metric_types.md?plain=1#L121
SocketMode::Udp => 1432,
// Since messages can be much bigger with TCP and Unix domain sockets, we'll give
// ourselves the chance to build bigger requests which should increase I/O efficiency.
SocketMode::Tcp | SocketMode::Unix => 8192,
};
Self::from_encoder_and_max_size(encoder, request_max_size)
}
fn from_encoder_and_max_size(encoder: StatsdEncoder, request_max_size: usize) -> Self {
Self {
encoder,
request_max_size,
encode_buf: BytesMut::with_capacity(8192),
}
}
}
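// Manual `Clone` implementation so that each clone starts with its own, empty scratch buffer
// rather than copying the contents of the existing one.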
impl Clone for StatsdRequestBuilder {
fn clone(&self) -> Self {
Self::from_encoder_and_max_size(self.encoder.clone(), self.request_max_size)
}
}
impl IncrementalRequestBuilder<Vec<Metric>> for StatsdRequestBuilder {
type Metadata = (EventFinalizers, RequestMetadata);
type Payload = Vec<u8>;
type Request = StatsdRequest;
type Error = Infallible;
fn encode_events_incremental(
&mut self,
mut input: Vec<Metric>,
) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
let mut results = Vec::new();
let mut pending = None;
let mut metrics = input.drain(..);
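// Each pass of this outer loop produces at most one request payload.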
while metrics.len() != 0 || pending.is_some() {
let mut byte_size = telemetry().create_request_count_byte_size();
let mut n = 0;
let mut request_buf = Vec::new();
let mut finalizers = EventFinalizers::default();
let mut request_metadata_builder = RequestMetadataBuilder::default();
loop {
// Grab the previously pending metric, or the next metric from the drain.
let (mut metric, was_encoded) = match pending.take() {
Some(metric) => (metric, true),
None => match metrics.next() {
Some(metric) => (metric, false),
None => break,
},
};
// Encode the metric. Once we've done that, see if it can fit into the request
// buffer without exceeding the maximum request size limit.
//
// If it doesn't fit, we'll store this metric off to the side and break out of this
// loop, which will finalize the current request payload and store it in the vector of
// all generated requests. Otherwise, we'll merge it in and continue encoding.
//
// Crucially, we only break out if the current request payload already has data in
// it, as we need to be able to stick at least one encoded metric into each request.
if !was_encoded {
self.encode_buf.clear();
self.encoder
.encode(&metric, &mut self.encode_buf)
.expect("encoding is infallible");
}
let request_buf_len = request_buf.len();
if request_buf_len != 0
&& (request_buf_len + self.encode_buf.len() > self.request_max_size)
{
// The metric, as encoded, would cause us to exceed our maximum request size, so
// store it off to the side and finalize the current request.
pending = Some(metric);
break;
}
// Merge the encoded metric into the request buffer, take over its event finalizers, and
// account for its estimated JSON size. Doing the size accounting here, rather than when the
// metric was pulled from the drain, ensures a metric deferred to the next request is only
// counted against the request it actually lands in.
request_buf.extend(&self.encode_buf[..]);
byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());
finalizers.merge(metric.take_finalizers());
request_metadata_builder.track_event(metric);
n += 1;
}
// If we encoded one or more metrics this pass, finalize the request.
if n > 0 {
let encode_result = EncodeResult::uncompressed(request_buf, byte_size);
let request_metadata = request_metadata_builder.build(&encode_result);
results.push(Ok((
(finalizers, request_metadata),
encode_result.into_payload(),
)));
}
}
results
}
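// Wraps a previously-encoded payload and its metadata into the request type used by the service.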
fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
let (finalizers, metadata) = metadata;
StatsdRequest {
payload,
finalizers,
metadata,
}
}
}
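// Illustrative test sketch for the payload-splitting behavior described above. It assumes that
// `MetricKind` and `MetricValue` are exported from `vector_lib::event` and that each encoded
// counter fits well under the 1432-byte UDP request size; adjust the helpers if those
// assumptions don't hold in this workspace.
#[cfg(test)]
mod tests {
    use vector_lib::event::{MetricKind, MetricValue};

    use super::*;

    #[test]
    fn splits_metrics_into_size_bounded_payloads() {
        let mut builder = StatsdRequestBuilder::new(None, SocketMode::Udp);

        // Enough counters that their encoded forms cannot all fit into one UDP-sized payload.
        let metrics = (0..1000)
            .map(|i| {
                Metric::new(
                    format!("counter_{i}"),
                    MetricKind::Incremental,
                    MetricValue::Counter { value: 1.0 },
                )
            })
            .collect::<Vec<_>>();

        let results = builder.encode_events_incremental(metrics);
        assert!(
            results.len() > 1,
            "expected the input to be split across multiple requests"
        );

        // No payload may be empty or exceed the UDP maximum request size.
        for result in results {
            let (_metadata, payload) = result.expect("encoding is infallible");
            assert!(!payload.is_empty());
            assert!(payload.len() <= 1432);
        }
    }
}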