1#![allow(missing_docs)]
2use std::{
3 sync::{Arc, LazyLock, RwLock},
4 time::Duration,
5};
6
7use base64::prelude::{BASE64_URL_SAFE, Engine as _};
8pub use goauth::scopes::Scope;
9use goauth::{
10 GoErr,
11 auth::{JwtClaims, Token, TokenErr},
12 credentials::Credentials,
13};
14use http::{Uri, uri::PathAndQuery};
15use http_body::{Body as _, Collected};
16use hyper::header::AUTHORIZATION;
17use smpl_jwt::Jwt;
18use snafu::{ResultExt, Snafu};
19use tokio::sync::watch;
20use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
21
22use crate::{
23 config::ProxyConfig,
24 http::{HttpClient, HttpError},
25};
26
/// Metadata endpoint queried for the default service account's token when a token has to be
/// obtained without explicit credentials.
const SERVICE_ACCOUNT_TOKEN_URL: &str =
    "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";

/// Number of seconds subtracted from a token's reported lifetime when scheduling its refresh.
const METADATA_TOKEN_EXPIRY_MARGIN_SECS: u64 = 200;

/// Lower bound, in seconds, on the delay before a token refresh attempt. Because a failed
/// refresh resets the remaining lifetime to the expiry margin, this is also the retry delay
/// after an error.
const METADATA_TOKEN_ERROR_RETRY_SECS: u64 = 2;

/// Base URL of the Pub/Sub API.
pub const PUBSUB_URL: &str = "https://pubsub.googleapis.com";

/// Address read from the `EMULATOR_ADDRESS` environment variable on first access, falling back
/// to `http://localhost:8681` when the variable is unset or unreadable.
// NOTE(review): presumably the address of a local Pub/Sub emulator used by tests -- confirm.
pub static PUBSUB_ADDRESS: LazyLock<String> =
    LazyLock::new(|| match std::env::var("EMULATOR_ADDRESS") {
        Ok(address) => address,
        Err(_) => String::from("http://localhost:8681"),
    });
