vector/sources/util/net/tcp/
mod.rs

1pub mod request_limiter;
2
3use std::{io, mem::drop, net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use futures::{FutureExt, StreamExt, future::BoxFuture};
7use futures_util::future::OptionFuture;
8use ipnet::IpNet;
9use listenfd::ListenFd;
10use smallvec::SmallVec;
11use socket2::SockRef;
12use tokio::{
13    io::AsyncWriteExt,
14    net::{TcpListener, TcpStream},
15    time::sleep,
16};
17use tokio_util::codec::Decoder;
18use tracing::Instrument;
19use vector_lib::{
20    EstimatedJsonEncodedSizeOf,
21    codecs::{ReadyFrames, StreamDecodingError, internal_events::DecoderFramingError},
22    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
23    event::{BatchNotifier, BatchStatus, Event},
24    finalization::AddBatchNotifier,
25    lookup::{OwnedValuePath, path},
26    shutdown::ShutdownSignal,
27    source_sender::SourceSender,
28    tcp::TcpKeepaliveConfig,
29    tls::{CertificateMetadata, MaybeTlsIncomingStream, MaybeTlsListener, MaybeTlsSettings},
30};
31use vrl::value::ObjectMap;
32
33use self::request_limiter::RequestLimiter;
34use super::SocketListenAddr;
35use crate::{
36    config::SourceContext,
37    internal_events::{
38        ConnectionOpen, OpenGauge, SocketBindError, SocketEventsReceived, SocketMode,
39        SocketReceiveError, StreamClosedError, TcpBytesReceived, TcpSendAckError,
40        TcpSocketTlsConnectionError,
41    },
42    sources::util::{AfterReadExt, LenientFramedRead},
43};
44
/// Target number of in-flight events handed to the [`RequestLimiter`] that is
/// shared by every connection accepted by a TCP source.
pub const MAX_IN_FLIGHT_EVENTS_TARGET: usize = 100_000;
46
47pub async fn try_bind_tcp_listener(
48    addr: SocketListenAddr,
49    mut listenfd: ListenFd,
50    tls: &MaybeTlsSettings,
51    allowlist: Option<Vec<IpNet>>,
52) -> crate::Result<MaybeTlsListener> {
53    match addr {
54        SocketListenAddr::SocketAddr(addr) => tls.bind(&addr).await.map_err(Into::into),
55        SocketListenAddr::SystemdFd(offset) => match listenfd.take_tcp_listener(offset)? {
56            Some(listener) => TcpListener::from_std(listener)
57                .map(Into::into)
58                .map_err(Into::into),
59            None => {
60                Err(io::Error::new(io::ErrorKind::AddrInUse, "systemd fd already consumed").into())
61            }
62        },
63    }
64    .map(|listener| listener.with_allowlist(allowlist))
65}
66
/// Outcome of handing a batch of events downstream, as reported back to the
/// peer through a [`TcpSourceAcker`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TcpSourceAck {
    /// The batch was delivered, or no delivery notification was requested.
    Ack,
    /// Delivery of the batch ended with an error.
    Error,
    /// The batch was rejected.
    Reject,
}
73
/// Produces the acknowledgement payload, if any, that is written back to the
/// peer once the outcome of a batch of frames is known.
pub trait TcpSourceAcker {
    /// Builds an acknowledgement from the source data in the acker and the
    /// given acknowledgement code.
    ///
    /// Returning `None` means nothing is written back to the peer.
    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes>;
}
77
/// Acker that never produces an acknowledgement payload.
pub struct TcpNullAcker;

