vector/sources/util/
unix_datagram.rs

1use std::{fs::remove_file, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use futures::StreamExt;
5use tokio::net::UnixDatagram;
6use tokio_util::codec::FramedRead;
7use tracing::field;
8use vector_lib::codecs::StreamDecodingError;
9use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
10use vector_lib::EstimatedJsonEncodedSizeOf;
11
12use crate::{
13    codecs::Decoder,
14    event::Event,
15    internal_events::{
16        SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
17        UnixSocketFileDeleteError,
18    },
19    shutdown::ShutdownSignal,
20    sources::util::change_socket_permissions,
21    sources::util::unix::UNNAMED_SOCKET_HOST,
22    sources::Source,
23    SourceSender,
24};
25
26/// Returns a `Source` object corresponding to a Unix domain datagram socket.
27/// Passing in different functions for `decoder` and `handle_events` can allow
28/// for different source-specific logic (such as decoding syslog messages in the
29/// syslog source).
30pub fn build_unix_datagram_source(
31    listen_path: PathBuf,
32    socket_file_mode: Option<u32>,
33    max_length: usize,
34    decoder: Decoder,
35    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
36    shutdown: ShutdownSignal,
37    out: SourceSender,
38) -> crate::Result<Source> {
39    Ok(Box::pin(async move {
40        let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
41        info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");
42
43        change_socket_permissions(&listen_path, socket_file_mode)
44            .expect("Failed to set socket permissions");
45
46        let result = listen(socket, max_length, decoder, shutdown, handle_events, out).await;
47
48        // Delete socket file.
49        if let Err(error) = remove_file(&listen_path) {
50            emit!(UnixSocketFileDeleteError {
51                path: &listen_path,
52                error
53            });
54        }
55
56        result
57    }))
58}
59
60async fn listen(
61    socket: UnixDatagram,
62    max_length: usize,
63    decoder: Decoder,
64    mut shutdown: ShutdownSignal,
65    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
66    mut out: SourceSender,
67) -> Result<(), ()> {
68    let mut buf = BytesMut::with_capacity(max_length);
69    let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
70    loop {
71        buf.resize(max_length, 0);
72        tokio::select! {
73            recv = socket.recv_from(&mut buf) => {
74                let (byte_size, address) = recv.map_err(|error| {
75                    let error = vector_lib::codecs::decoding::Error::FramingError(error.into());
76                    emit!(SocketReceiveError {
77                        mode: SocketMode::Unix,
78                        error: &error
79                    })
80                })?;
81
82                let span = info_span!("datagram");
83                let received_from = if !address.is_unnamed() {
84                    let path = address.as_pathname().map(|e| e.to_owned()).inspect(|path| {
85                        span.record("peer_path", field::debug(path));
86                    });
87
88                    path.map(|p| p.to_string_lossy().into_owned().into())
89                } else {
90                    // In most cases, we'll be connecting to this
91                    // socket from an unnamed socket (a socket not
92                    // bound to a file). Instead of a filename, we'll
93                    // surface a specific host value.
94                    span.record("peer_path", field::debug(UNNAMED_SOCKET_HOST));
95                    Some(UNNAMED_SOCKET_HOST.into())
96                };
97
98                bytes_received.emit(ByteSize(byte_size));
99
100                let payload = buf.split_to(byte_size);
101
102                let mut stream = FramedRead::new(payload.as_ref(), decoder.clone());
103
104                loop {
105                    match stream.next().await {
106                        Some(Ok((mut events, _byte_size))) => {
107                            emit!(SocketEventsReceived {
108                                mode: SocketMode::Unix,
109                                byte_size: events.estimated_json_encoded_size_of(),
110                                count: events.len()
111                            });
112
113                            handle_events(&mut events, received_from.clone());
114
115                            let count = events.len();
116                            if (out.send_batch(events).await).is_err() {
117                                emit!(StreamClosedError { count });
118                            }
119                        },
120                        Some(Err(error)) => {
121                            emit!(SocketReceiveError {
122                                mode: SocketMode::Unix,
123                                error: &error
124                            });
125                            if !error.can_continue() {
126                                break;
127                            }
128                        },
129                        None => break,
130                    }
131                }
132            }
133            _ = &mut shutdown => return Ok(()),
134        }
135    }
136}