vector/
gcp.rs

1#![allow(missing_docs)]
2use std::{
3    sync::{Arc, LazyLock, RwLock},
4    time::Duration,
5};
6
7use base64::prelude::{BASE64_URL_SAFE, Engine as _};
8pub use goauth::scopes::Scope;
9use goauth::{
10    GoErr,
11    auth::{JwtClaims, Token, TokenErr},
12    credentials::Credentials,
13};
14use http::{Uri, uri::PathAndQuery};
15use hyper::header::AUTHORIZATION;
16use smpl_jwt::Jwt;
17use snafu::{ResultExt, Snafu};
18use tokio::sync::watch;
19use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
20
21use crate::{
22    config::ProxyConfig,
23    http::{HttpClient, HttpError},
24};
25
/// URL requested (with the `Metadata-Flavor: Google` header) to obtain an implicit access token
/// for the instance's default service account.
const SERVICE_ACCOUNT_TOKEN_URL: &str =
    "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";

/// Number of seconds subtracted from a token's remaining lifetime when computing how long the
/// refresh task sleeps before requesting a new token.
///
/// See <https://cloud.google.com/compute/docs/access/authenticate-workloads#applications>
const METADATA_TOKEN_EXPIRY_MARGIN_SECS: u64 = 200;

/// Lower bound, in seconds, on the sleep between two token refresh attempts.
const METADATA_TOKEN_ERROR_RETRY_SECS: u64 = 2;

/// Base URL of the Google Cloud Pub/Sub API.
pub const PUBSUB_URL: &str = "https://pubsub.googleapis.com";

