vector/internal_events/
unix.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use std::{io::Error, path::Path};
4
5use metrics::counter;
6use vector_lib::internal_event::{
7    ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8};
9
10use crate::internal_events::SocketOutgoingConnectionError;
11
12#[derive(Debug)]
13pub struct UnixSocketConnectionEstablished<'a> {
14    pub path: &'a std::path::Path,
15}
16
17impl InternalEvent for UnixSocketConnectionEstablished<'_> {
18    fn emit(self) {
19        debug!(message = "Connected.", path = ?self.path);
20        counter!("connection_established_total", "mode" => "unix").increment(1);
21    }
22}
23
24#[derive(Debug)]
25pub struct UnixSocketOutgoingConnectionError<E> {
26    pub error: E,
27}
28
29impl<E: std::error::Error> InternalEvent for UnixSocketOutgoingConnectionError<E> {
30    fn emit(self) {
31        // ## skip check-duplicate-events ##
32        // ## skip check-validity-events ##
33        emit!(SocketOutgoingConnectionError { error: self.error });
34    }
35}
36
37#[cfg(all(
38    unix,
39    any(feature = "sources-utils-net-unix", feature = "sources-dnstap")
40))]
41#[derive(Debug)]
42pub struct UnixSocketError<'a, E> {
43    pub(crate) error: &'a E,
44    pub path: &'a std::path::Path,
45}
46
47#[cfg(all(
48    unix,
49    any(feature = "sources-utils-net-unix", feature = "sources-dnstap")
50))]
51impl<E: std::fmt::Display> InternalEvent for UnixSocketError<'_, E> {
52    fn emit(self) {
53        error!(
54            message = "Unix socket error.",
55            error = %self.error,
56            path = ?self.path,
57            error_type = error_type::CONNECTION_FAILED,
58            stage = error_stage::PROCESSING,
59            internal_log_rate_limit = true,
60        );
61        counter!(
62            "component_errors_total",
63            "error_type" => error_type::CONNECTION_FAILED,
64            "stage" => error_stage::PROCESSING,
65        )
66        .increment(1);
67    }
68}
69
70#[derive(Debug)]
71pub struct UnixSocketSendError<'a, E> {
72    pub(crate) error: &'a E,
73    pub path: &'a std::path::Path,
74}
75
76impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
77    fn emit(self) {
78        let reason = "Unix socket send error.";
79        error!(
80            message = reason,
81            error = %self.error,
82            path = ?self.path,
83            error_type = error_type::WRITER_FAILED,
84            stage = error_stage::SENDING,
85            internal_log_rate_limit = true,
86        );
87        counter!(
88            "component_errors_total",
89            "error_type" => error_type::WRITER_FAILED,
90            "stage" => error_stage::SENDING,
91        )
92        .increment(1);
93
94        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
95    }
96}
97
98#[derive(Debug)]
99pub struct UnixSendIncompleteError {
100    pub data_size: usize,
101    pub sent: usize,
102}
103
104impl InternalEvent for UnixSendIncompleteError {
105    fn emit(self) {
106        let reason = "Could not send all data in one Unix datagram.";
107        error!(
108            message = reason,
109            data_size = self.data_size,
110            sent = self.sent,
111            dropped = self.data_size - self.sent,
112            error_type = error_type::WRITER_FAILED,
113            stage = error_stage::SENDING,
114            internal_log_rate_limit = true,
115        );
116        counter!(
117            "component_errors_total",
118            "error_type" => error_type::WRITER_FAILED,
119            "stage" => error_stage::SENDING,
120        )
121        .increment(1);
122
123        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
124    }
125}
126
127#[derive(Debug)]
128pub struct UnixSocketFileDeleteError<'a> {
129    pub path: &'a Path,
130    pub error: Error,
131}
132
133impl InternalEvent for UnixSocketFileDeleteError<'_> {
134    fn emit(self) {
135        error!(
136            message = "Failed in deleting unix socket file.",
137            path = %self.path.display(),
138            error = %self.error,
139            error_code = "delete_socket_file",
140            error_type = error_type::WRITER_FAILED,
141            stage = error_stage::PROCESSING,
142            internal_log_rate_limit = true,
143        );
144        counter!(
145            "component_errors_total",
146            "error_code" => "delete_socket_file",
147            "error_type" => error_type::WRITER_FAILED,
148            "stage" => error_stage::PROCESSING,
149        )
150        .increment(1);
151    }
152}