vector/sources/util/
unix_datagram.rs1use std::{fs::remove_file, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use futures::StreamExt;
5use tokio::net::UnixDatagram;
6use tokio_util::codec::FramedRead;
7use tracing::field;
8use vector_lib::codecs::StreamDecodingError;
9use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
10use vector_lib::EstimatedJsonEncodedSizeOf;
11
12use crate::{
13 codecs::Decoder,
14 event::Event,
15 internal_events::{
16 SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
17 UnixSocketFileDeleteError,
18 },
19 shutdown::ShutdownSignal,
20 sources::util::change_socket_permissions,
21 sources::util::unix::UNNAMED_SOCKET_HOST,
22 sources::Source,
23 SourceSender,
24};
25
26pub fn build_unix_datagram_source(
31 listen_path: PathBuf,
32 socket_file_mode: Option<u32>,
33 max_length: usize,
34 decoder: Decoder,
35 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
36 shutdown: ShutdownSignal,
37 out: SourceSender,
38) -> crate::Result<Source> {
39 Ok(Box::pin(async move {
40 let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
41 info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");
42
43 change_socket_permissions(&listen_path, socket_file_mode)
44 .expect("Failed to set socket permissions");
45
46 let result = listen(socket, max_length, decoder, shutdown, handle_events, out).await;
47
48 if let Err(error) = remove_file(&listen_path) {
50 emit!(UnixSocketFileDeleteError {
51 path: &listen_path,
52 error
53 });
54 }
55
56 result
57 }))
58}
59
60async fn listen(
61 socket: UnixDatagram,
62 max_length: usize,
63 decoder: Decoder,
64 mut shutdown: ShutdownSignal,
65 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
66 mut out: SourceSender,
67) -> Result<(), ()> {
68 let mut buf = BytesMut::with_capacity(max_length);
69 let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
70 loop {
71 buf.resize(max_length, 0);
72 tokio::select! {
73 recv = socket.recv_from(&mut buf) => {
74 let (byte_size, address) = recv.map_err(|error| {
75 let error = vector_lib::codecs::decoding::Error::FramingError(error.into());
76 emit!(SocketReceiveError {
77 mode: SocketMode::Unix,
78 error: &error
79 })
80 })?;
81
82 let span = info_span!("datagram");
83 let received_from = if !address.is_unnamed() {
84 let path = address.as_pathname().map(|e| e.to_owned()).inspect(|path| {
85 span.record("peer_path", field::debug(path));
86 });
87
88 path.map(|p| p.to_string_lossy().into_owned().into())
89 } else {
90 span.record("peer_path", field::debug(UNNAMED_SOCKET_HOST));
95 Some(UNNAMED_SOCKET_HOST.into())
96 };
97
98 bytes_received.emit(ByteSize(byte_size));
99
100 let payload = buf.split_to(byte_size);
101
102 let mut stream = FramedRead::new(payload.as_ref(), decoder.clone());
103
104 loop {
105 match stream.next().await {
106 Some(Ok((mut events, _byte_size))) => {
107 emit!(SocketEventsReceived {
108 mode: SocketMode::Unix,
109 byte_size: events.estimated_json_encoded_size_of(),
110 count: events.len()
111 });
112
113 handle_events(&mut events, received_from.clone());
114
115 let count = events.len();
116 if (out.send_batch(events).await).is_err() {
117 emit!(StreamClosedError { count });
118 }
119 },
120 Some(Err(error)) => {
121 emit!(SocketReceiveError {
122 mode: SocketMode::Unix,
123 error: &error
124 });
125 if !error.can_continue() {
126 break;
127 }
128 },
129 None => break,
130 }
131 }
132 }
133 _ = &mut shutdown => return Ok(()),
134 }
135 }
136}