vector/sources/statsd/
mod.rs

1use std::{
2    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
3    time::Duration,
4};
5use vector_lib::ipallowlist::IpAllowlistConfig;
6
7use bytes::Bytes;
8use futures::{StreamExt, TryFutureExt};
9use listenfd::ListenFd;
10use serde_with::serde_as;
11use smallvec::{smallvec, SmallVec};
12use tokio_util::udp::UdpFramed;
13use vector_lib::codecs::{
14    decoding::{self, Deserializer, Framer},
15    NewlineDelimitedDecoder,
16};
17use vector_lib::configurable::configurable_component;
18use vector_lib::internal_event::{CountByteSize, InternalEventHandle as _, Registered};
19use vector_lib::EstimatedJsonEncodedSizeOf;
20
21use self::parser::ParseError;
22use super::util::net::{try_bind_udp_socket, SocketListenAddr, TcpNullAcker, TcpSource};
23use crate::{
24    codecs::Decoder,
25    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
26    event::Event,
27    internal_events::{
28        EventsReceived, SocketBindError, SocketBytesReceived, SocketMode, SocketReceiveError,
29        StreamClosedError,
30    },
31    net,
32    shutdown::ShutdownSignal,
33    tcp::TcpKeepaliveConfig,
34    tls::{MaybeTlsSettings, TlsSourceConfig},
35    SourceSender,
36};
37
38pub mod parser;
39#[cfg(unix)]
40mod unix;
41
42use parser::Parser;
43
44#[cfg(unix)]
45use unix::{statsd_unix, UnixConfig};
46use vector_lib::config::LogNamespace;
47
48/// Configuration for the `statsd` source.
49#[configurable_component(source("statsd", "Collect metrics emitted by the StatsD aggregator."))]
50#[derive(Clone, Debug)]
51#[serde(tag = "mode", rename_all = "snake_case")]
52#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
53#[allow(clippy::large_enum_variant)] // just used for configuration
54pub enum StatsdConfig {
55    /// Listen on TCP.
56    Tcp(TcpConfig),
57
58    /// Listen on UDP.
59    Udp(UdpConfig),
60
61    /// Listen on a Unix domain Socket (UDS).
62    #[cfg(unix)]
63    Unix(UnixConfig),
64}
65
66/// Specifies the target unit for converting incoming StatsD timing values. When set to "seconds" (the default), timing values in milliseconds (`ms`) are converted to seconds (`s`). When set to "milliseconds", the original timing values are preserved.
67#[configurable_component]
68#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
69#[serde(rename_all = "lowercase")]
70pub enum ConversionUnit {
71    /// Convert to seconds.
72    #[default]
73    Seconds,
74
75    /// Convert to milliseconds.
76    Milliseconds,
77}
78
79/// UDP configuration for the `statsd` source.
80#[configurable_component]
81#[derive(Clone, Debug)]
82pub struct UdpConfig {
83    #[configurable(derived)]
84    address: SocketListenAddr,
85
86    /// The size of the receive buffer used for each connection.
87    receive_buffer_bytes: Option<usize>,
88
89    #[serde(default = "default_sanitize")]
90    #[configurable(derived)]
91    sanitize: bool,
92
93    #[serde(default = "default_convert_to")]
94    #[configurable(derived)]
95    convert_to: ConversionUnit,
96}
97
98impl UdpConfig {
99    pub const fn from_address(address: SocketListenAddr) -> Self {
100        Self {
101            address,
102            receive_buffer_bytes: None,
103            sanitize: default_sanitize(),
104            convert_to: default_convert_to(),
105        }
106    }
107}
108
109/// TCP configuration for the `statsd` source.
110#[serde_as]
111#[configurable_component]
112#[derive(Clone, Debug)]
113pub struct TcpConfig {
114    #[configurable(derived)]
115    address: SocketListenAddr,
116
117    #[configurable(derived)]
118    keepalive: Option<TcpKeepaliveConfig>,
119
120    #[configurable(derived)]
121    pub permit_origin: Option<IpAllowlistConfig>,
122
123    #[configurable(derived)]
124    #[serde(default)]
125    tls: Option<TlsSourceConfig>,
126
127    /// The timeout before a connection is forcefully closed during shutdown.
128    #[serde(default = "default_shutdown_timeout_secs")]
129    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
130    #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
131    shutdown_timeout_secs: Duration,
132
133    /// The size of the receive buffer used for each connection.
134    #[configurable(metadata(docs::type_unit = "bytes"))]
135    receive_buffer_bytes: Option<usize>,
136
137    /// The maximum number of TCP connections that are allowed at any given time.
138    #[configurable(metadata(docs::type_unit = "connections"))]
139    connection_limit: Option<u32>,
140
141    ///	Whether or not to sanitize incoming statsd key names. When "true", keys are sanitized by:
142    /// - "/" is replaced with "-"
143    /// - All whitespace is replaced with "_"
144    /// - All non alphanumeric characters (A-Z, a-z, 0-9, _, or -) are removed.
145    #[serde(default = "default_sanitize")]
146    #[configurable(derived)]
147    sanitize: bool,
148
149    #[serde(default = "default_convert_to")]
150    #[configurable(derived)]
151    convert_to: ConversionUnit,
152}
153
154impl TcpConfig {
155    #[cfg(test)]
156    pub const fn from_address(address: SocketListenAddr) -> Self {
157        Self {
158            address,
159            keepalive: None,
160            permit_origin: None,
161            tls: None,
162            shutdown_timeout_secs: default_shutdown_timeout_secs(),
163            receive_buffer_bytes: None,
164            connection_limit: None,
165            sanitize: default_sanitize(),
166            convert_to: default_convert_to(),
167        }
168    }
169}
170
/// Serde default for `shutdown_timeout_secs`: connections get 30 seconds during shutdown
/// before they are forcefully closed.
const fn default_shutdown_timeout_secs() -> Duration {
    const GRACE_PERIOD_SECS: u64 = 30;
    Duration::from_secs(GRACE_PERIOD_SECS)
}
174
/// Serde default for `sanitize`: key-name sanitization is on unless explicitly disabled.
/// A `const fn` so the `const` `from_address` constructors can call it.
const fn default_sanitize() -> bool {
    true
}
178
179const fn default_convert_to() -> ConversionUnit {
180    ConversionUnit::Seconds
181}
182
183impl GenerateConfig for StatsdConfig {
184    fn generate_config() -> toml::Value {
185        toml::Value::try_from(Self::Udp(UdpConfig::from_address(
186            SocketListenAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(
187                Ipv4Addr::LOCALHOST,
188                8125,
189            ))),
190        )))
191        .unwrap()
192    }
193}
194
195#[async_trait::async_trait]
196#[typetag::serde(name = "statsd")]
197impl SourceConfig for StatsdConfig {
198    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
199        match self {
200            StatsdConfig::Udp(config) => {
201                Ok(Box::pin(statsd_udp(config.clone(), cx.shutdown, cx.out)))
202            }
203            StatsdConfig::Tcp(config) => {
204                let tls_config = config.tls.as_ref().map(|tls| tls.tls_config.clone());
205                let tls_client_metadata_key = config
206                    .tls
207                    .as_ref()
208                    .and_then(|tls| tls.client_metadata_key.clone())
209                    .and_then(|k| k.path);
210                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
211                let statsd_tcp_source = StatsdTcpSource {
212                    sanitize: config.sanitize,
213                    convert_to: config.convert_to,
214                };
215
216                statsd_tcp_source.run(
217                    config.address,
218                    config.keepalive,
219                    config.shutdown_timeout_secs,
220                    tls,
221                    tls_client_metadata_key,
222                    config.receive_buffer_bytes,
223                    None,
224                    cx,
225                    false.into(),
226                    config.connection_limit,
227                    config.permit_origin.clone().map(Into::into),
228                    StatsdConfig::NAME,
229                    LogNamespace::Legacy,
230                )
231            }
232            #[cfg(unix)]
233            StatsdConfig::Unix(config) => statsd_unix(config.clone(), cx.shutdown, cx.out),
234        }
235    }
236
237    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
238        vec![SourceOutput::new_metrics()]
239    }
240
241    fn resources(&self) -> Vec<Resource> {
242        match self.clone() {
243            Self::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
244            Self::Udp(udp) => vec![udp.address.as_udp_resource()],
245            #[cfg(unix)]
246            Self::Unix(_) => vec![],
247        }
248    }
249
250    fn can_acknowledge(&self) -> bool {
251        false
252    }
253}
254
255#[derive(Clone)]
256pub(crate) struct StatsdDeserializer {
257    socket_mode: Option<SocketMode>,
258    events_received: Option<Registered<EventsReceived>>,
259    parser: Parser,
260}
261
262impl StatsdDeserializer {
263    pub fn udp(sanitize: bool, convert_to: ConversionUnit) -> Self {
264        Self {
265            socket_mode: Some(SocketMode::Udp),
266            // The other modes emit a different `EventsReceived`.
267            events_received: Some(register!(EventsReceived)),
268            parser: Parser::new(sanitize, convert_to),
269        }
270    }
271
272    pub const fn tcp(sanitize: bool, convert_to: ConversionUnit) -> Self {
273        Self {
274            socket_mode: None,
275            events_received: None,
276            parser: Parser::new(sanitize, convert_to),
277        }
278    }
279
280    #[cfg(unix)]
281    pub const fn unix(sanitize: bool, convert_to: ConversionUnit) -> Self {
282        Self {
283            socket_mode: Some(SocketMode::Unix),
284            events_received: None,
285            parser: Parser::new(sanitize, convert_to),
286        }
287    }
288}
289
290impl decoding::format::Deserializer for StatsdDeserializer {
291    fn parse(
292        &self,
293        bytes: Bytes,
294        _log_namespace: LogNamespace,
295    ) -> crate::Result<SmallVec<[Event; 1]>> {
296        // The other modes already emit BytesReceived
297        if let Some(mode) = self.socket_mode {
298            if mode == SocketMode::Udp {
299                emit!(SocketBytesReceived {
300                    mode,
301                    byte_size: bytes.len(),
302                });
303            }
304        }
305
306        match std::str::from_utf8(&bytes).map_err(ParseError::InvalidUtf8) {
307            Err(error) => Err(Box::new(error)),
308            Ok(s) => match self.parser.parse(s) {
309                Ok(metric) => {
310                    let event = Event::Metric(metric);
311                    if let Some(er) = &self.events_received {
312                        let byte_size = event.estimated_json_encoded_size_of();
313                        er.emit(CountByteSize(1, byte_size));
314                    }
315                    Ok(smallvec![event])
316                }
317                Err(error) => Err(Box::new(error)),
318            },
319        }
320    }
321}
322
323async fn statsd_udp(
324    config: UdpConfig,
325    shutdown: ShutdownSignal,
326    mut out: SourceSender,
327) -> Result<(), ()> {
328    let listenfd = ListenFd::from_env();
329    let socket = try_bind_udp_socket(config.address, listenfd)
330        .map_err(|error| {
331            emit!(SocketBindError {
332                mode: SocketMode::Udp,
333                error
334            })
335        })
336        .await?;
337
338    if let Some(receive_buffer_bytes) = config.receive_buffer_bytes {
339        if let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes) {
340            warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
341        }
342    }
343
344    info!(
345        message = "Listening.",
346        addr = %config.address,
347        r#type = "udp"
348    );
349
350    let codec = Decoder::new(
351        Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
352        Deserializer::Boxed(Box::new(StatsdDeserializer::udp(
353            config.sanitize,
354            config.convert_to,
355        ))),
356    );
357    let mut stream = UdpFramed::new(socket, codec).take_until(shutdown);
358    while let Some(frame) = stream.next().await {
359        match frame {
360            Ok(((events, _byte_size), _sock)) => {
361                let count = events.len();
362                if (out.send_batch(events).await).is_err() {
363                    emit!(StreamClosedError { count });
364                }
365            }
366            Err(error) => {
367                emit!(SocketReceiveError {
368                    mode: SocketMode::Udp,
369                    error
370                });
371            }
372        }
373    }
374
375    Ok(())
376}
377
378#[derive(Clone)]
379struct StatsdTcpSource {
380    sanitize: bool,
381    convert_to: ConversionUnit,
382}
383
384impl TcpSource for StatsdTcpSource {
385    type Error = vector_lib::codecs::decoding::Error;
386    type Item = SmallVec<[Event; 1]>;
387    type Decoder = Decoder;
388    type Acker = TcpNullAcker;
389
390    fn decoder(&self) -> Self::Decoder {
391        Decoder::new(
392            Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
393            Deserializer::Boxed(Box::new(StatsdDeserializer::tcp(
394                self.sanitize,
395                self.convert_to,
396            ))),
397        )
398    }
399
400    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
401        TcpNullAcker
402    }
403}
404
#[cfg(test)]
mod test {
    use futures::channel::mpsc;
    use futures_util::SinkExt;
    use tokio::{
        io::AsyncWriteExt,
        net::UdpSocket,
        time::{sleep, Duration, Instant},
    };
    use vector_lib::{
        config::ComponentKey,
        event::{metric::TagValue, EventContainer},
    };

