1use std::{collections::HashMap, io::Error, path::PathBuf, process::ExitStatus};
2
3use chrono::Utc;
4use futures::StreamExt;
5use smallvec::SmallVec;
6use snafu::Snafu;
7use tokio::{
8 io::{AsyncRead, BufReader},
9 process::Command,
10 sync::mpsc::{channel, Sender},
11 time::{self, sleep, Duration, Instant},
12};
13use tokio_stream::wrappers::IntervalStream;
14use tokio_util::codec::FramedRead;
15use vector_lib::codecs::{
16 decoding::{DeserializerConfig, FramingConfig},
17 StreamDecodingError,
18};
19use vector_lib::configurable::configurable_component;
20use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
21use vector_lib::{config::LegacyKey, EstimatedJsonEncodedSizeOf};
22use vrl::path::OwnedValuePath;
23use vrl::value::Kind;
24
25use crate::{
26 codecs::{Decoder, DecodingConfig},
27 config::{SourceConfig, SourceContext, SourceOutput},
28 event::Event,
29 internal_events::{
30 ExecChannelClosedError, ExecCommandExecuted, ExecEventsReceived, ExecFailedError,
31 ExecFailedToSignalChild, ExecFailedToSignalChildError, ExecTimeoutError, StreamClosedError,
32 },
33 serde::default_decoding,
34 shutdown::ShutdownSignal,
35 SourceSender,
36};
37use vector_lib::config::{log_schema, LogNamespace};
38use vector_lib::lookup::{owned_value_path, path};
39
40#[cfg(test)]
41mod tests;
42
#[configurable_component(source("exec", "Collect output from a process running on the host."))]
/// Configuration for the `exec` source.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ExecConfig {
    #[configurable(derived)]
    pub mode: Mode,

    #[configurable(derived)]
    pub scheduled: Option<ScheduledConfig>,

    #[configurable(derived)]
    pub streaming: Option<StreamingConfig>,

    /// The command to run, plus any arguments required.
    ///
    /// The first element is the program to execute; any remaining elements are passed to it as
    /// arguments. The list must not be empty.
    #[configurable(metadata(docs::examples = "echo", docs::examples = "Hello World!"))]
    pub command: Vec<String>,

    #[serde(default)]
    /// Custom environment variables to set or update when running the command.
    ///
    /// If a variable name already exists in the environment, its value is replaced.
    #[configurable(metadata(docs::additional_props_description = "An environment variable."))]
    #[configurable(metadata(docs::examples = "environment_examples()"))]
    pub environment: Option<HashMap<String, String>>,

    #[serde(default = "default_clear_environment")]
    /// Whether or not to clear the environment before setting custom environment variables.
    pub clear_environment: bool,

    /// The directory in which to run the command.
    pub working_directory: Option<PathBuf>,

    #[serde(default = "default_include_stderr")]
    /// Whether or not the output from stderr should be included when generating events.
    pub include_stderr: bool,

    #[serde(default = "default_maximum_buffer_size")]
    /// The maximum buffer size, in bytes.
    ///
    /// Must be greater than zero.
    pub maximum_buffer_size_bytes: usize,

    #[configurable(derived)]
    framing: Option<FramingConfig>,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(metadata(docs::hidden))]
    /// The namespace to use for logs. This overrides the global setting.
    #[serde(default)]
    log_namespace: Option<bool>,
}
95
#[configurable_component]
/// Mode of operation for running the command.
#[derive(Clone, Copy, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum Mode {
    /// The command is run on a schedule.
    Scheduled,

    /// The command is run until it exits, potentially being restarted.
    Streaming,
}
107
#[configurable_component]
/// Configuration options for scheduled commands.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ScheduledConfig {
    #[serde(default = "default_exec_interval_secs")]
    /// The interval, in seconds, between scheduled command runs.
    ///
    /// The same duration is used as the timeout for each run: a command that is still running
    /// when the interval elapses is abandoned and a timeout error is reported.
    exec_interval_secs: u64,
}
119
#[configurable_component]
/// Configuration options for streaming commands.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct StreamingConfig {
    #[serde(default = "default_respawn_on_exit")]
    /// Whether or not the command should be rerun if the command exits.
    respawn_on_exit: bool,

    #[serde(default = "default_respawn_interval_secs")]
    /// The amount of time, in seconds, before rerunning a streaming command that exited.
    #[configurable(metadata(docs::human_name = "Respawn Interval"))]
    respawn_interval_secs: u64,
}
134
/// Errors reported by `ExecConfig::validate` when the configuration is unusable.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum ExecConfigError {
    /// The `command` list was empty, so there is no program to execute.
    #[snafu(display("A non-empty list for command must be provided"))]
    CommandEmpty,
    /// `maximum_buffer_size_bytes` was set to zero.
    #[snafu(display("The maximum buffer size must be greater than zero"))]
    ZeroBuffer,
}
142
143impl Default for ExecConfig {
144 fn default() -> Self {
145 ExecConfig {
146 mode: Mode::Scheduled,
147 scheduled: Some(ScheduledConfig {
148 exec_interval_secs: default_exec_interval_secs(),
149 }),
150 streaming: None,
151 command: vec!["echo".to_owned(), "Hello World!".to_owned()],
152 environment: None,
153 clear_environment: default_clear_environment(),
154 working_directory: None,
155 include_stderr: default_include_stderr(),
156 maximum_buffer_size_bytes: default_maximum_buffer_size(),
157 framing: None,
158 decoding: default_decoding(),
159 log_namespace: None,
160 }
161 }
162}
163
/// Default for `maximum_buffer_size_bytes`: one million bytes.
const fn default_maximum_buffer_size() -> usize {
    1_000_000
}
168
/// Default for `exec_interval_secs`: run scheduled commands once a minute.
const fn default_exec_interval_secs() -> u64 {
    const ONE_MINUTE_SECS: u64 = 60;
    ONE_MINUTE_SECS
}
172
/// Default for `respawn_interval_secs`: wait five seconds before respawning.
const fn default_respawn_interval_secs() -> u64 {
    const RESPAWN_DELAY_SECS: u64 = 5;
    RESPAWN_DELAY_SECS
}
176
/// Default for `respawn_on_exit`: streaming commands are respawned when they exit.
const fn default_respawn_on_exit() -> bool {
    const RESPAWN: bool = true;
    RESPAWN
}
180
/// Default for `clear_environment`: the child's environment is not cleared.
const fn default_clear_environment() -> bool {
    const CLEAR: bool = false;
    CLEAR
}
184
/// Default for `include_stderr`: stderr output is captured alongside stdout.
const fn default_include_stderr() -> bool {
    const INCLUDE: bool = true;
    INCLUDE
}
188
/// Example environment map referenced by the `environment` field's documentation metadata.
fn environment_examples() -> HashMap<String, String> {
    [
        ("LANG", "es_ES.UTF-8"),
        ("TZ", "Etc/UTC"),
        ("PATH", "/bin:/usr/bin:/usr/local/bin"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_owned(), value.to_owned()))
    .collect()
}
196
197fn get_hostname() -> Option<String> {
198 crate::get_hostname().ok()
199}
200
/// Stream name attached to events decoded from the child's standard output.
const STDOUT: &str = "stdout";
/// Stream name attached to events decoded from the child's standard error.
const STDERR: &str = "stderr";
/// Metadata key under which the originating stream name is stored.
const STREAM_KEY: &str = "stream";
/// Metadata key under which the child's process ID is stored.
const PID_KEY: &str = "pid";
/// Metadata key under which the configured command (program plus arguments) is stored.
const COMMAND_KEY: &str = "command";

