vector/sources/dnstap/
unix.rs1use std::path::PathBuf;
2
3use bytes::Bytes;
4use vector_lib::{
5 EstimatedJsonEncodedSizeOf, configurable::configurable_component, lookup::OwnedValuePath,
6};
7
8use crate::{
9 event::Event,
10 internal_events::{SocketEventsReceived, SocketMode},
11 sources::util::framestream::{FrameHandler, UnixFrameHandler},
12};
13
14#[configurable_component]
16#[derive(Clone, Debug)]
17#[serde(deny_unknown_fields)]
18pub struct UnixConfig {
19 pub socket_path: PathBuf,
24
25 pub socket_file_mode: Option<u32>,
30
31 #[configurable(metadata(docs::type_unit = "bytes"))]
35 pub socket_receive_buffer_size: Option<usize>,
36
37 #[configurable(metadata(docs::type_unit = "bytes"))]
41 pub socket_send_buffer_size: Option<usize>,
42}
43
44impl UnixConfig {
45 pub fn new(socket_path: PathBuf) -> Self {
46 Self {
47 socket_path,
48 ..Self::default()
49 }
50 }
51}
52
53impl Default for UnixConfig {
54 fn default() -> Self {
55 Self {
56 socket_path: PathBuf::from("/run/bind/dnstap.sock"),
57 socket_file_mode: None,
58 socket_receive_buffer_size: None,
59 socket_send_buffer_size: None,
60 }
61 }
62}
63
64#[derive(Clone)]
65pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
66 frame_handler: T,
67 socket_path: PathBuf,
68 socket_file_mode: Option<u32>,
69 socket_receive_buffer_size: Option<usize>,
70 socket_send_buffer_size: Option<usize>,
71}
72
73impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
74 pub fn new(config: UnixConfig, frame_handler: T) -> Self {
75 Self {
76 frame_handler,
77 socket_path: config.socket_path.clone(),
78 socket_file_mode: config.socket_file_mode,
79 socket_receive_buffer_size: config.socket_receive_buffer_size,
80 socket_send_buffer_size: config.socket_send_buffer_size,
81 }
82 }
83}
84
85impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
86 fn content_type(&self) -> String {
87 self.frame_handler.content_type()
88 }
89
90 fn max_frame_length(&self) -> usize {
91 self.frame_handler.max_frame_length()
92 }
93
94 fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
99 self.frame_handler
100 .handle_event(received_from, frame)
101 .map(|event| {
102 if let Event::Log(ref log_event) = event {
103 emit!(SocketEventsReceived {
104 mode: SocketMode::Unix,
105 byte_size: log_event.estimated_json_encoded_size_of(),
106 count: 1
107 })
108 }
109 event
110 })
111 }
112
113 fn multithreaded(&self) -> bool {
114 self.frame_handler.multithreaded()
115 }
116
117 fn max_frame_handling_tasks(&self) -> usize {
118 self.frame_handler.max_frame_handling_tasks()
119 }
120
121 fn host_key(&self) -> &Option<OwnedValuePath> {
122 self.frame_handler.host_key()
123 }
124
125 fn source_type_key(&self) -> Option<&OwnedValuePath> {
126 self.frame_handler.source_type_key()
127 }
128
129 fn timestamp_key(&self) -> Option<&OwnedValuePath> {
130 self.frame_handler.timestamp_key()
131 }
132}
133
134impl<T: FrameHandler + Clone> UnixFrameHandler for DnstapFrameHandler<T> {
135 fn socket_path(&self) -> PathBuf {
136 self.socket_path.clone()
137 }
138
139 fn socket_file_mode(&self) -> Option<u32> {
140 self.socket_file_mode
141 }
142
143 fn socket_receive_buffer_size(&self) -> Option<usize> {
144 self.socket_receive_buffer_size
145 }
146
147 fn socket_send_buffer_size(&self) -> Option<usize> {
148 self.socket_send_buffer_size
149 }
150}