vector/sinks/statsd/
config.rs

1use std::net::{IpAddr, Ipv4Addr, SocketAddr};
2
3use async_trait::async_trait;
4use vector_lib::configurable::{component::GenerateConfig, configurable_component};
5use vector_lib::internal_event::Protocol;
6use vector_lib::{
7    config::{AcknowledgementsConfig, Input},
8    sink::VectorSink,
9};
10
11use crate::{
12    config::{SinkConfig, SinkContext},
13    internal_events::SocketMode,
14    sinks::{
15        util::{
16            service::net::{NetworkConnector, TcpConnectorConfig, UdpConnectorConfig},
17            BatchConfig, SinkBatchSettings,
18        },
19        Healthcheck,
20    },
21};
22
23#[cfg(unix)]
24use crate::sinks::util::service::net::UnixConnectorConfig;
25
26use super::{request_builder::StatsdRequestBuilder, service::StatsdService, sink::StatsdSink};
27
28#[derive(Clone, Copy, Debug, Default)]
29pub struct StatsdDefaultBatchSettings;
30
31impl SinkBatchSettings for StatsdDefaultBatchSettings {
32    const MAX_EVENTS: Option<usize> = Some(1000);
33    const MAX_BYTES: Option<usize> = Some(1300);
34    const TIMEOUT_SECS: f64 = 1.0;
35}
36
37/// Configuration for the `statsd` sink.
38#[configurable_component(sink("statsd", "Deliver metric data to a StatsD aggregator."))]
39#[derive(Clone, Debug)]
40pub struct StatsdSinkConfig {
41    /// Sets the default namespace for any metrics sent.
42    ///
43    /// This namespace is only used if a metric has no existing namespace. When a namespace is
44    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
45    #[serde(alias = "namespace")]
46    #[configurable(metadata(docs::examples = "service"))]
47    pub default_namespace: Option<String>,
48
49    #[serde(flatten)]
50    pub mode: Mode,
51
52    #[configurable(derived)]
53    #[serde(default)]
54    pub batch: BatchConfig<StatsdDefaultBatchSettings>,
55
56    #[configurable(derived)]
57    #[serde(
58        default,
59        deserialize_with = "crate::serde::bool_or_struct",
60        skip_serializing_if = "crate::serde::is_default"
61    )]
62    pub acknowledgements: AcknowledgementsConfig,
63}
64
65/// Socket mode.
66#[configurable_component]
67#[derive(Clone, Debug)]
68#[serde(tag = "mode", rename_all = "snake_case")]
69#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
70pub enum Mode {
71    /// Send over TCP.
72    Tcp(TcpConnectorConfig),
73
74    /// Send over UDP.
75    Udp(UdpConnectorConfig),
76
77    /// Send over a Unix domain socket (UDS).
78    #[cfg(unix)]
79    Unix(UnixConnectorConfig),
80}
81
82impl Mode {
83    const fn as_socket_mode(&self) -> SocketMode {
84        match self {
85            Self::Tcp(_) => SocketMode::Tcp,
86            Self::Udp(_) => SocketMode::Udp,
87            #[cfg(unix)]
88            Self::Unix(_) => SocketMode::Unix,
89        }
90    }
91
92    fn as_connector(&self) -> NetworkConnector {
93        match self {
94            Self::Tcp(config) => config.as_connector(),
95            Self::Udp(config) => config.as_connector(),
96            #[cfg(unix)]
97            Self::Unix(config) => config.as_connector(),
98        }
99    }
100}
101
102const fn default_address() -> SocketAddr {
103    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8125)
104}
105
106impl GenerateConfig for StatsdSinkConfig {
107    fn generate_config() -> toml::Value {
108        let address = default_address();
109
110        toml::Value::try_from(Self {
111            default_namespace: None,
112            mode: Mode::Udp(UdpConnectorConfig::from_address(
113                address.ip().to_string(),
114                address.port(),
115            )),
116            batch: Default::default(),
117            acknowledgements: Default::default(),
118        })
119        .unwrap()
120    }
121}
122
123#[async_trait]
124#[typetag::serde(name = "statsd")]
125impl SinkConfig for StatsdSinkConfig {
126    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
127        let batcher_settings = self.batch.into_batcher_settings()?;
128
129        let socket_mode = self.mode.as_socket_mode();
130        let request_builder =
131            StatsdRequestBuilder::new(self.default_namespace.clone(), socket_mode);
132        let protocol = Protocol::from(socket_mode.as_str());
133
134        let connector = self.mode.as_connector();
135        let service = connector.service();
136        let healthcheck = connector.healthcheck();
137
138        let sink = StatsdSink::new(
139            StatsdService::from_transport(service),
140            batcher_settings,
141            request_builder,
142            protocol,
143        );
144        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
145    }
146
147    fn input(&self) -> Input {
148        Input::metric()
149    }
150
151    fn acknowledgements(&self) -> &AcknowledgementsConfig {
152        &self.acknowledgements
153    }
154}
155
156#[cfg(test)]
157mod test {
158    use super::StatsdSinkConfig;
159
160    #[test]
161    fn generate_config() {
162        crate::test_util::test_generate_config::<StatsdSinkConfig>();
163    }
164}