vector/sources/statsd/
unix.rs

1use std::path::PathBuf;
2
3use vector_lib::codecs::{
4    decoding::{Deserializer, Framer},
5    NewlineDelimitedDecoder,
6};
7use vector_lib::configurable::configurable_component;
8
9use super::{default_convert_to, default_sanitize, ConversionUnit, StatsdDeserializer};
10use crate::{
11    codecs::Decoder,
12    shutdown::ShutdownSignal,
13    sources::{util::build_unix_stream_source, Source},
14    SourceSender,
15};
16
17/// Unix domain socket configuration for the `statsd` source.
18#[configurable_component]
19#[derive(Clone, Debug)]
20pub struct UnixConfig {
21    /// The Unix socket path.
22    ///
23    /// This should be an absolute path.
24    #[configurable(metadata(docs::examples = "/path/to/socket"))]
25    pub path: PathBuf,
26
27    #[serde(default = "default_sanitize")]
28    #[configurable(derived)]
29    pub sanitize: bool,
30
31    #[serde(default = "default_convert_to")]
32    #[configurable(derived)]
33    pub convert_to: ConversionUnit,
34}
35
36pub fn statsd_unix(
37    config: UnixConfig,
38    shutdown: ShutdownSignal,
39    out: SourceSender,
40) -> crate::Result<Source> {
41    let decoder = Decoder::new(
42        Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
43        Deserializer::Boxed(Box::new(StatsdDeserializer::unix(
44            config.sanitize,
45            config.convert_to,
46        ))),
47    );
48
49    build_unix_stream_source(
50        config.path,
51        None,
52        decoder,
53        |_events, _host| {},
54        shutdown,
55        out,
56    )
57}