vector/sources/exec/mod.rs

1use std::{collections::HashMap, io::Error, path::PathBuf, process::ExitStatus};
2
3use chrono::Utc;
4use futures::StreamExt;
5use smallvec::SmallVec;
6use snafu::Snafu;
7use tokio::{
8    io::{AsyncRead, BufReader},
9    process::Command,
10    sync::mpsc::{Sender, channel},
11    time::{self, Duration, Instant, sleep},
12};
13use tokio_stream::wrappers::IntervalStream;
14use tokio_util::codec::FramedRead;
15use vector_lib::{
16    EstimatedJsonEncodedSizeOf,
17    codecs::{
18        StreamDecodingError,
19        decoding::{DeserializerConfig, FramingConfig},
20    },
21    config::{LegacyKey, LogNamespace, log_schema},
22    configurable::configurable_component,
23    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
24    lookup::{owned_value_path, path},
25};
26use vrl::{path::OwnedValuePath, value::Kind};
27
28use crate::{
29    SourceSender,
30    codecs::{Decoder, DecodingConfig},
31    config::{SourceConfig, SourceContext, SourceOutput},
32    event::Event,
33    internal_events::{
34        ExecChannelClosedError, ExecCommandExecuted, ExecEventsReceived, ExecFailedError,
35        ExecFailedToSignalChild, ExecFailedToSignalChildError, ExecTimeoutError, StreamClosedError,
36    },
37    serde::default_decoding,
38    shutdown::ShutdownSignal,
39};
40
// Unit tests for this source live in the `tests` submodule and are compiled
// only for test builds.
#[cfg(test)]
mod tests;