// NOTE(review): judging by the macro name, this derives the generated example configuration
// from `ExecConfig::default()`; the macro itself is defined outside this file, so confirm there.
impl_generate_config_from_default!(ExecConfig);
208
209impl ExecConfig {
210 fn validate(&self) -> Result<(), ExecConfigError> {
211 if self.command.is_empty() {
212 Err(ExecConfigError::CommandEmpty)
213 } else if self.maximum_buffer_size_bytes == 0 {
214 Err(ExecConfigError::ZeroBuffer)
215 } else {
216 Ok(())
217 }
218 }
219
220 fn command_line(&self) -> String {
221 self.command.join(" ")
222 }
223
224 const fn exec_interval_secs_or_default(&self) -> u64 {
225 match &self.scheduled {
226 None => default_exec_interval_secs(),
227 Some(config) => config.exec_interval_secs,
228 }
229 }
230
231 const fn respawn_on_exit_or_default(&self) -> bool {
232 match &self.streaming {
233 None => default_respawn_on_exit(),
234 Some(config) => config.respawn_on_exit,
235 }
236 }
237
238 const fn respawn_interval_secs_or_default(&self) -> u64 {
239 match &self.streaming {
240 None => default_respawn_interval_secs(),
241 Some(config) => config.respawn_interval_secs,
242 }
243 }
244}
245
246#[async_trait::async_trait]
247#[typetag::serde(name = "exec")]
248impl SourceConfig for ExecConfig {
249 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
250 self.validate()?;
251 let hostname = get_hostname();
252 let log_namespace = cx.log_namespace(self.log_namespace);
253
254 let framing = self
255 .framing
256 .clone()
257 .unwrap_or_else(|| self.decoding.default_stream_framing());
258 let decoder = DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;
259
260 match &self.mode {
261 Mode::Scheduled => {
262 let exec_interval_secs = self.exec_interval_secs_or_default();
263
264 Ok(Box::pin(run_scheduled(
265 self.clone(),
266 hostname,
267 exec_interval_secs,
268 decoder,
269 cx.shutdown,
270 cx.out,
271 log_namespace,
272 )))
273 }
274 Mode::Streaming => {
275 let respawn_on_exit = self.respawn_on_exit_or_default();
276 let respawn_interval_secs = self.respawn_interval_secs_or_default();
277
278 Ok(Box::pin(run_streaming(
279 self.clone(),
280 hostname,
281 respawn_on_exit,
282 respawn_interval_secs,
283 decoder,
284 cx.shutdown,
285 cx.out,
286 log_namespace,
287 )))
288 }
289 }
290 }
291
292 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
293 let log_namespace = global_log_namespace.merge(Some(self.log_namespace.unwrap_or(false)));
294
295 let schema_definition = self
296 .decoding
297 .schema_definition(log_namespace)
298 .with_standard_vector_source_metadata()
299 .with_source_metadata(
300 Self::NAME,
301 Some(LegacyKey::InsertIfEmpty(
302 log_schema()
303 .host_key()
304 .map_or(OwnedValuePath::root(), |key| key.clone()),
305 )),
306 &owned_value_path!("host"),
307 Kind::bytes().or_undefined(),
308 Some("host"),
309 )
310 .with_source_metadata(
311 Self::NAME,
312 Some(LegacyKey::InsertIfEmpty(owned_value_path!(STREAM_KEY))),
313 &owned_value_path!(STREAM_KEY),
314 Kind::bytes().or_undefined(),
315 None,
316 )
317 .with_source_metadata(
318 Self::NAME,
319 Some(LegacyKey::InsertIfEmpty(owned_value_path!(PID_KEY))),
320 &owned_value_path!(PID_KEY),
321 Kind::integer().or_undefined(),
322 None,
323 )
324 .with_source_metadata(
325 Self::NAME,
326 Some(LegacyKey::InsertIfEmpty(owned_value_path!(COMMAND_KEY))),
327 &owned_value_path!(COMMAND_KEY),
328 Kind::bytes(),
329 None,
330 );
331
332 vec![SourceOutput::new_maybe_logs(
333 self.decoding.output_type(),
334 schema_definition,
335 )]
336 }
337
338 fn can_acknowledge(&self) -> bool {
339 false
340 }
341}
342
343async fn run_scheduled(
344 config: ExecConfig,
345 hostname: Option<String>,
346 exec_interval_secs: u64,
347 decoder: Decoder,
348 shutdown: ShutdownSignal,
349 out: SourceSender,
350 log_namespace: LogNamespace,
351) -> Result<(), ()> {
352 debug!("Starting scheduled exec runs.");
353 let schedule = Duration::from_secs(exec_interval_secs);
354
355 let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown.clone());
356
357 while interval.next().await.is_some() {
358 let timeout = tokio::time::timeout(
360 schedule,
361 run_command(
362 config.clone(),
363 hostname.clone(),
364 decoder.clone(),
365 shutdown.clone(),
366 out.clone(),
367 log_namespace,
368 ),
369 )
370 .await;
371
372 match timeout {
373 Ok(output) => {
374 if let Err(command_error) = output {
375 emit!(ExecFailedError {
376 command: config.command_line().as_str(),
377 error: command_error,
378 });
379 }
380 }
381 Err(error) => {
382 emit!(ExecTimeoutError {
383 command: config.command_line().as_str(),
384 elapsed_seconds: schedule.as_secs(),
385 error,
386 });
387 }
388 }
389 }
390
391 debug!("Finished scheduled exec runs.");
392 Ok(())
393}
394
395#[allow(clippy::too_many_arguments)]
396async fn run_streaming(
397 config: ExecConfig,
398 hostname: Option<String>,
399 respawn_on_exit: bool,
400 respawn_interval_secs: u64,
401 decoder: Decoder,
402 mut shutdown: ShutdownSignal,
403 out: SourceSender,
404 log_namespace: LogNamespace,
405) -> Result<(), ()> {
406 if respawn_on_exit {
407 let duration = Duration::from_secs(respawn_interval_secs);
408
409 loop {
411 let output = run_command(
412 config.clone(),
413 hostname.clone(),
414 decoder.clone(),
415 shutdown.clone(),
416 out.clone(),
417 log_namespace,
418 )
419 .await;
420
421 if let Err(command_error) = output {
423 emit!(ExecFailedError {
424 command: config.command_line().as_str(),
425 error: command_error,
426 });
427 }
428
429 tokio::select! {
430 _ = &mut shutdown => break, _ = sleep(duration) => debug!("Restarting streaming process."),
432 }
433 }
434 } else {
435 let output = run_command(
436 config.clone(),
437 hostname,
438 decoder,
439 shutdown,
440 out,
441 log_namespace,
442 )
443 .await;
444
445 if let Err(command_error) = output {
446 emit!(ExecFailedError {
447 command: config.command_line().as_str(),
448 error: command_error,
449 });
450 }
451 }
452
453 Ok(())
454}
455
456async fn run_command(
457 config: ExecConfig,
458 hostname: Option<String>,
459 decoder: Decoder,
460 mut shutdown: ShutdownSignal,
461 mut out: SourceSender,
462 log_namespace: LogNamespace,
463) -> Result<Option<ExitStatus>, Error> {
464 debug!("Starting command run.");
465 let mut command = build_command(&config);
466
467 let start = Instant::now();
470
471 let mut child = command.spawn()?;
472
473 let (sender, mut receiver) = channel(1024);
475
476 if config.include_stderr {
478 let stderr = child
479 .stderr
480 .take()
481 .ok_or_else(|| Error::other("Unable to take stderr of spawned process"))?;
482
483 let stderr_reader = BufReader::new(stderr);
485
486 spawn_reader_thread(stderr_reader, decoder.clone(), STDERR, sender.clone());
487 }
488
489 let stdout = child
490 .stdout
491 .take()
492 .ok_or_else(|| Error::other("Unable to take stdout of spawned process"))?;
493
494 let stdout_reader = BufReader::new(stdout);
496
497 let pid = child.id();
498
499 spawn_reader_thread(stdout_reader, decoder.clone(), STDOUT, sender);
500
501 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
502
503 'outer: loop {
504 tokio::select! {
505 _ = &mut shutdown => {
506 if !shutdown_child(&mut child, &command).await {
507 break 'outer; }
509 }
510 v = receiver.recv() => {
511 match v {
512 None => break 'outer,
513 Some(((mut events, byte_size), stream)) => {
514 bytes_received.emit(ByteSize(byte_size));
515
516 let count = events.len();
517 emit!(ExecEventsReceived {
518 count,
519 command: config.command_line().as_str(),
520 byte_size: events.estimated_json_encoded_size_of(),
521 });
522
523 for event in &mut events {
524 handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
525 }
526 if (out.send_batch(events).await).is_err() {
527 emit!(StreamClosedError { count });
528 break;
529 }
530 },
531 }
532 }
533 }
534 }
535
536 let elapsed = start.elapsed();
537
538 let result = match child.try_wait() {
539 Ok(Some(exit_status)) => {
540 handle_exit_status(&config, exit_status.code(), elapsed);
541 Ok(Some(exit_status))
542 }
543 Ok(None) => {
544 handle_exit_status(&config, None, elapsed);
545 Ok(None)
546 }
547 Err(error) => {
548 error!(message = "Unable to obtain exit status.", %error);
549
550 handle_exit_status(&config, None, elapsed);
551 Ok(None)
552 }
553 };
554
555 debug!("Finished command run.");
556
557 result
558}
559
560fn handle_exit_status(config: &ExecConfig, exit_status: Option<i32>, exec_duration: Duration) {
561 emit!(ExecCommandExecuted {
562 command: config.command_line().as_str(),
563 exit_status,
564 exec_duration,
565 });
566}
567
568#[cfg(unix)]
569async fn shutdown_child(
570 child: &mut tokio::process::Child,
571 command: &tokio::process::Command,
572) -> bool {
573 match child.id().map(i32::try_from) {
574 Some(Ok(pid)) => {
575 if let Err(error) = nix::sys::signal::kill(
577 nix::unistd::Pid::from_raw(pid),
578 nix::sys::signal::Signal::SIGTERM,
579 ) {
580 emit!(ExecFailedToSignalChildError {
581 command,
582 error: ExecFailedToSignalChild::SignalError(error)
583 });
584 false
585 } else {
586 true
587 }
588 }
589 Some(Err(err)) => {
590 emit!(ExecFailedToSignalChildError {
591 command,
592 error: ExecFailedToSignalChild::FailedToMarshalPid(err)
593 });
594 false
595 }
596 None => {
597 emit!(ExecFailedToSignalChildError {
598 command,
599 error: ExecFailedToSignalChild::NoPid
600 });
601 false
602 }
603 }
604}
605
/// Terminates the child process by killing it.
///
/// Returns `true` when the kill succeeded. Otherwise emits an `ExecFailedToSignalChildError`
/// carrying the I/O error and returns `false`.
#[cfg(windows)]
async fn shutdown_child(
    child: &mut tokio::process::Child,
    command: &tokio::process::Command,
) -> bool {
    match child.kill().await {
        Ok(()) => true,
        Err(err) => {
            // `command` is already a reference; pass it straight through, as the unix
            // implementation does, rather than borrowing it a second time.
            emit!(ExecFailedToSignalChildError {
                command,
                error: ExecFailedToSignalChild::IoError(err)
            });
            false
        }
    }
}
623
624fn build_command(config: &ExecConfig) -> Command {
625 let command = &config.command[0];
626
627 let mut command = Command::new(command);
628
629 if config.command.len() > 1 {
630 command.args(&config.command[1..]);
631 };
632
633 command.kill_on_drop(true);
634
635 if config.clear_environment {
637 command.env_clear();
638 }
639
640 if let Some(envs) = &config.environment {
642 command.envs(envs);
643 }
644
645 if let Some(current_dir) = &config.working_directory {
647 command.current_dir(current_dir);
648 }
649
650 command.stdout(std::process::Stdio::piped());
652
653 if config.include_stderr {
655 command.stderr(std::process::Stdio::piped());
656 } else {
657 command.stderr(std::process::Stdio::null());
658 }
659
660 command.stdin(std::process::Stdio::null());
662
663 command
664}
665
666fn handle_event(
667 config: &ExecConfig,
668 hostname: &Option<String>,
669 data_stream: &Option<String>,
670 pid: Option<u32>,
671 event: &mut Event,
672 log_namespace: LogNamespace,
673) {
674 if let Event::Log(log) = event {
675 log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now());
676
677 if let Some(data_stream) = data_stream {
679 log_namespace.insert_source_metadata(
680 ExecConfig::NAME,
681 log,
682 Some(LegacyKey::InsertIfEmpty(path!(STREAM_KEY))),
683 path!(STREAM_KEY),
684 data_stream.clone(),
685 );
686 }
687
688 if let Some(pid) = pid {
690 log_namespace.insert_source_metadata(
691 ExecConfig::NAME,
692 log,
693 Some(LegacyKey::InsertIfEmpty(path!(PID_KEY))),
694 path!(PID_KEY),
695 pid as i64,
696 );
697 }
698
699 if let Some(hostname) = hostname {
701 log_namespace.insert_source_metadata(
702 ExecConfig::NAME,
703 log,
704 log_schema().host_key().map(LegacyKey::InsertIfEmpty),
705 path!("host"),
706 hostname.clone(),
707 );
708 }
709
710 log_namespace.insert_source_metadata(
712 ExecConfig::NAME,
713 log,
714 Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
715 path!(COMMAND_KEY),
716 config.command.clone(),
717 );
718 }
719}
720
721fn spawn_reader_thread<R: 'static + AsyncRead + Unpin + std::marker::Send>(
722 reader: BufReader<R>,
723 decoder: Decoder,
724 origin: &'static str,
725 sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>,
726) {
727 drop(tokio::spawn(async move {
729 debug!("Start capturing {} command output.", origin);
730
731 let mut stream = FramedRead::new(reader, decoder);
732 while let Some(result) = stream.next().await {
733 match result {
734 Ok(next) => {
735 if sender.send((next, origin)).await.is_err() {
736 emit!(ExecChannelClosedError);
739 break;
740 }
741 }
742 Err(error) => {
743 if !error.can_continue() {
746 break;
747 }
748 }
749 }
750 }
751
752 debug!("Finished capturing {} command output.", origin);
753 }));
754}