vector/sources/util/
unix_datagram.rs

1use std::{fs::remove_file, path::PathBuf};
2
3use bytes::{Bytes, BytesMut};
4use futures::StreamExt;
5use tokio::net::UnixDatagram;
6use tracing::field;
7use vector_lib::{
8    EstimatedJsonEncodedSizeOf,
9    codecs::{DecoderFramedRead, StreamDecodingError},
10    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
11};
12
13use crate::{
14    SourceSender,
15    codecs::Decoder,
16    event::Event,
17    internal_events::{
18        SocketEventsReceived, SocketMode, SocketReceiveError, StreamClosedError,
19        UnixSocketFileDeleteError,
20    },
21    shutdown::ShutdownSignal,
22    sources::{
23        Source,
24        util::{change_socket_permissions, unix::UNNAMED_SOCKET_HOST},
25    },
26};
27
28/// Returns a `Source` object corresponding to a Unix domain datagram socket.
29/// Passing in different functions for `decoder` and `handle_events` can allow
30/// for different source-specific logic (such as decoding syslog messages in the
31/// syslog source).
32pub fn build_unix_datagram_source(
33    listen_path: PathBuf,
34    socket_file_mode: Option<u32>,
35    max_length: usize,
36    decoder: Decoder,
37    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
38    shutdown: ShutdownSignal,
39    out: SourceSender,
40) -> crate::Result<Source> {
41    Ok(Box::pin(async move {
42        let socket = UnixDatagram::bind(&listen_path).expect("Failed to bind to datagram socket");
43        info!(message = "Listening.", path = ?listen_path, r#type = "unix_datagram");
44
45        change_socket_permissions(&listen_path, socket_file_mode)
46            .expect("Failed to set socket permissions");
47
48        let result = listen(socket, max_length, decoder, shutdown, handle_events, out).await;
49
50        // Delete socket file.
51        if let Err(error) = remove_file(&listen_path) {
52            emit!(UnixSocketFileDeleteError {
53                path: &listen_path,
54                error
55            });
56        }
57
58        result
59    }))
60}
61
62async fn listen(
63    socket: UnixDatagram,
64    max_length: usize,
65    decoder: Decoder,
66    mut shutdown: ShutdownSignal,
67    handle_events: impl Fn(&mut [Event], Option<Bytes>) + Clone + Send + Sync + 'static,
68    mut out: SourceSender,
69) -> Result<(), ()> {
70    let mut buf = BytesMut::with_capacity(max_length);
71    let bytes_received = register!(BytesReceived::from(Protocol::UNIX));
72    loop {
73        buf.resize(max_length, 0);
74        tokio::select! {
75            recv = socket.recv_from(&mut buf) => {
76                let (byte_size, address) = recv.map_err(|error| {
77                    let error = vector_lib::codecs::decoding::Error::FramingError(error.into());
78                    emit!(SocketReceiveError {
79                        mode: SocketMode::Unix,
80                        error: &error
81                    })
82                })?;
83
84                let span = info_span!("datagram");
85                let received_from = if !address.is_unnamed() {
86                    let path = address.as_pathname().map(|e| e.to_owned()).inspect(|path| {
87                        span.record("peer_path", field::debug(path));
88                    });
89
90                    path.map(|p| p.to_string_lossy().into_owned().into())
91                } else {
92                    // In most cases, we'll be connecting to this
93                    // socket from an unnamed socket (a socket not
94                    // bound to a file). Instead of a filename, we'll
95                    // surface a specific host value.
96                    span.record("peer_path", field::debug(UNNAMED_SOCKET_HOST));
97                    Some(UNNAMED_SOCKET_HOST.into())
98                };
99
100                bytes_received.emit(ByteSize(byte_size));
101
102                let payload = buf.split_to(byte_size);
103
104                let mut stream = DecoderFramedRead::new(payload.as_ref(), decoder.clone());
105
106                loop {
107                    match stream.next().await {
108                        Some(Ok((mut events, _byte_size))) => {
109                            emit!(SocketEventsReceived {
110                                mode: SocketMode::Unix,
111                                byte_size: events.estimated_json_encoded_size_of(),
112                                count: events.len()
113                            });
114
115                            handle_events(&mut events, received_from.clone());
116
117                            let count = events.len();
118                            if (out.send_batch(events).await).is_err() {
119                                emit!(StreamClosedError { count });
120                            }
121                        },
122                        Some(Err(error)) => {
123                            emit!(SocketReceiveError {
124                                mode: SocketMode::Unix,
125                                error: &error
126                            });
127                            if !error.can_continue() {
128                                break;
129                            }
130                        },
131                        None => break,
132                    }
133                }
134            }
135            _ = &mut shutdown => return Ok(()),
136        }
137    }
138}