vector/sources/statsd/
mod.rs

1use std::{
2    net::{Ipv4Addr, SocketAddr, SocketAddrV4},
3    time::Duration,
4};
5
6use bytes::Bytes;
7use futures::{StreamExt, TryFutureExt};
8use listenfd::ListenFd;
9use serde_with::serde_as;
10use smallvec::{SmallVec, smallvec};
11use tokio_util::udp::UdpFramed;
12use vector_lib::{
13    EstimatedJsonEncodedSizeOf,
14    codecs::{
15        NewlineDelimitedDecoder,
16        decoding::{self, Deserializer, Framer},
17    },
18    configurable::configurable_component,
19    internal_event::{CountByteSize, InternalEventHandle as _, Registered},
20    ipallowlist::IpAllowlistConfig,
21};
22
23use self::parser::ParseError;
24use super::util::net::{SocketListenAddr, TcpNullAcker, TcpSource, try_bind_udp_socket};
25use crate::{
26    SourceSender,
27    codecs::Decoder,
28    config::{GenerateConfig, Resource, SourceConfig, SourceContext, SourceOutput},
29    event::Event,
30    internal_events::{
31        EventsReceived, SocketBindError, SocketBytesReceived, SocketMode, SocketReceiveError,
32        StreamClosedError,
33    },
34    net,
35    shutdown::ShutdownSignal,
36    tcp::TcpKeepaliveConfig,
37    tls::{MaybeTlsSettings, TlsSourceConfig},
38};
39
40pub mod parser;
41#[cfg(unix)]
42mod unix;
43
44use parser::Parser;
45#[cfg(unix)]
46use unix::{UnixConfig, statsd_unix};
47use vector_lib::config::LogNamespace;
48
/// Configuration for the `statsd` source.
// The variant is selected by the `mode` tag in the user's configuration. Per the
// `serde(tag = "mode", rename_all = "snake_case")` attribute below, the tag values are
// `tcp`, `udp` and (on Unix targets only) `unix`.
#[configurable_component(source("statsd", "Collect metrics emitted by the StatsD aggregator."))]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
#[allow(clippy::large_enum_variant)] // just used for configuration
pub enum StatsdConfig {
    /// Listen on TCP.
    Tcp(TcpConfig),

    /// Listen on UDP.
    Udp(UdpConfig),

