vector/sources/statsd/
unix.rs

1use std::path::PathBuf;
2
3use vector_lib::{
4    codecs::{
5        NewlineDelimitedDecoder,
6        decoding::{Deserializer, Framer},
7    },
8    configurable::configurable_component,
9};
10
11use super::{ConversionUnit, StatsdDeserializer, default_convert_to, default_sanitize};
12use crate::{
13    SourceSender,
14    codecs::Decoder,
15    shutdown::ShutdownSignal,
16    sources::{Source, util::build_unix_stream_source},
17};
18
/// Unix domain socket configuration for the `statsd` source.
// NOTE(review): the `///` text on this type and its fields is visible to
// `#[configurable_component]` as `#[doc]` attributes and is presumably used
// for generated configuration docs, so maintainer-only remarks below are
// written as plain `//` comments on purpose.
#[configurable_component]
#[derive(Clone, Debug)]
pub struct UnixConfig {
    /// The Unix socket path.
    ///
    /// This should be an absolute path.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: PathBuf,

    // Optional in user configuration: when the key is absent, serde calls
    // `default_sanitize` (imported from the parent module) for the value.
    // `statsd_unix` forwards this flag to `StatsdDeserializer::unix`.
    // NOTE(review): `configurable(derived)` presumably pulls the field
    // description from elsewhere instead of a doc comment here — confirm.
    #[serde(default = "default_sanitize")]
    #[configurable(derived)]
    pub sanitize: bool,

    // Optional in user configuration: when the key is absent, serde calls
    // `default_convert_to` (imported from the parent module) for the value.
    // `statsd_unix` forwards this unit to `StatsdDeserializer::unix`.
    // NOTE(review): presumably selects the unit that parsed values are
    // converted to; the exact semantics live in `ConversionUnit` — confirm.
    #[serde(default = "default_convert_to")]
    #[configurable(derived)]
    pub convert_to: ConversionUnit,
}
37
38pub fn statsd_unix(
39    config: UnixConfig,
40    shutdown: ShutdownSignal,
41    out: SourceSender,
42) -> crate::Result<Source> {
43    let decoder = Decoder::new(
44        Framer::NewlineDelimited(NewlineDelimitedDecoder::new()),
45        Deserializer::Boxed(Box::new(StatsdDeserializer::unix(
46            config.sanitize,
47            config.convert_to,
48        ))),
49    );
50
51    build_unix_stream_source(
52        config.path,
53        None,
54        decoder,
55        |_events, _host| {},
56        shutdown,
57        out,
58    )
59}