44/// Configuration for the `exec` source.
45#[configurable_component(source("exec", "Collect output from a process running on the host."))]
46#[derive(Clone, Debug)]
47#[serde(deny_unknown_fields)]
48pub struct ExecConfig {
49    #[configurable(derived)]
50    pub mode: Mode,
51
52    #[configurable(derived)]
53    pub scheduled: Option<ScheduledConfig>,
54
55    #[configurable(derived)]
56    pub streaming: Option<StreamingConfig>,
57
58    /// The command to run, plus any arguments required.
59    #[configurable(metadata(docs::examples = "echo", docs::examples = "Hello World!"))]
60    pub command: Vec<String>,
61
62    /// Custom environment variables to set or update when running the command.
63    /// If a variable name already exists in the environment, its value is replaced.
64    #[serde(default)]
65    #[configurable(metadata(docs::additional_props_description = "An environment variable."))]
66    #[configurable(metadata(docs::examples = "environment_examples()"))]
67    pub environment: Option<HashMap<String, String>>,
68
69    /// Whether or not to clear the environment before setting custom environment variables.
70    #[serde(default = "default_clear_environment")]
71    pub clear_environment: bool,
72
73    /// The directory in which to run the command.
74    pub working_directory: Option<PathBuf>,
75
76    /// Whether or not the output from stderr should be included when generating events.
77    #[serde(default = "default_include_stderr")]
78    pub include_stderr: bool,
79
80    /// The maximum buffer size allowed before a log event is generated.
81    #[serde(default = "default_maximum_buffer_size")]
82    pub maximum_buffer_size_bytes: usize,
83
84    #[configurable(derived)]
85    framing: Option<FramingConfig>,
86
87    #[configurable(derived)]
88    #[serde(default = "default_decoding")]
89    decoding: DeserializerConfig,
90
91    /// The namespace to use for logs. This overrides the global setting.
92    #[configurable(metadata(docs::hidden))]
93    #[serde(default)]
94    log_namespace: Option<bool>,
95}
96
97/// Mode of operation for running the command.
98#[configurable_component]
99#[derive(Clone, Copy, Debug)]
100#[serde(rename_all = "snake_case", deny_unknown_fields)]
101pub enum Mode {
102    /// The command is run on a schedule.
103    Scheduled,
104
105    /// The command is run until it exits, potentially being restarted.
106    Streaming,
107}
108
109/// Configuration options for scheduled commands.
110#[configurable_component]
111#[derive(Clone, Debug)]
112#[serde(deny_unknown_fields)]
113pub struct ScheduledConfig {
114    /// The interval, in seconds, between scheduled command runs.
115    ///
116    /// If the command takes longer than `exec_interval_secs` to run, it is killed.
117    #[serde(default = "default_exec_interval_secs")]
118    exec_interval_secs: u64,
119}
120
121/// Configuration options for streaming commands.
122#[configurable_component]
123#[derive(Clone, Debug)]
124#[serde(deny_unknown_fields)]
125pub struct StreamingConfig {
126    /// Whether or not the command should be rerun if the command exits.
127    #[serde(default = "default_respawn_on_exit")]
128    respawn_on_exit: bool,
129
130    /// The amount of time, in seconds, before rerunning a streaming command that exited.
131    #[serde(default = "default_respawn_interval_secs")]
132    #[configurable(metadata(docs::human_name = "Respawn Interval"))]
133    respawn_interval_secs: u64,
134}
135
136#[derive(Debug, PartialEq, Eq, Snafu)]
137pub enum ExecConfigError {
138    #[snafu(display("A non-empty list for command must be provided"))]
139    CommandEmpty,
140    #[snafu(display("The maximum buffer size must be greater than zero"))]
141    ZeroBuffer,
142}
143
144impl Default for ExecConfig {
145    fn default() -> Self {
146        ExecConfig {
147            mode: Mode::Scheduled,
148            scheduled: Some(ScheduledConfig {
149                exec_interval_secs: default_exec_interval_secs(),
150            }),
151            streaming: None,
152            command: vec!["echo".to_owned(), "Hello World!".to_owned()],
153            environment: None,
154            clear_environment: default_clear_environment(),
155            working_directory: None,
156            include_stderr: default_include_stderr(),
157            maximum_buffer_size_bytes: default_maximum_buffer_size(),
158            framing: None,
159            decoding: default_decoding(),
160            log_namespace: None,
161        }
162    }
163}
164
/// Default for `maximum_buffer_size_bytes`: 1 MB in decimal units (10^6 bytes, not 1 MiB).
const fn default_maximum_buffer_size() -> usize {
    1_000_000
}

/// Default for `scheduled.exec_interval_secs`, in seconds.
const fn default_exec_interval_secs() -> u64 {
    60
}

/// Default for `streaming.respawn_interval_secs`, in seconds.
const fn default_respawn_interval_secs() -> u64 {
    5
}

/// Default for `streaming.respawn_on_exit`: restart commands that exit.
const fn default_respawn_on_exit() -> bool {
    true
}

/// Default for `clear_environment`: keep the inherited environment.
const fn default_clear_environment() -> bool {
    false
}

/// Default for `include_stderr`: capture stderr alongside stdout.
const fn default_include_stderr() -> bool {
    true
}

/// Example `environment` map referenced by the `docs::examples` metadata on
/// `ExecConfig::environment`.
fn environment_examples() -> HashMap<String, String> {
    let pairs = [
        ("LANG", "es_ES.UTF-8"),
        ("TZ", "Etc/UTC"),
        ("PATH", "/bin:/usr/bin:/usr/local/bin"),
    ];

    pairs
        .into_iter()
        .map(|(name, value)| (name.to_owned(), value.to_owned()))
        .collect()
}

198fn get_hostname() -> Option<String> {
199    crate::get_hostname().ok()
200}
201
// Values recorded under `STREAM_KEY` to say which pipe an event came from.
const STDOUT: &str = "stdout";
const STDERR: &str = "stderr";

// Metadata keys that `handle_event` attaches to log events.
const STREAM_KEY: &str = "stream";
const PID_KEY: &str = "pid";
const COMMAND_KEY: &str = "command";

