vector/sources/dnstap/
unix.rs1use std::path::PathBuf;
2
3use bytes::Bytes;
4use vector_lib::configurable::configurable_component;
5use vector_lib::lookup::OwnedValuePath;
6
7use crate::sources::util::framestream::FrameHandler;
8use crate::{
9 event::Event,
10 internal_events::{SocketEventsReceived, SocketMode},
11 sources::util::framestream::UnixFrameHandler,
12};
13
14use vector_lib::EstimatedJsonEncodedSizeOf;
15
16#[configurable_component]
18#[derive(Clone, Debug)]
19#[serde(deny_unknown_fields)]
20pub struct UnixConfig {
21 pub socket_path: PathBuf,
26
27 pub socket_file_mode: Option<u32>,
32
33 #[configurable(metadata(docs::type_unit = "bytes"))]
37 pub socket_receive_buffer_size: Option<usize>,
38
39 #[configurable(metadata(docs::type_unit = "bytes"))]
43 pub socket_send_buffer_size: Option<usize>,
44}
45
46impl UnixConfig {
47 pub fn new(socket_path: PathBuf) -> Self {
48 Self {
49 socket_path,
50 ..Self::default()
51 }
52 }
53}
54
55impl Default for UnixConfig {
56 fn default() -> Self {
57 Self {
58 socket_path: PathBuf::from("/run/bind/dnstap.sock"),
59 socket_file_mode: None,
60 socket_receive_buffer_size: None,
61 socket_send_buffer_size: None,
62 }
63 }
64}
65
66#[derive(Clone)]
67pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
68 frame_handler: T,
69 socket_path: PathBuf,
70 socket_file_mode: Option<u32>,
71 socket_receive_buffer_size: Option<usize>,
72 socket_send_buffer_size: Option<usize>,
73}
74
75impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
76 pub fn new(config: UnixConfig, frame_handler: T) -> Self {
77 Self {
78 frame_handler,
79 socket_path: config.socket_path.clone(),
80 socket_file_mode: config.socket_file_mode,
81 socket_receive_buffer_size: config.socket_receive_buffer_size,
82 socket_send_buffer_size: config.socket_send_buffer_size,
83 }
84 }
85}
86
87impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
88 fn content_type(&self) -> String {
89 self.frame_handler.content_type()
90 }
91
92 fn max_frame_length(&self) -> usize {
93 self.frame_handler.max_frame_length()
94 }
95
96 fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
101 self.frame_handler
102 .handle_event(received_from, frame)
103 .map(|event| {
104 if let Event::Log(ref log_event) = event {
105 emit!(SocketEventsReceived {
106 mode: SocketMode::Unix,
107 byte_size: log_event.estimated_json_encoded_size_of(),
108 count: 1
109 })
110 }
111 event
112 })
113 }
114
115 fn multithreaded(&self) -> bool {
116 self.frame_handler.multithreaded()
117 }
118
119 fn max_frame_handling_tasks(&self) -> usize {
120 self.frame_handler.max_frame_handling_tasks()
121 }
122
123 fn host_key(&self) -> &Option<OwnedValuePath> {
124 self.frame_handler.host_key()
125 }
126
127 fn source_type_key(&self) -> Option<&OwnedValuePath> {
128 self.frame_handler.source_type_key()
129 }
130
131 fn timestamp_key(&self) -> Option<&OwnedValuePath> {
132 self.frame_handler.timestamp_key()
133 }
134}
135
136impl<T: FrameHandler + Clone> UnixFrameHandler for DnstapFrameHandler<T> {
137 fn socket_path(&self) -> PathBuf {
138 self.socket_path.clone()
139 }
140
141 fn socket_file_mode(&self) -> Option<u32> {
142 self.socket_file_mode
143 }
144
145 fn socket_receive_buffer_size(&self) -> Option<usize> {
146 self.socket_receive_buffer_size
147 }
148
149 fn socket_send_buffer_size(&self) -> Option<usize> {
150 self.socket_send_buffer_size
151 }
152}