1use std::time::Duration;
2
3use metrics::{counter, histogram};
4use tokio::time::error::Elapsed;
5use vector_lib::{
6 internal_event::{
7 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8 },
9 json_size::JsonSize,
10};
11
12use super::prelude::io_error_code;
13
14#[derive(Debug)]
15pub struct ExecEventsReceived<'a> {
16 pub count: usize,
17 pub command: &'a str,
18 pub byte_size: JsonSize,
19}
20
21impl InternalEvent for ExecEventsReceived<'_> {
22 fn emit(self) {
23 trace!(
24 message = "Events received.",
25 count = self.count,
26 byte_size = self.byte_size.get(),
27 command = %self.command,
28 );
29 counter!(
30 "component_received_events_total",
31 "command" => self.command.to_owned(),
32 )
33 .increment(self.count as u64);
34 counter!(
35 "component_received_event_bytes_total",
36 "command" => self.command.to_owned(),
37 )
38 .increment(self.byte_size.get() as u64);
39 }
40}
41
42#[derive(Debug)]
43pub struct ExecFailedError<'a> {
44 pub command: &'a str,
45 pub error: std::io::Error,
46}
47
48impl InternalEvent for ExecFailedError<'_> {
49 fn emit(self) {
50 error!(
51 message = "Unable to exec.",
52 command = %self.command,
53 error = ?self.error,
54 error_type = error_type::COMMAND_FAILED,
55 error_code = %io_error_code(&self.error),
56 stage = error_stage::RECEIVING,
57 internal_log_rate_limit = true,
58 );
59 counter!(
60 "component_errors_total",
61 "command" => self.command.to_owned(),
62 "error_type" => error_type::COMMAND_FAILED,
63 "error_code" => io_error_code(&self.error),
64 "stage" => error_stage::RECEIVING,
65 )
66 .increment(1);
67 }
68}
69
70#[derive(Debug)]
71pub struct ExecTimeoutError<'a> {
72 pub command: &'a str,
73 pub elapsed_seconds: u64,
74 pub error: Elapsed,
75}
76
77impl InternalEvent for ExecTimeoutError<'_> {
78 fn emit(self) {
79 error!(
80 message = "Timeout during exec.",
81 command = %self.command,
82 elapsed_seconds = %self.elapsed_seconds,
83 error = %self.error,
84 error_type = error_type::TIMED_OUT,
85 stage = error_stage::RECEIVING,
86 internal_log_rate_limit = true,
87 );
88 counter!(
89 "component_errors_total",
90 "command" => self.command.to_owned(),
91 "error_type" => error_type::TIMED_OUT,
92 "stage" => error_stage::RECEIVING,
93 )
94 .increment(1);
95 }
96}
97
98#[derive(Debug)]
99pub struct ExecCommandExecuted<'a> {
100 pub command: &'a str,
101 pub exit_status: Option<i32>,
102 pub exec_duration: Duration,
103}
104
105impl ExecCommandExecuted<'_> {
106 fn exit_status_string(&self) -> String {
107 match self.exit_status {
108 Some(exit_status) => exit_status.to_string(),
109 None => "unknown".to_string(),
110 }
111 }
112}
113
114impl InternalEvent for ExecCommandExecuted<'_> {
115 fn emit(self) {
116 let exit_status = self.exit_status_string();
117 trace!(
118 message = "Executed command.",
119 command = %self.command,
120 exit_status = %exit_status,
121 elapsed_millis = %self.exec_duration.as_millis(),
122 internal_log_rate_limit = true,
123 );
124 counter!(
125 "command_executed_total",
126 "command" => self.command.to_owned(),
127 "exit_status" => exit_status.clone(),
128 )
129 .increment(1);
130
131 histogram!(
132 "command_execution_duration_seconds",
133 "exit_status" => exit_status,
134 "command" => self.command.to_owned(),
135 )
136 .record(self.exec_duration);
137 }
138}
139
140pub enum ExecFailedToSignalChild {
141 #[cfg(unix)]
142 SignalError(nix::errno::Errno),
143 #[cfg(unix)]
144 FailedToMarshalPid(std::num::TryFromIntError),
145 #[cfg(unix)]
146 NoPid,
147 #[cfg(windows)]
148 IoError(std::io::Error),
149}
150
151impl ExecFailedToSignalChild {
152 fn to_error_code(&self) -> String {
153 use ExecFailedToSignalChild::*;
154
155 match self {
156 #[cfg(unix)]
157 SignalError(err) => format!("errno_{err}"),
158 #[cfg(unix)]
159 FailedToMarshalPid(_) => String::from("failed_to_marshal_pid"),
160 #[cfg(unix)]
161 NoPid => String::from("no_pid"),
162 #[cfg(windows)]
163 IoError(err) => err.to_string(),
164 }
165 }
166}
167
168impl std::fmt::Display for ExecFailedToSignalChild {
169 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
170 use ExecFailedToSignalChild::*;
171
172 match self {
173 #[cfg(unix)]
174 SignalError(err) => write!(f, "errno: {err}"),
175 #[cfg(unix)]
176 FailedToMarshalPid(err) => write!(f, "failed to marshal pid to i32: {err}"),
177 #[cfg(unix)]
178 NoPid => write!(f, "child had no pid"),
179 #[cfg(windows)]
180 IoError(err) => write!(f, "io error: {}", err),
181 }
182 }
183}
184
185pub struct ExecFailedToSignalChildError<'a> {
186 pub command: &'a tokio::process::Command,
187 pub error: ExecFailedToSignalChild,
188}
189
190impl InternalEvent for ExecFailedToSignalChildError<'_> {
191 fn emit(self) {
192 error!(
193 message = %format!("Failed to send SIGTERM to child, aborting early: {}", self.error),
194 command = ?self.command.as_std(),
195 error_code = %self.error.to_error_code(),
196 error_type = error_type::COMMAND_FAILED,
197 stage = error_stage::RECEIVING,
198 internal_log_rate_limit = true,
199 );
200 counter!(
201 "component_errors_total",
202 "command" => format!("{:?}", self.command.as_std()),
203 "error_code" => self.error.to_error_code(),
204 "error_type" => error_type::COMMAND_FAILED,
205 "stage" => error_stage::RECEIVING,
206 )
207 .increment(1);
208 }
209}
210
211pub struct ExecChannelClosedError;
212
213impl InternalEvent for ExecChannelClosedError {
214 fn emit(self) {
215 let exec_reason = "Receive channel closed, unable to send.";
216 error!(
217 message = exec_reason,
218 error_type = error_type::COMMAND_FAILED,
219 stage = error_stage::RECEIVING,
220 internal_log_rate_limit = true,
221 );
222 counter!(
223 "component_errors_total",
224 "error_type" => error_type::COMMAND_FAILED,
225 "stage" => error_stage::RECEIVING,
226 )
227 .increment(1);
228 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
229 count: 1,
230 reason: exec_reason
231 });
232 }
233}