vector/internal_events/
exec.rs

1use std::time::Duration;
2
3use metrics::{counter, histogram};
4use tokio::time::error::Elapsed;
5use vector_lib::{
6    NamedInternalEvent,
7    internal_event::{
8        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
9    },
10    json_size::JsonSize,
11};
12
13use super::prelude::io_error_code;
14
15#[derive(Debug, NamedInternalEvent)]
16pub struct ExecEventsReceived<'a> {
17    pub count: usize,
18    pub command: &'a str,
19    pub byte_size: JsonSize,
20}
21
22impl InternalEvent for ExecEventsReceived<'_> {
23    fn emit(self) {
24        trace!(
25            message = "Events received.",
26            count = self.count,
27            byte_size = self.byte_size.get(),
28            command = %self.command,
29        );
30        counter!(
31            "component_received_events_total",
32            "command" => self.command.to_owned(),
33        )
34        .increment(self.count as u64);
35        counter!(
36            "component_received_event_bytes_total",
37            "command" => self.command.to_owned(),
38        )
39        .increment(self.byte_size.get() as u64);
40    }
41}
42
43#[derive(Debug, NamedInternalEvent)]
44pub struct ExecFailedError<'a> {
45    pub command: &'a str,
46    pub error: std::io::Error,
47}
48
49impl InternalEvent for ExecFailedError<'_> {
50    fn emit(self) {
51        error!(
52            message = "Unable to exec.",
53            command = %self.command,
54            error = ?self.error,
55            error_type = error_type::COMMAND_FAILED,
56            error_code = %io_error_code(&self.error),
57            stage = error_stage::RECEIVING,
58        );
59        counter!(
60            "component_errors_total",
61            "command" => self.command.to_owned(),
62            "error_type" => error_type::COMMAND_FAILED,
63            "error_code" => io_error_code(&self.error),
64            "stage" => error_stage::RECEIVING,
65        )
66        .increment(1);
67    }
68}
69
70#[derive(Debug, NamedInternalEvent)]
71pub struct ExecTimeoutError<'a> {
72    pub command: &'a str,
73    pub elapsed_seconds: u64,
74    pub error: Elapsed,
75}
76
77impl InternalEvent for ExecTimeoutError<'_> {
78    fn emit(self) {
79        error!(
80            message = "Timeout during exec.",
81            command = %self.command,
82            elapsed_seconds = %self.elapsed_seconds,
83            error = %self.error,
84            error_type = error_type::TIMED_OUT,
85            stage = error_stage::RECEIVING,
86        );
87        counter!(
88            "component_errors_total",
89            "command" => self.command.to_owned(),
90            "error_type" => error_type::TIMED_OUT,
91            "stage" => error_stage::RECEIVING,
92        )
93        .increment(1);
94    }
95}
96
97#[derive(Debug, NamedInternalEvent)]
98pub struct ExecCommandExecuted<'a> {
99    pub command: &'a str,
100    pub exit_status: Option<i32>,
101    pub exec_duration: Duration,
102}
103
104impl ExecCommandExecuted<'_> {
105    fn exit_status_string(&self) -> String {
106        match self.exit_status {
107            Some(exit_status) => exit_status.to_string(),
108            None => "unknown".to_string(),
109        }
110    }
111}
112
113impl InternalEvent for ExecCommandExecuted<'_> {
114    fn emit(self) {
115        let exit_status = self.exit_status_string();
116        trace!(
117            message = "Executed command.",
118            command = %self.command,
119            exit_status = %exit_status,
120            elapsed_millis = %self.exec_duration.as_millis(),
121        );
122        counter!(
123            "command_executed_total",
124            "command" => self.command.to_owned(),
125            "exit_status" => exit_status.clone(),
126        )
127        .increment(1);
128
129        histogram!(
130            "command_execution_duration_seconds",
131            "exit_status" => exit_status,
132            "command" => self.command.to_owned(),
133        )
134        .record(self.exec_duration);
135    }
136}
137
138pub enum ExecFailedToSignalChild {
139    #[cfg(unix)]
140    SignalError(nix::errno::Errno),
141    #[cfg(unix)]
142    FailedToMarshalPid(std::num::TryFromIntError),
143    #[cfg(unix)]
144    NoPid,
145    #[cfg(windows)]
146    IoError(std::io::Error),
147}
148
149impl ExecFailedToSignalChild {
150    fn to_error_code(&self) -> String {
151        use ExecFailedToSignalChild::*;
152
153        match self {
154            #[cfg(unix)]
155            SignalError(err) => format!("errno_{err}"),
156            #[cfg(unix)]
157            FailedToMarshalPid(_) => String::from("failed_to_marshal_pid"),
158            #[cfg(unix)]
159            NoPid => String::from("no_pid"),
160            #[cfg(windows)]
161            IoError(err) => err.to_string(),
162        }
163    }
164}
165
166impl std::fmt::Display for ExecFailedToSignalChild {
167    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
168        use ExecFailedToSignalChild::*;
169
170        match self {
171            #[cfg(unix)]
172            SignalError(err) => write!(f, "errno: {err}"),
173            #[cfg(unix)]
174            FailedToMarshalPid(err) => write!(f, "failed to marshal pid to i32: {err}"),
175            #[cfg(unix)]
176            NoPid => write!(f, "child had no pid"),
177            #[cfg(windows)]
178            IoError(err) => write!(f, "io error: {}", err),
179        }
180    }
181}
182
183#[derive(NamedInternalEvent)]
184pub struct ExecFailedToSignalChildError<'a> {
185    pub command: &'a tokio::process::Command,
186    pub error: ExecFailedToSignalChild,
187}
188
189impl InternalEvent for ExecFailedToSignalChildError<'_> {
190    fn emit(self) {
191        error!(
192            message = %format!("Failed to send SIGTERM to child, aborting early: {}", self.error),
193            command = ?self.command.as_std(),
194            error_code = %self.error.to_error_code(),
195            error_type = error_type::COMMAND_FAILED,
196            stage = error_stage::RECEIVING,
197        );
198        counter!(
199            "component_errors_total",
200            "command" => format!("{:?}", self.command.as_std()),
201            "error_code" => self.error.to_error_code(),
202            "error_type" => error_type::COMMAND_FAILED,
203            "stage" => error_stage::RECEIVING,
204        )
205        .increment(1);
206    }
207}
208
209#[derive(NamedInternalEvent)]
210pub struct ExecChannelClosedError;
211
212impl InternalEvent for ExecChannelClosedError {
213    fn emit(self) {
214        let exec_reason = "Receive channel closed, unable to send.";
215        error!(
216            message = exec_reason,
217            error_type = error_type::COMMAND_FAILED,
218            stage = error_stage::RECEIVING,
219        );
220        counter!(
221            "component_errors_total",
222            "error_type" => error_type::COMMAND_FAILED,
223            "stage" => error_stage::RECEIVING,
224        )
225        .increment(1);
226        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
227            count: 1,
228            reason: exec_reason
229        });
230    }
231}