vector/sources/dnstap/
tcp.rs

1use ipnet::IpNet;
2use std::time::Duration;
3
4use bytes::Bytes;
5use serde_with::serde_as;
6use vector_lib::configurable::configurable_component;
7use vector_lib::ipallowlist::IpAllowlistConfig;
8use vector_lib::lookup::{owned_value_path, path};
9use vector_lib::tcp::TcpKeepaliveConfig;
10use vector_lib::tls::{CertificateMetadata, MaybeTlsSettings, TlsSourceConfig};
11use vector_lib::EstimatedJsonEncodedSizeOf;
12use vrl::path::OwnedValuePath;
13use vrl::value::ObjectMap;
14
15use crate::internal_events::{SocketEventsReceived, SocketMode};
16use crate::sources::util::framestream::{FrameHandler, TcpFrameHandler};
17use crate::{event::Event, sources::util::net::SocketListenAddr};
18
19use vector_lib::config::{LegacyKey, LogNamespace};
20use vector_lib::lookup::lookup_v2::OptionalValuePath;
21
/// TCP configuration for the `dnstap` source.
#[serde_as]
#[configurable_component]
#[derive(Clone, Debug)]
pub struct TcpConfig {
    // Listen address; copied into `DnstapFrameHandler` by `DnstapFrameHandler::new`
    // and exposed through the `address()` accessor.
    #[configurable(derived)]
    address: SocketListenAddr,

    // Optional TCP keepalive settings; `None` when built via `from_address`.
    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,

    /// The timeout before a connection is forcefully closed during shutdown.
    // (De)serialized as a whole number of seconds (`DurationSeconds<u64>`);
    // falls back to `default_shutdown_timeout_secs()` when omitted.
    #[serde(default = "default_shutdown_timeout_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
    shutdown_timeout_secs: Duration,

    /// Overrides the name of the log field used to add the peer host's port to each event.
    ///
    /// The value will be the peer host's port i.e. `9000`.
    ///
    /// By default, `"port"` is used.
    ///
    /// Set to `""` to suppress this key.
    // Falls back to `default_port_key()` when omitted.
    #[serde(default = "default_port_key")]
    pub port_key: OptionalValuePath,

    // Optional origin allowlist; converted into a `Vec<IpNet>` by
    // `DnstapFrameHandler::new`.
    #[configurable(derived)]
    permit_origin: Option<IpAllowlistConfig>,

    // Optional TLS settings; `DnstapFrameHandler::new` reads
    // `client_metadata_key` from here.
    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,

    /// The size of the receive buffer used for each connection.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    receive_buffer_bytes: Option<usize>,

    /// Maximum duration to keep each connection open. Connections open for longer than this duration are closed.
    ///
    /// This is helpful for load balancing long-lived connections.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    max_connection_duration_secs: Option<u64>,

