// vector/sinks/statsd/config.rs
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use async_trait::async_trait;
use vector_lib::{
    config::{AcknowledgementsConfig, Input},
    configurable::{component::GenerateConfig, configurable_component},
    internal_event::Protocol,
    sink::VectorSink,
};

use super::{request_builder::StatsdRequestBuilder, service::StatsdService, sink::StatsdSink};
#[cfg(unix)]
use crate::sinks::util::service::net::UnixConnectorConfig;
use crate::{
    config::{SinkConfig, SinkContext},
    internal_events::SocketMode,
    sinks::{
        Healthcheck,
        util::{
            BatchConfig, SinkBatchSettings,
            service::net::{NetworkConnector, TcpConnectorConfig, UdpConnectorConfig},
        },
    },
};
25
/// Marker type carrying the default batch settings for the `statsd` sink.
///
/// It holds no data; the defaults live in its `SinkBatchSettings` implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct StatsdDefaultBatchSettings;
28
29impl SinkBatchSettings for StatsdDefaultBatchSettings {
30 const MAX_EVENTS: Option<usize> = Some(1000);
31 const MAX_BYTES: Option<usize> = Some(1300);
32 const TIMEOUT_SECS: f64 = 1.0;
33}
34
35#[configurable_component(sink("statsd", "Deliver metric data to a StatsD aggregator."))]
37#[derive(Clone, Debug)]
38pub struct StatsdSinkConfig {
39 #[serde(alias = "namespace")]
44 #[configurable(metadata(docs::examples = "service"))]
45 pub default_namespace: Option<String>,
46
47 #[serde(flatten)]
48 pub mode: Mode,
49
50 #[configurable(derived)]
51 #[serde(default)]
52 pub batch: BatchConfig<StatsdDefaultBatchSettings>,
53
54 #[configurable(derived)]
55 #[serde(
56 default,
57 deserialize_with = "crate::serde::bool_or_struct",
58 skip_serializing_if = "crate::serde::is_default"
59 )]
60 pub acknowledgements: AcknowledgementsConfig,
61}
62
63#[configurable_component]
65#[derive(Clone, Debug)]
66#[serde(tag = "mode", rename_all = "snake_case")]
67#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
68pub enum Mode {
69 Tcp(TcpConnectorConfig),
71
72 Udp(UdpConnectorConfig),
74
75 #[cfg(unix)]
77 Unix(UnixConnectorConfig),
78}
79
80impl Mode {
81 const fn as_socket_mode(&self) -> SocketMode {
82 match self {
83 Self::Tcp(_) => SocketMode::Tcp,
84 Self::Udp(_) => SocketMode::Udp,
85 #[cfg(unix)]
86 Self::Unix(_) => SocketMode::Unix,
87 }
88 }
89
90 fn as_connector(&self) -> NetworkConnector {
91 match self {
92 Self::Tcp(config) => config.as_connector(),
93 Self::Udp(config) => config.as_connector(),
94 #[cfg(unix)]
95 Self::Unix(config) => config.as_connector(),
96 }
97 }
98}
99
/// Returns the address used for the generated example configuration:
/// IPv4 localhost on port 8125.
const fn default_address() -> SocketAddr {
    const STATSD_PORT: u16 = 8125;

    SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), STATSD_PORT)
}
103
104impl GenerateConfig for StatsdSinkConfig {
105 fn generate_config() -> toml::Value {
106 let address = default_address();
107
108 toml::Value::try_from(Self {
109 default_namespace: None,
110 mode: Mode::Udp(UdpConnectorConfig::from_address(
111 address.ip().to_string(),
112 address.port(),
113 )),
114 batch: Default::default(),
115 acknowledgements: Default::default(),
116 })
117 .unwrap()
118 }
119}
120
121#[async_trait]
122#[typetag::serde(name = "statsd")]
123impl SinkConfig for StatsdSinkConfig {
124 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
125 let batcher_settings = self.batch.into_batcher_settings()?;
126
127 let socket_mode = self.mode.as_socket_mode();
128 let request_builder =
129 StatsdRequestBuilder::new(self.default_namespace.clone(), socket_mode);
130 let protocol = Protocol::from(socket_mode.as_str());
131
132 let connector = self.mode.as_connector();
133 let service = connector.service();
134 let healthcheck = connector.healthcheck();
135
136 let sink = StatsdSink::new(
137 StatsdService::from_transport(service),
138 batcher_settings,
139 request_builder,
140 protocol,
141 );
142 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
143 }
144
145 fn input(&self) -> Input {
146 Input::metric()
147 }
148
149 fn acknowledgements(&self) -> &AcknowledgementsConfig {
150 &self.acknowledgements
151 }
152}
153
#[cfg(test)]
mod test {
    use super::StatsdSinkConfig;

    // Runs the crate's shared `generate_config` check against this sink's configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<StatsdSinkConfig>();
    }
}