vector/internal_events/
exec.rs

1use std::time::Duration;
2
3use metrics::{counter, histogram};
4use tokio::time::error::Elapsed;
5use vector_lib::internal_event::InternalEvent;
6use vector_lib::{
7    internal_event::{error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL},
8    json_size::JsonSize,
9};
10
11use super::prelude::io_error_code;
12
13#[derive(Debug)]
14pub struct ExecEventsReceived<'a> {
15    pub count: usize,
16    pub command: &'a str,
17    pub byte_size: JsonSize,
18}
19
20impl InternalEvent for ExecEventsReceived<'_> {
21    fn emit(self) {
22        trace!(
23            message = "Events received.",
24            count = self.count,
25            byte_size = self.byte_size.get(),
26            command = %self.command,
27        );
28        counter!(
29            "component_received_events_total",
30            "command" => self.command.to_owned(),
31        )
32        .increment(self.count as u64);
33        counter!(
34            "component_received_event_bytes_total",
35            "command" => self.command.to_owned(),
36        )
37        .increment(self.byte_size.get() as u64);
38    }
39}
40
41#[derive(Debug)]
42pub struct ExecFailedError<'a> {
43    pub command: &'a str,
44    pub error: std::io::Error,
45}
46
47impl InternalEvent for ExecFailedError<'_> {
48    fn emit(self) {
49        error!(
50            message = "Unable to exec.",
51            command = %self.command,
52            error = ?self.error,
53            error_type = error_type::COMMAND_FAILED,
54            error_code = %io_error_code(&self.error),
55            stage = error_stage::RECEIVING,
56            internal_log_rate_limit = true,
57        );
58        counter!(
59            "component_errors_total",
60            "command" => self.command.to_owned(),
61            "error_type" => error_type::COMMAND_FAILED,
62            "error_code" => io_error_code(&self.error),
63            "stage" => error_stage::RECEIVING,
64        )
65        .increment(1);
66    }
67}
68
69#[derive(Debug)]
70pub struct ExecTimeoutError<'a> {
71    pub command: &'a str,
72    pub elapsed_seconds: u64,
73    pub error: Elapsed,
74}
75
76impl InternalEvent for ExecTimeoutError<'_> {
77    fn emit(self) {
78        error!(
79            message = "Timeout during exec.",
80            command = %self.command,
81            elapsed_seconds = %self.elapsed_seconds,
82            error = %self.error,
83            error_type = error_type::TIMED_OUT,
84            stage = error_stage::RECEIVING,
85            internal_log_rate_limit = true,
86        );
87        counter!(
88            "component_errors_total",
89            "command" => self.command.to_owned(),
90            "error_type" => error_type::TIMED_OUT,
91            "stage" => error_stage::RECEIVING,
92        )
93        .increment(1);
94    }
95}
96
/// Internal event describing one completed command execution.
#[derive(Debug)]
pub struct ExecCommandExecuted<'a> {
    /// Command string attached as the `command` log field and metric label.
    pub command: &'a str,
    /// Exit status of the command, or `None` when no status is available.
    pub exit_status: Option<i32>,
    /// How long the execution took.
    pub exec_duration: Duration,
}

impl ExecCommandExecuted<'_> {
    /// Renders the exit status as text: the decimal status code when one is
    /// present, otherwise the literal `"unknown"`.
    fn exit_status_string(&self) -> String {
        self.exit_status
            .map_or_else(|| String::from("unknown"), |code| code.to_string())
    }
}
112
113impl InternalEvent for ExecCommandExecuted<'_> {
114    fn emit(self) {
115        let exit_status = self.exit_status_string();
116        trace!(
117            message = "Executed command.",
118            command = %self.command,
119            exit_status = %exit_status,
120            elapsed_millis = %self.exec_duration.as_millis(),
121            internal_log_rate_limit = true,
122        );
123        counter!(
124            "command_executed_total",
125            "command" => self.command.to_owned(),
126            "exit_status" => exit_status.clone(),
127        )
128        .increment(1);
129
130        histogram!(
131            "command_execution_duration_seconds",
132            "exit_status" => exit_status,
133            "command" => self.command.to_owned(),
134        )
135        .record(self.exec_duration);
136    }
137}
138
139pub enum ExecFailedToSignalChild {
140    #[cfg(unix)]
141    SignalError(nix::errno::Errno),
142    #[cfg(unix)]
143    FailedToMarshalPid(std::num::TryFromIntError),
144    #[cfg(unix)]
145    NoPid,
146    #[cfg(windows)]
147    IoError(std::io::Error),
148}
149
150impl ExecFailedToSignalChild {
151    fn to_error_code(&self) -> String {
152        use ExecFailedToSignalChild::*;
153
154        match self {
155            #[cfg(unix)]
156            SignalError(err) => format!("errno_{err}"),
157            #[cfg(unix)]
158            FailedToMarshalPid(_) => String::from("failed_to_marshal_pid"),
159            #[cfg(unix)]
160            NoPid => String::from("no_pid"),
161            #[cfg(windows)]
162            IoError(err) => err.to_string(),
163        }
164    }
165}
166
167impl std::fmt::Display for ExecFailedToSignalChild {
168    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
169        use ExecFailedToSignalChild::*;
170
171        match self {
172            #[cfg(unix)]
173            SignalError(err) => write!(f, "errno: {err}"),
174            #[cfg(unix)]
175            FailedToMarshalPid(err) => write!(f, "failed to marshal pid to i32: {err}"),
176            #[cfg(unix)]
177            NoPid => write!(f, "child had no pid"),
178            #[cfg(windows)]
179            IoError(err) => write!(f, "io error: {}", err),
180        }
181    }
182}
183
184pub struct ExecFailedToSignalChildError<'a> {
185    pub command: &'a tokio::process::Command,
186    pub error: ExecFailedToSignalChild,
187}
188
189impl InternalEvent for ExecFailedToSignalChildError<'_> {
190    fn emit(self) {
191        error!(
192            message = %format!("Failed to send SIGTERM to child, aborting early: {}", self.error),
193            command = ?self.command.as_std(),
194            error_code = %self.error.to_error_code(),
195            error_type = error_type::COMMAND_FAILED,
196            stage = error_stage::RECEIVING,
197            internal_log_rate_limit = true,
198        );
199        counter!(
200            "component_errors_total",
201            "command" => format!("{:?}", self.command.as_std()),
202            "error_code" => self.error.to_error_code(),
203            "error_type" => error_type::COMMAND_FAILED,
204            "stage" => error_stage::RECEIVING,
205        )
206        .increment(1);
207    }
208}
209
210pub struct ExecChannelClosedError;
211
212impl InternalEvent for ExecChannelClosedError {
213    fn emit(self) {
214        let exec_reason = "Receive channel closed, unable to send.";
215        error!(
216            message = exec_reason,
217            error_type = error_type::COMMAND_FAILED,
218            stage = error_stage::RECEIVING,
219            internal_log_rate_limit = true,
220        );
221        counter!(
222            "component_errors_total",
223            "error_type" => error_type::COMMAND_FAILED,
224            "stage" => error_stage::RECEIVING,
225        )
226        .increment(1);
227        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
228            count: 1,
229            reason: exec_reason
230        });
231    }
232}