    /// The maximum number of TCP connections that are allowed at any given time.
    // Stored as `max_connections` on `DnstapFrameHandler`.
    #[configurable(metadata(docs::type_unit = "connections"))]
    pub connection_limit: Option<u32>,
}
69
/// Serde default for `TcpConfig::shutdown_timeout_secs`: thirty seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    // Kept as a named constant so the unit (whole seconds) is explicit.
    const DEFAULT_SHUTDOWN_TIMEOUT_SECS: u64 = 30;

    Duration::from_secs(DEFAULT_SHUTDOWN_TIMEOUT_SECS)
}
73
74fn default_port_key() -> OptionalValuePath {
75    OptionalValuePath::from(owned_value_path!("port"))
76}
77
78impl TcpConfig {
79    pub fn from_address(address: SocketListenAddr) -> Self {
80        Self {
81            address,
82            keepalive: None,
83            shutdown_timeout_secs: default_shutdown_timeout_secs(),
84            port_key: default_port_key(),
85            permit_origin: None,
86            tls: None,
87            receive_buffer_bytes: None,
88            max_connection_duration_secs: None,
89            connection_limit: None,
90        }
91    }
92
93    pub const fn port_key(&self) -> &OptionalValuePath {
94        &self.port_key
95    }
96
97    pub const fn tls(&self) -> &Option<TlsSourceConfig> {
98        &self.tls
99    }
100
101    pub const fn address(&self) -> SocketListenAddr {
102        self.address
103    }
104
105    pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
106        self.keepalive
107    }
108
109    pub const fn shutdown_timeout_secs(&self) -> Duration {
110        self.shutdown_timeout_secs
111    }
112
113    pub const fn receive_buffer_bytes(&self) -> Option<usize> {
114        self.receive_buffer_bytes
115    }
116
117    pub const fn max_connection_duration_secs(&self) -> Option<u64> {
118        self.max_connection_duration_secs
119    }
120}
121
/// TCP frame handler for the `dnstap` source.
///
/// Wraps an inner [`FrameHandler`] and carries the TCP/TLS settings that the
/// [`TcpFrameHandler`] implementation exposes.
#[derive(Clone)]
pub struct DnstapFrameHandler<T: FrameHandler + Clone> {
    /// Inner handler that all `FrameHandler` methods delegate to.
    frame_handler: T,
    /// Listen address, copied from `TcpConfig::address`.
    address: SocketListenAddr,
    /// Optional TCP keepalive settings, copied from the config.
    keepalive: Option<TcpKeepaliveConfig>,
    /// Shutdown timeout, copied from the config.
    shutdown_timeout_secs: Duration,
    /// TLS settings supplied by the caller of `new`.
    tls: MaybeTlsSettings,
    /// Legacy-namespace path for client certificate metadata, taken from the
    /// TLS config's `client_metadata_key`.
    tls_client_metadata_key: Option<OwnedValuePath>,
    /// Client certificate metadata; `None` until `insert_tls_client_metadata`
    /// is called with a certificate.
    tls_client_metadata: Option<ObjectMap>,
    /// Optional per-connection receive buffer size in bytes.
    receive_buffer_bytes: Option<usize>,
    /// Optional maximum connection lifetime in seconds.
    max_connection_duration_secs: Option<u64>,
    /// Optional connection cap, taken from `TcpConfig::connection_limit`.
    max_connections: Option<u32>,
    /// Optional origin allowlist, converted from `TcpConfig::permit_origin`.
    allowlist: Option<Vec<IpNet>>,
    /// Log namespace used when inserting source metadata into events.
    log_namespace: LogNamespace,
}
137
138impl<T: FrameHandler + Clone> DnstapFrameHandler<T> {
139    pub fn new(
140        config: TcpConfig,
141        tls: MaybeTlsSettings,
142        frame_handler: T,
143        log_namespace: LogNamespace,
144    ) -> Self {
145        let tls_client_metadata_key = config
146            .tls()
147            .as_ref()
148            .and_then(|tls| tls.client_metadata_key.clone())
149            .and_then(|k| k.path);
150
151        Self {
152            frame_handler,
153            address: config.address,
154            keepalive: config.keepalive,
155            shutdown_timeout_secs: config.shutdown_timeout_secs,
156            tls,
157            tls_client_metadata_key,
158            tls_client_metadata: None,
159            receive_buffer_bytes: config.receive_buffer_bytes,
160            max_connection_duration_secs: config.max_connection_duration_secs,
161            max_connections: config.connection_limit,
162            allowlist: config.permit_origin.map(Into::into),
163            log_namespace,
164        }
165    }
166}
167
168impl<T: FrameHandler + Clone> FrameHandler for DnstapFrameHandler<T> {
169    fn content_type(&self) -> String {
170        self.frame_handler.content_type()
171    }
172
173    fn max_frame_length(&self) -> usize {
174        self.frame_handler.max_frame_length()
175    }
176
177    /**
178     * Function to pass into util::framestream::build_framestream_unix_source
179     * Takes a data frame from the unix socket and turns it into a Vector Event.
180     **/
181    fn handle_event(&self, received_from: Option<Bytes>, frame: Bytes) -> Option<Event> {
182        self.frame_handler
183            .handle_event(received_from, frame)
184            .map(|mut event| {
185                if let Event::Log(mut log_event) = event {
186                    if let Some(tls_client_metadata) = &self.tls_client_metadata {
187                        self.log_namespace.insert_source_metadata(
188                            super::DnstapConfig::NAME,
189                            &mut log_event,
190                            self.tls_client_metadata_key
191                                .as_ref()
192                                .map(LegacyKey::Overwrite),
193                            path!("tls_client_metadata"),
194                            tls_client_metadata.clone(),
195                        );
196                    }
197
198                    emit!(SocketEventsReceived {
199                        mode: SocketMode::Tcp,
200                        byte_size: log_event.estimated_json_encoded_size_of(),
201                        count: 1
202                    });
203
204                    event = Event::from(log_event);
205                }
206                event
207            })
208    }
209
210    fn multithreaded(&self) -> bool {
211        self.frame_handler.multithreaded()
212    }
213
214    fn max_frame_handling_tasks(&self) -> usize {
215        self.frame_handler.max_frame_handling_tasks()
216    }
217
218    fn host_key(&self) -> &Option<OwnedValuePath> {
219        self.frame_handler.host_key()
220    }
221
222    fn source_type_key(&self) -> Option<&OwnedValuePath> {
223        self.frame_handler.source_type_key()
224    }
225
226    fn timestamp_key(&self) -> Option<&OwnedValuePath> {
227        self.frame_handler.timestamp_key()
228    }
229}
230
231impl<T: FrameHandler + Clone> TcpFrameHandler for DnstapFrameHandler<T> {
232    fn address(&self) -> SocketListenAddr {
233        self.address
234    }
235
236    fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
237        self.keepalive
238    }
239
240    fn shutdown_timeout_secs(&self) -> Duration {
241        self.shutdown_timeout_secs
242    }
243
244    fn tls(&self) -> MaybeTlsSettings {
245        self.tls.clone()
246    }
247
248    fn tls_client_metadata_key(&self) -> Option<OwnedValuePath> {
249        self.tls_client_metadata_key.clone()
250    }
251
252    fn receive_buffer_bytes(&self) -> Option<usize> {
253        self.receive_buffer_bytes
254    }
255
256    fn max_connection_duration_secs(&self) -> Option<u64> {
257        self.max_connection_duration_secs
258    }
259
260    fn max_connections(&self) -> Option<u32> {
261        self.max_connections
262    }
263
264    fn insert_tls_client_metadata(&mut self, metadata: Option<CertificateMetadata>) {
265        self.tls_client_metadata = metadata.map(|c| {
266            let mut metadata = ObjectMap::new();
267            metadata.insert("subject".into(), c.subject().into());
268            metadata
269        });
270    }
271
272    fn allowed_origins(&self) -> Option<&[IpNet]> {
273        self.allowlist.as_deref()
274    }
275}