vector/sources/util/net/tcp/
mod.rs1pub mod request_limiter;
2
3use std::{io, mem::drop, net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use futures::{FutureExt, StreamExt, future::BoxFuture};
7use futures_util::future::OptionFuture;
8use ipnet::IpNet;
9use listenfd::ListenFd;
10use smallvec::SmallVec;
11use socket2::SockRef;
12use tokio::{
13 io::AsyncWriteExt,
14 net::{TcpListener, TcpStream},
15 time::sleep,
16};
17use tokio_util::codec::Decoder;
18use tracing::Instrument;
19use vector_lib::{
20 EstimatedJsonEncodedSizeOf,
21 codecs::{ReadyFrames, StreamDecodingError, internal_events::DecoderFramingError},
22 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
23 event::{BatchNotifier, BatchStatus, Event},
24 finalization::AddBatchNotifier,
25 lookup::{OwnedValuePath, path},
26 shutdown::ShutdownSignal,
27 source_sender::SourceSender,
28 tcp::TcpKeepaliveConfig,
29 tls::{CertificateMetadata, MaybeTlsIncomingStream, MaybeTlsListener, MaybeTlsSettings},
30};
31use vrl::value::ObjectMap;
32
33use self::request_limiter::RequestLimiter;
34use super::SocketListenAddr;
35use crate::{
36 config::SourceContext,
37 internal_events::{
38 ConnectionOpen, OpenGauge, SocketBindError, SocketEventsReceived, SocketMode,
39 SocketReceiveError, StreamClosedError, TcpBytesReceived, TcpSendAckError,
40 TcpSocketTlsConnectionError,
41 },
42 sources::util::{AfterReadExt, LenientFramedRead},
43};
44
45pub const MAX_IN_FLIGHT_EVENTS_TARGET: usize = 100_000;
46
47pub async fn try_bind_tcp_listener(
48 addr: SocketListenAddr,
49 mut listenfd: ListenFd,
50 tls: &MaybeTlsSettings,
51 allowlist: Option<Vec<IpNet>>,
52) -> crate::Result<MaybeTlsListener> {
53 match addr {
54 SocketListenAddr::SocketAddr(addr) => tls.bind(&addr).await.map_err(Into::into),
55 SocketListenAddr::SystemdFd(offset) => match listenfd.take_tcp_listener(offset)? {
56 Some(listener) => TcpListener::from_std(listener)
57 .map(Into::into)
58 .map_err(Into::into),
59 None => {
60 Err(io::Error::new(io::ErrorKind::AddrInUse, "systemd fd already consumed").into())
61 }
62 },
63 }
64 .map(|listener| listener.with_allowlist(allowlist))
65}
66
67#[derive(Clone, Copy, Eq, PartialEq)]
68pub enum TcpSourceAck {
69 Ack,
70 Error,
71 Reject,
72}
73
74pub trait TcpSourceAcker {
75 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes>;
76}
77
78pub struct TcpNullAcker;
79
80impl TcpSourceAcker for TcpNullAcker {
81 fn build_ack(self, _ack: TcpSourceAck) -> Option<Bytes> {
84 None
85 }
86}
87
88pub trait TcpSource: Clone + Send + Sync + 'static
89where
90 <<Self as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
91{
92 type Error: From<io::Error>
95 + StreamDecodingError
96 + std::fmt::Debug
97 + std::fmt::Display
98 + Send
99 + Unpin;
100 type Item: Into<SmallVec<[Event; 1]>> + Send + Unpin;
101 type Decoder: Decoder<Item = (Self::Item, usize), Error = Self::Error> + Send + 'static;
102 type Acker: TcpSourceAcker + Send;
103
104 fn decoder(&self) -> Self::Decoder;
105
106 fn handle_events(&self, _events: &mut [Event], _host: std::net::SocketAddr) {}
107
108 fn build_acker(&self, item: &[Self::Item]) -> Self::Acker;
109
110 #[allow(clippy::too_many_arguments)]
111 fn run(
112 self,
113 addr: SocketListenAddr,
114 keepalive: Option<TcpKeepaliveConfig>,
115 shutdown_timeout_secs: Duration,
116 tls: MaybeTlsSettings,
117 tls_client_metadata_key: Option<OwnedValuePath>,
118 receive_buffer_bytes: Option<usize>,
119 max_connection_duration_secs: Option<u64>,
120 cx: SourceContext,
121 acknowledgements: SourceAcknowledgementsConfig,
122 max_connections: Option<u32>,
123 allowlist: Option<Vec<IpNet>>,
124 source_name: &'static str,
125 log_namespace: LogNamespace,
126 ) -> crate::Result<crate::sources::Source> {
127 let acknowledgements = cx.do_acknowledgements(acknowledgements);
128
129 Ok(Box::pin(async move {
130 let listenfd = ListenFd::from_env();
131 let listener = try_bind_tcp_listener(addr, listenfd, &tls, allowlist)
132 .await
133 .map_err(|error| {
134 emit!(SocketBindError {
135 mode: SocketMode::Tcp,
136 error: &error,
137 })
138 })?;
139
140 info!(
141 message = "Listening.",
142 addr = %listener
143 .local_addr()
144 .map(SocketListenAddr::SocketAddr)
145 .unwrap_or(addr)
146 );
147
148 let tripwire = cx.shutdown.clone();
149 let tripwire = async move {
150 _ = tripwire.await;
151 sleep(shutdown_timeout_secs).await;
152 }
153 .shared();
154
155 let connection_gauge = OpenGauge::new();
156 let shutdown_clone = cx.shutdown.clone();
157
158 let request_limiter =
159 RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads());
160
161 listener
162 .accept_stream_limited(max_connections)
163 .take_until(shutdown_clone)
164 .for_each(move |(connection, tcp_connection_permit)| {
165 let shutdown_signal = cx.shutdown.clone();
166 let tripwire = tripwire.clone();
167 let source = self.clone();
168 let out = cx.out.clone();
169 let connection_gauge = connection_gauge.clone();
170 let request_limiter = request_limiter.clone();
171 let tls_client_metadata_key = tls_client_metadata_key.clone();
172
173 async move {
174 let socket = match connection {
175 Ok(socket) => socket,
176 Err(error) => {
177 emit!(SocketReceiveError {
178 mode: SocketMode::Tcp,
179 error: &error
180 });
181 return;
182 }
183 };
184
185 let peer_addr = socket.peer_addr();
186 let span = info_span!("connection", %peer_addr);
187
188 let tripwire = tripwire
189 .map(move |_| {
190 info!(
191 message = "Resetting connection (still open after seconds).",
192 seconds = ?shutdown_timeout_secs
193 );
194 })
195 .boxed();
196
197 span.clone().in_scope(|| {
198 debug!(message = "Accepted a new connection.", peer_addr = %peer_addr);
199
200 let open_token =
201 connection_gauge.open(|count| emit!(ConnectionOpen { count }));
202
203 let fut = handle_stream(
204 shutdown_signal,
205 socket,
206 keepalive,
207 receive_buffer_bytes,
208 max_connection_duration_secs,
209 source,
210 tripwire,
211 peer_addr,
212 out,
213 acknowledgements,
214 request_limiter,
215 tls_client_metadata_key.clone(),
216 source_name,
217 log_namespace,
218 );
219
220 tokio::spawn(
221 fut.map(move |()| {
222 drop(open_token);
223 drop(tcp_connection_permit);
224 })
225 .instrument(span.or_current()),
226 );
227 });
228 }
229 })
230 .map(Ok)
231 .await
232 }))
233 }
234}
235
236#[allow(clippy::too_many_arguments)]
237async fn handle_stream<T>(
238 mut shutdown_signal: ShutdownSignal,
239 mut socket: MaybeTlsIncomingStream<TcpStream>,
240 keepalive: Option<TcpKeepaliveConfig>,
241 receive_buffer_bytes: Option<usize>,
242 max_connection_duration_secs: Option<u64>,
243 source: T,
244 mut tripwire: BoxFuture<'static, ()>,
245 peer_addr: SocketAddr,
246 mut out: SourceSender,
247 acknowledgements: bool,
248 request_limiter: RequestLimiter,
249 tls_client_metadata_key: Option<OwnedValuePath>,
250 source_name: &'static str,
251 log_namespace: LogNamespace,
252) where
253 <<T as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
254 T: TcpSource,
255{
256 tokio::select! {
257 result = socket.handshake() => {
258 if let Err(error) = result {
259 emit!(TcpSocketTlsConnectionError { error });
260 return;
261 }
262 },
263 _ = &mut shutdown_signal => {
264 return;
265 }
266 };
267
268 if let Some(keepalive) = keepalive
269 && let Err(error) = socket.set_keepalive(keepalive)
270 {
271 warn!(message = "Failed configuring TCP keepalive.", %error);
272 }
273
274 if let Some(receive_buffer_bytes) = receive_buffer_bytes
275 && let Err(error) = socket.set_receive_buffer_bytes(receive_buffer_bytes)
276 {
277 warn!(message = "Failed configuring receive buffer size on TCP socket.", %error);
278 }
279
280 let socket = socket.after_read(move |byte_size| {
281 emit!(TcpBytesReceived {
282 byte_size,
283 peer_addr
284 });
285 });
286
287 let certificate_metadata = socket
288 .get_ref()
289 .ssl_stream()
290 .and_then(|stream| stream.ssl().peer_certificate())
291 .map(CertificateMetadata::from);
292
293 let reader = LenientFramedRead::new(socket, source.decoder());
294
295 let mut reader = ReadyFrames::new(reader);
296
297 let connection_close_timeout = OptionFuture::from(
298 max_connection_duration_secs
299 .map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))),
300 );
301
302 tokio::pin!(connection_close_timeout);
303
304 loop {
305 let mut permit = tokio::select! {
306 _ = &mut tripwire => break,
307 Some(_) = &mut connection_close_timeout => {
308 if close_socket(reader.get_ref().get_ref().get_ref()) {
309 break;
310 }
311 None
312 },
313 _ = &mut shutdown_signal => {
314 if close_socket(reader.get_ref().get_ref().get_ref()) {
315 break;
316 }
317 None
318 },
319 permit = request_limiter.acquire() => {
320 Some(permit)
321 }
322 else => break,
323 };
324
325 let timeout = tokio::time::sleep(Duration::from_millis(10));
326 tokio::pin!(timeout);
327
328 tokio::select! {
329 _ = &mut tripwire => break,
330 _ = &mut shutdown_signal => {
331 if close_socket(reader.get_ref().get_ref().get_ref()) {
332 break;
333 }
334 },
335 _ = &mut timeout => {
336 continue;
339 }
340 res = reader.next() => {
341 match res {
342 Some(Ok((frames, _byte_size))) => {
343 let _num_frames = frames.len();
344 let acker = source.build_acker(&frames);
345 let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
346
347 let mut events = frames.into_iter().flat_map(Into::into).collect::<Vec<Event>>();
348 let count = events.len();
349
350 emit!(SocketEventsReceived {
351 mode: SocketMode::Tcp,
352 byte_size: events.estimated_json_encoded_size_of(),
353 count,
354 });
355
356 if let Some(permit) = &mut permit {
357 permit.decoding_finished(events.len());
361 }
362
363 if let Some(batch) = batch {
364 for event in &mut events {
365 event.add_batch_notifier(batch.clone());
366 }
367 }
368
369
370 if let Some(certificate_metadata) = &certificate_metadata {
371 let mut metadata = ObjectMap::new();
372 metadata.insert("subject".into(), certificate_metadata.subject().into());
373 for event in &mut events {
374 let log = event.as_mut_log();
375
376 log_namespace.insert_source_metadata(
377 source_name,
378 log,
379 tls_client_metadata_key.as_ref().map(LegacyKey::Overwrite),
380 path!("tls_client_metadata"),
381 metadata.clone()
382 );
383 }
384 }
385
386 source.handle_events(&mut events, peer_addr);
387 match out.send_batch(events).await {
388 Ok(_) => {
389 let ack = match receiver {
390 None => TcpSourceAck::Ack,
391 Some(receiver) =>
392 match receiver.await {
393 BatchStatus::Delivered => TcpSourceAck::Ack,
394 BatchStatus::Errored => {TcpSourceAck::Error},
395 BatchStatus::Rejected => {
396 TcpSourceAck::Reject
398 }
399 }
400 };
401 if let Some(ack_bytes) = acker.build_ack(ack){
402 let stream = reader.get_mut().get_mut();
403 if let Err(error) = stream.write_all(&ack_bytes).await {
404 emit!(TcpSendAckError{ error });
405 break;
406 }
407 }
408 if ack != TcpSourceAck::Ack {
409 break;
410 }
411 }
412 Err(_) => {
413 emit!(StreamClosedError { count });
414 break;
415 }
416 }
417 }
418 Some(Err(error)) => {
419 if !<<T as TcpSource>::Error as StreamDecodingError>::can_continue(&error) {
420 emit!(DecoderFramingError { error });
421 break;
422 }
423 }
424 None => {
425 debug!("Connection closed.");
426 break
427 },
428 }
429 }
430 else => break,
431 }
432
433 drop(permit);
434 }
435}
436
437fn close_socket(socket: &MaybeTlsIncomingStream<TcpStream>) -> bool {
438 debug!("Start graceful shutdown.");
439 if let Some(stream) = socket.get_ref() {
442 let socket = SockRef::from(stream);
443 if let Err(error) = socket.shutdown(std::net::Shutdown::Write) {
444 warn!(message = "Failed in signalling to the other side to close the TCP channel.", %error);
445 }
446 false
447 } else {
448 debug!("Closing connection that hasn't yet been fully established.");
450 true
451 }
452}