vector/sinks/gcp/stackdriver/metrics/
config.rs

1use bytes::Bytes;
2use goauth::scopes::Scope;
3use http::{header::CONTENT_TYPE, Request, Uri};
4
5use super::{
6    request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
7    sink::StackdriverMetricsSink,
8};
9use crate::{
10    gcp::{GcpAuthConfig, GcpAuthenticator},
11    http::HttpClient,
12    sinks::{
13        gcp,
14        prelude::*,
15        util::{
16            http::{
17                http_response_retry_logic, HttpRequest, HttpService, HttpServiceRequestBuilder,
18            },
19            service::TowerRequestConfigDefaults,
20        },
21        HTTPRequestBuilderSnafu,
22    },
23};
24use snafu::ResultExt;
25
26#[derive(Clone, Copy, Debug)]
27pub struct StackdriverMetricsTowerRequestConfigDefaults;
28
29impl TowerRequestConfigDefaults for StackdriverMetricsTowerRequestConfigDefaults {
30    const RATE_LIMIT_NUM: u64 = 1_000;
31}
32
33/// Configuration for the `gcp_stackdriver_metrics` sink.
34#[configurable_component(sink(
35    "gcp_stackdriver_metrics",
36    "Deliver metrics to GCP's Cloud Monitoring system."
37))]
38#[derive(Clone, Debug, Default)]
39pub struct StackdriverConfig {
40    #[serde(skip, default = "default_endpoint")]
41    pub(super) endpoint: String,
42
43    /// The project ID to which to publish metrics.
44    ///
45    /// See the [Google Cloud Platform project management documentation][project_docs] for more details.
46    ///
47    /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
48    pub(super) project_id: String,
49
50    /// The monitored resource to associate the metrics with.
51    pub(super) resource: gcp::GcpTypedResource,
52
53    #[serde(flatten)]
54    pub(super) auth: GcpAuthConfig,
55
56    /// The default namespace to use for metrics that do not have one.
57    ///
58    /// Metrics with the same name can only be differentiated by their namespace, and not all
59    /// metrics have their own namespace.
60    #[serde(default = "default_metric_namespace_value")]
61    pub(super) default_namespace: String,
62
63    #[configurable(derived)]
64    #[serde(default)]
65    pub(super) request: TowerRequestConfig<StackdriverMetricsTowerRequestConfigDefaults>,
66
67    #[configurable(derived)]
68    #[serde(default)]
69    pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>,
70
71    #[configurable(derived)]
72    pub(super) tls: Option<TlsConfig>,
73
74    #[configurable(derived)]
75    #[serde(
76        default,
77        deserialize_with = "crate::serde::bool_or_struct",
78        skip_serializing_if = "crate::serde::is_default"
79    )]
80    pub(super) acknowledgements: AcknowledgementsConfig,
81}
82
83fn default_metric_namespace_value() -> String {
84    "namespace".to_string()
85}
86
87fn default_endpoint() -> String {
88    "https://monitoring.googleapis.com".to_string()
89}
90
91impl_generate_config_from_default!(StackdriverConfig);
92
93#[async_trait::async_trait]
94#[typetag::serde(name = "gcp_stackdriver_metrics")]
95impl SinkConfig for StackdriverConfig {
96    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
97        let auth = self.auth.build(Scope::MonitoringWrite).await?;
98
99        let healthcheck = healthcheck().boxed();
100        let started = chrono::Utc::now();
101        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
102        let client = HttpClient::new(tls_settings, cx.proxy())?;
103
104        let batch_settings = self.batch.validate()?.into_batcher_settings()?;
105
106        let request_builder = StackdriverMetricsRequestBuilder {
107            encoder: StackdriverMetricsEncoder {
108                default_namespace: self.default_namespace.clone(),
109                started,
110                resource: self.resource.clone(),
111            },
112        };
113
114        let request_limits = self.request.into_settings();
115
116        let uri: Uri = format!(
117            "{}/v3/projects/{}/timeSeries",
118            self.endpoint, self.project_id
119        )
120        .parse()?;
121
122        auth.spawn_regenerate_token();
123
124        let stackdriver_metrics_service_request_builder =
125            StackdriverMetricsServiceRequestBuilder { uri, auth };
126
127        let service = HttpService::new(client, stackdriver_metrics_service_request_builder);
128
129        let service = ServiceBuilder::new()
130            .settings(request_limits, http_response_retry_logic())
131            .service(service);
132
133        let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder);
134
135        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
136    }
137
138    fn input(&self) -> Input {
139        Input::metric()
140    }
141
142    fn acknowledgements(&self) -> &AcknowledgementsConfig {
143        &self.acknowledgements
144    }
145}
146
147#[derive(Clone, Copy, Debug, Default)]
148pub struct StackdriverMetricsDefaultBatchSettings;
149
150impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings {
151    const MAX_EVENTS: Option<usize> = Some(1);
152    const MAX_BYTES: Option<usize> = None;
153    const TIMEOUT_SECS: f64 = 1.0;
154}
155
156#[derive(Debug, Clone)]
157pub(super) struct StackdriverMetricsServiceRequestBuilder {
158    pub(super) uri: Uri,
159    pub(super) auth: GcpAuthenticator,
160}
161
162impl HttpServiceRequestBuilder<()> for StackdriverMetricsServiceRequestBuilder {
163    fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
164        let builder = Request::post(self.uri.clone()).header(CONTENT_TYPE, "application/json");
165
166        let mut request = builder
167            .body(request.take_payload())
168            .context(HTTPRequestBuilderSnafu)
169            .map_err(Into::<crate::Error>::into)?;
170
171        self.auth.apply(&mut request);
172
173        Ok(request)
174    }
175}
176
177async fn healthcheck() -> crate::Result<()> {
178    Ok(())
179}