vector/sources/util/net/tcp/
mod.rs

1pub mod request_limiter;
2
3use std::{io, mem::drop, net::SocketAddr, time::Duration};
4
5use bytes::Bytes;
6use futures::{future::BoxFuture, FutureExt, StreamExt};
7use futures_util::future::OptionFuture;
8use ipnet::IpNet;
9use listenfd::ListenFd;
10use smallvec::SmallVec;
11use socket2::SockRef;
12use tokio::{
13    io::AsyncWriteExt,
14    net::{TcpListener, TcpStream},
15    time::sleep,
16};
17use tokio_util::codec::{Decoder, FramedRead};
18use tracing::Instrument;
19use vector_lib::codecs::StreamDecodingError;
20use vector_lib::finalization::AddBatchNotifier;
21use vector_lib::lookup::{path, OwnedValuePath};
22use vector_lib::{
23    config::{LegacyKey, LogNamespace, SourceAcknowledgementsConfig},
24    EstimatedJsonEncodedSizeOf,
25};
26use vrl::value::ObjectMap;
27
28use self::request_limiter::RequestLimiter;
29use super::SocketListenAddr;
30use crate::{
31    codecs::ReadyFrames,
32    config::SourceContext,
33    event::{BatchNotifier, BatchStatus, Event},
34    internal_events::{
35        ConnectionOpen, DecoderFramingError, OpenGauge, SocketBindError, SocketEventsReceived,
36        SocketMode, SocketReceiveError, StreamClosedError, TcpBytesReceived, TcpSendAckError,
37        TcpSocketTlsConnectionError,
38    },
39    shutdown::ShutdownSignal,
40    sources::util::AfterReadExt,
41    tcp::TcpKeepaliveConfig,
42    tls::{CertificateMetadata, MaybeTlsIncomingStream, MaybeTlsListener, MaybeTlsSettings},
43    SourceSender,
44};
45
/// Value handed to `RequestLimiter::new` (together with `crate::num_threads()`) when
/// `TcpSource::run` creates the limiter that is cloned into every connection task.
///
/// NOTE(review): presumably the target upper bound on the number of events in flight
/// across all connections of a source — confirm against `request_limiter`.
pub const MAX_IN_FLIGHT_EVENTS_TARGET: usize = 100_000;
47
48pub async fn try_bind_tcp_listener(
49    addr: SocketListenAddr,
50    mut listenfd: ListenFd,
51    tls: &MaybeTlsSettings,
52    allowlist: Option<Vec<IpNet>>,
53) -> crate::Result<MaybeTlsListener> {
54    match addr {
55        SocketListenAddr::SocketAddr(addr) => tls.bind(&addr).await.map_err(Into::into),
56        SocketListenAddr::SystemdFd(offset) => match listenfd.take_tcp_listener(offset)? {
57            Some(listener) => TcpListener::from_std(listener)
58                .map(Into::into)
59                .map_err(Into::into),
60            None => {
61                Err(io::Error::new(io::ErrorKind::AddrInUse, "systemd fd already consumed").into())
62            }
63        },
64    }
65    .map(|listener| listener.with_allowlist(allowlist))
66}
67
/// Acknowledgement code handed to [`TcpSourceAcker::build_ack`] once a batch of events
/// has been sent downstream.
///
/// `handle_stream` keeps reading from the connection only after an `Ack`; any other code
/// ends the connection loop once the acknowledgement (if any) has been written.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TcpSourceAck {
    /// The batch was sent and either no acknowledgement receiver was in use or the batch
    /// status was `BatchStatus::Delivered`.
    Ack,
    /// The batch status was `BatchStatus::Errored`.
    Error,
    /// The batch status was `BatchStatus::Rejected`.
    Reject,
}
74
/// Builds the bytes written back to the peer as an acknowledgement.
///
/// An acker is created per batch of decoded frames by `TcpSource::build_acker` and is
/// consumed by `build_ack` after the batch has been sent.
pub trait TcpSourceAcker {
    /// Builds the acknowledgement payload for the given acknowledgement code.
    ///
    /// Returning `None` means nothing is written back to the peer.
    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes>;
}
78
/// Acker that never produces acknowledgement bytes: its `build_ack` always returns `None`.
pub struct TcpNullAcker;
80
impl TcpSourceAcker for TcpNullAcker {
    // The null acker ignores the acknowledgement code and never builds a
    // payload, so nothing is written back to the peer.
    fn build_ack(self, _ack: TcpSourceAck) -> Option<Bytes> {
        None
    }
}
88
89pub trait TcpSource: Clone + Send + Sync + 'static
90where
91    <<Self as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
92{
93    // Should be default: `std::io::Error`.
94    // Right now this is unstable: https://github.com/rust-lang/rust/issues/29661
95    type Error: From<io::Error>
96        + StreamDecodingError
97        + std::fmt::Debug
98        + std::fmt::Display
99        + Send
100        + Unpin;
101    type Item: Into<SmallVec<[Event; 1]>> + Send + Unpin;
102    type Decoder: Decoder<Item = (Self::Item, usize), Error = Self::Error> + Send + 'static;
103    type Acker: TcpSourceAcker + Send;
104
105    fn decoder(&self) -> Self::Decoder;
106
107    fn handle_events(&self, _events: &mut [Event], _host: std::net::SocketAddr) {}
108
109    fn build_acker(&self, item: &[Self::Item]) -> Self::Acker;
110
111    #[allow(clippy::too_many_arguments)]
112    fn run(
113        self,
114        addr: SocketListenAddr,
115        keepalive: Option<TcpKeepaliveConfig>,
116        shutdown_timeout_secs: Duration,
117        tls: MaybeTlsSettings,
118        tls_client_metadata_key: Option<OwnedValuePath>,
119        receive_buffer_bytes: Option<usize>,
120        max_connection_duration_secs: Option<u64>,
121        cx: SourceContext,
122        acknowledgements: SourceAcknowledgementsConfig,
123        max_connections: Option<u32>,
124        allowlist: Option<Vec<IpNet>>,
125        source_name: &'static str,
126        log_namespace: LogNamespace,
127    ) -> crate::Result<crate::sources::Source> {
128        let acknowledgements = cx.do_acknowledgements(acknowledgements);
129
130        Ok(Box::pin(async move {
131            let listenfd = ListenFd::from_env();
132            let listener = try_bind_tcp_listener(addr, listenfd, &tls, allowlist)
133                .await
134                .map_err(|error| {
135                    emit!(SocketBindError {
136                        mode: SocketMode::Tcp,
137                        error: &error,
138                    })
139                })?;
140
141            info!(
142                message = "Listening.",
143                addr = %listener
144                    .local_addr()
145                    .map(SocketListenAddr::SocketAddr)
146                    .unwrap_or(addr)
147            );
148
149            let tripwire = cx.shutdown.clone();
150            let tripwire = async move {
151                _ = tripwire.await;
152                sleep(shutdown_timeout_secs).await;
153            }
154            .shared();
155
156            let connection_gauge = OpenGauge::new();
157            let shutdown_clone = cx.shutdown.clone();
158
159            let request_limiter =
160                RequestLimiter::new(MAX_IN_FLIGHT_EVENTS_TARGET, crate::num_threads());
161
162            listener
163                .accept_stream_limited(max_connections)
164                .take_until(shutdown_clone)
165                .for_each(move |(connection, tcp_connection_permit)| {
166                    let shutdown_signal = cx.shutdown.clone();
167                    let tripwire = tripwire.clone();
168                    let source = self.clone();
169                    let out = cx.out.clone();
170                    let connection_gauge = connection_gauge.clone();
171                    let request_limiter = request_limiter.clone();
172                    let tls_client_metadata_key = tls_client_metadata_key.clone();
173
174                    async move {
175                        let socket = match connection {
176                            Ok(socket) => socket,
177                            Err(error) => {
178                                emit!(SocketReceiveError {
179                                    mode: SocketMode::Tcp,
180                                    error: &error
181                                });
182                                return;
183                            }
184                        };
185
186                        let peer_addr = socket.peer_addr();
187                        let span = info_span!("connection", %peer_addr);
188
189                        let tripwire = tripwire
190                            .map(move |_| {
191                                info!(
192                                    message = "Resetting connection (still open after seconds).",
193                                    seconds = ?shutdown_timeout_secs
194                                );
195                            })
196                            .boxed();
197
198                        span.clone().in_scope(|| {
199                            debug!(message = "Accepted a new connection.", peer_addr = %peer_addr);
200
201                            let open_token =
202                                connection_gauge.open(|count| emit!(ConnectionOpen { count }));
203
204                            let fut = handle_stream(
205                                shutdown_signal,
206                                socket,
207                                keepalive,
208                                receive_buffer_bytes,
209                                max_connection_duration_secs,
210                                source,
211                                tripwire,
212                                peer_addr,
213                                out,
214                                acknowledgements,
215                                request_limiter,
216                                tls_client_metadata_key.clone(),
217                                source_name,
218                                log_namespace,
219                            );
220
221                            tokio::spawn(
222                                fut.map(move |()| {
223                                    drop(open_token);
224                                    drop(tcp_connection_permit);
225                                })
226                                .instrument(span.or_current()),
227                            );
228                        });
229                    }
230                })
231                .map(Ok)
232                .await
233        }))
234    }
235}
236
237#[allow(clippy::too_many_arguments)]
238async fn handle_stream<T>(
239    mut shutdown_signal: ShutdownSignal,
240    mut socket: MaybeTlsIncomingStream<TcpStream>,
241    keepalive: Option<TcpKeepaliveConfig>,
242    receive_buffer_bytes: Option<usize>,
243    max_connection_duration_secs: Option<u64>,
244    source: T,
245    mut tripwire: BoxFuture<'static, ()>,
246    peer_addr: SocketAddr,
247    mut out: SourceSender,
248    acknowledgements: bool,
249    request_limiter: RequestLimiter,
250    tls_client_metadata_key: Option<OwnedValuePath>,
251    source_name: &'static str,
252    log_namespace: LogNamespace,
253) where
254    <<T as TcpSource>::Decoder as tokio_util::codec::Decoder>::Item: std::marker::Send,
255    T: TcpSource,
256{
257    tokio::select! {
258        result = socket.handshake() => {
259            if let Err(error) = result {
260                emit!(TcpSocketTlsConnectionError { error });
261                return;
262            }
263        },
264        _ = &mut shutdown_signal => {
265            return;
266        }
267    };
268
269    if let Some(keepalive) = keepalive {
270        if let Err(error) = socket.set_keepalive(keepalive) {
271            warn!(message = "Failed configuring TCP keepalive.", %error);
272        }
273    }
274
275    if let Some(receive_buffer_bytes) = receive_buffer_bytes {
276        if let Err(error) = socket.set_receive_buffer_bytes(receive_buffer_bytes) {
277            warn!(message = "Failed configuring receive buffer size on TCP socket.", %error);
278        }
279    }
280
281    let socket = socket.after_read(move |byte_size| {
282        emit!(TcpBytesReceived {
283            byte_size,
284            peer_addr
285        });
286    });
287
288    let certificate_metadata = socket
289        .get_ref()
290        .ssl_stream()
291        .and_then(|stream| stream.ssl().peer_certificate())
292        .map(CertificateMetadata::from);
293
294    let reader = FramedRead::new(socket, source.decoder());
295    let mut reader = ReadyFrames::new(reader);
296
297    let connection_close_timeout = OptionFuture::from(
298        max_connection_duration_secs
299            .map(|timeout_secs| tokio::time::sleep(Duration::from_secs(timeout_secs))),
300    );
301
302    tokio::pin!(connection_close_timeout);
303
304    loop {
305        let mut permit = tokio::select! {
306            _ = &mut tripwire => break,
307            Some(_) = &mut connection_close_timeout  => {
308                if close_socket(reader.get_ref().get_ref().get_ref()) {
309                    break;
310                }
311                None
312            },
313            _ = &mut shutdown_signal => {
314                if close_socket(reader.get_ref().get_ref().get_ref()) {
315                    break;
316                }
317                None
318            },
319            permit = request_limiter.acquire() => {
320                Some(permit)
321            }
322            else => break,
323        };
324
325        let timeout = tokio::time::sleep(Duration::from_millis(10));
326        tokio::pin!(timeout);
327
328        tokio::select! {
329            _ = &mut tripwire => break,
330            _ = &mut shutdown_signal => {
331                if close_socket(reader.get_ref().get_ref().get_ref()) {
332                    break;
333                }
334            },
335            _ = &mut timeout => {
336                // This connection is currently holding a permit, but has not received data for some time. Release
337                // the permit to let another connection try
338                continue;
339            }
340            res = reader.next() => {
341                match res {
342                    Some(Ok((frames, _byte_size))) => {
343                        let _num_frames = frames.len();
344                        let acker = source.build_acker(&frames);
345                        let (batch, receiver) = BatchNotifier::maybe_new_with_receiver(acknowledgements);
346
347                        let mut events = frames.into_iter().flat_map(Into::into).collect::<Vec<Event>>();
348                        let count = events.len();
349
350                        emit!(SocketEventsReceived {
351                            mode: SocketMode::Tcp,
352                            byte_size: events.estimated_json_encoded_size_of(),
353                            count,
354                        });
355
356                        if let Some(permit) = &mut permit {
357                            // Note that this is intentionally not the "number of events in a single request", but rather
358                            // the "number of events currently available". This may contain events from multiple events,
359                            // but it should always contain all events from each request.
360                            permit.decoding_finished(events.len());
361                        }
362
363                        if let Some(batch) = batch {
364                            for event in &mut events {
365                                event.add_batch_notifier(batch.clone());
366                            }
367                        }
368
369
370                        if let Some(certificate_metadata) = &certificate_metadata {
371                            let mut metadata = ObjectMap::new();
372                            metadata.insert("subject".into(), certificate_metadata.subject().into());
373                            for event in &mut events {
374                                let log = event.as_mut_log();
375
376                                log_namespace.insert_source_metadata(
377                                    source_name,
378                                    log,
379                                    tls_client_metadata_key.as_ref().map(LegacyKey::Overwrite),
380                                    path!("tls_client_metadata"),
381                                    metadata.clone()
382                                );
383                            }
384                        }
385
386                        source.handle_events(&mut events, peer_addr);
387                        match out.send_batch(events).await {
388                            Ok(_) => {
389                                let ack = match receiver {
390                                    None => TcpSourceAck::Ack,
391                                    Some(receiver) =>
392                                        match receiver.await {
393                                            BatchStatus::Delivered => TcpSourceAck::Ack,
394                                            BatchStatus::Errored => {TcpSourceAck::Error},
395                                            BatchStatus::Rejected => {
396                                                // Sinks are responsible for emitting ComponentEventsDropped.
397                                                TcpSourceAck::Reject
398                                            }
399                                        }
400                                };
401                                if let Some(ack_bytes) = acker.build_ack(ack){
402                                    let stream = reader.get_mut().get_mut();
403                                    if let Err(error) = stream.write_all(&ack_bytes).await {
404                                        emit!(TcpSendAckError{ error });
405                                        break;
406                                    }
407                                }
408                                if ack != TcpSourceAck::Ack {
409                                    break;
410                                }
411                            }
412                            Err(_) => {
413                                emit!(StreamClosedError { count });
414                                break;
415                            }
416                        }
417                    }
418                    Some(Err(error)) => {
419                        if !<<T as TcpSource>::Error as StreamDecodingError>::can_continue(&error) {
420                            emit!(DecoderFramingError { error });
421                            break;
422                        }
423                    }
424                    None => {
425                        debug!("Connection closed.");
426                        break
427                    },
428                }
429            }
430            else => break,
431        }
432
433        drop(permit);
434    }
435}
436
437fn close_socket(socket: &MaybeTlsIncomingStream<TcpStream>) -> bool {
438    debug!("Start graceful shutdown.");
439    // Close our write part of TCP socket to signal the other side
440    // that it should stop writing and close the channel.
441    if let Some(stream) = socket.get_ref() {
442        let socket = SockRef::from(stream);
443        if let Err(error) = socket.shutdown(std::net::Shutdown::Write) {
444            warn!(message = "Failed in signalling to the other side to close the TCP channel.", %error);
445        }
446        false
447    } else {
448        // Connection hasn't yet been established so we are done here.
449        debug!("Closing connection that hasn't yet been fully established.");
450        true
451    }
452}