vector/internal_events/
unix.rs

1use std::{io::Error, path::Path};
2
3use metrics::counter;
4use vector_lib::internal_event::{
5    error_stage, error_type, ComponentEventsDropped, InternalEvent, UNINTENTIONAL,
6};
7
8use crate::internal_events::SocketOutgoingConnectionError;
9
10#[derive(Debug)]
11pub struct UnixSocketConnectionEstablished<'a> {
12    pub path: &'a std::path::Path,
13}
14
15impl InternalEvent for UnixSocketConnectionEstablished<'_> {
16    fn emit(self) {
17        debug!(message = "Connected.", path = ?self.path);
18        counter!("connection_established_total", "mode" => "unix").increment(1);
19    }
20}
21
22#[derive(Debug)]
23pub struct UnixSocketOutgoingConnectionError<E> {
24    pub error: E,
25}
26
27impl<E: std::error::Error> InternalEvent for UnixSocketOutgoingConnectionError<E> {
28    fn emit(self) {
29        // ## skip check-duplicate-events ##
30        // ## skip check-validity-events ##
31        emit!(SocketOutgoingConnectionError { error: self.error });
32    }
33}
34
/// Internal event for an error on a Unix socket.
///
/// Emitting it logs `Unix socket error.` at error level together with the
/// error and the socket path, and increments `component_errors_total` with
/// the `CONNECTION_FAILED` error type and the `PROCESSING` stage.
///
/// Only compiled on Unix targets when the `sources-utils-net-unix` or
/// `sources-dnstap` feature is enabled.
#[cfg(all(
    unix,
    any(feature = "sources-utils-net-unix", feature = "sources-dnstap")
))]
#[derive(Debug)]
pub struct UnixSocketError<'a, E> {
    /// Borrowed error; rendered with its `Display` implementation in the log.
    pub(crate) error: &'a E,
    /// Socket path; rendered with its `Debug` implementation in the log.
    pub path: &'a Path,
}

#[cfg(all(
    unix,
    any(feature = "sources-utils-net-unix", feature = "sources-dnstap")
))]
impl<E> InternalEvent for UnixSocketError<'_, E>
where
    E: std::fmt::Display,
{
    /// Logs the error (rate limited) and counts it as a component error.
    fn emit(self) {
        let Self { error, path } = self;

        error!(
            message = "Unix socket error.",
            error = %error,
            path = ?path,
            error_type = error_type::CONNECTION_FAILED,
            stage = error_stage::PROCESSING,
            internal_log_rate_limit = true,
        );
        counter!(
            "component_errors_total",
            "error_type" => error_type::CONNECTION_FAILED,
            "stage" => error_stage::PROCESSING,
        )
        .increment(1);
    }
}
67
68#[derive(Debug)]
69pub struct UnixSocketSendError<'a, E> {
70    pub(crate) error: &'a E,
71    pub path: &'a std::path::Path,
72}
73
74impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
75    fn emit(self) {
76        let reason = "Unix socket send error.";
77        error!(
78            message = reason,
79            error = %self.error,
80            path = ?self.path,
81            error_type = error_type::WRITER_FAILED,
82            stage = error_stage::SENDING,
83            internal_log_rate_limit = true,
84        );
85        counter!(
86            "component_errors_total",
87            "error_type" => error_type::WRITER_FAILED,
88            "stage" => error_stage::SENDING,
89        )
90        .increment(1);
91
92        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
93    }
94}
95
96#[derive(Debug)]
97pub struct UnixSendIncompleteError {
98    pub data_size: usize,
99    pub sent: usize,
100}
101
102impl InternalEvent for UnixSendIncompleteError {
103    fn emit(self) {
104        let reason = "Could not send all data in one Unix datagram.";
105        error!(
106            message = reason,
107            data_size = self.data_size,
108            sent = self.sent,
109            dropped = self.data_size - self.sent,
110            error_type = error_type::WRITER_FAILED,
111            stage = error_stage::SENDING,
112            internal_log_rate_limit = true,
113        );
114        counter!(
115            "component_errors_total",
116            "error_type" => error_type::WRITER_FAILED,
117            "stage" => error_stage::SENDING,
118        )
119        .increment(1);
120
121        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
122    }
123}
124
125#[derive(Debug)]
126pub struct UnixSocketFileDeleteError<'a> {
127    pub path: &'a Path,
128    pub error: Error,
129}
130
131impl InternalEvent for UnixSocketFileDeleteError<'_> {
132    fn emit(self) {
133        error!(
134            message = "Failed in deleting unix socket file.",
135            path = %self.path.display(),
136            error = %self.error,
137            error_code = "delete_socket_file",
138            error_type = error_type::WRITER_FAILED,
139            stage = error_stage::PROCESSING,
140            internal_log_rate_limit = true,
141        );
142        counter!(
143            "component_errors_total",
144            "error_code" => "delete_socket_file",
145            "error_type" => error_type::WRITER_FAILED,
146            "stage" => error_stage::PROCESSING,
147        )
148        .increment(1);
149    }
150}