1#![allow(missing_docs)]
2use std::{
3 sync::{Arc, LazyLock, RwLock},
4 time::Duration,
5};
6
7use base64::prelude::{Engine as _, BASE64_URL_SAFE};
8pub use goauth::scopes::Scope;
9use goauth::{
10 auth::{JwtClaims, Token, TokenErr},
11 credentials::Credentials,
12 GoErr,
13};
14use http::{uri::PathAndQuery, Uri};
15use hyper::header::AUTHORIZATION;
16use smpl_jwt::Jwt;
17use snafu::{ResultExt, Snafu};
18use tokio::sync::watch;
19use vector_lib::configurable::configurable_component;
20use vector_lib::sensitive_string::SensitiveString;
21
22use crate::{config::ProxyConfig, http::HttpClient, http::HttpError};
23
24const SERVICE_ACCOUNT_TOKEN_URL: &str =
25 "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
26
27const METADATA_TOKEN_EXPIRY_MARGIN_SECS: u64 = 200;
29
30const METADATA_TOKEN_ERROR_RETRY_SECS: u64 = 2;
31
32pub const PUBSUB_URL: &str = "https://pubsub.googleapis.com";
33
34pub static PUBSUB_ADDRESS: LazyLock<String> = LazyLock::new(|| {
35 std::env::var("EMULATOR_ADDRESS").unwrap_or_else(|_| "http://localhost:8681".into())
36});
37
38#[derive(Debug, Snafu)]
39#[snafu(visibility(pub))]
40pub enum GcpError {
41 #[snafu(display("This requires one of api_key or credentials_path to be defined"))]
42 MissingAuth,
43 #[snafu(display("Invalid GCP credentials: {}", source))]
44 InvalidCredentials { source: GoErr },
45 #[snafu(display("Invalid GCP API key: {}", source))]
46 InvalidApiKey { source: base64::DecodeError },
47 #[snafu(display("Healthcheck endpoint forbidden"))]
48 HealthcheckForbidden,
49 #[snafu(display("Invalid RSA key in GCP credentials: {}", source))]
50 InvalidRsaKey { source: GoErr },
51 #[snafu(display("Failed to get OAuth token: {}", source))]
52 GetToken { source: GoErr },
53 #[snafu(display("Failed to get OAuth token text: {}", source))]
54 GetTokenBytes { source: hyper::Error },
55 #[snafu(display("Failed to get implicit GCP token: {}", source))]
56 GetImplicitToken { source: HttpError },
57 #[snafu(display("Failed to parse OAuth token JSON: {}", source))]
58 TokenFromJson { source: TokenErr },
59 #[snafu(display("Failed to parse OAuth token JSON text: {}", source))]
60 TokenJsonFromStr { source: serde_json::Error },
61 #[snafu(display("Failed to build HTTP client: {}", source))]
62 BuildHttpClient { source: HttpError },
63}
64
65#[configurable_component]
73#[derive(Clone, Debug, Default)]
74pub struct GcpAuthConfig {
75 pub api_key: Option<SensitiveString>,
86
87 pub credentials_path: Option<String>,
98
99 #[serde(default, skip_serializing)]
101 #[configurable(metadata(docs::hidden))]
102 pub skip_authentication: bool,
103}
104
105impl GcpAuthConfig {
106 pub async fn build(&self, scope: Scope) -> crate::Result<GcpAuthenticator> {
107 Ok(if self.skip_authentication {
108 GcpAuthenticator::None
109 } else {
110 let gap = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok();
111 let creds_path = self.credentials_path.as_ref().or(gap.as_ref());
112 match (&creds_path, &self.api_key) {
113 (Some(path), _) => GcpAuthenticator::from_file(path, scope).await?,
114 (None, Some(api_key)) => GcpAuthenticator::from_api_key(api_key.inner())?,
115 (None, None) => GcpAuthenticator::new_implicit().await?,
116 }
117 })
118 }
119}
120
121#[derive(Clone, Debug)]
122pub enum GcpAuthenticator {
123 Credentials(Arc<InnerCreds>),
124 ApiKey(Box<str>),
125 None,
126}
127
128#[derive(Debug)]
129pub struct InnerCreds {
130 creds: Option<(Credentials, Scope)>,
131 token: RwLock<Token>,
132}
133
134impl GcpAuthenticator {
135 async fn from_file(path: &str, scope: Scope) -> crate::Result<Self> {
136 let creds = Credentials::from_file(path).context(InvalidCredentialsSnafu)?;
137 let token = RwLock::new(fetch_token(&creds, &scope).await?);
138 let creds = Some((creds, scope));
139 Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
140 }
141
142 async fn new_implicit() -> crate::Result<Self> {
143 let token = RwLock::new(get_token_implicit().await?);
144 let creds = None;
145 Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
146 }
147
148 fn from_api_key(api_key: &str) -> crate::Result<Self> {
149 BASE64_URL_SAFE
150 .decode(api_key)
151 .context(InvalidApiKeySnafu)?;
152 Ok(Self::ApiKey(api_key.into()))
153 }
154
155 pub fn make_token(&self) -> Option<String> {
156 match self {
157 Self::Credentials(inner) => Some(inner.make_token()),
158 Self::ApiKey(_) | Self::None => None,
159 }
160 }
161
162 pub fn apply<T>(&self, request: &mut http::Request<T>) {
163 if let Some(token) = self.make_token() {
164 request
165 .headers_mut()
166 .insert(AUTHORIZATION, token.parse().unwrap());
167 }
168 self.apply_uri(request.uri_mut());
169 }
170
171 pub fn apply_uri(&self, uri: &mut Uri) {
172 match self {
173 Self::Credentials(_) | Self::None => (),
174 Self::ApiKey(api_key) => {
175 let mut parts = uri.clone().into_parts();
176 let path = parts
177 .path_and_query
178 .as_ref()
179 .map_or("/", PathAndQuery::path);
180 let paq = format!("{path}?key={api_key}");
181 parts.path_and_query =
186 Some(paq.parse().expect("Could not re-parse path and query"));
187 *uri = Uri::from_parts(parts).expect("Could not re-parse URL");
188 }
189 }
190 }
191
192 pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
193 let (sender, receiver) = watch::channel(());
194 tokio::spawn(self.clone().token_regenerator(sender));
195 receiver
196 }
197
198 async fn token_regenerator(self, sender: watch::Sender<()>) {
199 match self {
200 Self::Credentials(inner) => {
201 let mut expires_in = inner.token.read().unwrap().expires_in() as u64;
202 loop {
203 let deadline = Duration::from_secs(
204 expires_in
205 .saturating_sub(METADATA_TOKEN_EXPIRY_MARGIN_SECS)
206 .max(METADATA_TOKEN_ERROR_RETRY_SECS),
207 );
208 debug!(
209 deadline = deadline.as_secs(),
210 "Sleeping before refreshing GCP authentication token.",
211 );
212 tokio::time::sleep(deadline).await;
213 match inner.regenerate_token().await {
214 Ok(()) => {
215 sender.send_replace(());
216 debug!("GCP authentication token renewed.");
217 expires_in = inner.token.read().unwrap().expires_in() as u64;
222 }
223 Err(error) => {
224 error!(
225 message = "Failed to update GCP authentication token.",
226 %error
227 );
228 expires_in = METADATA_TOKEN_EXPIRY_MARGIN_SECS;
229 }
230 }
231 }
232 }
233 Self::ApiKey(_) | Self::None => {
234 sender.closed().await
238 }
239 }
240 }
241}
242
243impl InnerCreds {
244 async fn regenerate_token(&self) -> crate::Result<()> {
245 let token = match &self.creds {
246 Some((creds, scope)) => fetch_token(creds, scope).await?,
247 None => get_token_implicit().await?,
248 };
249 *self.token.write().unwrap() = token;
250 Ok(())
251 }
252
253 fn make_token(&self) -> String {
254 let token = self.token.read().unwrap();
255 format!("{} {}", token.token_type(), token.access_token())
256 }
257}
258
259async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result<Token> {
260 let claims = JwtClaims::new(creds.iss(), &[scope.clone()], creds.token_uri(), None, None);
261 let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?;
262 let jwt = Jwt::new(claims, rsa_key, None);
263
264 debug!(
265 message = "Fetching GCP authentication token.",
266 project = ?creds.project(),
267 iss = ?creds.iss(),
268 token_uri = ?creds.token_uri(),
269 );
270 goauth::get_token(&jwt, creds)
271 .await
272 .context(GetTokenSnafu)
273 .map_err(Into::into)
274}
275
276async fn get_token_implicit() -> Result<Token, GcpError> {
277 debug!("Fetching implicit GCP authentication token.");
278 let req = http::Request::get(SERVICE_ACCOUNT_TOKEN_URL)
279 .header("Metadata-Flavor", "Google")
280 .body(hyper::Body::empty())
281 .unwrap();
282
283 let proxy = ProxyConfig::from_env();
284 let res = HttpClient::new(None, &proxy)
285 .context(BuildHttpClientSnafu)?
286 .send(req)
287 .await
288 .context(GetImplicitTokenSnafu)?;
289
290 let body = res.into_body();
291 let bytes = hyper::body::to_bytes(body)
292 .await
293 .context(GetTokenBytesSnafu)?;
294
295 match serde_json::from_slice::<Token>(&bytes) {
297 Ok(token) => Ok(token),
298 Err(error) => Err(match serde_json::from_slice::<TokenErr>(&bytes) {
299 Ok(error) => GcpError::TokenFromJson { source: error },
300 Err(_) => GcpError::TokenJsonFromStr { source: error },
301 }),
302 }
303}
304
305#[cfg(test)]
306mod tests {
307 use super::*;
308 use crate::assert_downcast_matches;
309
310 #[tokio::test]
311 async fn fails_missing_creds() {
312 let error = build_auth("").await.expect_err("build failed to error");
313 assert_downcast_matches!(error, GcpError, GcpError::GetImplicitToken { .. });
314 }
316
317 #[tokio::test]
318 async fn skip_authentication() {
319 let auth = build_auth(
320 r#"
321 skip_authentication = true
322 api_key = "testing"
323 "#,
324 )
325 .await
326 .expect("build_auth failed");
327 assert!(matches!(auth, GcpAuthenticator::None));
328 }
329
330 #[tokio::test]
331 async fn uses_api_key() {
332 let key = crate::test_util::random_string(16);
333
334 let auth = build_auth(&format!(r#"api_key = "{key}""#))
335 .await
336 .expect("build_auth failed");
337 assert!(matches!(auth, GcpAuthenticator::ApiKey(..)));
338
339 assert_eq!(
340 apply_uri(&auth, "http://example.com"),
341 format!("http://example.com/?key={key}")
342 );
343 assert_eq!(
344 apply_uri(&auth, "http://example.com/"),
345 format!("http://example.com/?key={key}")
346 );
347 assert_eq!(
348 apply_uri(&auth, "http://example.com/path"),
349 format!("http://example.com/path?key={key}")
350 );
351 assert_eq!(
352 apply_uri(&auth, "http://example.com/path1/"),
353 format!("http://example.com/path1/?key={key}")
354 );
355 }
356
357 #[tokio::test]
358 async fn fails_bad_api_key() {
359 let error = build_auth(r#"api_key = "abc%xyz""#)
360 .await
361 .expect_err("build failed to error");
362 assert_downcast_matches!(error, GcpError, GcpError::InvalidApiKey { .. });
363 }
364
365 fn apply_uri(auth: &GcpAuthenticator, uri: &str) -> String {
366 let mut uri: Uri = uri.parse().unwrap();
367 auth.apply_uri(&mut uri);
368 uri.to_string()
369 }
370
371 async fn build_auth(toml: &str) -> crate::Result<GcpAuthenticator> {
372 let config: GcpAuthConfig = toml::from_str(toml).expect("Invalid TOML");
373 config.build(Scope::Compute).await
374 }
375}