vector/sources/util/
unix_datagram.rs1use std::{fs::remove_file, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use futures::StreamExt;
5use tokio::net::UnixDatagram;
6use tracing::field;
7use vector_lib::{
8 EstimatedJsonEncodedSizeOf,
9 codecs::{DecoderFramedRead, StreamDecodingError},
10 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
11};
12
13use crate::{
14 SourceSender,
15 codecs::Decoder,
16 event::Event,
17 internal_events::{
18 SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
19 UnixSocketFileDeleteError,
20 },
21 shutdown::ShutdownSignal,
22 sources::{
23 Source,
24 util::{change_socket_permissions, unix::UNNAMED_SOCKET_HOST},
25 },
26};
27
28pub fn build_unix_datagram_source(
33 listen_path: PathBuf,
34 socket_file_mode: Option<u32>,
35 max_length: usize,
36 decoder: Decoder,
37 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
38 shutdown: ShutdownSignal,
39 out: SourceSender,
40) -> crate::Result<Source> {
41 Ok(Box::pin(async move {
42 let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
43 info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");
44
45 change_socket_permissions(&listen_path, socket_file_mode)
46 .expect("Failed to set socket permissions");
47
48 let result = listen(socket, max_length, decoder, shutdown, handle_events, out).await;
49
50 if let Err(error) = remove_file(&listen_path) {
52 emit!(UnixSocketFileDeleteError {
53 path: &listen_path,
54 error
55 });
56 }
57
58 result
59 }))
60}
61
62async fn listen(
63 socket: UnixDatagram,
64 max_length: usize,
65 decoder: Decoder,
66 mut shutdown: ShutdownSignal,
67 handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
68 mut out: SourceSender,
69) -> Result<(), ()> {
70 let mut buf = BytesMut::with_capacity(max_length);
71 let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
72 loop {
73 buf.resize(max_length, 0);
74 tokio::select! {
75 recv = socket.recv_from(&mut buf) => {
76 let (byte_size, address) = recv.map_err(|error| {
77 let error = vector_lib::codecs::decoding::Error::FramingError(error.into());
78 emit!(SocketReceiveError {
79 mode: SocketMode::Unix,
80 error: &error
81 })
82 })?;
83
84 let span = info_span!("datagram");
85 let received_from = if !address.is_unnamed() {
86 let path = address.as_pathname().map(|e| e.to_owned()).inspect(|path| {
87 span.record("peer_path", field::debug(path));
88 });
89
90 path.map(|p| p.to_string_lossy().into_owned().into())
91 } else {
92 span.record("peer_path", field::debug(UNNAMED_SOCKET_HOST));
97 Some(UNNAMED_SOCKET_HOST.into())
98 };
99
100 bytes_received.emit(ByteSize(byte_size));
101
102 let payload = buf.split_to(byte_size);
103
104 let mut stream = DecoderFramedRead::new(payload.as_ref(), decoder.clone());
105
106 loop {
107 match stream.next().await {
108 Some(Ok((mut events, _byte_size))) => {
109 emit!(SocketEventsReceived {
110 mode: SocketMode::Unix,
111 byte_size: events.estimated_json_encoded_size_of(),
112 count: events.len()
113 });
114
115 handle_events(&mut events, received_from.clone());
116
117 let count = events.len();
118 if (out.send_batch(events).await).is_err() {
119 emit!(StreamClosedError { count });
120 }
121 },
122 Some(Err(error)) => {
123 emit!(SocketReceiveError {
124 mode: SocketMode::Unix,
125 error: &error
126 });
127 if !error.can_continue() {
128 break;
129 }
130 },
131 None => break,
132 }
133 }
134 }
135 _ = &mut shutdown => return Ok(()),
136 }
137 }
138}