vector/
gcp.rs

1#![allow(missing_docs)]
2use std::{
3    sync::{Arc, LazyLock, RwLock},
4    time::Duration,
5};
6
7use base64::prelude::{Engine as _, BASE64_URL_SAFE};
8pub use goauth::scopes::Scope;
9use goauth::{
10    auth::{JwtClaims, Token, TokenErr},
11    credentials::Credentials,
12    GoErr,
13};
14use http::{uri::PathAndQuery, Uri};
15use hyper::header::AUTHORIZATION;
16use smpl_jwt::Jwt;
17use snafu::{ResultExt, Snafu};
18use tokio::sync::watch;
19use vector_lib::configurable::configurable_component;
20use vector_lib::sensitive_string::SensitiveString;
21
22use crate::{config::ProxyConfig, http::HttpClient, http::HttpError};
23
/// Metadata server endpoint that issues access tokens for the instance's default service
/// account. Requests to it carry the `Metadata-Flavor: Google` header.
const SERVICE_ACCOUNT_TOKEN_URL: &str =
    "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";

/// Seconds subtracted from a token's `expires_in` when scheduling its refresh.
///
/// See https://cloud.google.com/compute/docs/access/authenticate-workloads#applications
const METADATA_TOKEN_EXPIRY_MARGIN_SECS: u64 = 200;

/// Lower bound, in seconds, on the wait before the next refresh attempt. It also acts as the
/// effective retry delay after a failed refresh.
const METADATA_TOKEN_ERROR_RETRY_SECS: u64 = 2;

/// Public Google Cloud Pub/Sub API endpoint.
pub const PUBSUB_URL: &str = "https://pubsub.googleapis.com";

/// Pub/Sub emulator address, read once from `EMULATOR_ADDRESS`.
///
/// Falls back to `http://localhost:8681` when the variable is unset or not valid Unicode.
/// NOTE(review): presumably used only by integration tests; confirm with callers.
pub static PUBSUB_ADDRESS: LazyLock<String> =
    LazyLock::new(|| match std::env::var("EMULATOR_ADDRESS") {
        Ok(address) => address,
        Err(_) => String::from("http://localhost:8681"),
    });