/// Pub/Sub address read from the `EMULATOR_ADDRESS` environment variable, falling back to
/// `http://localhost:8681` when the variable cannot be read.
///
/// NOTE(review): presumably the address of a local Pub/Sub emulator used by tests -- confirm
/// against the callers.
pub static PUBSUB_ADDRESS: LazyLock<String> = LazyLock::new(|| {
    std::env::var("EMULATOR_ADDRESS").unwrap_or_else(|_| "http://localhost:8681".into())
});
39
/// Errors produced while configuring GCP authentication or obtaining access tokens.
///
/// The `#[snafu(display(...))]` attributes define the user-facing messages; the doc comments
/// below only describe where each variant originates.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcpError {
    /// No authentication source was provided.
    ///
    /// NOTE(review): not constructed in this file; presumably raised by components that embed
    /// the auth configuration -- confirm against callers.
    #[snafu(display("This requires one of api_key or credentials_path to be defined"))]
    MissingAuth,
    /// The service account credentials file could not be loaded.
    #[snafu(display("Invalid GCP credentials: {}", source))]
    InvalidCredentials { source: GoErr },
    /// The configured API key failed URL-safe base64 decoding.
    #[snafu(display("Invalid GCP API key: {}", source))]
    InvalidApiKey { source: base64::DecodeError },
    /// A healthcheck request was rejected as forbidden.
    ///
    /// NOTE(review): not constructed in this file -- confirm usage in the components' healthchecks.
    #[snafu(display("Healthcheck endpoint forbidden"))]
    HealthcheckForbidden,
    /// The RSA key could not be extracted from the loaded credentials.
    #[snafu(display("Invalid RSA key in GCP credentials: {}", source))]
    InvalidRsaKey { source: GoErr },
    /// Requesting an OAuth token with the service account credentials failed.
    #[snafu(display("Failed to get OAuth token: {}", source))]
    GetToken { source: GoErr },
    /// Reading the body of the implicit token response failed.
    #[snafu(display("Failed to get OAuth token text: {}", source))]
    GetTokenBytes { source: hyper::Error },
    /// Sending the implicit token request failed.
    #[snafu(display("Failed to get implicit GCP token: {}", source))]
    GetImplicitToken { source: HttpError },
    /// The implicit token response was not a token but did parse as a token error document.
    #[snafu(display("Failed to parse OAuth token JSON: {}", source))]
    TokenFromJson { source: TokenErr },
    /// The implicit token response parsed as neither a token nor a token error document.
    #[snafu(display("Failed to parse OAuth token JSON text: {}", source))]
    TokenJsonFromStr { source: serde_json::Error },
    /// The HTTP client used for the implicit token request could not be constructed.
    #[snafu(display("Failed to build HTTP client: {}", source))]
    BuildHttpClient { source: HttpError },
}
66
/// Configuration of the authentication strategy for interacting with GCP services.
// TODO: We're duplicating the "either this or that" verbiage for each field because this struct gets flattened into the
// component config types, which means all that's carried over are the fields, not the type itself.
//
// It seems like we really should have it as a nested field -- i.e. `auth.api_key` -- which is a closer fit to how we
// do similar things in configuration (TLS, framing, decoding, etc.). Doing so would let us embed the type itself, and
// hoist up the common documentation bits to the docs for the type rather than the fields.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct GcpAuthConfig {
    /// An [API key][gcp_api_key].
    ///
    /// Either an API key or a path to a service account credentials JSON file can be specified.
    ///
    /// If both are unset, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is checked for a filename. If no
    /// filename is named, an attempt is made to fetch an instance service account for the compute instance the program is
    /// running on. If this is not on a GCE instance, then you must define it with an API key or service account
    /// credentials JSON file.
    ///
    /// [gcp_api_key]: https://cloud.google.com/docs/authentication/api-keys
    pub api_key: Option<SensitiveString>,

    /// Path to a [service account][gcp_service_account_credentials] credentials JSON file.
    ///
    /// Either an API key or a path to a service account credentials JSON file can be specified.
    ///
    /// If both are unset, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is checked for a filename. If no
    /// filename is named, an attempt is made to fetch an instance service account for the compute instance the program is
    /// running on. If this is not on a GCE instance, then you must define it with an API key or service account
    /// credentials JSON file.
    ///
    /// [gcp_service_account_credentials]: https://cloud.google.com/docs/authentication/production#manually
    pub credentials_path: Option<String>,

    /// Skip all authentication handling. For use with integration tests only.
    // Never serialized, and hidden from the generated documentation.
    #[serde(default, skip_serializing)]
    #[configurable(metadata(docs::hidden))]
    pub skip_authentication: bool,
}
106
107impl GcpAuthConfig {
108    pub async fn build(&self, scope: Scope) -> crate::Result<GcpAuthenticator> {
109        Ok(if self.skip_authentication {
110            GcpAuthenticator::None
111        } else {
112            let gap = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok();
113            let creds_path = self.credentials_path.as_ref().or(gap.as_ref());
114            match (&creds_path, &self.api_key) {
115                (Some(path), _) => GcpAuthenticator::from_file(path, scope).await?,
116                (None, Some(api_key)) => GcpAuthenticator::from_api_key(api_key.inner())?,
117                (None, None) => GcpAuthenticator::new_implicit().await?,
118            }
119        })
120    }
121}
122
/// Authentication strategy applied to outgoing GCP requests.
#[derive(Clone, Debug)]
pub enum GcpAuthenticator {
    /// Token-based authentication; the shared state holds the current token, which is sent in
    /// the `Authorization` header.
    Credentials(Arc<InnerCreds>),
    /// API key authentication; the key is added to request URIs as the `key` query parameter.
    ApiKey(Box<str>),
    /// No authentication is applied to requests.
    None,
}
129
/// Shared state behind [`GcpAuthenticator::Credentials`].
#[derive(Debug)]
pub struct InnerCreds {
    // `Some` when the token is fetched with service account credentials loaded from a file,
    // `None` when it is fetched implicitly from the metadata server.
    creds: Option<(Credentials, Scope)>,
    // The current token; replaced in place whenever it is regenerated.
    token: RwLock<Token>,
}
135
136impl GcpAuthenticator {
137    async fn from_file(path: &str, scope: Scope) -> crate::Result<Self> {
138        let creds = Credentials::from_file(path).context(InvalidCredentialsSnafu)?;
139        let token = RwLock::new(fetch_token(&creds, &scope).await?);
140        let creds = Some((creds, scope));
141        Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
142    }
143
144    async fn new_implicit() -> crate::Result<Self> {
145        let token = RwLock::new(get_token_implicit().await?);
146        let creds = None;
147        Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
148    }
149
150    fn from_api_key(api_key: &str) -> crate::Result<Self> {
151        BASE64_URL_SAFE
152            .decode(api_key)
153            .context(InvalidApiKeySnafu)?;
154        Ok(Self::ApiKey(api_key.into()))
155    }
156
157    pub fn make_token(&self) -> Option<String> {
158        match self {
159            Self::Credentials(inner) => Some(inner.make_token()),
160            Self::ApiKey(_) | Self::None => None,
161        }
162    }
163
164    pub fn apply<T>(&self, request: &mut http::Request<T>) {
165        if let Some(token) = self.make_token() {
166            request
167                .headers_mut()
168                .insert(AUTHORIZATION, token.parse().unwrap());
169        }
170        self.apply_uri(request.uri_mut());
171    }
172
173    pub fn apply_uri(&self, uri: &mut Uri) {
174        match self {
175            Self::Credentials(_) | Self::None => (),
176            Self::ApiKey(api_key) => {
177                let mut parts = uri.clone().into_parts();
178                let path = parts
179                    .path_and_query
180                    .as_ref()
181                    .map_or("/", PathAndQuery::path);
182                let paq = format!("{path}?key={api_key}");
183                // The API key is verified above to only contain
184                // URL-safe characters. That key is added to a path
185                // that came from a successfully parsed URI. As such,
186                // re-parsing the string cannot fail.
187                parts.path_and_query =
188                    Some(paq.parse().expect("Could not re-parse path and query"));
189                *uri = Uri::from_parts(parts).expect("Could not re-parse URL");
190            }
191        }
192    }
193
194    pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
195        let (sender, receiver) = watch::channel(());
196        tokio::spawn(self.clone().token_regenerator(sender));
197        receiver
198    }
199
200    async fn token_regenerator(self, sender: watch::Sender<()>) {
201        match self {
202            Self::Credentials(inner) => {
203                let mut expires_in = inner.token.read().unwrap().expires_in() as u64;
204                loop {
205                    let deadline = Duration::from_secs(
206                        expires_in
207                            .saturating_sub(METADATA_TOKEN_EXPIRY_MARGIN_SECS)
208                            .max(METADATA_TOKEN_ERROR_RETRY_SECS),
209                    );
210                    debug!(
211                        deadline = deadline.as_secs(),
212                        "Sleeping before refreshing GCP authentication token.",
213                    );
214                    tokio::time::sleep(deadline).await;
215                    match inner.regenerate_token().await {
216                        Ok(()) => {
217                            sender.send_replace(());
218                            debug!("GCP authentication token renewed.");
219                            // Rather than an expected fresh token, the Metadata Server may return
220                            // the same (cached) token during the last 300 seconds of its lifetime.
221                            // This scenario is handled by retrying the token refresh after the
222                            // METADATA_TOKEN_ERROR_RETRY_SECS period when a fresh token is expected
223                            expires_in = inner.token.read().unwrap().expires_in() as u64;
224                        }
225                        Err(error) => {
226                            error!(
227                                message = "Failed to update GCP authentication token.",
228                                %error
229                            );
230                            expires_in = METADATA_TOKEN_EXPIRY_MARGIN_SECS;
231                        }
232                    }
233                }
234            }
235            Self::ApiKey(_) | Self::None => {
236                // This keeps the sender end of the watch open without
237                // actually sending anything, effectively creating an
238                // empty watch stream.
239                sender.closed().await
240            }
241        }
242    }
243}
244
245impl InnerCreds {
246    async fn regenerate_token(&self) -> crate::Result<()> {
247        let token = match &self.creds {
248            Some((creds, scope)) => fetch_token(creds, scope).await?,
249            None => get_token_implicit().await?,
250        };
251        *self.token.write().unwrap() = token;
252        Ok(())
253    }
254
255    fn make_token(&self) -> String {
256        let token = self.token.read().unwrap();
257        format!("{} {}", token.token_type(), token.access_token())
258    }
259}
260
261async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result<Token> {
262    let claims = JwtClaims::new(
263        creds.iss(),
264        std::slice::from_ref(scope),
265        creds.token_uri(),
266        None,
267        None,
268    );
269    let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?;
270    let jwt = Jwt::new(claims, rsa_key, None);
271
272    debug!(
273        message = "Fetching GCP authentication token.",
274        project = ?creds.project(),
275        iss = ?creds.iss(),
276        token_uri = ?creds.token_uri(),
277    );
278    goauth::get_token(&jwt, creds)
279        .await
280        .context(GetTokenSnafu)
281        .map_err(Into::into)
282}
283
284async fn get_token_implicit() -> Result<Token, GcpError> {
285    debug!("Fetching implicit GCP authentication token.");
286    let req = http::Request::get(SERVICE_ACCOUNT_TOKEN_URL)
287        .header("Metadata-Flavor", "Google")
288        .body(hyper::Body::empty())
289        .unwrap();
290
291    let proxy = ProxyConfig::from_env();
292    let res = HttpClient::new(None, &proxy)
293        .context(BuildHttpClientSnafu)?
294        .send(req)
295        .await
296        .context(GetImplicitTokenSnafu)?;
297
298    let body = res.into_body();
299    let bytes = hyper::body::to_bytes(body)
300        .await
301        .context(GetTokenBytesSnafu)?;
302
303    // Token::from_str is irresponsible and may panic!
304    match serde_json::from_slice::<Token>(&bytes) {
305        Ok(token) => Ok(token),
306        Err(error) => Err(match serde_json::from_slice::<TokenErr>(&bytes) {
307            Ok(error) => GcpError::TokenFromJson { source: error },
308            Err(_) => GcpError::TokenJsonFromStr { source: error },
309        }),
310    }
311}
312
#[cfg(test)]
mod tests {
    use super::*;
    use crate::assert_downcast_matches;