    use super::*;
    use crate::test_util::{
        collect_limited,
        components::{
            assert_source_compliance, assert_source_error, COMPONENT_ERROR_TAGS,
            SOCKET_PUSH_SOURCE_TAGS,
        },
        metrics::{assert_counter, assert_distribution, assert_gauge, assert_set},
        next_addr,
    };
    use crate::{series, test_util::metrics::AbsoluteMetricState};

    /// Spawns a task that binds a fresh local UDP socket, connects it to `target`, and sends
    /// every payload received on `receiver` as a single datagram.
    fn spawn_udp_sender(target: SocketAddr, mut receiver: mpsc::Receiver<&'static [u8]>) {
        tokio::spawn(async move {
            let bind_addr = next_addr();
            let socket = UdpSocket::bind(bind_addr).await.unwrap();
            socket.connect(target).await.unwrap();
            while let Some(bytes) = receiver.next().await {
                socket.send(bytes).await.unwrap();
            }
        });
    }

    /// Spawns a task that, for every payload received on `receiver`, opens a new TCP
    /// connection to `target`, writes the payload, and then drops the connection.
    fn spawn_tcp_sender(target: SocketAddr, mut receiver: mpsc::Receiver<&'static [u8]>) {
        tokio::spawn(async move {
            while let Some(bytes) = receiver.next().await {
                tokio::net::TcpStream::connect(target)
                    .await
                    .unwrap()
                    .write_all(bytes)
                    .await
                    .unwrap();
            }
        });
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<StatsdConfig>();
    }

