vector/sources/dnstap/
tcp.rs1use ipnet::IpNet;
2use std::time::Duration;
3
4use bytes::Bytes;
5use serde_with::serde_as;
6use vector_lib::configurable::configurable_component;
7use vector_lib::ipallowlist::IpAllowlistConfig;
8use vector_lib::lookup::{owned_value_path, path};
9use vector_lib::tcp::TcpKeepaliveConfig;
10use vector_lib::tls::{CertificateMetadata, MaybeTlsSettings, TlsSourceConfig};
11use vector_lib::EstimatedJsonEncodedSizeOf;
12use vrl::path::OwnedValuePath;
13use vrl::value::ObjectMap;
14
15use crate::internal_events::{SocketEventsReceived, SocketMode};
16use crate::sources::util::framestream::{FrameHandler, TcpFrameHandler};
17use crate::{event::Event, sources::util::net::SocketListenAddr};
18
19use vector_lib::config::{LegacyKey, LogNamespace};
20use vector_lib::lookup::lookup_v2::OptionalValuePath;
21
/// TCP listener configuration for the `dnstap` source.
///
/// The fields are read by [`DnstapFrameHandler::new`], which copies them into the
/// frame handler that serves TCP connections.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TcpConfig {
    // Listen address; returned by `address()` and copied into the frame handler.
    #[configurable(derived)]
    address: SocketListenAddr,

    // Optional TCP keepalive settings; `None` when built through `from_address`.
    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,

    /// The shutdown timeout, given as a whole number of seconds.
    ///
    /// Defaults to 30 seconds.
    #[serde(default = "default_shutdown_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
    shutdown_timeout_secs: Duration,

    /// The value path used as the port key.
    ///
    /// Defaults to `port`.
    // NOTE(review): presumably the event field that receives the peer's port; the
    // code that consumes this key is not in this file — confirm against the caller.
    #[serde(default = "default_port_key")]
    pub port_key: OptionalValuePath,

    // Optional IP allowlist; converted into a list of `IpNet` by `DnstapFrameHandler::new`.
    #[configurable(derived)]
    permit_origin: Option<IpAllowlistConfig>,

    // Optional TLS settings; `DnstapFrameHandler::new` reads `client_metadata_key` from here.
    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,

    /// The size of the receive buffer, in bytes.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    receive_buffer_bytes: Option<usize>,

    /// The maximum duration of a connection, in seconds.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    max_connection_duration_secs: Option<u64>,

    /// The connection limit, expressed as a number of connections.
    ///
    /// This value is handed to the frame handler as its `max_connections` setting.
    #[configurable(metadata(docs::type_unit = "connections"))]
    pub connection_limit: Option<u32>,
}
69
/// Returns the default for `TcpConfig::shutdown_timeout_secs`: 30 seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    const DEFAULT_SHUTDOWN_TIMEOUT_SECS: u64 = 30;

    Duration::from_secs(DEFAULT_SHUTDOWN_TIMEOUT_SECS)
}
73
74fn default_port_key() -> OptionalValuePath {
75 OptionalValuePath::from(owned_value_path!("port"))
76}
77
78impl TcpConfig {
79 pub fn from_address(address: SocketListenAddr) -> Self {
80 Self {
81 address,
82 keepalive: None,
83 shutdown_timeout_secs: default_shutdown_timeout_secs(),
84 port_key: default_port_key(),
85 permit_origin: None,
86 tls: None,
87 receive_buffer_bytes: None,
88 max_connection_duration_secs: None,
89 connection_limit: None,
90 }
91 }
92
93 pub const fn port_key(&self) -> &OptionalValuePath {
94 &self.port_key
95 }
96
97 pub const fn tls(&self) -> &Option<TlsSourceConfig> {
98 &self.tls
99 }
100
101 pub const fn address(&self) -> SocketListenAddr {
102 self.address
103 }
104
105 pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
106 self.keepalive
107 }
108
109 pub const fn shutdown_timeout_secs(&self) -> Duration {
110 self.shutdown_timeout_secs
111 }
112
113 pub const fn receive_buffer_bytes(&self) -> Option<usize> {
114 self.receive_buffer_bytes
115 }
116
117 pub const fn max_connection_duration_secs(&self) -> Option<u64> {
118 self.max_connection_duration_secs
119 }
120}
121
/// A frame handler for TCP connections that wraps another [`FrameHandler`].
///
/// All [`FrameHandler`] methods delegate to the wrapped handler. `handle_event`
/// additionally attaches TLS client metadata to log events and emits a
/// `SocketEventsReceived` internal event for each of them.
#[derive(Clone)]
pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
    // The wrapped handler that does the actual frame decoding.
    frame_handler: T,
    // Listen address, copied from `TcpConfig`.
    address: SocketListenAddr,
    // TCP keepalive settings, copied from `TcpConfig`.
    keepalive: Option<TcpKeepaliveConfig>,
    // Shutdown timeout, copied from `TcpConfig`.
    shutdown_timeout_secs: Duration,
    // TLS settings as passed to `new`.
    tls: MaybeTlsSettings,
    // Path taken from the TLS config's `client_metadata_key`; `None` when TLS is
    // not configured or no key path is set.
    tls_client_metadata_key: Option<OwnedValuePath>,
    // `None` after construction; filled in by `insert_tls_client_metadata` with a
    // map holding the certificate `subject`.
    tls_client_metadata: Option<ObjectMap>,
    // Receive buffer size in bytes, copied from `TcpConfig`.
    receive_buffer_bytes: Option<usize>,
    // Maximum connection duration in seconds, copied from `TcpConfig`.
    max_connection_duration_secs: Option<u64>,
    // Taken from `TcpConfig::connection_limit`.
    max_connections: Option<u32>,
    // Converted from `TcpConfig::permit_origin`; exposed through `allowed_origins`.
    allowlist: Option<Vec<IpNet>>,
    // Namespace used when inserting source metadata into log events.
    log_namespace: LogNamespace,
}
137
138impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
139 pub fn new(
140 config: TcpConfig,
141 tls: MaybeTlsSettings,
142 frame_handler: T,
143 log_namespace: LogNamespace,
144 ) -> Self {
145 let tls_client_metadata_key = config
146 .tls()
147 .as_ref()
148 .and_then(|tls| tls.client_metadata_key.clone())
149 .and_then(|k| k.path);
150
151 Self {
152 frame_handler,
153 address: config.address,
154 keepalive: config.keepalive,
155 shutdown_timeout_secs: config.shutdown_timeout_secs,
156 tls,
157 tls_client_metadata_key,
158 tls_client_metadata: None,
159 receive_buffer_bytes: config.receive_buffer_bytes,
160 max_connection_duration_secs: config.max_connection_duration_secs,
161 max_connections: config.connection_limit,
162 allowlist: config.permit_origin.map(Into::into),
163 log_namespace,
164 }
165 }
166}
167
168impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
169 fn content_type(&self) -> String {
170 self.frame_handler.content_type()
171 }
172
173 fn max_frame_length(&self) -> usize {
174 self.frame_handler.max_frame_length()
175 }
176
177 fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
182 self.frame_handler
183 .handle_event(received_from, frame)
184 .map(|mut event| {
185 if let Event::Log(mut log_event) = event {
186 if let Some(tls_client_metadata) = &self.tls_client_metadata {
187 self.log_namespace.insert_source_metadata(
188 super::DnstapConfig::NAME,
189 &mut log_event,
190 self.tls_client_metadata_key
191 .as_ref()
192 .map(LegacyKey::Overwrite),
193 path!("tls_client_metadata"),
194 tls_client_metadata.clone(),
195 );
196 }
197
198 emit!(SocketEventsReceived {
199 mode: SocketMode::Tcp,
200 byte_size: log_event.estimated_json_encoded_size_of(),
201 count: 1
202 });
203
204 event = Event::from(log_event);
205 }
206 event
207 })
208 }
209
210 fn multithreaded(&self) -> bool {
211 self.frame_handler.multithreaded()
212 }
213
214 fn max_frame_handling_tasks(&self) -> usize {
215 self.frame_handler.max_frame_handling_tasks()
216 }
217
218 fn host_key(&self) -> &Option<OwnedValuePath> {
219 self.frame_handler.host_key()
220 }
221
222 fn source_type_key(&self) -> Option<&OwnedValuePath> {
223 self.frame_handler.source_type_key()
224 }
225
226 fn timestamp_key(&self) -> Option<&OwnedValuePath> {
227 self.frame_handler.timestamp_key()
228 }
229}
230
231impl<T: FrameHandler + Clone> TcpFrameHandler for DnstapFrameHandler<T> {
232 fn address(&self) -> SocketListenAddr {
233 self.address
234 }
235
236 fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
237 self.keepalive
238 }
239
240 fn shutdown_timeout_secs(&self) -> Duration {
241 self.shutdown_timeout_secs
242 }
243
244 fn tls(&self) -> MaybeTlsSettings {
245 self.tls.clone()
246 }
247
248 fn tls_client_metadata_key(&self) -> Option<OwnedValuePath> {
249 self.tls_client_metadata_key.clone()
250 }
251
252 fn receive_buffer_bytes(&self) -> Option<usize> {
253 self.receive_buffer_bytes
254 }
255
256 fn max_connection_duration_secs(&self) -> Option<u64> {
257 self.max_connection_duration_secs
258 }
259
260 fn max_connections(&self) -> Option<u32> {
261 self.max_connections
262 }
263
264 fn insert_tls_client_metadata(&mut self, metadata: Option<CertificateMetadata>) {
265 self.tls_client_metadata = metadata.map(|c| {
266 let mut metadata = ObjectMap::new();
267 metadata.insert("subject".into(), c.subject().into());
268 metadata
269 });
270 }
271
272 fn allowed_origins(&self) -> Option<&[IpNet]> {
273 self.allowlist.as_deref()
274 }
275}