vector/sinks/util/
tcp.rs

1use std::{
2    io::ErrorKind,
3    net::SocketAddr,
4    pin::Pin,
5    task::{Context, Poll},
6    time::Duration,
7};
8
9use async_trait::async_trait;
10use bytes::{Bytes, BytesMut};
11use futures::{SinkExt, StreamExt, stream::BoxStream, task::noop_waker_ref};
12use snafu::{ResultExt, Snafu};
13use tokio::{
14    io::{AsyncRead, ReadBuf},
15    net::TcpStream,
16    time::sleep,
17};
18use tokio_util::codec::Encoder;
19use vector_lib::{
20    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
21    json_size::JsonSize,
22};
23
24use crate::{
25    codecs::Transformer,
26    common::backoff::ExponentialBackoff,
27    dns,
28    event::Event,
29    internal_events::{
30        ConnectionOpen, OpenGauge, SocketMode, SocketSendError, TcpSocketConnectionEstablished,
31        TcpSocketConnectionShutdown, TcpSocketOutgoingConnectionError,
32    },
33    sink_ext::VecSinkExt,
34    sinks::{
35        Healthcheck, VectorSink,
36        util::{
37            EncodedEvent, SinkBuildError, StreamSink,
38            socket_bytes_sink::{BytesSink, ShutdownCheck},
39        },
40    },
41    tcp::TcpKeepaliveConfig,
42    tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig, TlsError},
43};
44
45#[derive(Debug, Snafu)]
46enum TcpError {
47    #[snafu(display("Connect error: {}", source))]
48    ConnectError { source: TlsError },
49    #[snafu(display("Unable to resolve DNS: {}", source))]
50    DnsError { source: dns::DnsError },
51    #[snafu(display("No addresses returned."))]
52    NoAddresses,
53}
54
55/// A TCP sink.
56#[configurable_component]
57#[derive(Clone, Debug)]
58pub struct TcpSinkConfig {
59    /// The address to connect to.
60    ///
61    /// Both IP address and hostname are accepted formats.
62    ///
63    /// The address _must_ include a port.
64    #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
65    #[configurable(metadata(docs::examples = "https://somehost:5000"))]
66    address: String,
67
68    #[configurable(derived)]
69    keepalive: Option<TcpKeepaliveConfig>,
70
71    #[configurable(derived)]
72    tls: Option<TlsEnableableConfig>,
73
74    /// The size of the socket's send buffer.
75    ///
76    /// If set, the value of the setting is passed via the `SO_SNDBUF` option.
77    #[configurable(metadata(docs::type_unit = "bytes"))]
78    #[configurable(metadata(docs::examples = 65536))]
79    send_buffer_bytes: Option<usize>,
80}
81
82impl TcpSinkConfig {
83    pub const fn new(
84        address: String,
85        keepalive: Option<TcpKeepaliveConfig>,
86        tls: Option<TlsEnableableConfig>,
87        send_buffer_bytes: Option<usize>,
88    ) -> Self {
89        Self {
90            address,
91            keepalive,
92            tls,
93            send_buffer_bytes,
94        }
95    }
96
97    pub const fn from_address(address: String) -> Self {
98        Self {
99            address,
100            keepalive: None,
101            tls: None,
102            send_buffer_bytes: None,
103        }
104    }
105
106    pub fn build(
107        &self,
108        transformer: Transformer,
109        encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
110        + Clone
111        + Send
112        + Sync
113        + 'static,
114    ) -> crate::Result<(VectorSink, Healthcheck)> {
115        let uri = self.address.parse::<http::Uri>()?;
116        let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
117        let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
118        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
119        let connector = TcpConnector::new(host, port, self.keepalive, tls, self.send_buffer_bytes);
120        let sink = TcpSink::new(connector.clone(), transformer, encoder);
121
122        Ok((
123            VectorSink::from_event_streamsink(sink),
124            Box::pin(async move { connector.healthcheck().await }),
125        ))
126    }
127}
128
129#[derive(Clone)]
130struct TcpConnector {
131    host: String,
132    port: u16,
133    keepalive: Option<TcpKeepaliveConfig>,
134    tls: MaybeTlsSettings,
135    send_buffer_bytes: Option<usize>,
136}
137
138impl TcpConnector {
139    const fn new(
140        host: String,
141        port: u16,
142        keepalive: Option<TcpKeepaliveConfig>,
143        tls: MaybeTlsSettings,
144        send_buffer_bytes: Option<usize>,
145    ) -> Self {
146        Self {
147            host,
148            port,
149            keepalive,
150            tls,
151            send_buffer_bytes,
152        }
153    }
154
155    #[cfg(test)]
156    fn from_host_port(host: String, port: u16) -> Self {
157        Self::new(host, port, None, None.into(), None)
158    }
159
160    const fn fresh_backoff() -> ExponentialBackoff {
161        // TODO: make configurable
162        ExponentialBackoff::from_millis(2)
163            .factor(250)
164            .max_delay(Duration::from_secs(60))
165    }
166
167    async fn connect(&self) -> Result<MaybeTlsStream<TcpStream>, TcpError> {
168        let ip = dns::Resolver
169            .lookup_ip(self.host.clone())
170            .await
171            .context(DnsSnafu)?
172            .next()
173            .ok_or(TcpError::NoAddresses)?;
174
175        let addr = SocketAddr::new(ip, self.port);
176        self.tls
177            .connect(&self.host, &addr)
178            .await
179            .context(ConnectSnafu)
180            .map(|mut maybe_tls| {
181                if let Some(keepalive) = self.keepalive
182                    && let Err(error) = maybe_tls.set_keepalive(keepalive)
183                {
184                    warn!(message = "Failed configuring TCP keepalive.", %error);
185                }
186
187                if let Some(send_buffer_bytes) = self.send_buffer_bytes
188                    && let Err(error) = maybe_tls.set_send_buffer_bytes(send_buffer_bytes)
189                {
190                    warn!(message = "Failed configuring send buffer size on TCP socket.", %error);
191                }
192
193                maybe_tls
194            })
195    }
196
197    async fn connect_backoff(&self) -> MaybeTlsStream<TcpStream> {
198        let mut backoff = Self::fresh_backoff();
199        loop {
200            match self.connect().await {
201                Ok(socket) => {
202                    emit!(TcpSocketConnectionEstablished {
203                        peer_addr: socket.peer_addr().ok(),
204                    });
205                    return socket;
206                }
207                Err(error) => {
208                    emit!(TcpSocketOutgoingConnectionError { error });
209                    sleep(backoff.next().unwrap()).await;
210                }
211            }
212        }
213    }
214
215    async fn healthcheck(&self) -> crate::Result<()> {
216        self.connect().await.map(|_| ()).map_err(Into::into)
217    }
218}
219
220struct TcpSink<E>
221where
222    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
223{
224    connector: TcpConnector,
225    transformer: Transformer,
226    encoder: E,
227}
228
229impl<E> TcpSink<E>
230where
231    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync + 'static,
232{
233    const fn new(connector: TcpConnector, transformer: Transformer, encoder: E) -> Self {
234        Self {
235            connector,
236            transformer,
237            encoder,
238        }
239    }
240
241    async fn connect(&self) -> BytesSink<MaybeTlsStream<TcpStream>> {
242        let stream = self.connector.connect_backoff().await;
243        BytesSink::new(stream, Self::shutdown_check, SocketMode::Tcp)
244    }
245
246    fn shutdown_check(stream: &mut MaybeTlsStream<TcpStream>) -> ShutdownCheck {
247        // Test if the remote has issued a disconnect by calling read(2)
248        // with a 1 sized buffer.
249        //
250        // This can return a proper disconnect error or `Ok(0)`
251        // which means the pipe is broken and we should try to reconnect.
252        //
253        // If this returns `Poll::Pending` we know the connection is still
254        // valid and the write will most likely succeed.
255        let mut cx = Context::from_waker(noop_waker_ref());
256        let mut buf = [0u8; 1];
257        let mut buf = ReadBuf::new(&mut buf);
258        match Pin::new(stream).poll_read(&mut cx, &mut buf) {
259            Poll::Ready(Err(error)) => ShutdownCheck::Error(error),
260            Poll::Ready(Ok(())) if buf.filled().is_empty() => {
261                // Maybe this is only a sign to close the channel,
262                // in which case we should try to flush our buffers
263                // before disconnecting.
264                ShutdownCheck::Close("ShutdownCheck::Close")
265            }
266            _ => ShutdownCheck::Alive,
267        }
268    }
269}
270
271#[async_trait]
272impl<E> StreamSink<Event> for TcpSink<E>
273where
274    E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>
275        + Clone
276        + Send
277        + Sync
278        + Sync
279        + 'static,
280{
281    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
282        // We need [Peekable](https://docs.rs/futures/0.3.6/futures/stream/struct.Peekable.html) for initiating
283        // connection only when we have something to send.
284        let mut encoder = self.encoder.clone();
285        let mut input = input
286            .map(|mut event| {
287                let byte_size = event.size_of();
288                let json_byte_size = event.estimated_json_encoded_size_of();
289                let finalizers = event.metadata_mut().take_finalizers();
290                self.transformer.transform(&mut event);
291                let mut bytes = BytesMut::new();
292
293                // Errors are handled by `Encoder`.
294                if encoder.encode(event, &mut bytes).is_ok() {
295                    let item = bytes.freeze();
296                    EncodedEvent {
297                        item,
298                        finalizers,
299                        byte_size,
300                        json_byte_size,
301                    }
302                } else {
303                    EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
304                }
305            })
306            .peekable();
307
308        while Pin::new(&mut input).peek().await.is_some() {
309            let mut sink = self.connect().await;
310            let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
311
312            let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
313                Ok(()) => sink.close().await,
314                Err(error) => Err(error),
315            };
316
317            // TODO we can consider retrying once in the Error case. This sink is a "best effort"
318            // delivery due to the nature of the underlying protocol.
319            // For now, if an error occurs we cannot assume that the events succeeded in delivery
320            // so we will emit `Error` / `EventsDropped` internal events regardless of if the server
321            // responded with Ok(0).
322            if let Err(error) = result {
323                if error.kind() == ErrorKind::Other && error.to_string() == "ShutdownCheck::Close" {
324                    emit!(TcpSocketConnectionShutdown {});
325                }
326                emit!(SocketSendError {
327                    mode: SocketMode::Tcp,
328                    error
329                });
330            }
331        }
332
333        Ok(())
334    }
335}
336
337#[cfg(test)]
338mod test {
339    use tokio::net::TcpListener;
340
341    use super::*;
342    use crate::test_util::{next_addr, trace_init};
343
344    #[tokio::test]
345    async fn healthcheck() {
346        trace_init();
347
348        let addr = next_addr();
349        let _listener = TcpListener::bind(&addr).await.unwrap();
350        let good = TcpConnector::from_host_port(addr.ip().to_string(), addr.port());
351        assert!(good.healthcheck().await.is_ok());
352
353        let addr = next_addr();
354        let bad = TcpConnector::from_host_port(addr.ip().to_string(), addr.port());
355        assert!(bad.healthcheck().await.is_err());
356    }
357}