vector/sources/socket/
tcp.rs

1use std::time::Duration;
2
3use chrono::Utc;
4use serde_with::serde_as;
5use smallvec::SmallVec;
6use vector_lib::{
7    codecs::decoding::{DeserializerConfig, FramingConfig},
8    config::{LegacyKey, LogNamespace},
9    configurable::configurable_component,
10    ipallowlist::IpAllowlistConfig,
11    lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
12};
13
14use super::{SocketConfig, default_host_key};
15use crate::{
16    codecs::Decoder,
17    event::Event,
18    serde::default_decoding,
19    sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource},
20    tcp::TcpKeepaliveConfig,
21    tls::TlsSourceConfig,
22};
23
24/// TCP configuration for the `socket` source.
25#[serde_as]
26#[configurable_component]
27#[derive(Clone, Debug)]
28pub struct TcpConfig {
29    #[configurable(derived)]
30    address: SocketListenAddr,
31
32    #[configurable(derived)]
33    keepalive: Option<TcpKeepaliveConfig>,
34
35    /// The timeout before a connection is forcefully closed during shutdown.
36    #[serde(default = "default_shutdown_timeout_secs")]
37    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
38    #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
39    shutdown_timeout_secs: Duration,
40
41    /// Overrides the name of the log field used to add the peer host to each event.
42    ///
43    /// The value will be the peer host's address, including the port i.e. `1.2.3.4:9000`.
44    ///
45    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
46    ///
47    /// Set to `""` to suppress this key.
48    ///
49    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
50    host_key: Option<OptionalValuePath>,
51
52    /// Overrides the name of the log field used to add the peer host's port to each event.
53    ///
54    /// The value will be the peer host's port i.e. `9000`.
55    ///
56    /// By default, `"port"` is used.
57    ///
58    /// Set to `""` to suppress this key.
59    #[serde(default = "default_port_key")]
60    port_key: OptionalValuePath,
61
62    #[configurable(derived)]
63    pub permit_origin: Option<IpAllowlistConfig>,
64
65    #[configurable(derived)]
66    tls: Option<TlsSourceConfig>,
67
68    /// The size of the receive buffer used for each connection.
69    #[configurable(metadata(docs::type_unit = "bytes"))]
70    receive_buffer_bytes: Option<usize>,
71
72    /// Maximum duration to keep each connection open. Connections open for longer than this duration are closed.
73    ///
74    /// This is helpful for load balancing long-lived connections.
75    #[configurable(metadata(docs::type_unit = "seconds"))]
76    max_connection_duration_secs: Option<u64>,
77
78    /// The maximum number of TCP connections that are allowed at any given time.
79    #[configurable(metadata(docs::type_unit = "connections"))]
80    pub connection_limit: Option<u32>,
81
82    #[configurable(derived)]
83    pub(super) framing: Option<FramingConfig>,
84
85    #[configurable(derived)]
86    #[serde(default = "default_decoding")]
87    pub(super) decoding: DeserializerConfig,
88
89    /// The namespace to use for logs. This overrides the global setting.
90    #[serde(default)]
91    #[configurable(metadata(docs::hidden))]
92    pub log_namespace: Option<bool>,
93}
94
/// Default value for `TcpConfig::shutdown_timeout_secs`: thirty seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(30);
    DEFAULT_SHUTDOWN_TIMEOUT
}
98
99fn default_port_key() -> OptionalValuePath {
100    OptionalValuePath::from(owned_value_path!("port"))
101}
102
103impl TcpConfig {
104    pub fn from_address(address: SocketListenAddr) -> Self {
105        Self {
106            address,
107            keepalive: None,
108            shutdown_timeout_secs: default_shutdown_timeout_secs(),
109            host_key: None,
110            port_key: default_port_key(),
111            permit_origin: None,
112            tls: None,
113            receive_buffer_bytes: None,
114            max_connection_duration_secs: None,
115            framing: None,
116            decoding: default_decoding(),
117            connection_limit: None,
118            log_namespace: None,
119        }
120    }
121
122    pub(super) fn host_key(&self) -> OptionalValuePath {
123        self.host_key.clone().unwrap_or(default_host_key())
124    }
125
126    pub const fn port_key(&self) -> &OptionalValuePath {
127        &self.port_key
128    }
129
130    pub const fn tls(&self) -> &Option<TlsSourceConfig> {
131        &self.tls
132    }
133
134    pub const fn framing(&self) -> &Option<FramingConfig> {
135        &self.framing
136    }
137
138    pub const fn decoding(&self) -> &DeserializerConfig {
139        &self.decoding
140    }
141
142    pub const fn address(&self) -> SocketListenAddr {
143        self.address
144    }
145
146    pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
147        self.keepalive
148    }
149
150    pub const fn shutdown_timeout_secs(&self) -> Duration {
151        self.shutdown_timeout_secs
152    }
153
154    pub const fn receive_buffer_bytes(&self) -> Option<usize> {
155        self.receive_buffer_bytes
156    }
157
158    pub const fn max_connection_duration_secs(&self) -> Option<u64> {
159        self.max_connection_duration_secs
160    }
161
162    pub const fn set_max_connection_duration_secs(&mut self, val: Option<u64>) -> &mut Self {
163        self.max_connection_duration_secs = val;
164        self
165    }
166
167    pub const fn set_shutdown_timeout_secs(&mut self, val: u64) -> &mut Self {
168        self.shutdown_timeout_secs = Duration::from_secs(val);
169        self
170    }
171
172    pub fn set_tls(&mut self, val: Option<TlsSourceConfig>) -> &mut Self {
173        self.tls = val;
174        self
175    }
176
177    pub const fn set_framing(&mut self, val: Option<FramingConfig>) -> &mut Self {
178        self.framing = val;
179        self
180    }
181
182    pub fn set_decoding(&mut self, val: DeserializerConfig) -> &mut Self {
183        self.decoding = val;
184        self
185    }
186
187    pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
188        self.log_namespace = val;
189        self
190    }
191}
192
/// [`TcpSource`] implementation for the `socket` source's TCP configuration.
#[derive(Clone)]
pub struct RawTcpSource {
    /// Source configuration; supplies the legacy host and port keys used when annotating events.
    config: TcpConfig,
    /// Decoder that is cloned and returned by `TcpSource::decoder`.
    decoder: Decoder,
    /// Log namespace through which source metadata is inserted into log events.
    log_namespace: LogNamespace,
}
199
impl RawTcpSource {
    /// Creates a source from its configuration, decoder, and log namespace.
    ///
    /// The arguments are stored as-is; no validation or I/O happens here.
    pub const fn new(config: TcpConfig, decoder: Decoder, log_namespace: LogNamespace) -> Self {
        Self {
            config,
            decoder,
            log_namespace,
        }
    }
}
209
210impl TcpSource for RawTcpSource {
211    type Error = vector_lib::codecs::decoding::Error;
212    type Item = SmallVec<[Event; 1]>;
213    type Decoder = Decoder;
214    type Acker = TcpNullAcker;
215
216    fn decoder(&self) -> Self::Decoder {
217        self.decoder.clone()
218    }
219
220    fn handle_events(&self, events: &mut [Event], host: std::net::SocketAddr) {
221        let now = Utc::now();
222
223        for event in events {
224            if let Event::Log(log) = event {
225                self.log_namespace.insert_standard_vector_source_metadata(
226                    log,
227                    SocketConfig::NAME,
228                    now,
229                );
230
231                let legacy_host_key = self
232                    .config
233                    .host_key
234                    .clone()
235                    .unwrap_or(default_host_key())
236                    .path;
237
238                self.log_namespace.insert_source_metadata(
239                    SocketConfig::NAME,
240                    log,
241                    legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
242                    path!("host"),
243                    host.ip().to_string(),
244                );
245
246                let legacy_port_key = self.config.port_key.clone().path;
247
248                self.log_namespace.insert_source_metadata(
249                    SocketConfig::NAME,
250                    log,
251                    legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
252                    path!("port"),
253                    host.port(),
254                );
255            }
256        }
257    }
258
259    fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
260        TcpNullAcker
261    }
262}