1use std::{collections::HashMap, io::Error, path::PathBuf, process::ExitStatus};
2
3use chrono::Utc;
4use futures::StreamExt;
5use smallvec::SmallVec;
6use snafu::Snafu;
7use tokio::{
8 io::{AsyncRead, BufReader},
9 process::Command,
10 sync::mpsc::{Sender, channel},
11 time::{self, Duration, Instant, sleep},
12};
13use tokio_stream::wrappers::IntervalStream;
14use tokio_util::codec::FramedRead;
15use vector_lib::{
16 EstimatedJsonEncodedSizeOf,
17 codecs::{
18 StreamDecodingError,
19 decoding::{DeserializerConfig, FramingConfig},
20 },
21 config::{LegacyKey, LogNamespace, log_schema},
22 configurable::configurable_component,
23 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
24 lookup::{owned_value_path, path},
25};
26use vrl::{path::OwnedValuePath, value::Kind};
27
28use crate::{
29 SourceSender,
30 codecs::{Decoder, DecodingConfig},
31 config::{SourceConfig, SourceContext, SourceOutput},
32 event::Event,
33 internal_events::{
34 ExecChannelClosedError, ExecCommandExecuted, ExecEventsReceived, ExecFailedError,
35 ExecFailedToSignalChild, ExecFailedToSignalChildError, ExecTimeoutError, StreamClosedError,
36 },
37 serde::default_decoding,
38 shutdown::ShutdownSignal,
39};
40
41#[cfg(test)]
42mod tests;
43
/// Configuration for the `exec` source.
#[configurable_component(source("exec", "Collect output from a process running on the host."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ExecConfig {
    #[configurable(derived)]
    pub mode: Mode,

    // Only consulted when `mode` is `scheduled` (see `SourceConfig::build`).
    #[configurable(derived)]
    pub scheduled: Option<ScheduledConfig>,

    // Only consulted when `mode` is `streaming` (see `SourceConfig::build`).
    #[configurable(derived)]
    pub streaming: Option<StreamingConfig>,

    /// The command to run, plus any arguments required.
    ///
    /// The first element is the program to execute; any remaining elements are passed to it as
    /// arguments.
    #[configurable(metadata(docs::examples = "echo", docs::examples = "Hello World!"))]
    pub command: Vec<String>,

    /// Custom environment variables to set or update when running the command.
    ///
    /// These are applied after the environment is (optionally) cleared via `clear_environment`.
    #[serde(default)]
    #[configurable(metadata(docs::additional_props_description = "An environment variable."))]
    #[configurable(metadata(docs::examples = "environment_examples()"))]
    pub environment: Option<HashMap<String, String>>,

    /// Whether or not to clear the environment before setting custom environment variables.
    #[serde(default = "default_clear_environment")]
    pub clear_environment: bool,

    /// The directory in which to run the command.
    pub working_directory: Option<PathBuf>,

    /// Whether or not the output from stderr should be included when generating events.
    ///
    /// When disabled, the command's stderr is discarded.
    #[serde(default = "default_include_stderr")]
    pub include_stderr: bool,

    /// The maximum buffer size, in bytes. Must be greater than zero.
    // NOTE(review): apart from the non-zero check in `validate` (and the `Default` impl), this
    // value is not read anywhere in this file — confirm where, or whether, it is enforced.
    #[serde(default = "default_maximum_buffer_size")]
    pub maximum_buffer_size_bytes: usize,

    #[configurable(derived)]
    framing: Option<FramingConfig>,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    /// The namespace to use for logs produced by this source.
    ///
    /// When unset, the namespace is resolved by `SourceContext::log_namespace`.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
