vector/sinks/util/
datagram.rs1#[cfg(unix)]
2use std::path::PathBuf;
3
4use bytes::BytesMut;
5use futures::{stream::BoxStream, StreamExt};
6use futures_util::stream::Peekable;
7use tokio::net::UdpSocket;
8#[cfg(unix)]
9use tokio::net::UnixDatagram;
10use tokio_util::codec::Encoder;
11use vector_lib::internal_event::RegisterInternalEvent;
12use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle};
13use vector_lib::EstimatedJsonEncodedSizeOf;
14
15use crate::{
16 codecs::Transformer,
17 event::{Event, EventStatus, Finalizable},
18 internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError},
19};
20
21#[cfg(unix)]
22use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};
23
24pub enum DatagramSocket {
25 Udp(UdpSocket),
26 #[cfg(unix)]
27 Unix(UnixDatagram, PathBuf),
28}
29
30pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
31 input: &mut Peekable<BoxStream<'_, Event>>,
32 mut socket: DatagramSocket,
33 transformer: &Transformer,
34 encoder: &mut E,
35 bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
36) {
37 while let Some(mut event) = input.next().await {
38 let byte_size = event.estimated_json_encoded_size_of();
39
40 transformer.transform(&mut event);
41
42 let finalizers = event.take_finalizers();
43 let mut bytes = BytesMut::new();
44
45 if encoder.encode(event, &mut bytes).is_err() {
47 continue;
48 }
49
50 match send_datagram(&mut socket, &bytes).await {
51 Ok(()) => {
52 emit!(SocketEventsSent {
53 mode: match socket {
54 DatagramSocket::Udp(_) => SocketMode::Udp,
55 #[cfg(unix)]
56 DatagramSocket::Unix(..) => SocketMode::Unix,
57 },
58 count: 1,
59 byte_size,
60 });
61
62 bytes_sent.emit(ByteSize(bytes.len()));
63 finalizers.update_status(EventStatus::Delivered);
64 }
65 Err(error) => {
66 match socket {
67 DatagramSocket::Udp(_) => emit!(SocketSendError {
68 mode: SocketMode::Udp,
69 error
70 }),
71 #[cfg(unix)]
72 DatagramSocket::Unix(_, path) => {
73 emit!(UnixSocketSendError {
74 path: path.as_path(),
75 error: &error
76 })
77 }
78 };
79 finalizers.update_status(EventStatus::Errored);
80 return;
81 }
82 }
83 }
84}
85
86async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
87 let sent = match socket {
88 DatagramSocket::Udp(udp) => udp.send(buf).await,
89 #[cfg(unix)]
90 DatagramSocket::Unix(uds, _) => uds.send(buf).await,
91 }?;
92 if sent != buf.len() {
93 match socket {
94 DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
95 data_size: buf.len(),
96 sent,
97 }),
98 #[cfg(unix)]
99 DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
100 data_size: buf.len(),
101 sent,
102 }),
103 }
104 }
105 Ok(())
106}