1use std::{
2 net::{Ipv4Addr, SocketAddr, SocketAddrV4},
3 time::Duration,
4};
5
6use bytes::Bytes;
7use futures::{StreamExt, TryFutureExt};
8use listenfd::ListenFd;
9use serde_with::serde_as;
10use smallvec::{SmallVec, smallvec};
11use tokio_util::udp::UdpFramed;
12use vector_lib::{
13 EstimatedJsonEncodedSizeOf,
14 codecs::{
15 NewlineDelimitedDecoder,
16 decoding::{self, Deserializer, Framer},
17 },
18 configurable::configurable_component,
19 internal_event::{CountByteSize, InternalEventHandle as _, Registered},
20 ipallowlist::IpAllowlistConfig,
21};
22
23use self::parser::ParseError;
24use super::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket};
25use crate::{
26 SourceSender,
27 codecs::Decoder,
28 config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
29 event::Event,
30 internal_events::{
31 EventsReceived, SocketBindError, SocketBytesReceived, SocketMode, SocketReceiveError,
32 StreamClosedError,
33 },
34 net,
35 shutdown::ShutdownSignal,
36 tcp::TcpKeepaliveConfig,
37 tls::{MaybeTlsSettings, TlsSourceConfig},
38};
39
40pub mod parser;
41#[cfg(unix)]
42mod unix;
43
44use parser::Parser;
45#[cfg(unix)]
46use unix::{UnixConfig, statsd_unix};
47use vector_lib::config::LogNamespace;
48
49#[configurable_component(source("statsd", "Collect metrics emitted by the StatsD aggregator."))]
51#[derive(Clone, Debug)]
52#[serde(tag = "mode", rename_all = "snake_case")]
53#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
54#[allow(clippy::large_enum_variant)] pub enum StatsdConfig {
56 Tcp(TcpConfig),
58
59 Udp(UdpConfig),
61
62 #[cfg(unix)]
64 Unix(UnixConfig),
65}
66
67#[configurable_component]
69#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
70#[serde(rename_all = "lowercase")]
71pub enum ConversionUnit {
72 #[default]
74 Seconds,
75
76 Milliseconds,
78}
79
80#[configurable_component]
82#[derive(Clone, Debug)]
83pub struct UdpConfig {
84 #[configurable(derived)]
85 address: SocketListenAddr,
86
87 receive_buffer_bytes: Option<usize>,
89
90 #[serde(default = "default_sanitize")]
91 #[configurable(derived)]
92 sanitize: bool,
93
94 #[serde(default = "default_convert_to")]
95 #[configurable(derived)]
96 convert_to: ConversionUnit,
97}
98
99impl UdpConfig {
100 pub const fn from_address(address: SocketListenAddr) -> Self {
101 Self {
102 address,
103 receive_buffer_bytes: None,
104 sanitize: default_sanitize(),
105 convert_to: default_convert_to(),
106 }
107 }
108}
109
110#[serde_as]
112#[configurable_component]
113#[derive(Clone, Debug)]
114pub struct TcpConfig {
115 #[configurable(derived)]
116 address: SocketListenAddr,
117
118 #[configurable(derived)]
119 keepalive: Option<TcpKeepaliveConfig>,
120
121 #[configurable(derived)]
122 pub permit_origin: Option<IpAllowlistConfig>,
123
124 #[configurable(derived)]
125 #[serde(default)]
126 tls: Option<TlsSourceConfig>,
127
128 #[serde(default = "default_shutdown_timeout_secs")]
130 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
131 #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
132 shutdown_timeout_secs: Duration,
133
134 #[configurable(metadata(docs::type_unit = "bytes"))]
136 receive_buffer_bytes: Option<usize>,
137
138 #[configurable(metadata(docs::type_unit = "connections"))]
140 connection_limit: Option<u32>,
141
142 #[serde(default = "default_sanitize")]
147 #[configurable(derived)]
148 sanitize: bool,
149
150 #[serde(default = "default_convert_to")]
151 #[configurable(derived)]
152 convert_to: ConversionUnit,
153}
154
155impl TcpConfig {
156 #[cfg(test)]
157 pub const fn from_address(address: SocketListenAddr) -> Self {
158 Self {
159 address,
160 keepalive: None,
161 permit_origin: None,
162 tls: None,
163 shutdown_timeout_secs: default_shutdown_timeout_secs(),
164 receive_buffer_bytes: None,
165 connection_limit: None,
166 sanitize: default_sanitize(),
167 convert_to: default_convert_to(),
168 }
169 }
170}
171
/// Serde default for `TcpConfig::shutdown_timeout_secs`: 30 seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    const DEFAULT_SHUTDOWN_TIMEOUT_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_SHUTDOWN_TIMEOUT_SECS)
}
175
/// Serde default for the `sanitize` option: sanitization is enabled.
const fn default_sanitize() -> bool {
    true
}
179
180const fn default_convert_to() -> ConversionUnit {
181 ConversionUnit::Seconds
182}
183
184impl GenerateConfig for StatsdConfig {
185 fn generate_config() -> toml::Value {
186 toml::Value::try_from(Self::Udp(UdpConfig::from_address(
187 SocketListenAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(
188 Ipv4Addr::LOCALHOST,
189 8125,
190 ))),
191 )))
192 .unwrap()
193 }
194}
195
196#[async_trait::async_trait]
197#[typetag::serde(name = "statsd")]
198impl SourceConfig for StatsdConfig {
199 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
200 match self {
201 StatsdConfig::Udp(config) => {
202 Ok(Box::pin(statsd_udp(config.clone(), cx.shutdown, cx.out)))
203 }
204 StatsdConfig::Tcp(config) => {
205 let tls_config = config.tls.as_ref().map(|tls| tls.tls_config.clone());
206 let tls_client_metadata_key = config
207 .tls
208 .as_ref()
209 .and_then(|tls| tls.client_metadata_key.clone())
210 .and_then(|k| k.path);
211 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
212 let statsd_tcp_source = StatsdTcpSource {
213 sanitize: config.sanitize,
214 convert_to: config.convert_to,
215 };
216
217 statsd_tcp_source.run(
218 config.address,
219 config.keepalive,
220 config.shutdown_timeout_secs,
221 tls,
222 tls_client_metadata_key,
223 config.receive_buffer_bytes,
224 None,
225 cx,
226 false.into(),
227 config.connection_limit,
228 config.permit_origin.clone().map(Into::into),
229 StatsdConfig::NAME,
230 LogNamespace::Legacy,
231 )
232 }
233 #[cfg(unix)]
234 StatsdConfig::Unix(config) => statsd_unix(config.clone(), cx.shutdown, cx.out),
235 }
236 }
237
238 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
239 vec![SourceOutput::new_metrics()]
240 }
241
242 fn resources(&self) -> Vec<Resource> {
243 match self.clone() {
244 Self::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
245 Self::Udp(udp) => vec![udp.address.as_udp_resource()],
246 #[cfg(unix)]
247 Self::Unix(_) => vec![],
248 }
249 }
250
251 fn can_acknowledge(&self) -> bool {
252 false
253 }
254}
255
256#[derive(Clone)]
257pub(crate) struct StatsdDeserializer {
258 socket_mode: Option<SocketMode>,
259 events_received: Option<Registered<EventsReceived>>,
260 parser: Parser,
261}
262
263impl StatsdDeserializer {
264 pub fn udp(sanitize: bool, convert_to: ConversionUnit) -> Self {
265 Self {
266 socket_mode: Some(SocketMode::Udp),
267 events_received: Some(register!(EventsReceived)),
269 parser: Parser::new(sanitize, convert_to),
270 }
271 }
272
273 pub const fn tcp(sanitize: bool, convert_to: ConversionUnit) -> Self {
274 Self {
275 socket_mode: None,
276 events_received: None,
277 parser: Parser::new(sanitize, convert_to),
278 }
279 }
280
281 #[cfg(unix)]
282 pub const fn unix(sanitize: bool, convert_to: ConversionUnit) -> Self {
283 Self {
284 socket_mode: Some(SocketMode::Unix),
285 events_received: None,
286 parser: Parser::new(sanitize, convert_to),
287 }
288 }
289}
290
291impl decoding::format::Deserializer for StatsdDeserializer {
292 fn parse(
293 &self,
294 bytes: Bytes,
295 _log_namespace: LogNamespace,
296 ) -> crate::Result<SmallVec<[Event; 1]>> {
297 if let Some(mode) = self.socket_mode
299 && mode == SocketMode::Udp
300 {
301 emit!(SocketBytesReceived {
302 mode,
303 byte_size: bytes.len(),
304 });
305 }
306
307 match std::str::from_utf8(&bytes).map_err(ParseError::InvalidUtf8) {
308 Err(error) => Err(Box::new(error)),
309 Ok(s) => match self.parser.parse(s) {
310 Ok(metric) => {
311 let event = Event::Metric(metric);
312 if let Some(er) = &self.events_received {
313 let byte_size = event.estimated_json_encoded_size_of();
314 er.emit(CountByteSize(1, byte_size));
315 }
316 Ok(smallvec![event])
317 }
318 Err(error) => Err(Box::new(error)),
319 },
320 }
321 }
322}
323
324async fn statsd_udp(
325 config: UdpConfig,
326 shutdown: ShutdownSignal,
327 mut out: SourceSender,
328) -> Result<(), ()> {
329 let listenfd = ListenFd::from_env();
330 let socket = try_bind_udp_socket(config.address, listenfd)
331 .map_err(|error| {
332 emit!(SocketBindError {
333 mode: SocketMode::Udp,
334 error
335 })
336 })
337 .await?;
338
339 if let Some(receive_buffer_bytes) = config.receive_buffer_bytes
340 && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
341 {
342 warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
343 }
344
345 info!(
346 message = "Listening.",
347 addr = %config.address,
348 r#type = "udp"
349 );
350
351 let codec = Decoder::new(
352 Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
353 Deserializer::Boxed(Box::new(StatsdDeserializer::udp(
354 config.sanitize,
355 config.convert_to,
356 ))),
357 );
358 let mut stream = UdpFramed::new(socket, codec).take_until(shutdown);
359 while let Some(frame) = stream.next().await {
360 match frame {
361 Ok(((events, _byte_size), _sock)) => {
362 let count = events.len();
363 if (out.send_batch(events).await).is_err() {
364 emit!(StreamClosedError { count });
365 }
366 }
367 Err(error) => {
368 emit!(SocketReceiveError {
369 mode: SocketMode::Udp,
370 error
371 });
372 }
373 }
374 }
375
376 Ok(())
377}
378
379#[derive(Clone)]
380struct StatsdTcpSource {
381 sanitize: bool,
382 convert_to: ConversionUnit,
383}
384
385impl TcpSource for StatsdTcpSource {
386 type Error = vector_lib::codecs::decoding::Error;
387 type Item = SmallVec<[Event; 1]>;
388 type Decoder = Decoder;
389 type Acker = TcpNullAcker;
390
391 fn decoder(&self) -> Self::Decoder {
392 Decoder::new(
393 Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
394 Deserializer::Boxed(Box::new(StatsdDeserializer::tcp(
395 self.sanitize,
396 self.convert_to,
397 ))),
398 )
399 }
400
401 fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
402 TcpNullAcker
403 }
404}
405
#[cfg(test)]
mod test {
    use futures::channel::mpsc;
    use futures_util::SinkExt;
    use tokio::{
        io::AsyncWriteExt,
        net::UdpSocket,
        time::{Duration, Instant, sleep},
    };
    use vector_lib::{
        config::ComponentKey,
        event::{EventContainer, metric::TagValue},
    };