40
41#[derive(Debug, Snafu)]
42#[snafu(visibility(pub))]
43pub enum GcpError {
44 #[snafu(display("This requires one of api_key or credentials_path to be defined"))]
45 MissingAuth,
46 #[snafu(display("Invalid GCP credentials: {}", source))]
47 InvalidCredentials { source: GoErr },
48 #[snafu(display("Invalid GCP API key: {}", source))]
49 InvalidApiKey { source: base64::DecodeError },
50 #[snafu(display("Healthcheck endpoint forbidden"))]
51 HealthcheckForbidden,
52 #[snafu(display("Invalid RSA key in GCP credentials: {}", source))]
53 InvalidRsaKey { source: GoErr },
54 #[snafu(display("Failed to get OAuth token: {}", source))]
55 GetToken { source: GoErr },
56 #[snafu(display("Failed to get OAuth token text: {}", source))]
57 GetTokenBytes { source: hyper::Error },
58 #[snafu(display("Failed to get implicit GCP token: {}", source))]
59 GetImplicitToken { source: HttpError },
60 #[snafu(display("Failed to parse OAuth token JSON: {}", source))]
61 TokenFromJson { source: TokenErr },
62 #[snafu(display("Failed to parse OAuth token JSON text: {}", source))]
63 TokenJsonFromStr { source: serde_json::Error },
64 #[snafu(display("Failed to build HTTP client: {}", source))]
65 BuildHttpClient { source: HttpError },
66}
67
68#[configurable_component]
76#[derive(Clone, Debug, Default)]
77pub struct GcpAuthConfig {
78 pub api_key: Option<SensitiveString>,
89
90 pub credentials_path: Option<String>,
101
102 #[serde(default, skip_serializing)]
104 #[configurable(metadata(docs::hidden))]
105 pub skip_authentication: bool,
106}
107
108impl GcpAuthConfig {
109 pub async fn build(&self, scope: Scope) -> crate::Result<GcpAuthenticator> {
110 Ok(if self.skip_authentication {
111 GcpAuthenticator::None
112 } else {
113 let gap = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok();
114 let creds_path = self.credentials_path.as_ref().or(gap.as_ref());
115 match (&creds_path, &self.api_key) {
116 (Some(path), _) => GcpAuthenticator::from_file(path, scope).await?,
117 (None, Some(api_key)) => GcpAuthenticator::from_api_key(api_key.inner())?,
118 (None, None) => GcpAuthenticator::new_implicit().await?,
119 }
120 })
121 }
122}
123
124#[derive(Clone, Debug)]
125pub enum GcpAuthenticator {
126 Credentials(Arc<InnerCreds>),
127 ApiKey(Box<str>),
128 None,
129}
130
131#[derive(Debug)]
132pub struct InnerCreds {
133 creds: Option<(Credentials, Scope)>,
134 token: RwLock<Token>,
135}
136
137impl GcpAuthenticator {
138 async fn from_file(path: &str, scope: Scope) -> crate::Result<Self> {
139 let creds = Credentials::from_file(path).context(InvalidCredentialsSnafu)?;
140 let token = RwLock::new(fetch_token(&creds, &scope).await?);
141 let creds = Some((creds, scope));
142 Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
143 }
144
145 async fn new_implicit() -> crate::Result<Self> {
146 let token = RwLock::new(get_token_implicit().await?);
147 let creds = None;
148 Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
149 }
150
151 fn from_api_key(api_key: &str) -> crate::Result<Self> {
152 BASE64_URL_SAFE
153 .decode(api_key)
154 .context(InvalidApiKeySnafu)?;
155 Ok(Self::ApiKey(api_key.into()))
156 }
157
158 pub fn make_token(&self) -> Option<String> {
159 match self {
160 Self::Credentials(inner) => Some(inner.make_token()),
161 Self::ApiKey(_) | Self::None => None,
162 }
163 }
164
165 pub fn apply<T>(&self, request: &mut http::Request<T>) {
166 if let Some(token) = self.make_token() {
167 request
168 .headers_mut()
169 .insert(AUTHORIZATION, token.parse().unwrap());
170 }
171 self.apply_uri(request.uri_mut());
172 }
173
174 pub fn apply_uri(&self, uri: &mut Uri) {
175 match self {
176 Self::Credentials(_) | Self::None => (),
177 Self::ApiKey(api_key) => {
178 let mut parts = uri.clone().into_parts();
179 let path = parts
180 .path_and_query
181 .as_ref()
182 .map_or("/", PathAndQuery::path);
183 let paq = format!("{path}?key={api_key}");
184 parts.path_and_query =
189 Some(paq.parse().expect("Could not re-parse path and query"));
190 *uri = Uri::from_parts(parts).expect("Could not re-parse URL");
191 }
192 }
193 }
194
195 pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
196 let (sender, receiver) = watch::channel(());
197 tokio::spawn(self.clone().token_regenerator(sender));
198 receiver
199 }
200
201 async fn token_regenerator(self, sender: watch::Sender<()>) {
202 match self {
203 Self::Credentials(inner) => {
204 let mut expires_in = inner.token.read().unwrap().expires_in() as u64;
205 loop {
206 let deadline = Duration::from_secs(
207 expires_in
208 .saturating_sub(METADATA_TOKEN_EXPIRY_MARGIN_SECS)
209 .max(METADATA_TOKEN_ERROR_RETRY_SECS),
210 );
211 debug!(
212 deadline = deadline.as_secs(),
213 "Sleeping before refreshing GCP authentication token.",
214 );
215 tokio::time::sleep(deadline).await;
216 match inner.regenerate_token().await {
217 Ok(()) => {
218 sender.send_replace(());
219 debug!("GCP authentication token renewed.");
220 expires_in = inner.token.read().unwrap().expires_in() as u64;
225 }
226 Err(error) => {
227 error!(
228 message = "Failed to update GCP authentication token.",
229 %error
230 );
231 expires_in = METADATA_TOKEN_EXPIRY_MARGIN_SECS;
232 }
233 }
234 }
235 }
236 Self::ApiKey(_) | Self::None => {
237 sender.closed().await
241 }
242 }
243 }
244}
245
246impl InnerCreds {
247 async fn regenerate_token(&self) -> crate::Result<()> {
248 let token = match &self.creds {
249 Some((creds, scope)) => fetch_token(creds, scope).await?,
250 None => get_token_implicit().await?,
251 };
252 *self.token.write().unwrap() = token;
253 Ok(())
254 }
255
256 fn make_token(&self) -> String {
257 let token = self.token.read().unwrap();
258 format!("{} {}", token.token_type(), token.access_token())
259 }
260}
261
262async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result<Token> {
263 let claims = JwtClaims::new(
264 creds.iss(),
265 std::slice::from_ref(scope),
266 creds.token_uri(),
267 None,
268 None,
269 );
270 let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?;
271 let jwt = Jwt::new(claims, rsa_key, None);
272
273 debug!(
274 message = "Fetching GCP authentication token.",
275 project = ?creds.project(),
276 iss = ?creds.iss(),
277 token_uri = ?creds.token_uri(),
278 );
279 goauth::get_token(&jwt, creds)
280 .await
281 .context(GetTokenSnafu)
282 .map_err(Into::into)
283}
284
285async fn get_token_implicit() -> Result<Token, GcpError> {
286 debug!("Fetching implicit GCP authentication token.");
287 let req = http::Request::get(SERVICE_ACCOUNT_TOKEN_URL)
288 .header("Metadata-Flavor", "Google")
289 .body(hyper::Body::empty())
290 .unwrap();
291
292 let proxy = ProxyConfig::from_env();
293 let res = HttpClient::new(None, &proxy)
294 .context(BuildHttpClientSnafu)?
295 .send(req)
296 .await
297 .context(GetImplicitTokenSnafu)?;
298
299 let body = res.into_body();
300 let bytes = body
301 .collect()
302 .await
303 .map(Collected::to_bytes)
304 .context(GetTokenBytesSnafu)?;
305
306 match serde_json::from_slice::<Token>(&bytes) {
308 Ok(token) => Ok(token),
309 Err(error) => Err(match serde_json::from_slice::<TokenErr>(&bytes) {
310 Ok(error) => GcpError::TokenFromJson { source: error },
311 Err(_) => GcpError::TokenJsonFromStr { source: error },
312 }),
313 }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::assert_downcast_matches;

    /// Parses `source` as a TOML [`GcpAuthConfig`] and builds it with the compute scope.
    async fn build_auth(source: &str) -> crate::Result<GcpAuthenticator> {
        let config = toml::from_str::<GcpAuthConfig>(source).expect("Invalid TOML");
        config.build(Scope::Compute).await
    }

    /// Runs `auth.apply_uri` on the parsed `uri` and returns the result as text.
    fn apply_uri(auth: &GcpAuthenticator, uri: &str) -> String {
        let mut parsed = uri.parse::<Uri>().unwrap();
        auth.apply_uri(&mut parsed);
        parsed.to_string()
    }

    // An empty configuration falls through to the implicit token request, which is expected
    // to fail in the environment where this test runs.
    #[tokio::test]
    async fn fails_missing_creds() {
        let outcome = build_auth("").await;
        let error = outcome.expect_err("build failed to error");
        assert_downcast_matches!(error, GcpError, GcpError::GetImplicitToken { .. });
    }

    // `skip_authentication` takes priority even when an API key is also configured.
    #[tokio::test]
    async fn skip_authentication() {
        let config = r#"
 skip_authentication = true
 api_key = "testing"
 "#;
        let auth = build_auth(config).await.expect("build_auth failed");
        assert!(matches!(auth, GcpAuthenticator::None));
    }

    // The API key is added as the `key` query parameter regardless of the path's shape.
    #[tokio::test]
    async fn uses_api_key() {
        let key = crate::test_util::random_string(16);

        let config = format!(r#"api_key = "{key}""#);
        let auth = build_auth(&config).await.expect("build_auth failed");
        assert!(matches!(auth, GcpAuthenticator::ApiKey(..)));

        let cases = [
            (
                "http://example.com",
                format!("http://example.com/?key={key}"),
            ),
            (
                "http://example.com/",
                format!("http://example.com/?key={key}"),
            ),
            (
                "http://example.com/path",
                format!("http://example.com/path?key={key}"),
            ),
            (
                "http://example.com/path1/",
                format!("http://example.com/path1/?key={key}"),
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(apply_uri(&auth, input), expected);
        }
    }

    // A key containing characters outside the URL-safe base64 alphabet is rejected.
    #[tokio::test]
    async fn fails_bad_api_key() {
        let outcome = build_auth(r#"api_key = "abc%xyz""#).await;
        let error = outcome.expect_err("build failed to error");
        assert_downcast_matches!(error, GcpError, GcpError::InvalidApiKey { .. });
    }
}