    #[tokio::test]
    async fn test_statsd_udp() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel(200);
            spawn_udp_sender(in_addr, receiver);
            test_statsd(config, sender).await;
        })
        .await;
    }

    #[tokio::test]
    async fn test_statsd_tcp() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel(200);
            spawn_tcp_sender(in_addr, receiver);
            test_statsd(config, sender).await;
        })
        .await;
    }

    #[tokio::test]
    async fn test_statsd_error() {
        assert_source_error(&COMPONENT_ERROR_TAGS, async move {
            let in_addr = next_addr();
            let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
            let (sender, receiver) = mpsc::channel(200);
            spawn_tcp_sender(in_addr, receiver);
            test_invalid_statsd(config, sender).await;
        })
        .await;
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn test_statsd_unix() {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
            let config = StatsdConfig::Unix(UnixConfig {
                path: in_path.clone(),
                sanitize: true,
                convert_to: ConversionUnit::Seconds,
            });
            let (sender, mut receiver) = mpsc::channel(200);
            tokio::spawn(async move {
                while let Some(bytes) = receiver.next().await {
                    tokio::net::UnixStream::connect(&in_path)
                        .await
                        .unwrap()
                        .write_all(bytes)
                        .await
                        .unwrap();
                }
            });
            test_statsd(config, sender).await;
        })
        .await;
    }

    #[tokio::test]
    async fn test_statsd_udp_conversion_disabled() {
        let in_addr = next_addr();
        let mut config = UdpConfig::from_address(in_addr.into());
        config.convert_to = ConversionUnit::Milliseconds;
        let statsd_config = StatsdConfig::Udp(config);
        let (mut sender, receiver) = mpsc::channel(200);
        spawn_udp_sender(in_addr, receiver);

        let component_key = ComponentKey::from("statsd_conversion_disabled");
        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let sink = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build source");

        tokio::spawn(async move {
            sink.await.expect("sink should not fail");
        });

        sleep(Duration::from_millis(250)).await;
        sender.send(b"timer:320|ms|@0.1\n").await.unwrap();
        sleep(Duration::from_millis(250)).await;
        shutdown
            .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
            .await;
        let state = collect_limited(rx)
            .await
            .into_iter()
            .flat_map(EventContainer::into_events)
            .collect::<AbsoluteMetricState>();
        let metrics = state.finish();
        // One 320 ms timing at sample rate 0.1 is asserted as 10 samples totalling 3200, i.e.
        // the value is left in milliseconds rather than converted.
        assert_distribution(
            &metrics,
            series!("timer"),
            3200.0,
            10,
            &[(1.0, 0), (2.0, 0), (4.0, 0), (f64::INFINITY, 10)],
        );
    }

    /// Shared happy-path scenario: builds and spawns the source from `statsd_config`, pushes
    /// 100 copies of a mixed-type StatsD payload through `sender`, shuts the source down, and
    /// checks the aggregated metrics.
    async fn test_statsd(statsd_config: StatsdConfig, mut sender: mpsc::Sender<&'static [u8]>) {
        // Build our statsd source and then spawn it.  We use a big pipeline buffer because each
        // packet we send has a lot of metrics per packet.  We could technically count them all up
        // and have a more accurate number here, but honestly, who cares?  This is big enough.
        let component_key = ComponentKey::from("statsd");
        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let sink = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build statsd source");

        tokio::spawn(async move {
            sink.await.expect("sink should not fail");
        });

        // Wait like 250ms to give the source time to start running and become ready to handle
        // traffic.
        //
        // TODO: It'd be neat if we could make `ShutdownSignal` track when it was polled at least once,
        // and then surface that (via one of the related types, maybe) somehow so we could use it as
        // a signal for "the source is ready, it's polled the shutdown future at least once, which
        // means it's trying to accept connections, etc" and would be far more deterministic than this.
        sleep(Duration::from_millis(250)).await;

        // Send all of the messages.
        for _ in 0..100 {
            sender.send(
                b"foo:1|c|#a,b:b\nbar:42|g\nfoo:1|c|#a,b:c\nglork:3|h|@0.1\nmilliglork:3000|ms|@0.2\nset:0|s\nset:1|s\n"
            ).await.unwrap();

            // Space things out slightly to try to avoid dropped packets.
            sleep(Duration::from_millis(10)).await;
        }

        // Now wait for another small period of time to make sure we've processed the messages.
        // After that, trigger shutdown so our source closes and allows us to deterministically read
        // everything that was sent without having to know the exact count.
        sleep(Duration::from_millis(250)).await;
        shutdown
            .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
            .await;

        // Read all the events into a `MetricState`, which handles normalizing metrics and tracking
        // cumulative values for incremental metrics, etc.  This will represent the final/cumulative
        // values for each metric sent by the source into the pipeline.
        let state = collect_limited(rx)
            .await
            .into_iter()
            .flat_map(EventContainer::into_events)
            .collect::<AbsoluteMetricState>();
        let metrics = state.finish();

        // Each tag set of `foo` is incremented by 1 in each of the 100 payloads.
        assert_counter(
            &metrics,
            series!(
                "foo",
                "a" => TagValue::Bare,
                "b" => "b"
            ),
            100.0,
        );

        assert_counter(
            &metrics,
            series!(
                "foo",
                "a" => TagValue::Bare,
                "b" => "c"
            ),
            100.0,
        );

        assert_gauge(&metrics, series!("bar"), 42.0);
        // `glork:3|h|@0.1` x 100 is asserted as 1000 samples totalling 3000.
        assert_distribution(
            &metrics,
            series!("glork"),
            3000.0,
            1000,
            &[(1.0, 0), (2.0, 0), (4.0, 1000), (f64::INFINITY, 1000)],
        );
        // `milliglork:3000|ms|@0.2` x 100 is asserted as 500 samples totalling 1500, i.e. each
        // 3000 ms sample is converted to 3 seconds by the default conversion unit.
        assert_distribution(
            &metrics,
            series!("milliglork"),
            1500.0,
            500,
            &[(1.0, 0), (2.0, 0), (4.0, 500), (f64::INFINITY, 500)],
        );
        assert_set(&metrics, series!("set"), &["0", "1"]);
    }

    /// Pushes ten unparseable payloads through `sender` into a freshly built source, then shuts
    /// it down. The calling test asserts on the component errors this produces.
    async fn test_invalid_statsd(
        statsd_config: StatsdConfig,
        mut sender: mpsc::Sender<&'static [u8]>,
    ) {
        // Build our statsd source and then spawn it. Nothing valid is sent, so the output is
        // never read. It is still bound to `_rx` (not `_`) so the channel stays open for the
        // whole test.
        let component_key = ComponentKey::from("statsd");
        let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        let sink = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build statsd source");

        tokio::spawn(async move {
            sink.await.expect("sink should not fail");
        });

        // Wait like 250ms to give the source time to start running and become ready to handle
        // traffic (see the TODO in `test_statsd` about making this deterministic).
        sleep(Duration::from_millis(250)).await;

        // Send 10 invalid statsd messages
        for _ in 0..10 {
            sender.send(b"invalid statsd message").await.unwrap();

            // Space things out slightly so each message is handled on its own.
            sleep(Duration::from_millis(10)).await;
        }

        // Give the source a moment to process (and reject) the messages, then shut it down.
        sleep(Duration::from_millis(250)).await;
        shutdown
            .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
            .await;
    }
}