1use vector_lib::{
2 codecs::{
3 TextSerializerConfig,
4 encoding::{Framer, FramingConfig},
5 },
6 configurable::configurable_component,
7};
8
9#[cfg(not(windows))]
10use crate::sinks::util::unix::UnixSinkConfig;
11use crate::{
12 codecs::{Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType},
13 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
14 sinks::util::{tcp::TcpSinkConfig, udp::UdpSinkConfig},
15};
16
17#[configurable_component(sink("socket", "Deliver logs to a remote socket endpoint."))]
19#[derive(Clone, Debug)]
20pub struct SocketSinkConfig {
21 #[serde(flatten)]
22 pub mode: Mode,
23
24 #[configurable(derived)]
25 #[serde(
26 default,
27 deserialize_with = "crate::serde::bool_or_struct",
28 skip_serializing_if = "crate::serde::is_default"
29 )]
30 pub acknowledgements: AcknowledgementsConfig,
31}
32
33#[configurable_component]
35#[derive(Clone, Debug)]
36#[serde(tag = "mode", rename_all = "snake_case")]
37#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
38pub enum Mode {
39 Tcp(TcpMode),
41
42 Udp(UdpMode),
44
45 #[serde(alias = "unix")]
47 UnixStream(UnixMode),
48
49 UnixDatagram(UnixMode),
53}
54
55#[configurable_component]
57#[derive(Clone, Debug)]
58pub struct TcpMode {
59 #[serde(flatten)]
60 config: TcpSinkConfig,
61
62 #[serde(flatten)]
63 encoding: EncodingConfigWithFraming,
64}
65
66#[configurable_component]
68#[derive(Clone, Debug)]
69pub struct UdpMode {
70 #[serde(flatten)]
71 config: UdpSinkConfig,
72
73 #[configurable(derived)]
74 encoding: EncodingConfig,
75}
76
77#[configurable_component]
79#[derive(Clone, Debug)]
80pub struct UnixMode {
81 #[serde(flatten)]
82 config: UnixSinkConfig,
83
84 #[serde(flatten)]
85 encoding: EncodingConfigWithFraming,
86}
87
88#[cfg(windows)]
90#[configurable_component]
92#[derive(Clone, Debug)]
93pub struct UnixSinkConfig {
94 #[configurable(metadata(docs::examples = "/path/to/socket"))]
98 pub path: std::path::PathBuf,
99}
100
101impl GenerateConfig for SocketSinkConfig {
102 fn generate_config() -> toml::Value {
103 toml::from_str(
104 r#"address = "92.12.333.224:5000"
105 mode = "tcp"
106 encoding.codec = "json""#,
107 )
108 .unwrap()
109 }
110}
111
112impl SocketSinkConfig {
113 pub const fn new(mode: Mode, acknowledgements: AcknowledgementsConfig) -> Self {
114 SocketSinkConfig {
115 mode,
116 acknowledgements,
117 }
118 }
119
120 pub fn make_basic_tcp_config(
121 address: String,
122 acknowledgements: AcknowledgementsConfig,
123 ) -> Self {
124 Self::new(
125 Mode::Tcp(TcpMode {
126 config: TcpSinkConfig::from_address(address),
127 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
128 }),
129 acknowledgements,
130 )
131 }
132}
133
134#[async_trait::async_trait]
135#[typetag::serde(name = "socket")]
136impl SinkConfig for SocketSinkConfig {
137 async fn build(
138 &self,
139 _cx: SinkContext,
140 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
141 match &self.mode {
142 Mode::Tcp(TcpMode { config, encoding }) => {
143 let transformer = encoding.transformer();
144 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
145 let encoder = Encoder::<Framer>::new(framer, serializer);
146 config.build(transformer, encoder)
147 }
148 Mode::Udp(UdpMode { config, encoding }) => {
149 let transformer = encoding.transformer();
150 let serializer = encoding.build()?;
151 let chunker = serializer.chunker();
152 let encoder = Encoder::<()>::new(serializer);
153 config.build(transformer, encoder, chunker)
154 }
155 #[cfg(unix)]
156 Mode::UnixStream(UnixMode { config, encoding }) => {
157 let transformer = encoding.transformer();
158 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
159 let encoder = Encoder::<Framer>::new(framer, serializer);
160 config.build(
161 transformer,
162 encoder,
163 super::util::service::net::UnixMode::Stream,
164 )
165 }
166 #[allow(unused)]
167 #[cfg(unix)]
168 Mode::UnixDatagram(UnixMode { config, encoding }) => {
169 cfg_if! {
170 if #[cfg(not(target_os = "macos"))] {
171 let transformer = encoding.transformer();
172 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
173 let encoder = Encoder::<Framer>::new(framer, serializer);
174 config.build(
175 transformer,
176 encoder,
177 super::util::service::net::UnixMode::Datagram,
178 )
179 }
180 else {
181 Err("UnixDatagram is not available on macOS platforms.".into())
182 }
183 }
184 }
185 #[cfg(not(unix))]
186 Mode::UnixStream(_) | Mode::UnixDatagram(_) => {
187 Err("Unix modes are supported only on Unix platforms.".into())
188 }
189 }
190 }
191
192 fn input(&self) -> Input {
193 let encoder_input_type = match &self.mode {
194 Mode::Tcp(TcpMode { encoding, .. }) => encoding.config().1.input_type(),
195 Mode::Udp(UdpMode { encoding, .. }) => encoding.config().input_type(),
196 Mode::UnixStream(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
197 Mode::UnixDatagram(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
198 };
199 Input::new(encoder_input_type)
200 }
201
202 fn acknowledgements(&self) -> &AcknowledgementsConfig {
203 &self.acknowledgements
204 }
205}
206
207#[cfg(test)]
208mod test {
209 use std::{
210 future::ready,
211 net::{SocketAddr, UdpSocket},
212 };
213
214 #[cfg(target_os = "windows")]
215 use cfg_if::cfg_if;
216 use futures::stream::StreamExt;
217 use futures_util::stream;
218 use serde_json::Value;
219 use tokio::{
220 net::TcpListener,
221 time::{Duration, sleep, timeout},
222 };
223 use tokio_stream::wrappers::TcpListenerStream;
224 use tokio_util::codec::{FramedRead, LinesCodec};
225 use vector_lib::codecs::JsonSerializerConfig;
226
227 use super::*;
228 cfg_if! { if #[cfg(unix)] {
229 use vector_lib::codecs::NativeJsonSerializerConfig;
230 use crate::test_util::random_metrics_with_stream;
231 use std::path::PathBuf;
232 } }
233 #[cfg(all(unix, not(target_os = "macos")))]
234 use std::os::unix::net::UnixDatagram;
235
236 use crate::{
237 config::SinkContext,
238 event::{Event, LogEvent},
239 test_util::{
240 CountReceiver,
241 addr::{next_addr, next_addr_v6},
242 components::{SINK_TAGS, assert_sink_compliance, run_and_assert_sink_compliance},
243 random_lines_with_stream, trace_init,
244 },
245 };
246
247 #[test]
248 fn generate_config() {
249 crate::test_util::test_generate_config::<SocketSinkConfig>();
250 }
251
252 enum DatagramSocket {
253 Udp(UdpSocket),
254 #[cfg(all(unix, not(target_os = "macos")))]
255 Unix(UnixDatagram),
256 }
257
258 enum DatagramSocketAddr {
259 Udp(SocketAddr),
260 #[cfg(all(unix, not(target_os = "macos")))]
261 Unix(PathBuf),
262 }
263
264 async fn test_datagram(datagram_addr: DatagramSocketAddr) {
265 let receiver = match &datagram_addr {
266 DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
267 #[cfg(all(unix, not(target_os = "macos")))]
268 DatagramSocketAddr::Unix(path) => {
269 DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
270 }
271 };
272
273 let config = SocketSinkConfig {
274 mode: match &datagram_addr {
275 DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
276 config: UdpSinkConfig::from_address(addr.to_string()),
277 encoding: JsonSerializerConfig::default().into(),
278 }),
279 #[cfg(all(unix, not(target_os = "macos")))]
280 DatagramSocketAddr::Unix(path) => Mode::UnixDatagram(UnixMode {
281 config: UnixSinkConfig::new(path.to_path_buf()),
282 encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
283 }),
284 },
285 acknowledgements: Default::default(),
286 };
287
288 let context = SinkContext::default();
289 assert_sink_compliance(&SINK_TAGS, async move {
290 let (sink, _healthcheck) = config.build(context).await.unwrap();
291
292 let event = Event::Log(LogEvent::from("raw log line"));
293 sink.run(stream::once(ready(event.into()))).await
294 })
295 .await
296 .expect("Running sink failed");
297
298 let mut buf = [0; 256];
299 let size = match &receiver {
300 DatagramSocket::Udp(sock) => {
301 sock.recv_from(&mut buf).expect("Did not receive message").0
302 }
303 #[cfg(all(unix, not(target_os = "macos")))]
304 DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
305 };
306
307 let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
308 let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
309 let data = data.as_object().expect("Not a JSON object");
310 assert!(data.get("timestamp").is_some());
311 let message = data.get("message").expect("No message in JSON");
312 assert_eq!(message, &Value::String("raw log line".into()));
313 }
314
315 #[tokio::test]
316 async fn udp_ipv4() {
317 trace_init();
318
319 let (_guard, addr) = next_addr();
320 test_datagram(DatagramSocketAddr::Udp(addr)).await;
321 }
322
323 #[tokio::test]
324 async fn udp_ipv6() {
325 trace_init();
326
327 let (_guard, addr) = next_addr_v6();
328 test_datagram(DatagramSocketAddr::Udp(addr)).await;
329 }
330
331 #[cfg(all(unix, not(target_os = "macos")))]
332 #[tokio::test]
333 async fn unix_datagram() {
334 trace_init();
335
336 test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
337 "unix_datagram_socket_test",
338 )))
339 .await;
340 }
341
342 #[tokio::test]
343 async fn tcp_stream() {
344 trace_init();
345
346 let (_guard, addr) = next_addr();
347 let config = SocketSinkConfig {
348 mode: Mode::Tcp(TcpMode {
349 config: TcpSinkConfig::from_address(addr.to_string()),
350 encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
351 }),
352 acknowledgements: Default::default(),
353 };
354
355 let mut receiver = CountReceiver::receive_lines(addr);
356
357 let (lines, events) = random_lines_with_stream(10, 100, None);
358
359 assert_sink_compliance(&SINK_TAGS, async move {
360 let context = SinkContext::default();
361 let (sink, _healthcheck) = config.build(context).await.unwrap();
362
363 sink.run(events).await
364 })
365 .await
366 .expect("Running sink failed");
367
368 receiver.connected().await;
370
371 let output = receiver.await;
372 assert_eq!(lines.len(), output.len());
373 for (source, received) in lines.iter().zip(output) {
374 let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
375 let received = json.get("message").unwrap().as_str().unwrap();
376 assert_eq!(source, received);
377 }
378 }
379
380 #[cfg(unix)]
381 #[tokio::test]
382 async fn metrics_socket() {
383 trace_init();
384
385 let out_path = temp_uds_path("unix_socket_test");
386 let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());
387
388 let config = SocketSinkConfig {
389 mode: Mode::UnixStream(UnixMode {
390 config: UnixSinkConfig::new(out_path),
391 encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
392 }),
393 acknowledgements: Default::default(),
394 };
395
396 let (expected, events) = random_metrics_with_stream(10, None, None);
397
398 assert_sink_compliance(&SINK_TAGS, async move {
399 let context = SinkContext::default();
400 let (sink, _healthcheck) = config.build(context).await.unwrap();
401
402 sink.run(events).await
403 })
404 .await
405 .expect("Running sink failed");
406
407 receiver.connected().await;
409
410 let output = receiver.await;
411 assert_eq!(expected.len(), output.len());
412 for (source, received) in expected.iter().zip(output) {
413 let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
414 let received = json.get("metric").unwrap();
415 let received_name = received.get("name").unwrap().as_str().unwrap();
416 assert_eq!(source.as_metric().name(), received_name);
417 }
418 }
419
420 #[tokio::test]
430 async fn tcp_stream_detects_disconnect() {
431 use std::{
432 pin::Pin,
433 sync::{
434 Arc,
435 atomic::{AtomicUsize, Ordering},
436 },
437 task::Poll,
438 };
439
440 use futures::{FutureExt, SinkExt, StreamExt, channel::mpsc};
441 use tokio::{
442 io::{AsyncRead, AsyncWriteExt, ReadBuf},
443 net::TcpStream,
444 task::yield_now,
445 time::{Duration, interval},
446 };
447 use tokio_stream::wrappers::IntervalStream;
448
449 use crate::{
450 event::EventArray,
451 tls::{self, MaybeTlsIncomingStream, MaybeTlsSettings, TlsConfig, TlsEnableableConfig},
452 };
453
454 trace_init();
455
456 let (_guard, addr) = next_addr();
457 let config = SocketSinkConfig {
458 mode: Mode::Tcp(TcpMode {
459 config: TcpSinkConfig::new(
460 addr.to_string(),
461 None,
462 Some(TlsEnableableConfig {
463 enabled: Some(true),
464 options: TlsConfig {
465 verify_certificate: Some(false),
466 verify_hostname: Some(false),
467 ca_file: Some(tls::TEST_PEM_CRT_PATH.into()),
468 ..Default::default()
469 },
470 }),
471 None,
472 ),
473 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
474 }),
475 acknowledgements: Default::default(),
476 };
477 let context = SinkContext::default();
478 let (sink, _healthcheck) = config.build(context).await.unwrap();
479 let (mut sender, receiver) = mpsc::channel::<Option<EventArray>>(0);
480 let jh1 = tokio::spawn(async move {
481 let stream = receiver
482 .take_while(|event| ready(event.is_some()))
483 .map(|event| event.unwrap())
484 .boxed();
485 run_and_assert_sink_compliance(sink, stream, &SINK_TAGS).await
486 });
487
488 let msg_counter = Arc::new(AtomicUsize::new(0));
489 let msg_counter1 = Arc::clone(&msg_counter);
490 let conn_counter = Arc::new(AtomicUsize::new(0));
491 let conn_counter1 = Arc::clone(&conn_counter);
492
493 let (close_tx, close_rx) = tokio::sync::oneshot::channel::<()>();
494 let mut close_rx = Some(close_rx.map(|x| x.unwrap()));
495
496 let config = Some(TlsEnableableConfig::test_config());
497
498 let jh2 = tokio::spawn(async move {
500 let tls = MaybeTlsSettings::from_config(config.as_ref(), true).unwrap();
501 let listener = tls.bind(&addr).await.unwrap();
502 listener
503 .accept_stream()
504 .take(2)
505 .for_each(|connection| {
506 let mut close_rx = close_rx.take();
507
508 conn_counter1.fetch_add(1, Ordering::SeqCst);
509 let msg_counter1 = Arc::clone(&msg_counter1);
510
511 let mut stream: MaybeTlsIncomingStream<TcpStream> = connection.unwrap();
512
513 std::future::poll_fn(move |cx| {
514 loop {
515 if let Some(fut) = close_rx.as_mut()
516 && let Poll::Ready(()) = fut.poll_unpin(cx)
517 {
518 stream
519 .get_mut()
520 .unwrap()
521 .shutdown()
522 .now_or_never()
523 .unwrap()
524 .unwrap();
525 close_rx = None;
526 }
527
528 let mut buf = [0u8; 11];
529 let mut buf = ReadBuf::new(&mut buf);
530 return match Pin::new(&mut stream).poll_read(cx, &mut buf) {
531 Poll::Ready(Ok(())) => {
532 if buf.filled().is_empty() {
533 Poll::Ready(())
534 } else {
535 msg_counter1.fetch_add(1, Ordering::SeqCst);
536 continue;
537 }
538 }
539 Poll::Ready(Err(error)) => panic!("{error}"),
540 Poll::Pending => Poll::Pending,
541 };
542 }
543 })
544 })
545 .await;
546 });
547
548 let (_, mut events) = random_lines_with_stream(10, 10, None);
549 while let Some(event) = events.next().await {
550 sender.send(Some(event)).await.unwrap();
551 }
552
553 IntervalStream::new(interval(Duration::from_millis(100)))
557 .take(500)
558 .take_while(|_| ready(msg_counter.load(Ordering::SeqCst) != 10))
559 .for_each(|_| ready(()))
560 .await;
561 close_tx.send(()).unwrap();
562
563 yield_now().await;
565
566 assert_eq!(msg_counter.load(Ordering::SeqCst), 10);
567 assert_eq!(conn_counter.load(Ordering::SeqCst), 1);
568
569 let (_, mut events) = random_lines_with_stream(10, 10, None);
571 while let Some(event) = events.next().await {
572 sender.send(Some(event)).await.unwrap();
573 }
574
575 sender.send(None).await.unwrap();
577 jh1.await.unwrap();
578 jh2.await.unwrap();
579
580 assert_eq!(msg_counter.load(Ordering::SeqCst), 20);
582 assert_eq!(conn_counter.load(Ordering::SeqCst), 2);
583 }
584
585 #[tokio::test]
587 async fn reconnect() {
588 trace_init();
589
590 let (_guard, addr) = next_addr();
591 let config = SocketSinkConfig {
592 mode: Mode::Tcp(TcpMode {
593 config: TcpSinkConfig::from_address(addr.to_string()),
594 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
595 }),
596 acknowledgements: Default::default(),
597 };
598
599 let context = SinkContext::default();
600 let (sink, _healthcheck) = config.build(context).await.unwrap();
601
602 let (_, events) = random_lines_with_stream(1000, 10000, None);
603 let sink_handle = tokio::spawn(run_and_assert_sink_compliance(sink, events, &SINK_TAGS));
604
605 let mut count = 20usize;
607 TcpListenerStream::new(TcpListener::bind(addr).await.unwrap())
608 .next()
609 .await
610 .unwrap()
611 .map(|socket| FramedRead::new(socket, LinesCodec::new()))
612 .unwrap()
613 .map(|x| x.unwrap())
614 .take_while(|_| {
615 ready(if count > 0 {
616 count -= 1;
617 true
618 } else {
619 false
620 })
621 })
622 .collect::<Vec<_>>()
623 .await;
624
625 if cfg!(windows) {
627 sleep(Duration::from_secs(1)).await;
629 }
630
631 assert!(
634 timeout(
635 Duration::from_secs(5),
636 CountReceiver::receive_lines(addr).connected()
637 )
638 .await
639 .is_ok()
640 );
641
642 sink_handle.await.unwrap();
643 }
644
645 #[cfg(unix)]
646 fn temp_uds_path(name: &str) -> PathBuf {
647 tempfile::tempdir().unwrap().keep().join(name)
648 }
649}