vector/sinks/statsd/
request_builder.rs1use std::convert::Infallible;
2
3use bytes::BytesMut;
4use tokio_util::codec::Encoder;
5use vector_lib::request_metadata::RequestMetadata;
6use vector_lib::{
7 config::telemetry,
8 event::{EventFinalizers, Finalizable, Metric},
9 EstimatedJsonEncodedSizeOf,
10};
11
12use super::{encoder::StatsdEncoder, service::StatsdRequest};
13use crate::{
14 internal_events::SocketMode,
15 sinks::util::{
16 metadata::RequestMetadataBuilder, request_builder::EncodeResult, IncrementalRequestBuilder,
17 },
18};
19
20pub struct StatsdRequestBuilder {
22 encoder: StatsdEncoder,
23 request_max_size: usize,
24 encode_buf: BytesMut,
25}
26
27impl StatsdRequestBuilder {
28 pub fn new(default_namespace: Option<String>, socket_mode: SocketMode) -> Self {
29 let encoder = StatsdEncoder::new(default_namespace);
30 let request_max_size = match socket_mode {
31 SocketMode::Udp => 1432,
37
38 SocketMode::Tcp | SocketMode::Unix => 8192,
41 };
42
43 Self::from_encoder_and_max_size(encoder, request_max_size)
44 }
45
46 fn from_encoder_and_max_size(encoder: StatsdEncoder, request_max_size: usize) -> Self {
47 Self {
48 encoder,
49 request_max_size,
50 encode_buf: BytesMut::with_capacity(8192),
51 }
52 }
53}
54
55impl Clone for StatsdRequestBuilder {
56 fn clone(&self) -> Self {
57 Self::from_encoder_and_max_size(self.encoder.clone(), self.request_max_size)
58 }
59}
60
61impl IncrementalRequestBuilder<Vec<Metric>> for StatsdRequestBuilder {
62 type Metadata = (EventFinalizers, RequestMetadata);
63 type Payload = Vec<u8>;
64 type Request = StatsdRequest;
65 type Error = Infallible;
66
67 fn encode_events_incremental(
68 &mut self,
69 mut input: Vec<Metric>,
70 ) -> Vec<Result<(Self::Metadata, Self::Payload), Self::Error>> {
71 let mut results = Vec::new();
72 let mut pending = None;
73
74 let mut metrics = input.drain(..);
75 while metrics.len() != 0 || pending.is_some() {
76 let mut byte_size = telemetry().create_request_count_byte_size();
77 let mut n = 0;
78
79 let mut request_buf = Vec::new();
80 let mut finalizers = EventFinalizers::default();
81 let mut request_metadata_builder = RequestMetadataBuilder::default();
82
83 loop {
84 let (mut metric, was_encoded) = match pending.take() {
86 Some(metric) => (metric, true),
87 None => match metrics.next() {
88 Some(metric) => (metric, false),
89 None => break,
90 },
91 };
92
93 byte_size.add_event(&metric, metric.estimated_json_encoded_size_of());
94
95 if !was_encoded {
105 self.encode_buf.clear();
106 self.encoder
107 .encode(&metric, &mut self.encode_buf)
108 .expect("encoding is infallible");
109 }
110
111 let request_buf_len = request_buf.len();
112 if request_buf_len != 0
113 && (request_buf_len + self.encode_buf.len() > self.request_max_size)
114 {
115 pending = Some(metric);
118 break;
119 }
120
121 request_buf.extend(&self.encode_buf[..]);
124 finalizers.merge(metric.take_finalizers());
125 request_metadata_builder.track_event(metric);
126 n += 1;
127 }
128
129 if n > 0 {
131 let encode_result = EncodeResult::uncompressed(request_buf, byte_size);
132 let request_metadata = request_metadata_builder.build(&encode_result);
133
134 results.push(Ok((
135 (finalizers, request_metadata),
136 encode_result.into_payload(),
137 )));
138 }
139 }
140
141 results
142 }
143
144 fn build_request(&mut self, metadata: Self::Metadata, payload: Self::Payload) -> Self::Request {
145 let (finalizers, metadata) = metadata;
146 StatsdRequest {
147 payload,
148 finalizers,
149 metadata,
150 }
151 }
152}