vector/internal_events/
udp.rs

1use metrics::counter;
2use vector_lib::NamedInternalEvent;
3use vector_lib::internal_event::{
4    ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
5};
6
7use crate::internal_events::SocketOutgoingConnectionError;
8
9// TODO: Get rid of this. UDP is connectionless, so there's no "successful" connect event, only
10// successfully binding a socket that can be used for receiving.
11#[derive(Debug, NamedInternalEvent)]
12pub struct UdpSocketConnectionEstablished;
13
14impl InternalEvent for UdpSocketConnectionEstablished {
15    fn emit(self) {
16        debug!(message = "Connected.");
17        counter!("connection_established_total", "mode" => "udp").increment(1);
18    }
19}
20
21// TODO: Get rid of this. UDP is connectionless, so there's no "unsuccessful" connect event, only
22// unsuccessfully binding a socket that can be used for receiving.
23#[derive(NamedInternalEvent)]
24pub struct UdpSocketOutgoingConnectionError<E> {
25    pub error: E,
26}
27
28impl<E: std::error::Error> InternalEvent for UdpSocketOutgoingConnectionError<E> {
29    fn emit(self) {
30        // ## skip check-duplicate-events ##
31        // ## skip check-validity-events ##
32        emit!(SocketOutgoingConnectionError { error: self.error });
33    }
34}
35
36#[derive(Debug, NamedInternalEvent)]
37pub struct UdpSendIncompleteError {
38    pub data_size: usize,
39    pub sent: usize,
40}
41
42impl InternalEvent for UdpSendIncompleteError {
43    fn emit(self) {
44        let reason = "Could not send all data in one UDP datagram.";
45        error!(
46            message = reason,
47            data_size = self.data_size,
48            sent = self.sent,
49            dropped = self.data_size - self.sent,
50            error_type = error_type::WRITER_FAILED,
51            stage = error_stage::SENDING,
52        );
53        counter!(
54            "component_errors_total",
55            "error_type" => error_type::WRITER_FAILED,
56            "stage" => error_stage::SENDING,
57        )
58        .increment(1);
59        // deprecated
60        counter!("connection_send_errors_total", "mode" => "udp").increment(1);
61
62        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
63    }
64}
65
66#[derive(Debug, NamedInternalEvent)]
67pub struct UdpChunkingError {
68    pub error: vector_common::Error,
69    pub data_size: usize,
70}
71
72impl InternalEvent for UdpChunkingError {
73    fn emit(self) {
74        let reason = "Could not chunk UDP datagram.";
75        error!(
76            message = reason,
77            data_size = self.data_size,
78            error = self.error,
79            error_type = error_type::WRITER_FAILED,
80            stage = error_stage::SENDING,
81        );
82        counter!(
83            "component_errors_total",
84            "error_type" => error_type::WRITER_FAILED,
85            "stage" => error_stage::SENDING,
86        )
87        .increment(1);
88
89        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
90    }
91}