1use std::time::Duration;
2
3use metrics::{counter, histogram};
4use tokio::time::error::Elapsed;
5use vector_lib::{
6 NamedInternalEvent,
7 internal_event::{
8 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
9 },
10 json_size::JsonSize,
11};
12
13use super::prelude::io_error_code;
14
15#[derive(Debug, NamedInternalEvent)]
16pub struct ExecEventsReceived<'a> {
17 pub count: usize,
18 pub command: &'a str,
19 pub byte_size: JsonSize,
20}
21
22impl InternalEvent for ExecEventsReceived<'_> {
23 fn emit(self) {
24 trace!(
25 message = "Events received.",
26 count = self.count,
27 byte_size = self.byte_size.get(),
28 command = %self.command,
29 );
30 counter!(
31 "component_received_events_total",
32 "command" => self.command.to_owned(),
33 )
34 .increment(self.count as u64);
35 counter!(
36 "component_received_event_bytes_total",
37 "command" => self.command.to_owned(),
38 )
39 .increment(self.byte_size.get() as u64);
40 }
41}
42
43#[derive(Debug, NamedInternalEvent)]
44pub struct ExecFailedError<'a> {
45 pub command: &'a str,
46 pub error: std::io::Error,
47}
48
49impl InternalEvent for ExecFailedError<'_> {
50 fn emit(self) {
51 error!(
52 message = "Unable to exec.",
53 command = %self.command,
54 error = ?self.error,
55 error_type = error_type::COMMAND_FAILED,
56 error_code = %io_error_code(&self.error),
57 stage = error_stage::RECEIVING,
58 );
59 counter!(
60 "component_errors_total",
61 "command" => self.command.to_owned(),
62 "error_type" => error_type::COMMAND_FAILED,
63 "error_code" => io_error_code(&self.error),
64 "stage" => error_stage::RECEIVING,
65 )
66 .increment(1);
67 }
68}
69
70#[derive(Debug, NamedInternalEvent)]
71pub struct ExecTimeoutError<'a> {
72 pub command: &'a str,
73 pub elapsed_seconds: u64,
74 pub error: Elapsed,
75}
76
77impl InternalEvent for ExecTimeoutError<'_> {
78 fn emit(self) {
79 error!(
80 message = "Timeout during exec.",
81 command = %self.command,
82 elapsed_seconds = %self.elapsed_seconds,
83 error = %self.error,
84 error_type = error_type::TIMED_OUT,
85 stage = error_stage::RECEIVING,
86 );
87 counter!(
88 "component_errors_total",
89 "command" => self.command.to_owned(),
90 "error_type" => error_type::TIMED_OUT,
91 "stage" => error_stage::RECEIVING,
92 )
93 .increment(1);
94 }
95}
96
97#[derive(Debug, NamedInternalEvent)]
98pub struct ExecCommandExecuted<'a> {
99 pub command: &'a str,
100 pub exit_status: Option<i32>,
101 pub exec_duration: Duration,
102}
103
104impl ExecCommandExecuted<'_> {
105 fn exit_status_string(&self) -> String {
106 match self.exit_status {
107 Some(exit_status) => exit_status.to_string(),
108 None => "unknown".to_string(),
109 }
110 }
111}
112
113impl InternalEvent for ExecCommandExecuted<'_> {
114 fn emit(self) {
115 let exit_status = self.exit_status_string();
116 trace!(
117 message = "Executed command.",
118 command = %self.command,
119 exit_status = %exit_status,
120 elapsed_millis = %self.exec_duration.as_millis(),
121 );
122 counter!(
123 "command_executed_total",
124 "command" => self.command.to_owned(),
125 "exit_status" => exit_status.clone(),
126 )
127 .increment(1);
128
129 histogram!(
130 "command_execution_duration_seconds",
131 "exit_status" => exit_status,
132 "command" => self.command.to_owned(),
133 )
134 .record(self.exec_duration);
135 }
136}
137
138pub enum ExecFailedToSignalChild {
139 #[cfg(unix)]
140 SignalError(nix::errno::Errno),
141 #[cfg(unix)]
142 FailedToMarshalPid(std::num::TryFromIntError),
143 #[cfg(unix)]
144 NoPid,
145 #[cfg(windows)]
146 IoError(std::io::Error),
147}
148
149impl ExecFailedToSignalChild {
150 fn to_error_code(&self) -> String {
151 use ExecFailedToSignalChild::*;
152
153 match self {
154 #[cfg(unix)]
155 SignalError(err) => format!("errno_{err}"),
156 #[cfg(unix)]
157 FailedToMarshalPid(_) => String::from("failed_to_marshal_pid"),
158 #[cfg(unix)]
159 NoPid => String::from("no_pid"),
160 #[cfg(windows)]
161 IoError(err) => err.to_string(),
162 }
163 }
164}
165
166impl std::fmt::Display for ExecFailedToSignalChild {
167 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
168 use ExecFailedToSignalChild::*;
169
170 match self {
171 #[cfg(unix)]
172 SignalError(err) => write!(f, "errno: {err}"),
173 #[cfg(unix)]
174 FailedToMarshalPid(err) => write!(f, "failed to marshal pid to i32: {err}"),
175 #[cfg(unix)]
176 NoPid => write!(f, "child had no pid"),
177 #[cfg(windows)]
178 IoError(err) => write!(f, "io error: {}", err),
179 }
180 }
181}
182
183#[derive(NamedInternalEvent)]
184pub struct ExecFailedToSignalChildError<'a> {
185 pub command: &'a tokio::process::Command,
186 pub error: ExecFailedToSignalChild,
187}
188
189impl InternalEvent for ExecFailedToSignalChildError<'_> {
190 fn emit(self) {
191 error!(
192 message = %format!("Failed to send SIGTERM to child, aborting early: {}", self.error),
193 command = ?self.command.as_std(),
194 error_code = %self.error.to_error_code(),
195 error_type = error_type::COMMAND_FAILED,
196 stage = error_stage::RECEIVING,
197 );
198 counter!(
199 "component_errors_total",
200 "command" => format!("{:?}", self.command.as_std()),
201 "error_code" => self.error.to_error_code(),
202 "error_type" => error_type::COMMAND_FAILED,
203 "stage" => error_stage::RECEIVING,
204 )
205 .increment(1);
206 }
207}
208
209#[derive(NamedInternalEvent)]
210pub struct ExecChannelClosedError;
211
212impl InternalEvent for ExecChannelClosedError {
213 fn emit(self) {
214 let exec_reason = "Receive channel closed, unable to send.";
215 error!(
216 message = exec_reason,
217 error_type = error_type::COMMAND_FAILED,
218 stage = error_stage::RECEIVING,
219 );
220 counter!(
221 "component_errors_total",
222 "error_type" => error_type::COMMAND_FAILED,
223 "stage" => error_stage::RECEIVING,
224 )
225 .increment(1);
226 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
227 count: 1,
228 reason: exec_reason
229 });
230 }
231}