vector/sources/util/
unix_datagram.rs1use std::{fs::remove_file, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use futures::StreamExt;
5use tokio::net::UnixDatagram;
6use tokio_util::codec::FramedRead;
7use tracing::field;
8use vector_lib::{
9 EstimatedJsonEncodedSizeOf,
10 codecs::StreamDecodingError,
11 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
12};
13
14use crate::{
15 SourceSender,
16 codecs::Decoder,
17 event::Event,
18 internal_events::{
19 SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
20 UnixSocketFileDeleteError,
21 },
22 shutdown::ShutdownSignal,
23 sources::{
24 Source,
25 util::{change_socket_permissions, unix::UNNAMED_SOCKET_HOST},
26 },
27};
28
29pub fn build_unix_datagram_source(
34 listen_path: PathBuf,
35 socket_file_mode: Option<u32>,
36 max_length: usize,
37 decoder: Decoder,
38 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
39 shutdown: ShutdownSignal,
40 out: SourceSender,
41) -> crate::Result<Source> {
42 Ok(Box::pin(async move {
43 let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
44 info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");
45
46 change_socket_permissions(&listen_path, socket_file_mode)
47 .expect("Failed to set socket permissions");
48
49 let result = listen(socket, max_length, decoder, shutdown, handle_events, out).await;
50
51 if let Err(error) = remove_file(&listen_path) {
53 emit!(UnixSocketFileDeleteError {
54 path: &listen_path,
55 error
56 });
57 }
58
59 result
60 }))
61}
62
63async fn listen(
64 socket: UnixDatagram,
65 max_length: usize,
66 decoder: Decoder,
67 mut shutdown: ShutdownSignal,
68 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
69 mut out: SourceSender,
70) -> Result<(), ()> {
71 let mut buf = BytesMut::with_capacity(max_length);
72 let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
73 loop {
74 buf.resize(max_length, 0);
75 tokio::select! {
76 recv = socket.recv_from(&mut buf) => {
77 let (byte_size, address) = recv.map_err(|error| {
78 let error = vector_lib::codecs::decoding::Error::FramingError(error.into());
79 emit!(SocketReceiveError {
80 mode: SocketMode::Unix,
81 error: &error
82 })
83 })?;
84
85 let span = info_span!("datagram");
86 let received_from = if !address.is_unnamed() {
87 let path = address.as_pathname().map(|e| e.to_owned()).inspect(|path| {
88 span.record("peer_path", field::debug(path));
89 });
90
91 path.map(|p| p.to_string_lossy().into_owned().into())
92 } else {
93 span.record("peer_path", field::debug(UNNAMED_SOCKET_HOST));
98 Some(UNNAMED_SOCKET_HOST.into())
99 };
100
101 bytes_received.emit(ByteSize(byte_size));
102
103 let payload = buf.split_to(byte_size);
104
105 let mut stream = FramedRead::new(payload.as_ref(), decoder.clone());
106
107 loop {
108 match stream.next().await {
109 Some(Ok((mut events, _byte_size))) => {
110 emit!(SocketEventsReceived {
111 mode: SocketMode::Unix,
112 byte_size: events.estimated_json_encoded_size_of(),
113 count: events.len()
114 });
115
116 handle_events(&mut events, received_from.clone());
117
118 let count = events.len();
119 if (out.send_batch(events).await).is_err() {
120 emit!(StreamClosedError { count });
121 }
122 },
123 Some(Err(error)) => {
124 emit!(SocketReceiveError {
125 mode: SocketMode::Unix,
126 error: &error
127 });
128 if !error.can_continue() {
129 break;
130 }
131 },
132 None => break,
133 }
134 }
135 }
136 _ = &mut shutdown => return Ok(()),
137 }
138 }
139}