1use std::collections::{HashMap, HashSet};
2
3use bytes::BytesMut;
4use futures::executor;
5use futures_util::StreamExt;
6use serde::{Deserialize, Serialize};
7use tokio::{io::AsyncWriteExt, process::Command, time};
8use tokio_util::codec;
9use vector_lib::configurable::{component::GenerateConfig, configurable_component};
10use vrl::value::Value;
11
12use crate::{config::SecretBackend, signal};
13
#[configurable_component(secrets("exec"))]
/// Protocol version spoken between this backend and the external command.
///
/// The chosen variant decides which fields are included in the query that is
/// written to the command's standard input.
#[configurable(metadata(docs::enum_tag_description = "The protocol version."))]
#[derive(Clone, Debug)]
#[serde(rename_all = "snake_case", tag = "version")]
pub enum ExecVersion {
    /// Protocol `1.0`: the query carries only the version and the requested secret keys.
    V1,

    /// Protocol `1.1`: the query additionally carries a backend type and a backend configuration.
    V1_1 {
        /// Name of the backend, sent as the `type` field of the query.
        backend_type: String,
        /// Backend-specific configuration, sent as the `config` field of the query.
        backend_config: Value,
    },
}
33
34impl ExecVersion {
35 fn new_query(&self, secrets: HashSet<String>) -> ExecQuery {
36 match &self {
37 ExecVersion::V1 => ExecQuery {
38 version: "1.0".to_string(),
39 secrets,
40 r#type: None,
41 config: None,
42 },
43 ExecVersion::V1_1 {
44 backend_type,
45 backend_config,
46 ..
47 } => ExecQuery {
48 version: "1.1".to_string(),
49 secrets,
50 r#type: Some(backend_type.clone()),
51 config: Some(backend_config.clone()),
52 },
53 }
54 }
55}
56
57impl GenerateConfig for ExecVersion {
58 fn generate_config() -> toml::Value {
59 toml::Value::try_from(ExecVersion::V1).unwrap()
60 }
61}
62
#[configurable_component(secrets("exec"))]
/// Secrets backend that obtains secrets by running an external command.
#[derive(Clone, Debug)]
pub struct ExecBackend {
    /// Command to run: the first element is the program, any remaining elements are its
    /// arguments.
    pub command: Vec<String>,

    /// Maximum time, in seconds, to wait for the command to finish producing output.
    #[serde(default = "default_timeout_secs")]
    pub timeout: u64,

