1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#[cfg(unix)]
use std::path::PathBuf;

use bytes::BytesMut;
use futures::{stream::BoxStream, StreamExt};
use futures_util::stream::Peekable;
use tokio::net::UdpSocket;
#[cfg(unix)]
use tokio::net::UnixDatagram;
use tokio_util::codec::Encoder;
use vector_lib::internal_event::RegisterInternalEvent;
use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle};
use vector_lib::EstimatedJsonEncodedSizeOf;

use crate::{
    codecs::Transformer,
    event::{Event, EventStatus, Finalizable},
    internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError},
};

#[cfg(unix)]
use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};

pub enum DatagramSocket {
    Udp(UdpSocket),
    #[cfg(unix)]
    Unix(UnixDatagram, PathBuf),
}

pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
    input: &mut Peekable<BoxStream<'_, Event>>,
    mut socket: DatagramSocket,
    transformer: &Transformer,
    encoder: &mut E,
    bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
) {
    while let Some(mut event) = input.next().await {
        let byte_size = event.estimated_json_encoded_size_of();

        transformer.transform(&mut event);

        let finalizers = event.take_finalizers();
        let mut bytes = BytesMut::new();

        // Errors are handled by `Encoder`.
        if encoder.encode(event, &mut bytes).is_err() {
            continue;
        }

        match send_datagram(&mut socket, &bytes).await {
            Ok(()) => {
                emit!(SocketEventsSent {
                    mode: match socket {
                        DatagramSocket::Udp(_) => SocketMode::Udp,
                        #[cfg(unix)]
                        DatagramSocket::Unix(..) => SocketMode::Unix,
                    },
                    count: 1,
                    byte_size,
                });

                bytes_sent.emit(ByteSize(bytes.len()));
                finalizers.update_status(EventStatus::Delivered);
            }
            Err(error) => {
                match socket {
                    DatagramSocket::Udp(_) => emit!(SocketSendError {
                        mode: SocketMode::Udp,
                        error
                    }),
                    #[cfg(unix)]
                    DatagramSocket::Unix(_, path) => {
                        emit!(UnixSocketSendError {
                            path: path.as_path(),
                            error: &error
                        })
                    }
                };
                finalizers.update_status(EventStatus::Errored);
                return;
            }
        }
    }
}

async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
    let sent = match socket {
        DatagramSocket::Udp(udp) => udp.send(buf).await,
        #[cfg(unix)]
        DatagramSocket::Unix(uds, _) => uds.send(buf).await,
    }?;
    if sent != buf.len() {
        match socket {
            DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
                data_size: buf.len(),
                sent,
            }),
            #[cfg(unix)]
            DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
                data_size: buf.len(),
                sent,
            }),
        }
    }
    Ok(())
}