vector/sources/util/net/tcp/
mod.rs1pub mod request_limiter;
2
3use std::{io, mem::drop, net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use futures::{future::BoxFuture, FutureExt, StreamExt};
7use futures_util::future::OptionFuture;
8use ipnet::IpNet;
9use listenfd::ListenFd;
10use smallvec::SmallVec;
11use socket2::SockRef;
12use tokio::{
13 io::AsyncWriteExt,
14 net::{TcpListener, TcpStream},
15 time::sleep,
16};
17use tokio_util::codec::{Decoder, FramedRead};
18use tracing::Instrument;
19use vector_lib::codecs::StreamDecodingError;
20use vector_lib::finalization::AddBatchNotifier;
21use vector_lib::lookup::{path, OwnedValuePath};
22use vector_lib::{
23 config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
24 EstimatedJsonEncodedSizeOf,
25};
26use vrl::value::ObjectMap;
27
28use self::request_limiter::RequestLimiter;
29use super::SocketListenAddr;
30use crate::{
31 codecs::ReadyFrames,
32 config::SourceContext,
33 event::{BatchNotifier, BatchStatus, Event},
34 internal_events::{
35 ConnectionOpen, DecoderFramingError, OpenGauge, SocketBindError, SocketEventsReceived,
36 SocketMode, SocketReceiveError, StreamClosedError, TcpBytesReceived, TcpSendAckError,
37 TcpSocketTlsConnectionError,
38 },
39 shutdown::ShutdownSignal,
40 sources::util::AfterReadExt,
41 tcp::TcpKeepaliveConfig,
42 tls::{CertificateMetadata, MaybeTlsIncomingStream, MaybeTlsListener, MaybeTlsSettings},
43 SourceSender,
44};
45
46pub const MAX_IN_FLIGHT_EVENTS_TARGET: usize = 100_000;
47
48pub async fn try_bind_tcp_listener(
49 addr: SocketListenAddr,
50 mut listenfd: ListenFd,
51 tls: &MaybeTlsSettings,
52 allowlist: Option<Vec<IpNet>>,
53) -> crate::Result<MaybeTlsListener> {
54 match addr {
55 SocketListenAddr::SocketAddr(addr) => tls.bind(&addr).await.map_err(Into::into),
56 SocketListenAddr::SystemdFd(offset) => match listenfd.take_tcp_listener(offset)? {
57 Some(listener) => TcpListener::from_std(listener)
58 .map(Into::into)
59 .map_err(Into::into),
60 None => {
61 Err(io::Error::new(io::ErrorKind::AddrInUse, "systemd fd already consumed").into())
62 }
63 },
64 }
65 .map(|listener| listener.with_allowlist(allowlist))
66}
67
68#[derive(Clone, Copy, Eq, PartialEq)]
69pub enum TcpSourceAck {
70 Ack,
71 Error,
72 Reject,
73}
74
75pub trait TcpSourceAcker {
76 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes>;
77}
78
79pub struct TcpNullAcker;
80
81impl TcpSourceAcker for TcpNullAcker {
82 fn build_ack(self, _ack: TcpSourceAck) -> Option<Bytes> {
85 None
86 }
87}
88
89pub trait TcpSource: Clone + Send + Sync + 'static
90where
91 <<Self as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
92{
93 type Error: From<io::Error>
96 + StreamDecodingError
97 + std::fmt::Debug
98 + std::fmt::Display
99 + Send
100 + Unpin;
101 type Item: Into<SmallVec<[Event; 1]>> + Send + Unpin;
102 type Decoder: Decoder<Item = (Self::Item, usize), Error = Self::Error> + Send + 'static;
103 type Acker: TcpSourceAcker + Send;
104
105 fn decoder(&self) -> Self::Decoder;
106
107 fn handle_events(&self, _events: &mut [Event], _host: std::net::SocketAddr) {}
108
109 fn build_acker(&self, item: &[Self::Item]) -> Self::Acker;
110
111 #[allow(clippy::too_many_arguments)]
112 fn run(
113 self,
114 addr: SocketListenAddr,
115 keepalive: Option<TcpKeepaliveConfig>,
116 shutdown_timeout_secs: Duration,
117 tls: MaybeTlsSettings,
118 tls_client_metadata_key: Option<OwnedValuePath>,
119 receive_buffer_bytes: Option<usize>,
120 max_connection_duration_secs: Option<u64>,
121 cx: SourceContext,
122 acknowledgements: SourceAcknowledgementsConfig,
123 max_connections: Option<u32>,
124 allowlist: Option<Vec<IpNet>>,
125 source_name: &'static str,
126 log_namespace: LogNamespace,
127 ) -> crate::Result<crate::sources::Source> {
128 let acknowledgements = cx.do_acknowledgements(acknowledgements);
129
130 Ok(Box::pin(async move {
131 let listenfd = ListenFd::from_env();
132 let listener = try_bind_tcp_listener(addr, listenfd, &tls, allowlist)
133 .await
134 .map_err(|error| {
135 emit!(SocketBindError {
136 mode: SocketMode::Tcp,
137 error: &error,
138 })
139 })?;
140
141 info!(
142 message = "Listening.",
143 addr = %listener
144 .local_addr()
145 .map(SocketListenAddr::SocketAddr)
146 .unwrap_or(addr)
147 );
148
149 let tripwire = cx.shutdown.clone();
150 let tripwire = async move {
151 _ = tripwire.await;
152 sleep(shutdown_timeout_secs).await;
153 }
154 .shared();
155
156 let connection_gauge = OpenGauge::new();
157 let shutdown_clone = cx.shutdown.clone();
158
159 let request_limiter =
160 RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads());
161
162 listener
163 .accept_stream_limited(max_connections)
164 .take_until(shutdown_clone)
165 .for_each(move |(connection, tcp_connection_permit)| {
166 let shutdown_signal = cx.shutdown.clone();
167 let tripwire = tripwire.clone();
168 let source = self.clone();
169 let out = cx.out.clone();
170 let connection_gauge = connection_gauge.clone();
171 let request_limiter = request_limiter.clone();
172 let tls_client_metadata_key = tls_client_metadata_key.clone();
173
174 async move {
175 let socket = match connection {
176 Ok(socket) => socket,
177 Err(error) => {
178 emit!(SocketReceiveError {
179 mode: SocketMode::Tcp,
180 error: &error
181 });
182 return;
183 }
184 };
185
186 let peer_addr = socket.peer_addr();
187 let span = info_span!("connection", %peer_addr);
188
189 let tripwire = tripwire
190 .map(move |_| {
191 info!(
192 message = "Resetting connection (still open after seconds).",
193 seconds = ?shutdown_timeout_secs
194 );
195 })
196 .boxed();
197
198 span.clone().in_scope(|| {
199 debug!(message = "Accepted a new connection.", peer_addr = %peer_addr);
200
201 let open_token =
202 connection_gauge.open(|count| emit!(ConnectionOpen { count }));
203
204 let fut = handle_stream(
205 shutdown_signal,
206 socket,
207 keepalive,
208 receive_buffer_bytes,
209 max_connection_duration_secs,
210 source,
211 tripwire,
212 peer_addr,
213 out,
214 acknowledgements,
215 request_limiter,
216 tls_client_metadata_key.clone(),
217 source_name,
218 log_namespace,
219 );
220
221 tokio::spawn(
222 fut.map(move |()| {
223 drop(open_token);
224 drop(tcp_connection_permit);
225 })
226 .instrument(span.or_current()),
227 );
228 });
229 }
230 })
231 .map(Ok)
232 .await
233 }))
234 }
235}
236
237#[allow(clippy::too_many_arguments)]
238async fn handle_stream<T>(
239 mut shutdown_signal: ShutdownSignal,
240 mut socket: MaybeTlsIncomingStream<TcpStream>,
241 keepalive: Option<TcpKeepaliveConfig>,
242 receive_buffer_bytes: Option<usize>,
243 max_connection_duration_secs: Option<u64>,
244 source: T,
245 mut tripwire: BoxFuture<'static, ()>,
246 peer_addr: SocketAddr,
247 mut out: SourceSender,
248 acknowledgements: bool,
249 request_limiter: RequestLimiter,
250 tls_client_metadata_key: Option<OwnedValuePath>,
251 source_name: &'static str,
252 log_namespace: LogNamespace,
253) where
254 <<T as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
255 T: TcpSource,
256{
257 tokio::select! {
258 result = socket.handshake() => {
259 if let Err(error) = result {
260 emit!(TcpSocketTlsConnectionError { error });
261 return;
262 }
263 },
264 _ = &mut shutdown_signal => {
265 return;
266 }
267 };
268
269 if let Some(keepalive) = keepalive {
270 if let Err(error) = socket.set_keepalive(keepalive) {
271 warn!(message = "Failed configuring TCP keepalive.", %error);
272 }
273 }
274
275 if let Some(receive_buffer_bytes) = receive_buffer_bytes {
276 if let Err(error) = socket.set_receive_buffer_bytes(receive_buffer_bytes) {
277 warn!(message = "Failed configuring receive buffer size on TCP socket.", %error);
278 }
279 }
280
281 let socket = socket.after_read(move |byte_size| {
282 emit!(TcpBytesReceived {
283 byte_size,
284 peer_addr
285 });
286 });
287
288 let certificate_metadata = socket
289 .get_ref()
290 .ssl_stream()
291 .and_then(|stream| stream.ssl().peer_certificate())
292 .map(CertificateMetadata::from);
293
294 let reader = FramedRead::new(socket, source.decoder());
295 let mut reader = ReadyFrames::new(reader);
296
297 let connection_close_timeout = OptionFuture::from(
298 max_connection_duration_secs
299 .map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))),
300 );
301
302 tokio::pin!(connection_close_timeout);
303
304 loop {
305 let mut permit = tokio::select! {
306 _ = &mut tripwire => break,
307 Some(_) = &mut connection_close_timeout => {
308 if close_socket(reader.get_ref().get_ref().get_ref()) {
309 break;
310 }
311 None
312 },
313 _ = &mut shutdown_signal => {
314 if close_socket(reader.get_ref().get_ref().get_ref()) {
315 break;
316 }
317 None
318 },
319 permit = request_limiter.acquire() => {
320 Some(permit)
321 }
322 else => break,
323 };
324
325 let timeout = tokio::time::sleep(Duration::from_millis(10));
326 tokio::pin!(timeout);
327
328 tokio::select! {
329 _ = &mut tripwire => break,
330 _ = &mut shutdown_signal => {
331 if close_socket(reader.get_ref().get_ref().get_ref()) {
332 break;
333 }
334 },
335 _ = &mut timeout => {
336 continue;
339 }
340 res = reader.next() => {
341 match res {
342 Some(Ok((frames, _byte_size))) => {
343 let _num_frames = frames.len();
344 let acker = source.build_acker(&frames);
345 let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
346
347 let mut events = frames.into_iter().flat_map(Into::into).collect::<Vec<Event>>();
348 let count = events.len();
349
350 emit!(SocketEventsReceived {
351 mode: SocketMode::Tcp,
352 byte_size: events.estimated_json_encoded_size_of(),
353 count,
354 });
355
356 if let Some(permit) = &mut permit {
357 permit.decoding_finished(events.len());
361 }
362
363 if let Some(batch) = batch {
364 for event in &mut events {
365 event.add_batch_notifier(batch.clone());
366 }
367 }
368
369
370 if let Some(certificate_metadata) = &certificate_metadata {
371 let mut metadata = ObjectMap::new();
372 metadata.insert("subject".into(), certificate_metadata.subject().into());
373 for event in &mut events {
374 let log = event.as_mut_log();
375
376 log_namespace.insert_source_metadata(
377 source_name,
378 log,
379 tls_client_metadata_key.as_ref().map(LegacyKey::Overwrite),
380 path!("tls_client_metadata"),
381 metadata.clone()
382 );
383 }
384 }
385
386 source.handle_events(&mut events, peer_addr);
387 match out.send_batch(events).await {
388 Ok(_) => {
389 let ack = match receiver {
390 None => TcpSourceAck::Ack,
391 Some(receiver) =>
392 match receiver.await {
393 BatchStatus::Delivered => TcpSourceAck::Ack,
394 BatchStatus::Errored => {TcpSourceAck::Error},
395 BatchStatus::Rejected => {
396 TcpSourceAck::Reject
398 }
399 }
400 };
401 if let Some(ack_bytes) = acker.build_ack(ack){
402 let stream = reader.get_mut().get_mut();
403 if let Err(error) = stream.write_all(&ack_bytes).await {
404 emit!(TcpSendAckError{ error });
405 break;
406 }
407 }
408 if ack != TcpSourceAck::Ack {
409 break;
410 }
411 }
412 Err(_) => {
413 emit!(StreamClosedError { count });
414 break;
415 }
416 }
417 }
418 Some(Err(error)) => {
419 if !<<T as TcpSource>::Error as StreamDecodingError>::can_continue(&error) {
420 emit!(DecoderFramingError { error });
421 break;
422 }
423 }
424 None => {
425 debug!("Connection closed.");
426 break
427 },
428 }
429 }
430 else => break,
431 }
432
433 drop(permit);
434 }
435}
436
437fn close_socket(socket: &MaybeTlsIncomingStream<TcpStream>) -> bool {
438 debug!("Start graceful shutdown.");
439 if let Some(stream) = socket.get_ref() {
442 let socket = SockRef::from(stream);
443 if let Err(error) = socket.shutdown(std::net::Shutdown::Write) {
444 warn!(message = "Failed in signalling to the other side to close the TCP channel.", %error);
445 }
446 false
447 } else {
448 debug!("Closing connection that hasn't yet been fully established.");
450 true
451 }
452}