1#![allow(missing_docs)]
2use std::{
3 sync::{Arc, LazyLock, RwLock},
4 time::Duration,
5};
6
7use base64::prelude::{BASE64_URL_SAFE, Engine as _};
8pub use goauth::scopes::Scope;
9use goauth::{
10 GoErr,
11 auth::{JwtClaims, Token, TokenErr},
12 credentials::Credentials,
13};
14use http::{Uri, uri::PathAndQuery};
15use hyper::header::AUTHORIZATION;
16use smpl_jwt::Jwt;
17use snafu::{ResultExt, Snafu};
18use tokio::sync::watch;
19use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
20
21use crate::{
22 config::ProxyConfig,
23 http::{HttpClient, HttpError},
24};
25
26const SERVICE_ACCOUNT_TOKEN_URL: &str =
27 "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";
28
29const METADATA_TOKEN_EXPIRY_MARGIN_SECS: u64 = 200;
31
32const METADATA_TOKEN_ERROR_RETRY_SECS: u64 = 2;
33
34pub const PUBSUB_URL: &str = "https://pubsub.googleapis.com";
35
36pub static PUBSUB_ADDRESS: LazyLock<String> = LazyLock::new(|| {
37 std::env::var("EMULATOR_ADDRESS").unwrap_or_else(|_| "http://localhost:8681".into())
38});
39
40#[derive(Debug, Snafu)]
41#[snafu(visibility(pub))]
42pub enum GcpError {
43 #[snafu(display("This requires one of api_key or credentials_path to be defined"))]
44 MissingAuth,
45 #[snafu(display("Invalid GCP credentials: {}", source))]
46 InvalidCredentials { source: GoErr },
47 #[snafu(display("Invalid GCP API key: {}", source))]
48 InvalidApiKey { source: base64::DecodeError },
49 #[snafu(display("Healthcheck endpoint forbidden"))]
50 HealthcheckForbidden,
51 #[snafu(display("Invalid RSA key in GCP credentials: {}", source))]
52 InvalidRsaKey { source: GoErr },
53 #[snafu(display("Failed to get OAuth token: {}", source))]
54 GetToken { source: GoErr },
55 #[snafu(display("Failed to get OAuth token text: {}", source))]
56 GetTokenBytes { source: hyper::Error },
57 #[snafu(display("Failed to get implicit GCP token: {}", source))]
58 GetImplicitToken { source: HttpError },
59 #[snafu(display("Failed to parse OAuth token JSON: {}", source))]
60 TokenFromJson { source: TokenErr },
61 #[snafu(display("Failed to parse OAuth token JSON text: {}", source))]
62 TokenJsonFromStr { source: serde_json::Error },
63 #[snafu(display("Failed to build HTTP client: {}", source))]
64 BuildHttpClient { source: HttpError },
65}
66
67#[configurable_component]
75#[derive(Clone, Debug, Default)]
76pub struct GcpAuthConfig {
77 pub api_key: Option<SensitiveString>,
88
89 pub credentials_path: Option<String>,
100
101 #[serde(default, skip_serializing)]
103 #[configurable(metadata(docs::hidden))]
104 pub skip_authentication: bool,
105}
106
107impl GcpAuthConfig {
108 pub async fn build(&self, scope: Scope) -> crate::Result<GcpAuthenticator> {
109 Ok(if self.skip_authentication {
110 GcpAuthenticator::None
111 } else {
112 let gap = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok();
113 let creds_path = self.credentials_path.as_ref().or(gap.as_ref());
114 match (&creds_path, &self.api_key) {
115 (Some(path), _) => GcpAuthenticator::from_file(path, scope).await?,
116 (None, Some(api_key)) => GcpAuthenticator::from_api_key(api_key.inner())?,
117 (None, None) => GcpAuthenticator::new_implicit().await?,
118 }
119 })
120 }
121}
122
123#[derive(Clone, Debug)]
124pub enum GcpAuthenticator {
125 Credentials(Arc<InnerCreds>),
126 ApiKey(Box<str>),
127 None,
128}
129
130#[derive(Debug)]
131pub struct InnerCreds {
132 creds: Option<(Credentials, Scope)>,
133 token: RwLock<Token>,
134}
135
136impl GcpAuthenticator {
137 async fn from_file(path: &str, scope: Scope) -> crate::Result<Self> {
138 let creds = Credentials::from_file(path).context(InvalidCredentialsSnafu)?;
139 let token = RwLock::new(fetch_token(&creds, &scope).await?);
140 let creds = Some((creds, scope));
141 Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
142 }
143
144 async fn new_implicit() -> crate::Result<Self> {
145 let token = RwLock::new(get_token_implicit().await?);
146 let creds = None;
147 Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
148 }
149
150 fn from_api_key(api_key: &str) -> crate::Result<Self> {
151 BASE64_URL_SAFE
152 .decode(api_key)
153 .context(InvalidApiKeySnafu)?;
154 Ok(Self::ApiKey(api_key.into()))
155 }
156
157 pub fn make_token(&self) -> Option<String> {
158 match self {
159 Self::Credentials(inner) => Some(inner.make_token()),
160 Self::ApiKey(_) | Self::None => None,
161 }
162 }
163
164 pub fn apply<T>(&self, request: &mut http::Request<T>) {
165 if let Some(token) = self.make_token() {
166 request
167 .headers_mut()
168 .insert(AUTHORIZATION, token.parse().unwrap());
169 }
170 self.apply_uri(request.uri_mut());
171 }
172
173 pub fn apply_uri(&self, uri: &mut Uri) {
174 match self {
175 Self::Credentials(_) | Self::None => (),
176 Self::ApiKey(api_key) => {
177 let mut parts = uri.clone().into_parts();
178 let path = parts
179 .path_and_query
180 .as_ref()
181 .map_or("/", PathAndQuery::path);
182 let paq = format!("{path}?key={api_key}");
183 parts.path_and_query =
188 Some(paq.parse().expect("Could not re-parse path and query"));
189 *uri = Uri::from_parts(parts).expect("Could not re-parse URL");
190 }
191 }
192 }
193
194 pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
195 let (sender, receiver) = watch::channel(());
196 tokio::spawn(self.clone().token_regenerator(sender));
197 receiver
198 }
199
200 async fn token_regenerator(self, sender: watch::Sender<()>) {
201 match self {
202 Self::Credentials(inner) => {
203 let mut expires_in = inner.token.read().unwrap().expires_in() as u64;
204 loop {
205 let deadline = Duration::from_secs(
206 expires_in
207 .saturating_sub(METADATA_TOKEN_EXPIRY_MARGIN_SECS)
208 .max(METADATA_TOKEN_ERROR_RETRY_SECS),
209 );
210 debug!(
211 deadline = deadline.as_secs(),
212 "Sleeping before refreshing GCP authentication token.",
213 );
214 tokio::time::sleep(deadline).await;
215 match inner.regenerate_token().await {
216 Ok(()) => {
217 sender.send_replace(());
218 debug!("GCP authentication token renewed.");
219 expires_in = inner.token.read().unwrap().expires_in() as u64;
224 }
225 Err(error) => {
226 error!(
227 message = "Failed to update GCP authentication token.",
228 %error
229 );
230 expires_in = METADATA_TOKEN_EXPIRY_MARGIN_SECS;
231 }
232 }
233 }
234 }
235 Self::ApiKey(_) | Self::None => {
236 sender.closed().await
240 }
241 }
242 }
243}
244
245impl InnerCreds {
246 async fn regenerate_token(&self) -> crate::Result<()> {
247 let token = match &self.creds {
248 Some((creds, scope)) => fetch_token(creds, scope).await?,
249 None => get_token_implicit().await?,
250 };
251 *self.token.write().unwrap() = token;
252 Ok(())
253 }
254
255 fn make_token(&self) -> String {
256 let token = self.token.read().unwrap();
257 format!("{} {}", token.token_type(), token.access_token())
258 }
259}
260
261async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result<Token> {
262 let claims = JwtClaims::new(
263 creds.iss(),
264 std::slice::from_ref(scope),
265 creds.token_uri(),
266 None,
267 None,
268 );
269 let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?;
270 let jwt = Jwt::new(claims, rsa_key, None);
271
272 debug!(
273 message = "Fetching GCP authentication token.",
274 project = ?creds.project(),
275 iss = ?creds.iss(),
276 token_uri = ?creds.token_uri(),
277 );
278 goauth::get_token(&jwt, creds)
279 .await
280 .context(GetTokenSnafu)
281 .map_err(Into::into)
282}
283
284async fn get_token_implicit() -> Result<Token, GcpError> {
285 debug!("Fetching implicit GCP authentication token.");
286 let req = http::Request::get(SERVICE_ACCOUNT_TOKEN_URL)
287 .header("Metadata-Flavor", "Google")
288 .body(hyper::Body::empty())
289 .unwrap();
290
291 let proxy = ProxyConfig::from_env();
292 let res = HttpClient::new(None, &proxy)
293 .context(BuildHttpClientSnafu)?
294 .send(req)
295 .await
296 .context(GetImplicitTokenSnafu)?;
297
298 let body = res.into_body();
299 let bytes = hyper::body::to_bytes(body)
300 .await
301 .context(GetTokenBytesSnafu)?;
302
303 match serde_json::from_slice::<Token>(&bytes) {
305 Ok(token) => Ok(token),
306 Err(error) => Err(match serde_json::from_slice::<TokenErr>(&bytes) {
307 Ok(error) => GcpError::TokenFromJson { source: error },
308 Err(_) => GcpError::TokenJsonFromStr { source: error },
309 }),
310 }
311}
312
313#[cfg(test)]
314mod tests {
315 use super::*;
316 use crate::assert_downcast_matches;
317
318 #[tokio::test]
319 async fn fails_missing_creds() {
320 let error = build_auth("").await.expect_err("build failed to error");
321 assert_downcast_matches!(error, GcpError, GcpError::GetImplicitToken { .. });
322 }
324
325 #[tokio::test]
326 async fn skip_authentication() {
327 let auth = build_auth(
328 r#"
329 skip_authentication = true
330 api_key = "testing"
331 "#,
332 )
333 .await
334 .expect("build_auth failed");
335 assert!(matches!(auth, GcpAuthenticator::None));
336 }
337
338 #[tokio::test]
339 async fn uses_api_key() {
340 let key = crate::test_util::random_string(16);
341
342 let auth = build_auth(&format!(r#"api_key = "{key}""#))
343 .await
344 .expect("build_auth failed");
345 assert!(matches!(auth, GcpAuthenticator::ApiKey(..)));
346
347 assert_eq!(
348 apply_uri(&auth, "http://example.com"),
349 format!("http://example.com/?key={key}")
350 );
351 assert_eq!(
352 apply_uri(&auth, "http://example.com/"),
353 format!("http://example.com/?key={key}")
354 );
355 assert_eq!(
356 apply_uri(&auth, "http://example.com/path"),
357 format!("http://example.com/path?key={key}")
358 );
359 assert_eq!(
360 apply_uri(&auth, "http://example.com/path1/"),
361 format!("http://example.com/path1/?key={key}")
362 );
363 }
364
365 #[tokio::test]
366 async fn fails_bad_api_key() {
367 let error = build_auth(r#"api_key = "abc%xyz""#)
368 .await
369 .expect_err("build failed to error");
370 assert_downcast_matches!(error, GcpError, GcpError::InvalidApiKey { .. });
371 }
372
373 fn apply_uri(auth: &GcpAuthenticator, uri: &str) -> String {
374 let mut uri: Uri = uri.parse().unwrap();
375 auth.apply_uri(&mut uri);
376 uri.to_string()
377 }
378
379 async fn build_auth(toml: &str) -> crate::Result<GcpAuthenticator> {
380 let config: GcpAuthConfig = toml::from_str(toml).expect("Invalid TOML");
381 config.build(Scope::Compute).await
382 }
383}