96
/// Mode used to run the command.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum Mode {
    /// The command is run on a schedule.
    Scheduled,

    /// The command is run until it exits, potentially being restarted.
    Streaming,
}
108
/// Configuration options for scheduled commands.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ScheduledConfig {
    /// The interval, in seconds, between scheduled command runs.
    ///
    /// If the command takes longer than `exec_interval_secs` to run, it is killed.
    #[serde(default = "default_exec_interval_secs")]
    exec_interval_secs: u64,
}
120
/// Configuration options for streaming commands.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct StreamingConfig {
    /// Whether or not the command should be rerun if the command exits.
    #[serde(default = "default_respawn_on_exit")]
    respawn_on_exit: bool,

    /// The amount of time, in seconds, before rerunning a streaming command that exited.
    #[serde(default = "default_respawn_interval_secs")]
    #[configurable(metadata(docs::human_name = "Respawn Interval"))]
    respawn_interval_secs: u64,
}
135
/// Errors returned by `ExecConfig::validate` for configurations that cannot run.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum ExecConfigError {
    /// `command` is an empty list, so there is no program to execute.
    #[snafu(display("A non-empty list for command must be provided"))]
    CommandEmpty,
    /// `maximum_buffer_size_bytes` was set to zero.
    #[snafu(display("The maximum buffer size must be greater than zero"))]
    ZeroBuffer,
}
143
144impl Default for ExecConfig {
145 fn default() -> Self {
146 ExecConfig {
147 mode: Mode::Scheduled,
148 scheduled: Some(ScheduledConfig {
149 exec_interval_secs: default_exec_interval_secs(),
150 }),
151 streaming: None,
152 command: vec!["echo".to_owned(), "Hello World!".to_owned()],
153 environment: None,
154 clear_environment: default_clear_environment(),
155 working_directory: None,
156 include_stderr: default_include_stderr(),
157 maximum_buffer_size_bytes: default_maximum_buffer_size(),
158 framing: None,
159 decoding: default_decoding(),
160 log_namespace: None,
161 }
162 }
163}
164
/// Default for `maximum_buffer_size_bytes`: one million bytes.
const fn default_maximum_buffer_size() -> usize {
    1_000_000
}

/// Default for `exec_interval_secs`: one scheduled run every 60 seconds.
const fn default_exec_interval_secs() -> u64 {
    60
}

/// Default for `respawn_interval_secs`: wait 5 seconds before restarting an exited command.
const fn default_respawn_interval_secs() -> u64 {
    5
}

/// Default for `respawn_on_exit`: streaming commands are restarted when they exit.
const fn default_respawn_on_exit() -> bool {
    true
}

/// Default for `clear_environment`: the command inherits the current environment.
const fn default_clear_environment() -> bool {
    false
}

/// Default for `include_stderr`: stderr output is turned into events as well.
const fn default_include_stderr() -> bool {
    true
}
189
/// Example value for the `environment` option, referenced from its `docs::examples` metadata.
fn environment_examples() -> HashMap<String, String> {
    let pairs = [
        ("LANG", "es_ES.UTF-8"),
        ("TZ", "Etc/UTC"),
        ("PATH", "/bin:/usr/bin:/usr/local/bin"),
    ];
    pairs
        .into_iter()
        .map(|(name, value)| (name.to_owned(), value.to_owned()))
        .collect()
}
197
198fn get_hostname() -> Option<String> {
199 crate::get_hostname().ok()
200}
201
/// Value recorded under [`STREAM_KEY`] for events decoded from the child's stdout.
const STDOUT: &str = "stdout";
/// Value recorded under [`STREAM_KEY`] for events decoded from the child's stderr.
const STDERR: &str = "stderr";
/// Metadata key naming the output stream ([`STDOUT`] or [`STDERR`]) an event came from.
const STREAM_KEY: &str = "stream";
/// Metadata key for the spawned child's process id.
const PID_KEY: &str = "pid";
/// Metadata key for the configured command (program plus arguments).
const COMMAND_KEY: &str = "command";

