vector/sources/exec/
mod.rs

1use std::{collections::HashMap, io::Error, path::PathBuf, process::ExitStatus};
2
3use chrono::Utc;
4use futures::StreamExt;
5use smallvec::SmallVec;
6use snafu::Snafu;
7use tokio::{
8    io::{AsyncRead, BufReader},
9    process::Command,
10    sync::mpsc::{Sender, channel},
11    time::{self, Duration, Instant, sleep},
12};
13use tokio_stream::wrappers::IntervalStream;
14use vector_lib::{
15    EstimatedJsonEncodedSizeOf,
16    codecs::{
17        Decoder, DecoderFramedRead, DecodingConfig, StreamDecodingError,
18        decoding::{DeserializerConfig, FramingConfig},
19    },
20    config::{LegacyKey, LogNamespace, log_schema},
21    configurable::configurable_component,
22    internal_event::{ByteSize, BytesReceived, InternalEventHandle as _, Protocol},
23    lookup::{owned_value_path, path},
24};
25use vrl::{path::OwnedValuePath, value::Kind};
26
27use crate::{
28    SourceSender,
29    config::{SourceConfig, SourceContext, SourceOutput},
30    event::Event,
31    internal_events::{
32        ExecChannelClosedError, ExecCommandExecuted, ExecEventsReceived, ExecFailedError,
33        ExecFailedToSignalChild, ExecFailedToSignalChildError, ExecTimeoutError, StreamClosedError,
34    },
35    serde::default_decoding,
36    shutdown::ShutdownSignal,
37};
38
39#[cfg(test)]
40mod tests;
41
42/// Configuration for the `exec` source.
43#[configurable_component(source("exec", "Collect output from a process running on the host."))]
44#[derive(Clone, Debug)]
45#[serde(deny_unknown_fields)]
46pub struct ExecConfig {
47    #[configurable(derived)]
48    pub mode: Mode,
49
50    #[configurable(derived)]
51    pub scheduled: Option<ScheduledConfig>,
52
53    #[configurable(derived)]
54    pub streaming: Option<StreamingConfig>,
55
56    /// The command to run, plus any arguments required.
57    #[configurable(metadata(docs::examples = "echo", docs::examples = "Hello World!"))]
58    pub command: Vec<String>,
59
60    /// Custom environment variables to set or update when running the command.
61    /// If a variable name already exists in the environment, its value is replaced.
62    #[serde(default)]
63    #[configurable(metadata(docs::additional_props_description = "An environment variable."))]
64    #[configurable(metadata(docs::examples = "environment_examples()"))]
65    pub environment: Option<HashMap<String, String>>,
66
67    /// Whether or not to clear the environment before setting custom environment variables.
68    #[serde(default = "default_clear_environment")]
69    pub clear_environment: bool,
70
71    /// The directory in which to run the command.
72    pub working_directory: Option<PathBuf>,
73
74    /// Whether or not the output from stderr should be included when generating events.
75    #[serde(default = "default_include_stderr")]
76    pub include_stderr: bool,
77
78    /// The maximum buffer size allowed before a log event is generated.
79    #[serde(default = "default_maximum_buffer_size")]
80    pub maximum_buffer_size_bytes: usize,
81
82    #[configurable(derived)]
83    framing: Option<FramingConfig>,
84
85    #[configurable(derived)]
86    #[serde(default = "default_decoding")]
87    decoding: DeserializerConfig,
88
89    /// The namespace to use for logs. This overrides the global setting.
90    #[configurable(metadata(docs::hidden))]
91    #[serde(default)]
92    log_namespace: Option<bool>,
93}
94
95/// Mode of operation for running the command.
96#[configurable_component]
97#[derive(Clone, Copy, Debug)]
98#[serde(rename_all = "snake_case", deny_unknown_fields)]
99pub enum Mode {
100    /// The command is run on a schedule.
101    Scheduled,
102
103    /// The command is run until it exits, potentially being restarted.
104    Streaming,
105}
106
107/// Configuration options for scheduled commands.
108#[configurable_component]
109#[derive(Clone, Debug)]
110#[serde(deny_unknown_fields)]
111pub struct ScheduledConfig {
112    /// The interval, in seconds, between scheduled command runs.
113    ///
114    /// If the command takes longer than `exec_interval_secs` to run, it is killed.
115    #[serde(default = "default_exec_interval_secs")]
116    exec_interval_secs: u64,
117}
118
119/// Configuration options for streaming commands.
120#[configurable_component]
121#[derive(Clone, Debug)]
122#[serde(deny_unknown_fields)]
123pub struct StreamingConfig {
124    /// Whether or not the command should be rerun if the command exits.
125    #[serde(default = "default_respawn_on_exit")]
126    respawn_on_exit: bool,
127
128    /// The amount of time, in seconds, before rerunning a streaming command that exited.
129    #[serde(default = "default_respawn_interval_secs")]
130    #[configurable(metadata(docs::human_name = "Respawn Interval"))]
131    respawn_interval_secs: u64,
132}
133
134#[derive(Debug, PartialEq, Eq, Snafu)]
135pub enum ExecConfigError {
136    #[snafu(display("A non-empty list for command must be provided"))]
137    CommandEmpty,
138    #[snafu(display("The maximum buffer size must be greater than zero"))]
139    ZeroBuffer,
140}
141
142impl Default for ExecConfig {
143    fn default() -> Self {
144        ExecConfig {
145            mode: Mode::Scheduled,
146            scheduled: Some(ScheduledConfig {
147                exec_interval_secs: default_exec_interval_secs(),
148            }),
149            streaming: None,
150            command: vec!["echo".to_owned(), "Hello World!".to_owned()],
151            environment: None,
152            clear_environment: default_clear_environment(),
153            working_directory: None,
154            include_stderr: default_include_stderr(),
155            maximum_buffer_size_bytes: default_maximum_buffer_size(),
156            framing: None,
157            decoding: default_decoding(),
158            log_namespace: None,
159        }
160    }
161}
162
/// Default for `maximum_buffer_size_bytes`: one megabyte (10^6 bytes).
const fn default_maximum_buffer_size() -> usize {
    1_000_000
}
167
/// Default for `scheduled.exec_interval_secs`: one minute, expressed in seconds.
const fn default_exec_interval_secs() -> u64 {
    const ONE_MINUTE_SECS: u64 = 60;
    ONE_MINUTE_SECS
}
171
/// Default for `streaming.respawn_interval_secs`, in seconds.
const fn default_respawn_interval_secs() -> u64 {
    const RESPAWN_DELAY_SECS: u64 = 5;
    RESPAWN_DELAY_SECS
}
175
/// Default for `streaming.respawn_on_exit`: streaming commands are restarted.
const fn default_respawn_on_exit() -> bool {
    const RESPAWN: bool = true;
    RESPAWN
}
179
/// Default for `clear_environment`: the inherited environment is kept.
const fn default_clear_environment() -> bool {
    const CLEAR: bool = false;
    CLEAR
}
183
/// Default for `include_stderr`: stderr output is turned into events as well.
const fn default_include_stderr() -> bool {
    const INCLUDE: bool = true;
    INCLUDE
}
187
/// Example value for the `environment` option, referenced by name from the
/// `docs::examples` metadata on `ExecConfig::environment`.
fn environment_examples() -> HashMap<String, String> {
    let examples = [
        ("LANG", "es_ES.UTF-8"),
        ("TZ", "Etc/UTC"),
        ("PATH", "/bin:/usr/bin:/usr/local/bin"),
    ];

    examples
        .iter()
        .map(|&(name, value)| (name.to_owned(), value.to_owned()))
        .collect()
}
195
196fn get_hostname() -> Option<String> {
197    crate::get_hostname().ok()
198}
199
200const STDOUT: &str = "stdout";
201const STDERR: &str = "stderr";
202const STREAM_KEY: &str = "stream";
203const PID_KEY: &str = "pid";
204const COMMAND_KEY: &str = "command";
205
206impl_generate_config_from_default!(ExecConfig);
207
208impl ExecConfig {
209    const fn validate(&self) -> Result<(), ExecConfigError> {
210        if self.command.is_empty() {
211            Err(ExecConfigError::CommandEmpty)
212        } else if self.maximum_buffer_size_bytes == 0 {
213            Err(ExecConfigError::ZeroBuffer)
214        } else {
215            Ok(())
216        }
217    }
218
219    fn command_line(&self) -> String {
220        self.command.join(" ")
221    }
222
223    const fn exec_interval_secs_or_default(&self) -> u64 {
224        match &self.scheduled {
225            None => default_exec_interval_secs(),
226            Some(config) => config.exec_interval_secs,
227        }
228    }
229
230    const fn respawn_on_exit_or_default(&self) -> bool {
231        match &self.streaming {
232            None => default_respawn_on_exit(),
233            Some(config) => config.respawn_on_exit,
234        }
235    }
236
237    const fn respawn_interval_secs_or_default(&self) -> u64 {
238        match &self.streaming {
239            None => default_respawn_interval_secs(),
240            Some(config) => config.respawn_interval_secs,
241        }
242    }
243}
244
245#[async_trait::async_trait]
246#[typetag::serde(name = "exec")]
247impl SourceConfig for ExecConfig {
248    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
249        self.validate()?;
250        let hostname = get_hostname();
251        let log_namespace = cx.log_namespace(self.log_namespace);
252
253        let framing = self
254            .framing
255            .clone()
256            .unwrap_or_else(|| self.decoding.default_stream_framing());
257        let decoder = DecodingConfig::new(framing, self.decoding.clone(), log_namespace).build()?;
258
259        match &self.mode {
260            Mode::Scheduled => {
261                let exec_interval_secs = self.exec_interval_secs_or_default();
262
263                Ok(Box::pin(run_scheduled(
264                    self.clone(),
265                    hostname,
266                    exec_interval_secs,
267                    decoder,
268                    cx.shutdown,
269                    cx.out,
270                    log_namespace,
271                )))
272            }
273            Mode::Streaming => {
274                let respawn_on_exit = self.respawn_on_exit_or_default();
275                let respawn_interval_secs = self.respawn_interval_secs_or_default();
276
277                Ok(Box::pin(run_streaming(
278                    self.clone(),
279                    hostname,
280                    respawn_on_exit,
281                    respawn_interval_secs,
282                    decoder,
283                    cx.shutdown,
284                    cx.out,
285                    log_namespace,
286                )))
287            }
288        }
289    }
290
291    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
292        let log_namespace = global_log_namespace.merge(Some(self.log_namespace.unwrap_or(false)));
293
294        let schema_definition = self
295            .decoding
296            .schema_definition(log_namespace)
297            .with_standard_vector_source_metadata()
298            .with_source_metadata(
299                Self::NAME,
300                Some(LegacyKey::InsertIfEmpty(
301                    log_schema()
302                        .host_key()
303                        .map_or(OwnedValuePath::root(), |key| key.clone()),
304                )),
305                &owned_value_path!("host"),
306                Kind::bytes().or_undefined(),
307                Some("host"),
308            )
309            .with_source_metadata(
310                Self::NAME,
311                Some(LegacyKey::InsertIfEmpty(owned_value_path!(STREAM_KEY))),
312                &owned_value_path!(STREAM_KEY),
313                Kind::bytes().or_undefined(),
314                None,
315            )
316            .with_source_metadata(
317                Self::NAME,
318                Some(LegacyKey::InsertIfEmpty(owned_value_path!(PID_KEY))),
319                &owned_value_path!(PID_KEY),
320                Kind::integer().or_undefined(),
321                None,
322            )
323            .with_source_metadata(
324                Self::NAME,
325                Some(LegacyKey::InsertIfEmpty(owned_value_path!(COMMAND_KEY))),
326                &owned_value_path!(COMMAND_KEY),
327                Kind::bytes(),
328                None,
329            );
330
331        vec![SourceOutput::new_maybe_logs(
332            self.decoding.output_type(),
333            schema_definition,
334        )]
335    }
336
337    fn can_acknowledge(&self) -> bool {
338        false
339    }
340}
341
342async fn run_scheduled(
343    config: ExecConfig,
344    hostname: Option<String>,
345    exec_interval_secs: u64,
346    decoder: Decoder,
347    shutdown: ShutdownSignal,
348    out: SourceSender,
349    log_namespace: LogNamespace,
350) -> Result<(), ()> {
351    debug!("Starting scheduled exec runs.");
352    let schedule = Duration::from_secs(exec_interval_secs);
353
354    let mut interval = IntervalStream::new(time::interval(schedule)).take_until(shutdown.clone());
355
356    while interval.next().await.is_some() {
357        // Wait for our task to finish, wrapping it in a timeout
358        let timeout = tokio::time::timeout(
359            schedule,
360            run_command(
361                config.clone(),
362                hostname.clone(),
363                decoder.clone(),
364                shutdown.clone(),
365                out.clone(),
366                log_namespace,
367            ),
368        )
369        .await;
370
371        match timeout {
372            Ok(output) => {
373                if let Err(command_error) = output {
374                    emit!(ExecFailedError {
375                        command: config.command_line().as_str(),
376                        error: command_error,
377                    });
378                }
379            }
380            Err(error) => {
381                emit!(ExecTimeoutError {
382                    command: config.command_line().as_str(),
383                    elapsed_seconds: schedule.as_secs(),
384                    error,
385                });
386            }
387        }
388    }
389
390    debug!("Finished scheduled exec runs.");
391    Ok(())
392}
393
394#[allow(clippy::too_many_arguments)]
395async fn run_streaming(
396    config: ExecConfig,
397    hostname: Option<String>,
398    respawn_on_exit: bool,
399    respawn_interval_secs: u64,
400    decoder: Decoder,
401    mut shutdown: ShutdownSignal,
402    out: SourceSender,
403    log_namespace: LogNamespace,
404) -> Result<(), ()> {
405    if respawn_on_exit {
406        let duration = Duration::from_secs(respawn_interval_secs);
407
408        // Continue to loop while not shutdown
409        loop {
410            let output = run_command(
411                config.clone(),
412                hostname.clone(),
413                decoder.clone(),
414                shutdown.clone(),
415                out.clone(),
416                log_namespace,
417            )
418            .await;
419
420            // handle command finished
421            if let Err(command_error) = output {
422                emit!(ExecFailedError {
423                    command: config.command_line().as_str(),
424                    error: command_error,
425                });
426            }
427
428            tokio::select! {
429                _ = &mut shutdown => break, // will break early if a shutdown is started
430                _ = sleep(duration) => debug!("Restarting streaming process."),
431            }
432        }
433    } else {
434        let output = run_command(
435            config.clone(),
436            hostname,
437            decoder,
438            shutdown,
439            out,
440            log_namespace,
441        )
442        .await;
443
444        if let Err(command_error) = output {
445            emit!(ExecFailedError {
446                command: config.command_line().as_str(),
447                error: command_error,
448            });
449        }
450    }
451
452    Ok(())
453}
454
455async fn run_command(
456    config: ExecConfig,
457    hostname: Option<String>,
458    decoder: Decoder,
459    mut shutdown: ShutdownSignal,
460    mut out: SourceSender,
461    log_namespace: LogNamespace,
462) -> Result<Option<ExitStatus>, Error> {
463    debug!("Starting command run.");
464    let mut command = build_command(&config);
465
466    // Mark the start time just before spawning the process as
467    // this seems to be the best approximation of exec duration
468    let start = Instant::now();
469
470    let mut child = command.spawn()?;
471
472    // Set up communication channels
473    let (sender, mut receiver) = channel(1024);
474
475    // Optionally include stderr
476    if config.include_stderr {
477        let stderr = child
478            .stderr
479            .take()
480            .ok_or_else(|| Error::other("Unable to take stderr of spawned process"))?;
481
482        // Create stderr async reader
483        let stderr_reader = BufReader::new(stderr);
484
485        spawn_reader_thread(stderr_reader, decoder.clone(), STDERR, sender.clone());
486    }
487
488    let stdout = child
489        .stdout
490        .take()
491        .ok_or_else(|| Error::other("Unable to take stdout of spawned process"))?;
492
493    // Create stdout async reader
494    let stdout_reader = BufReader::new(stdout);
495
496    let pid = child.id();
497
498    spawn_reader_thread(stdout_reader, decoder.clone(), STDOUT, sender);
499
500    let bytes_received = register!(BytesReceived::from(Protocol::NONE));
501
502    'outer: loop {
503        tokio::select! {
504            _ = &mut shutdown => {
505                if !shutdown_child(&mut child, &command).await {
506                        break 'outer; // couldn't signal, exit early
507                }
508            }
509            v = receiver.recv() => {
510                match v {
511                    None => break 'outer,
512                    Some(((mut events, byte_size), stream)) => {
513                        bytes_received.emit(ByteSize(byte_size));
514
515                        let count = events.len();
516                        emit!(ExecEventsReceived {
517                            count,
518                            command: config.command_line().as_str(),
519                            byte_size: events.estimated_json_encoded_size_of(),
520                        });
521
522                        for event in &mut events {
523                            handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
524                        }
525                        if (out.send_batch(events).await).is_err() {
526                            emit!(StreamClosedError { count });
527                            break;
528                        }
529                    },
530                }
531            }
532        }
533    }
534
535    let elapsed = start.elapsed();
536
537    let result = match child.try_wait() {
538        Ok(Some(exit_status)) => {
539            handle_exit_status(&config, exit_status.code(), elapsed);
540            Ok(Some(exit_status))
541        }
542        Ok(None) => {
543            handle_exit_status(&config, None, elapsed);
544            Ok(None)
545        }
546        Err(error) => {
547            error!(message = "Unable to obtain exit status.", %error);
548
549            handle_exit_status(&config, None, elapsed);
550            Ok(None)
551        }
552    };
553
554    debug!("Finished command run.");
555
556    result
557}
558
559fn handle_exit_status(config: &ExecConfig, exit_status: Option<i32>, exec_duration: Duration) {
560    emit!(ExecCommandExecuted {
561        command: config.command_line().as_str(),
562        exit_status,
563        exec_duration,
564    });
565}
566
567#[cfg(unix)]
568async fn shutdown_child(
569    child: &mut tokio::process::Child,
570    command: &tokio::process::Command,
571) -> bool {
572    match child.id().map(i32::try_from) {
573        Some(Ok(pid)) => {
574            // shutting down, send a SIGTERM to the child
575            if let Err(error) = nix::sys::signal::kill(
576                nix::unistd::Pid::from_raw(pid),
577                nix::sys::signal::Signal::SIGTERM,
578            ) {
579                emit!(ExecFailedToSignalChildError {
580                    command,
581                    error: ExecFailedToSignalChild::SignalError(error)
582                });
583                false
584            } else {
585                true
586            }
587        }
588        Some(Err(err)) => {
589            emit!(ExecFailedToSignalChildError {
590                command,
591                error: ExecFailedToSignalChild::FailedToMarshalPid(err)
592            });
593            false
594        }
595        None => {
596            emit!(ExecFailedToSignalChildError {
597                command,
598                error: ExecFailedToSignalChild::NoPid
599            });
600            false
601        }
602    }
603}
604
/// Terminates the child process by killing it.
///
/// Returns `true` when the kill succeeded. Returns `false`, after emitting
/// `ExecFailedToSignalChildError`, when it failed.
#[cfg(windows)]
async fn shutdown_child(
    child: &mut tokio::process::Child,
    command: &tokio::process::Command,
) -> bool {
    // TODO Graceful shutdown of Windows processes
    match child.kill().await {
        Ok(()) => true,
        Err(err) => {
            // `command` is already a reference; pass it through as the unix variant does.
            emit!(ExecFailedToSignalChildError {
                command,
                error: ExecFailedToSignalChild::IoError(err)
            });
            false
        }
    }
}
622
623fn build_command(config: &ExecConfig) -> Command {
624    let command = &config.command[0];
625
626    let mut command = Command::new(command);
627
628    if config.command.len() > 1 {
629        command.args(&config.command[1..]);
630    };
631
632    command.kill_on_drop(true);
633
634    // Clear environment variables if needed
635    if config.clear_environment {
636        command.env_clear();
637    }
638
639    // Configure environment variables if needed
640    if let Some(envs) = &config.environment {
641        command.envs(envs);
642    }
643
644    // Explicitly set the current dir if needed
645    if let Some(current_dir) = &config.working_directory {
646        command.current_dir(current_dir);
647    }
648
649    // Pipe our stdout to the process
650    command.stdout(std::process::Stdio::piped());
651
652    // Pipe stderr to the process if needed
653    if config.include_stderr {
654        command.stderr(std::process::Stdio::piped());
655    } else {
656        command.stderr(std::process::Stdio::null());
657    }
658
659    // Stdin is not needed
660    command.stdin(std::process::Stdio::null());
661
662    command
663}
664
665fn handle_event(
666    config: &ExecConfig,
667    hostname: &Option<String>,
668    data_stream: &Option<String>,
669    pid: Option<u32>,
670    event: &mut Event,
671    log_namespace: LogNamespace,
672) {
673    if let Event::Log(log) = event {
674        log_namespace.insert_standard_vector_source_metadata(log, ExecConfig::NAME, Utc::now());
675
676        // Add data stream of stdin or stderr (if needed)
677        if let Some(data_stream) = data_stream {
678            log_namespace.insert_source_metadata(
679                ExecConfig::NAME,
680                log,
681                Some(LegacyKey::InsertIfEmpty(path!(STREAM_KEY))),
682                path!(STREAM_KEY),
683                data_stream.clone(),
684            );
685        }
686
687        // Add pid (if needed)
688        if let Some(pid) = pid {
689            log_namespace.insert_source_metadata(
690                ExecConfig::NAME,
691                log,
692                Some(LegacyKey::InsertIfEmpty(path!(PID_KEY))),
693                path!(PID_KEY),
694                pid as i64,
695            );
696        }
697
698        // Add hostname (if needed)
699        if let Some(hostname) = hostname {
700            log_namespace.insert_source_metadata(
701                ExecConfig::NAME,
702                log,
703                log_schema().host_key().map(LegacyKey::InsertIfEmpty),
704                path!("host"),
705                hostname.clone(),
706            );
707        }
708
709        // Add command
710        log_namespace.insert_source_metadata(
711            ExecConfig::NAME,
712            log,
713            Some(LegacyKey::InsertIfEmpty(path!(COMMAND_KEY))),
714            path!(COMMAND_KEY),
715            config.command.clone(),
716        );
717    }
718}
719
720fn spawn_reader_thread<R: 'static + AsyncRead + Unpin + std::marker::Send>(
721    reader: BufReader<R>,
722    decoder: Decoder,
723    origin: &'static str,
724    sender: Sender<((SmallVec<[Event; 1]>, usize), &'static str)>,
725) {
726    // Start the green background thread for collecting
727    drop(tokio::spawn(async move {
728        debug!("Start capturing {} command output.", origin);
729
730        let mut stream = DecoderFramedRead::new(reader, decoder);
731        while let Some(result) = stream.next().await {
732            match result {
733                Ok(next) => {
734                    if sender.send((next, origin)).await.is_err() {
735                        // If the receive half of the channel is closed, either due to close being
736                        // called or the Receiver handle dropping, the function returns an error.
737                        emit!(ExecChannelClosedError);
738                        break;
739                    }
740                }
741                Err(error) => {
742                    // Error is logged by `vector_lib::codecs::Decoder`, no further
743                    // handling is needed here.
744                    if !error.can_continue() {
745                        break;
746                    }
747                }
748            }
749        }
750
751        debug!("Finished capturing {} command output.", origin);
752    }));
753}