1use std::{
2 io::ErrorKind,
3 net::SocketAddr,
4 pin::Pin,
5 task::{Context, Poll},
6 time::Duration,
7};
8
9use async_trait::async_trait;
10use bytes::{Bytes, BytesMut};
11use futures::{SinkExt, StreamExt, stream::BoxStream, task::noop_waker_ref};
12use snafu::{ResultExt, Snafu};
13use tokio::{
14 io::{AsyncRead, ReadBuf},
15 net::TcpStream,
16 time::sleep,
17};
18use tokio_util::codec::Encoder;
19use vector_lib::{
20 ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
21 json_size::JsonSize,
22};
23
24use crate::{
25 codecs::Transformer,
26 common::backoff::ExponentialBackoff,
27 dns,
28 event::Event,
29 internal_events::{
30 ConnectionOpen, OpenGauge, SocketMode, SocketSendError, TcpSocketConnectionEstablished,
31 TcpSocketConnectionShutdown, TcpSocketOutgoingConnectionError,
32 },
33 sink_ext::VecSinkExt,
34 sinks::{
35 Healthcheck, VectorSink,
36 util::{
37 EncodedEvent, SinkBuildError, StreamSink,
38 socket_bytes_sink::{BytesSink, ShutdownCheck},
39 },
40 },
41 tcp::TcpKeepaliveConfig,
42 tls::{MaybeTlsSettings, MaybeTlsStream, TlsEnableableConfig, TlsError},
43};
44
45#[derive(Debug, Snafu)]
46enum TcpError {
47 #[snafu(display("Connect error: {}", source))]
48 ConnectError { source: TlsError },
49 #[snafu(display("Unable to resolve DNS: {}", source))]
50 DnsError { source: dns::DnsError },
51 #[snafu(display("No addresses returned."))]
52 NoAddresses,
53}
54
55#[configurable_component]
57#[derive(Clone, Debug)]
58pub struct TcpSinkConfig {
59 #[configurable(metadata(docs::examples = "92.12.333.224:5000"))]
65 #[configurable(metadata(docs::examples = "https://somehost:5000"))]
66 address: String,
67
68 #[configurable(derived)]
69 keepalive: Option<TcpKeepaliveConfig>,
70
71 #[configurable(derived)]
72 tls: Option<TlsEnableableConfig>,
73
74 #[configurable(metadata(docs::type_unit = "bytes"))]
78 #[configurable(metadata(docs::examples = 65536))]
79 send_buffer_bytes: Option<usize>,
80}
81
82impl TcpSinkConfig {
83 pub const fn new(
84 address: String,
85 keepalive: Option<TcpKeepaliveConfig>,
86 tls: Option<TlsEnableableConfig>,
87 send_buffer_bytes: Option<usize>,
88 ) -> Self {
89 Self {
90 address,
91 keepalive,
92 tls,
93 send_buffer_bytes,
94 }
95 }
96
97 pub const fn from_address(address: String) -> Self {
98 Self {
99 address,
100 keepalive: None,
101 tls: None,
102 send_buffer_bytes: None,
103 }
104 }
105
106 pub fn build(
107 &self,
108 transformer: Transformer,
109 encoder: impl Encoder<Event, Error = vector_lib::codecs::encoding::Error>
110 + Clone
111 + Send
112 + Sync
113 + 'static,
114 ) -> crate::Result<(VectorSink, Healthcheck)> {
115 let uri = self.address.parse::<http::Uri>()?;
116 let host = uri.host().ok_or(SinkBuildError::MissingHost)?.to_string();
117 let port = uri.port_u16().ok_or(SinkBuildError::MissingPort)?;
118 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
119 let connector = TcpConnector::new(host, port, self.keepalive, tls, self.send_buffer_bytes);
120 let sink = TcpSink::new(connector.clone(), transformer, encoder);
121
122 Ok((
123 VectorSink::from_event_streamsink(sink),
124 Box::pin(async move { connector.healthcheck().await }),
125 ))
126 }
127}
128
129#[derive(Clone)]
130struct TcpConnector {
131 host: String,
132 port: u16,
133 keepalive: Option<TcpKeepaliveConfig>,
134 tls: MaybeTlsSettings,
135 send_buffer_bytes: Option<usize>,
136}
137
138impl TcpConnector {
139 const fn new(
140 host: String,
141 port: u16,
142 keepalive: Option<TcpKeepaliveConfig>,
143 tls: MaybeTlsSettings,
144 send_buffer_bytes: Option<usize>,
145 ) -> Self {
146 Self {
147 host,
148 port,
149 keepalive,
150 tls,
151 send_buffer_bytes,
152 }
153 }
154
155 #[cfg(test)]
156 fn from_host_port(host: String, port: u16) -> Self {
157 Self::new(host, port, None, None.into(), None)
158 }
159
160 const fn fresh_backoff() -> ExponentialBackoff {
161 ExponentialBackoff::from_millis(2)
163 .factor(250)
164 .max_delay(Duration::from_secs(60))
165 }
166
167 async fn connect(&self) -> Result<MaybeTlsStream<TcpStream>, TcpError> {
168 let ip = dns::Resolver
169 .lookup_ip(self.host.clone())
170 .await
171 .context(DnsSnafu)?
172 .next()
173 .ok_or(TcpError::NoAddresses)?;
174
175 let addr = SocketAddr::new(ip, self.port);
176 self.tls
177 .connect(&self.host, &addr)
178 .await
179 .context(ConnectSnafu)
180 .map(|mut maybe_tls| {
181 if let Some(keepalive) = self.keepalive
182 && let Err(error) = maybe_tls.set_keepalive(keepalive)
183 {
184 warn!(message = "Failed configuring TCP keepalive.", %error);
185 }
186
187 if let Some(send_buffer_bytes) = self.send_buffer_bytes
188 && let Err(error) = maybe_tls.set_send_buffer_bytes(send_buffer_bytes)
189 {
190 warn!(message = "Failed configuring send buffer size on TCP socket.", %error);
191 }
192
193 maybe_tls
194 })
195 }
196
197 async fn connect_backoff(&self) -> MaybeTlsStream<TcpStream> {
198 let mut backoff = Self::fresh_backoff();
199 loop {
200 match self.connect().await {
201 Ok(socket) => {
202 emit!(TcpSocketConnectionEstablished {
203 peer_addr: socket.peer_addr().ok(),
204 });
205 return socket;
206 }
207 Err(error) => {
208 emit!(TcpSocketOutgoingConnectionError { error });
209 sleep(backoff.next().unwrap()).await;
210 }
211 }
212 }
213 }
214
215 async fn healthcheck(&self) -> crate::Result<()> {
216 self.connect().await.map(|_| ()).map_err(Into::into)
217 }
218}
219
220struct TcpSink<E>
221where
222 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync,
223{
224 connector: TcpConnector,
225 transformer: Transformer,
226 encoder: E,
227}
228
229impl<E> TcpSink<E>
230where
231 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error> + Clone + Send + Sync + 'static,
232{
233 const fn new(connector: TcpConnector, transformer: Transformer, encoder: E) -> Self {
234 Self {
235 connector,
236 transformer,
237 encoder,
238 }
239 }
240
241 async fn connect(&self) -> BytesSink<MaybeTlsStream<TcpStream>> {
242 let stream = self.connector.connect_backoff().await;
243 BytesSink::new(stream, Self::shutdown_check, SocketMode::Tcp)
244 }
245
246 fn shutdown_check(stream: &mut MaybeTlsStream<TcpStream>) -> ShutdownCheck {
247 let mut cx = Context::from_waker(noop_waker_ref());
256 let mut buf = [0u8; 1];
257 let mut buf = ReadBuf::new(&mut buf);
258 match Pin::new(stream).poll_read(&mut cx, &mut buf) {
259 Poll::Ready(Err(error)) => ShutdownCheck::Error(error),
260 Poll::Ready(Ok(())) if buf.filled().is_empty() => {
261 ShutdownCheck::Close("ShutdownCheck::Close")
265 }
266 _ => ShutdownCheck::Alive,
267 }
268 }
269}
270
271#[async_trait]
272impl<E> StreamSink<Event> for TcpSink<E>
273where
274 E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>
275 + Clone
276 + Send
277 + Sync
278 + Sync
279 + 'static,
280{
281 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
282 let mut encoder = self.encoder.clone();
285 let mut input = input
286 .map(|mut event| {
287 let byte_size = event.size_of();
288 let json_byte_size = event.estimated_json_encoded_size_of();
289 let finalizers = event.metadata_mut().take_finalizers();
290 self.transformer.transform(&mut event);
291 let mut bytes = BytesMut::new();
292
293 if encoder.encode(event, &mut bytes).is_ok() {
295 let item = bytes.freeze();
296 EncodedEvent {
297 item,
298 finalizers,
299 byte_size,
300 json_byte_size,
301 }
302 } else {
303 EncodedEvent::new(Bytes::new(), 0, JsonSize::zero())
304 }
305 })
306 .peekable();
307
308 while Pin::new(&mut input).peek().await.is_some() {
309 let mut sink = self.connect().await;
310 let _open_token = OpenGauge::new().open(|count| emit!(ConnectionOpen { count }));
311
312 let result = match sink.send_all_peekable(&mut (&mut input).peekable()).await {
313 Ok(()) => sink.close().await,
314 Err(error) => Err(error),
315 };
316
317 if let Err(error) = result {
323 if error.kind() == ErrorKind::Other && error.to_string() == "ShutdownCheck::Close" {
324 emit!(TcpSocketConnectionShutdown {});
325 }
326 emit!(SocketSendError {
327 mode: SocketMode::Tcp,
328 error
329 });
330 }
331 }
332
333 Ok(())
334 }
335}
336
337#[cfg(test)]
338mod test {
339 use tokio::net::TcpListener;
340
341 use super::*;
342 use crate::test_util::{next_addr, trace_init};
343
344 #[tokio::test]
345 async fn healthcheck() {
346 trace_init();
347
348 let addr = next_addr();
349 let _listener = TcpListener::bind(&addr).await.unwrap();
350 let good = TcpConnector::from_host_port(addr.ip().to_string(), addr.port());
351 assert!(good.healthcheck().await.is_ok());
352
353 let addr = next_addr();
354 let bad = TcpConnector::from_host_port(addr.ip().to_string(), addr.port());
355 assert!(bad.healthcheck().await.is_err());
356 }
357}