1use std::time::Duration;
2
3use metrics::{counter, histogram};
4use tokio::time::error::Elapsed;
5use vector_lib::{
6 internal_event::{
7 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
8 },
9 json_size::JsonSize,
10};
11
12use super::prelude::io_error_code;
13
14#[derive(Debug)]
15pub struct ExecEventsReceived<'a> {
16 pub count: usize,
17 pub command: &'a str,
18 pub byte_size: JsonSize,
19}
20
21impl InternalEvent for ExecEventsReceived<'_> {
22 fn emit(self) {
23 trace!(
24 message = "Events received.",
25 count = self.count,
26 byte_size = self.byte_size.get(),
27 command = %self.command,
28 );
29 counter!(
30 "component_received_events_total",
31 "command" => self.command.to_owned(),
32 )
33 .increment(self.count as u64);
34 counter!(
35 "component_received_event_bytes_total",
36 "command" => self.command.to_owned(),
37 )
38 .increment(self.byte_size.get() as u64);
39 }
40}
41
42#[derive(Debug)]
43pub struct ExecFailedError<'a> {
44 pub command: &'a str,
45 pub error: std::io::Error,
46}
47
48impl InternalEvent for ExecFailedError<'_> {
49 fn emit(self) {
50 error!(
51 message = "Unable to exec.",
52 command = %self.command,
53 error = ?self.error,
54 error_type = error_type::COMMAND_FAILED,
55 error_code = %io_error_code(&self.error),
56 stage = error_stage::RECEIVING,
57 );
58 counter!(
59 "component_errors_total",
60 "command" => self.command.to_owned(),
61 "error_type" => error_type::COMMAND_FAILED,
62 "error_code" => io_error_code(&self.error),
63 "stage" => error_stage::RECEIVING,
64 )
65 .increment(1);
66 }
67}
68
69#[derive(Debug)]
70pub struct ExecTimeoutError<'a> {
71 pub command: &'a str,
72 pub elapsed_seconds: u64,
73 pub error: Elapsed,
74}
75
76impl InternalEvent for ExecTimeoutError<'_> {
77 fn emit(self) {
78 error!(
79 message = "Timeout during exec.",
80 command = %self.command,
81 elapsed_seconds = %self.elapsed_seconds,
82 error = %self.error,
83 error_type = error_type::TIMED_OUT,
84 stage = error_stage::RECEIVING,
85 );
86 counter!(
87 "component_errors_total",
88 "command" => self.command.to_owned(),
89 "error_type" => error_type::TIMED_OUT,
90 "stage" => error_stage::RECEIVING,
91 )
92 .increment(1);
93 }
94}
95
96#[derive(Debug)]
97pub struct ExecCommandExecuted<'a> {
98 pub command: &'a str,
99 pub exit_status: Option<i32>,
100 pub exec_duration: Duration,
101}
102
103impl ExecCommandExecuted<'_> {
104 fn exit_status_string(&self) -> String {
105 match self.exit_status {
106 Some(exit_status) => exit_status.to_string(),
107 None => "unknown".to_string(),
108 }
109 }
110}
111
112impl InternalEvent for ExecCommandExecuted<'_> {
113 fn emit(self) {
114 let exit_status = self.exit_status_string();
115 trace!(
116 message = "Executed command.",
117 command = %self.command,
118 exit_status = %exit_status,
119 elapsed_millis = %self.exec_duration.as_millis(),
120 );
121 counter!(
122 "command_executed_total",
123 "command" => self.command.to_owned(),
124 "exit_status" => exit_status.clone(),
125 )
126 .increment(1);
127
128 histogram!(
129 "command_execution_duration_seconds",
130 "exit_status" => exit_status,
131 "command" => self.command.to_owned(),
132 )
133 .record(self.exec_duration);
134 }
135}
136
137pub enum ExecFailedToSignalChild {
138 #[cfg(unix)]
139 SignalError(nix::errno::Errno),
140 #[cfg(unix)]
141 FailedToMarshalPid(std::num::TryFromIntError),
142 #[cfg(unix)]
143 NoPid,
144 #[cfg(windows)]
145 IoError(std::io::Error),
146}
147
148impl ExecFailedToSignalChild {
149 fn to_error_code(&self) -> String {
150 use ExecFailedToSignalChild::*;
151
152 match self {
153 #[cfg(unix)]
154 SignalError(err) => format!("errno_{err}"),
155 #[cfg(unix)]
156 FailedToMarshalPid(_) => String::from("failed_to_marshal_pid"),
157 #[cfg(unix)]
158 NoPid => String::from("no_pid"),
159 #[cfg(windows)]
160 IoError(err) => err.to_string(),
161 }
162 }
163}
164
165impl std::fmt::Display for ExecFailedToSignalChild {
166 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
167 use ExecFailedToSignalChild::*;
168
169 match self {
170 #[cfg(unix)]
171 SignalError(err) => write!(f, "errno: {err}"),
172 #[cfg(unix)]
173 FailedToMarshalPid(err) => write!(f, "failed to marshal pid to i32: {err}"),
174 #[cfg(unix)]
175 NoPid => write!(f, "child had no pid"),
176 #[cfg(windows)]
177 IoError(err) => write!(f, "io error: {}", err),
178 }
179 }
180}
181
182pub struct ExecFailedToSignalChildError<'a> {
183 pub command: &'a tokio::process::Command,
184 pub error: ExecFailedToSignalChild,
185}
186
187impl InternalEvent for ExecFailedToSignalChildError<'_> {
188 fn emit(self) {
189 error!(
190 message = %format!("Failed to send SIGTERM to child, aborting early: {}", self.error),
191 command = ?self.command.as_std(),
192 error_code = %self.error.to_error_code(),
193 error_type = error_type::COMMAND_FAILED,
194 stage = error_stage::RECEIVING,
195 );
196 counter!(
197 "component_errors_total",
198 "command" => format!("{:?}", self.command.as_std()),
199 "error_code" => self.error.to_error_code(),
200 "error_type" => error_type::COMMAND_FAILED,
201 "stage" => error_stage::RECEIVING,
202 )
203 .increment(1);
204 }
205}
206
207pub struct ExecChannelClosedError;
208
209impl InternalEvent for ExecChannelClosedError {
210 fn emit(self) {
211 let exec_reason = "Receive channel closed, unable to send.";
212 error!(
213 message = exec_reason,
214 error_type = error_type::COMMAND_FAILED,
215 stage = error_stage::RECEIVING,
216 );
217 counter!(
218 "component_errors_total",
219 "error_type" => error_type::COMMAND_FAILED,
220 "stage" => error_stage::RECEIVING,
221 )
222 .increment(1);
223 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
224 count: 1,
225 reason: exec_reason
226 });
227 }
228}