vector/internal_events/
exec.rs

1use std::time::Duration;
2
3use metrics::{counter, histogram};
4use tokio::time::error::Elapsed;
5use vector_lib::{
6    internal_event::{
7        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8    },
9    json_size::JsonSize,
10};
11
12use super::prelude::io_error_code;
13
14#[derive(Debug)]
15pub struct ExecEventsReceived<'a> {
16    pub count: usize,
17    pub command: &'a str,
18    pub byte_size: JsonSize,
19}
20
21impl InternalEvent for ExecEventsReceived<'_> {
22    fn emit(self) {
23        trace!(
24            message = "Events received.",
25            count = self.count,
26            byte_size = self.byte_size.get(),
27            command = %self.command,
28        );
29        counter!(
30            "component_received_events_total",
31            "command" => self.command.to_owned(),
32        )
33        .increment(self.count as u64);
34        counter!(
35            "component_received_event_bytes_total",
36            "command" => self.command.to_owned(),
37        )
38        .increment(self.byte_size.get() as u64);
39    }
40}
41
42#[derive(Debug)]
43pub struct ExecFailedError<'a> {
44    pub command: &'a str,
45    pub error: std::io::Error,
46}
47
48impl InternalEvent for ExecFailedError<'_> {
49    fn emit(self) {
50        error!(
51            message = "Unable to exec.",
52            command = %self.command,
53            error = ?self.error,
54            error_type = error_type::COMMAND_FAILED,
55            error_code = %io_error_code(&self.error),
56            stage = error_stage::RECEIVING,
57        );
58        counter!(
59            "component_errors_total",
60            "command" => self.command.to_owned(),
61            "error_type" => error_type::COMMAND_FAILED,
62            "error_code" => io_error_code(&self.error),
63            "stage" => error_stage::RECEIVING,
64        )
65        .increment(1);
66    }
67}
68
69#[derive(Debug)]
70pub struct ExecTimeoutError<'a> {
71    pub command: &'a str,
72    pub elapsed_seconds: u64,
73    pub error: Elapsed,
74}
75
76impl InternalEvent for ExecTimeoutError<'_> {
77    fn emit(self) {
78        error!(
79            message = "Timeout during exec.",
80            command = %self.command,
81            elapsed_seconds = %self.elapsed_seconds,
82            error = %self.error,
83            error_type = error_type::TIMED_OUT,
84            stage = error_stage::RECEIVING,
85        );
86        counter!(
87            "component_errors_total",
88            "command" => self.command.to_owned(),
89            "error_type" => error_type::TIMED_OUT,
90            "stage" => error_stage::RECEIVING,
91        )
92        .increment(1);
93    }
94}
95
96#[derive(Debug)]
97pub struct ExecCommandExecuted<'a> {
98    pub command: &'a str,
99    pub exit_status: Option<i32>,
100    pub exec_duration: Duration,
101}
102
103impl ExecCommandExecuted<'_> {
104    fn exit_status_string(&self) -> String {
105        match self.exit_status {
106            Some(exit_status) => exit_status.to_string(),
107            None => "unknown".to_string(),
108        }
109    }
110}
111
112impl InternalEvent for ExecCommandExecuted<'_> {
113    fn emit(self) {
114        let exit_status = self.exit_status_string();
115        trace!(
116            message = "Executed command.",
117            command = %self.command,
118            exit_status = %exit_status,
119            elapsed_millis = %self.exec_duration.as_millis(),
120        );
121        counter!(
122            "command_executed_total",
123            "command" => self.command.to_owned(),
124            "exit_status" => exit_status.clone(),
125        )
126        .increment(1);
127
128        histogram!(
129            "command_execution_duration_seconds",
130            "exit_status" => exit_status,
131            "command" => self.command.to_owned(),
132        )
133        .record(self.exec_duration);
134    }
135}
136
137pub enum ExecFailedToSignalChild {
138    #[cfg(unix)]
139    SignalError(nix::errno::Errno),
140    #[cfg(unix)]
141    FailedToMarshalPid(std::num::TryFromIntError),
142    #[cfg(unix)]
143    NoPid,
144    #[cfg(windows)]
145    IoError(std::io::Error),
146}
147
148impl ExecFailedToSignalChild {
149    fn to_error_code(&self) -> String {
150        use ExecFailedToSignalChild::*;
151
152        match self {
153            #[cfg(unix)]
154            SignalError(err) => format!("errno_{err}"),
155            #[cfg(unix)]
156            FailedToMarshalPid(_) => String::from("failed_to_marshal_pid"),
157            #[cfg(unix)]
158            NoPid => String::from("no_pid"),
159            #[cfg(windows)]
160            IoError(err) => err.to_string(),
161        }
162    }
163}
164
165impl std::fmt::Display for ExecFailedToSignalChild {
166    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
167        use ExecFailedToSignalChild::*;
168
169        match self {
170            #[cfg(unix)]
171            SignalError(err) => write!(f, "errno: {err}"),
172            #[cfg(unix)]
173            FailedToMarshalPid(err) => write!(f, "failed to marshal pid to i32: {err}"),
174            #[cfg(unix)]
175            NoPid => write!(f, "child had no pid"),
176            #[cfg(windows)]
177            IoError(err) => write!(f, "io error: {}", err),
178        }
179    }
180}
181
182pub struct ExecFailedToSignalChildError<'a> {
183    pub command: &'a tokio::process::Command,
184    pub error: ExecFailedToSignalChild,
185}
186
187impl InternalEvent for ExecFailedToSignalChildError<'_> {
188    fn emit(self) {
189        error!(
190            message = %format!("Failed to send SIGTERM to child, aborting early: {}", self.error),
191            command = ?self.command.as_std(),
192            error_code = %self.error.to_error_code(),
193            error_type = error_type::COMMAND_FAILED,
194            stage = error_stage::RECEIVING,
195        );
196        counter!(
197            "component_errors_total",
198            "command" => format!("{:?}", self.command.as_std()),
199            "error_code" => self.error.to_error_code(),
200            "error_type" => error_type::COMMAND_FAILED,
201            "stage" => error_stage::RECEIVING,
202        )
203        .increment(1);
204    }
205}
206
207pub struct ExecChannelClosedError;
208
209impl InternalEvent for ExecChannelClosedError {
210    fn emit(self) {
211        let exec_reason = "Receive channel closed, unable to send.";
212        error!(
213            message = exec_reason,
214            error_type = error_type::COMMAND_FAILED,
215            stage = error_stage::RECEIVING,
216        );
217        counter!(
218            "component_errors_total",
219            "error_type" => error_type::COMMAND_FAILED,
220            "stage" => error_stage::RECEIVING,
221        )
222        .increment(1);
223        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
224            count: 1,
225            reason: exec_reason
226        });
227    }
228}