vector/internal_events/
exec.rs

1use std::time::Duration;
2
3use metrics::{counter, histogram};
4use tokio::time::error::Elapsed;
5use vector_lib::{
6    internal_event::{
7        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8    },
9    json_size::JsonSize,
10};
11
12use super::prelude::io_error_code;
13
14#[derive(Debug)]
15pub struct ExecEventsReceived<'a> {
16    pub count: usize,
17    pub command: &'a str,
18    pub byte_size: JsonSize,
19}
20
21impl InternalEvent for ExecEventsReceived<'_> {
22    fn emit(self) {
23        trace!(
24            message = "Events received.",
25            count = self.count,
26            byte_size = self.byte_size.get(),
27            command = %self.command,
28        );
29        counter!(
30            "component_received_events_total",
31            "command" => self.command.to_owned(),
32        )
33        .increment(self.count as u64);
34        counter!(
35            "component_received_event_bytes_total",
36            "command" => self.command.to_owned(),
37        )
38        .increment(self.byte_size.get() as u64);
39    }
40}
41
42#[derive(Debug)]
43pub struct ExecFailedError<'a> {
44    pub command: &'a str,
45    pub error: std::io::Error,
46}
47
48impl InternalEvent for ExecFailedError<'_> {
49    fn emit(self) {
50        error!(
51            message = "Unable to exec.",
52            command = %self.command,
53            error = ?self.error,
54            error_type = error_type::COMMAND_FAILED,
55            error_code = %io_error_code(&self.error),
56            stage = error_stage::RECEIVING,
57            internal_log_rate_limit = true,
58        );
59        counter!(
60            "component_errors_total",
61            "command" => self.command.to_owned(),
62            "error_type" => error_type::COMMAND_FAILED,
63            "error_code" => io_error_code(&self.error),
64            "stage" => error_stage::RECEIVING,
65        )
66        .increment(1);
67    }
68}
69
70#[derive(Debug)]
71pub struct ExecTimeoutError<'a> {
72    pub command: &'a str,
73    pub elapsed_seconds: u64,
74    pub error: Elapsed,
75}
76
77impl InternalEvent for ExecTimeoutError<'_> {
78    fn emit(self) {
79        error!(
80            message = "Timeout during exec.",
81            command = %self.command,
82            elapsed_seconds = %self.elapsed_seconds,
83            error = %self.error,
84            error_type = error_type::TIMED_OUT,
85            stage = error_stage::RECEIVING,
86            internal_log_rate_limit = true,
87        );
88        counter!(
89            "component_errors_total",
90            "command" => self.command.to_owned(),
91            "error_type" => error_type::TIMED_OUT,
92            "stage" => error_stage::RECEIVING,
93        )
94        .increment(1);
95    }
96}
97
/// Internal event describing one finished command execution.
#[derive(Debug)]
pub struct ExecCommandExecuted<'a> {
    /// The command that was run; used as the `command` tag.
    pub command: &'a str,
    /// The exit code; `None` is reported as `"unknown"`.
    pub exit_status: Option<i32>,
    /// How long the execution took; recorded in the duration histogram.
    pub exec_duration: Duration,
}

impl ExecCommandExecuted<'_> {
    /// Renders the exit status as a tag value: the decimal exit code, or
    /// `"unknown"` when no exit code is available.
    fn exit_status_string(&self) -> String {
        self.exit_status
            .map_or_else(|| "unknown".to_string(), |code| code.to_string())
    }
}
113
114impl InternalEvent for ExecCommandExecuted<'_> {
115    fn emit(self) {
116        let exit_status = self.exit_status_string();
117        trace!(
118            message = "Executed command.",
119            command = %self.command,
120            exit_status = %exit_status,
121            elapsed_millis = %self.exec_duration.as_millis(),
122            internal_log_rate_limit = true,
123        );
124        counter!(
125            "command_executed_total",
126            "command" => self.command.to_owned(),
127            "exit_status" => exit_status.clone(),
128        )
129        .increment(1);
130
131        histogram!(
132            "command_execution_duration_seconds",
133            "exit_status" => exit_status,
134            "command" => self.command.to_owned(),
135        )
136        .record(self.exec_duration);
137    }
138}
139
140pub enum ExecFailedToSignalChild {
141    #[cfg(unix)]
142    SignalError(nix::errno::Errno),
143    #[cfg(unix)]
144    FailedToMarshalPid(std::num::TryFromIntError),
145    #[cfg(unix)]
146    NoPid,
147    #[cfg(windows)]
148    IoError(std::io::Error),
149}
150
151impl ExecFailedToSignalChild {
152    fn to_error_code(&self) -> String {
153        use ExecFailedToSignalChild::*;
154
155        match self {
156            #[cfg(unix)]
157            SignalError(err) => format!("errno_{err}"),
158            #[cfg(unix)]
159            FailedToMarshalPid(_) => String::from("failed_to_marshal_pid"),
160            #[cfg(unix)]
161            NoPid => String::from("no_pid"),
162            #[cfg(windows)]
163            IoError(err) => err.to_string(),
164        }
165    }
166}
167
168impl std::fmt::Display for ExecFailedToSignalChild {
169    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
170        use ExecFailedToSignalChild::*;
171
172        match self {
173            #[cfg(unix)]
174            SignalError(err) => write!(f, "errno: {err}"),
175            #[cfg(unix)]
176            FailedToMarshalPid(err) => write!(f, "failed to marshal pid to i32: {err}"),
177            #[cfg(unix)]
178            NoPid => write!(f, "child had no pid"),
179            #[cfg(windows)]
180            IoError(err) => write!(f, "io error: {}", err),
181        }
182    }
183}
184
185pub struct ExecFailedToSignalChildError<'a> {
186    pub command: &'a tokio::process::Command,
187    pub error: ExecFailedToSignalChild,
188}
189
190impl InternalEvent for ExecFailedToSignalChildError<'_> {
191    fn emit(self) {
192        error!(
193            message = %format!("Failed to send SIGTERM to child, aborting early: {}", self.error),
194            command = ?self.command.as_std(),
195            error_code = %self.error.to_error_code(),
196            error_type = error_type::COMMAND_FAILED,
197            stage = error_stage::RECEIVING,
198            internal_log_rate_limit = true,
199        );
200        counter!(
201            "component_errors_total",
202            "command" => format!("{:?}", self.command.as_std()),
203            "error_code" => self.error.to_error_code(),
204            "error_type" => error_type::COMMAND_FAILED,
205            "stage" => error_stage::RECEIVING,
206        )
207        .increment(1);
208    }
209}
210
211pub struct ExecChannelClosedError;
212
213impl InternalEvent for ExecChannelClosedError {
214    fn emit(self) {
215        let exec_reason = "Receive channel closed, unable to send.";
216        error!(
217            message = exec_reason,
218            error_type = error_type::COMMAND_FAILED,
219            stage = error_stage::RECEIVING,
220            internal_log_rate_limit = true,
221        );
222        counter!(
223            "component_errors_total",
224            "error_type" => error_type::COMMAND_FAILED,
225            "stage" => error_stage::RECEIVING,
226        )
227        .increment(1);
228        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
229            count: 1,
230            reason: exec_reason
231        });
232    }
233}