37
/// Errors raised while configuring GCP authentication or obtaining access tokens.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcpError {
    /// Neither an API key nor a credentials path was provided.
    // NOTE(review): not constructed anywhere in this file; `GcpAuthConfig::build` falls back to
    // the metadata server instead. Confirm whether other modules still use it.
    #[snafu(display("This requires one of api_key or credentials_path to be defined"))]
    MissingAuth,
    /// The service account credentials file could not be loaded.
    #[snafu(display("Invalid GCP credentials: {}", source))]
    InvalidCredentials { source: GoErr },
    /// The configured API key failed URL-safe base64 validation.
    #[snafu(display("Invalid GCP API key: {}", source))]
    InvalidApiKey { source: base64::DecodeError },
    /// A healthcheck request was rejected as forbidden.
    // NOTE(review): not constructed in this file; presumably raised by sink healthchecks.
    #[snafu(display("Healthcheck endpoint forbidden"))]
    HealthcheckForbidden,
    /// The private key in the credentials file could not be loaded as an RSA key.
    #[snafu(display("Invalid RSA key in GCP credentials: {}", source))]
    InvalidRsaKey { source: GoErr },
    /// Exchanging the signed JWT for an OAuth token failed.
    #[snafu(display("Failed to get OAuth token: {}", source))]
    GetToken { source: GoErr },
    /// Reading the metadata server's response body failed.
    #[snafu(display("Failed to get OAuth token text: {}", source))]
    GetTokenBytes { source: hyper::Error },
    /// The request to the metadata server token endpoint failed.
    #[snafu(display("Failed to get implicit GCP token: {}", source))]
    GetImplicitToken { source: HttpError },
    /// The metadata server answered with an OAuth error payload instead of a token.
    #[snafu(display("Failed to parse OAuth token JSON: {}", source))]
    TokenFromJson { source: TokenErr },
    /// The metadata server's response was neither a token nor an OAuth error payload.
    #[snafu(display("Failed to parse OAuth token JSON text: {}", source))]
    TokenJsonFromStr { source: serde_json::Error },
    /// The HTTP client used to reach the metadata server could not be built.
    #[snafu(display("Failed to build HTTP client: {}", source))]
    BuildHttpClient { source: HttpError },
}
64
/// Configuration of the authentication strategy for interacting with GCP services.
// TODO: We're duplicating the "either this or that" verbiage for each field because this struct gets flattened into the
// component config types, which means all that's carried over are the fields, not the type itself.
//
// It seems like we really should have it as a nested field -- i.e. `auth.api_key` -- which is a closer fit to how we do
// similar things in configuration (TLS, framing, decoding, etc.). Doing so would let us embed the type itself, and
// hoist up the common documentation bits to the docs for the type rather than the fields.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct GcpAuthConfig {
    /// An [API key][gcp_api_key].
    ///
    /// Either an API key or a path to a service account credentials JSON file can be specified.
    /// If both are set, the credentials file takes precedence.
    ///
    /// If both are unset, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is checked for a filename. If that
    /// variable is not set, an attempt is made to fetch an instance service account for the compute instance the
    /// program is running on. If this is not on a GCE instance, then you must define it with an API key or service
    /// account credentials JSON file.
    ///
    /// [gcp_api_key]: https://cloud.google.com/docs/authentication/api-keys
    pub api_key: Option<SensitiveString>,

    /// Path to a [service account][gcp_service_account_credentials] credentials JSON file.
    ///
    /// Either an API key or a path to a service account credentials JSON file can be specified.
    /// If both are set, the credentials file takes precedence.
    ///
    /// If both are unset, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is checked for a filename. If that
    /// variable is not set, an attempt is made to fetch an instance service account for the compute instance the
    /// program is running on. If this is not on a GCE instance, then you must define it with an API key or service
    /// account credentials JSON file.
    ///
    /// [gcp_service_account_credentials]: https://cloud.google.com/docs/authentication/production#manually
    pub credentials_path: Option<String>,

    /// Skip all authentication handling. For use with integration tests only.
    // Never serialized and hidden from generated docs, so it is not part of the public surface.
    #[serde(default, skip_serializing)]
    #[configurable(metadata(docs::hidden))]
    pub skip_authentication: bool,
}
104
105impl GcpAuthConfig {
106    pub async fn build(&self, scope: Scope) -> crate::Result<GcpAuthenticator> {
107        Ok(if self.skip_authentication {
108            GcpAuthenticator::None
109        } else {
110            let gap = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok();
111            let creds_path = self.credentials_path.as_ref().or(gap.as_ref());
112            match (&creds_path, &self.api_key) {
113                (Some(path), _) => GcpAuthenticator::from_file(path, scope).await?,
114                (None, Some(api_key)) => GcpAuthenticator::from_api_key(api_key.inner())?,
115                (None, None) => GcpAuthenticator::new_implicit().await?,
116            }
117        })
118    }
119}
120
/// Authentication strategy applied to outgoing GCP requests.
#[derive(Clone, Debug)]
pub enum GcpAuthenticator {
    /// OAuth tokens minted from service account credentials or fetched from the metadata server.
    /// The token is shared behind an `Arc`, so every clone observes refreshes.
    Credentials(Arc<InnerCreds>),
    /// A validated API key, appended to request URIs as the `key` query parameter.
    ApiKey(Box<str>),
    /// No authentication; requests are passed through unchanged.
    None,
}
127
/// Shared state behind [`GcpAuthenticator::Credentials`].
#[derive(Debug)]
pub struct InnerCreds {
    // Service account credentials and scope used to mint new tokens; `None` means tokens come
    // from the metadata server instead.
    creds: Option<(Credentials, Scope)>,
    // Current token: replaced by `regenerate_token`, read by `make_token`.
    token: RwLock<Token>,
}
133
134impl GcpAuthenticator {
135    async fn from_file(path: &str, scope: Scope) -> crate::Result<Self> {
136        let creds = Credentials::from_file(path).context(InvalidCredentialsSnafu)?;
137        let token = RwLock::new(fetch_token(&creds, &scope).await?);
138        let creds = Some((creds, scope));
139        Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
140    }
141
142    async fn new_implicit() -> crate::Result<Self> {
143        let token = RwLock::new(get_token_implicit().await?);
144        let creds = None;
145        Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
146    }
147
148    fn from_api_key(api_key: &str) -> crate::Result<Self> {
149        BASE64_URL_SAFE
150            .decode(api_key)
151            .context(InvalidApiKeySnafu)?;
152        Ok(Self::ApiKey(api_key.into()))
153    }
154
155    pub fn make_token(&self) -> Option<String> {
156        match self {
157            Self::Credentials(inner) => Some(inner.make_token()),
158            Self::ApiKey(_) | Self::None => None,
159        }
160    }
161
162    pub fn apply<T>(&self, request: &mut http::Request<T>) {
163        if let Some(token) = self.make_token() {
164            request
165                .headers_mut()
166                .insert(AUTHORIZATION, token.parse().unwrap());
167        }
168        self.apply_uri(request.uri_mut());
169    }
170
171    pub fn apply_uri(&self, uri: &mut Uri) {
172        match self {
173            Self::Credentials(_) | Self::None => (),
174            Self::ApiKey(api_key) => {
175                let mut parts = uri.clone().into_parts();
176                let path = parts
177                    .path_and_query
178                    .as_ref()
179                    .map_or("/", PathAndQuery::path);
180                let paq = format!("{path}?key={api_key}");
181                // The API key is verified above to only contain
182                // URL-safe characters. That key is added to a path
183                // that came from a successfully parsed URI. As such,
184                // re-parsing the string cannot fail.
185                parts.path_and_query =
186                    Some(paq.parse().expect("Could not re-parse path and query"));
187                *uri = Uri::from_parts(parts).expect("Could not re-parse URL");
188            }
189        }
190    }
191
192    pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
193        let (sender, receiver) = watch::channel(());
194        tokio::spawn(self.clone().token_regenerator(sender));
195        receiver
196    }
197
198    async fn token_regenerator(self, sender: watch::Sender<()>) {
199        match self {
200            Self::Credentials(inner) => {
201                let mut expires_in = inner.token.read().unwrap().expires_in() as u64;
202                loop {
203                    let deadline = Duration::from_secs(
204                        expires_in
205                            .saturating_sub(METADATA_TOKEN_EXPIRY_MARGIN_SECS)
206                            .max(METADATA_TOKEN_ERROR_RETRY_SECS),
207                    );
208                    debug!(
209                        deadline = deadline.as_secs(),
210                        "Sleeping before refreshing GCP authentication token.",
211                    );
212                    tokio::time::sleep(deadline).await;
213                    match inner.regenerate_token().await {
214                        Ok(()) => {
215                            sender.send_replace(());
216                            debug!("GCP authentication token renewed.");
217                            // Rather than an expected fresh token, the Metadata Server may return
218                            // the same (cached) token during the last 300 seconds of its lifetime.
219                            // This scenario is handled by retrying the token refresh after the
220                            // METADATA_TOKEN_ERROR_RETRY_SECS period when a fresh token is expected
221                            expires_in = inner.token.read().unwrap().expires_in() as u64;
222                        }
223                        Err(error) => {
224                            error!(
225                                message = "Failed to update GCP authentication token.",
226                                %error
227                            );
228                            expires_in = METADATA_TOKEN_EXPIRY_MARGIN_SECS;
229                        }
230                    }
231                }
232            }
233            Self::ApiKey(_) | Self::None => {
234                // This keeps the sender end of the watch open without
235                // actually sending anything, effectively creating an
236                // empty watch stream.
237                sender.closed().await
238            }
239        }
240    }
241}
242
243impl InnerCreds {
244    async fn regenerate_token(&self) -> crate::Result<()> {
245        let token = match &self.creds {
246            Some((creds, scope)) => fetch_token(creds, scope).await?,
247            None => get_token_implicit().await?,
248        };
249        *self.token.write().unwrap() = token;
250        Ok(())
251    }
252
253    fn make_token(&self) -> String {
254        let token = self.token.read().unwrap();
255        format!("{} {}", token.token_type(), token.access_token())
256    }
257}
258
259async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result<Token> {
260    let claims = JwtClaims::new(creds.iss(), &[scope.clone()], creds.token_uri(), None, None);
261    let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?;
262    let jwt = Jwt::new(claims, rsa_key, None);
263
264    debug!(
265        message = "Fetching GCP authentication token.",
266        project = ?creds.project(),
267        iss = ?creds.iss(),
268        token_uri = ?creds.token_uri(),
269    );
270    goauth::get_token(&jwt, creds)
271        .await
272        .context(GetTokenSnafu)
273        .map_err(Into::into)
274}
275
276async fn get_token_implicit() -> Result<Token, GcpError> {
277    debug!("Fetching implicit GCP authentication token.");
278    let req = http::Request::get(SERVICE_ACCOUNT_TOKEN_URL)
279        .header("Metadata-Flavor", "Google")
280        .body(hyper::Body::empty())
281        .unwrap();
282
283    let proxy = ProxyConfig::from_env();
284    let res = HttpClient::new(None, &proxy)
285        .context(BuildHttpClientSnafu)?
286        .send(req)
287        .await
288        .context(GetImplicitTokenSnafu)?;
289
290    let body = res.into_body();
291    let bytes = hyper::body::to_bytes(body)
292        .await
293        .context(GetTokenBytesSnafu)?;
294
295    // Token::from_str is irresponsible and may panic!
296    match serde_json::from_slice::<Token>(&bytes) {
297        Ok(token) => Ok(token),
298        Err(error) => Err(match serde_json::from_slice::<TokenErr>(&bytes) {
299            Ok(error) => GcpError::TokenFromJson { source: error },
300            Err(_) => GcpError::TokenJsonFromStr { source: error },
301        }),
302    }
303}
304
#[cfg(test)]
mod tests {
    use super::*;
    use crate::assert_downcast_matches;

