1use std::collections::{HashMap, HashSet};
2
3use bytes::BytesMut;
4use futures::executor;
5use futures_util::StreamExt;
6use serde::{Deserialize, Serialize};
7use tokio::{io::AsyncWriteExt, process::Command, time};
8use tokio_util::codec;
9use vector_lib::configurable::{component::GenerateConfig, configurable_component};
10
11use crate::{config::SecretBackend, signal};
12
/// Configuration for the `exec` secret backend, which obtains secrets by running an external
/// command.
#[configurable_component(secrets("exec"))]
#[derive(Clone, Debug)]
pub struct ExecBackend {
    /// Command to execute.
    ///
    /// The first element is the program to run; any remaining elements are passed to it as
    /// arguments.
    pub command: Vec<String>,

    /// The timeout, in seconds, to wait for the command to finish writing its output.
    ///
    /// Falls back to `default_timeout_secs` when omitted.
    #[serde(default = "default_timeout_secs")]
    pub timeout: u64,
}
26
27impl GenerateConfig for ExecBackend {
28 fn generate_config() -> toml::Value {
29 toml::Value::try_from(ExecBackend {
30 command: vec![String::from("/path/to/script")],
31 timeout: 5,
32 })
33 .unwrap()
34 }
35}
36
/// Timeout, in seconds, used when the configuration does not specify one.
const fn default_timeout_secs() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 5;
    DEFAULT_TIMEOUT_SECS
}
40
/// Request sent to the backend command, serialized as JSON and written to its stdin.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ExecQuery {
    /// Version of the query format.
    version: String,
    /// Keys of the secrets the backend is asked to provide.
    secrets: HashSet<String>,
}
46
47fn new_query(secrets: HashSet<String>) -> ExecQuery {
48 ExecQuery {
49 version: "1.0".to_string(),
50 secrets,
51 }
52}
53
/// Per-key entry of the JSON object the backend command prints on stdout.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ExecResponse {
    /// The secret value, if the backend provided one.
    value: Option<String>,
    /// Error message reported by the backend for this key, if any.
    error: Option<String>,
}
59
60impl SecretBackend for ExecBackend {
61 async fn retrieve(
62 &mut self,
63 secret_keys: HashSet<String>,
64 signal_rx: &mut signal::SignalRx,
65 ) -> crate::Result<HashMap<String, String>> {
66 let mut output = executor::block_on(async {
67 query_backend(
68 &self.command,
69 new_query(secret_keys.clone()),
70 self.timeout,
71 signal_rx,
72 )
73 .await
74 })?;
75 let mut secrets = HashMap::new();
76 for k in secret_keys.into_iter() {
77 if let Some(secret) = output.get_mut(&k) {
78 if let Some(e) = &secret.error {
79 return Err(format!("secret for key '{k}' was not retrieved: {e}").into());
80 }
81 if let Some(v) = secret.value.take() {
82 if v.is_empty() {
83 return Err(format!("secret for key '{k}' was empty").into());
84 }
85 secrets.insert(k.to_string(), v);
86 } else {
87 return Err(format!("secret for key '{k}' was empty").into());
88 }
89 } else {
90 return Err(format!("secret for key '{k}' was not retrieved").into());
91 }
92 }
93 Ok(secrets)
94 }
95}
96
97async fn query_backend(
98 cmd: &[String],
99 query: ExecQuery,
100 timeout: u64,
101 signal_rx: &mut signal::SignalRx,
102) -> crate::Result<HashMap<String, ExecResponse>> {
103 let command = &cmd[0];
104 let mut command = Command::new(command);
105
106 if cmd.len() > 1 {
107 command.args(&cmd[1..]);
108 };
109
110 command.kill_on_drop(true);
111 command.stderr(std::process::Stdio::piped());
112 command.stdin(std::process::Stdio::piped());
113 command.stdout(std::process::Stdio::piped());
114
115 let mut child = command.spawn()?;
116 let mut stdin = child.stdin.take().ok_or("unable to acquire stdin")?;
117 let mut stderr_stream = child
118 .stderr
119 .map(|s| codec::FramedRead::new(s, codec::LinesCodec::new()))
120 .ok_or("unable to acquire stderr")?;
121 let mut stdout_stream = child
122 .stdout
123 .map(|s| codec::FramedRead::new(s, codec::BytesCodec::new()))
124 .ok_or("unable to acquire stdout")?;
125
126 let query = serde_json::to_vec(&query)?;
127 tokio::spawn(async move { stdin.write_all(&query).await });
128
129 let timeout = time::sleep(time::Duration::from_secs(timeout));
130 tokio::pin!(timeout);
131 let mut output = BytesMut::new();
132 loop {
133 tokio::select! {
134 biased;
135 Ok(signal::SignalTo::Shutdown(_) | signal::SignalTo::Quit) = signal_rx.recv() => {
136 drop(command);
137 return Err("Secret retrieval was interrupted.".into());
138 }
139 Some(stderr) = stderr_stream.next() => {
140 match stderr {
141 Ok(l) => warn!("An exec backend generated message on stderr: {}.", l),
142 Err(e) => warn!("Error while reading from an exec backend stderr: {}.", e),
143 }
144 }
145 stdout = stdout_stream.next() => {
146 match stdout {
147 None => break,
148 Some(Ok(b)) => output.extend(b),
149 Some(Err(e)) => return Err(format!("Error while reading from an exec backend stdout: {e}.").into()),
150 }
151 }
152 _ = &mut timeout => {
153 drop(command);
154 return Err("Command timed-out".into());
155 }
156 }
157 }
158
159 let response = serde_json::from_slice::<HashMap<String, ExecResponse>>(&output)?;
160 Ok(response)
161}