    /// Parses `toml` as a [`GcpAuthConfig`] and builds an authenticator for the compute scope.
    async fn build_auth(toml: &str) -> crate::Result<GcpAuthenticator> {
        toml::from_str::<GcpAuthConfig>(toml)
            .expect("Invalid TOML")
            .build(Scope::Compute)
            .await
    }

    /// Runs `auth.apply_uri` on the parsed `uri` and returns the result as a string.
    fn apply_uri(auth: &GcpAuthenticator, uri: &str) -> String {
        let mut parsed = uri.parse::<Uri>().unwrap();
        auth.apply_uri(&mut parsed);
        parsed.to_string()
    }

    #[tokio::test]
    async fn fails_missing_creds() {
        let error = build_auth("").await.expect_err("build failed to error");
        // This should be a more relevant error
        assert_downcast_matches!(error, GcpError, GcpError::GetImplicitToken { .. });
    }

    #[tokio::test]
    async fn skip_authentication() {
        let config = r#"
                skip_authentication = true
                api_key = "testing"
            "#;
        let auth = build_auth(config).await.expect("build_auth failed");
        assert!(matches!(auth, GcpAuthenticator::None));
    }

    #[tokio::test]
    async fn uses_api_key() {
        let key = crate::test_util::random_string(16);
        let config = format!(r#"api_key = "{key}""#);

        let auth = build_auth(&config).await.expect("build_auth failed");
        assert!(matches!(auth, GcpAuthenticator::ApiKey(..)));

        // (input URI, expected URI before the `?key=` suffix)
        let cases = [
            ("http://example.com", "http://example.com/"),
            ("http://example.com/", "http://example.com/"),
            ("http://example.com/path", "http://example.com/path"),
            ("http://example.com/path1/", "http://example.com/path1/"),
        ];
        for (input, expected_base) in cases {
            assert_eq!(
                apply_uri(&auth, input),
                format!("{expected_base}?key={key}")
            );
        }
    }

    #[tokio::test]
    async fn fails_bad_api_key() {
        let outcome = build_auth(r#"api_key = "abc%xyz""#).await;
        let error = outcome.expect_err("build failed to error");
        assert_downcast_matches!(error, GcpError, GcpError::InvalidApiKey { .. });
    }
}
383}