// vector/sources/socket/tcp.rs
use std::time::Duration;
2
3use chrono::Utc;
4use serde_with::serde_as;
5use smallvec::SmallVec;
6use vector_lib::{
7 codecs::decoding::{DeserializerConfig, FramingConfig},
8 config::{LegacyKey, LogNamespace},
9 configurable::configurable_component,
10 ipallowlist::IpAllowlistConfig,
11 lookup::{lookup_v2::OptionalValuePath, owned_value_path, path},
12};
13
14use super::{SocketConfig, default_host_key};
15use crate::{
16 codecs::Decoder,
17 event::Event,
18 serde::default_decoding,
19 sources::util::net::{SocketListenAddr, TcpNullAcker, TcpSource},
20 tcp::TcpKeepaliveConfig,
21 tls::TlsSourceConfig,
22};
23
24#[serde_as]
26#[configurable_component]
27#[derive(Clone, Debug)]
28pub struct TcpConfig {
29 #[configurable(derived)]
30 address: SocketListenAddr,
31
32 #[configurable(derived)]
33 keepalive: Option<TcpKeepaliveConfig>,
34
35 #[serde(default = "default_shutdown_timeout_secs")]
37 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
38 #[configurable(metadata(docs::human_name = "Shutdown Timeout"))]
39 shutdown_timeout_secs: Duration,
40
41 host_key: Option<OptionalValuePath>,
51
52 #[serde(default = "default_port_key")]
60 port_key: OptionalValuePath,
61
62 #[configurable(derived)]
63 pub permit_origin: Option<IpAllowlistConfig>,
64
65 #[configurable(derived)]
66 tls: Option<TlsSourceConfig>,
67
68 #[configurable(metadata(docs::type_unit = "bytes"))]
70 receive_buffer_bytes: Option<usize>,
71
72 #[configurable(metadata(docs::type_unit = "seconds"))]
76 max_connection_duration_secs: Option<u64>,
77
78 #[configurable(metadata(docs::type_unit = "connections"))]
80 pub connection_limit: Option<u32>,
81
82 #[configurable(derived)]
83 pub(super) framing: Option<FramingConfig>,
84
85 #[configurable(derived)]
86 #[serde(default = "default_decoding")]
87 pub(super) decoding: DeserializerConfig,
88
89 #[serde(default)]
91 #[configurable(metadata(docs::hidden))]
92 pub log_namespace: Option<bool>,
93}
94
/// Serde default for `TcpConfig::shutdown_timeout_secs`: thirty seconds.
const fn default_shutdown_timeout_secs() -> Duration {
    const DEFAULT_SHUTDOWN_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_SHUTDOWN_SECS)
}
98
99fn default_port_key() -> OptionalValuePath {
100 OptionalValuePath::from(owned_value_path!("port"))
101}
102
103impl TcpConfig {
104 pub fn from_address(address: SocketListenAddr) -> Self {
105 Self {
106 address,
107 keepalive: None,
108 shutdown_timeout_secs: default_shutdown_timeout_secs(),
109 host_key: None,
110 port_key: default_port_key(),
111 permit_origin: None,
112 tls: None,
113 receive_buffer_bytes: None,
114 max_connection_duration_secs: None,
115 framing: None,
116 decoding: default_decoding(),
117 connection_limit: None,
118 log_namespace: None,
119 }
120 }
121
122 pub(super) fn host_key(&self) -> OptionalValuePath {
123 self.host_key.clone().unwrap_or(default_host_key())
124 }
125
126 pub const fn port_key(&self) -> &OptionalValuePath {
127 &self.port_key
128 }
129
130 pub const fn tls(&self) -> &Option<TlsSourceConfig> {
131 &self.tls
132 }
133
134 pub const fn framing(&self) -> &Option<FramingConfig> {
135 &self.framing
136 }
137
138 pub const fn decoding(&self) -> &DeserializerConfig {
139 &self.decoding
140 }
141
142 pub const fn address(&self) -> SocketListenAddr {
143 self.address
144 }
145
146 pub const fn keepalive(&self) -> Option<TcpKeepaliveConfig> {
147 self.keepalive
148 }
149
150 pub const fn shutdown_timeout_secs(&self) -> Duration {
151 self.shutdown_timeout_secs
152 }
153
154 pub const fn receive_buffer_bytes(&self) -> Option<usize> {
155 self.receive_buffer_bytes
156 }
157
158 pub const fn max_connection_duration_secs(&self) -> Option<u64> {
159 self.max_connection_duration_secs
160 }
161
162 pub const fn set_max_connection_duration_secs(&mut self, val: Option<u64>) -> &mut Self {
163 self.max_connection_duration_secs = val;
164 self
165 }
166
167 pub const fn set_shutdown_timeout_secs(&mut self, val: u64) -> &mut Self {
168 self.shutdown_timeout_secs = Duration::from_secs(val);
169 self
170 }
171
172 pub fn set_tls(&mut self, val: Option<TlsSourceConfig>) -> &mut Self {
173 self.tls = val;
174 self
175 }
176
177 pub const fn set_framing(&mut self, val: Option<FramingConfig>) -> &mut Self {
178 self.framing = val;
179 self
180 }
181
182 pub fn set_decoding(&mut self, val: DeserializerConfig) -> &mut Self {
183 self.decoding = val;
184 self
185 }
186
187 pub const fn set_log_namespace(&mut self, val: Option<bool>) -> &mut Self {
188 self.log_namespace = val;
189 self
190 }
191}
192
193#[derive(Clone)]
194pub struct RawTcpSource {
195 config: TcpConfig,
196 decoder: Decoder,
197 log_namespace: LogNamespace,
198}
199
200impl RawTcpSource {
201 pub const fn new(config: TcpConfig, decoder: Decoder, log_namespace: LogNamespace) -> Self {
202 Self {
203 config,
204 decoder,
205 log_namespace,
206 }
207 }
208}
209
210impl TcpSource for RawTcpSource {
211 type Error = vector_lib::codecs::decoding::Error;
212 type Item = SmallVec<[Event; 1]>;
213 type Decoder = Decoder;
214 type Acker = TcpNullAcker;
215
216 fn decoder(&self) -> Self::Decoder {
217 self.decoder.clone()
218 }
219
220 fn handle_events(&self, events: &mut [Event], host: std::net::SocketAddr) {
221 let now = Utc::now();
222
223 for event in events {
224 if let Event::Log(log) = event {
225 self.log_namespace.insert_standard_vector_source_metadata(
226 log,
227 SocketConfig::NAME,
228 now,
229 );
230
231 let legacy_host_key = self
232 .config
233 .host_key
234 .clone()
235 .unwrap_or(default_host_key())
236 .path;
237
238 self.log_namespace.insert_source_metadata(
239 SocketConfig::NAME,
240 log,
241 legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
242 path!("host"),
243 host.ip().to_string(),
244 );
245
246 let legacy_port_key = self.config.port_key.clone().path;
247
248 self.log_namespace.insert_source_metadata(
249 SocketConfig::NAME,
250 log,
251 legacy_port_key.as_ref().map(LegacyKey::InsertIfEmpty),
252 path!("port"),
253 host.port(),
254 );
255 }
256 }
257 }
258
259 fn build_acker(&self, _: &[Self::Item]) -> Self::Acker {
260 TcpNullAcker
261 }
262}