vector/sinks/util/
tcp.rs

1use std::{
2    io::ErrorKind,
3    net::SocketAddr,
4    pin::Pin,
5    task::{Context, Poll},
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use bytes::{Bytes, BytesMut};
11use futures::{stream::BoxStream, task::noop_waker_ref, SinkExt, StreamExt};
12use snafu::{ResultExt, Snafu};
13use tokio::{
14    io::{AsyncRead, ReadBuf},
15    net::TcpStream,
16    time::sleep,
17};
18use tokio_util::codec::Encoder;
19use vector_lib::configurable::configurable_component;
20use vector_lib::json_size::JsonSize;
21use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
22
23use crate::{
24    codecs::Transformer,
25    common::backoff::ExponentialBackoff,
26    dns,
27    event::Event,
28    internal_events::{
29        ConnectionOpen, OpenGauge, SocketMode, SocketSendError, TcpSocketConnectionEstablished,
30        TcpSocketConnectionShutdown, TcpSocketOutgoingConnectionError,
31    },
32    sink_ext::VecSinkExt,
33    sinks::{
34        util::{
35            socket_bytes_sink::{BytesSink, ShutdownCheck},
36            EncodedEvent, SinkBuildError, StreamSink,
37        },
38        Healthcheck, VectorSink,
39    },
40    tcp::TcpKeepaliveConfig,
41    tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig, TlsError},
42};
43
44#[derive(Debug, Snafu)]
45enum TcpError {
46    #[snafu(display("Connect error: {}", source))]
47    ConnectError { source: TlsError },
48    #[snafu(display("Unable to resolve DNS: {}", source))]
49    DnsError { source: dns::DnsError },
50    #[snafu(display("No addresses returned."))]
51    NoAddresses,
52}
53
54/// A TCP sink.
55#[configurable_component]
56#[derive(Clone, Debug)]
57pub struct TcpSinkConfig {
58    /// The address to connect to.
59    ///
60    /// Both IP address and hostname are accepted formats.
61    ///
62    /// The address _must_ include a port.
63    #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
64    #[configurable(metadata(docs::examples = "https://somehost:5000"))]
65    address: String,
66
67    #[configurable(derived)]
68    keepalive: Option<TcpKeepaliveConfig>,
69
70    #[configurable(derived)]
71    tls: Option<TlsEnableableConfig>,
72
73    /// The size of the socket's send buffer.
74    ///
75    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
76    #[configurable(metadata(docs::type_unit = "bytes"))]
77    #[configurable(metadata(docs::examples = 65536))]
78    send_buffer_bytes: Option<usize>,
79}
80
81impl TcpSinkConfig {
82    pub const fn new(
83        address: String,
84        keepalive: Option<TcpKeepaliveConfig>,
85        tls: Option<TlsEnableableConfig>,
86        send_buffer_bytes: Option<usize>,
87    ) -> Self {
88        Self {
89            address,
90            keepalive,
91            tls,
92            send_buffer_bytes,
93        }
94    }
95
96    pub const fn from_address(address: String) -> Self {
97        Self {
98            address,
99            keepalive: None,
100            tls: None,
101            send_buffer_bytes: None,
102        }
103    }
104
105    pub fn build(
106        &self,
107        transformer: Transformer,
108        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
109            + Clone
110            + Send
111            + Sync
112            + 'static,
113    ) -> crate::Result<(VectorSink, Healthcheck)> {
114        let uri = self.address.parse::<http::Uri>()?;
115        let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
116        let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
117        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
118        let connector = TcpConnector::new(host, port, self.keepalive, tls, self.send_buffer_bytes);
119        let sink = TcpSink::new(connector.clone(), transformer, encoder);
120
121        Ok((
122            VectorSink::from_event_streamsink(sink),
123            Box::pin(async move { connector.healthcheck().await }),
124        ))
125    }
126}
127
128#[derive(Clone)]
129struct TcpConnector {
130    host: String,
131    port: u16,
132    keepalive: Option<TcpKeepaliveConfig>,
133    tls: MaybeTlsSettings,
134    send_buffer_bytes: Option<usize>,
135}
136
137impl TcpConnector {
138    const fn new(
139        host: String,
140        port: u16,
141        keepalive: Option<TcpKeepaliveConfig>,
142        tls: MaybeTlsSettings,
143        send_buffer_bytes: Option<usize>,
144    ) -> Self {
145        Self {
146            host,
147            port,
148            keepalive,
149            tls,
150            send_buffer_bytes,
151        }
152    }
153
154    #[cfg(test)]
155    fn from_host_port(host: String, port: u16) -> Self {
156        Self::new(host, port, None, None.into(), None)
157    }
158
159    const fn fresh_backoff() -> ExponentialBackoff {
160        // TODO: make configurable
161        ExponentialBackoff::from_millis(2)
162            .factor(250)
163            .max_delay(Duration::from_secs(60))
164    }
165
166    async fn connect(&self) -> Result<MaybeTlsStream<TcpStream>, TcpError> {
167        let ip = dns::Resolver
168            .lookup_ip(self.host.clone())
169            .await
170            .context(DnsSnafu)?
171            .next()
172            .ok_or(TcpError::NoAddresses)?;
173
174        let addr = SocketAddr::new(ip, self.port);
175        self.tls
176            .connect(&self.host, &addr)
177            .await
178            .context(ConnectSnafu)
179            .map(|mut maybe_tls| {
180                if let Some(keepalive) = self.keepalive {
181                    if let Err(error) = maybe_tls.set_keepalive(keepalive) {
182                        warn!(message = "Failed configuring TCP keepalive.", %error);
183                    }
184                }
185
186                if let Some(send_buffer_bytes) = self.send_buffer_bytes {
187                    if let Err(error) = maybe_tls.set_send_buffer_bytes(send_buffer_bytes) {
188                        warn!(message = "Failed configuring send buffer size on TCP socket.", %error);
189                    }
190                }
191
192                maybe_tls
193            })
194    }
195
196    async fn connect_backoff(&self) -> MaybeTlsStream<TcpStream> {
197        let mut backoff = Self::fresh_backoff();
198        loop {
199            match self.connect().await {
200                Ok(socket) => {
201                    emit!(TcpSocketConnectionEstablished {
202                        peer_addr: socket.peer_addr().ok(),
203                    });
204                    return socket;
205                }
206                Err(error) => {
207                    emit!(TcpSocketOutgoingConnectionError { error });
208                    sleep(backoff.next().unwrap()).await;
209                }
210            }
211        }
212    }
213
214    async fn healthcheck(&self) -> crate::Result<()> {
215        self.connect().await.map(|_| ()).map_err(Into::into)
216    }
217}
218
219struct TcpSink<E>
220where
221    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
222{
223    connector: TcpConnector,
224    transformer: Transformer,
225    encoder: E,
226}
227
228impl<E> TcpSink<E>
229where
230    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync + 'static,
231{
232    const fn new(connector: TcpConnector, transformer: Transformer, encoder: E) -> Self {
233        Self {
234            connector,
235            transformer,
236            encoder,
237        }
238    }
239
240    async fn connect(&self) -> BytesSink<MaybeTlsStream<TcpStream>> {
241        let stream = self.connector.connect_backoff().await;
242        BytesSink::new(stream, Self::shutdown_check, SocketMode::Tcp)
243    }
244
245    fn shutdown_check(stream: &mut MaybeTlsStream<TcpStream>) -> ShutdownCheck {
246        // Test if the remote has issued a disconnect by calling read(2)
247        // with a 1 sized buffer.
248        //
249        // This can return a proper disconnect error or `Ok(0)`
250        // which means the pipe is broken and we should try to reconnect.
251        //
252        // If this returns `Poll::Pending` we know the connection is still
253        // valid and the write will most likely succeed.
254        let mut cx = Context::from_waker(noop_waker_ref());
255        let mut buf = [0u8; 1];
256        let mut buf = ReadBuf::new(&mut buf);
257        match Pin::new(stream).poll_read(&mut cx, &mut buf) {
258            Poll::Ready(Err(error)) => ShutdownCheck::Error(error),
259            Poll::Ready(Ok(())) if buf.filled().is_empty() => {
260                // Maybe this is only a sign to close the channel,
261                // in which case we should try to flush our buffers
262                // before disconnecting.
263                ShutdownCheck::Close("ShutdownCheck::Close")
264            }
265            _ => ShutdownCheck::Alive,
266        }
267    }
268}
269
270#[async_trait]
271impl<E> StreamSink<Event> for TcpSink<E>
272where
273    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>
274        + Clone
275        + Send
276        + Sync
277        + Sync
278        + 'static,
279{
280    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
281        // We need [Peekable](https://docs.rs/futures/0.3.6/futures/stream/struct.Peekable.html) for initiating
282        // connection only when we have something to send.
283        let mut encoder = self.encoder.clone();
284        let mut input = input
285            .map(|mut event| {
286                let byte_size = event.size_of();
287                let json_byte_size = event.estimated_json_encoded_size_of();
288                let finalizers = event.metadata_mut().take_finalizers();
289                self.transformer.transform(&mut event);
290                let mut bytes = BytesMut::new();
291
292                // Errors are handled by `Encoder`.
293                if encoder.encode(event, &mut bytes).is_ok() {
294                    let item = bytes.freeze();
295                    EncodedEvent {
296                        item,
297                        finalizers,
298                        byte_size,
299                        json_byte_size,
300                    }
301                } else {
302                    EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
303                }
304            })
305            .peekable();
306
307        while Pin::new(&mut input).peek().await.is_some() {
308            let mut sink = self.connect().await;
309            let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
310
311            let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
312                Ok(()) => sink.close().await,
313                Err(error) => Err(error),
314            };
315
316            // TODO we can consider retrying once in the Error case. This sink is a "best effort"
317            // delivery due to the nature of the underlying protocol.
318            // For now, if an error occurs we cannot assume that the events succeeded in delivery
319            // so we will emit `Error` / `EventsDropped` internal events regardless of if the server
320            // responded with Ok(0).
321            if let Err(error) = result {
322                if error.kind() == ErrorKind::Other && error.to_string() == "ShutdownCheck::Close" {
323                    emit!(TcpSocketConnectionShutdown {});
324                }
325                emit!(SocketSendError {
326                    mode: SocketMode::Tcp,
327                    error
328                });
329            }
330        }
331
332        Ok(())
333    }
334}
335
336#[cfg(test)]
337mod test {
338    use tokio::net::TcpListener;
339
340    use super::*;
341    use crate::test_util::{next_addr, trace_init};
342
343    #[tokio::test]
344    async fn healthcheck() {
345        trace_init();
346
347        let addr = next_addr();
348        let _listener = TcpListener::bind(&addr).await.unwrap();
349        let good = TcpConnector::from_host_port(addr.ip().to_string(), addr.port());
350        assert!(good.healthcheck().await.is_ok());
351
352        let addr = next_addr();
353        let bad = TcpConnector::from_host_port(addr.ip().to_string(), addr.port());
354        assert!(bad.healthcheck().await.is_err());
355    }
356}