1use std::{
2 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
3 time::Duration,
4};
5use vector_lib::ipallowlist::IpAllowlistConfig;
6
7use bytes::Bytes;
8use futures::{StreamExt, TryFutureExt};
9use listenfd::ListenFd;
10use serde_with::serde_as;
11use smallvec::{smallvec, SmallVec};
12use tokio_util::udp::UdpFramed;
13use vector_lib::codecs::{
14 decoding::{self, Deserializer, Framer},
15 NewlineDelimitedDecoder,
16};
17use vector_lib::configurable::configurable_component;
18use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
19use vector_lib::EstimatedJsonEncodedSizeOf;
20
21use self::parser::ParseError;
22use super::util::net::{try_bind_udp_socket, SocketListenAddr, TcpNullAcker, TcpSource};
23use crate::{
24 codecs::Decoder,
25 config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
26 event::Event,
27 internal_events::{
28 EventsReceived, SocketBindError, SocketBytesReceived, SocketMode, SocketReceiveError,
29 StreamClosedError,
30 },
31 net,
32 shutdown::ShutdownSignal,
33 tcp::TcpKeepaliveConfig,
34 tls::{MaybeTlsSettings, TlsSourceConfig},
35 SourceSender,
36};
37
38pub mod parser;
39#[cfg(unix)]
40mod unix;
41
42use parser::Parser;
43
44#[cfg(unix)]
45use unix::{statsd_unix, UnixConfig};
46use vector_lib::config::LogNamespace;
47
48#[configurable_component(source("statsd", "Collect metrics emitted by the StatsD aggregator."))]
50#[derive(Clone, Debug)]
51#[serde(tag = "mode", rename_all = "snake_case")]
52#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
53#[allow(clippy::large_enum_variant)] pub enum StatsdConfig {
55 Tcp(TcpConfig),
57
58 Udp(UdpConfig),
60
61 #[cfg(unix)]
63 Unix(UnixConfig),
64}
65
66#[configurable_component]
68#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
69#[serde(rename_all = "lowercase")]
70pub enum ConversionUnit {
71 #[default]
73 Seconds,
74
75 Milliseconds,
77}
78
79#[configurable_component]
81#[derive(Clone, Debug)]
82pub struct UdpConfig {
83 #[configurable(derived)]
84 address: SocketListenAddr,
85
86 receive_buffer_bytes: Option<usize>,
88
89 #[serde(default = "default_sanitize")]
90 #[configurable(derived)]
91 sanitize: bool,
92
93 #[serde(default = "default_convert_to")]
94 #[configurable(derived)]
95 convert_to: ConversionUnit,
96}
97
98impl UdpConfig {
99 pub const fn from_address(address: SocketListenAddr) -> Self {
100 Self {
101 address,
102 receive_buffer_bytes: None,
103 sanitize: default_sanitize(),
104 convert_to: default_convert_to(),
105 }
106 }
107}
108
109#[serde_as]
111#[configurable_component]
112#[derive(Clone, Debug)]
113pub struct TcpConfig {
114 #[configurable(derived)]
115 address: SocketListenAddr,
116
117 #[configurable(derived)]
118 keepalive: Option<TcpKeepaliveConfig>,
119
120 #[configurable(derived)]
121 pub permit_origin: Option<IpAllowlistConfig>,
122
123 #[configurable(derived)]
124 #[serde(default)]
125 tls: Option<TlsSourceConfig>,
126
127 #[serde(default = "default_shutdown_timeout_secs")]
129 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
130 #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
131 shutdown_timeout_secs: Duration,
132
133 #[configurable(metadata(docs::type_unit = "bytes"))]
135 receive_buffer_bytes: Option<usize>,
136
137 #[configurable(metadata(docs::type_unit = "connections"))]
139 connection_limit: Option<u32>,
140
141 #[serde(default = "default_sanitize")]
146 #[configurable(derived)]
147 sanitize: bool,
148
149 #[serde(default = "default_convert_to")]
150 #[configurable(derived)]
151 convert_to: ConversionUnit,
152}
153
154impl TcpConfig {
155 #[cfg(test)]
156 pub const fn from_address(address: SocketListenAddr) -> Self {
157 Self {
158 address,
159 keepalive: None,
160 permit_origin: None,
161 tls: None,
162 shutdown_timeout_secs: default_shutdown_timeout_secs(),
163 receive_buffer_bytes: None,
164 connection_limit: None,
165 sanitize: default_sanitize(),
166 convert_to: default_convert_to(),
167 }
168 }
169}
170
/// Default for `TcpConfig::shutdown_timeout_secs`: 30 seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    const DEFAULT_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_SECS)
}
174
/// Default for the `sanitize` option: enabled.
const fn default_sanitize() -> bool {
    true
}
178
179const fn default_convert_to() -> ConversionUnit {
180 ConversionUnit::Seconds
181}
182
183impl GenerateConfig for StatsdConfig {
184 fn generate_config() -> toml::Value {
185 toml::Value::try_from(Self::Udp(UdpConfig::from_address(
186 SocketListenAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(
187 Ipv4Addr::LOCALHOST,
188 8125,
189 ))),
190 )))
191 .unwrap()
192 }
193}
194
195#[async_trait::async_trait]
196#[typetag::serde(name = "statsd")]
197impl SourceConfig for StatsdConfig {
198 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
199 match self {
200 StatsdConfig::Udp(config) => {
201 Ok(Box::pin(statsd_udp(config.clone(), cx.shutdown, cx.out)))
202 }
203 StatsdConfig::Tcp(config) => {
204 let tls_config = config.tls.as_ref().map(|tls| tls.tls_config.clone());
205 let tls_client_metadata_key = config
206 .tls
207 .as_ref()
208 .and_then(|tls| tls.client_metadata_key.clone())
209 .and_then(|k| k.path);
210 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
211 let statsd_tcp_source = StatsdTcpSource {
212 sanitize: config.sanitize,
213 convert_to: config.convert_to,
214 };
215
216 statsd_tcp_source.run(
217 config.address,
218 config.keepalive,
219 config.shutdown_timeout_secs,
220 tls,
221 tls_client_metadata_key,
222 config.receive_buffer_bytes,
223 None,
224 cx,
225 false.into(),
226 config.connection_limit,
227 config.permit_origin.clone().map(Into::into),
228 StatsdConfig::NAME,
229 LogNamespace::Legacy,
230 )
231 }
232 #[cfg(unix)]
233 StatsdConfig::Unix(config) => statsd_unix(config.clone(), cx.shutdown, cx.out),
234 }
235 }
236
237 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
238 vec![SourceOutput::new_metrics()]
239 }
240
241 fn resources(&self) -> Vec<Resource> {
242 match self.clone() {
243 Self::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
244 Self::Udp(udp) => vec![udp.address.as_udp_resource()],
245 #[cfg(unix)]
246 Self::Unix(_) => vec![],
247 }
248 }
249
250 fn can_acknowledge(&self) -> bool {
251 false
252 }
253}
254
255#[derive(Clone)]
256pub(crate) struct StatsdDeserializer {
257 socket_mode: Option<SocketMode>,
258 events_received: Option<Registered<EventsReceived>>,
259 parser: Parser,
260}
261
262impl StatsdDeserializer {
263 pub fn udp(sanitize: bool, convert_to: ConversionUnit) -> Self {
264 Self {
265 socket_mode: Some(SocketMode::Udp),
266 events_received: Some(register!(EventsReceived)),
268 parser: Parser::new(sanitize, convert_to),
269 }
270 }
271
272 pub const fn tcp(sanitize: bool, convert_to: ConversionUnit) -> Self {
273 Self {
274 socket_mode: None,
275 events_received: None,
276 parser: Parser::new(sanitize, convert_to),
277 }
278 }
279
280 #[cfg(unix)]
281 pub const fn unix(sanitize: bool, convert_to: ConversionUnit) -> Self {
282 Self {
283 socket_mode: Some(SocketMode::Unix),
284 events_received: None,
285 parser: Parser::new(sanitize, convert_to),
286 }
287 }
288}
289
290impl decoding::format::Deserializer for StatsdDeserializer {
291 fn parse(
292 &self,
293 bytes: Bytes,
294 _log_namespace: LogNamespace,
295 ) -> crate::Result<SmallVec<[Event; 1]>> {
296 if let Some(mode) = self.socket_mode {
298 if mode == SocketMode::Udp {
299 emit!(SocketBytesReceived {
300 mode,
301 byte_size: bytes.len(),
302 });
303 }
304 }
305
306 match std::str::from_utf8(&bytes).map_err(ParseError::InvalidUtf8) {
307 Err(error) => Err(Box::new(error)),
308 Ok(s) => match self.parser.parse(s) {
309 Ok(metric) => {
310 let event = Event::Metric(metric);
311 if let Some(er) = &self.events_received {
312 let byte_size = event.estimated_json_encoded_size_of();
313 er.emit(CountByteSize(1, byte_size));
314 }
315 Ok(smallvec![event])
316 }
317 Err(error) => Err(Box::new(error)),
318 },
319 }
320 }
321}
322
323async fn statsd_udp(
324 config: UdpConfig,
325 shutdown: ShutdownSignal,
326 mut out: SourceSender,
327) -> Result<(), ()> {
328 let listenfd = ListenFd::from_env();
329 let socket = try_bind_udp_socket(config.address, listenfd)
330 .map_err(|error| {
331 emit!(SocketBindError {
332 mode: SocketMode::Udp,
333 error
334 })
335 })
336 .await?;
337
338 if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
339 if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
340 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
341 }
342 }
343
344 info!(
345 message = "Listening.",
346 addr = %config.address,
347 r#type = "udp"
348 );
349
350 let codec = Decoder::new(
351 Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
352 Deserializer::Boxed(Box::new(StatsdDeserializer::udp(
353 config.sanitize,
354 config.convert_to,
355 ))),
356 );
357 let mut stream = UdpFramed::new(socket, codec).take_until(shutdown);
358 while let Some(frame) = stream.next().await {
359 match frame {
360 Ok(((events, _byte_size), _sock)) => {
361 let count = events.len();
362 if (out.send_batch(events).await).is_err() {
363 emit!(StreamClosedError { count });
364 }
365 }
366 Err(error) => {
367 emit!(SocketReceiveError {
368 mode: SocketMode::Udp,
369 error
370 });
371 }
372 }
373 }
374
375 Ok(())
376}
377
378#[derive(Clone)]
379struct StatsdTcpSource {
380 sanitize: bool,
381 convert_to: ConversionUnit,
382}
383
384impl TcpSource for StatsdTcpSource {
385 type Error = vector_lib::codecs::decoding::Error;
386 type Item = SmallVec<[Event; 1]>;
387 type Decoder = Decoder;
388 type Acker = TcpNullAcker;
389
390 fn decoder(&self) -> Self::Decoder {
391 Decoder::new(
392 Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
393 Deserializer::Boxed(Box::new(StatsdDeserializer::tcp(
394 self.sanitize,
395 self.convert_to,
396 ))),
397 )
398 }
399
400 fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
401 TcpNullAcker
402 }
403}
404
#[cfg(test)]
mod test {
    use futures::channel::mpsc;
    use futures_util::SinkExt;
    use tokio::{
        io::AsyncWriteExt,
        net::UdpSocket,
        time::{sleep, Duration, Instant},
    };
    use vector_lib::{
        config::ComponentKey,
        event::{metric::TagValue, EventContainer},
    };

