vector/internal_events/
unix.rs

1#![allow(dead_code)] // TODO requires optional feature compilation
2
3use std::{io::Error, path::Path};
4
5use metrics::counter;
6use vector_lib::NamedInternalEvent;
7use vector_lib::internal_event::{
8    ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
9};
10
11use crate::internal_events::SocketOutgoingConnectionError;
12
13#[derive(Debug, NamedInternalEvent)]
14pub struct UnixSocketConnectionEstablished<'a> {
15    pub path: &'a std::path::Path,
16}
17
18impl InternalEvent for UnixSocketConnectionEstablished<'_> {
19    fn emit(self) {
20        debug!(message = "Connected.", path = ?self.path);
21        counter!("connection_established_total", "mode" => "unix").increment(1);
22    }
23}
24
25#[derive(Debug, NamedInternalEvent)]
26pub struct UnixSocketOutgoingConnectionError<E> {
27    pub error: E,
28}
29
30impl<E: std::error::Error> InternalEvent for UnixSocketOutgoingConnectionError<E> {
31    fn emit(self) {
32        // ## skip check-duplicate-events ##
33        // ## skip check-validity-events ##
34        emit!(SocketOutgoingConnectionError { error: self.error });
35    }
36}
37
38#[cfg(all(
39    unix,
40    any(feature = "sources-utils-net-unix", feature = "sources-dnstap")
41))]
42#[derive(Debug, NamedInternalEvent)]
43pub struct UnixSocketError<'a, E> {
44    pub(crate) error: &'a E,
45    pub path: &'a std::path::Path,
46}
47
48#[cfg(all(
49    unix,
50    any(feature = "sources-utils-net-unix", feature = "sources-dnstap")
51))]
52impl<E: std::fmt::Display> InternalEvent for UnixSocketError<'_, E> {
53    fn emit(self) {
54        error!(
55            message = "Unix socket error.",
56            error = %self.error,
57            path = ?self.path,
58            error_type = error_type::CONNECTION_FAILED,
59            stage = error_stage::PROCESSING,
60        );
61        counter!(
62            "component_errors_total",
63            "error_type" => error_type::CONNECTION_FAILED,
64            "stage" => error_stage::PROCESSING,
65        )
66        .increment(1);
67    }
68}
69
70#[derive(Debug, NamedInternalEvent)]
71pub struct UnixSocketSendError<'a, E> {
72    pub(crate) error: &'a E,
73    pub path: &'a std::path::Path,
74}
75
76impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
77    fn emit(self) {
78        let reason = "Unix socket send error.";
79        error!(
80            message = reason,
81            error = %self.error,
82            path = ?self.path,
83            error_type = error_type::WRITER_FAILED,
84            stage = error_stage::SENDING,
85        );
86        counter!(
87            "component_errors_total",
88            "error_type" => error_type::WRITER_FAILED,
89            "stage" => error_stage::SENDING,
90        )
91        .increment(1);
92
93        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
94    }
95}
96
97#[derive(Debug, NamedInternalEvent)]
98pub struct UnixSendIncompleteError {
99    pub data_size: usize,
100    pub sent: usize,
101}
102
103impl InternalEvent for UnixSendIncompleteError {
104    fn emit(self) {
105        let reason = "Could not send all data in one Unix datagram.";
106        error!(
107            message = reason,
108            data_size = self.data_size,
109            sent = self.sent,
110            dropped = self.data_size - self.sent,
111            error_type = error_type::WRITER_FAILED,
112            stage = error_stage::SENDING,
113        );
114        counter!(
115            "component_errors_total",
116            "error_type" => error_type::WRITER_FAILED,
117            "stage" => error_stage::SENDING,
118        )
119        .increment(1);
120
121        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
122    }
123}
124
125#[derive(Debug, NamedInternalEvent)]
126pub struct UnixSocketFileDeleteError<'a> {
127    pub path: &'a Path,
128    pub error: Error,
129}
130
131impl InternalEvent for UnixSocketFileDeleteError<'_> {
132    fn emit(self) {
133        error!(
134            message = "Failed in deleting unix socket file.",
135            path = %self.path.display(),
136            error = %self.error,
137            error_code = "delete_socket_file",
138            error_type = error_type::WRITER_FAILED,
139            stage = error_stage::PROCESSING,
140        );
141        counter!(
142            "component_errors_total",
143            "error_code" => "delete_socket_file",
144            "error_type" => error_type::WRITER_FAILED,
145            "stage" => error_stage::PROCESSING,
146        )
147        .increment(1);
148    }
149}