vector/sources/exec/
mod.rs

1use std::{collections::HashMap, io::Error, path::PathBuf, process::ExitStatus};
2
3use chrono::Utc;
4use futures::StreamExt;
5use smallvec::SmallVec;
6use snafu::Snafu;
7use tokio::{
8    io::{AsyncRead, BufReader},
9    process::Command,
10    sync::mpsc::{channel, Sender},
11    time::{self, sleep, Duration, Instant},
12};
13use tokio_stream::wrappers::IntervalStream;
14use tokio_util::codec::FramedRead;
15use vector_lib::codecs::{
16    decoding::{DeserializerConfig, FramingConfig},
17    StreamDecodingError,
18};
19use vector_lib::configurable::configurable_component;
20use vector_lib::internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol};
21use vector_lib::{config::LegacyKey, EstimatedJsonEncodedSizeOf};
22use vrl::path::OwnedValuePath;
23use vrl::value::Kind;
24
25use crate::{
26    codecs::{Decoder, DecodingConfig},
27    config::{SourceConfig, SourceContext, SourceOutput},
28    event::Event,
29    internal_events::{
30        ExecChannelClosedError, ExecCommandExecuted, ExecEventsReceived, ExecFailedError,
31        ExecFailedToSignalChild, ExecFailedToSignalChildError, ExecTimeoutError, StreamClosedError,
32    },
33    serde::default_decoding,
34    shutdown::ShutdownSignal,
35    SourceSender,
36};
37use vector_lib::config::{log_schema, LogNamespace};
38use vector_lib::lookup::{owned_value_path, path};
39
40#[cfg(test)]
41mod tests;
42
/// Configuration for the `exec` source.
#[configurable_component(source("exec", "Collect output from a process running on the host."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ExecConfig {
    // `build` matches on this to start either `run_scheduled` or `run_streaming`.
    #[configurable(derived)]
    pub mode: Mode,

    // Only read on the scheduled path; when `None`, `default_exec_interval_secs()` is used.
    #[configurable(derived)]
    pub scheduled: Option<ScheduledConfig>,

    // Only read on the streaming path; when `None`, the streaming defaults are used.
    #[configurable(derived)]
    pub streaming: Option<StreamingConfig>,

    // `build_command` treats the first element as the program and the remaining
    // elements as its arguments. `validate` rejects an empty list.
    /// The command to run, plus any arguments required.
    #[configurable(metadata(docs::examples = "echo", docs::examples = "Hello World!"))]
    pub command: Vec<String>,

    // Applied in `build_command` after the optional `env_clear`, so these values
    // survive `clear_environment = true`.
    /// Custom environment variables to set or update when running the command.
    /// If a variable name already exists in the environment, its value is replaced.
    #[serde(default)]
    #[configurable(metadata(docs::additional_props_description = "An environment variable."))]
    #[configurable(metadata(docs::examples = "environment_examples()"))]
    pub environment: Option<HashMap<String, String>>,

    /// Whether or not to clear the environment before setting custom environment variables.
    #[serde(default = "default_clear_environment")]
    pub clear_environment: bool,

    /// The directory in which to run the command.
    pub working_directory: Option<PathBuf>,

    // When false, the child's stderr is redirected to null instead of being piped
    // (see `build_command`) and no stderr reader task is spawned.
    /// Whether or not the output from stderr should be included when generating events.
    #[serde(default = "default_include_stderr")]
    pub include_stderr: bool,

    // `validate` rejects zero.
    // NOTE(review): no other read of this field is visible in this file — confirm
    // where (or whether) the limit is actually enforced.
    /// The maximum buffer size allowed before a log event is generated.
    #[serde(default = "default_maximum_buffer_size")]
    pub maximum_buffer_size_bytes: usize,

    // When unset, `build` falls back to `decoding.default_stream_framing()`.
    #[configurable(derived)]
    framing: Option<FramingConfig>,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    // Read by both `build` (via `cx.log_namespace`) and `outputs`.
    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
