vector/
gcp.rs

1#![allow(missing_docs)]
2use std::{
3    sync::{Arc, LazyLock, RwLock},
4    time::Duration,
5};
6
7use base64::prelude::{BASE64_URL_SAFE, Engine as _};
8pub use goauth::scopes::Scope;
9use goauth::{
10    GoErr,
11    auth::{JwtClaims, Token, TokenErr},
12    credentials::Credentials,
13};
14use http::{Uri, uri::PathAndQuery};
15use http_body::{Body as _, Collected};
16use hyper::header::AUTHORIZATION;
17use smpl_jwt::Jwt;
18use snafu::{ResultExt, Snafu};
19use tokio::sync::watch;
20use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
21
22use crate::{
23    config::ProxyConfig,
24    http::{HttpClient, HttpError},
25};
26
// URL requested by `get_token_implicit` (with a `Metadata-Flavor: Google` header) to obtain a
// token when no credentials file or API key is used.
const SERVICE_ACCOUNT_TOKEN_URL: &str =
    "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token";

// Number of seconds subtracted from a token's `expires_in` when `token_regenerator` computes how
// long to sleep before refreshing it. Also used as the `expires_in` stand-in after a failed
// refresh, which makes the next sleep collapse to `METADATA_TOKEN_ERROR_RETRY_SECS`.
// See https://cloud.google.com/compute/docs/access/authenticate-workloads#applications
const METADATA_TOKEN_EXPIRY_MARGIN_SECS: u64 = 200;

// Lower bound, in seconds, on the sleep between two token refresh attempts.
const METADATA_TOKEN_ERROR_RETRY_SECS: u64 = 2;

/// Base URL of the Pub/Sub service. Not referenced elsewhere in this part of the file.
pub const PUBSUB_URL: &str = "https://pubsub.googleapis.com";

