vector/secrets/
exec.rs

1use std::collections::{HashMap, HashSet};
2
3use bytes::BytesMut;
4use futures::executor;
5use futures_util::StreamExt;
6use serde::{Deserialize, Serialize};
7use tokio::{io::AsyncWriteExt, process::Command, time};
8use tokio_util::codec;
9use vector_lib::configurable::{component::GenerateConfig, configurable_component};
10use vrl::value::Value;
11
12use crate::{config::SecretBackend, signal};
13
/// Configuration for the command that will be `exec`ed.
// Serialized as an internally tagged enum: the variant is selected by the `version` field.
#[configurable_component(secrets("exec"))]
#[configurable(metadata(docs::enum_tag_description = "The protocol version."))]
#[derive(Clone, Debug)]
#[serde(rename_all = "snake_case", tag = "version")]
pub enum ExecVersion {
    /// Expect the command to fetch the configuration options itself.
    V1,

    /// Configuration options to the command are to be curried upon each request.
    V1_1 {
        /// The name of the backend. This is the `type` field in the backend request.
        backend_type: String,
        /// The configuration to pass to the secrets executable. This is the `config` field in the
        /// backend request. Refer to the documentation of your `backend_type` to see which options
        /// are required to be set.
        backend_config: Value,
    },
}
33
34impl ExecVersion {
35    fn new_query(&self, secrets: HashSet<String>) -> ExecQuery {
36        match &self {
37            ExecVersion::V1 => ExecQuery {
38                version: "1.0".to_string(),
39                secrets,
40                r#type: None,
41                config: None,
42            },
43            ExecVersion::V1_1 {
44                backend_type,
45                backend_config,
46                ..
47            } => ExecQuery {
48                version: "1.1".to_string(),
49                secrets,
50                r#type: Some(backend_type.clone()),
51                config: Some(backend_config.clone()),
52            },
53        }
54    }
55}
56
57impl GenerateConfig for ExecVersion {
58    fn generate_config() -> toml::Value {
59        toml::Value::try_from(ExecVersion::V1).unwrap()
60    }
61}
62
/// Configuration for the `exec` secrets backend.
#[configurable_component(secrets("exec"))]
#[derive(Clone, Debug)]
pub struct ExecBackend {
    /// Command arguments to execute.
    ///
    /// The path to the script or binary must be the first argument.
    // `query_backend` uses element 0 as the program and passes the rest as its arguments.
    pub command: Vec<String>,

    /// The timeout, in seconds, to wait for the command to complete.
    // Falls back to `default_timeout_secs()` when the field is omitted.
    #[serde(default = "default_timeout_secs")]
    pub timeout: u64,

