vector/sources/dnstap/
tcp.rs1use std::time::Duration;
2
3use bytes::Bytes;
4use ipnet::IpNet;
5use serde_with::serde_as;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf,
8    config::{LegacyKey, LogNamespace},
9    configurable::configurable_component,
10    ipallowlist::IpAllowlistConfig,
11    lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
12    tcp::TcpKeepaliveConfig,
13    tls::{CertificateMetadata, MaybeTlsSettings, TlsSourceConfig},
14};
15use vrl::{path::OwnedValuePath, value::ObjectMap};
16
17use crate::{
18    event::Event,
19    internal_events::{SocketEventsReceived, SocketMode},
20    sources::util::{
21        framestream::{FrameHandler, TcpFrameHandler},
22        net::SocketListenAddr,
23    },
24};
25
26#[serde_as]
28#[configurable_component]
29#[derive(Clone, Debug)]
30pub struct TcpConfig {
31    #[configurable(derived)]
32    address: SocketListenAddr,
33
34    #[configurable(derived)]
35    keepalive: Option<TcpKeepaliveConfig>,
36
37    #[serde(default = "default_shutdown_timeout_secs")]
39    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
40    #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
41    shutdown_timeout_secs: Duration,
42
43    #[serde(default = "default_port_key")]
51    pub port_key: OptionalValuePath,
52
53    #[configurable(derived)]
54    permit_origin: Option<IpAllowlistConfig>,
55
56    #[configurable(derived)]
57    tls: Option<TlsSourceConfig>,
58
59    #[configurable(metadata(docs::type_unit = "bytes"))]
61    receive_buffer_bytes: Option<usize>,
62
63    #[configurable(metadata(docs::type_unit = "seconds"))]
67    max_connection_duration_secs: Option<u64>,
68
69    #[configurable(metadata(docs::type_unit = "connections"))]
71    pub connection_limit: Option<u32>,
72}
73
74const fn default_shutdown_timeout_secs() -> Duration {
75    Duration::from_secs(30)
76}
77
78fn default_port_key() -> OptionalValuePath {
79    OptionalValuePath::from(owned_value_path!("port"))
80}
81
82impl TcpConfig {
83    pub fn from_address(address: SocketListenAddr) -> Self {
84        Self {
85            address,
86            keepalive: None,
87            shutdown_timeout_secs: default_shutdown_timeout_secs(),
88            port_key: default_port_key(),
89            permit_origin: None,
90            tls: None,
91            receive_buffer_bytes: None,
92            max_connection_duration_secs: None,
93            connection_limit: None,
94        }
95    }
96
97    pub const fn port_key(&self) -> &OptionalValuePath {
98        &self.port_key
99    }
100
101    pub const fn tls(&self) -> &Option<TlsSourceConfig> {
102        &self.tls
103    }
104
105    pub const fn address(&self) -> SocketListenAddr {
106        self.address
107    }
108
109    pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
110        self.keepalive
111    }
112
113    pub const fn shutdown_timeout_secs(&self) -> Duration {
114        self.shutdown_timeout_secs
115    }
116
117    pub const fn receive_buffer_bytes(&self) -> Option<usize> {
118        self.receive_buffer_bytes
119    }
120
121    pub const fn max_connection_duration_secs(&self) -> Option<u64> {
122        self.max_connection_duration_secs
123    }
124}
125
126#[derive(Clone)]
127pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
128    frame_handler: T,
129    address: SocketListenAddr,
130    keepalive: Option<TcpKeepaliveConfig>,
131    shutdown_timeout_secs: Duration,
132    tls: MaybeTlsSettings,
133    tls_client_metadata_key: Option<OwnedValuePath>,
134    tls_client_metadata: Option<ObjectMap>,
135    receive_buffer_bytes: Option<usize>,
136    max_connection_duration_secs: Option<u64>,
137    max_connections: Option<u32>,
138    allowlist: Option<Vec<IpNet>>,
139    log_namespace: LogNamespace,
140}
141
142impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
143    pub fn new(
144        config: TcpConfig,
145        tls: MaybeTlsSettings,
146        frame_handler: T,
147        log_namespace: LogNamespace,
148    ) -> Self {
149        let tls_client_metadata_key = config
150            .tls()
151            .as_ref()
152            .and_then(|tls| tls.client_metadata_key.clone())
153            .and_then(|k| k.path);
154
155        Self {
156            frame_handler,
157            address: config.address,
158            keepalive: config.keepalive,
159            shutdown_timeout_secs: config.shutdown_timeout_secs,
160            tls,
161            tls_client_metadata_key,
162            tls_client_metadata: None,
163            receive_buffer_bytes: config.receive_buffer_bytes,
164            max_connection_duration_secs: config.max_connection_duration_secs,
165            max_connections: config.connection_limit,
166            allowlist: config.permit_origin.map(Into::into),
167            log_namespace,
168        }
169    }
170}
171
172impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
173    fn content_type(&self) -> String {
174        self.frame_handler.content_type()
175    }
176
177    fn max_frame_length(&self) -> usize {
178        self.frame_handler.max_frame_length()
179    }
180
181    fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
186        self.frame_handler
187            .handle_event(received_from, frame)
188            .map(|mut event| {
189                if let Event::Log(mut log_event) = event {
190                    if let Some(tls_client_metadata) = &self.tls_client_metadata {
191                        self.log_namespace.insert_source_metadata(
192                            super::DnstapConfig::NAME,
193                            &mut log_event,
194                            self.tls_client_metadata_key
195                                .as_ref()
196                                .map(LegacyKey::Overwrite),
197                            path!("tls_client_metadata"),
198                            tls_client_metadata.clone(),
199                        );
200                    }
201
202                    emit!(SocketEventsReceived {
203                        mode: SocketMode::Tcp,
204                        byte_size: log_event.estimated_json_encoded_size_of(),
205                        count: 1
206                    });
207
208                    event = Event::from(log_event);
209                }
210                event
211            })
212    }
213
214    fn multithreaded(&self) -> bool {
215        self.frame_handler.multithreaded()
216    }
217
218    fn max_frame_handling_tasks(&self) -> usize {
219        self.frame_handler.max_frame_handling_tasks()
220    }
221
222    fn host_key(&self) -> &Option<OwnedValuePath> {
223        self.frame_handler.host_key()
224    }
225
226    fn source_type_key(&self) -> Option<&OwnedValuePath> {
227        self.frame_handler.source_type_key()
228    }
229
230    fn timestamp_key(&self) -> Option<&OwnedValuePath> {
231        self.frame_handler.timestamp_key()
232    }
233}
234
235impl<T: FrameHandler + Clone> TcpFrameHandler for DnstapFrameHandler<T> {
236    fn address(&self) -> SocketListenAddr {
237        self.address
238    }
239
240    fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
241        self.keepalive
242    }
243
244    fn shutdown_timeout_secs(&self) -> Duration {
245        self.shutdown_timeout_secs
246    }
247
248    fn tls(&self) -> MaybeTlsSettings {
249        self.tls.clone()
250    }
251
252    fn tls_client_metadata_key(&self) -> Option<OwnedValuePath> {
253        self.tls_client_metadata_key.clone()
254    }
255
256    fn receive_buffer_bytes(&self) -> Option<usize> {
257        self.receive_buffer_bytes
258    }
259
260    fn max_connection_duration_secs(&self) -> Option<u64> {
261        self.max_connection_duration_secs
262    }
263
264    fn max_connections(&self) -> Option<u32> {
265        self.max_connections
266    }
267
268    fn insert_tls_client_metadata(&mut self, metadata: Option<CertificateMetadata>) {
269        self.tls_client_metadata = metadata.map(|c| {
270            let mut metadata = ObjectMap::new();
271            metadata.insert("subject".into(), c.subject().into());
272            metadata
273        });
274    }
275
276    fn allowed_origins(&self) -> Option<&[IpNet]> {
277        self.allowlist.as_deref()
278    }
279}