vector/sources/dnstap/
tcp.rs1use std::time::Duration;
2
3use bytes::Bytes;
4use ipnet::IpNet;
5use serde_with::serde_as;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf,
8 config::{LegacyKey, LogNamespace},
9 configurable::configurable_component,
10 ipallowlist::IpAllowlistConfig,
11 lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
12 tcp::TcpKeepaliveConfig,
13 tls::{CertificateMetadata, MaybeTlsSettings, TlsSourceConfig},
14};
15use vrl::{path::OwnedValuePath, value::ObjectMap};
16
17use crate::{
18 event::Event,
19 internal_events::{SocketEventsReceived, SocketMode},
20 sources::util::{
21 framestream::{FrameHandler, TcpFrameHandler},
22 net::SocketListenAddr,
23 },
24};
25
26#[serde_as]
28#[configurable_component]
29#[derive(Clone, Debug)]
30pub struct TcpConfig {
31 #[configurable(derived)]
32 address: SocketListenAddr,
33
34 #[configurable(derived)]
35 keepalive: Option<TcpKeepaliveConfig>,
36
37 #[serde(default = "default_shutdown_timeout_secs")]
39 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
40 #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
41 shutdown_timeout_secs: Duration,
42
43 #[serde(default = "default_port_key")]
51 pub port_key: OptionalValuePath,
52
53 #[configurable(derived)]
54 permit_origin: Option<IpAllowlistConfig>,
55
56 #[configurable(derived)]
57 tls: Option<TlsSourceConfig>,
58
59 #[configurable(metadata(docs::type_unit = "bytes"))]
61 receive_buffer_bytes: Option<usize>,
62
63 #[configurable(metadata(docs::type_unit = "seconds"))]
67 max_connection_duration_secs: Option<u64>,
68
69 #[configurable(metadata(docs::type_unit = "connections"))]
71 pub connection_limit: Option<u32>,
72}
73
74const fn default_shutdown_timeout_secs() -> Duration {
75 Duration::from_secs(30)
76}
77
78fn default_port_key() -> OptionalValuePath {
79 OptionalValuePath::from(owned_value_path!("port"))
80}
81
82impl TcpConfig {
83 pub fn from_address(address: SocketListenAddr) -> Self {
84 Self {
85 address,
86 keepalive: None,
87 shutdown_timeout_secs: default_shutdown_timeout_secs(),
88 port_key: default_port_key(),
89 permit_origin: None,
90 tls: None,
91 receive_buffer_bytes: None,
92 max_connection_duration_secs: None,
93 connection_limit: None,
94 }
95 }
96
97 pub const fn port_key(&self) -> &OptionalValuePath {
98 &self.port_key
99 }
100
101 pub const fn tls(&self) -> &Option<TlsSourceConfig> {
102 &self.tls
103 }
104
105 pub const fn address(&self) -> SocketListenAddr {
106 self.address
107 }
108
109 pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
110 self.keepalive
111 }
112
113 pub const fn shutdown_timeout_secs(&self) -> Duration {
114 self.shutdown_timeout_secs
115 }
116
117 pub const fn receive_buffer_bytes(&self) -> Option<usize> {
118 self.receive_buffer_bytes
119 }
120
121 pub const fn max_connection_duration_secs(&self) -> Option<u64> {
122 self.max_connection_duration_secs
123 }
124}
125
126#[derive(Clone)]
127pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
128 frame_handler: T,
129 address: SocketListenAddr,
130 keepalive: Option<TcpKeepaliveConfig>,
131 shutdown_timeout_secs: Duration,
132 tls: MaybeTlsSettings,
133 tls_client_metadata_key: Option<OwnedValuePath>,
134 tls_client_metadata: Option<ObjectMap>,
135 receive_buffer_bytes: Option<usize>,
136 max_connection_duration_secs: Option<u64>,
137 max_connections: Option<u32>,
138 allowlist: Option<Vec<IpNet>>,
139 log_namespace: LogNamespace,
140}
141
142impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
143 pub fn new(
144 config: TcpConfig,
145 tls: MaybeTlsSettings,
146 frame_handler: T,
147 log_namespace: LogNamespace,
148 ) -> Self {
149 let tls_client_metadata_key = config
150 .tls()
151 .as_ref()
152 .and_then(|tls| tls.client_metadata_key.clone())
153 .and_then(|k| k.path);
154
155 Self {
156 frame_handler,
157 address: config.address,
158 keepalive: config.keepalive,
159 shutdown_timeout_secs: config.shutdown_timeout_secs,
160 tls,
161 tls_client_metadata_key,
162 tls_client_metadata: None,
163 receive_buffer_bytes: config.receive_buffer_bytes,
164 max_connection_duration_secs: config.max_connection_duration_secs,
165 max_connections: config.connection_limit,
166 allowlist: config.permit_origin.map(Into::into),
167 log_namespace,
168 }
169 }
170}
171
172impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
173 fn content_type(&self) -> String {
174 self.frame_handler.content_type()
175 }
176
177 fn max_frame_length(&self) -> usize {
178 self.frame_handler.max_frame_length()
179 }
180
181 fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
186 self.frame_handler
187 .handle_event(received_from, frame)
188 .map(|mut event| {
189 if let Event::Log(mut log_event) = event {
190 if let Some(tls_client_metadata) = &self.tls_client_metadata {
191 self.log_namespace.insert_source_metadata(
192 super::DnstapConfig::NAME,
193 &mut log_event,
194 self.tls_client_metadata_key
195 .as_ref()
196 .map(LegacyKey::Overwrite),
197 path!("tls_client_metadata"),
198 tls_client_metadata.clone(),
199 );
200 }
201
202 emit!(SocketEventsReceived {
203 mode: SocketMode::Tcp,
204 byte_size: log_event.estimated_json_encoded_size_of(),
205 count: 1
206 });
207
208 event = Event::from(log_event);
209 }
210 event
211 })
212 }
213
214 fn multithreaded(&self) -> bool {
215 self.frame_handler.multithreaded()
216 }
217
218 fn max_frame_handling_tasks(&self) -> usize {
219 self.frame_handler.max_frame_handling_tasks()
220 }
221
222 fn host_key(&self) -> &Option<OwnedValuePath> {
223 self.frame_handler.host_key()
224 }
225
226 fn source_type_key(&self) -> Option<&OwnedValuePath> {
227 self.frame_handler.source_type_key()
228 }
229
230 fn timestamp_key(&self) -> Option<&OwnedValuePath> {
231 self.frame_handler.timestamp_key()
232 }
233}
234
235impl<T: FrameHandler + Clone> TcpFrameHandler for DnstapFrameHandler<T> {
236 fn address(&self) -> SocketListenAddr {
237 self.address
238 }
239
240 fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
241 self.keepalive
242 }
243
244 fn shutdown_timeout_secs(&self) -> Duration {
245 self.shutdown_timeout_secs
246 }
247
248 fn tls(&self) -> MaybeTlsSettings {
249 self.tls.clone()
250 }
251
252 fn tls_client_metadata_key(&self) -> Option<OwnedValuePath> {
253 self.tls_client_metadata_key.clone()
254 }
255
256 fn receive_buffer_bytes(&self) -> Option<usize> {
257 self.receive_buffer_bytes
258 }
259
260 fn max_connection_duration_secs(&self) -> Option<u64> {
261 self.max_connection_duration_secs
262 }
263
264 fn max_connections(&self) -> Option<u32> {
265 self.max_connections
266 }
267
268 fn insert_tls_client_metadata(&mut self, metadata: Option<CertificateMetadata>) {
269 self.tls_client_metadata = metadata.map(|c| {
270 let mut metadata = ObjectMap::new();
271 metadata.insert("subject".into(), c.subject().into());
272 metadata
273 });
274 }
275
276 fn allowed_origins(&self) -> Option<&[IpNet]> {
277 self.allowlist.as_deref()
278 }
279}