95
/// Mode of operation for running the command.
#[configurable_component]
#[derive(Clone, Copy, Debug)]
#[serde(rename_all = "snake_case", deny_unknown_fields)]
pub enum Mode {
    // Handled by `run_scheduled`.
    /// The command is run on a schedule.
    Scheduled,

    // Handled by `run_streaming`.
    /// The command is run until it exits, potentially being restarted.
    Streaming,
}
107
/// Configuration options for scheduled commands.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct ScheduledConfig {
    // `run_scheduled` uses this value both as the tick period and as the timeout
    // wrapped around each individual run.
    /// The interval, in seconds, between scheduled command runs.
    ///
    /// If the command takes longer than `exec_interval_secs` to run, it is killed.
    #[serde(default = "default_exec_interval_secs")]
    exec_interval_secs: u64,
}
119
/// Configuration options for streaming commands.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct StreamingConfig {
    // When false, `run_streaming` runs the command a single time and then returns.
    /// Whether or not the command should be rerun if the command exits.
    #[serde(default = "default_respawn_on_exit")]
    respawn_on_exit: bool,

    // `run_streaming` sleeps for this long between runs; the sleep is cut short
    // when shutdown is signaled.
    /// The amount of time, in seconds, before rerunning a streaming command that exited.
    #[serde(default = "default_respawn_interval_secs")]
    #[configurable(metadata(docs::human_name = "Respawn Interval"))]
    respawn_interval_secs: u64,
}
134
/// Errors reported by `ExecConfig::validate` for an unusable configuration.
#[derive(Debug, PartialEq, Eq, Snafu)]
pub enum ExecConfigError {
    // Returned when `command` has no elements.
    #[snafu(display("A non-empty list for command must be provided"))]
    CommandEmpty,
    // Returned when `maximum_buffer_size_bytes` is zero.
    #[snafu(display("The maximum buffer size must be greater than zero"))]
    ZeroBuffer,
}
142
143impl Default for ExecConfig {
144    fn default() -> Self {
145        ExecConfig {
146            mode: Mode::Scheduled,
147            scheduled: Some(ScheduledConfig {
148                exec_interval_secs: default_exec_interval_secs(),
149            }),
150            streaming: None,
151            command: vec!["echo".to_owned(), "Hello World!".to_owned()],
152            environment: None,
153            clear_environment: default_clear_environment(),
154            working_directory: None,
155            include_stderr: default_include_stderr(),
156            maximum_buffer_size_bytes: default_maximum_buffer_size(),
157            framing: None,
158            decoding: default_decoding(),
159            log_namespace: None,
160        }
161    }
162}
163
/// Default for `maximum_buffer_size_bytes`: one million bytes (1 MB, decimal).
const fn default_maximum_buffer_size() -> usize {
    1_000_000
}

/// Default for `exec_interval_secs`: one minute.
const fn default_exec_interval_secs() -> u64 {
    const ONE_MINUTE_SECS: u64 = 60;
    ONE_MINUTE_SECS
}

/// Default for `respawn_interval_secs`: five seconds.
const fn default_respawn_interval_secs() -> u64 {
    const RESPAWN_DELAY_SECS: u64 = 5;
    RESPAWN_DELAY_SECS
}

/// Default for `respawn_on_exit`: streaming commands are restarted.
const fn default_respawn_on_exit() -> bool {
    true
}

/// Default for `clear_environment`: the environment is left as-is.
const fn default_clear_environment() -> bool {
    false
}