    use super::*;
    use crate::test_util::{
        collect_limited,
        components::{
            assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
            SOCKET_PUSH_SOURCE_TAGS,
        },
        metrics::{assert_counter, assert_distribution, assert_gauge, assert_set},
        next_addr,
    };
    use crate::{series, test_util::metrics::AbsoluteMetricState};

    /// Spawns a task that relays every payload from `receiver` to `in_addr`,
    /// one UDP datagram per payload, over a single connected socket.
    fn forward_over_udp(in_addr: SocketAddr, mut receiver: mpsc::Receiver<&'static [u8]>) {
        tokio::spawn(async move {
            let bind_addr = next_addr();
            let socket = UdpSocket::bind(bind_addr).await.unwrap();
            socket.connect(in_addr).await.unwrap();
            while let Some(bytes) = receiver.next().await {
                socket.send(bytes).await.unwrap();
            }
        });
    }

    /// Spawns a task that relays every payload from `receiver` to `in_addr`,
    /// opening a fresh TCP connection for each payload.
    fn forward_over_tcp(in_addr: SocketAddr, mut receiver: mpsc::Receiver<&'static [u8]>) {
        tokio::spawn(async move {
            while let Some(bytes) = receiver.next().await {
                let mut stream = tokio::net::TcpStream::connect(in_addr).await.unwrap();
                stream.write_all(bytes).await.unwrap();
            }
        });
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<StatsdConfig>();
    }

