vector/sinks/statsd/
config.rs

1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2
3use async_trait::async_trait;
4use vector_lib::{
5    config::{AcknowledgementsConfig, Input},
6    configurable::{component::GenerateConfig, configurable_component},
7    internal_event::Protocol,
8    sink::VectorSink,
9};
10
11use super::{request_builder::StatsdRequestBuilder, service::StatsdService, sink::StatsdSink};
12#[cfg(unix)]
13use crate::sinks::util::service::net::UnixConnectorConfig;
14use crate::{
15    config::{SinkConfig, SinkContext},
16    internal_events::SocketMode,
17    sinks::{
18        Healthcheck,
19        util::{
20            BatchConfig, SinkBatchSettings,
21            service::net::{NetworkConnector, TcpConnectorConfig, UdpConnectorConfig},
22        },
23    },
24};
25
26#[derive(Clone, Copy, Debug, Default)]
27pub struct StatsdDefaultBatchSettings;
28
29impl SinkBatchSettings for StatsdDefaultBatchSettings {
30    const MAX_EVENTS: Option<usize> = Some(1000);
31    const MAX_BYTES: Option<usize> = Some(1300);
32    const TIMEOUT_SECS: f64 = 1.0;
33}
34
35/// Configuration for the `statsd` sink.
36#[configurable_component(sink("statsd", "Deliver metric data to a StatsD aggregator."))]
37#[derive(Clone, Debug)]
38pub struct StatsdSinkConfig {
39    /// Sets the default namespace for any metrics sent.
40    ///
41    /// This namespace is only used if a metric has no existing namespace. When a namespace is
42    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
43    #[serde(alias = "namespace")]
44    #[configurable(metadata(docs::examples = "service"))]
45    pub default_namespace: Option<String>,
46
47    #[serde(flatten)]
48    pub mode: Mode,
49
50    #[configurable(derived)]
51    #[serde(default)]
52    pub batch: BatchConfig<StatsdDefaultBatchSettings>,
53
54    #[configurable(derived)]
55    #[serde(
56        default,
57        deserialize_with = "crate::serde::bool_or_struct",
58        skip_serializing_if = "crate::serde::is_default"
59    )]
60    pub acknowledgements: AcknowledgementsConfig,
61}
62
63/// Socket mode.
64#[configurable_component]
65#[derive(Clone, Debug)]
66#[serde(tag = "mode", rename_all = "snake_case")]
67#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
68pub enum Mode {
69    /// Send over TCP.
70    Tcp(TcpConnectorConfig),
71
72    /// Send over UDP.
73    Udp(UdpConnectorConfig),
74
75    /// Send over a Unix domain socket (UDS).
76    #[cfg(unix)]
77    Unix(UnixConnectorConfig),
78}
79
80impl Mode {
81    const fn as_socket_mode(&self) -> SocketMode {
82        match self {
83            Self::Tcp(_) => SocketMode::Tcp,
84            Self::Udp(_) => SocketMode::Udp,
85            #[cfg(unix)]
86            Self::Unix(_) => SocketMode::Unix,
87        }
88    }
89
90    fn as_connector(&self) -> NetworkConnector {
91        match self {
92            Self::Tcp(config) => config.as_connector(),
93            Self::Udp(config) => config.as_connector(),
94            #[cfg(unix)]
95            Self::Unix(config) => config.as_connector(),
96        }
97    }
98}
99
100const fn default_address() -> SocketAddr {
101    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8125)
102}
103
104impl GenerateConfig for StatsdSinkConfig {
105    fn generate_config() -> toml::Value {
106        let address = default_address();
107
108        toml::Value::try_from(Self {
109            default_namespace: None,
110            mode: Mode::Udp(UdpConnectorConfig::from_address(
111                address.ip().to_string(),
112                address.port(),
113            )),
114            batch: Default::default(),
115            acknowledgements: Default::default(),
116        })
117        .unwrap()
118    }
119}
120
121#[async_trait]
122#[typetag::serde(name = "statsd")]
123impl SinkConfig for StatsdSinkConfig {
124    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
125        let batcher_settings = self.batch.into_batcher_settings()?;
126
127        let socket_mode = self.mode.as_socket_mode();
128        let request_builder =
129            StatsdRequestBuilder::new(self.default_namespace.clone(), socket_mode);
130        let protocol = Protocol::from(socket_mode.as_str());
131
132        let connector = self.mode.as_connector();
133        let service = connector.service();
134        let healthcheck = connector.healthcheck();
135
136        let sink = StatsdSink::new(
137            StatsdService::from_transport(service),
138            batcher_settings,
139            request_builder,
140            protocol,
141        );
142        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
143    }
144
145    fn input(&self) -> Input {
146        Input::metric()
147    }
148
149    fn acknowledgements(&self) -> &AcknowledgementsConfig {
150        &self.acknowledgements
151    }
152}
153
154#[cfg(test)]
155mod test {
156    use super::StatsdSinkConfig;
157
158    #[test]
159    fn generate_config() {
160        crate::test_util::test_generate_config::<StatsdSinkConfig>();
161    }
162}