impl TcpSourceAcker for TcpNullAcker {
    // This function builds an acknowledgement from the source data in
    // the acker and the given acknowledgement code.
    //
    // The null acker ignores the code and always returns `None`.
    fn build_ack(self, _ack: TcpSourceAck) -> Option<Bytes> {
        None
    }
}
87
88pub trait TcpSource: Clone + Send + Sync + 'static
89where
90    <<Self as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
91{
92    // Should be default: `std::io::Error`.
93    // Right now this is unstable: https://github.com/rust-lang/rust/issues/29661
94    type Error: From<io::Error>
95        + StreamDecodingError
96        + std::fmt::Debug
97        + std::fmt::Display
98        + Send
99        + Unpin;
100    type Item: Into<SmallVec<[Event; 1]>> + Send + Unpin;
101    type Decoder: Decoder<Item = (Self::Item, usize), Error = Self::Error> + Send + 'static;
102    type Acker: TcpSourceAcker + Send;
103
104    fn decoder(&self) -> Self::Decoder;
105
106    fn handle_events(&self, _events: &mut [Event], _host: std::net::SocketAddr) {}
107
108    fn build_acker(&self, item: &[Self::Item]) -> Self::Acker;
109
110    #[allow(clippy::too_many_arguments)]
111    fn run(
112        self,
113        addr: SocketListenAddr,
114        keepalive: Option<TcpKeepaliveConfig>,
115        shutdown_timeout_secs: Duration,
116        tls: MaybeTlsSettings,
117        tls_client_metadata_key: Option<OwnedValuePath>,
118        receive_buffer_bytes: Option<usize>,
119        max_connection_duration_secs: Option<u64>,
120        cx: SourceContext,
121        acknowledgements: SourceAcknowledgementsConfig,
122        max_connections: Option<u32>,
123        allowlist: Option<Vec<IpNet>>,
124        source_name: &'static str,
125        log_namespace: LogNamespace,
126    ) -> crate::Result<crate::sources::Source> {
127        let acknowledgements = cx.do_acknowledgements(acknowledgements);
128
129        Ok(Box::pin(async move {
130            let listenfd = ListenFd::from_env();
131            let listener = try_bind_tcp_listener(addr, listenfd, &tls, allowlist)
132                .await
133                .map_err(|error| {
134                    emit!(SocketBindError {
135                        mode: SocketMode::Tcp,
136                        error: &error,
137                    })
138                })?;
139
140            info!(
141                message = "Listening.",
142                addr = %listener
143                    .local_addr()
144                    .map(SocketListenAddr::SocketAddr)
145                    .unwrap_or(addr)
146            );
147
148            let tripwire = cx.shutdown.clone();
149            let tripwire = async move {
150                _ = tripwire.await;
151                sleep(shutdown_timeout_secs).await;
152            }
153            .shared();
154
155            let connection_gauge = OpenGauge::new();
156            let shutdown_clone = cx.shutdown.clone();
157
158            let request_limiter =
159                RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads());
160
161            listener
162                .accept_stream_limited(max_connections)
163                .take_until(shutdown_clone)
164                .for_each(move |(connection, tcp_connection_permit)| {
165                    let shutdown_signal = cx.shutdown.clone();
166                    let tripwire = tripwire.clone();
167                    let source = self.clone();
168                    let out = cx.out.clone();
169                    let connection_gauge = connection_gauge.clone();
170                    let request_limiter = request_limiter.clone();
171                    let tls_client_metadata_key = tls_client_metadata_key.clone();
172
173                    async move {
174                        let socket = match connection {
175                            Ok(socket) => socket,
176                            Err(error) => {
177                                emit!(SocketReceiveError {
178                                    mode: SocketMode::Tcp,
179                                    error: &error
180                                });
181                                return;
182                            }
183                        };
184
185                        let peer_addr = socket.peer_addr();
186                        let span = info_span!("connection", %peer_addr);
187
188                        let tripwire = tripwire
189                            .map(move |_| {
190                                info!(
191                                    message = "Resetting connection (still open after seconds).",
192                                    seconds = ?shutdown_timeout_secs
193                                );
194                            })
195                            .boxed();
196
197                        span.clone().in_scope(|| {
198                            debug!(message = "Accepted a new connection.", peer_addr = %peer_addr);
199
200                            let open_token =
201                                connection_gauge.open(|count| emit!(ConnectionOpen { count }));
202
203                            let fut = handle_stream(
204                                shutdown_signal,
205                                socket,
206                                keepalive,
207                                receive_buffer_bytes,
208                                max_connection_duration_secs,
209                                source,
210                                tripwire,
211                                peer_addr,
212                                out,
213                                acknowledgements,
214                                request_limiter,
215                                tls_client_metadata_key.clone(),
216                                source_name,
217                                log_namespace,
218                            );
219
220                            tokio::spawn(
221                                fut.map(move |()| {
222                                    drop(open_token);
223                                    drop(tcp_connection_permit);
224                                })
225                                .instrument(span.or_current()),
226                            );
227                        });
228                    }
229                })
230                .map(Ok)
231                .await
232        }))
233    }
234}
235
236#[allow(clippy::too_many_arguments)]
237async fn handle_stream<T>(
238    mut shutdown_signal: ShutdownSignal,
239    mut socket: MaybeTlsIncomingStream<TcpStream>,
240    keepalive: Option<TcpKeepaliveConfig>,
241    receive_buffer_bytes: Option<usize>,
242    max_connection_duration_secs: Option<u64>,
243    source: T,
244    mut tripwire: BoxFuture<'static, ()>,
245    peer_addr: SocketAddr,
246    mut out: SourceSender,
247    acknowledgements: bool,
248    request_limiter: RequestLimiter,
249    tls_client_metadata_key: Option<OwnedValuePath>,
250    source_name: &'static str,
251    log_namespace: LogNamespace,
252) where
253    <<T as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
254    T: TcpSource,
255{
256    tokio::select! {
257        result = socket.handshake() => {
258            if let Err(error) = result {
259                emit!(TcpSocketTlsConnectionError { error });
260                return;
261            }
262        },
263        _ = &mut shutdown_signal => {
264            return;
265        }
266    };
267
268    if let Some(keepalive) = keepalive
269        && let Err(error) = socket.set_keepalive(keepalive)
270    {
271        warn!(message = "Failed configuring TCP keepalive.", %error);
272    }
273
274    if let Some(receive_buffer_bytes) = receive_buffer_bytes
275        && let Err(error) = socket.set_receive_buffer_bytes(receive_buffer_bytes)
276    {
277        warn!(message = "Failed configuring receive buffer size on TCP socket.", %error);
278    }
279
280    let socket = socket.after_read(move |byte_size| {
281        emit!(TcpBytesReceived {
282            byte_size,
283            peer_addr
284        });
285    });
286
287    let certificate_metadata = socket
288        .get_ref()
289        .ssl_stream()
290        .and_then(|stream| stream.ssl().peer_certificate())
291        .map(CertificateMetadata::from);
292
293    let reader = LenientFramedRead::new(socket, source.decoder());
294
295    let mut reader = ReadyFrames::new(reader);
296
297    let connection_close_timeout = OptionFuture::from(
298        max_connection_duration_secs
299            .map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))),
300    );
301
302    tokio::pin!(connection_close_timeout);
303
304    loop {
305        let mut permit = tokio::select! {
306            _ = &mut tripwire => break,
307            Some(_) = &mut connection_close_timeout  => {
308                if close_socket(reader.get_ref().get_ref().get_ref()) {
309                    break;
310                }
311                None
312            },
313            _ = &mut shutdown_signal => {
314                if close_socket(reader.get_ref().get_ref().get_ref()) {
315                    break;
316                }
317                None
318            },
319            permit = request_limiter.acquire() => {
320                Some(permit)
321            }
322            else => break,
323        };
324
325        let timeout = tokio::time::sleep(Duration::from_millis(10));
326        tokio::pin!(timeout);
327
328        tokio::select! {
329            _ = &mut tripwire => break,
330            _ = &mut shutdown_signal => {
331                if close_socket(reader.get_ref().get_ref().get_ref()) {
332                    break;
333                }
334            },
335            _ = &mut timeout => {
336                // This connection is currently holding a permit, but has not received data for some time. Release
337                // the permit to let another connection try
338                continue;
339            }
340            res = reader.next() => {
341                match res {
342                    Some(Ok((frames, _byte_size))) => {
343                        let _num_frames = frames.len();
344                        let acker = source.build_acker(&frames);
345                        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
346
347                        let mut events = frames.into_iter().flat_map(Into::into).collect::<Vec<Event>>();
348                        let count = events.len();
349
350                        emit!(SocketEventsReceived {
351                            mode: SocketMode::Tcp,
352                            byte_size: events.estimated_json_encoded_size_of(),
353                            count,
354                        });
355
356                        if let Some(permit) = &mut permit {
357                            // Note that this is intentionally not the "number of events in a single request", but rather
358                            // the "number of events currently available". This may contain events from multiple events,
359                            // but it should always contain all events from each request.
360                            permit.decoding_finished(events.len());
361                        }
362
363                        if let Some(batch) = batch {
364                            for event in &mut events {
365                                event.add_batch_notifier(batch.clone());
366                            }
367                        }
368
369
370                        if let Some(certificate_metadata) = &certificate_metadata {
371                            let mut metadata = ObjectMap::new();
372                            metadata.insert("subject".into(), certificate_metadata.subject().into());
373                            for event in &mut events {
374                                let log = event.as_mut_log();
375
376                                log_namespace.insert_source_metadata(
377                                    source_name,
378                                    log,
379                                    tls_client_metadata_key.as_ref().map(LegacyKey::Overwrite),
380                                    path!("tls_client_metadata"),
381                                    metadata.clone()
382                                );
383                            }
384                        }
385
386                        source.handle_events(&mut events, peer_addr);
387                        match out.send_batch(events).await {
388                            Ok(_) => {
389                                let ack = match receiver {
390                                    None => TcpSourceAck::Ack,
391                                    Some(receiver) =>
392                                        match receiver.await {
393                                            BatchStatus::Delivered => TcpSourceAck::Ack,
394                                            BatchStatus::Errored => {TcpSourceAck::Error},
395                                            BatchStatus::Rejected => {
396                                                // Sinks are responsible for emitting ComponentEventsDropped.
397                                                TcpSourceAck::Reject
398                                            }
399                                        }
400                                };
401                                if let Some(ack_bytes) = acker.build_ack(ack){
402                                    let stream = reader.get_mut().get_mut();
403                                    if let Err(error) = stream.write_all(&ack_bytes).await {
404                                        emit!(TcpSendAckError{ error });
405                                        break;
406                                    }
407                                }
408                                if ack != TcpSourceAck::Ack {
409                                    break;
410                                }
411                            }
412                            Err(_) => {
413                                emit!(StreamClosedError { count });
414                                break;
415                            }
416                        }
417                    }
418                    Some(Err(error)) => {
419                        if !<<T as TcpSource>::Error as StreamDecodingError>::can_continue(&error) {
420                            emit!(DecoderFramingError { error });
421                            break;
422                        }
423                    }
424                    None => {
425                        debug!("Connection closed.");
426                        break
427                    },
428                }
429            }
430            else => break,
431        }
432
433        drop(permit);
434    }
435}
436
437fn close_socket(socket: &MaybeTlsIncomingStream<TcpStream>) -> bool {
438    debug!("Start graceful shutdown.");
439    // Close our write part of TCP socket to signal the other side
440    // that it should stop writing and close the channel.
441    if let Some(stream) = socket.get_ref() {
442        let socket = SockRef::from(stream);
443        if let Err(error) = socket.shutdown(std::net::Shutdown::Write) {
444            warn!(message = "Failed in signalling to the other side to close the TCP channel.", %error);
445        }
446        false
447    } else {
448        // Connection hasn't yet been established so we are done here.
449        debug!("Closing connection that hasn't yet been fully established.");
450        true
451    }
452}