vector/sources/socket/
tcp.rs

1use std::time::Duration;
2use vector_lib::ipallowlist::IpAllowlistConfig;
3
4use chrono::Utc;
5use serde_with::serde_as;
6use smallvec::SmallVec;
7use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
8use vector_lib::config::{LegacyKey, LogNamespace};
9use vector_lib::configurable::configurable_component;
10use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
11
12use crate::{
13    codecs::Decoder,
14    event::Event,
15    serde::default_decoding,
16    sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource},
17    tcp::TcpKeepaliveConfig,
18    tls::TlsSourceConfig,
19};
20
21use super::{default_host_key, SocketConfig};
22
23/// TCP configuration for the `socket` source.
24#[serde_as]
25#[configurable_component]
26#[derive(Clone, Debug)]
27pub struct TcpConfig {
28    #[configurable(derived)]
29    address: SocketListenAddr,
30
31    #[configurable(derived)]
32    keepalive: Option<TcpKeepaliveConfig>,
33
34    /// The timeout before a connection is forcefully closed during shutdown.
35    #[serde(default = "default_shutdown_timeout_secs")]
36    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
37    #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
38    shutdown_timeout_secs: Duration,
39
40    /// Overrides the name of the log field used to add the peer host to each event.
41    ///
42    /// The value will be the peer host's address, including the port i.e. `1.2.3.4:9000`.
43    ///
44    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
45    ///
46    /// Set to `""` to suppress this key.
47    ///
48    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
49    host_key: Option<OptionalValuePath>,
50
51    /// Overrides the name of the log field used to add the peer host's port to each event.
52    ///
53    /// The value will be the peer host's port i.e. `9000`.
54    ///
55    /// By default, `"port"` is used.
56    ///
57    /// Set to `""` to suppress this key.
58    #[serde(default = "default_port_key")]
59    port_key: OptionalValuePath,
60
61    #[configurable(derived)]
62    pub permit_origin: Option<IpAllowlistConfig>,
63
64    #[configurable(derived)]
65    tls: Option<TlsSourceConfig>,
66
67    /// The size of the receive buffer used for each connection.
68    #[configurable(metadata(docs::type_unit = "bytes"))]
69    receive_buffer_bytes: Option<usize>,
70
71    /// Maximum duration to keep each connection open. Connections open for longer than this duration are closed.
72    ///
73    /// This is helpful for load balancing long-lived connections.
74    #[configurable(metadata(docs::type_unit = "seconds"))]
75    max_connection_duration_secs: Option<u64>,
76
77    /// The maximum number of TCP connections that are allowed at any given time.
78    #[configurable(metadata(docs::type_unit = "connections"))]
79    pub connection_limit: Option<u32>,
80
81    #[configurable(derived)]
82    pub(super) framing: Option<FramingConfig>,
83
84    #[configurable(derived)]
85    #[serde(default = "default_decoding")]
86    pub(super) decoding: DeserializerConfig,
87
88    /// The namespace to use for logs. This overrides the global setting.
89    #[serde(default)]
90    #[configurable(metadata(docs::hidden))]
91    pub log_namespace: Option<bool>,
92}
93
/// Serde default for `shutdown_timeout_secs`: thirty seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    const DEFAULT_SHUTDOWN_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_SHUTDOWN_SECS)
}
97
98fn default_port_key() -> OptionalValuePath {
99    OptionalValuePath::from(owned_value_path!("port"))
100}
101
102impl TcpConfig {
103    pub fn from_address(address: SocketListenAddr) -> Self {
104        Self {
105            address,
106            keepalive: None,
107            shutdown_timeout_secs: default_shutdown_timeout_secs(),
108            host_key: None,
109            port_key: default_port_key(),
110            permit_origin: None,
111            tls: None,
112            receive_buffer_bytes: None,
113            max_connection_duration_secs: None,
114            framing: None,
115            decoding: default_decoding(),
116            connection_limit: None,
117            log_namespace: None,
118        }
119    }
120
121    pub(super) fn host_key(&self) -> OptionalValuePath {
122        self.host_key.clone().unwrap_or(default_host_key())
123    }
124
125    pub const fn port_key(&self) -> &OptionalValuePath {
126        &self.port_key
127    }
128
129    pub const fn tls(&self) -> &Option<TlsSourceConfig> {
130        &self.tls
131    }
132
133    pub const fn framing(&self) -> &Option<FramingConfig> {
134        &self.framing
135    }
136
137    pub const fn decoding(&self) -> &DeserializerConfig {
138        &self.decoding
139    }
140
141    pub const fn address(&self) -> SocketListenAddr {
142        self.address
143    }
144
145    pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
146        self.keepalive
147    }
148
149    pub const fn shutdown_timeout_secs(&self) -> Duration {
150        self.shutdown_timeout_secs
151    }
152
153    pub const fn receive_buffer_bytes(&self) -> Option<usize> {
154        self.receive_buffer_bytes
155    }
156
157    pub const fn max_connection_duration_secs(&self) -> Option<u64> {
158        self.max_connection_duration_secs
159    }
160
161    pub const fn set_max_connection_duration_secs(&mut self, val: Option<u64>) -> &mut Self {
162        self.max_connection_duration_secs = val;
163        self
164    }
165
166    pub const fn set_shutdown_timeout_secs(&mut self, val: u64) -> &mut Self {
167        self.shutdown_timeout_secs = Duration::from_secs(val);
168        self
169    }
170
171    pub fn set_tls(&mut self, val: Option<TlsSourceConfig>) -> &mut Self {
172        self.tls = val;
173        self
174    }
175
176    pub const fn set_framing(&mut self, val: Option<FramingConfig>) -> &mut Self {
177        self.framing = val;
178        self
179    }
180
181    pub fn set_decoding(&mut self, val: DeserializerConfig) -> &mut Self {
182        self.decoding = val;
183        self
184    }
185
186    pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
187        self.log_namespace = val;
188        self
189    }
190}
191
/// State backing the `TcpSource` implementation used by the `socket` source
/// in TCP mode.
#[derive(Clone)]
pub struct RawTcpSource {
    /// TCP configuration; read for the host and port key settings when
    /// annotating events.
    config: TcpConfig,
    /// Decoder that is cloned on every call to `TcpSource::decoder`.
    decoder: Decoder,
    /// Log namespace used when inserting source metadata into events.
    log_namespace: LogNamespace,
}
198
199impl RawTcpSource {
200    pub const fn new(config: TcpConfig, decoder: Decoder, log_namespace: LogNamespace) -> Self {
201        Self {
202            config,
203            decoder,
204            log_namespace,
205        }
206    }
207}
208
209impl TcpSource for RawTcpSource {
210    type Error = vector_lib::codecs::decoding::Error;
211    type Item = SmallVec<[Event; 1]>;
212    type Decoder = Decoder;
213    type Acker = TcpNullAcker;
214
215    fn decoder(&self) -> Self::Decoder {
216        self.decoder.clone()
217    }
218
219    fn handle_events(&self, events: &mut [Event], host: std::net::SocketAddr) {
220        let now = Utc::now();
221
222        for event in events {
223            if let Event::Log(log) = event {
224                self.log_namespace.insert_standard_vector_source_metadata(
225                    log,
226                    SocketConfig::NAME,
227                    now,
228                );
229
230                let legacy_host_key = self
231                    .config
232                    .host_key
233                    .clone()
234                    .unwrap_or(default_host_key())
235                    .path;
236
237                self.log_namespace.insert_source_metadata(
238                    SocketConfig::NAME,
239                    log,
240                    legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
241                    path!("host"),
242                    host.ip().to_string(),
243                );
244
245                let legacy_port_key = self.config.port_key.clone().path;
246
247                self.log_namespace.insert_source_metadata(
248                    SocketConfig::NAME,
249                    log,
250                    legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
251                    path!("port"),
252                    host.port(),
253                );
254            }
255        }
256    }
257
258    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
259        TcpNullAcker
260    }
261}