1use vector_lib::codecs::{
2 encoding::{Framer, FramingConfig},
3 TextSerializerConfig,
4};
5use vector_lib::configurable::configurable_component;
6
7#[cfg(not(windows))]
8use crate::sinks::util::unix::UnixSinkConfig;
9use crate::{
10 codecs::{Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType},
11 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
12 sinks::util::{tcp::TcpSinkConfig, udp::UdpSinkConfig},
13};
14
15#[configurable_component(sink("socket", "Deliver logs to a remote socket endpoint."))]
17#[derive(Clone, Debug)]
18pub struct SocketSinkConfig {
19 #[serde(flatten)]
20 pub mode: Mode,
21
22 #[configurable(derived)]
23 #[serde(
24 default,
25 deserialize_with = "crate::serde::bool_or_struct",
26 skip_serializing_if = "crate::serde::is_default"
27 )]
28 pub acknowledgements: AcknowledgementsConfig,
29}
30
31#[configurable_component]
33#[derive(Clone, Debug)]
34#[serde(tag = "mode", rename_all = "snake_case")]
35#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
36pub enum Mode {
37 Tcp(TcpMode),
39
40 Udp(UdpMode),
42
43 #[serde(alias = "unix")]
45 UnixStream(UnixMode),
46
47 UnixDatagram(UnixMode),
51}
52
53#[configurable_component]
55#[derive(Clone, Debug)]
56pub struct TcpMode {
57 #[serde(flatten)]
58 config: TcpSinkConfig,
59
60 #[serde(flatten)]
61 encoding: EncodingConfigWithFraming,
62}
63
64#[configurable_component]
66#[derive(Clone, Debug)]
67pub struct UdpMode {
68 #[serde(flatten)]
69 config: UdpSinkConfig,
70
71 #[configurable(derived)]
72 encoding: EncodingConfig,
73}
74
75#[configurable_component]
77#[derive(Clone, Debug)]
78pub struct UnixMode {
79 #[serde(flatten)]
80 config: UnixSinkConfig,
81
82 #[serde(flatten)]
83 encoding: EncodingConfigWithFraming,
84}
85
86#[cfg(windows)]
88#[configurable_component]
90#[derive(Clone, Debug)]
91pub struct UnixSinkConfig {
92 #[configurable(metadata(docs::examples = "/path/to/socket"))]
96 pub path: std::path::PathBuf,
97}
98
99impl GenerateConfig for SocketSinkConfig {
100 fn generate_config() -> toml::Value {
101 toml::from_str(
102 r#"address = "92.12.333.224:5000"
103 mode = "tcp"
104 encoding.codec = "json""#,
105 )
106 .unwrap()
107 }
108}
109
110impl SocketSinkConfig {
111 pub const fn new(mode: Mode, acknowledgements: AcknowledgementsConfig) -> Self {
112 SocketSinkConfig {
113 mode,
114 acknowledgements,
115 }
116 }
117
118 pub fn make_basic_tcp_config(
119 address: String,
120 acknowledgements: AcknowledgementsConfig,
121 ) -> Self {
122 Self::new(
123 Mode::Tcp(TcpMode {
124 config: TcpSinkConfig::from_address(address),
125 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
126 }),
127 acknowledgements,
128 )
129 }
130}
131
132#[async_trait::async_trait]
133#[typetag::serde(name = "socket")]
134impl SinkConfig for SocketSinkConfig {
135 async fn build(
136 &self,
137 _cx: SinkContext,
138 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
139 match &self.mode {
140 Mode::Tcp(TcpMode { config, encoding }) => {
141 let transformer = encoding.transformer();
142 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
143 let encoder = Encoder::<Framer>::new(framer, serializer);
144 config.build(transformer, encoder)
145 }
146 Mode::Udp(UdpMode { config, encoding }) => {
147 let transformer = encoding.transformer();
148 let serializer = encoding.build()?;
149 let encoder = Encoder::<()>::new(serializer);
150 config.build(transformer, encoder)
151 }
152 #[cfg(unix)]
153 Mode::UnixStream(UnixMode { config, encoding }) => {
154 let transformer = encoding.transformer();
155 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
156 let encoder = Encoder::<Framer>::new(framer, serializer);
157 config.build(
158 transformer,
159 encoder,
160 super::util::service::net::UnixMode::Stream,
161 )
162 }
163 #[allow(unused)]
164 #[cfg(unix)]
165 Mode::UnixDatagram(UnixMode { config, encoding }) => {
166 cfg_if! {
167 if #[cfg(not(target_os = "macos"))] {
168 let transformer = encoding.transformer();
169 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
170 let encoder = Encoder::<Framer>::new(framer, serializer);
171 config.build(
172 transformer,
173 encoder,
174 super::util::service::net::UnixMode::Datagram,
175 )
176 }
177 else {
178 Err("UnixDatagram is not available on macOS platforms.".into())
179 }
180 }
181 }
182 #[cfg(not(unix))]
183 Mode::UnixStream(_) | Mode::UnixDatagram(_) => {
184 Err("Unix modes are supported only on Unix platforms.".into())
185 }
186 }
187 }
188
189 fn input(&self) -> Input {
190 let encoder_input_type = match &self.mode {
191 Mode::Tcp(TcpMode { encoding, .. }) => encoding.config().1.input_type(),
192 Mode::Udp(UdpMode { encoding, .. }) => encoding.config().input_type(),
193 Mode::UnixStream(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
194 Mode::UnixDatagram(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
195 };
196 Input::new(encoder_input_type)
197 }
198
199 fn acknowledgements(&self) -> &AcknowledgementsConfig {
200 &self.acknowledgements
201 }
202}
203
204#[cfg(test)]
205mod test {
206 use std::{
207 future::ready,
208 net::{SocketAddr, UdpSocket},
209 };
210
211 use futures::stream::StreamExt;
212 use futures_util::stream;
213 use serde_json::Value;
214 use tokio::{
215 net::TcpListener,
216 time::{sleep, timeout, Duration},
217 };
218 use tokio_stream::wrappers::TcpListenerStream;
219 use tokio_util::codec::{FramedRead, LinesCodec};
220 use vector_lib::codecs::JsonSerializerConfig;
221
222 use super::*;
223
224 #[cfg(target_os = "windows")]
225 use cfg_if::cfg_if;
226 cfg_if! { if #[cfg(unix)] {
227 use vector_lib::codecs::NativeJsonSerializerConfig;
228 use crate::test_util::random_metrics_with_stream;
229 use std::path::PathBuf;
230 } }
231 #[cfg(all(unix, not(target_os = "macos")))]
232 use std::os::unix::net::UnixDatagram;
233
234 use crate::{
235 config::SinkContext,
236 event::{Event, LogEvent},
237 test_util::{
238 components::{assert_sink_compliance, run_and_assert_sink_compliance, SINK_TAGS},
239 next_addr, next_addr_v6, random_lines_with_stream, trace_init, CountReceiver,
240 },
241 };
242
243 #[test]
244 fn generate_config() {
245 crate::test_util::test_generate_config::<SocketSinkConfig>();
246 }
247
248 enum DatagramSocket {
249 Udp(UdpSocket),
250 #[cfg(all(unix, not(target_os = "macos")))]
251 Unix(UnixDatagram),
252 }
253
254 enum DatagramSocketAddr {
255 Udp(SocketAddr),
256 #[cfg(all(unix, not(target_os = "macos")))]
257 Unix(PathBuf),
258 }
259
260 async fn test_datagram(datagram_addr: DatagramSocketAddr) {
261 let receiver = match &datagram_addr {
262 DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
263 #[cfg(all(unix, not(target_os = "macos")))]
264 DatagramSocketAddr::Unix(path) => {
265 DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
266 }
267 };
268
269 let config = SocketSinkConfig {
270 mode: match &datagram_addr {
271 DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
272 config: UdpSinkConfig::from_address(addr.to_string()),
273 encoding: JsonSerializerConfig::default().into(),
274 }),
275 #[cfg(all(unix, not(target_os = "macos")))]
276 DatagramSocketAddr::Unix(path) => Mode::UnixDatagram(UnixMode {
277 config: UnixSinkConfig::new(path.to_path_buf()),
278 encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
279 }),
280 },
281 acknowledgements: Default::default(),
282 };
283
284 let context = SinkContext::default();
285 assert_sink_compliance(&SINK_TAGS, async move {
286 let (sink, _healthcheck) = config.build(context).await.unwrap();
287
288 let event = Event::Log(LogEvent::from("raw log line"));
289 sink.run(stream::once(ready(event.into()))).await
290 })
291 .await
292 .expect("Running sink failed");
293
294 let mut buf = [0; 256];
295 let size = match &receiver {
296 DatagramSocket::Udp(sock) => {
297 sock.recv_from(&mut buf).expect("Did not receive message").0
298 }
299 #[cfg(all(unix, not(target_os = "macos")))]
300 DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
301 };
302
303 let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
304 let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
305 let data = data.as_object().expect("Not a JSON object");
306 assert!(data.get("timestamp").is_some());
307 let message = data.get("message").expect("No message in JSON");
308 assert_eq!(message, &Value::String("raw log line".into()));
309 }
310
311 #[tokio::test]
312 async fn udp_ipv4() {
313 trace_init();
314
315 test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
316 }
317
318 #[tokio::test]
319 async fn udp_ipv6() {
320 trace_init();
321
322 test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
323 }
324
325 #[cfg(all(unix, not(target_os = "macos")))]
326 #[tokio::test]
327 async fn unix_datagram() {
328 trace_init();
329
330 test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
331 "unix_datagram_socket_test",
332 )))
333 .await;
334 }
335
336 #[tokio::test]
337 async fn tcp_stream() {
338 trace_init();
339
340 let addr = next_addr();
341 let config = SocketSinkConfig {
342 mode: Mode::Tcp(TcpMode {
343 config: TcpSinkConfig::from_address(addr.to_string()),
344 encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
345 }),
346 acknowledgements: Default::default(),
347 };
348
349 let mut receiver = CountReceiver::receive_lines(addr);
350
351 let (lines, events) = random_lines_with_stream(10, 100, None);
352
353 assert_sink_compliance(&SINK_TAGS, async move {
354 let context = SinkContext::default();
355 let (sink, _healthcheck) = config.build(context).await.unwrap();
356
357 sink.run(events).await
358 })
359 .await
360 .expect("Running sink failed");
361
362 receiver.connected().await;
364
365 let output = receiver.await;
366 assert_eq!(lines.len(), output.len());
367 for (source, received) in lines.iter().zip(output) {
368 let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
369 let received = json.get("message").unwrap().as_str().unwrap();
370 assert_eq!(source, received);
371 }
372 }
373
374 #[cfg(unix)]
375 #[tokio::test]
376 async fn metrics_socket() {
377 trace_init();
378
379 let out_path = temp_uds_path("unix_socket_test");
380 let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());
381
382 let config = SocketSinkConfig {
383 mode: Mode::UnixStream(UnixMode {
384 config: UnixSinkConfig::new(out_path),
385 encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
386 }),
387 acknowledgements: Default::default(),
388 };
389
390 let (expected, events) = random_metrics_with_stream(10, None, None);
391
392 assert_sink_compliance(&SINK_TAGS, async move {
393 let context = SinkContext::default();
394 let (sink, _healthcheck) = config.build(context).await.unwrap();
395
396 sink.run(events).await
397 })
398 .await
399 .expect("Running sink failed");
400
401 receiver.connected().await;
403
404 let output = receiver.await;
405 assert_eq!(expected.len(), output.len());
406 for (source, received) in expected.iter().zip(output) {
407 let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
408 let received = json.get("metric").unwrap();
409 let received_name = received.get("name").unwrap().as_str().unwrap();
410 assert_eq!(source.as_metric().name(), received_name);
411 }
412 }
413
414 #[tokio::test]
424 async fn tcp_stream_detects_disconnect() {
425 use std::{
426 pin::Pin,
427 sync::{
428 atomic::{AtomicUsize, Ordering},
429 Arc,
430 },
431 task::Poll,
432 };
433
434 use futures::{channel::mpsc, FutureExt, SinkExt, StreamExt};
435 use tokio::{
436 io::{AsyncRead, AsyncWriteExt, ReadBuf},
437 net::TcpStream,
438 task::yield_now,
439 time::{interval, Duration},
440 };
441 use tokio_stream::wrappers::IntervalStream;
442
443 use crate::event::EventArray;
444 use crate::tls::{
445 self, MaybeTlsIncomingStream, MaybeTlsSettings, TlsConfig, TlsEnableableConfig,
446 };
447
448 trace_init();
449
450 let addr = next_addr();
451 let config = SocketSinkConfig {
452 mode: Mode::Tcp(TcpMode {
453 config: TcpSinkConfig::new(
454 addr.to_string(),
455 None,
456 Some(TlsEnableableConfig {
457 enabled: Some(true),
458 options: TlsConfig {
459 verify_certificate: Some(false),
460 verify_hostname: Some(false),
461 ca_file: Some(tls::TEST_PEM_CRT_PATH.into()),
462 ..Default::default()
463 },
464 }),
465 None,
466 ),
467 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
468 }),
469 acknowledgements: Default::default(),
470 };
471 let context = SinkContext::default();
472 let (sink, _healthcheck) = config.build(context).await.unwrap();
473 let (mut sender, receiver) = mpsc::channel::<Option<EventArray>>(0);
474 let jh1 = tokio::spawn(async move {
475 let stream = receiver
476 .take_while(|event| ready(event.is_some()))
477 .map(|event| event.unwrap())
478 .boxed();
479 run_and_assert_sink_compliance(sink, stream, &SINK_TAGS).await
480 });
481
482 let msg_counter = Arc::new(AtomicUsize::new(0));
483 let msg_counter1 = Arc::clone(&msg_counter);
484 let conn_counter = Arc::new(AtomicUsize::new(0));
485 let conn_counter1 = Arc::clone(&conn_counter);
486
487 let (close_tx, close_rx) = tokio::sync::oneshot::channel::<()>();
488 let mut close_rx = Some(close_rx.map(|x| x.unwrap()));
489
490 let config = Some(TlsEnableableConfig::test_config());
491
492 let jh2 = tokio::spawn(async move {
494 let tls = MaybeTlsSettings::from_config(config.as_ref(), true).unwrap();
495 let listener = tls.bind(&addr).await.unwrap();
496 listener
497 .accept_stream()
498 .take(2)
499 .for_each(|connection| {
500 let mut close_rx = close_rx.take();
501
502 conn_counter1.fetch_add(1, Ordering::SeqCst);
503 let msg_counter1 = Arc::clone(&msg_counter1);
504
505 let mut stream: MaybeTlsIncomingStream<TcpStream> = connection.unwrap();
506
507 std::future::poll_fn(move |cx| loop {
508 if let Some(fut) = close_rx.as_mut() {
509 if let Poll::Ready(()) = fut.poll_unpin(cx) {
510 stream
511 .get_mut()
512 .unwrap()
513 .shutdown()
514 .now_or_never()
515 .unwrap()
516 .unwrap();
517 close_rx = None;
518 }
519 }
520
521 let mut buf = [0u8; 11];
522 let mut buf = ReadBuf::new(&mut buf);
523 return match Pin::new(&mut stream).poll_read(cx, &mut buf) {
524 Poll::Ready(Ok(())) => {
525 if buf.filled().is_empty() {
526 Poll::Ready(())
527 } else {
528 msg_counter1.fetch_add(1, Ordering::SeqCst);
529 continue;
530 }
531 }
532 Poll::Ready(Err(error)) => panic!("{}", error),
533 Poll::Pending => Poll::Pending,
534 };
535 })
536 })
537 .await;
538 });
539
540 let (_, mut events) = random_lines_with_stream(10, 10, None);
541 while let Some(event) = events.next().await {
542 sender.send(Some(event)).await.unwrap();
543 }
544
545 IntervalStream::new(interval(Duration::from_millis(100)))
549 .take(500)
550 .take_while(|_| ready(msg_counter.load(Ordering::SeqCst) != 10))
551 .for_each(|_| ready(()))
552 .await;
553 close_tx.send(()).unwrap();
554
555 yield_now().await;
557
558 assert_eq!(msg_counter.load(Ordering::SeqCst), 10);
559 assert_eq!(conn_counter.load(Ordering::SeqCst), 1);
560
561 let (_, mut events) = random_lines_with_stream(10, 10, None);
563 while let Some(event) = events.next().await {
564 sender.send(Some(event)).await.unwrap();
565 }
566
567 sender.send(None).await.unwrap();
569 jh1.await.unwrap();
570 jh2.await.unwrap();
571
572 assert_eq!(msg_counter.load(Ordering::SeqCst), 20);
574 assert_eq!(conn_counter.load(Ordering::SeqCst), 2);
575 }
576
577 #[tokio::test]
579 async fn reconnect() {
580 trace_init();
581
582 let addr = next_addr();
583 let config = SocketSinkConfig {
584 mode: Mode::Tcp(TcpMode {
585 config: TcpSinkConfig::from_address(addr.to_string()),
586 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
587 }),
588 acknowledgements: Default::default(),
589 };
590
591 let context = SinkContext::default();
592 let (sink, _healthcheck) = config.build(context).await.unwrap();
593
594 let (_, events) = random_lines_with_stream(1000, 10000, None);
595 let sink_handle = tokio::spawn(run_and_assert_sink_compliance(sink, events, &SINK_TAGS));
596
597 let mut count = 20usize;
599 TcpListenerStream::new(TcpListener::bind(addr).await.unwrap())
600 .next()
601 .await
602 .unwrap()
603 .map(|socket| FramedRead::new(socket, LinesCodec::new()))
604 .unwrap()
605 .map(|x| x.unwrap())
606 .take_while(|_| {
607 ready(if count > 0 {
608 count -= 1;
609 true
610 } else {
611 false
612 })
613 })
614 .collect::<Vec<_>>()
615 .await;
616
617 if cfg!(windows) {
619 sleep(Duration::from_secs(1)).await;
621 }
622
623 assert!(timeout(
626 Duration::from_secs(5),
627 CountReceiver::receive_lines(addr).connected()
628 )
629 .await
630 .is_ok());
631
632 sink_handle.await.unwrap();
633 }
634
635 #[cfg(unix)]
636 fn temp_uds_path(name: &str) -> PathBuf {
637 tempfile::tempdir().unwrap().keep().join(name)
638 }
639}