vector/sources/socket/
tcp.rs1use std::time::Duration;
2use vector_lib::ipallowlist::IpAllowlistConfig;
3
4use chrono::Utc;
5use serde_with::serde_as;
6use smallvec::SmallVec;
7use vector_lib::codecs::decoding::{DeserializerConfig, FramingConfig};
8use vector_lib::config::{LegacyKey, LogNamespace};
9use vector_lib::configurable::configurable_component;
10use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
11
12use crate::{
13 codecs::Decoder,
14 event::Event,
15 serde::default_decoding,
16 sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource},
17 tcp::TcpKeepaliveConfig,
18 tls::TlsSourceConfig,
19};
20
21use super::{default_host_key, SocketConfig};
22
23#[serde_as]
25#[configurable_component]
26#[derive(Clone, Debug)]
27pub struct TcpConfig {
28 #[configurable(derived)]
29 address: SocketListenAddr,
30
31 #[configurable(derived)]
32 keepalive: Option<TcpKeepaliveConfig>,
33
34 #[serde(default = "default_shutdown_timeout_secs")]
36 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
37 #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
38 shutdown_timeout_secs: Duration,
39
40 host_key: Option<OptionalValuePath>,
50
51 #[serde(default = "default_port_key")]
59 port_key: OptionalValuePath,
60
61 #[configurable(derived)]
62 pub permit_origin: Option<IpAllowlistConfig>,
63
64 #[configurable(derived)]
65 tls: Option<TlsSourceConfig>,
66
67 #[configurable(metadata(docs::type_unit = "bytes"))]
69 receive_buffer_bytes: Option<usize>,
70
71 #[configurable(metadata(docs::type_unit = "seconds"))]
75 max_connection_duration_secs: Option<u64>,
76
77 #[configurable(metadata(docs::type_unit = "connections"))]
79 pub connection_limit: Option<u32>,
80
81 #[configurable(derived)]
82 pub(super) framing: Option<FramingConfig>,
83
84 #[configurable(derived)]
85 #[serde(default = "default_decoding")]
86 pub(super) decoding: DeserializerConfig,
87
88 #[serde(default)]
90 #[configurable(metadata(docs::hidden))]
91 pub log_namespace: Option<bool>,
92}
93
94const fn default_shutdown_timeout_secs() -> Duration {
95 Duration::from_secs(30)
96}
97
98fn default_port_key() -> OptionalValuePath {
99 OptionalValuePath::from(owned_value_path!("port"))
100}
101
102impl TcpConfig {
103 pub fn from_address(address: SocketListenAddr) -> Self {
104 Self {
105 address,
106 keepalive: None,
107 shutdown_timeout_secs: default_shutdown_timeout_secs(),
108 host_key: None,
109 port_key: default_port_key(),
110 permit_origin: None,
111 tls: None,
112 receive_buffer_bytes: None,
113 max_connection_duration_secs: None,
114 framing: None,
115 decoding: default_decoding(),
116 connection_limit: None,
117 log_namespace: None,
118 }
119 }
120
121 pub(super) fn host_key(&self) -> OptionalValuePath {
122 self.host_key.clone().unwrap_or(default_host_key())
123 }
124
125 pub const fn port_key(&self) -> &OptionalValuePath {
126 &self.port_key
127 }
128
129 pub const fn tls(&self) -> &Option<TlsSourceConfig> {
130 &self.tls
131 }
132
133 pub const fn framing(&self) -> &Option<FramingConfig> {
134 &self.framing
135 }
136
137 pub const fn decoding(&self) -> &DeserializerConfig {
138 &self.decoding
139 }
140
141 pub const fn address(&self) -> SocketListenAddr {
142 self.address
143 }
144
145 pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
146 self.keepalive
147 }
148
149 pub const fn shutdown_timeout_secs(&self) -> Duration {
150 self.shutdown_timeout_secs
151 }
152
153 pub const fn receive_buffer_bytes(&self) -> Option<usize> {
154 self.receive_buffer_bytes
155 }
156
157 pub const fn max_connection_duration_secs(&self) -> Option<u64> {
158 self.max_connection_duration_secs
159 }
160
161 pub const fn set_max_connection_duration_secs(&mut self, val: Option<u64>) -> &mut Self {
162 self.max_connection_duration_secs = val;
163 self
164 }
165
166 pub const fn set_shutdown_timeout_secs(&mut self, val: u64) -> &mut Self {
167 self.shutdown_timeout_secs = Duration::from_secs(val);
168 self
169 }
170
171 pub fn set_tls(&mut self, val: Option<TlsSourceConfig>) -> &mut Self {
172 self.tls = val;
173 self
174 }
175
176 pub const fn set_framing(&mut self, val: Option<FramingConfig>) -> &mut Self {
177 self.framing = val;
178 self
179 }
180
181 pub fn set_decoding(&mut self, val: DeserializerConfig) -> &mut Self {
182 self.decoding = val;
183 self
184 }
185
186 pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
187 self.log_namespace = val;
188 self
189 }
190}
191
192#[derive(Clone)]
193pub struct RawTcpSource {
194 config: TcpConfig,
195 decoder: Decoder,
196 log_namespace: LogNamespace,
197}
198
199impl RawTcpSource {
200 pub const fn new(config: TcpConfig, decoder: Decoder, log_namespace: LogNamespace) -> Self {
201 Self {
202 config,
203 decoder,
204 log_namespace,
205 }
206 }
207}
208
209impl TcpSource for RawTcpSource {
210 type Error = vector_lib::codecs::decoding::Error;
211 type Item = SmallVec<[Event; 1]>;
212 type Decoder = Decoder;
213 type Acker = TcpNullAcker;
214
215 fn decoder(&self) -> Self::Decoder {
216 self.decoder.clone()
217 }
218
219 fn handle_events(&self, events: &mut [Event], host: std::net::SocketAddr) {
220 let now = Utc::now();
221
222 for event in events {
223 if let Event::Log(log) = event {
224 self.log_namespace.insert_standard_vector_source_metadata(
225 log,
226 SocketConfig::NAME,
227 now,
228 );
229
230 let legacy_host_key = self
231 .config
232 .host_key
233 .clone()
234 .unwrap_or(default_host_key())
235 .path;
236
237 self.log_namespace.insert_source_metadata(
238 SocketConfig::NAME,
239 log,
240 legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
241 path!("host"),
242 host.ip().to_string(),
243 );
244
245 let legacy_port_key = self.config.port_key.clone().path;
246
247 self.log_namespace.insert_source_metadata(
248 SocketConfig::NAME,
249 log,
250 legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
251 path!("port"),
252 host.port(),
253 );
254 }
255 }
256 }
257
258 fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
259 TcpNullAcker
260 }
261}