vector/secrets/
exec.rs

1use std::collections::{HashMap, HashSet};
2
3use bytes::BytesMut;
4use futures::executor;
5use futures_util::StreamExt;
6use serde::{Deserialize, Serialize};
7use tokio::{io::AsyncWriteExt, process::Command, time};
8use tokio_util::codec;
9use vector_lib::configurable::{component::GenerateConfig, configurable_component};
10
11use crate::{config::SecretBackend, signal};
12
// NOTE(review): the `///` comments on this type are presumably surfaced as user-facing
// descriptions by `configurable_component`, so implementation notes below use plain `//`
// comments — confirm before moving them into doc comments.
/// Configuration for the `exec` secrets backend.
#[configurable_component(secrets("exec"))]
#[derive(Clone, Debug)]
pub struct ExecBackend {
    // `command[0]` is handed to `Command::new` and the remaining elements are passed as
    // its arguments (see `query_backend`).
    /// Command arguments to execute.
    ///
    /// The path to the script or binary must be the first argument.
    pub command: Vec<String>,

    // Turned into a `Duration` with `Duration::from_secs` in `query_backend`. When the
    // field is missing from the deserialized input, serde calls `default_timeout_secs`.
    /// The timeout, in seconds, to wait for the command to complete.
    #[serde(default = "default_timeout_secs")]
    pub timeout: u64,
}
26
27impl GenerateConfig for ExecBackend {
28    fn generate_config() -> toml::Value {
29        toml::Value::try_from(ExecBackend {
30            command: vec![String::from("/path/to/script")],
31            timeout: 5,
32        })
33        .unwrap()
34    }
35}
36
/// Fallback for `ExecBackend::timeout`, in seconds, referenced by that field's
/// `#[serde(default = ...)]` attribute.
const fn default_timeout_secs() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    DEFAULT_TIMEOUT_SECS
}
40
/// Request payload sent to the external command.
///
/// `query_backend` serializes this value to JSON with `serde_json::to_vec` and writes the
/// bytes to the child process's stdin.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ExecQuery {
    /// Version string of the query format; `new_query` always sets it to `"1.0"`.
    version: String,
    /// Keys of the secrets being requested.
    secrets: HashSet<String>,
}
46
47fn new_query(secrets: HashSet<String>) -> ExecQuery {
48    ExecQuery {
49        version: "1.0".to_string(),
50        secrets,
51    }
52}
53
/// Per-key entry of the command's reply.
///
/// `query_backend` parses the child's stdout as a JSON object that maps each secret key to
/// one of these entries.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ExecResponse {
    /// The secret value. `retrieve` rejects a key whose value is absent or an empty string.
    value: Option<String>,
    /// Error text for this key. When present, `retrieve` fails and includes it in the
    /// returned error message.
    error: Option<String>,
}
59
60impl SecretBackend for ExecBackend {
61    async fn retrieve(
62        &mut self,
63        secret_keys: HashSet<String>,
64        signal_rx: &mut signal::SignalRx,
65    ) -> crate::Result<HashMap<String, String>> {
66        let mut output = executor::block_on(async {
67            query_backend(
68                &self.command,
69                new_query(secret_keys.clone()),
70                self.timeout,
71                signal_rx,
72            )
73            .await
74        })?;
75        let mut secrets = HashMap::new();
76        for k in secret_keys.into_iter() {
77            if let Some(secret) = output.get_mut(&k) {
78                if let Some(e) = &secret.error {
79                    return Err(format!("secret for key '{k}' was not retrieved: {e}").into());
80                }
81                if let Some(v) = secret.value.take() {
82                    if v.is_empty() {
83                        return Err(format!("secret for key '{k}' was empty").into());
84                    }
85                    secrets.insert(k.to_string(), v);
86                } else {
87                    return Err(format!("secret for key '{k}' was empty").into());
88                }
89            } else {
90                return Err(format!("secret for key '{k}' was not retrieved").into());
91            }
92        }
93        Ok(secrets)
94    }
95}
96
97async fn query_backend(
98    cmd: &[String],
99    query: ExecQuery,
100    timeout: u64,
101    signal_rx: &mut signal::SignalRx,
102) -> crate::Result<HashMap<String, ExecResponse>> {
103    let command = &cmd[0];
104    let mut command = Command::new(command);
105
106    if cmd.len() > 1 {
107        command.args(&cmd[1..]);
108    };
109
110    command.kill_on_drop(true);
111    command.stderr(std::process::Stdio::piped());
112    command.stdin(std::process::Stdio::piped());
113    command.stdout(std::process::Stdio::piped());
114
115    let mut child = command.spawn()?;
116    let mut stdin = child.stdin.take().ok_or("unable to acquire stdin")?;
117    let mut stderr_stream = child
118        .stderr
119        .map(|s| codec::FramedRead::new(s, codec::LinesCodec::new()))
120        .ok_or("unable to acquire stderr")?;
121    let mut stdout_stream = child
122        .stdout
123        .map(|s| codec::FramedRead::new(s, codec::BytesCodec::new()))
124        .ok_or("unable to acquire stdout")?;
125
126    let query = serde_json::to_vec(&query)?;
127    tokio::spawn(async move { stdin.write_all(&query).await });
128
129    let timeout = time::sleep(time::Duration::from_secs(timeout));
130    tokio::pin!(timeout);
131    let mut output = BytesMut::new();
132    loop {
133        tokio::select! {
134            biased;
135            Ok(signal::SignalTo::Shutdown(_) | signal::SignalTo::Quit) = signal_rx.recv() => {
136                drop(command);
137                return Err("Secret retrieval was interrupted.".into());
138            }
139            Some(stderr) = stderr_stream.next() => {
140                match stderr {
141                    Ok(l) => warn!("An exec backend generated message on stderr: {}.", l),
142                    Err(e) => warn!("Error while reading from an exec backend stderr: {}.", e),
143                }
144            }
145            stdout = stdout_stream.next() => {
146                match stdout {
147                    None => break,
148                    Some(Ok(b)) => output.extend(b),
149                    Some(Err(e)) => return Err(format!("Error while reading from an exec backend stdout: {e}.").into()),
150                }
151            }
152            _ = &mut timeout => {
153                drop(command);
154                return Err("Command timed-out".into());
155            }
156        }
157    }
158
159    let response = serde_json::from_slice::<HashMap<String, ExecResponse>>(&output)?;
160    Ok(response)
161}