1use std::{
2 io::ErrorKind,
3 net::SocketAddr,
4 pin::Pin,
5 task::{Context, Poll},
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use bytes::{Bytes, BytesMut};
11use futures::{stream::BoxStream, task::noop_waker_ref, SinkExt, StreamExt};
12use snafu::{ResultExt, Snafu};
13use tokio::{
14 io::{AsyncRead, ReadBuf},
15 net::TcpStream,
16 time::sleep,
17};
18use tokio_util::codec::Encoder;
19use vector_lib::configurable::configurable_component;
20use vector_lib::json_size::JsonSize;
21use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
22
23use crate::{
24 codecs::Transformer,
25 common::backoff::ExponentialBackoff,
26 dns,
27 event::Event,
28 internal_events::{
29 ConnectionOpen, OpenGauge, SocketMode, SocketSendError, TcpSocketConnectionEstablished,
30 TcpSocketConnectionShutdown, TcpSocketOutgoingConnectionError,
31 },
32 sink_ext::VecSinkExt,
33 sinks::{
34 util::{
35 socket_bytes_sink::{BytesSink, ShutdownCheck},
36 EncodedEvent, SinkBuildError, StreamSink,
37 },
38 Healthcheck, VectorSink,
39 },
40 tcp::TcpKeepaliveConfig,
41 tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig, TlsError},
42};
43
44#[derive(Debug, Snafu)]
45enum TcpError {
46 #[snafu(display("Connect error: {}", source))]
47 ConnectError { source: TlsError },
48 #[snafu(display("Unable to resolve DNS: {}", source))]
49 DnsError { source: dns::DnsError },
50 #[snafu(display("No addresses returned."))]
51 NoAddresses,
52}
53
54#[configurable_component]
56#[derive(Clone, Debug)]
57pub struct TcpSinkConfig {
58 #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
64 #[configurable(metadata(docs::examples = "https://somehost:5000"))]
65 address: String,
66
67 #[configurable(derived)]
68 keepalive: Option<TcpKeepaliveConfig>,
69
70 #[configurable(derived)]
71 tls: Option<TlsEnableableConfig>,
72
73 #[configurable(metadata(docs::type_unit = "bytes"))]
77 #[configurable(metadata(docs::examples = 65536))]
78 send_buffer_bytes: Option<usize>,
79}
80
81impl TcpSinkConfig {
82 pub const fn new(
83 address: String,
84 keepalive: Option<TcpKeepaliveConfig>,
85 tls: Option<TlsEnableableConfig>,
86 send_buffer_bytes: Option<usize>,
87 ) -> Self {
88 Self {
89 address,
90 keepalive,
91 tls,
92 send_buffer_bytes,
93 }
94 }
95
96 pub const fn from_address(address: String) -> Self {
97 Self {
98 address,
99 keepalive: None,
100 tls: None,
101 send_buffer_bytes: None,
102 }
103 }
104
105 pub fn build(
106 &self,
107 transformer: Transformer,
108 encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
109 + Clone
110 + Send
111 + Sync
112 + 'static,
113 ) -> crate::Result<(VectorSink, Healthcheck)> {
114 let uri = self.address.parse::<http::Uri>()?;
115 let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
116 let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
117 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
118 let connector = TcpConnector::new(host, port, self.keepalive, tls, self.send_buffer_bytes);
119 let sink = TcpSink::new(connector.clone(), transformer, encoder);
120
121 Ok((
122 VectorSink::from_event_streamsink(sink),
123 Box::pin(async move { connector.healthcheck().await }),
124 ))
125 }
126}
127
128#[derive(Clone)]
129struct TcpConnector {
130 host: String,
131 port: u16,
132 keepalive: Option<TcpKeepaliveConfig>,
133 tls: MaybeTlsSettings,
134 send_buffer_bytes: Option<usize>,
135}
136
137impl TcpConnector {
138 const fn new(
139 host: String,
140 port: u16,
141 keepalive: Option<TcpKeepaliveConfig>,
142 tls: MaybeTlsSettings,
143 send_buffer_bytes: Option<usize>,
144 ) -> Self {
145 Self {
146 host,
147 port,
148 keepalive,
149 tls,
150 send_buffer_bytes,
151 }
152 }
153
154 #[cfg(test)]
155 fn from_host_port(host: String, port: u16) -> Self {
156 Self::new(host, port, None, None.into(), None)
157 }
158
159 const fn fresh_backoff() -> ExponentialBackoff {
160 ExponentialBackoff::from_millis(2)
162 .factor(250)
163 .max_delay(Duration::from_secs(60))
164 }
165
166 async fn connect(&self) -> Result<MaybeTlsStream<TcpStream>, TcpError> {
167 let ip = dns::Resolver
168 .lookup_ip(self.host.clone())
169 .await
170 .context(DnsSnafu)?
171 .next()
172 .ok_or(TcpError::NoAddresses)?;
173
174 let addr = SocketAddr::new(ip, self.port);
175 self.tls
176 .connect(&self.host, &addr)
177 .await
178 .context(ConnectSnafu)
179 .map(|mut maybe_tls| {
180 if let Some(keepalive) = self.keepalive {
181 if let Err(error) = maybe_tls.set_keepalive(keepalive) {
182 warn!(message = "Failed configuring TCP keepalive.", %error);
183 }
184 }
185
186 if let Some(send_buffer_bytes) = self.send_buffer_bytes {
187 if let Err(error) = maybe_tls.set_send_buffer_bytes(send_buffer_bytes) {
188 warn!(message = "Failed configuring send buffer size on TCP socket.", %error);
189 }
190 }
191
192 maybe_tls
193 })
194 }
195
196 async fn connect_backoff(&self) -> MaybeTlsStream<TcpStream> {
197 let mut backoff = Self::fresh_backoff();
198 loop {
199 match self.connect().await {
200 Ok(socket) => {
201 emit!(TcpSocketConnectionEstablished {
202 peer_addr: socket.peer_addr().ok(),
203 });
204 return socket;
205 }
206 Err(error) => {
207 emit!(TcpSocketOutgoingConnectionError { error });
208 sleep(backoff.next().unwrap()).await;
209 }
210 }
211 }
212 }
213
214 async fn healthcheck(&self) -> crate::Result<()> {
215 self.connect().await.map(|_| ()).map_err(Into::into)
216 }
217}
218
219struct TcpSink<E>
220where
221 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
222{
223 connector: TcpConnector,
224 transformer: Transformer,
225 encoder: E,
226}
227
228impl<E> TcpSink<E>
229where
230 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync + 'static,
231{
232 const fn new(connector: TcpConnector, transformer: Transformer, encoder: E) -> Self {
233 Self {
234 connector,
235 transformer,
236 encoder,
237 }
238 }
239
240 async fn connect(&self) -> BytesSink<MaybeTlsStream<TcpStream>> {
241 let stream = self.connector.connect_backoff().await;
242 BytesSink::new(stream, Self::shutdown_check, SocketMode::Tcp)
243 }
244
245 fn shutdown_check(stream: &mut MaybeTlsStream<TcpStream>) -> ShutdownCheck {
246 let mut cx = Context::from_waker(noop_waker_ref());
255 let mut buf = [0u8; 1];
256 let mut buf = ReadBuf::new(&mut buf);
257 match Pin::new(stream).poll_read(&mut cx, &mut buf) {
258 Poll::Ready(Err(error)) => ShutdownCheck::Error(error),
259 Poll::Ready(Ok(())) if buf.filled().is_empty() => {
260 ShutdownCheck::Close("ShutdownCheck::Close")
264 }
265 _ => ShutdownCheck::Alive,
266 }
267 }
268}
269
270#[async_trait]
271impl<E> StreamSink<Event> for TcpSink<E>
272where
273 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>
274 + Clone
275 + Send
276 + Sync
277 + Sync
278 + 'static,
279{
280 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
281 let mut encoder = self.encoder.clone();
284 let mut input = input
285 .map(|mut event| {
286 let byte_size = event.size_of();
287 let json_byte_size = event.estimated_json_encoded_size_of();
288 let finalizers = event.metadata_mut().take_finalizers();
289 self.transformer.transform(&mut event);
290 let mut bytes = BytesMut::new();
291
292 if encoder.encode(event, &mut bytes).is_ok() {
294 let item = bytes.freeze();
295 EncodedEvent {
296 item,
297 finalizers,
298 byte_size,
299 json_byte_size,
300 }
301 } else {
302 EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
303 }
304 })
305 .peekable();
306
307 while Pin::new(&mut input).peek().await.is_some() {
308 let mut sink = self.connect().await;
309 let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
310
311 let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
312 Ok(()) => sink.close().await,
313 Err(error) => Err(error),
314 };
315
316 if let Err(error) = result {
322 if error.kind() == ErrorKind::Other && error.to_string() == "ShutdownCheck::Close" {
323 emit!(TcpSocketConnectionShutdown {});
324 }
325 emit!(SocketSendError {
326 mode: SocketMode::Tcp,
327 error
328 });
329 }
330 }
331
332 Ok(())
333 }
334}
335
336#[cfg(test)]
337mod test {
338 use tokio::net::TcpListener;
339
340 use super::*;
341 use crate::test_util::{next_addr, trace_init};
342
343 #[tokio::test]
344 async fn healthcheck() {
345 trace_init();
346
347 let addr = next_addr();
348 let _listener = TcpListener::bind(&addr).await.unwrap();
349 let good = TcpConnector::from_host_port(addr.ip().to_string(), addr.port());
350 assert!(good.healthcheck().await.is_ok());
351
352 let addr = next_addr();
353 let bad = TcpConnector::from_host_port(addr.ip().to_string(), addr.port());
354 assert!(bad.healthcheck().await.is_err());
355 }
356}