vector/sources/dnstap/
unix.rs

1use std::path::PathBuf;
2
3use bytes::Bytes;
4use vector_lib::{
5    EstimatedJsonEncodedSizeOf, configurable::configurable_component, lookup::OwnedValuePath,
6};
7
8use crate::{
9    event::Event,
10    internal_events::{SocketEventsReceived, SocketMode},
11    sources::util::framestream::{FrameHandler, UnixFrameHandler},
12};
13
14/// Unix domain socket configuration for the `dnstap` source.
15#[configurable_component]
16#[derive(Clone, Debug)]
17#[serde(deny_unknown_fields)]
18pub struct UnixConfig {
19    /// Absolute path to the socket file to read DNSTAP data from.
20    ///
21    /// The DNS server must be configured to send its DNSTAP data to this socket file. The socket file is created
22    /// if it doesn't already exist when the source first starts.
23    pub socket_path: PathBuf,
24
25    /// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
26    ///
27    /// Note: The file mode value can be specified in any numeric format supported by your configuration
28    /// language, but it is most intuitive to use an octal number.
29    pub socket_file_mode: Option<u32>,
30
31    /// The size, in bytes, of the receive buffer used for the socket.
32    ///
33    /// This should not typically needed to be changed.
34    #[configurable(metadata(docs::type_unit = "bytes"))]
35    pub socket_receive_buffer_size: Option<usize>,
36
37    /// The size, in bytes, of the send buffer used for the socket.
38    ///
39    /// This should not typically needed to be changed.
40    #[configurable(metadata(docs::type_unit = "bytes"))]
41    pub socket_send_buffer_size: Option<usize>,
42}
43
44impl UnixConfig {
45    pub fn new(socket_path: PathBuf) -> Self {
46        Self {
47            socket_path,
48            ..Self::default()
49        }
50    }
51}
52
53impl Default for UnixConfig {
54    fn default() -> Self {
55        Self {
56            socket_path: PathBuf::from("/run/bind/dnstap.sock"),
57            socket_file_mode: None,
58            socket_receive_buffer_size: None,
59            socket_send_buffer_size: None,
60        }
61    }
62}
63
64#[derive(Clone)]
65pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
66    frame_handler: T,
67    socket_path: PathBuf,
68    socket_file_mode: Option<u32>,
69    socket_receive_buffer_size: Option<usize>,
70    socket_send_buffer_size: Option<usize>,
71}
72
73impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
74    pub fn new(config: UnixConfig, frame_handler: T) -> Self {
75        Self {
76            frame_handler,
77            socket_path: config.socket_path.clone(),
78            socket_file_mode: config.socket_file_mode,
79            socket_receive_buffer_size: config.socket_receive_buffer_size,
80            socket_send_buffer_size: config.socket_send_buffer_size,
81        }
82    }
83}
84
85impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
86    fn content_type(&self) -> String {
87        self.frame_handler.content_type()
88    }
89
90    fn max_frame_length(&self) -> usize {
91        self.frame_handler.max_frame_length()
92    }
93
94    /**
95     * Function to pass into util::framestream::build_framestream_unix_source
96     * Takes a data frame from the unix socket and turns it into a Vector Event.
97     **/
98    fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
99        self.frame_handler
100            .handle_event(received_from, frame)
101            .map(|event| {
102                if let Event::Log(ref log_event) = event {
103                    emit!(SocketEventsReceived {
104                        mode: SocketMode::Unix,
105                        byte_size: log_event.estimated_json_encoded_size_of(),
106                        count: 1
107                    })
108                }
109                event
110            })
111    }
112
113    fn multithreaded(&self) -> bool {
114        self.frame_handler.multithreaded()
115    }
116
117    fn max_frame_handling_tasks(&self) -> usize {
118        self.frame_handler.max_frame_handling_tasks()
119    }
120
121    fn host_key(&self) -> &Option<OwnedValuePath> {
122        self.frame_handler.host_key()
123    }
124
125    fn source_type_key(&self) -> Option<&OwnedValuePath> {
126        self.frame_handler.source_type_key()
127    }
128
129    fn timestamp_key(&self) -> Option<&OwnedValuePath> {
130        self.frame_handler.timestamp_key()
131    }
132}
133
134impl<T: FrameHandler + Clone> UnixFrameHandler for DnstapFrameHandler<T> {
135    fn socket_path(&self) -> PathBuf {
136        self.socket_path.clone()
137    }
138
139    fn socket_file_mode(&self) -> Option<u32> {
140        self.socket_file_mode
141    }
142
143    fn socket_receive_buffer_size(&self) -> Option<usize> {
144        self.socket_receive_buffer_size
145    }
146
147    fn socket_send_buffer_size(&self) -> Option<usize> {
148        self.socket_send_buffer_size
149    }
150}