1use vector_lib::{
2 codecs::{
3 TextSerializerConfig,
4 encoding::{Framer, FramingConfig},
5 },
6 configurable::configurable_component,
7};
8
9#[cfg(not(windows))]
10use crate::sinks::util::unix::UnixSinkConfig;
11use crate::{
12 codecs::{Encoder, EncodingConfig, EncodingConfigWithFraming, SinkType},
13 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
14 sinks::util::{tcp::TcpSinkConfig, udp::UdpSinkConfig},
15};
16
17#[configurable_component(sink("socket", "Deliver logs to a remote socket endpoint."))]
19#[derive(Clone, Debug)]
20pub struct SocketSinkConfig {
21 #[serde(flatten)]
22 pub mode: Mode,
23
24 #[configurable(derived)]
25 #[serde(
26 default,
27 deserialize_with = "crate::serde::bool_or_struct",
28 skip_serializing_if = "crate::serde::is_default"
29 )]
30 pub acknowledgements: AcknowledgementsConfig,
31}
32
33#[configurable_component]
35#[derive(Clone, Debug)]
36#[serde(tag = "mode", rename_all = "snake_case")]
37#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
38pub enum Mode {
39 Tcp(TcpMode),
41
42 Udp(UdpMode),
44
45 #[serde(alias = "unix")]
47 UnixStream(UnixMode),
48
49 UnixDatagram(UnixMode),
53}
54
55#[configurable_component]
57#[derive(Clone, Debug)]
58pub struct TcpMode {
59 #[serde(flatten)]
60 config: TcpSinkConfig,
61
62 #[serde(flatten)]
63 encoding: EncodingConfigWithFraming,
64}
65
66#[configurable_component]
68#[derive(Clone, Debug)]
69pub struct UdpMode {
70 #[serde(flatten)]
71 config: UdpSinkConfig,
72
73 #[configurable(derived)]
74 encoding: EncodingConfig,
75}
76
77#[configurable_component]
79#[derive(Clone, Debug)]
80pub struct UnixMode {
81 #[serde(flatten)]
82 config: UnixSinkConfig,
83
84 #[serde(flatten)]
85 encoding: EncodingConfigWithFraming,
86}
87
88#[cfg(windows)]
90#[configurable_component]
92#[derive(Clone, Debug)]
93pub struct UnixSinkConfig {
94 #[configurable(metadata(docs::examples = "/path/to/socket"))]
98 pub path: std::path::PathBuf,
99}
100
101impl GenerateConfig for SocketSinkConfig {
102 fn generate_config() -> toml::Value {
103 toml::from_str(
104 r#"address = "92.12.333.224:5000"
105 mode = "tcp"
106 encoding.codec = "json""#,
107 )
108 .unwrap()
109 }
110}
111
112impl SocketSinkConfig {
113 pub const fn new(mode: Mode, acknowledgements: AcknowledgementsConfig) -> Self {
114 SocketSinkConfig {
115 mode,
116 acknowledgements,
117 }
118 }
119
120 pub fn make_basic_tcp_config(
121 address: String,
122 acknowledgements: AcknowledgementsConfig,
123 ) -> Self {
124 Self::new(
125 Mode::Tcp(TcpMode {
126 config: TcpSinkConfig::from_address(address),
127 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
128 }),
129 acknowledgements,
130 )
131 }
132}
133
134#[async_trait::async_trait]
135#[typetag::serde(name = "socket")]
136impl SinkConfig for SocketSinkConfig {
137 async fn build(
138 &self,
139 _cx: SinkContext,
140 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
141 match &self.mode {
142 Mode::Tcp(TcpMode { config, encoding }) => {
143 let transformer = encoding.transformer();
144 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
145 let encoder = Encoder::<Framer>::new(framer, serializer);
146 config.build(transformer, encoder)
147 }
148 Mode::Udp(UdpMode { config, encoding }) => {
149 let transformer = encoding.transformer();
150 let serializer = encoding.build()?;
151 let chunker = serializer.chunker();
152 let encoder = Encoder::<()>::new(serializer);
153 config.build(transformer, encoder, chunker)
154 }
155 #[cfg(unix)]
156 Mode::UnixStream(UnixMode { config, encoding }) => {
157 let transformer = encoding.transformer();
158 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
159 let encoder = Encoder::<Framer>::new(framer, serializer);
160 config.build(
161 transformer,
162 encoder,
163 super::util::service::net::UnixMode::Stream,
164 )
165 }
166 #[allow(unused)]
167 #[cfg(unix)]
168 Mode::UnixDatagram(UnixMode { config, encoding }) => {
169 cfg_if! {
170 if #[cfg(not(target_os = "macos"))] {
171 let transformer = encoding.transformer();
172 let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
173 let encoder = Encoder::<Framer>::new(framer, serializer);
174 config.build(
175 transformer,
176 encoder,
177 super::util::service::net::UnixMode::Datagram,
178 )
179 }
180 else {
181 Err("UnixDatagram is not available on macOS platforms.".into())
182 }
183 }
184 }
185 #[cfg(not(unix))]
186 Mode::UnixStream(_) | Mode::UnixDatagram(_) => {
187 Err("Unix modes are supported only on Unix platforms.".into())
188 }
189 }
190 }
191
192 fn input(&self) -> Input {
193 let encoder_input_type = match &self.mode {
194 Mode::Tcp(TcpMode { encoding, .. }) => encoding.config().1.input_type(),
195 Mode::Udp(UdpMode { encoding, .. }) => encoding.config().input_type(),
196 Mode::UnixStream(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
197 Mode::UnixDatagram(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
198 };
199 Input::new(encoder_input_type)
200 }
201
202 fn acknowledgements(&self) -> &AcknowledgementsConfig {
203 &self.acknowledgements
204 }
205}
206
207#[cfg(test)]
208mod test {
209 use std::{
210 future::ready,
211 net::{SocketAddr, UdpSocket},
212 };
213
214 #[cfg(target_os = "windows")]
215 use cfg_if::cfg_if;
216 use futures::stream::StreamExt;
217 use futures_util::stream;
218 use serde_json::Value;
219 use tokio::{
220 net::TcpListener,
221 time::{Duration, sleep, timeout},
222 };
223 use tokio_stream::wrappers::TcpListenerStream;
224 use tokio_util::codec::{FramedRead, LinesCodec};
225 use vector_lib::codecs::JsonSerializerConfig;
226
227 use super::*;
228 cfg_if! { if #[cfg(unix)] {
229 use vector_lib::codecs::NativeJsonSerializerConfig;
230 use crate::test_util::random_metrics_with_stream;
231 use std::path::PathBuf;
232 } }
233 #[cfg(all(unix, not(target_os = "macos")))]
234 use std::os::unix::net::UnixDatagram;
235
236 use crate::{
237 config::SinkContext,
238 event::{Event, LogEvent},
239 test_util::{
240 CountReceiver,
241 components::{SINK_TAGS, assert_sink_compliance, run_and_assert_sink_compliance},
242 next_addr, next_addr_v6, random_lines_with_stream, trace_init,
243 },
244 };
245
246 #[test]
247 fn generate_config() {
248 crate::test_util::test_generate_config::<SocketSinkConfig>();
249 }
250
251 enum DatagramSocket {
252 Udp(UdpSocket),
253 #[cfg(all(unix, not(target_os = "macos")))]
254 Unix(UnixDatagram),
255 }
256
257 enum DatagramSocketAddr {
258 Udp(SocketAddr),
259 #[cfg(all(unix, not(target_os = "macos")))]
260 Unix(PathBuf),
261 }
262
263 async fn test_datagram(datagram_addr: DatagramSocketAddr) {
264 let receiver = match &datagram_addr {
265 DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
266 #[cfg(all(unix, not(target_os = "macos")))]
267 DatagramSocketAddr::Unix(path) => {
268 DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
269 }
270 };
271
272 let config = SocketSinkConfig {
273 mode: match &datagram_addr {
274 DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
275 config: UdpSinkConfig::from_address(addr.to_string()),
276 encoding: JsonSerializerConfig::default().into(),
277 }),
278 #[cfg(all(unix, not(target_os = "macos")))]
279 DatagramSocketAddr::Unix(path) => Mode::UnixDatagram(UnixMode {
280 config: UnixSinkConfig::new(path.to_path_buf()),
281 encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
282 }),
283 },
284 acknowledgements: Default::default(),
285 };
286
287 let context = SinkContext::default();
288 assert_sink_compliance(&SINK_TAGS, async move {
289 let (sink, _healthcheck) = config.build(context).await.unwrap();
290
291 let event = Event::Log(LogEvent::from("raw log line"));
292 sink.run(stream::once(ready(event.into()))).await
293 })
294 .await
295 .expect("Running sink failed");
296
297 let mut buf = [0; 256];
298 let size = match &receiver {
299 DatagramSocket::Udp(sock) => {
300 sock.recv_from(&mut buf).expect("Did not receive message").0
301 }
302 #[cfg(all(unix, not(target_os = "macos")))]
303 DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
304 };
305
306 let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
307 let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
308 let data = data.as_object().expect("Not a JSON object");
309 assert!(data.get("timestamp").is_some());
310 let message = data.get("message").expect("No message in JSON");
311 assert_eq!(message, &Value::String("raw log line".into()));
312 }
313
314 #[tokio::test]
315 async fn udp_ipv4() {
316 trace_init();
317
318 test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
319 }
320
321 #[tokio::test]
322 async fn udp_ipv6() {
323 trace_init();
324
325 test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
326 }
327
328 #[cfg(all(unix, not(target_os = "macos")))]
329 #[tokio::test]
330 async fn unix_datagram() {
331 trace_init();
332
333 test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
334 "unix_datagram_socket_test",
335 )))
336 .await;
337 }
338
339 #[tokio::test]
340 async fn tcp_stream() {
341 trace_init();
342
343 let addr = next_addr();
344 let config = SocketSinkConfig {
345 mode: Mode::Tcp(TcpMode {
346 config: TcpSinkConfig::from_address(addr.to_string()),
347 encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
348 }),
349 acknowledgements: Default::default(),
350 };
351
352 let mut receiver = CountReceiver::receive_lines(addr);
353
354 let (lines, events) = random_lines_with_stream(10, 100, None);
355
356 assert_sink_compliance(&SINK_TAGS, async move {
357 let context = SinkContext::default();
358 let (sink, _healthcheck) = config.build(context).await.unwrap();
359
360 sink.run(events).await
361 })
362 .await
363 .expect("Running sink failed");
364
365 receiver.connected().await;
367
368 let output = receiver.await;
369 assert_eq!(lines.len(), output.len());
370 for (source, received) in lines.iter().zip(output) {
371 let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
372 let received = json.get("message").unwrap().as_str().unwrap();
373 assert_eq!(source, received);
374 }
375 }
376
377 #[cfg(unix)]
378 #[tokio::test]
379 async fn metrics_socket() {
380 trace_init();
381
382 let out_path = temp_uds_path("unix_socket_test");
383 let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());
384
385 let config = SocketSinkConfig {
386 mode: Mode::UnixStream(UnixMode {
387 config: UnixSinkConfig::new(out_path),
388 encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
389 }),
390 acknowledgements: Default::default(),
391 };
392
393 let (expected, events) = random_metrics_with_stream(10, None, None);
394
395 assert_sink_compliance(&SINK_TAGS, async move {
396 let context = SinkContext::default();
397 let (sink, _healthcheck) = config.build(context).await.unwrap();
398
399 sink.run(events).await
400 })
401 .await
402 .expect("Running sink failed");
403
404 receiver.connected().await;
406
407 let output = receiver.await;
408 assert_eq!(expected.len(), output.len());
409 for (source, received) in expected.iter().zip(output) {
410 let json = serde_json::from_str::<Value>(&received).expect("Invalid JSON");
411 let received = json.get("metric").unwrap();
412 let received_name = received.get("name").unwrap().as_str().unwrap();
413 assert_eq!(source.as_metric().name(), received_name);
414 }
415 }
416
417 #[tokio::test]
427 async fn tcp_stream_detects_disconnect() {
428 use std::{
429 pin::Pin,
430 sync::{
431 Arc,
432 atomic::{AtomicUsize, Ordering},
433 },
434 task::Poll,
435 };
436
437 use futures::{FutureExt, SinkExt, StreamExt, channel::mpsc};
438 use tokio::{
439 io::{AsyncRead, AsyncWriteExt, ReadBuf},
440 net::TcpStream,
441 task::yield_now,
442 time::{Duration, interval},
443 };
444 use tokio_stream::wrappers::IntervalStream;
445
446 use crate::{
447 event::EventArray,
448 tls::{self, MaybeTlsIncomingStream, MaybeTlsSettings, TlsConfig, TlsEnableableConfig},
449 };
450
451 trace_init();
452
453 let addr = next_addr();
454 let config = SocketSinkConfig {
455 mode: Mode::Tcp(TcpMode {
456 config: TcpSinkConfig::new(
457 addr.to_string(),
458 None,
459 Some(TlsEnableableConfig {
460 enabled: Some(true),
461 options: TlsConfig {
462 verify_certificate: Some(false),
463 verify_hostname: Some(false),
464 ca_file: Some(tls::TEST_PEM_CRT_PATH.into()),
465 ..Default::default()
466 },
467 }),
468 None,
469 ),
470 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
471 }),
472 acknowledgements: Default::default(),
473 };
474 let context = SinkContext::default();
475 let (sink, _healthcheck) = config.build(context).await.unwrap();
476 let (mut sender, receiver) = mpsc::channel::<Option<EventArray>>(0);
477 let jh1 = tokio::spawn(async move {
478 let stream = receiver
479 .take_while(|event| ready(event.is_some()))
480 .map(|event| event.unwrap())
481 .boxed();
482 run_and_assert_sink_compliance(sink, stream, &SINK_TAGS).await
483 });
484
485 let msg_counter = Arc::new(AtomicUsize::new(0));
486 let msg_counter1 = Arc::clone(&msg_counter);
487 let conn_counter = Arc::new(AtomicUsize::new(0));
488 let conn_counter1 = Arc::clone(&conn_counter);
489
490 let (close_tx, close_rx) = tokio::sync::oneshot::channel::<()>();
491 let mut close_rx = Some(close_rx.map(|x| x.unwrap()));
492
493 let config = Some(TlsEnableableConfig::test_config());
494
495 let jh2 = tokio::spawn(async move {
497 let tls = MaybeTlsSettings::from_config(config.as_ref(), true).unwrap();
498 let listener = tls.bind(&addr).await.unwrap();
499 listener
500 .accept_stream()
501 .take(2)
502 .for_each(|connection| {
503 let mut close_rx = close_rx.take();
504
505 conn_counter1.fetch_add(1, Ordering::SeqCst);
506 let msg_counter1 = Arc::clone(&msg_counter1);
507
508 let mut stream: MaybeTlsIncomingStream<TcpStream> = connection.unwrap();
509
510 std::future::poll_fn(move |cx| {
511 loop {
512 if let Some(fut) = close_rx.as_mut()
513 && let Poll::Ready(()) = fut.poll_unpin(cx)
514 {
515 stream
516 .get_mut()
517 .unwrap()
518 .shutdown()
519 .now_or_never()
520 .unwrap()
521 .unwrap();
522 close_rx = None;
523 }
524
525 let mut buf = [0u8; 11];
526 let mut buf = ReadBuf::new(&mut buf);
527 return match Pin::new(&mut stream).poll_read(cx, &mut buf) {
528 Poll::Ready(Ok(())) => {
529 if buf.filled().is_empty() {
530 Poll::Ready(())
531 } else {
532 msg_counter1.fetch_add(1, Ordering::SeqCst);
533 continue;
534 }
535 }
536 Poll::Ready(Err(error)) => panic!("{error}"),
537 Poll::Pending => Poll::Pending,
538 };
539 }
540 })
541 })
542 .await;
543 });
544
545 let (_, mut events) = random_lines_with_stream(10, 10, None);
546 while let Some(event) = events.next().await {
547 sender.send(Some(event)).await.unwrap();
548 }
549
550 IntervalStream::new(interval(Duration::from_millis(100)))
554 .take(500)
555 .take_while(|_| ready(msg_counter.load(Ordering::SeqCst) != 10))
556 .for_each(|_| ready(()))
557 .await;
558 close_tx.send(()).unwrap();
559
560 yield_now().await;
562
563 assert_eq!(msg_counter.load(Ordering::SeqCst), 10);
564 assert_eq!(conn_counter.load(Ordering::SeqCst), 1);
565
566 let (_, mut events) = random_lines_with_stream(10, 10, None);
568 while let Some(event) = events.next().await {
569 sender.send(Some(event)).await.unwrap();
570 }
571
572 sender.send(None).await.unwrap();
574 jh1.await.unwrap();
575 jh2.await.unwrap();
576
577 assert_eq!(msg_counter.load(Ordering::SeqCst), 20);
579 assert_eq!(conn_counter.load(Ordering::SeqCst), 2);
580 }
581
582 #[tokio::test]
584 async fn reconnect() {
585 trace_init();
586
587 let addr = next_addr();
588 let config = SocketSinkConfig {
589 mode: Mode::Tcp(TcpMode {
590 config: TcpSinkConfig::from_address(addr.to_string()),
591 encoding: (None::<FramingConfig>, TextSerializerConfig::default()).into(),
592 }),
593 acknowledgements: Default::default(),
594 };
595
596 let context = SinkContext::default();
597 let (sink, _healthcheck) = config.build(context).await.unwrap();
598
599 let (_, events) = random_lines_with_stream(1000, 10000, None);
600 let sink_handle = tokio::spawn(run_and_assert_sink_compliance(sink, events, &SINK_TAGS));
601
602 let mut count = 20usize;
604 TcpListenerStream::new(TcpListener::bind(addr).await.unwrap())
605 .next()
606 .await
607 .unwrap()
608 .map(|socket| FramedRead::new(socket, LinesCodec::new()))
609 .unwrap()
610 .map(|x| x.unwrap())
611 .take_while(|_| {
612 ready(if count > 0 {
613 count -= 1;
614 true
615 } else {
616 false
617 })
618 })
619 .collect::<Vec<_>>()
620 .await;
621
622 if cfg!(windows) {
624 sleep(Duration::from_secs(1)).await;
626 }
627
628 assert!(
631 timeout(
632 Duration::from_secs(5),
633 CountReceiver::receive_lines(addr).connected()
634 )
635 .await
636 .is_ok()
637 );
638
639 sink_handle.await.unwrap();
640 }
641
642 #[cfg(unix)]
643 fn temp_uds_path(name: &str) -> PathBuf {
644 tempfile::tempdir().unwrap().keep().join(name)
645 }
646}