    use super::*;
    use crate::{
        series,
        test_util::{
            collect_limited,
            components::{
                COMPONENT_ERROR_TAGS, SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance,
                assert_source_error,
            },
            metrics::{
                AbsoluteMetricState, assert_counter, assert_distribution, assert_gauge, assert_set,
            },
            next_addr,
        },
    };

    /// Payload type carried by the in-test channels.
    type Payload = &'static [u8];

    /// Pause that gives the source time to start listening and, after the
    /// last send, time to process what is still in flight.
    const SETTLE_TIME: Duration = Duration::from_millis(250);

    /// Pause between two consecutive payloads.
    const SEND_INTERVAL: Duration = Duration::from_millis(10);

    /// Grace period granted to the source when shutting it down.
    const SHUTDOWN_GRACE: Duration = Duration::from_millis(100);

    /// Spawns a task that sends every payload from `receiver` to `in_addr`
    /// as one UDP datagram, using a socket bound to a fresh local address.
    fn forward_over_udp(in_addr: std::net::SocketAddr, mut receiver: mpsc::Receiver<Payload>) {
        tokio::spawn(async move {
            let bind_addr = next_addr();
            let socket = UdpSocket::bind(bind_addr).await.unwrap();
            socket.connect(in_addr).await.unwrap();
            while let Some(bytes) = receiver.next().await {
                socket.send(bytes).await.unwrap();
            }
        });
    }