    /// Parses `toml` into a [`GcpAuthConfig`] and builds an authenticator for the Compute scope.
    async fn build_auth(toml: &str) -> crate::Result<GcpAuthenticator> {
        let config = toml::from_str::<GcpAuthConfig>(toml).expect("Invalid TOML");
        config.build(Scope::Compute).await
    }

    /// Runs `auth.apply_uri` on the parsed `uri` and renders the result.
    fn apply_uri(auth: &GcpAuthenticator, uri: &str) -> String {
        let mut parsed = uri.parse::<Uri>().unwrap();
        auth.apply_uri(&mut parsed);
        parsed.to_string()
    }

    #[tokio::test]
    async fn fails_missing_creds() {
        let error = build_auth("").await.expect_err("build failed to error");
        assert_downcast_matches!(error, GcpError, GcpError::GetImplicitToken { .. });
        // This should be a more relevant error
    }

    #[tokio::test]
    async fn skip_authentication() {
        let config = r#"
                skip_authentication = true
                api_key = "testing"
            "#;
        let auth = build_auth(config).await.expect("build_auth failed");
        assert!(matches!(auth, GcpAuthenticator::None));
    }

    #[tokio::test]
    async fn uses_api_key() {
        let key = crate::test_util::random_string(16);

        let auth = build_auth(&format!(r#"api_key = "{key}""#))
            .await
            .expect("build_auth failed");
        assert!(matches!(auth, GcpAuthenticator::ApiKey(..)));

        // (input URI, path expected in front of the `?key=` query)
        let cases = [
            ("http://example.com", "/"),
            ("http://example.com/", "/"),
            ("http://example.com/path", "/path"),
            ("http://example.com/path1/", "/path1/"),
        ];
        for (input, expected_path) in cases {
            assert_eq!(
                apply_uri(&auth, input),
                format!("http://example.com{expected_path}?key={key}")
            );
        }
    }

    #[tokio::test]
    async fn fails_bad_api_key() {
        let outcome = build_auth(r#"api_key = "abc%xyz""#).await;
        let error = outcome.expect_err("build failed to error");
        assert_downcast_matches!(error, GcpError, GcpError::InvalidApiKey { .. });
    }
}