// NOTE(review): macro is defined elsewhere in the crate; by its name it derives the generated
// config from `ExecConfig::default()` — confirm.
impl_generate_config_from_default!(ExecConfig);
209
210impl ExecConfig {
211 const fn validate(&self) -> Result<(), ExecConfigError> {
212 if self.command.is_empty() {
213 Err(ExecConfigError::CommandEmpty)
214 } else if self.maximum_buffer_size_bytes == 0 {
215 Err(ExecConfigError::ZeroBuffer)
216 } else {
217 Ok(())
218 }
219 }
220
221 fn command_line(&self) -> String {
222 self.command.join(" ")
223 }
224
225 const fn exec_interval_secs_or_default(&self) -> u64 {
226 match &self.scheduled {
227 None => default_exec_interval_secs(),
228 Some(config) => config.exec_interval_secs,
229 }
230 }
231
232 const fn respawn_on_exit_or_default(&self) -> bool {
233 match &self.streaming {
234 None => default_respawn_on_exit(),
235 Some(config) => config.respawn_on_exit,
236 }
237 }
238
239 const fn respawn_interval_secs_or_default(&self) -> u64 {
240 match &self.streaming {
241 None => default_respawn_interval_secs(),
242 Some(config) => config.respawn_interval_secs,
243 }
244 }
245}
246
247#[async_trait::async_trait]
248#[typetag::serde(name = "exec")]
249impl SourceConfig for ExecConfig {
250 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
251 self.validate()?;
252 let hostname = get_hostname();
253 let log_namespace = cx.log_namespace(self.log_namespace);
254
255 let framing = self
256 .framing
257 .clone()
258 .unwrap_or_else(|| self.decoding.default_stream_framing());
259 let decoder = DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;
260
261 match &self.mode {
262 Mode::Scheduled => {
263 let exec_interval_secs = self.exec_interval_secs_or_default();
264
265 Ok(Box::pin(run_scheduled(
266 self.clone(),
267 hostname,
268 exec_interval_secs,
269 decoder,
270 cx.shutdown,
271 cx.out,
272 log_namespace,
273 )))
274 }
275 Mode::Streaming => {
276 let respawn_on_exit = self.respawn_on_exit_or_default();
277 let respawn_interval_secs = self.respawn_interval_secs_or_default();
278
279 Ok(Box::pin(run_streaming(
280 self.clone(),
281 hostname,
282 respawn_on_exit,
283 respawn_interval_secs,
284 decoder,
285 cx.shutdown,
286 cx.out,
287 log_namespace,
288 )))
289 }
290 }
291 }
292
293 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
294 let log_namespace = global_log_namespace.merge(Some(self.log_namespace.unwrap_or(false)));
295
296 let schema_definition = self
297 .decoding
298 .schema_definition(log_namespace)
299 .with_standard_vector_source_metadata()
300 .with_source_metadata(
301 Self::NAME,
302 Some(LegacyKey::InsertIfEmpty(
303 log_schema()
304 .host_key()
305 .map_or(OwnedValuePath::root(), |key| key.clone()),
306 )),
307 &owned_value_path!("host"),
308 Kind::bytes().or_undefined(),
309 Some("host"),
310 )
311 .with_source_metadata(
312 Self::NAME,
313 Some(LegacyKey::InsertIfEmpty(owned_value_path!(STREAM_KEY))),
314 &owned_value_path!(STREAM_KEY),
315 Kind::bytes().or_undefined(),
316 None,
317 )
318 .with_source_metadata(
319 Self::NAME,
320 Some(LegacyKey::InsertIfEmpty(owned_value_path!(PID_KEY))),
321 &owned_value_path!(PID_KEY),
322 Kind::integer().or_undefined(),
323 None,
324 )
325 .with_source_metadata(
326 Self::NAME,
327 Some(LegacyKey::InsertIfEmpty(owned_value_path!(COMMAND_KEY))),
328 &owned_value_path!(COMMAND_KEY),
329 Kind::bytes(),
330 None,
331 );
332
333 vec![SourceOutput::new_maybe_logs(
334 self.decoding.output_type(),
335 schema_definition,
336 )]
337 }
338
339 fn can_acknowledge(&self) -> bool {
340 false
341 }
342}
343
344async fn run_scheduled(
345 config: ExecConfig,
346 hostname: Option<String>,
347 exec_interval_secs: u64,
348 decoder: Decoder,
349 shutdown: ShutdownSignal,
350 out: SourceSender,
351 log_namespace: LogNamespace,
352) -> Result<(), ()> {
353 debug!("Starting scheduled exec runs.");
354 let schedule = Duration::from_secs(exec_interval_secs);
355
356 let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown.clone());
357
358 while interval.next().await.is_some() {
359 let timeout = tokio::time::timeout(
361 schedule,
362 run_command(
363 config.clone(),
364 hostname.clone(),
365 decoder.clone(),
366 shutdown.clone(),
367 out.clone(),
368 log_namespace,
369 ),
370 )
371 .await;
372
373 match timeout {
374 Ok(output) => {
375 if let Err(command_error) = output {
376 emit!(ExecFailedError {
377 command: config.command_line().as_str(),
378 error: command_error,
379 });
380 }
381 }
382 Err(error) => {
383 emit!(ExecTimeoutError {
384 command: config.command_line().as_str(),
385 elapsed_seconds: schedule.as_secs(),
386 error,
387 });
388 }
389 }
390 }
391
392 debug!("Finished scheduled exec runs.");
393 Ok(())
394}
395
396#[allow(clippy::too_many_arguments)]
397async fn run_streaming(
398 config: ExecConfig,
399 hostname: Option<String>,
400 respawn_on_exit: bool,
401 respawn_interval_secs: u64,
402 decoder: Decoder,
403 mut shutdown: ShutdownSignal,
404 out: SourceSender,
405 log_namespace: LogNamespace,
406) -> Result<(), ()> {
407 if respawn_on_exit {
408 let duration = Duration::from_secs(respawn_interval_secs);
409
410 loop {
412 let output = run_command(
413 config.clone(),
414 hostname.clone(),
415 decoder.clone(),
416 shutdown.clone(),
417 out.clone(),
418 log_namespace,
419 )
420 .await;
421
422 if let Err(command_error) = output {
424 emit!(ExecFailedError {
425 command: config.command_line().as_str(),
426 error: command_error,
427 });
428 }
429
430 tokio::select! {
431 _ = &mut shutdown => break, _ = sleep(duration) => debug!("Restarting streaming process."),
433 }
434 }
435 } else {
436 let output = run_command(
437 config.clone(),
438 hostname,
439 decoder,
440 shutdown,
441 out,
442 log_namespace,
443 )
444 .await;
445
446 if let Err(command_error) = output {
447 emit!(ExecFailedError {
448 command: config.command_line().as_str(),
449 error: command_error,
450 });
451 }
452 }
453
454 Ok(())
455}
456
457async fn run_command(
458 config: ExecConfig,
459 hostname: Option<String>,
460 decoder: Decoder,
461 mut shutdown: ShutdownSignal,
462 mut out: SourceSender,
463 log_namespace: LogNamespace,
464) -> Result<Option<ExitStatus>, Error> {
465 debug!("Starting command run.");
466 let mut command = build_command(&config);
467
468 let start = Instant::now();
471
472 let mut child = command.spawn()?;
473
474 let (sender, mut receiver) = channel(1024);
476
477 if config.include_stderr {
479 let stderr = child
480 .stderr
481 .take()
482 .ok_or_else(|| Error::other("Unable to take stderr of spawned process"))?;
483
484 let stderr_reader = BufReader::new(stderr);
486
487 spawn_reader_thread(stderr_reader, decoder.clone(), STDERR, sender.clone());
488 }
489
490 let stdout = child
491 .stdout
492 .take()
493 .ok_or_else(|| Error::other("Unable to take stdout of spawned process"))?;
494
495 let stdout_reader = BufReader::new(stdout);
497
498 let pid = child.id();
499
500 spawn_reader_thread(stdout_reader, decoder.clone(), STDOUT, sender);
501
502 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
503
504 'outer: loop {
505 tokio::select! {
506 _ = &mut shutdown => {
507 if !shutdown_child(&mut child, &command).await {
508 break 'outer; }
510 }
511 v = receiver.recv() => {
512 match v {
513 None => break 'outer,
514 Some(((mut events, byte_size), stream)) => {
515 bytes_received.emit(ByteSize(byte_size));
516
517 let count = events.len();
518 emit!(ExecEventsReceived {
519 count,
520 command: config.command_line().as_str(),
521 byte_size: events.estimated_json_encoded_size_of(),
522 });
523
524 for event in &mut events {
525 handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
526 }
527 if (out.send_batch(events).await).is_err() {
528 emit!(StreamClosedError { count });
529 break;
530 }
531 },
532 }
533 }
534 }
535 }
536
537 let elapsed = start.elapsed();
538
539 let result = match child.try_wait() {
540 Ok(Some(exit_status)) => {
541 handle_exit_status(&config, exit_status.code(), elapsed);
542 Ok(Some(exit_status))
543 }
544 Ok(None) => {
545 handle_exit_status(&config, None, elapsed);
546 Ok(None)
547 }
548 Err(error) => {
549 error!(message = "Unable to obtain exit status.", %error);
550
551 handle_exit_status(&config, None, elapsed);
552 Ok(None)
553 }
554 };
555
556 debug!("Finished command run.");
557
558 result
559}
560
561fn handle_exit_status(config: &ExecConfig, exit_status: Option<i32>, exec_duration: Duration) {
562 emit!(ExecCommandExecuted {
563 command: config.command_line().as_str(),
564 exit_status,
565 exec_duration,
566 });
567}
568
569#[cfg(unix)]
570async fn shutdown_child(
571 child: &mut tokio::process::Child,
572 command: &tokio::process::Command,
573) -> bool {
574 match child.id().map(i32::try_from) {
575 Some(Ok(pid)) => {
576 if let Err(error) = nix::sys::signal::kill(
578 nix::unistd::Pid::from_raw(pid),
579 nix::sys::signal::Signal::SIGTERM,
580 ) {
581 emit!(ExecFailedToSignalChildError {
582 command,
583 error: ExecFailedToSignalChild::SignalError(error)
584 });
585 false
586 } else {
587 true
588 }
589 }
590 Some(Err(err)) => {
591 emit!(ExecFailedToSignalChildError {
592 command,
593 error: ExecFailedToSignalChild::FailedToMarshalPid(err)
594 });
595 false
596 }
597 None => {
598 emit!(ExecFailedToSignalChildError {
599 command,
600 error: ExecFailedToSignalChild::NoPid
601 });
602 false
603 }
604 }
605}
606
/// Windows variant: kills the child via `Child::kill` (the Unix variant sends `SIGTERM`).
///
/// Returns `true` on success. Otherwise emits an `ExecFailedToSignalChildError` and returns
/// `false`.
#[cfg(windows)]
async fn shutdown_child(
    child: &mut tokio::process::Child,
    command: &tokio::process::Command,
) -> bool {
    let Err(err) = child.kill().await else {
        return true;
    };

    emit!(ExecFailedToSignalChildError {
        command,
        error: ExecFailedToSignalChild::IoError(err)
    });
    false
}
624
625fn build_command(config: &ExecConfig) -> Command {
626 let command = &config.command[0];
627
628 let mut command = Command::new(command);
629
630 if config.command.len() > 1 {
631 command.args(&config.command[1..]);
632 };
633
634 command.kill_on_drop(true);
635
636 if config.clear_environment {
638 command.env_clear();
639 }
640
641 if let Some(envs) = &config.environment {
643 command.envs(envs);
644 }
645
646 if let Some(current_dir) = &config.working_directory {
648 command.current_dir(current_dir);
649 }
650
651 command.stdout(std::process::Stdio::piped());
653
654 if config.include_stderr {
656 command.stderr(std::process::Stdio::piped());
657 } else {
658 command.stderr(std::process::Stdio::null());
659 }
660
661 command.stdin(std::process::Stdio::null());
663
664 command
665}
666
667fn handle_event(
668 config: &ExecConfig,
669 hostname: &Option<String>,
670 data_stream: &Option<String>,
671 pid: Option<u32>,
672 event: &mut Event,
673 log_namespace: LogNamespace,
674) {
675 if let Event::Log(log) = event {
676 log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now());
677
678 if let Some(data_stream) = data_stream {
680 log_namespace.insert_source_metadata(
681 ExecConfig::NAME,
682 log,
683 Some(LegacyKey::InsertIfEmpty(path!(STREAM_KEY))),
684 path!(STREAM_KEY),
685 data_stream.clone(),
686 );
687 }
688
689 if let Some(pid) = pid {
691 log_namespace.insert_source_metadata(
692 ExecConfig::NAME,
693 log,
694 Some(LegacyKey::InsertIfEmpty(path!(PID_KEY))),
695 path!(PID_KEY),
696 pid as i64,
697 );
698 }
699
700 if let Some(hostname) = hostname {
702 log_namespace.insert_source_metadata(
703 ExecConfig::NAME,
704 log,
705 log_schema().host_key().map(LegacyKey::InsertIfEmpty),
706 path!("host"),
707 hostname.clone(),
708 );
709 }
710
711 log_namespace.insert_source_metadata(
713 ExecConfig::NAME,
714 log,
715 Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
716 path!(COMMAND_KEY),
717 config.command.clone(),
718 );
719 }
720}
721
722fn spawn_reader_thread<R: 'static + AsyncRead + Unpin + std::marker::Send>(
723 reader: BufReader<R>,
724 decoder: Decoder,
725 origin: &'static str,
726 sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>,
727) {
728 drop(tokio::spawn(async move {
730 debug!("Start capturing {} command output.", origin);
731
732 let mut stream = FramedRead::new(reader, decoder);
733 while let Some(result) = stream.next().await {
734 match result {
735 Ok(next) => {
736 if sender.send((next, origin)).await.is_err() {
737 emit!(ExecChannelClosedError);
740 break;
741 }
742 }
743 Err(error) => {
744 if !error.can_continue() {
747 break;
748 }
749 }
750 }
751 }
752
753 debug!("Finished capturing {} command output.", origin);
754 }));
755}