    #[tokio::test]
    async fn test_statsd_udp() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel(200);
            forward_over_udp(in_addr, receiver);
            test_statsd(config, sender).await;
        })
        .await;
    }

    #[tokio::test]
    async fn test_statsd_tcp() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel(200);
            forward_over_tcp(in_addr, receiver);
            test_statsd(config, sender).await;
        })
        .await;
    }

    #[tokio::test]
    async fn test_statsd_error() {
        assert_source_error(&COMPONENT_ERROR_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel(200);
            forward_over_tcp(in_addr, receiver);
            test_invalid_statsd(config, sender).await;
        })
        .await;
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn test_statsd_unix() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
            let config = StatsdConfig::Unix(UnixConfig {
                path: in_path.clone(),
                sanitize: true,
                convert_to: ConversionUnit::Seconds,
            });
            let (sender, mut receiver) = mpsc::channel(200);
            // One fresh Unix stream connection per payload.
            tokio::spawn(async move {
                while let Some(bytes) = receiver.next().await {
                    let mut stream = tokio::net::UnixStream::connect(&in_path).await.unwrap();
                    stream.write_all(bytes).await.unwrap();
                }
            });
            test_statsd(config, sender).await;
        })
        .await;
    }

    /// With `convert_to = milliseconds`, a timer value must reach the output
    /// unconverted.
    #[tokio::test]
    async fn test_statsd_udp_conversion_disabled() {
        let in_addr = next_addr();
        let mut udp_config = UdpConfig::from_address(in_addr.into());
        udp_config.convert_to = ConversionUnit::Milliseconds;
        let statsd_config = StatsdConfig::Udp(udp_config);

        let (mut sender, receiver) = mpsc::channel(200);
        forward_over_udp(in_addr, receiver);

        let component_key = ComponentKey::from("statsd_conversion_disabled");
        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let source = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build source");

        tokio::spawn(async move {
            source.await.expect("sink should not fail");
        });

        // Pause before sending, and again before shutting down.
        sleep(Duration::from_millis(250)).await;
        sender.send(b"timer:320|ms|@0.1\n").await.unwrap();
        sleep(Duration::from_millis(250)).await;

        let deadline = Instant::now() + Duration::from_millis(100);
        shutdown.shutdown_all(Some(deadline)).await;

        let metrics = collect_limited(rx)
            .await
            .into_iter()
            .flat_map(EventContainer::into_events)
            .collect::<AbsoluteMetricState>()
            .finish();

        // A 0.1 sample rate counts the single 320 ms reading ten times:
        // 10 samples summing to 3200, all above the largest finite bucket.
        assert_distribution(
            &metrics,
            series!("timer"),
            3200.0,
            10,
            &[(1.0, 0), (2.0, 0), (4.0, 0), (f64::INFINITY, 10)],
        );
    }

    /// Runs `statsd_config` as a source, pushes the same seven-line payload
    /// through `sender` 100 times, and checks the aggregated metrics.
    async fn test_statsd(statsd_config: StatsdConfig, mut sender: mpsc::Sender<&'static [u8]>) {
        let component_key = ComponentKey::from("statsd");
        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let source = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build statsd source");

        tokio::spawn(async move {
            source.await.expect("sink should not fail");
        });

        // Pause before the first send.
        sleep(Duration::from_millis(250)).await;

        for _ in 0..100 {
            sender.send(
                b"foo:1|c|#a,b:b\nbar:42|g\nfoo:1|c|#a,b:c\nglork:3|h|@0.1\nmilliglork:3000|ms|@0.2\nset:0|s\nset:1|s\n"
            ).await.unwrap();

            // Space the payloads out by 10 ms.
            sleep(Duration::from_millis(10)).await;
        }

        // Pause again before shutting the source down.
        sleep(Duration::from_millis(250)).await;

        let deadline = Instant::now() + Duration::from_millis(100);
        shutdown.shutdown_all(Some(deadline)).await;

        let metrics = collect_limited(rx)
            .await
            .into_iter()
            .flat_map(EventContainer::into_events)
            .collect::<AbsoluteMetricState>()
            .finish();

        // Each tagged `foo` series is incremented once per payload.
        assert_counter(
            &metrics,
            series!(
                "foo",
                "a" => TagValue::Bare,
                "b" => "b"
            ),
            100.0,
        );

        assert_counter(
            &metrics,
            series!(
                "foo",
                "a" => TagValue::Bare,
                "b" => "c"
            ),
            100.0,
        );

        assert_gauge(&metrics, series!("bar"), 42.0);

        // `glork:3|h|@0.1`: 10 samples of 3 per payload, 1000 samples in total.
        assert_distribution(
            &metrics,
            series!("glork"),
            3000.0,
            1000,
            &[(1.0, 0), (2.0, 0), (4.0, 1000), (f64::INFINITY, 1000)],
        );

        // `milliglork:3000|ms|@0.2`: 5 samples per payload, each converted to 3 seconds.
        assert_distribution(
            &metrics,
            series!("milliglork"),
            1500.0,
            500,
            &[(1.0, 0), (2.0, 0), (4.0, 500), (f64::INFINITY, 500)],
        );

        assert_set(&metrics, series!("set"), &["0", "1"]);
    }

    /// Runs `statsd_config` as a source and pushes ten unparseable payloads
    /// through `sender`. It makes no assertions itself: the calling test
    /// wraps it in `assert_source_error`.
    async fn test_invalid_statsd(
        statsd_config: StatsdConfig,
        mut sender: mpsc::Sender<&'static [u8]>,
    ) {
        let component_key = ComponentKey::from("statsd");
        // The receiving half is bound (not discarded) so it lives until the
        // end of the function.
        let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let source = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build statsd source");

        tokio::spawn(async move {
            source.await.expect("sink should not fail");
        });

        // Pause before the first send.
        sleep(Duration::from_millis(250)).await;

        for _ in 0..10 {
            sender.send(b"invalid statsd message").await.unwrap();

            // Space the payloads out by 10 ms.
            sleep(Duration::from_millis(10)).await;
        }

        // Pause again before shutting the source down.
        sleep(Duration::from_millis(250)).await;

        let deadline = Instant::now() + Duration::from_millis(100);
        shutdown.shutdown_all(Some(deadline)).await;
    }
}