/// Address taken from the `EMULATOR_ADDRESS` environment variable, falling back to
/// `http://localhost:8681` when the variable cannot be read. Evaluated once, on first access.
// NOTE(review): presumably the address of a local Pub/Sub emulator used by tests -- confirm
// against the callers.
pub static PUBSUB_ADDRESS: LazyLock<String> = LazyLock::new(|| {
    std::env::var("EMULATOR_ADDRESS").unwrap_or_else(|_| "http://localhost:8681".into())
});
40
/// Errors raised while configuring GCP authentication or obtaining tokens.
///
/// The `Snafu` derive generates a `...Snafu` context selector per variant; those selectors are
/// what the `.context(...)` calls in this file refer to.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcpError {
    /// No authentication method was configured. Not constructed in this part of the file.
    #[snafu(display("This requires one of api_key or credentials_path to be defined"))]
    MissingAuth,
    /// `Credentials::from_file` failed in `GcpAuthenticator::from_file`.
    #[snafu(display("Invalid GCP credentials: {}", source))]
    InvalidCredentials { source: GoErr },
    /// The configured API key could not be decoded as URL-safe base64.
    #[snafu(display("Invalid GCP API key: {}", source))]
    InvalidApiKey { source: base64::DecodeError },
    /// Not constructed in this part of the file.
    #[snafu(display("Healthcheck endpoint forbidden"))]
    HealthcheckForbidden,
    /// `Credentials::rsa_key` failed while preparing the token request in `fetch_token`.
    #[snafu(display("Invalid RSA key in GCP credentials: {}", source))]
    InvalidRsaKey { source: GoErr },
    /// `goauth::get_token` failed in `fetch_token`.
    #[snafu(display("Failed to get OAuth token: {}", source))]
    GetToken { source: GoErr },
    /// Collecting the body of the implicit token response failed.
    #[snafu(display("Failed to get OAuth token text: {}", source))]
    GetTokenBytes { source: hyper::Error },
    /// Sending the implicit token request failed.
    #[snafu(display("Failed to get implicit GCP token: {}", source))]
    GetImplicitToken { source: HttpError },
    /// The implicit token response did not parse as a `Token` but did parse as a `TokenErr`.
    #[snafu(display("Failed to parse OAuth token JSON: {}", source))]
    TokenFromJson { source: TokenErr },
    /// The implicit token response parsed as neither a `Token` nor a `TokenErr`; `source` is the
    /// error from the `Token` parse attempt.
    #[snafu(display("Failed to parse OAuth token JSON text: {}", source))]
    TokenJsonFromStr { source: serde_json::Error },
    /// `HttpClient::new` failed while preparing the implicit token request.
    #[snafu(display("Failed to build HTTP client: {}", source))]
    BuildHttpClient { source: HttpError },
}
67
/// Configuration of the authentication strategy for interacting with GCP services.
// TODO: We're duplicating the "either this or that" verbiage for each field because this struct gets flattened into the
// component config types, which means all that's carried over are the fields, not the type itself.
//
// It seems like we really should have it as a nested field -- i.e. `auth.api_key` -- which is a closer fit to how we do
// similar things in configuration (TLS, framing, decoding, etc.). Doing so would let us embed the type itself, and
// hoist up the common documentation bits to the docs for the type rather than the fields.
//
// NOTE(review): `build` reads `GOOGLE_APPLICATION_CREDENTIALS` whenever `credentials_path` is unset, and a credentials
// path from either source is matched before `api_key`. The field docs below say the environment variable is only
// checked when both fields are unset -- confirm which behavior is intended.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct GcpAuthConfig {
    /// An [API key][gcp_api_key].
    ///
    /// Either an API key or a path to a service account credentials JSON file can be specified.
    ///
    /// If both are unset, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is checked for a filename. If no
    /// filename is named, an attempt is made to fetch an instance service account for the compute instance the program is
    /// running on. If this is not on a GCE instance, then you must define it with an API key or service account
    /// credentials JSON file.
    ///
    /// [gcp_api_key]: https://cloud.google.com/docs/authentication/api-keys
    pub api_key: Option<SensitiveString>,

    /// Path to a [service account][gcp_service_account_credentials] credentials JSON file.
    ///
    /// Either an API key or a path to a service account credentials JSON file can be specified.
    ///
    /// If both are unset, the `GOOGLE_APPLICATION_CREDENTIALS` environment variable is checked for a filename. If no
    /// filename is named, an attempt is made to fetch an instance service account for the compute instance the program is
    /// running on. If this is not on a GCE instance, then you must define it with an API key or service account
    /// credentials JSON file.
    ///
    /// [gcp_service_account_credentials]: https://cloud.google.com/docs/authentication/production#manually
    pub credentials_path: Option<String>,

    /// Skip all authentication handling. For use with integration tests only.
    // When set, `build` returns `GcpAuthenticator::None` without looking at the other fields.
    #[serde(default, skip_serializing)]
    #[configurable(metadata(docs::hidden))]
    pub skip_authentication: bool,
}
107
108impl GcpAuthConfig {
109    pub async fn build(&self, scope: Scope) -> crate::Result<GcpAuthenticator> {
110        Ok(if self.skip_authentication {
111            GcpAuthenticator::None
112        } else {
113            let gap = std::env::var("GOOGLE_APPLICATION_CREDENTIALS").ok();
114            let creds_path = self.credentials_path.as_ref().or(gap.as_ref());
115            match (&creds_path, &self.api_key) {
116                (Some(path), _) => GcpAuthenticator::from_file(path, scope).await?,
117                (None, Some(api_key)) => GcpAuthenticator::from_api_key(api_key.inner())?,
118                (None, None) => GcpAuthenticator::new_implicit().await?,
119            }
120        })
121    }
122}
123
/// Authentication strategy applied to outgoing GCP requests.
///
/// Cloning is cheap for the `Credentials` variant: clones share the same `InnerCreds`, and
/// therefore the same token, through the `Arc`.
#[derive(Clone, Debug)]
pub enum GcpAuthenticator {
    /// Token-based authentication. `make_token` renders the stored token for use as the
    /// `Authorization` header value, and `token_regenerator` keeps it refreshed.
    Credentials(Arc<InnerCreds>),
    /// API key authentication. `apply_uri` adds the key to the request URI as the `key` query
    /// parameter; no `Authorization` header is produced.
    ApiKey(Box<str>),
    /// No authentication: requests and URIs are left untouched.
    None,
}
130
/// State shared by all clones of a `GcpAuthenticator::Credentials`.
#[derive(Debug)]
pub struct InnerCreds {
    // Credentials and scope used by `fetch_token`. `None` when the authenticator was created by
    // `new_implicit`, in which case tokens are obtained through `get_token_implicit` instead.
    creds: Option<(Credentials, Scope)>,
    // The most recently fetched token; overwritten by `regenerate_token` and read by
    // `make_token`.
    token: RwLock<Token>,
}
136
137impl GcpAuthenticator {
138    async fn from_file(path: &str, scope: Scope) -> crate::Result<Self> {
139        let creds = Credentials::from_file(path).context(InvalidCredentialsSnafu)?;
140        let token = RwLock::new(fetch_token(&creds, &scope).await?);
141        let creds = Some((creds, scope));
142        Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
143    }
144
145    async fn new_implicit() -> crate::Result<Self> {
146        let token = RwLock::new(get_token_implicit().await?);
147        let creds = None;
148        Ok(Self::Credentials(Arc::new(InnerCreds { creds, token })))
149    }
150
151    fn from_api_key(api_key: &str) -> crate::Result<Self> {
152        BASE64_URL_SAFE
153            .decode(api_key)
154            .context(InvalidApiKeySnafu)?;
155        Ok(Self::ApiKey(api_key.into()))
156    }
157
158    pub fn make_token(&self) -> Option<String> {
159        match self {
160            Self::Credentials(inner) => Some(inner.make_token()),
161            Self::ApiKey(_) | Self::None => None,
162        }
163    }
164
165    pub fn apply<T>(&self, request: &mut http::Request<T>) {
166        if let Some(token) = self.make_token() {
167            request
168                .headers_mut()
169                .insert(AUTHORIZATION, token.parse().unwrap());
170        }
171        self.apply_uri(request.uri_mut());
172    }
173
174    pub fn apply_uri(&self, uri: &mut Uri) {
175        match self {
176            Self::Credentials(_) | Self::None => (),
177            Self::ApiKey(api_key) => {
178                let mut parts = uri.clone().into_parts();
179                let path = parts
180                    .path_and_query
181                    .as_ref()
182                    .map_or("/", PathAndQuery::path);
183                let paq = format!("{path}?key={api_key}");
184                // The API key is verified above to only contain
185                // URL-safe characters. That key is added to a path
186                // that came from a successfully parsed URI. As such,
187                // re-parsing the string cannot fail.
188                parts.path_and_query =
189                    Some(paq.parse().expect("Could not re-parse path and query"));
190                *uri = Uri::from_parts(parts).expect("Could not re-parse URL");
191            }
192        }
193    }
194
195    pub fn spawn_regenerate_token(&self) -> watch::Receiver<()> {
196        let (sender, receiver) = watch::channel(());
197        tokio::spawn(self.clone().token_regenerator(sender));
198        receiver
199    }
200
201    async fn token_regenerator(self, sender: watch::Sender<()>) {
202        match self {
203            Self::Credentials(inner) => {
204                let mut expires_in = inner.token.read().unwrap().expires_in() as u64;
205                loop {
206                    let deadline = Duration::from_secs(
207                        expires_in
208                            .saturating_sub(METADATA_TOKEN_EXPIRY_MARGIN_SECS)
209                            .max(METADATA_TOKEN_ERROR_RETRY_SECS),
210                    );
211                    debug!(
212                        deadline = deadline.as_secs(),
213                        "Sleeping before refreshing GCP authentication token.",
214                    );
215                    tokio::time::sleep(deadline).await;
216                    match inner.regenerate_token().await {
217                        Ok(()) => {
218                            sender.send_replace(());
219                            debug!("GCP authentication token renewed.");
220                            // Rather than an expected fresh token, the Metadata Server may return
221                            // the same (cached) token during the last 300 seconds of its lifetime.
222                            // This scenario is handled by retrying the token refresh after the
223                            // METADATA_TOKEN_ERROR_RETRY_SECS period when a fresh token is expected
224                            expires_in = inner.token.read().unwrap().expires_in() as u64;
225                        }
226                        Err(error) => {
227                            error!(
228                                message = "Failed to update GCP authentication token.",
229                                %error
230                            );
231                            expires_in = METADATA_TOKEN_EXPIRY_MARGIN_SECS;
232                        }
233                    }
234                }
235            }
236            Self::ApiKey(_) | Self::None => {
237                // This keeps the sender end of the watch open without
238                // actually sending anything, effectively creating an
239                // empty watch stream.
240                sender.closed().await
241            }
242        }
243    }
244}
245
246impl InnerCreds {
247    async fn regenerate_token(&self) -> crate::Result<()> {
248        let token = match &self.creds {
249            Some((creds, scope)) => fetch_token(creds, scope).await?,
250            None => get_token_implicit().await?,
251        };
252        *self.token.write().unwrap() = token;
253        Ok(())
254    }
255
256    fn make_token(&self) -> String {
257        let token = self.token.read().unwrap();
258        format!("{} {}", token.token_type(), token.access_token())
259    }
260}
261
262async fn fetch_token(creds: &Credentials, scope: &Scope) -> crate::Result<Token> {
263    let claims = JwtClaims::new(
264        creds.iss(),
265        std::slice::from_ref(scope),
266        creds.token_uri(),
267        None,
268        None,
269    );
270    let rsa_key = creds.rsa_key().context(InvalidRsaKeySnafu)?;
271    let jwt = Jwt::new(claims, rsa_key, None);
272
273    debug!(
274        message = "Fetching GCP authentication token.",
275        project = ?creds.project(),
276        iss = ?creds.iss(),
277        token_uri = ?creds.token_uri(),
278    );
279    goauth::get_token(&jwt, creds)
280        .await
281        .context(GetTokenSnafu)
282        .map_err(Into::into)
283}
284
285async fn get_token_implicit() -> Result<Token, GcpError> {
286    debug!("Fetching implicit GCP authentication token.");
287    let req = http::Request::get(SERVICE_ACCOUNT_TOKEN_URL)
288        .header("Metadata-Flavor", "Google")
289        .body(hyper::Body::empty())
290        .unwrap();
291
292    let proxy = ProxyConfig::from_env();
293    let res = HttpClient::new(None, &proxy)
294        .context(BuildHttpClientSnafu)?
295        .send(req)
296        .await
297        .context(GetImplicitTokenSnafu)?;
298
299    let body = res.into_body();
300    let bytes = body
301        .collect()
302        .await
303        .map(Collected::to_bytes)
304        .context(GetTokenBytesSnafu)?;
305
306    // Token::from_str is irresponsible and may panic!
307    match serde_json::from_slice::<Token>(&bytes) {
308        Ok(token) => Ok(token),
309        Err(error) => Err(match serde_json::from_slice::<TokenErr>(&bytes) {
310            Ok(error) => GcpError::TokenFromJson { source: error },
311            Err(_) => GcpError::TokenJsonFromStr { source: error },
312        }),
313    }
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::assert_downcast_matches;

    /// Parses `source` as a `GcpAuthConfig` and builds an authenticator for the compute scope.
    async fn build_auth(source: &str) -> crate::Result<GcpAuthenticator> {
        let config: GcpAuthConfig = toml::from_str(source).expect("Invalid TOML");
        config.build(Scope::Compute).await
    }

    /// Parses `uri`, runs it through `auth.apply_uri`, and returns the result as a string.
    fn apply_uri(auth: &GcpAuthenticator, uri: &str) -> String {
        let mut parsed = uri.parse::<Uri>().unwrap();
        auth.apply_uri(&mut parsed);
        parsed.to_string()
    }

    #[tokio::test]
    async fn fails_missing_creds() {
        let outcome = build_auth("").await;
        let error = outcome.expect_err("build failed to error");
        assert_downcast_matches!(error, GcpError, GcpError::GetImplicitToken { .. });
        // This should be a more relevant error
    }

    #[tokio::test]
    async fn skip_authentication() {
        let config = r#"
                skip_authentication = true
                api_key = "testing"
            "#;
        let auth = build_auth(config).await.expect("build_auth failed");
        assert!(matches!(auth, GcpAuthenticator::None));
    }

    #[tokio::test]
    async fn uses_api_key() {
        let key = crate::test_util::random_string(16);

        let auth = build_auth(&format!(r#"api_key = "{key}""#))
            .await
            .expect("build_auth failed");
        assert!(matches!(auth, GcpAuthenticator::ApiKey(..)));

        // (input URI, expected URI once the key has been applied)
        let cases = [
            (
                "http://example.com",
                format!("http://example.com/?key={key}"),
            ),
            (
                "http://example.com/",
                format!("http://example.com/?key={key}"),
            ),
            (
                "http://example.com/path",
                format!("http://example.com/path?key={key}"),
            ),
            (
                "http://example.com/path1/",
                format!("http://example.com/path1/?key={key}"),
            ),
        ];
        for (input, expected) in cases {
            assert_eq!(apply_uri(&auth, input), expected);
        }
    }

    #[tokio::test]
    async fn fails_bad_api_key() {
        let outcome = build_auth(r#"api_key = "abc%xyz""#).await;
        let error = outcome.expect_err("build failed to error");
        assert_downcast_matches!(error, GcpError, GcpError::InvalidApiKey { .. });
    }
}