vector/sources/util/
unix_datagram.rs

1use std::{fs::remove_file, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use futures::StreamExt;
5use tokio::net::UnixDatagram;
6use tokio_util::codec::FramedRead;
7use tracing::field;
8use vector_lib::{
9    EstimatedJsonEncodedSizeOf,
10    codecs::StreamDecodingError,
11    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
12};
13
14use crate::{
15    SourceSender,
16    codecs::Decoder,
17    event::Event,
18    internal_events::{
19        SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
20        UnixSocketFileDeleteError,
21    },
22    shutdown::ShutdownSignal,
23    sources::{
24        Source,
25        util::{change_socket_permissions, unix::UNNAMED_SOCKET_HOST},
26    },
27};
28
29/// Returns a `Source` object corresponding to a Unix domain datagram socket.
30/// Passing in different functions for `decoder` and `handle_events` can allow
31/// for different source-specific logic (such as decoding syslog messages in the
32/// syslog source).
33pub fn build_unix_datagram_source(
34    listen_path: PathBuf,
35    socket_file_mode: Option<u32>,
36    max_length: usize,
37    decoder: Decoder,
38    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
39    shutdown: ShutdownSignal,
40    out: SourceSender,
41) -> crate::Result<Source> {
42    Ok(Box::pin(async move {
43        let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
44        info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");
45
46        change_socket_permissions(&listen_path, socket_file_mode)
47            .expect("Failed to set socket permissions");
48
49        let result = listen(socket, max_length, decoder, shutdown, handle_events, out).await;
50
51        // Delete socket file.
52        if let Err(error) = remove_file(&listen_path) {
53            emit!(UnixSocketFileDeleteError {
54                path: &listen_path,
55                error
56            });
57        }
58
59        result
60    }))
61}
62
63async fn listen(
64    socket: UnixDatagram,
65    max_length: usize,
66    decoder: Decoder,
67    mut shutdown: ShutdownSignal,
68    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
69    mut out: SourceSender,
70) -> Result<(), ()> {
71    let mut buf = BytesMut::with_capacity(max_length);
72    let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
73    loop {
74        buf.resize(max_length, 0);
75        tokio::select! {
76            recv = socket.recv_from(&mut buf) => {
77                let (byte_size, address) = recv.map_err(|error| {
78                    let error = vector_lib::codecs::decoding::Error::FramingError(error.into());
79                    emit!(SocketReceiveError {
80                        mode: SocketMode::Unix,
81                        error: &error
82                    })
83                })?;
84
85                let span = info_span!("datagram");
86                let received_from = if !address.is_unnamed() {
87                    let path = address.as_pathname().map(|e| e.to_owned()).inspect(|path| {
88                        span.record("peer_path", field::debug(path));
89                    });
90
91                    path.map(|p| p.to_string_lossy().into_owned().into())
92                } else {
93                    // In most cases, we'll be connecting to this
94                    // socket from an unnamed socket (a socket not
95                    // bound to a file). Instead of a filename, we'll
96                    // surface a specific host value.
97                    span.record("peer_path", field::debug(UNNAMED_SOCKET_HOST));
98                    Some(UNNAMED_SOCKET_HOST.into())
99                };
100
101                bytes_received.emit(ByteSize(byte_size));
102
103                let payload = buf.split_to(byte_size);
104
105                let mut stream = FramedRead::new(payload.as_ref(), decoder.clone());
106
107                loop {
108                    match stream.next().await {
109                        Some(Ok((mut events, _byte_size))) => {
110                            emit!(SocketEventsReceived {
111                                mode: SocketMode::Unix,
112                                byte_size: events.estimated_json_encoded_size_of(),
113                                count: events.len()
114                            });
115
116                            handle_events(&mut events, received_from.clone());
117
118                            let count = events.len();
119                            if (out.send_batch(events).await).is_err() {
120                                emit!(StreamClosedError { count });
121                            }
122                        },
123                        Some(Err(error)) => {
124                            emit!(SocketReceiveError {
125                                mode: SocketMode::Unix,
126                                error: &error
127                            });
128                            if !error.can_continue() {
129                                break;
130                            }
131                        },
132                        None => break,
133                    }
134                }
135            }
136            _ = &mut shutdown => return Ok(()),
137        }
138    }
139}