vector/internal_events/
udp.rs

1use metrics::counter;
2use vector_lib::internal_event::{
3    ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
4};
5
6use crate::internal_events::SocketOutgoingConnectionError;
7
// TODO: Get rid of this. UDP is connectionless, so there's no "successful" connect event, only
// successfully binding a socket that can be used for receiving.
/// Internal event signalling that a UDP socket is ready for use.
///
/// Carries no data; emitting it logs a debug message and increments the
/// `connection_established_total` counter with `mode = "udp"`.
#[derive(Debug)]
pub struct UdpSocketConnectionEstablished;
12
13impl InternalEvent for UdpSocketConnectionEstablished {
14    fn emit(self) {
15        debug!(message = "Connected.");
16        counter!("connection_established_total", "mode" => "udp").increment(1);
17    }
18}
19
// TODO: Get rid of this. UDP is connectionless, so there's no "unsuccessful" connect event, only
// unsuccessfully binding a socket that can be used for receiving.
/// Internal event wrapping an error raised while setting up an outgoing UDP socket.
///
/// Emitting it forwards `error` to the generic `SocketOutgoingConnectionError` event.
/// `Debug` is derived so this type can be formatted like the other event structs in
/// this module; the derive only requires `E: Debug` for the `Debug` impl itself.
#[derive(Debug)]
pub struct UdpSocketOutgoingConnectionError<E> {
    /// The underlying error that is handed on when the event is emitted.
    pub error: E,
}
25
26impl<E: std::error::Error> InternalEvent for UdpSocketOutgoingConnectionError<E> {
27    fn emit(self) {
28        // ## skip check-duplicate-events ##
29        // ## skip check-validity-events ##
30        emit!(SocketOutgoingConnectionError { error: self.error });
31    }
32}
33
/// Internal event reported when a UDP send did not transmit the whole payload.
///
/// When emitted, the difference between `data_size` and `sent` is logged as `dropped`.
#[derive(Debug)]
pub struct UdpSendIncompleteError {
    /// Size of the data that was supposed to go out in one datagram.
    pub data_size: usize,
    /// Amount of that data that was actually sent.
    pub sent: usize,
}
39
40impl InternalEvent for UdpSendIncompleteError {
41    fn emit(self) {
42        let reason = "Could not send all data in one UDP datagram.";
43        error!(
44            message = reason,
45            data_size = self.data_size,
46            sent = self.sent,
47            dropped = self.data_size - self.sent,
48            error_type = error_type::WRITER_FAILED,
49            stage = error_stage::SENDING,
50        );
51        counter!(
52            "component_errors_total",
53            "error_type" => error_type::WRITER_FAILED,
54            "stage" => error_stage::SENDING,
55        )
56        .increment(1);
57        // deprecated
58        counter!("connection_send_errors_total", "mode" => "udp").increment(1);
59
60        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
61    }
62}
63
64#[derive(Debug)]
65pub struct UdpChunkingError {
66    pub error: vector_common::Error,
67    pub data_size: usize,
68}
69
70impl InternalEvent for UdpChunkingError {
71    fn emit(self) {
72        let reason = "Could not chunk UDP datagram.";
73        error!(
74            message = reason,
75            data_size = self.data_size,
76            error = self.error,
77            error_type = error_type::WRITER_FAILED,
78            stage = error_stage::SENDING,
79        );
80        counter!(
81            "component_errors_total",
82            "error_type" => error_type::WRITER_FAILED,
83            "stage" => error_stage::SENDING,
84        )
85        .increment(1);
86
87        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
88    }
89}