    /// Listen on a Unix domain Socket (UDS).
    // This variant only exists when compiling for a Unix target.
    #[cfg(unix)]
    Unix(UnixConfig),
}
66
/// Specifies the target unit for converting incoming StatsD timing values. When set to "seconds" (the default), timing values in milliseconds (`ms`) are converted to seconds (`s`). When set to "milliseconds", the original timing values are preserved.
// Serialized in lowercase (`seconds` / `milliseconds`) per `serde(rename_all = "lowercase")`.
// The value is handed to `Parser::new`, which is where any conversion is actually applied.
#[configurable_component]
#[derive(Clone, Debug, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConversionUnit {
    /// Convert to seconds.
    // Default variant; also what `default_convert_to()` returns.
    #[default]
    Seconds,

    /// Convert to milliseconds.
    // Per the type-level description above, this keeps `ms` timings as they were received.
    Milliseconds,
}
79
/// UDP configuration for the `statsd` source.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UdpConfig {
    // Address the UDP socket is bound to (see `statsd_udp`).
    #[configurable(derived)]
    address: SocketListenAddr,

    /// The size of the receive buffer used for each connection.
    // When set, `statsd_udp` applies it with `net::set_receive_buffer_size`; a failure there is
    // only logged as a warning and the source keeps running.
    receive_buffer_bytes: Option<usize>,

    // Forwarded to `Parser::new`. Defaults to `default_sanitize()`, i.e. `true`.
    #[serde(default = "default_sanitize")]
    #[configurable(derived)]
    sanitize: bool,

    // Forwarded to `Parser::new`. Defaults to `default_convert_to()`, i.e. `ConversionUnit::Seconds`.
    #[serde(default = "default_convert_to")]
    #[configurable(derived)]
    convert_to: ConversionUnit,
}
98
99impl UdpConfig {
100    pub const fn from_address(address: SocketListenAddr) -> Self {
101        Self {
102            address,
103            receive_buffer_bytes: None,
104            sanitize: default_sanitize(),
105            convert_to: default_convert_to(),
106        }
107    }
108}
109
/// TCP configuration for the `statsd` source.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TcpConfig {
    // Address the TCP listener is bound to; also reported as this source's resource.
    #[configurable(derived)]
    address: SocketListenAddr,

    // Optional TCP keepalive settings, passed through to `TcpSource::run`.
    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,

    // Optional IP allowlist, passed through to `TcpSource::run`.
    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,

    // Optional TLS settings; absent (`None`) when omitted from the configuration.
    #[configurable(derived)]
    #[serde(default)]
    tls: Option<TlsSourceConfig>,

    /// The timeout before a connection is forcefully closed during shutdown.
    // (De)serialized as a whole number of seconds via `serde_with::DurationSeconds<u64>`;
    // defaults to `default_shutdown_timeout_secs()`.
    #[serde(default = "default_shutdown_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
    shutdown_timeout_secs: Duration,

    /// The size of the receive buffer used for each connection.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    receive_buffer_bytes: Option<usize>,

    /// The maximum number of TCP connections that are allowed at any given time.
    #[configurable(metadata(docs::type_unit = "connections"))]
    connection_limit: Option<u32>,

    /// Whether or not to sanitize incoming statsd key names. When "true", keys are sanitized by:
    /// - "/" is replaced with "-"
    /// - All whitespace is replaced with "_"
    /// - All non alphanumeric characters (A-Z, a-z, 0-9, _, or -) are removed.
    // Defaults to `default_sanitize()`, i.e. `true`.
    #[serde(default = "default_sanitize")]
    #[configurable(derived)]
    sanitize: bool,

    // Defaults to `default_convert_to()`, i.e. `ConversionUnit::Seconds`.
    #[serde(default = "default_convert_to")]
    #[configurable(derived)]
    convert_to: ConversionUnit,
}
154
155impl TcpConfig {
156    #[cfg(test)]
157    pub const fn from_address(address: SocketListenAddr) -> Self {
158        Self {
159            address,
160            keepalive: None,
161            permit_origin: None,
162            tls: None,
163            shutdown_timeout_secs: default_shutdown_timeout_secs(),
164            receive_buffer_bytes: None,
165            connection_limit: None,
166            sanitize: default_sanitize(),
167            convert_to: default_convert_to(),
168        }
169    }
170}
171
/// Default value for `TcpConfig::shutdown_timeout_secs`: 30 seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    const DEFAULT_SHUTDOWN_TIMEOUT_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_SHUTDOWN_TIMEOUT_SECS)
}
175
/// Default value for the `sanitize` option of the UDP and TCP configurations:
/// key sanitization is enabled unless the user turns it off.
const fn default_sanitize() -> bool {
    const SANITIZE_BY_DEFAULT: bool = true;
    SANITIZE_BY_DEFAULT
}
179
/// Default value for the `convert_to` option of the UDP and TCP configurations:
/// `ConversionUnit::Seconds`, the same variant that is marked `#[default]` on the enum.
const fn default_convert_to() -> ConversionUnit {
    ConversionUnit::Seconds
}
183
184impl GenerateConfig for StatsdConfig {
185    fn generate_config() -> toml::Value {
186        toml::Value::try_from(Self::Udp(UdpConfig::from_address(
187            SocketListenAddr::SocketAddr(SocketAddr::V4(SocketAddrV4::new(
188                Ipv4Addr::LOCALHOST,
189                8125,
190            ))),
191        )))
192        .unwrap()
193    }
194}
195
196#[async_trait::async_trait]
197#[typetag::serde(name = "statsd")]
198impl SourceConfig for StatsdConfig {
199    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
200        match self {
201            StatsdConfig::Udp(config) => {
202                Ok(Box::pin(statsd_udp(config.clone(), cx.shutdown, cx.out)))
203            }
204            StatsdConfig::Tcp(config) => {
205                let tls_config = config.tls.as_ref().map(|tls| tls.tls_config.clone());
206                let tls_client_metadata_key = config
207                    .tls
208                    .as_ref()
209                    .and_then(|tls| tls.client_metadata_key.clone())
210                    .and_then(|k| k.path);
211                let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
212                let statsd_tcp_source = StatsdTcpSource {
213                    sanitize: config.sanitize,
214                    convert_to: config.convert_to,
215                };
216
217                statsd_tcp_source.run(
218                    config.address,
219                    config.keepalive,
220                    config.shutdown_timeout_secs,
221                    tls,
222                    tls_client_metadata_key,
223                    config.receive_buffer_bytes,
224                    None,
225                    cx,
226                    false.into(),
227                    config.connection_limit,
228                    config.permit_origin.clone().map(Into::into),
229                    StatsdConfig::NAME,
230                    LogNamespace::Legacy,
231                )
232            }
233            #[cfg(unix)]
234            StatsdConfig::Unix(config) => statsd_unix(config.clone(), cx.shutdown, cx.out),
235        }
236    }
237
238    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
239        vec![SourceOutput::new_metrics()]
240    }
241
242    fn resources(&self) -> Vec<Resource> {
243        match self.clone() {
244            Self::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
245            Self::Udp(udp) => vec![udp.address.as_udp_resource()],
246            #[cfg(unix)]
247            Self::Unix(_) => vec![],
248        }
249    }
250
251    fn can_acknowledge(&self) -> bool {
252        false
253    }
254}
255
/// Deserializer that turns one frame of StatsD text into a single metric event.
///
/// The optional fields control which telemetry `parse` emits on its own; they are set
/// differently by the `udp`, `tcp` and `unix` constructors.
#[derive(Clone)]
pub(crate) struct StatsdDeserializer {
    /// Socket mode this deserializer serves, if any. `parse` emits `SocketBytesReceived`
    /// only when this is `Some(SocketMode::Udp)`.
    socket_mode: Option<SocketMode>,
    /// When present, `parse` reports every successfully parsed event through this handle.
    events_received: Option<Registered<EventsReceived>>,
    /// The StatsD line parser, configured with the sanitize and conversion options.
    parser: Parser,
}
262
263impl StatsdDeserializer {
264    pub fn udp(sanitize: bool, convert_to: ConversionUnit) -> Self {
265        Self {
266            socket_mode: Some(SocketMode::Udp),
267            // The other modes emit a different `EventsReceived`.
268            events_received: Some(register!(EventsReceived)),
269            parser: Parser::new(sanitize, convert_to),
270        }
271    }
272
273    pub const fn tcp(sanitize: bool, convert_to: ConversionUnit) -> Self {
274        Self {
275            socket_mode: None,
276            events_received: None,
277            parser: Parser::new(sanitize, convert_to),
278        }
279    }
280
281    #[cfg(unix)]
282    pub const fn unix(sanitize: bool, convert_to: ConversionUnit) -> Self {
283        Self {
284            socket_mode: Some(SocketMode::Unix),
285            events_received: None,
286            parser: Parser::new(sanitize, convert_to),
287        }
288    }
289}
290
291impl decoding::format::Deserializer for StatsdDeserializer {
292    fn parse(
293        &self,
294        bytes: Bytes,
295        _log_namespace: LogNamespace,
296    ) -> crate::Result<SmallVec<[Event; 1]>> {
297        // The other modes already emit BytesReceived
298        if let Some(mode) = self.socket_mode
299            && mode == SocketMode::Udp
300        {
301            emit!(SocketBytesReceived {
302                mode,
303                byte_size: bytes.len(),
304            });
305        }
306
307        match std::str::from_utf8(&bytes).map_err(ParseError::InvalidUtf8) {
308            Err(error) => Err(Box::new(error)),
309            Ok(s) => match self.parser.parse(s) {
310                Ok(metric) => {
311                    let event = Event::Metric(metric);
312                    if let Some(er) = &self.events_received {
313                        let byte_size = event.estimated_json_encoded_size_of();
314                        er.emit(CountByteSize(1, byte_size));
315                    }
316                    Ok(smallvec![event])
317                }
318                Err(error) => Err(Box::new(error)),
319            },
320        }
321    }
322}
323
324async fn statsd_udp(
325    config: UdpConfig,
326    shutdown: ShutdownSignal,
327    mut out: SourceSender,
328) -> Result<(), ()> {
329    let listenfd = ListenFd::from_env();
330    let socket = try_bind_udp_socket(config.address, listenfd)
331        .map_err(|error| {
332            emit!(SocketBindError {
333                mode: SocketMode::Udp,
334                error
335            })
336        })
337        .await?;
338
339    if let Some(receive_buffer_bytes) = config.receive_buffer_bytes
340        && let Err(error) = net::set_receive_buffer_size(&socket, receive_buffer_bytes)
341    {
342        warn!(message = "Failed configuring receive buffer size on UDP socket.", %error);
343    }
344
345    info!(
346        message = "Listening.",
347        addr = %config.address,
348        r#type = "udp"
349    );
350
351    let codec = Decoder::new(
352        Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
353        Deserializer::Boxed(Box::new(StatsdDeserializer::udp(
354            config.sanitize,
355            config.convert_to,
356        ))),
357    );
358    let mut stream = UdpFramed::new(socket, codec).take_until(shutdown);
359    while let Some(frame) = stream.next().await {
360        match frame {
361            Ok(((events, _byte_size), _sock)) => {
362                let count = events.len();
363                if (out.send_batch(events).await).is_err() {
364                    emit!(StreamClosedError { count });
365                }
366            }
367            Err(error) => {
368                emit!(SocketReceiveError {
369                    mode: SocketMode::Udp,
370                    error
371                });
372            }
373        }
374    }
375
376    Ok(())
377}
378
/// `TcpSource` implementation for the TCP mode of the `statsd` source.
///
/// It only carries the two parser options; each decoder it creates gets a
/// `StatsdDeserializer::tcp` built from them.
#[derive(Clone)]
struct StatsdTcpSource {
    /// Whether incoming key names are sanitized (copied from `TcpConfig::sanitize`).
    sanitize: bool,
    /// Target unit for timing values (copied from `TcpConfig::convert_to`).
    convert_to: ConversionUnit,
}
384
385impl TcpSource for StatsdTcpSource {
386    type Error = vector_lib::codecs::decoding::Error;
387    type Item = SmallVec<[Event; 1]>;
388    type Decoder = Decoder;
389    type Acker = TcpNullAcker;
390
391    fn decoder(&self) -> Self::Decoder {
392        Decoder::new(
393            Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
394            Deserializer::Boxed(Box::new(StatsdDeserializer::tcp(
395                self.sanitize,
396                self.convert_to,
397            ))),
398        )
399    }
400
401    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
402        TcpNullAcker
403    }
404}
405
406#[cfg(test)]
407mod test {
408    use futures::channel::mpsc;
409    use futures_util::SinkExt;
410    use tokio::{
411        io::AsyncWriteExt,
412        net::UdpSocket,
413        time::{Duration, Instant, sleep},
414    };
415    use vector_lib::{
416        config::ComponentKey,
417        event::{EventContainer, metric::TagValue},
418    };
419
420    use super::*;
421    use crate::{
422        series,
423        test_util::{
424            collect_limited,
425            components::{
426                COMPONENT_ERROR_TAGS, SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance,
427                assert_source_error,
428            },
429            metrics::{
430                AbsoluteMetricState, assert_counter, assert_distribution, assert_gauge, assert_set,
431            },
432            next_addr,
433        },
434    };
435
    /// Runs the shared `test_generate_config` check against `StatsdConfig`, which exercises
    /// `StatsdConfig::generate_config`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<StatsdConfig>();
    }
