1use std::time::Duration;
2
3use metrics::{counter, histogram};
4use tokio::time::error::Elapsed;
5use vector_lib::internal_event::InternalEvent;
6use vector_lib::{
7 internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL},
8 json_size::JsonSize,
9};
10
11use super::prelude::io_error_code;
12
13#[derive(Debug)]
14pub struct ExecEventsReceived<'a> {
15 pub count: usize,
16 pub command: &'a str,
17 pub byte_size: JsonSize,
18}
19
20impl InternalEvent for ExecEventsReceived<'_> {
21 fn emit(self) {
22 trace!(
23 message = "Events received.",
24 count = self.count,
25 byte_size = self.byte_size.get(),
26 command = %self.command,
27 );
28 counter!(
29 "component_received_events_total",
30 "command" => self.command.to_owned(),
31 )
32 .increment(self.count as u64);
33 counter!(
34 "component_received_event_bytes_total",
35 "command" => self.command.to_owned(),
36 )
37 .increment(self.byte_size.get() as u64);
38 }
39}
40
41#[derive(Debug)]
42pub struct ExecFailedError<'a> {
43 pub command: &'a str,
44 pub error: std::io::Error,
45}
46
47impl InternalEvent for ExecFailedError<'_> {
48 fn emit(self) {
49 error!(
50 message = "Unable to exec.",
51 command = %self.command,
52 error = ?self.error,
53 error_type = error_type::COMMAND_FAILED,
54 error_code = %io_error_code(&self.error),
55 stage = error_stage::RECEIVING,
56 internal_log_rate_limit = true,
57 );
58 counter!(
59 "component_errors_total",
60 "command" => self.command.to_owned(),
61 "error_type" => error_type::COMMAND_FAILED,
62 "error_code" => io_error_code(&self.error),
63 "stage" => error_stage::RECEIVING,
64 )
65 .increment(1);
66 }
67}
68
69#[derive(Debug)]
70pub struct ExecTimeoutError<'a> {
71 pub command: &'a str,
72 pub elapsed_seconds: u64,
73 pub error: Elapsed,
74}
75
76impl InternalEvent for ExecTimeoutError<'_> {
77 fn emit(self) {
78 error!(
79 message = "Timeout during exec.",
80 command = %self.command,
81 elapsed_seconds = %self.elapsed_seconds,
82 error = %self.error,
83 error_type = error_type::TIMED_OUT,
84 stage = error_stage::RECEIVING,
85 internal_log_rate_limit = true,
86 );
87 counter!(
88 "component_errors_total",
89 "command" => self.command.to_owned(),
90 "error_type" => error_type::TIMED_OUT,
91 "stage" => error_stage::RECEIVING,
92 )
93 .increment(1);
94 }
95}
96
/// Internal event emitted after a command has run, carrying its exit status
/// and how long it took.
#[derive(Debug)]
pub struct ExecCommandExecuted<'a> {
    /// The command that was run.
    pub command: &'a str,
    /// Exit code, if one is available; `None` is reported as `"unknown"`.
    pub exit_status: Option<i32>,
    /// Run time of the command; recorded in the
    /// `command_execution_duration_seconds` histogram.
    pub exec_duration: Duration,
}

impl ExecCommandExecuted<'_> {
    /// Renders the exit status for use as a log field and metric label,
    /// falling back to `"unknown"` when there is no exit code.
    fn exit_status_string(&self) -> String {
        self.exit_status
            .map_or_else(|| "unknown".to_string(), |code| code.to_string())
    }
}
112
113impl InternalEvent for ExecCommandExecuted<'_> {
114 fn emit(self) {
115 let exit_status = self.exit_status_string();
116 trace!(
117 message = "Executed command.",
118 command = %self.command,
119 exit_status = %exit_status,
120 elapsed_millis = %self.exec_duration.as_millis(),
121 internal_log_rate_limit = true,
122 );
123 counter!(
124 "command_executed_total",
125 "command" => self.command.to_owned(),
126 "exit_status" => exit_status.clone(),
127 )
128 .increment(1);
129
130 histogram!(
131 "command_execution_duration_seconds",
132 "exit_status" => exit_status,
133 "command" => self.command.to_owned(),
134 )
135 .record(self.exec_duration);
136 }
137}
138
139pub enum ExecFailedToSignalChild {
140 #[cfg(unix)]
141 SignalError(nix::errno::Errno),
142 #[cfg(unix)]
143 FailedToMarshalPid(std::num::TryFromIntError),
144 #[cfg(unix)]
145 NoPid,
146 #[cfg(windows)]
147 IoError(std::io::Error),
148}
149
150impl ExecFailedToSignalChild {
151 fn to_error_code(&self) -> String {
152 use ExecFailedToSignalChild::*;
153
154 match self {
155 #[cfg(unix)]
156 SignalError(err) => format!("errno_{err}"),
157 #[cfg(unix)]
158 FailedToMarshalPid(_) => String::from("failed_to_marshal_pid"),
159 #[cfg(unix)]
160 NoPid => String::from("no_pid"),
161 #[cfg(windows)]
162 IoError(err) => err.to_string(),
163 }
164 }
165}
166
167impl std::fmt::Display for ExecFailedToSignalChild {
168 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
169 use ExecFailedToSignalChild::*;
170
171 match self {
172 #[cfg(unix)]
173 SignalError(err) => write!(f, "errno: {err}"),
174 #[cfg(unix)]
175 FailedToMarshalPid(err) => write!(f, "failed to marshal pid to i32: {err}"),
176 #[cfg(unix)]
177 NoPid => write!(f, "child had no pid"),
178 #[cfg(windows)]
179 IoError(err) => write!(f, "io error: {}", err),
180 }
181 }
182}
183
184pub struct ExecFailedToSignalChildError<'a> {
185 pub command: &'a tokio::process::Command,
186 pub error: ExecFailedToSignalChild,
187}
188
189impl InternalEvent for ExecFailedToSignalChildError<'_> {
190 fn emit(self) {
191 error!(
192 message = %format!("Failed to send SIGTERM to child, aborting early: {}", self.error),
193 command = ?self.command.as_std(),
194 error_code = %self.error.to_error_code(),
195 error_type = error_type::COMMAND_FAILED,
196 stage = error_stage::RECEIVING,
197 internal_log_rate_limit = true,
198 );
199 counter!(
200 "component_errors_total",
201 "command" => format!("{:?}", self.command.as_std()),
202 "error_code" => self.error.to_error_code(),
203 "error_type" => error_type::COMMAND_FAILED,
204 "stage" => error_stage::RECEIVING,
205 )
206 .increment(1);
207 }
208}
209
210pub struct ExecChannelClosedError;
211
212impl InternalEvent for ExecChannelClosedError {
213 fn emit(self) {
214 let exec_reason = "Receive channel closed, unable to send.";
215 error!(
216 message = exec_reason,
217 error_type = error_type::COMMAND_FAILED,
218 stage = error_stage::RECEIVING,
219 internal_log_rate_limit = true,
220 );
221 counter!(
222 "component_errors_total",
223 "error_type" => error_type::COMMAND_FAILED,
224 "stage" => error_stage::RECEIVING,
225 )
226 .increment(1);
227 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
228 count: 1,
229 reason: exec_reason
230 });
231 }
232}