    /// Protocol version used to build the query sent to the command.
    #[serde(default = "default_protocol_version")]
    pub protocol: ExecVersion,
}
80
81impl GenerateConfig for ExecBackend {
82 fn generate_config() -> toml::Value {
83 toml::Value::try_from(ExecBackend {
84 command: vec![String::from("/path/to/script")],
85 timeout: 5,
86 protocol: ExecVersion::V1,
87 })
88 .unwrap()
89 }
90}
91
/// Default value, in seconds, for [`ExecBackend::timeout`] when it is not configured.
const fn default_timeout_secs() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    DEFAULT_TIMEOUT_SECS
}
95
/// Default value for [`ExecBackend::protocol`] when it is not configured: [`ExecVersion::V1`].
const fn default_protocol_version() -> ExecVersion {
    ExecVersion::V1
}
99
/// Query serialized as JSON and written to the external command's standard input.
#[derive(Clone, Debug, Serialize)]
struct ExecQuery {
    /// Protocol version string (`"1.0"` or `"1.1"`, see `ExecVersion::new_query`).
    version: String,
    /// Keys of the secrets being requested.
    secrets: HashSet<String>,
    /// Backend type; left out of the serialized query when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    r#type: Option<String>,
    /// Backend configuration; left out of the serialized query when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    config: Option<Value>,
}
111
/// Per-key entry of the JSON object the external command prints on standard output.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ExecResponse {
    /// The secret value, if the command provided one.
    value: Option<String>,
    /// Error message reported by the command for this key, if any.
    error: Option<String>,
}
117
118impl SecretBackend for ExecBackend {
119 async fn retrieve(
120 &mut self,
121 secret_keys: HashSet<String>,
122 signal_rx: &mut signal::SignalRx,
123 ) -> crate::Result<HashMap<String, String>> {
124 let mut output = executor::block_on(async {
125 query_backend(
126 &self.command,
127 self.protocol.new_query(secret_keys.clone()),
128 self.timeout,
129 signal_rx,
130 )
131 .await
132 })?;
133 let mut secrets = HashMap::new();
134 for k in secret_keys.into_iter() {
135 if let Some(secret) = output.get_mut(&k) {
136 if let Some(e) = &secret.error {
137 return Err(format!("secret for key '{k}' was not retrieved: {e}").into());
138 }
139 if let Some(v) = secret.value.take() {
140 if v.is_empty() {
141 return Err(format!("secret for key '{k}' was empty").into());
142 }
143 secrets.insert(k.to_string(), v);
144 } else {
145 return Err(format!("secret for key '{k}' was empty").into());
146 }
147 } else {
148 return Err(format!("secret for key '{k}' was not retrieved").into());
149 }
150 }
151 Ok(secrets)
152 }
153}
154
155async fn query_backend(
156 cmd: &[String],
157 query: ExecQuery,
158 timeout: u64,
159 signal_rx: &mut signal::SignalRx,
160) -> crate::Result<HashMap<String, ExecResponse>> {
161 let command = &cmd[0];
162 let mut command = Command::new(command);
163
164 if cmd.len() > 1 {
165 command.args(&cmd[1..]);
166 };
167
168 command.kill_on_drop(true);
169 command.stderr(std::process::Stdio::piped());
170 command.stdin(std::process::Stdio::piped());
171 command.stdout(std::process::Stdio::piped());
172
173 let mut child = command.spawn()?;
174 let mut stdin = child.stdin.take().ok_or("unable to acquire stdin")?;
175 let mut stderr_stream = child
176 .stderr
177 .map(|s| codec::FramedRead::new(s, codec::LinesCodec::new()))
178 .ok_or("unable to acquire stderr")?;
179 let mut stdout_stream = child
180 .stdout
181 .map(|s| codec::FramedRead::new(s, codec::BytesCodec::new()))
182 .ok_or("unable to acquire stdout")?;
183
184 let query = serde_json::to_vec(&query)?;
185 tokio::spawn(async move { stdin.write_all(&query).await });
186
187 let timeout = time::sleep(time::Duration::from_secs(timeout));
188 tokio::pin!(timeout);
189 let mut output = BytesMut::new();
190 loop {
191 tokio::select! {
192 biased;
193 Ok(signal::SignalTo::Shutdown(_) | signal::SignalTo::Quit) = signal_rx.recv() => {
194 drop(command);
195 return Err("Secret retrieval was interrupted.".into());
196 }
197 Some(stderr) = stderr_stream.next() => {
198 match stderr {
199 Ok(l) => warn!("An exec backend generated message on stderr: {}.", l),
200 Err(e) => warn!("Error while reading from an exec backend stderr: {}.", e),
201 }
202 }
203 stdout = stdout_stream.next() => {
204 match stdout {
205 None => break,
206 Some(Ok(b)) => output.extend(b),
207 Some(Err(e)) => return Err(format!("Error while reading from an exec backend stdout: {e}.").into()),
208 }
209 }
210 _ = &mut timeout => {
211 drop(command);
212 return Err("Command timed-out".into());
213 }
214 }
215 }
216
217 let response = serde_json::from_slice::<HashMap<String, ExecResponse>>(&output)?;
218 Ok(response)
219}
220
#[cfg(test)]
mod tests {
    use std::{
        collections::{HashMap, HashSet},
        path::PathBuf,
    };

    use rstest::rstest;
    use tokio::sync::broadcast;
    use vrl::value;

    use crate::{
        config::SecretBackend,
        secrets::exec::{ExecBackend, ExecVersion},
    };

    /// Builds a backend that runs the mock secrets script through `python` with the
    /// given protocol version and a 5 second timeout.
    fn make_test_backend(protocol: ExecVersion) -> ExecBackend {
        let script = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("tests/behavior/secrets/mock_secrets_exec.py");
        let command = vec![
            String::from("python"),
            String::from(script.to_str().unwrap()),
        ];
        ExecBackend {
            command,
            timeout: 5,
            protocol,
        }
    }

    /// Every requested key is returned with the expected value, for both protocol versions.
    #[tokio::test(flavor = "multi_thread")]
    #[rstest(
        protocol,
        case(ExecVersion::V1),
        case(ExecVersion::V1_1 {
            backend_type: "file.json".to_string(),
            backend_config: value!({"file_path": "/abc.json"}),
        })
    )]
    async fn test_exec_backend(protocol: ExecVersion) {
        let expected: HashMap<String, String> = HashMap::from(
            [
                ("fake_secret_1", "123456"),
                ("fake_secret_2", "123457"),
                ("fake_secret_3", "123458"),
                ("fake_secret_4", "123459"),
                ("fake_secret_5", "123460"),
            ]
            .map(|(k, v)| (k.to_string(), v.to_string())),
        );
        let requested: HashSet<String> = expected.keys().cloned().collect();

        let mut backend = make_test_backend(protocol);
        let (_tx, mut rx) = broadcast::channel(1);
        let fetched = backend.retrieve(requested, &mut rx).await.unwrap();

        assert_eq!(fetched.len(), 5);
        for (key, value) in &expected {
            assert_eq!(fetched.get(key), Some(value));
        }
    }

    /// A key unknown to the mock script surfaces the script's error message.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_exec_backend_missing_secrets() {
        let mut backend = make_test_backend(ExecVersion::V1);
        let (_tx, mut rx) = broadcast::channel(1);
        let query_secrets = HashSet::from([String::from("fake_secret_900")]);

        let error = backend
            .retrieve(query_secrets, &mut rx)
            .await
            .unwrap_err();

        assert_eq!(
            error.to_string(),
            "secret for key 'fake_secret_900' was not retrieved: backend does not provide secret key"
        );
    }
}