440
441    #[tokio::test]
442    async fn test_statsd_udp() {
443        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
444            let in_addr = next_addr();
445            let config = StatsdConfig::Udp(UdpConfig::from_address(in_addr.into()));
446            let (sender, mut receiver) = mpsc::channel(200);
447            tokio::spawn(async move {
448                let bind_addr = next_addr();
449                let socket = UdpSocket::bind(bind_addr).await.unwrap();
450                socket.connect(in_addr).await.unwrap();
451                while let Some(bytes) = receiver.next().await {
452                    socket.send(bytes).await.unwrap();
453                }
454            });
455            test_statsd(config, sender).await;
456        })
457        .await;
458    }
459
460    #[tokio::test]
461    async fn test_statsd_tcp() {
462        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
463            let in_addr = next_addr();
464            let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
465            let (sender, mut receiver) = mpsc::channel(200);
466            tokio::spawn(async move {
467                while let Some(bytes) = receiver.next().await {
468                    tokio::net::TcpStream::connect(in_addr)
469                        .await
470                        .unwrap()
471                        .write_all(bytes)
472                        .await
473                        .unwrap();
474                }
475            });
476            test_statsd(config, sender).await;
477        })
478        .await;
479    }
480
481    #[tokio::test]
482    async fn test_statsd_error() {
483        assert_source_error(&COMPONENT_ERROR_TAGS, async move {
484            let in_addr = next_addr();
485            let config = StatsdConfig::Tcp(TcpConfig::from_address(in_addr.into()));
486            let (sender, mut receiver) = mpsc::channel(200);
487            tokio::spawn(async move {
488                while let Some(bytes) = receiver.next().await {
489                    tokio::net::TcpStream::connect(in_addr)
490                        .await
491                        .unwrap()
492                        .write_all(bytes)
493                        .await
494                        .unwrap();
495                }
496            });
497            test_invalid_statsd(config, sender).await;
498        })
499        .await;
500    }
501
502    #[cfg(unix)]
503    #[tokio::test]
504    async fn test_statsd_unix() {
505        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
506            let in_path = tempfile::tempdir().unwrap().keep().join("unix_test");
507            let config = StatsdConfig::Unix(UnixConfig {
508                path: in_path.clone(),
509                sanitize: true,
510                convert_to: ConversionUnit::Seconds,
511            });
512            let (sender, mut receiver) = mpsc::channel(200);
513            tokio::spawn(async move {
514                while let Some(bytes) = receiver.next().await {
515                    tokio::net::UnixStream::connect(&in_path)
516                        .await
517                        .unwrap()
518                        .write_all(bytes)
519                        .await
520                        .unwrap();
521                }
522            });
523            test_statsd(config, sender).await;
524        })
525        .await;
526    }
527
528    #[tokio::test]
529    async fn test_statsd_udp_conversion_disabled() {
530        let in_addr = next_addr();
531        let mut config = UdpConfig::from_address(in_addr.into());
532        config.convert_to = ConversionUnit::Milliseconds;
533        let statsd_config = StatsdConfig::Udp(config);
534        let (mut sender, mut receiver) = mpsc::channel(200);
535
536        tokio::spawn(async move {
537            let bind_addr = next_addr();
538            let socket = UdpSocket::bind(bind_addr).await.unwrap();
539            socket.connect(in_addr).await.unwrap();
540            while let Some(bytes) = receiver.next().await {
541                socket.send(bytes).await.unwrap();
542            }
543        });
544
545        let component_key = ComponentKey::from("statsd_conversion_disabled");
546        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
547        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
548        let sink = statsd_config
549            .build(source_ctx)
550            .await
551            .expect("failed to build source");
552
553        tokio::spawn(async move {
554            sink.await.expect("sink should not fail");
555        });
556
557        sleep(Duration::from_millis(250)).await;
558        sender.send(b"timer:320|ms|@0.1\n").await.unwrap();
559        sleep(Duration::from_millis(250)).await;
560        shutdown
561            .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
562            .await;
563        let state = collect_limited(rx)
564            .await
565            .into_iter()
566            .flat_map(EventContainer::into_events)
567            .collect::<AbsoluteMetricState>();
568        let metrics = state.finish();
569        assert_distribution(
570            &metrics,
571            series!("timer"),
572            3200.0,
573            10,
574            &[(1.0, 0), (2.0, 0), (4.0, 0), (f64::INFINITY, 10)],
575        );
576    }
577
    /// Shared happy-path scenario: builds and spawns the source described by `statsd_config`,
    /// pushes the same multi-metric payload through `sender` 100 times, shuts the source down
    /// and asserts the accumulated counter, gauge, distribution and set values.
    ///
    /// The caller owns the receiving end of `sender` and is responsible for relaying each
    /// payload to the source over the transport under test.
    async fn test_statsd(statsd_config: StatsdConfig, mut sender: mpsc::Sender<&'static [u8]>) {
        // Build our statsd source and then spawn it.  We use a big pipeline buffer because each
        // packet we send has a lot of metrics per packet.  We could technically count them all up
        // and have a more accurate number here, but honestly, who cares?  This is big enough.
        let component_key = ComponentKey::from("statsd");
        let (tx, rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        // Despite the name, `sink` is the built source future.
        let sink = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build statsd source");

        tokio::spawn(async move {
            sink.await.expect("sink should not fail");
        });

        // Wait like 250ms to give the source time to start running and become ready to handle
        // traffic.
        //
        // TODO: It'd be neat if we could make `ShutdownSignal` track when it was polled at least once,
        // and then surface that (via one of the related types, maybe) somehow so we could use it as
        // a signal for "the source is ready, it's polled the shutdown future at least once, which
        // means it's trying to accept connections, etc" and would be far more deterministic than this.
        sleep(Duration::from_millis(250)).await;

        // Send all of the messages: 100 copies of a payload holding seven newline-terminated
        // metric lines.
        for _ in 0..100 {
            sender.send(
                b"foo:1|c|#a,b:b\nbar:42|g\nfoo:1|c|#a,b:c\nglork:3|h|@0.1\nmilliglork:3000|ms|@0.2\nset:0|s\nset:1|s\n"
            ).await.unwrap();

            // Space things out slightly to try to avoid dropped packets.
            sleep(Duration::from_millis(10)).await;
        }

        // Now wait for another small period of time to make sure we've processed the messages.
        // After that, trigger shutdown so our source closes and allows us to deterministically read
        // everything that was sent without having to know the exact count.
        sleep(Duration::from_millis(250)).await;
        shutdown
            .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
            .await;

        // Read all the events into a `MetricState`, which handles normalizing metrics and tracking
        // cumulative values for incremental metrics, etc.  This will represent the final/cumulative
        // values for each metric sent by the source into the pipeline.
        let state = collect_limited(rx)
            .await
            .into_iter()
            .flat_map(EventContainer::into_events)
            .collect::<AbsoluteMetricState>();
        let metrics = state.finish();

        // Each of the two `foo` series is incremented by 1 in every one of the 100 payloads.
        assert_counter(
            &metrics,
            series!(
                "foo",
                "a" => TagValue::Bare,
                "b" => "b"
            ),
            100.0,
        );

        assert_counter(
            &metrics,
            series!(
                "foo",
                "a" => TagValue::Bare,
                "b" => "c"
            ),
            100.0,
        );

        assert_gauge(&metrics, series!("bar"), 42.0);
        // The expected totals are consistent with each sample being weighted by the inverse of
        // its sample rate: 100 samples of 3 at `@0.1` give a count of 1000 and a sum of 3000.
        assert_distribution(
            &metrics,
            series!("glork"),
            3000.0,
            1000,
            &[(1.0, 0), (2.0, 0), (4.0, 1000), (f64::INFINITY, 1000)],
        );
        // Likewise for the timer: with the default conversion to seconds, 3000 ms becomes 3,
        // and 100 samples at `@0.2` give a count of 500 and a sum of 1500.
        assert_distribution(
            &metrics,
            series!("milliglork"),
            1500.0,
            500,
            &[(1.0, 0), (2.0, 0), (4.0, 500), (f64::INFINITY, 500)],
        );
        assert_set(&metrics, series!("set"), &["0", "1"]);
    }
668
    /// Shared error-path scenario: builds and spawns the source described by `statsd_config`,
    /// pushes ten unparseable payloads through `sender`, then shuts the source down.
    ///
    /// This helper asserts nothing about the output itself; the caller is expected to wrap it
    /// in a check such as `assert_source_error`.
    async fn test_invalid_statsd(
        statsd_config: StatsdConfig,
        mut sender: mpsc::Sender<&'static [u8]>,
    ) {
        // Build our statsd source and then spawn it.  The receiving half of the pipeline is
        // bound to `_rx` only to keep the channel open; no events are read back in this test.
        let component_key = ComponentKey::from("statsd");
        let (tx, _rx) = SourceSender::new_test_sender_with_buffer(4096);
        let (source_ctx, shutdown) = SourceContext::new_shutdown(&component_key, tx);
        // Despite the name, `sink` is the built source future.
        let sink = statsd_config
            .build(source_ctx)
            .await
            .expect("failed to build statsd source");

        tokio::spawn(async move {
            sink.await.expect("sink should not fail");
        });

        // Wait like 250ms to give the source time to start running and become ready to handle
        // traffic.
        //
        // TODO: It'd be neat if we could make `ShutdownSignal` track when it was polled at least once,
        // and then surface that (via one of the related types, maybe) somehow so we could use it as
        // a signal for "the source is ready, it's polled the shutdown future at least once, which
        // means it's trying to accept connections, etc" and would be far more deterministic than this.
        sleep(Duration::from_millis(250)).await;

        // Send 10 invalid statsd messages
        for _ in 0..10 {
            sender.send(b"invalid statsd message").await.unwrap();

            // Space things out slightly to try to avoid dropped packets.
            sleep(Duration::from_millis(10)).await;
        }

        // Now wait for another small period of time to make sure the source has seen the
        // messages, then trigger shutdown so it closes before the caller's assertions run.
        sleep(Duration::from_millis(250)).await;
        shutdown
            .shutdown_all(Some(Instant::now() + Duration::from_millis(100)))
            .await;
    }
713}