/// Default for `include_stderr`: stderr output is collected.
const fn default_include_stderr() -> bool {
    true
}
188
/// Builds the sample environment map referenced by the `environment` field's
/// documentation metadata.
fn environment_examples() -> HashMap<String, String> {
    [
        ("LANG", "es_ES.UTF-8"),
        ("TZ", "Etc/UTC"),
        ("PATH", "/bin:/usr/bin:/usr/local/bin"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_owned(), value.to_owned()))
    .collect()
}
196
/// Returns the hostname reported by `crate::get_hostname`, or `None` when that
/// lookup fails (the error itself is discarded).
fn get_hostname() -> Option<String> {
    crate::get_hostname().ok()
}
200
// Origin labels handed to `spawn_reader_thread`; they end up as the value stored
// under `STREAM_KEY` on each event.
const STDOUT: &str = "stdout";
const STDERR: &str = "stderr";
// Metadata keys written by `handle_event` and declared in `outputs`.
const STREAM_KEY: &str = "stream";
const PID_KEY: &str = "pid";
const COMMAND_KEY: &str = "command";
206
// NOTE(review): presumably derives the generated example config from
// `ExecConfig::default()` — the macro is defined outside this file; confirm.
impl_generate_config_from_default!(ExecConfig);
208
209impl ExecConfig {
210    fn validate(&self) -> Result<(), ExecConfigError> {
211        if self.command.is_empty() {
212            Err(ExecConfigError::CommandEmpty)
213        } else if self.maximum_buffer_size_bytes == 0 {
214            Err(ExecConfigError::ZeroBuffer)
215        } else {
216            Ok(())
217        }
218    }
219
220    fn command_line(&self) -> String {
221        self.command.join(" ")
222    }
223
224    const fn exec_interval_secs_or_default(&self) -> u64 {
225        match &self.scheduled {
226            None => default_exec_interval_secs(),
227            Some(config) => config.exec_interval_secs,
228        }
229    }
230
231    const fn respawn_on_exit_or_default(&self) -> bool {
232        match &self.streaming {
233            None => default_respawn_on_exit(),
234            Some(config) => config.respawn_on_exit,
235        }
236    }
237
238    const fn respawn_interval_secs_or_default(&self) -> u64 {
239        match &self.streaming {
240            None => default_respawn_interval_secs(),
241            Some(config) => config.respawn_interval_secs,
242        }
243    }
244}
245
246#[async_trait::async_trait]
247#[typetag::serde(name = "exec")]
248impl SourceConfig for ExecConfig {
249    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
250        self.validate()?;
251        let hostname = get_hostname();
252        let log_namespace = cx.log_namespace(self.log_namespace);
253
254        let framing = self
255            .framing
256            .clone()
257            .unwrap_or_else(|| self.decoding.default_stream_framing());
258        let decoder = DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;
259
260        match &self.mode {
261            Mode::Scheduled => {
262                let exec_interval_secs = self.exec_interval_secs_or_default();
263
264                Ok(Box::pin(run_scheduled(
265                    self.clone(),
266                    hostname,
267                    exec_interval_secs,
268                    decoder,
269                    cx.shutdown,
270                    cx.out,
271                    log_namespace,
272                )))
273            }
274            Mode::Streaming => {
275                let respawn_on_exit = self.respawn_on_exit_or_default();
276                let respawn_interval_secs = self.respawn_interval_secs_or_default();
277
278                Ok(Box::pin(run_streaming(
279                    self.clone(),
280                    hostname,
281                    respawn_on_exit,
282                    respawn_interval_secs,
283                    decoder,
284                    cx.shutdown,
285                    cx.out,
286                    log_namespace,
287                )))
288            }
289        }
290    }
291
292    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
293        let log_namespace = global_log_namespace.merge(Some(self.log_namespace.unwrap_or(false)));
294
295        let schema_definition = self
296            .decoding
297            .schema_definition(log_namespace)
298            .with_standard_vector_source_metadata()
299            .with_source_metadata(
300                Self::NAME,
301                Some(LegacyKey::InsertIfEmpty(
302                    log_schema()
303                        .host_key()
304                        .map_or(OwnedValuePath::root(), |key| key.clone()),
305                )),
306                &owned_value_path!("host"),
307                Kind::bytes().or_undefined(),
308                Some("host"),
309            )
310            .with_source_metadata(
311                Self::NAME,
312                Some(LegacyKey::InsertIfEmpty(owned_value_path!(STREAM_KEY))),
313                &owned_value_path!(STREAM_KEY),
314                Kind::bytes().or_undefined(),
315                None,
316            )
317            .with_source_metadata(
318                Self::NAME,
319                Some(LegacyKey::InsertIfEmpty(owned_value_path!(PID_KEY))),
320                &owned_value_path!(PID_KEY),
321                Kind::integer().or_undefined(),
322                None,
323            )
324            .with_source_metadata(
325                Self::NAME,
326                Some(LegacyKey::InsertIfEmpty(owned_value_path!(COMMAND_KEY))),
327                &owned_value_path!(COMMAND_KEY),
328                Kind::bytes(),
329                None,
330            );
331
332        vec![SourceOutput::new_maybe_logs(
333            self.decoding.output_type(),
334            schema_definition,
335        )]
336    }
337
338    fn can_acknowledge(&self) -> bool {
339        false
340    }
341}
342
343async fn run_scheduled(
344    config: ExecConfig,
345    hostname: Option<String>,
346    exec_interval_secs: u64,
347    decoder: Decoder,
348    shutdown: ShutdownSignal,
349    out: SourceSender,
350    log_namespace: LogNamespace,
351) -> Result<(), ()> {
352    debug!("Starting scheduled exec runs.");
353    let schedule = Duration::from_secs(exec_interval_secs);
354
355    let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown.clone());
356
357    while interval.next().await.is_some() {
358        // Wait for our task to finish, wrapping it in a timeout
359        let timeout = tokio::time::timeout(
360            schedule,
361            run_command(
362                config.clone(),
363                hostname.clone(),
364                decoder.clone(),
365                shutdown.clone(),
366                out.clone(),
367                log_namespace,
368            ),
369        )
370        .await;
371
372        match timeout {
373            Ok(output) => {
374                if let Err(command_error) = output {
375                    emit!(ExecFailedError {
376                        command: config.command_line().as_str(),
377                        error: command_error,
378                    });
379                }
380            }
381            Err(error) => {
382                emit!(ExecTimeoutError {
383                    command: config.command_line().as_str(),
384                    elapsed_seconds: schedule.as_secs(),
385                    error,
386                });
387            }
388        }
389    }
390
391    debug!("Finished scheduled exec runs.");
392    Ok(())
393}
394
395#[allow(clippy::too_many_arguments)]
396async fn run_streaming(
397    config: ExecConfig,
398    hostname: Option<String>,
399    respawn_on_exit: bool,
400    respawn_interval_secs: u64,
401    decoder: Decoder,
402    mut shutdown: ShutdownSignal,
403    out: SourceSender,
404    log_namespace: LogNamespace,
405) -> Result<(), ()> {
406    if respawn_on_exit {
407        let duration = Duration::from_secs(respawn_interval_secs);
408
409        // Continue to loop while not shutdown
410        loop {
411            let output = run_command(
412                config.clone(),
413                hostname.clone(),
414                decoder.clone(),
415                shutdown.clone(),
416                out.clone(),
417                log_namespace,
418            )
419            .await;
420
421            // handle command finished
422            if let Err(command_error) = output {
423                emit!(ExecFailedError {
424                    command: config.command_line().as_str(),
425                    error: command_error,
426                });
427            }
428
429            tokio::select! {
430                _ = &mut shutdown => break, // will break early if a shutdown is started
431                _ = sleep(duration) => debug!("Restarting streaming process."),
432            }
433        }
434    } else {
435        let output = run_command(
436            config.clone(),
437            hostname,
438            decoder,
439            shutdown,
440            out,
441            log_namespace,
442        )
443        .await;
444
445        if let Err(command_error) = output {
446            emit!(ExecFailedError {
447                command: config.command_line().as_str(),
448                error: command_error,
449            });
450        }
451    }
452
453    Ok(())
454}
455
456async fn run_command(
457    config: ExecConfig,
458    hostname: Option<String>,
459    decoder: Decoder,
460    mut shutdown: ShutdownSignal,
461    mut out: SourceSender,
462    log_namespace: LogNamespace,
463) -> Result<Option<ExitStatus>, Error> {
464    debug!("Starting command run.");
465    let mut command = build_command(&config);
466
467    // Mark the start time just before spawning the process as
468    // this seems to be the best approximation of exec duration
469    let start = Instant::now();
470
471    let mut child = command.spawn()?;
472
473    // Set up communication channels
474    let (sender, mut receiver) = channel(1024);
475
476    // Optionally include stderr
477    if config.include_stderr {
478        let stderr = child
479            .stderr
480            .take()
481            .ok_or_else(|| Error::other("Unable to take stderr of spawned process"))?;
482
483        // Create stderr async reader
484        let stderr_reader = BufReader::new(stderr);
485
486        spawn_reader_thread(stderr_reader, decoder.clone(), STDERR, sender.clone());
487    }
488
489    let stdout = child
490        .stdout
491        .take()
492        .ok_or_else(|| Error::other("Unable to take stdout of spawned process"))?;
493
494    // Create stdout async reader
495    let stdout_reader = BufReader::new(stdout);
496
497    let pid = child.id();
498
499    spawn_reader_thread(stdout_reader, decoder.clone(), STDOUT, sender);
500
501    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
502
503    'outer: loop {
504        tokio::select! {
505            _ = &mut shutdown => {
506                if !shutdown_child(&mut child, &command).await {
507                        break 'outer; // couldn't signal, exit early
508                }
509            }
510            v = receiver.recv() => {
511                match v {
512                    None => break 'outer,
513                    Some(((mut events, byte_size), stream)) => {
514                        bytes_received.emit(ByteSize(byte_size));
515
516                        let count = events.len();
517                        emit!(ExecEventsReceived {
518                            count,
519                            command: config.command_line().as_str(),
520                            byte_size: events.estimated_json_encoded_size_of(),
521                        });
522
523                        for event in &mut events {
524                            handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
525                        }
526                        if (out.send_batch(events).await).is_err() {
527                            emit!(StreamClosedError { count });
528                            break;
529                        }
530                    },
531                }
532            }
533        }
534    }
535
536    let elapsed = start.elapsed();
537
538    let result = match child.try_wait() {
539        Ok(Some(exit_status)) => {
540            handle_exit_status(&config, exit_status.code(), elapsed);
541            Ok(Some(exit_status))
542        }
543        Ok(None) => {
544            handle_exit_status(&config, None, elapsed);
545            Ok(None)
546        }
547        Err(error) => {
548            error!(message = "Unable to obtain exit status.", %error);
549
550            handle_exit_status(&config, None, elapsed);
551            Ok(None)
552        }
553    };
554
555    debug!("Finished command run.");
556
557    result
558}
559
/// Emits an `ExecCommandExecuted` internal event for a finished run.
///
/// `exit_status` is the process exit code when one was available, and
/// `exec_duration` is the time measured by the caller for the run.
fn handle_exit_status(config: &ExecConfig, exit_status: Option<i32>, exec_duration: Duration) {
    emit!(ExecCommandExecuted {
        command: config.command_line().as_str(),
        exit_status,
        exec_duration,
    });
}
567
568#[cfg(unix)]
569async fn shutdown_child(
570    child: &mut tokio::process::Child,
571    command: &tokio::process::Command,
572) -> bool {
573    match child.id().map(i32::try_from) {
574        Some(Ok(pid)) => {
575            // shutting down, send a SIGTERM to the child
576            if let Err(error) = nix::sys::signal::kill(
577                nix::unistd::Pid::from_raw(pid),
578                nix::sys::signal::Signal::SIGTERM,
579            ) {
580                emit!(ExecFailedToSignalChildError {
581                    command,
582                    error: ExecFailedToSignalChild::SignalError(error)
583                });
584                false
585            } else {
586                true
587            }
588        }
589        Some(Err(err)) => {
590            emit!(ExecFailedToSignalChildError {
591                command,
592                error: ExecFailedToSignalChild::FailedToMarshalPid(err)
593            });
594            false
595        }
596        None => {
597            emit!(ExecFailedToSignalChildError {
598                command,
599                error: ExecFailedToSignalChild::NoPid
600            });
601            false
602        }
603    }
604}
605
/// Stops the child process by killing it (Windows).
///
/// Returns `true` when the kill succeeded. Returns `false`, after emitting
/// `ExecFailedToSignalChildError`, when it failed.
#[cfg(windows)]
async fn shutdown_child(
    child: &mut tokio::process::Child,
    command: &tokio::process::Command,
) -> bool {
    // TODO Graceful shutdown of Windows processes
    match child.kill().await {
        Ok(()) => true,
        Err(err) => {
            // `command` is already a reference; pass it as-is, like the Unix variant does.
            emit!(ExecFailedToSignalChildError {
                command,
                error: ExecFailedToSignalChild::IoError(err)
            });
            false
        }
    }
}
623
624fn build_command(config: &ExecConfig) -> Command {
625    let command = &config.command[0];
626
627    let mut command = Command::new(command);
628
629    if config.command.len() > 1 {
630        command.args(&config.command[1..]);
631    };
632
633    command.kill_on_drop(true);
634
635    // Clear environment variables if needed
636    if config.clear_environment {
637        command.env_clear();
638    }
639
640    // Configure environment variables if needed
641    if let Some(envs) = &config.environment {
642        command.envs(envs);
643    }
644
645    // Explicitly set the current dir if needed
646    if let Some(current_dir) = &config.working_directory {
647        command.current_dir(current_dir);
648    }
649
650    // Pipe our stdout to the process
651    command.stdout(std::process::Stdio::piped());
652
653    // Pipe stderr to the process if needed
654    if config.include_stderr {
655        command.stderr(std::process::Stdio::piped());
656    } else {
657        command.stderr(std::process::Stdio::null());
658    }
659
660    // Stdin is not needed
661    command.stdin(std::process::Stdio::null());
662
663    command
664}
665
666fn handle_event(
667    config: &ExecConfig,
668    hostname: &Option<String>,
669    data_stream: &Option<String>,
670    pid: Option<u32>,
671    event: &mut Event,
672    log_namespace: LogNamespace,
673) {
674    if let Event::Log(log) = event {
675        log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now());
676
677        // Add data stream of stdin or stderr (if needed)
678        if let Some(data_stream) = data_stream {
679            log_namespace.insert_source_metadata(
680                ExecConfig::NAME,
681                log,
682                Some(LegacyKey::InsertIfEmpty(path!(STREAM_KEY))),
683                path!(STREAM_KEY),
684                data_stream.clone(),
685            );
686        }
687
688        // Add pid (if needed)
689        if let Some(pid) = pid {
690            log_namespace.insert_source_metadata(
691                ExecConfig::NAME,
692                log,
693                Some(LegacyKey::InsertIfEmpty(path!(PID_KEY))),
694                path!(PID_KEY),
695                pid as i64,
696            );
697        }
698
699        // Add hostname (if needed)
700        if let Some(hostname) = hostname {
701            log_namespace.insert_source_metadata(
702                ExecConfig::NAME,
703                log,
704                log_schema().host_key().map(LegacyKey::InsertIfEmpty),
705                path!("host"),
706                hostname.clone(),
707            );
708        }
709
710        // Add command
711        log_namespace.insert_source_metadata(
712            ExecConfig::NAME,
713            log,
714            Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
715            path!(COMMAND_KEY),
716            config.command.clone(),
717        );
718    }
719}
720
721fn spawn_reader_thread<R: 'static + AsyncRead + Unpin + std::marker::Send>(
722    reader: BufReader<R>,
723    decoder: Decoder,
724    origin: &'static str,
725    sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>,
726) {
727    // Start the green background thread for collecting
728    drop(tokio::spawn(async move {
729        debug!("Start capturing {} command output.", origin);
730
731        let mut stream = FramedRead::new(reader, decoder);
732        while let Some(result) = stream.next().await {
733            match result {
734                Ok(next) => {
735                    if sender.send((next, origin)).await.is_err() {
736                        // If the receive half of the channel is closed, either due to close being
737                        // called or the Receiver handle dropping, the function returns an error.
738                        emit!(ExecChannelClosedError);
739                        break;
740                    }
741                }
742                Err(error) => {
743                    // Error is logged by `crate::codecs::Decoder`, no further
744                    // handling is needed here.
745                    if !error.can_continue() {
746                        break;
747                    }
748                }
749            }
750        }
751
752        debug!("Finished capturing {} command output.", origin);
753    }));
754}