    /// Settings for the protocol between Vector and the secrets executable.
    // Falls back to `default_protocol_version()` when the field is omitted.
    #[serde(default = "default_protocol_version")]
    pub protocol: ExecVersion,
}
80
81impl GenerateConfig for ExecBackend {
82    fn generate_config() -> toml::Value {
83        toml::Value::try_from(ExecBackend {
84            command: vec![String::from("/path/to/script")],
85            timeout: 5,
86            protocol: ExecVersion::V1,
87        })
88        .unwrap()
89    }
90}
91
/// Default value for the backend `timeout` setting, in seconds.
const fn default_timeout_secs() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    DEFAULT_TIMEOUT_SECS
}
95
/// Default value for the backend `protocol` setting: the `V1` protocol.
const fn default_protocol_version() -> ExecVersion {
    ExecVersion::V1
}
99
/// Request payload that is serialized to JSON and written to the secrets executable's stdin.
#[derive(Clone, Debug, Serialize)]
struct ExecQuery {
    // Fields in all versions starting from v1
    /// Protocol version string; `ExecVersion::new_query` sets it to `"1.0"` or `"1.1"`.
    version: String,
    /// Keys of the secrets being requested.
    secrets: HashSet<String>,
    // Fields added in v1.1
    /// Backend type; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    r#type: Option<String>,
    /// Backend configuration; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    config: Option<Value>,
}
111
/// Per-key entry of the JSON object that the secrets executable prints on stdout.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ExecResponse {
    /// The secret value; `retrieve` rejects a missing or empty value.
    value: Option<String>,
    /// Error reported for this key; when set, `retrieve` fails with this message.
    error: Option<String>,
}
117
118impl SecretBackend for ExecBackend {
119    async fn retrieve(
120        &mut self,
121        secret_keys: HashSet<String>,
122        signal_rx: &mut signal::SignalRx,
123    ) -> crate::Result<HashMap<String, String>> {
124        let mut output = executor::block_on(async {
125            query_backend(
126                &self.command,
127                self.protocol.new_query(secret_keys.clone()),
128                self.timeout,
129                signal_rx,
130            )
131            .await
132        })?;
133        let mut secrets = HashMap::new();
134        for k in secret_keys.into_iter() {
135            if let Some(secret) = output.get_mut(&k) {
136                if let Some(e) = &secret.error {
137                    return Err(format!("secret for key '{k}' was not retrieved: {e}").into());
138                }
139                if let Some(v) = secret.value.take() {
140                    if v.is_empty() {
141                        return Err(format!("secret for key '{k}' was empty").into());
142                    }
143                    secrets.insert(k.to_string(), v);
144                } else {
145                    return Err(format!("secret for key '{k}' was empty").into());
146                }
147            } else {
148                return Err(format!("secret for key '{k}' was not retrieved").into());
149            }
150        }
151        Ok(secrets)
152    }
153}
154
155async fn query_backend(
156    cmd: &[String],
157    query: ExecQuery,
158    timeout: u64,
159    signal_rx: &mut signal::SignalRx,
160) -> crate::Result<HashMap<String, ExecResponse>> {
161    let command = &cmd[0];
162    let mut command = Command::new(command);
163
164    if cmd.len() > 1 {
165        command.args(&cmd[1..]);
166    };
167
168    command.kill_on_drop(true);
169    command.stderr(std::process::Stdio::piped());
170    command.stdin(std::process::Stdio::piped());
171    command.stdout(std::process::Stdio::piped());
172
173    let mut child = command.spawn()?;
174    let mut stdin = child.stdin.take().ok_or("unable to acquire stdin")?;
175    let mut stderr_stream = child
176        .stderr
177        .map(|s| codec::FramedRead::new(s, codec::LinesCodec::new()))
178        .ok_or("unable to acquire stderr")?;
179    let mut stdout_stream = child
180        .stdout
181        .map(|s| codec::FramedRead::new(s, codec::BytesCodec::new()))
182        .ok_or("unable to acquire stdout")?;
183
184    let query = serde_json::to_vec(&query)?;
185    tokio::spawn(async move { stdin.write_all(&query).await });
186
187    let timeout = time::sleep(time::Duration::from_secs(timeout));
188    tokio::pin!(timeout);
189    let mut output = BytesMut::new();
190    loop {
191        tokio::select! {
192            biased;
193            Ok(signal::SignalTo::Shutdown(_) | signal::SignalTo::Quit) = signal_rx.recv() => {
194                drop(command);
195                return Err("Secret retrieval was interrupted.".into());
196            }
197            Some(stderr) = stderr_stream.next() => {
198                match stderr {
199                    Ok(l) => warn!("An exec backend generated message on stderr: {}.", l),
200                    Err(e) => warn!("Error while reading from an exec backend stderr: {}.", e),
201                }
202            }
203            stdout = stdout_stream.next() => {
204                match stdout {
205                    None => break,
206                    Some(Ok(b)) => output.extend(b),
207                    Some(Err(e)) => return Err(format!("Error while reading from an exec backend stdout: {e}.").into()),
208                }
209            }
210            _ = &mut timeout => {
211                drop(command);
212                return Err("Command timed-out".into());
213            }
214        }
215    }
216
217    let response = serde_json::from_slice::<HashMap<String, ExecResponse>>(&output)?;
218    Ok(response)
219}
220
#[cfg(test)]
mod tests {
    use std::{
        collections::{HashMap, HashSet},
        path::PathBuf,
    };

    use rstest::rstest;
    use tokio::sync::broadcast;
    use vrl::value;

    use crate::{
        config::SecretBackend,
        secrets::exec::{ExecBackend, ExecVersion},
    };

    /// Builds a backend that runs the mock secrets script through `python` with a five second
    /// timeout and the given protocol.
    fn make_test_backend(protocol: ExecVersion) -> ExecBackend {
        let script = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/behavior/secrets/mock_secrets_exec.py");
        let command = vec![
            String::from("python"),
            String::from(script.to_str().unwrap()),
        ];
        ExecBackend {
            command,
            timeout: 5,
            protocol,
        }
    }

    /// Requests the five known fake secrets under each protocol version and checks that every
    /// one comes back with its expected value.
    #[tokio::test(flavor = "multi_thread")]
    #[rstest(
        protocol,
        case(ExecVersion::V1),
        case(ExecVersion::V1_1 {
            backend_type: "file.json".to_string(),
            backend_config: value!({"file_path": "/abc.json"}),
        })
    )]
    async fn test_exec_backend(protocol: ExecVersion) {
        let mut backend = make_test_backend(protocol);
        let (_tx, mut rx) = broadcast::channel(1);

        // These fake secrets are statically contained in mock_secrets_exec.py
        let expected: HashMap<String, String> = HashMap::from(
            [
                ("fake_secret_1", "123456"),
                ("fake_secret_2", "123457"),
                ("fake_secret_3", "123458"),
                ("fake_secret_4", "123459"),
                ("fake_secret_5", "123460"),
            ]
            .map(|(k, v)| (k.to_string(), v.to_string())),
        );

        // Asking the mock program for exactly the expected keys should yield the values above.
        let requested: HashSet<String> = expected.keys().cloned().collect();
        let fetched = backend.retrieve(requested, &mut rx).await.unwrap();

        assert_eq!(fetched.len(), 5);
        for (key, want) in &expected {
            assert_eq!(fetched.get(key), Some(want));
        }
    }

    /// Requests a key the mock program does not provide and checks the resulting error text.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_exec_backend_missing_secrets() {
        let mut backend = make_test_backend(ExecVersion::V1);
        let (_tx, mut rx) = broadcast::channel(1);

        let requested: HashSet<String> = HashSet::from([String::from("fake_secret_900")]);
        let outcome = backend.retrieve(requested, &mut rx).await;

        assert_eq!(
            outcome.unwrap_err().to_string(),
            "secret for key 'fake_secret_900' was not retrieved: backend does not provide secret key"
        );
    }
}