208impl_generate_config_from_default!(ExecConfig);
209
210impl ExecConfig {
211    const fn validate(&self) -> Result<(), ExecConfigError> {
212        if self.command.is_empty() {
213            Err(ExecConfigError::CommandEmpty)
214        } else if self.maximum_buffer_size_bytes == 0 {
215            Err(ExecConfigError::ZeroBuffer)
216        } else {
217            Ok(())
218        }
219    }
220
221    fn command_line(&self) -> String {
222        self.command.join(" ")
223    }
224
225    const fn exec_interval_secs_or_default(&self) -> u64 {
226        match &self.scheduled {
227            None => default_exec_interval_secs(),
228            Some(config) => config.exec_interval_secs,
229        }
230    }
231
232    const fn respawn_on_exit_or_default(&self) -> bool {
233        match &self.streaming {
234            None => default_respawn_on_exit(),
235            Some(config) => config.respawn_on_exit,
236        }
237    }
238
239    const fn respawn_interval_secs_or_default(&self) -> u64 {
240        match &self.streaming {
241            None => default_respawn_interval_secs(),
242            Some(config) => config.respawn_interval_secs,
243        }
244    }
245}
246
247#[async_trait::async_trait]
248#[typetag::serde(name = "exec")]
249impl SourceConfig for ExecConfig {
250    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
251        self.validate()?;
252        let hostname = get_hostname();
253        let log_namespace = cx.log_namespace(self.log_namespace);
254
255        let framing = self
256            .framing
257            .clone()
258            .unwrap_or_else(|| self.decoding.default_stream_framing());
259        let decoder = DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;
260
261        match &self.mode {
262            Mode::Scheduled => {
263                let exec_interval_secs = self.exec_interval_secs_or_default();
264
265                Ok(Box::pin(run_scheduled(
266                    self.clone(),
267                    hostname,
268                    exec_interval_secs,
269                    decoder,
270                    cx.shutdown,
271                    cx.out,
272                    log_namespace,
273                )))
274            }
275            Mode::Streaming => {
276                let respawn_on_exit = self.respawn_on_exit_or_default();
277                let respawn_interval_secs = self.respawn_interval_secs_or_default();
278
279                Ok(Box::pin(run_streaming(
280                    self.clone(),
281                    hostname,
282                    respawn_on_exit,
283                    respawn_interval_secs,
284                    decoder,
285                    cx.shutdown,
286                    cx.out,
287                    log_namespace,
288                )))
289            }
290        }
291    }
292
293    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
294        let log_namespace = global_log_namespace.merge(Some(self.log_namespace.unwrap_or(false)));
295
296        let schema_definition = self
297            .decoding
298            .schema_definition(log_namespace)
299            .with_standard_vector_source_metadata()
300            .with_source_metadata(
301                Self::NAME,
302                Some(LegacyKey::InsertIfEmpty(
303                    log_schema()
304                        .host_key()
305                        .map_or(OwnedValuePath::root(), |key| key.clone()),
306                )),
307                &owned_value_path!("host"),
308                Kind::bytes().or_undefined(),
309                Some("host"),
310            )
311            .with_source_metadata(
312                Self::NAME,
313                Some(LegacyKey::InsertIfEmpty(owned_value_path!(STREAM_KEY))),
314                &owned_value_path!(STREAM_KEY),
315                Kind::bytes().or_undefined(),
316                None,
317            )
318            .with_source_metadata(
319                Self::NAME,
320                Some(LegacyKey::InsertIfEmpty(owned_value_path!(PID_KEY))),
321                &owned_value_path!(PID_KEY),
322                Kind::integer().or_undefined(),
323                None,
324            )
325            .with_source_metadata(
326                Self::NAME,
327                Some(LegacyKey::InsertIfEmpty(owned_value_path!(COMMAND_KEY))),
328                &owned_value_path!(COMMAND_KEY),
329                Kind::bytes(),
330                None,
331            );
332
333        vec![SourceOutput::new_maybe_logs(
334            self.decoding.output_type(),
335            schema_definition,
336        )]
337    }
338
339    fn can_acknowledge(&self) -> bool {
340        false
341    }
342}
343
344async fn run_scheduled(
345    config: ExecConfig,
346    hostname: Option<String>,
347    exec_interval_secs: u64,
348    decoder: Decoder,
349    shutdown: ShutdownSignal,
350    out: SourceSender,
351    log_namespace: LogNamespace,
352) -> Result<(), ()> {
353    debug!("Starting scheduled exec runs.");
354    let schedule = Duration::from_secs(exec_interval_secs);
355
356    let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown.clone());
357
358    while interval.next().await.is_some() {
359        // Wait for our task to finish, wrapping it in a timeout
360        let timeout = tokio::time::timeout(
361            schedule,
362            run_command(
363                config.clone(),
364                hostname.clone(),
365                decoder.clone(),
366                shutdown.clone(),
367                out.clone(),
368                log_namespace,
369            ),
370        )
371        .await;
372
373        match timeout {
374            Ok(output) => {
375                if let Err(command_error) = output {
376                    emit!(ExecFailedError {
377                        command: config.command_line().as_str(),
378                        error: command_error,
379                    });
380                }
381            }
382            Err(error) => {
383                emit!(ExecTimeoutError {
384                    command: config.command_line().as_str(),
385                    elapsed_seconds: schedule.as_secs(),
386                    error,
387                });
388            }
389        }
390    }
391
392    debug!("Finished scheduled exec runs.");
393    Ok(())
394}
395
396#[allow(clippy::too_many_arguments)]
397async fn run_streaming(
398    config: ExecConfig,
399    hostname: Option<String>,
400    respawn_on_exit: bool,
401    respawn_interval_secs: u64,
402    decoder: Decoder,
403    mut shutdown: ShutdownSignal,
404    out: SourceSender,
405    log_namespace: LogNamespace,
406) -> Result<(), ()> {
407    if respawn_on_exit {
408        let duration = Duration::from_secs(respawn_interval_secs);
409
410        // Continue to loop while not shutdown
411        loop {
412            let output = run_command(
413                config.clone(),
414                hostname.clone(),
415                decoder.clone(),
416                shutdown.clone(),
417                out.clone(),
418                log_namespace,
419            )
420            .await;
421
422            // handle command finished
423            if let Err(command_error) = output {
424                emit!(ExecFailedError {
425                    command: config.command_line().as_str(),
426                    error: command_error,
427                });
428            }
429
430            tokio::select! {
431                _ = &mut shutdown => break, // will break early if a shutdown is started
432                _ = sleep(duration) => debug!("Restarting streaming process."),
433            }
434        }
435    } else {
436        let output = run_command(
437            config.clone(),
438            hostname,
439            decoder,
440            shutdown,
441            out,
442            log_namespace,
443        )
444        .await;
445
446        if let Err(command_error) = output {
447            emit!(ExecFailedError {
448                command: config.command_line().as_str(),
449                error: command_error,
450            });
451        }
452    }
453
454    Ok(())
455}
456
457async fn run_command(
458    config: ExecConfig,
459    hostname: Option<String>,
460    decoder: Decoder,
461    mut shutdown: ShutdownSignal,
462    mut out: SourceSender,
463    log_namespace: LogNamespace,
464) -> Result<Option<ExitStatus>, Error> {
465    debug!("Starting command run.");
466    let mut command = build_command(&config);
467
468    // Mark the start time just before spawning the process as
469    // this seems to be the best approximation of exec duration
470    let start = Instant::now();
471
472    let mut child = command.spawn()?;
473
474    // Set up communication channels
475    let (sender, mut receiver) = channel(1024);
476
477    // Optionally include stderr
478    if config.include_stderr {
479        let stderr = child
480            .stderr
481            .take()
482            .ok_or_else(|| Error::other("Unable to take stderr of spawned process"))?;
483
484        // Create stderr async reader
485        let stderr_reader = BufReader::new(stderr);
486
487        spawn_reader_thread(stderr_reader, decoder.clone(), STDERR, sender.clone());
488    }
489
490    let stdout = child
491        .stdout
492        .take()
493        .ok_or_else(|| Error::other("Unable to take stdout of spawned process"))?;
494
495    // Create stdout async reader
496    let stdout_reader = BufReader::new(stdout);
497
498    let pid = child.id();
499
500    spawn_reader_thread(stdout_reader, decoder.clone(), STDOUT, sender);
501
502    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
503
504    'outer: loop {
505        tokio::select! {
506            _ = &mut shutdown => {
507                if !shutdown_child(&mut child, &command).await {
508                        break 'outer; // couldn't signal, exit early
509                }
510            }
511            v = receiver.recv() => {
512                match v {
513                    None => break 'outer,
514                    Some(((mut events, byte_size), stream)) => {
515                        bytes_received.emit(ByteSize(byte_size));
516
517                        let count = events.len();
518                        emit!(ExecEventsReceived {
519                            count,
520                            command: config.command_line().as_str(),
521                            byte_size: events.estimated_json_encoded_size_of(),
522                        });
523
524                        for event in &mut events {
525                            handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
526                        }
527                        if (out.send_batch(events).await).is_err() {
528                            emit!(StreamClosedError { count });
529                            break;
530                        }
531                    },
532                }
533            }
534        }
535    }
536
537    let elapsed = start.elapsed();
538
539    let result = match child.try_wait() {
540        Ok(Some(exit_status)) => {
541            handle_exit_status(&config, exit_status.code(), elapsed);
542            Ok(Some(exit_status))
543        }
544        Ok(None) => {
545            handle_exit_status(&config, None, elapsed);
546            Ok(None)
547        }
548        Err(error) => {
549            error!(message = "Unable to obtain exit status.", %error);
550
551            handle_exit_status(&config, None, elapsed);
552            Ok(None)
553        }
554    };
555
556    debug!("Finished command run.");
557
558    result
559}
560
561fn handle_exit_status(config: &ExecConfig, exit_status: Option<i32>, exec_duration: Duration) {
562    emit!(ExecCommandExecuted {
563        command: config.command_line().as_str(),
564        exit_status,
565        exec_duration,
566    });
567}
568
569#[cfg(unix)]
570async fn shutdown_child(
571    child: &mut tokio::process::Child,
572    command: &tokio::process::Command,
573) -> bool {
574    match child.id().map(i32::try_from) {
575        Some(Ok(pid)) => {
576            // shutting down, send a SIGTERM to the child
577            if let Err(error) = nix::sys::signal::kill(
578                nix::unistd::Pid::from_raw(pid),
579                nix::sys::signal::Signal::SIGTERM,
580            ) {
581                emit!(ExecFailedToSignalChildError {
582                    command,
583                    error: ExecFailedToSignalChild::SignalError(error)
584                });
585                false
586            } else {
587                true
588            }
589        }
590        Some(Err(err)) => {
591            emit!(ExecFailedToSignalChildError {
592                command,
593                error: ExecFailedToSignalChild::FailedToMarshalPid(err)
594            });
595            false
596        }
597        None => {
598            emit!(ExecFailedToSignalChildError {
599                command,
600                error: ExecFailedToSignalChild::NoPid
601            });
602            false
603        }
604    }
605}
606
/// Stops the child process on Windows by calling `Child::kill` on it.
///
/// There is no graceful signal here yet (see the TODO below). Returns `true`
/// on success. On failure, emits `ExecFailedToSignalChildError` and returns
/// `false`.
#[cfg(windows)]
async fn shutdown_child(
    child: &mut tokio::process::Child,
    command: &tokio::process::Command,
) -> bool {
    // TODO Graceful shutdown of Windows processes
    match child.kill().await {
        Ok(()) => true,
        Err(err) => {
            emit!(ExecFailedToSignalChildError {
                // `command` is already a reference; pass it through as the unix variant
                // does instead of borrowing it again (`&&Command`).
                command,
                error: ExecFailedToSignalChild::IoError(err)
            });
            false
        }
    }
}