    /// Spawns a task that writes every payload from `receiver` to `in_addr`
    /// over a new TCP connection per payload.
    fn forward_over_tcp(in_addr: std::net::SocketAddr, mut receiver: mpsc::Receiver<Payload>) {
        tokio::spawn(async move {
            while let Some(bytes) = receiver.next().await {
                let mut stream = tokio::net::TcpStream::connect(in_addr).await.unwrap();
                stream.write_all(bytes).await.unwrap();
            }
        });
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<StatsdConfig>();
    }

    #[tokio::test]
    async fn test_statsd_udp() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel::<Payload>(200);
            forward_over_udp(in_addr, receiver);
            test_statsd(config, sender).await;
        })
        .await;
    }

    #[tokio::test]
    async fn test_statsd_tcp() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel::<Payload>(200);
            forward_over_tcp(in_addr, receiver);
            test_statsd(config, sender).await;
        })
        .await;
    }

    #[tokio::test]
    async fn test_statsd_error() {
        assert_source_error(&COMPONENT_ERROR_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel::<Payload>(200);
            forward_over_tcp(in_addr, receiver);
            test_invalid_statsd(config, sender).await;
        })
        .await;
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn test_statsd_unix() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
            let config = StatsdConfig::Unix(UnixConfig {
                path: in_path.clone(),
                sanitize: true,
                convert_to: ConversionUnit::Seconds,
            });
            let (sender, mut receiver) = mpsc::channel::<Payload>(200);
            // One new Unix stream connection per payload.
            tokio::spawn(async move {
                while let Some(bytes) = receiver.next().await {
                    let mut stream = tokio::net::UnixStream::connect(&in_path).await.unwrap();
                    stream.write_all(bytes).await.unwrap();
                }
            });
            test_statsd(config, sender).await;
        })
        .await;
    }

    /// With `convert_to = milliseconds`, a `320|ms` timer sampled at 0.1
    /// must be recorded as 10 samples of 320 (sum 3200), i.e. unconverted.
    #[tokio::test]
    async fn test_statsd_udp_conversion_disabled() {
        let in_addr = next_addr();
        let mut udp_config = UdpConfig::from_address(in_addr.into());
        udp_config.convert_to = ConversionUnit::Milliseconds;
        let statsd_config = StatsdConfig::Udp(udp_config);
        let (mut sender, receiver) = mpsc::channel::<Payload>(200);

        forward_over_udp(in_addr, receiver);

        let component_key = ComponentKey::from("statsd_conversion_disabled");
        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let source = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build source");

        tokio::spawn(async move {
            source.await.expect("sink should not fail");
        });

        sleep(SETTLE_TIME).await;
        sender.send(b"timer:320|ms|@0.1\n").await.unwrap();
        sleep(SETTLE_TIME).await;
        shutdown
            .shutdown_all(Some(Instant::now() + SHUTDOWN_GRACE))
            .await;

        let metrics = collect_limited(rx)
            .await
            .into_iter()
            .flat_map(EventContainer::into_events)
            .collect::<AbsoluteMetricState>()
            .finish();
        assert_distribution(
            &metrics,
            series!("timer"),
            3200.0,
            10,
            &[(1.0, 0), (2.0, 0), (4.0, 0), (f64::INFINITY, 10)],
        );
    }

    /// Runs `statsd_config`, pushes the same batch of valid StatsD lines
    /// through `sender` 100 times, shuts the source down and checks the
    /// aggregated metrics.
    async fn test_statsd(statsd_config: StatsdConfig, mut sender: mpsc::Sender<&'static [u8]>) {
        const BATCH: &[u8] =
            b"foo:1|c|#a,b:b\nbar:42|g\nfoo:1|c|#a,b:c\nglork:3|h|@0.1\nmilliglork:3000|ms|@0.2\nset:0|s\nset:1|s\n";

        let component_key = ComponentKey::from("statsd");
        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let source = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build statsd source");

        tokio::spawn(async move {
            source.await.expect("sink should not fail");
        });

        // Give the source time to start listening before sending anything.
        sleep(SETTLE_TIME).await;

        for _ in 0..100 {
            sender.send(BATCH).await.unwrap();
            // Space the payloads out.
            sleep(SEND_INTERVAL).await;
        }

        // Let the source process what is still in flight, then stop it.
        sleep(SETTLE_TIME).await;
        shutdown
            .shutdown_all(Some(Instant::now() + SHUTDOWN_GRACE))
            .await;

        let metrics = collect_limited(rx)
            .await
            .into_iter()
            .flat_map(EventContainer::into_events)
            .collect::<AbsoluteMetricState>()
            .finish();

        // Each batch increments both `foo` series by one.
        assert_counter(
            &metrics,
            series!(
                "foo",
                "a" => TagValue::Bare,
                "b" => "b"
            ),
            100.0,
        );
        assert_counter(
            &metrics,
            series!(
                "foo",
                "a" => TagValue::Bare,
                "b" => "c"
            ),
            100.0,
        );

        assert_gauge(&metrics, series!("bar"), 42.0);

        // `glork:3|h|@0.1` x 100 is expected as 1000 samples summing to 3000.
        assert_distribution(
            &metrics,
            series!("glork"),
            3000.0,
            1000,
            &[(1.0, 0), (2.0, 0), (4.0, 1000), (f64::INFINITY, 1000)],
        );
        // `milliglork:3000|ms|@0.2` x 100 is expected as 500 samples summing
        // to 1500, i.e. each timer value converted to 3 seconds.
        assert_distribution(
            &metrics,
            series!("milliglork"),
            1500.0,
            500,
            &[(1.0, 0), (2.0, 0), (4.0, 500), (f64::INFINITY, 500)],
        );
        assert_set(&metrics, series!("set"), &["0", "1"]);
    }

    /// Runs `statsd_config`, pushes ten unparsable payloads through `sender`
    /// and shuts the source down. The caller checks the emitted error
    /// telemetry.
    async fn test_invalid_statsd(
        statsd_config: StatsdConfig,
        mut sender: mpsc::Sender<&'static [u8]>,
    ) {
        let component_key = ComponentKey::from("statsd");
        // The receiving half is kept alive but never read.
        let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let source = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build statsd source");

        tokio::spawn(async move {
            source.await.expect("sink should not fail");
        });

        // Give the source time to start listening before sending anything.
        sleep(SETTLE_TIME).await;

        for _ in 0..10 {
            sender.send(b"invalid statsd message").await.unwrap();
            // Space the payloads out.
            sleep(SEND_INTERVAL).await;
        }

        // Let the source process what is still in flight, then stop it.
        sleep(SETTLE_TIME).await;
        shutdown
            .shutdown_all(Some(Instant::now() + SHUTDOWN_GRACE))
            .await;
    }
}