vector/sources/dnstap/
unix.rs

1use std::path::PathBuf;
2
3use bytes::Bytes;
4use vector_lib::configurable::configurable_component;
5use vector_lib::lookup::OwnedValuePath;
6
7use crate::sources::util::framestream::FrameHandler;
8use crate::{
9    event::Event,
10    internal_events::{SocketEventsReceived, SocketMode},
11    sources::util::framestream::UnixFrameHandler,
12};
13
14use vector_lib::EstimatedJsonEncodedSizeOf;
15
// NOTE(review): the `///` comments on this struct and its fields are presumably
// picked up by `configurable_component` for user-facing configuration docs —
// keep them written for end users and confirm generated docs stay in sync.
/// Unix domain socket configuration for the `dnstap` source.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct UnixConfig {
    /// Absolute path to the socket file to read DNSTAP data from.
    ///
    /// The DNS server must be configured to send its DNSTAP data to this socket file. The socket file is created
    /// if it doesn't already exist when the source first starts.
    pub socket_path: PathBuf,

    /// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
    ///
    /// Note: The file mode value can be specified in any numeric format supported by your configuration
    /// language, but it is most intuitive to use an octal number.
    pub socket_file_mode: Option<u32>,

    /// The size, in bytes, of the receive buffer used for the socket.
    ///
    /// This should not typically need to be changed.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub socket_receive_buffer_size: Option<usize>,

    /// The size, in bytes, of the send buffer used for the socket.
    ///
    /// This should not typically need to be changed.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    pub socket_send_buffer_size: Option<usize>,
}
45
46impl UnixConfig {
47    pub fn new(socket_path: PathBuf) -> Self {
48        Self {
49            socket_path,
50            ..Self::default()
51        }
52    }
53}
54
55impl Default for UnixConfig {
56    fn default() -> Self {
57        Self {
58            socket_path: PathBuf::from("/run/bind/dnstap.sock"),
59            socket_file_mode: None,
60            socket_receive_buffer_size: None,
61            socket_send_buffer_size: None,
62        }
63    }
64}
65
66#[derive(Clone)]
67pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
68    frame_handler: T,
69    socket_path: PathBuf,
70    socket_file_mode: Option<u32>,
71    socket_receive_buffer_size: Option<usize>,
72    socket_send_buffer_size: Option<usize>,
73}
74
75impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
76    pub fn new(config: UnixConfig, frame_handler: T) -> Self {
77        Self {
78            frame_handler,
79            socket_path: config.socket_path.clone(),
80            socket_file_mode: config.socket_file_mode,
81            socket_receive_buffer_size: config.socket_receive_buffer_size,
82            socket_send_buffer_size: config.socket_send_buffer_size,
83        }
84    }
85}
86
87impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
88    fn content_type(&self) -> String {
89        self.frame_handler.content_type()
90    }
91
92    fn max_frame_length(&self) -> usize {
93        self.frame_handler.max_frame_length()
94    }
95
96    /**
97     * Function to pass into util::framestream::build_framestream_unix_source
98     * Takes a data frame from the unix socket and turns it into a Vector Event.
99     **/
100    fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
101        self.frame_handler
102            .handle_event(received_from, frame)
103            .map(|event| {
104                if let Event::Log(ref log_event) = event {
105                    emit!(SocketEventsReceived {
106                        mode: SocketMode::Unix,
107                        byte_size: log_event.estimated_json_encoded_size_of(),
108                        count: 1
109                    })
110                }
111                event
112            })
113    }
114
115    fn multithreaded(&self) -> bool {
116        self.frame_handler.multithreaded()
117    }
118
119    fn max_frame_handling_tasks(&self) -> usize {
120        self.frame_handler.max_frame_handling_tasks()
121    }
122
123    fn host_key(&self) -> &Option<OwnedValuePath> {
124        self.frame_handler.host_key()
125    }
126
127    fn source_type_key(&self) -> Option<&OwnedValuePath> {
128        self.frame_handler.source_type_key()
129    }
130
131    fn timestamp_key(&self) -> Option<&OwnedValuePath> {
132        self.frame_handler.timestamp_key()
133    }
134}
135
136impl<T: FrameHandler + Clone> UnixFrameHandler for DnstapFrameHandler<T> {
137    fn socket_path(&self) -> PathBuf {
138        self.socket_path.clone()
139    }
140
141    fn socket_file_mode(&self) -> Option<u32> {
142        self.socket_file_mode
143    }
144
145    fn socket_receive_buffer_size(&self) -> Option<usize> {
146        self.socket_receive_buffer_size
147    }
148
149    fn socket_send_buffer_size(&self) -> Option<usize> {
150        self.socket_send_buffer_size
151    }
152}