625fn build_command(config: &ExecConfig) -> Command {
626    let command = &config.command[0];
627
628    let mut command = Command::new(command);
629
630    if config.command.len() > 1 {
631        command.args(&config.command[1..]);
632    };
633
634    command.kill_on_drop(true);
635
636    // Clear environment variables if needed
637    if config.clear_environment {
638        command.env_clear();
639    }
640
641    // Configure environment variables if needed
642    if let Some(envs) = &config.environment {
643        command.envs(envs);
644    }
645
646    // Explicitly set the current dir if needed
647    if let Some(current_dir) = &config.working_directory {
648        command.current_dir(current_dir);
649    }
650
651    // Pipe our stdout to the process
652    command.stdout(std::process::Stdio::piped());
653
654    // Pipe stderr to the process if needed
655    if config.include_stderr {
656        command.stderr(std::process::Stdio::piped());
657    } else {
658        command.stderr(std::process::Stdio::null());
659    }
660
661    // Stdin is not needed
662    command.stdin(std::process::Stdio::null());
663
664    command
665}
666
667fn handle_event(
668    config: &ExecConfig,
669    hostname: &Option<String>,
670    data_stream: &Option<String>,
671    pid: Option<u32>,
672    event: &mut Event,
673    log_namespace: LogNamespace,
674) {
675    if let Event::Log(log) = event {
676        log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now());
677
678        // Add data stream of stdin or stderr (if needed)
679        if let Some(data_stream) = data_stream {
680            log_namespace.insert_source_metadata(
681                ExecConfig::NAME,
682                log,
683                Some(LegacyKey::InsertIfEmpty(path!(STREAM_KEY))),
684                path!(STREAM_KEY),
685                data_stream.clone(),
686            );
687        }
688
689        // Add pid (if needed)
690        if let Some(pid) = pid {
691            log_namespace.insert_source_metadata(
692                ExecConfig::NAME,
693                log,
694                Some(LegacyKey::InsertIfEmpty(path!(PID_KEY))),
695                path!(PID_KEY),
696                pid as i64,
697            );
698        }
699
700        // Add hostname (if needed)
701        if let Some(hostname) = hostname {
702            log_namespace.insert_source_metadata(
703                ExecConfig::NAME,
704                log,
705                log_schema().host_key().map(LegacyKey::InsertIfEmpty),
706                path!("host"),
707                hostname.clone(),
708            );
709        }
710
711        // Add command
712        log_namespace.insert_source_metadata(
713            ExecConfig::NAME,
714            log,
715            Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
716            path!(COMMAND_KEY),
717            config.command.clone(),
718        );
719    }
720}
721
722fn spawn_reader_thread<R: 'static + AsyncRead + Unpin + std::marker::Send>(
723    reader: BufReader<R>,
724    decoder: Decoder,
725    origin: &'static str,
726    sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>,
727) {
728    // Start the green background thread for collecting
729    drop(tokio::spawn(async move {
730        debug!("Start capturing {} command output.", origin);
731
732        let mut stream = FramedRead::new(reader, decoder);
733        while let Some(result) = stream.next().await {
734            match result {
735                Ok(next) => {
736                    if sender.send((next, origin)).await.is_err() {
737                        // If the receive half of the channel is closed, either due to close being
738                        // called or the Receiver handle dropping, the function returns an error.
739                        emit!(ExecChannelClosedError);
740                        break;
741                    }
742                }
743                Err(error) => {
744                    // Error is logged by `crate::codecs::Decoder`, no further
745                    // handling is needed here.
746                    if !error.can_continue() {
747                        break;
748                    }
749                }
750            }
751        }
752
753        debug!("Finished capturing {} command output.", origin);
754    }));
755}