1use std::{collections::HashMap, io::Error, path::PathBuf, process::ExitStatus};
2
3use chrono::Utc;
4use futures::StreamExt;
5use smallvec::SmallVec;
6use snafu::Snafu;
7use tokio::{
8 io::{AsyncRead, BufReader},
9 process::Command,
10 sync::mpsc::{Sender, channel},
11 time::{self, Duration, Instant, sleep},
12};
13use tokio_stream::wrappers::IntervalStream;
14use vector_lib::{
15 EstimatedJsonEncodedSizeOf,
16 codecs::{
17 Decoder, DecoderFramedRead, DecodingConfig, StreamDecodingError,
18 decoding::{DeserializerConfig, FramingConfig},
19 },
20 config::{LegacyKey, LogNamespace, log_schema},
21 configurable::configurable_component,
22 internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
23 lookup::{owned_value_path, path},
24};
25use vrl::{path::OwnedValuePath, value::Kind};
26
27use crate::{
28 SourceSender,
29 config::{SourceConfig, SourceContext, SourceOutput},
30 event::Event,
31 internal_events::{
32 ExecChannelClosedError, ExecCommandExecuted, ExecEventsReceived, ExecFailedError,
33 ExecFailedToSignalChild, ExecFailedToSignalChildError, ExecTimeoutError, StreamClosedError,
34 },
35 serde::default_decoding,
36 shutdown::ShutdownSignal,
37};
38
39#[cfg(test)]
40mod tests;
41
42#[configurable_component(source("exec", "Collect output from a process running on the host."))]
44#[derive(Clone, Debug)]
45#[serde(deny_unknown_fields)]
46pub struct ExecConfig {
47 #[configurable(derived)]
48 pub mode: Mode,
49
50 #[configurable(derived)]
51 pub scheduled: Option<ScheduledConfig>,
52
53 #[configurable(derived)]
54 pub streaming: Option<StreamingConfig>,
55
56 #[configurable(metadata(docs::examples = "echo", docs::examples = "Hello World!"))]
58 pub command: Vec<String>,
59
60 #[serde(default)]
63 #[configurable(metadata(docs::additional_props_description = "An environment variable."))]
64 #[configurable(metadata(docs::examples = "environment_examples()"))]
65 pub environment: Option<HashMap<String, String>>,
66
67 #[serde(default = "default_clear_environment")]
69 pub clear_environment: bool,
70
71 pub working_directory: Option<PathBuf>,
73
74 #[serde(default = "default_include_stderr")]
76 pub include_stderr: bool,
77
78 #[serde(default = "default_maximum_buffer_size")]
80 pub maximum_buffer_size_bytes: usize,
81
82 #[configurable(derived)]
83 framing: Option<FramingConfig>,
84
85 #[configurable(derived)]
86 #[serde(default = "default_decoding")]
87 decoding: DeserializerConfig,
88
89 #[configurable(metadata(docs::hidden))]
91 #[serde(default)]
92 log_namespace: Option<bool>,
93}
94
95#[configurable_component]
97#[derive(Clone, Copy, Debug)]
98#[serde(rename_all = "snake_case", deny_unknown_fields)]
99pub enum Mode {
100 Scheduled,
102
103 Streaming,
105}
106
107#[configurable_component]
109#[derive(Clone, Debug)]
110#[serde(deny_unknown_fields)]
111pub struct ScheduledConfig {
112 #[serde(default = "default_exec_interval_secs")]
116 exec_interval_secs: u64,
117}
118
119#[configurable_component]
121#[derive(Clone, Debug)]
122#[serde(deny_unknown_fields)]
123pub struct StreamingConfig {
124 #[serde(default = "default_respawn_on_exit")]
126 respawn_on_exit: bool,
127
128 #[serde(default = "default_respawn_interval_secs")]
130 #[configurable(metadata(docs::human_name = "Respawn Interval"))]
131 respawn_interval_secs: u64,
132}
133
134#[derive(Debug, PartialEq, Eq, Snafu)]
135pub enum ExecConfigError {
136 #[snafu(display("A non-empty list for command must be provided"))]
137 CommandEmpty,
138 #[snafu(display("The maximum buffer size must be greater than zero"))]
139 ZeroBuffer,
140}
141
142impl Default for ExecConfig {
143 fn default() -> Self {
144 ExecConfig {
145 mode: Mode::Scheduled,
146 scheduled: Some(ScheduledConfig {
147 exec_interval_secs: default_exec_interval_secs(),
148 }),
149 streaming: None,
150 command: vec!["echo".to_owned(), "Hello World!".to_owned()],
151 environment: None,
152 clear_environment: default_clear_environment(),
153 working_directory: None,
154 include_stderr: default_include_stderr(),
155 maximum_buffer_size_bytes: default_maximum_buffer_size(),
156 framing: None,
157 decoding: default_decoding(),
158 log_namespace: None,
159 }
160 }
161}
162
/// Default for `ExecConfig::maximum_buffer_size_bytes`: one million bytes (1 MB).
const fn default_maximum_buffer_size() -> usize {
    1_000_000
}
167
/// Default for `ScheduledConfig::exec_interval_secs`: one minute.
const fn default_exec_interval_secs() -> u64 {
    const ONE_MINUTE_SECS: u64 = 60;
    ONE_MINUTE_SECS
}
171
/// Default for `StreamingConfig::respawn_interval_secs`: five seconds.
const fn default_respawn_interval_secs() -> u64 {
    const RESPAWN_DELAY_SECS: u64 = 5;
    RESPAWN_DELAY_SECS
}
175
/// Default for `StreamingConfig::respawn_on_exit`: streaming commands are respawned.
const fn default_respawn_on_exit() -> bool {
    const RESPAWN: bool = true;
    RESPAWN
}
179
/// Default for `ExecConfig::clear_environment`: the environment is not cleared.
const fn default_clear_environment() -> bool {
    const CLEAR: bool = false;
    CLEAR
}
183
/// Default for `ExecConfig::include_stderr`: stderr output is captured.
const fn default_include_stderr() -> bool {
    const INCLUDE: bool = true;
    INCLUDE
}
187
/// Example value for the `environment` option, referenced from its documentation metadata.
fn environment_examples() -> HashMap<String, String> {
    [
        ("LANG", "es_ES.UTF-8"),
        ("TZ", "Etc/UTC"),
        ("PATH", "/bin:/usr/bin:/usr/local/bin"),
    ]
    .iter()
    .map(|(name, value)| ((*name).to_owned(), (*value).to_owned()))
    .collect()
}
195
196fn get_hostname() -> Option<String> {
197 crate::get_hostname().ok()
198}
199
/// Value of the `stream` field for events read from the child's stdout.
const STDOUT: &str = "stdout";
/// Value of the `stream` field for events read from the child's stderr.
const STDERR: &str = "stderr";
/// Metadata key holding which output stream an event came from.
const STREAM_KEY: &str = "stream";
/// Metadata key holding the process ID of the child.
const PID_KEY: &str = "pid";
/// Metadata key holding the command that was run.
const COMMAND_KEY: &str = "command";
205
206impl_generate_config_from_default!(ExecConfig);
207
208impl ExecConfig {
209 const fn validate(&self) -> Result<(), ExecConfigError> {
210 if self.command.is_empty() {
211 Err(ExecConfigError::CommandEmpty)
212 } else if self.maximum_buffer_size_bytes == 0 {
213 Err(ExecConfigError::ZeroBuffer)
214 } else {
215 Ok(())
216 }
217 }
218
219 fn command_line(&self) -> String {
220 self.command.join(" ")
221 }
222
223 const fn exec_interval_secs_or_default(&self) -> u64 {
224 match &self.scheduled {
225 None => default_exec_interval_secs(),
226 Some(config) => config.exec_interval_secs,
227 }
228 }
229
230 const fn respawn_on_exit_or_default(&self) -> bool {
231 match &self.streaming {
232 None => default_respawn_on_exit(),
233 Some(config) => config.respawn_on_exit,
234 }
235 }
236
237 const fn respawn_interval_secs_or_default(&self) -> u64 {
238 match &self.streaming {
239 None => default_respawn_interval_secs(),
240 Some(config) => config.respawn_interval_secs,
241 }
242 }
243}
244
245#[async_trait::async_trait]
246#[typetag::serde(name = "exec")]
247impl SourceConfig for ExecConfig {
248 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
249 self.validate()?;
250 let hostname = get_hostname();
251 let log_namespace = cx.log_namespace(self.log_namespace);
252
253 let framing = self
254 .framing
255 .clone()
256 .unwrap_or_else(|| self.decoding.default_stream_framing());
257 let decoder = DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;
258
259 match &self.mode {
260 Mode::Scheduled => {
261 let exec_interval_secs = self.exec_interval_secs_or_default();
262
263 Ok(Box::pin(run_scheduled(
264 self.clone(),
265 hostname,
266 exec_interval_secs,
267 decoder,
268 cx.shutdown,
269 cx.out,
270 log_namespace,
271 )))
272 }
273 Mode::Streaming => {
274 let respawn_on_exit = self.respawn_on_exit_or_default();
275 let respawn_interval_secs = self.respawn_interval_secs_or_default();
276
277 Ok(Box::pin(run_streaming(
278 self.clone(),
279 hostname,
280 respawn_on_exit,
281 respawn_interval_secs,
282 decoder,
283 cx.shutdown,
284 cx.out,
285 log_namespace,
286 )))
287 }
288 }
289 }
290
291 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
292 let log_namespace = global_log_namespace.merge(Some(self.log_namespace.unwrap_or(false)));
293
294 let schema_definition = self
295 .decoding
296 .schema_definition(log_namespace)
297 .with_standard_vector_source_metadata()
298 .with_source_metadata(
299 Self::NAME,
300 Some(LegacyKey::InsertIfEmpty(
301 log_schema()
302 .host_key()
303 .map_or(OwnedValuePath::root(), |key| key.clone()),
304 )),
305 &owned_value_path!("host"),
306 Kind::bytes().or_undefined(),
307 Some("host"),
308 )
309 .with_source_metadata(
310 Self::NAME,
311 Some(LegacyKey::InsertIfEmpty(owned_value_path!(STREAM_KEY))),
312 &owned_value_path!(STREAM_KEY),
313 Kind::bytes().or_undefined(),
314 None,
315 )
316 .with_source_metadata(
317 Self::NAME,
318 Some(LegacyKey::InsertIfEmpty(owned_value_path!(PID_KEY))),
319 &owned_value_path!(PID_KEY),
320 Kind::integer().or_undefined(),
321 None,
322 )
323 .with_source_metadata(
324 Self::NAME,
325 Some(LegacyKey::InsertIfEmpty(owned_value_path!(COMMAND_KEY))),
326 &owned_value_path!(COMMAND_KEY),
327 Kind::bytes(),
328 None,
329 );
330
331 vec![SourceOutput::new_maybe_logs(
332 self.decoding.output_type(),
333 schema_definition,
334 )]
335 }
336
337 fn can_acknowledge(&self) -> bool {
338 false
339 }
340}
341
342async fn run_scheduled(
343 config: ExecConfig,
344 hostname: Option<String>,
345 exec_interval_secs: u64,
346 decoder: Decoder,
347 shutdown: ShutdownSignal,
348 out: SourceSender,
349 log_namespace: LogNamespace,
350) -> Result<(), ()> {
351 debug!("Starting scheduled exec runs.");
352 let schedule = Duration::from_secs(exec_interval_secs);
353
354 let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown.clone());
355
356 while interval.next().await.is_some() {
357 let timeout = tokio::time::timeout(
359 schedule,
360 run_command(
361 config.clone(),
362 hostname.clone(),
363 decoder.clone(),
364 shutdown.clone(),
365 out.clone(),
366 log_namespace,
367 ),
368 )
369 .await;
370
371 match timeout {
372 Ok(output) => {
373 if let Err(command_error) = output {
374 emit!(ExecFailedError {
375 command: config.command_line().as_str(),
376 error: command_error,
377 });
378 }
379 }
380 Err(error) => {
381 emit!(ExecTimeoutError {
382 command: config.command_line().as_str(),
383 elapsed_seconds: schedule.as_secs(),
384 error,
385 });
386 }
387 }
388 }
389
390 debug!("Finished scheduled exec runs.");
391 Ok(())
392}
393
394#[allow(clippy::too_many_arguments)]
395async fn run_streaming(
396 config: ExecConfig,
397 hostname: Option<String>,
398 respawn_on_exit: bool,
399 respawn_interval_secs: u64,
400 decoder: Decoder,
401 mut shutdown: ShutdownSignal,
402 out: SourceSender,
403 log_namespace: LogNamespace,
404) -> Result<(), ()> {
405 if respawn_on_exit {
406 let duration = Duration::from_secs(respawn_interval_secs);
407
408 loop {
410 let output = run_command(
411 config.clone(),
412 hostname.clone(),
413 decoder.clone(),
414 shutdown.clone(),
415 out.clone(),
416 log_namespace,
417 )
418 .await;
419
420 if let Err(command_error) = output {
422 emit!(ExecFailedError {
423 command: config.command_line().as_str(),
424 error: command_error,
425 });
426 }
427
428 tokio::select! {
429 _ = &mut shutdown => break, _ = sleep(duration) => debug!("Restarting streaming process."),
431 }
432 }
433 } else {
434 let output = run_command(
435 config.clone(),
436 hostname,
437 decoder,
438 shutdown,
439 out,
440 log_namespace,
441 )
442 .await;
443
444 if let Err(command_error) = output {
445 emit!(ExecFailedError {
446 command: config.command_line().as_str(),
447 error: command_error,
448 });
449 }
450 }
451
452 Ok(())
453}
454
455async fn run_command(
456 config: ExecConfig,
457 hostname: Option<String>,
458 decoder: Decoder,
459 mut shutdown: ShutdownSignal,
460 mut out: SourceSender,
461 log_namespace: LogNamespace,
462) -> Result<Option<ExitStatus>, Error> {
463 debug!("Starting command run.");
464 let mut command = build_command(&config);
465
466 let start = Instant::now();
469
470 let mut child = command.spawn()?;
471
472 let (sender, mut receiver) = channel(1024);
474
475 if config.include_stderr {
477 let stderr = child
478 .stderr
479 .take()
480 .ok_or_else(|| Error::other("Unable to take stderr of spawned process"))?;
481
482 let stderr_reader = BufReader::new(stderr);
484
485 spawn_reader_thread(stderr_reader, decoder.clone(), STDERR, sender.clone());
486 }
487
488 let stdout = child
489 .stdout
490 .take()
491 .ok_or_else(|| Error::other("Unable to take stdout of spawned process"))?;
492
493 let stdout_reader = BufReader::new(stdout);
495
496 let pid = child.id();
497
498 spawn_reader_thread(stdout_reader, decoder.clone(), STDOUT, sender);
499
500 let bytes_received = register!(BytesReceived::from(Protocol::NONE));
501
502 'outer: loop {
503 tokio::select! {
504 _ = &mut shutdown => {
505 if !shutdown_child(&mut child, &command).await {
506 break 'outer; }
508 }
509 v = receiver.recv() => {
510 match v {
511 None => break 'outer,
512 Some(((mut events, byte_size), stream)) => {
513 bytes_received.emit(ByteSize(byte_size));
514
515 let count = events.len();
516 emit!(ExecEventsReceived {
517 count,
518 command: config.command_line().as_str(),
519 byte_size: events.estimated_json_encoded_size_of(),
520 });
521
522 for event in &mut events {
523 handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
524 }
525 if (out.send_batch(events).await).is_err() {
526 emit!(StreamClosedError { count });
527 break;
528 }
529 },
530 }
531 }
532 }
533 }
534
535 let elapsed = start.elapsed();
536
537 let result = match child.try_wait() {
538 Ok(Some(exit_status)) => {
539 handle_exit_status(&config, exit_status.code(), elapsed);
540 Ok(Some(exit_status))
541 }
542 Ok(None) => {
543 handle_exit_status(&config, None, elapsed);
544 Ok(None)
545 }
546 Err(error) => {
547 error!(message = "Unable to obtain exit status.", %error);
548
549 handle_exit_status(&config, None, elapsed);
550 Ok(None)
551 }
552 };
553
554 debug!("Finished command run.");
555
556 result
557}
558
559fn handle_exit_status(config: &ExecConfig, exit_status: Option<i32>, exec_duration: Duration) {
560 emit!(ExecCommandExecuted {
561 command: config.command_line().as_str(),
562 exit_status,
563 exec_duration,
564 });
565}
566
567#[cfg(unix)]
568async fn shutdown_child(
569 child: &mut tokio::process::Child,
570 command: &tokio::process::Command,
571) -> bool {
572 match child.id().map(i32::try_from) {
573 Some(Ok(pid)) => {
574 if let Err(error) = nix::sys::signal::kill(
576 nix::unistd::Pid::from_raw(pid),
577 nix::sys::signal::Signal::SIGTERM,
578 ) {
579 emit!(ExecFailedToSignalChildError {
580 command,
581 error: ExecFailedToSignalChild::SignalError(error)
582 });
583 false
584 } else {
585 true
586 }
587 }
588 Some(Err(err)) => {
589 emit!(ExecFailedToSignalChildError {
590 command,
591 error: ExecFailedToSignalChild::FailedToMarshalPid(err)
592 });
593 false
594 }
595 None => {
596 emit!(ExecFailedToSignalChildError {
597 command,
598 error: ExecFailedToSignalChild::NoPid
599 });
600 false
601 }
602 }
603}
604
/// Stops the child process by killing it.
///
/// Returns `true` when the kill succeeded. On failure an `ExecFailedToSignalChildError` is
/// emitted and `false` is returned.
#[cfg(windows)]
async fn shutdown_child(
    child: &mut tokio::process::Child,
    command: &tokio::process::Command,
) -> bool {
    if let Err(err) = child.kill().await {
        emit!(ExecFailedToSignalChildError {
            command,
            error: ExecFailedToSignalChild::IoError(err)
        });
        return false;
    }

    true
}
622
623fn build_command(config: &ExecConfig) -> Command {
624 let command = &config.command[0];
625
626 let mut command = Command::new(command);
627
628 if config.command.len() > 1 {
629 command.args(&config.command[1..]);
630 };
631
632 command.kill_on_drop(true);
633
634 if config.clear_environment {
636 command.env_clear();
637 }
638
639 if let Some(envs) = &config.environment {
641 command.envs(envs);
642 }
643
644 if let Some(current_dir) = &config.working_directory {
646 command.current_dir(current_dir);
647 }
648
649 command.stdout(std::process::Stdio::piped());
651
652 if config.include_stderr {
654 command.stderr(std::process::Stdio::piped());
655 } else {
656 command.stderr(std::process::Stdio::null());
657 }
658
659 command.stdin(std::process::Stdio::null());
661
662 command
663}
664
665fn handle_event(
666 config: &ExecConfig,
667 hostname: &Option<String>,
668 data_stream: &Option<String>,
669 pid: Option<u32>,
670 event: &mut Event,
671 log_namespace: LogNamespace,
672) {
673 if let Event::Log(log) = event {
674 log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now());
675
676 if let Some(data_stream) = data_stream {
678 log_namespace.insert_source_metadata(
679 ExecConfig::NAME,
680 log,
681 Some(LegacyKey::InsertIfEmpty(path!(STREAM_KEY))),
682 path!(STREAM_KEY),
683 data_stream.clone(),
684 );
685 }
686
687 if let Some(pid) = pid {
689 log_namespace.insert_source_metadata(
690 ExecConfig::NAME,
691 log,
692 Some(LegacyKey::InsertIfEmpty(path!(PID_KEY))),
693 path!(PID_KEY),
694 pid as i64,
695 );
696 }
697
698 if let Some(hostname) = hostname {
700 log_namespace.insert_source_metadata(
701 ExecConfig::NAME,
702 log,
703 log_schema().host_key().map(LegacyKey::InsertIfEmpty),
704 path!("host"),
705 hostname.clone(),
706 );
707 }
708
709 log_namespace.insert_source_metadata(
711 ExecConfig::NAME,
712 log,
713 Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
714 path!(COMMAND_KEY),
715 config.command.clone(),
716 );
717 }
718}
719
720fn spawn_reader_thread<R: 'static + AsyncRead + Unpin + std::marker::Send>(
721 reader: BufReader<R>,
722 decoder: Decoder,
723 origin: &'static str,
724 sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>,
725) {
726 drop(tokio::spawn(async move {
728 debug!("Start capturing {} command output.", origin);
729
730 let mut stream = DecoderFramedRead::new(reader, decoder);
731 while let Some(result) = stream.next().await {
732 match result {
733 Ok(next) => {
734 if sender.send((next, origin)).await.is_err() {
735 emit!(ExecChannelClosedError);
738 break;
739 }
740 }
741 Err(error) => {
742 if !error.can_continue() {
745 break;
746 }
747 }
748 }
749 }
750
751 debug!("Finished capturing {} command output.", origin);
752 }));
753}