vector/sinks/gcp/stackdriver/metrics/
config.rs

1use bytes::Bytes;
2use goauth::scopes::Scope;
3use http::{Request, Uri, header::CONTENT_TYPE};
4use snafu::ResultExt;
5
6use super::{
7    request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
8    sink::StackdriverMetricsSink,
9};
10use crate::{
11    gcp::{GcpAuthConfig, GcpAuthenticator},
12    http::HttpClient,
13    sinks::{
14        HTTPRequestBuilderSnafu, gcp,
15        prelude::*,
16        util::{
17            http::{
18                HttpRequest, HttpService, HttpServiceRequestBuilder, http_response_retry_logic,
19            },
20            service::TowerRequestConfigDefaults,
21        },
22    },
23};
24
25#[derive(Clone, Copy, Debug)]
26pub struct StackdriverMetricsTowerRequestConfigDefaults;
27
28impl TowerRequestConfigDefaults for StackdriverMetricsTowerRequestConfigDefaults {
29    const RATE_LIMIT_NUM: u64 = 1_000;
30}
31
32/// Configuration for the `gcp_stackdriver_metrics` sink.
33#[configurable_component(sink(
34    "gcp_stackdriver_metrics",
35    "Deliver metrics to GCP's Cloud Monitoring system."
36))]
37#[derive(Clone, Debug, Default)]
38pub struct StackdriverConfig {
39    #[serde(skip, default = "default_endpoint")]
40    pub(super) endpoint: String,
41
42    /// The project ID to which to publish metrics.
43    ///
44    /// See the [Google Cloud Platform project management documentation][project_docs] for more details.
45    ///
46    /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
47    pub(super) project_id: String,
48
49    /// The monitored resource to associate the metrics with.
50    pub(super) resource: gcp::GcpTypedResource,
51
52    #[serde(flatten)]
53    pub(super) auth: GcpAuthConfig,
54
55    /// The default namespace to use for metrics that do not have one.
56    ///
57    /// Metrics with the same name can only be differentiated by their namespace, and not all
58    /// metrics have their own namespace.
59    #[serde(default = "default_metric_namespace_value")]
60    pub(super) default_namespace: String,
61
62    #[configurable(derived)]
63    #[serde(default)]
64    pub(super) request: TowerRequestConfig<StackdriverMetricsTowerRequestConfigDefaults>,
65
66    #[configurable(derived)]
67    #[serde(default)]
68    pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>,
69
70    #[configurable(derived)]
71    pub(super) tls: Option<TlsConfig>,
72
73    #[configurable(derived)]
74    #[serde(
75        default,
76        deserialize_with = "crate::serde::bool_or_struct",
77        skip_serializing_if = "crate::serde::is_default"
78    )]
79    pub(super) acknowledgements: AcknowledgementsConfig,
80}
81
/// Serde default for `StackdriverConfig::default_namespace`.
///
/// Returns the literal string `"namespace"`.
fn default_metric_namespace_value() -> String {
    String::from("namespace")
}
85
/// Serde default for `StackdriverConfig::endpoint`.
///
/// Returns the base URL without a trailing slash; the sink appends the
/// `/v3/projects/...` path when it builds the request URI.
fn default_endpoint() -> String {
    String::from("https://monitoring.googleapis.com")
}
89
90impl_generate_config_from_default!(StackdriverConfig);
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "gcp_stackdriver_metrics")]
94impl SinkConfig for StackdriverConfig {
95    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
96        let auth = self.auth.build(Scope::MonitoringWrite).await?;
97
98        let healthcheck = healthcheck().boxed();
99        let started = chrono::Utc::now();
100        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
101        let client = HttpClient::new(tls_settings, cx.proxy())?;
102
103        let batch_settings = self.batch.validate()?.into_batcher_settings()?;
104
105        let request_builder = StackdriverMetricsRequestBuilder {
106            encoder: StackdriverMetricsEncoder {
107                default_namespace: self.default_namespace.clone(),
108                started,
109                resource: self.resource.clone(),
110            },
111        };
112
113        let request_limits = self.request.into_settings();
114
115        let uri: Uri = format!(
116            "{}/v3/projects/{}/timeSeries",
117            self.endpoint, self.project_id
118        )
119        .parse()?;
120
121        auth.spawn_regenerate_token();
122
123        let stackdriver_metrics_service_request_builder =
124            StackdriverMetricsServiceRequestBuilder { uri, auth };
125
126        let service = HttpService::new(client, stackdriver_metrics_service_request_builder);
127
128        let service = ServiceBuilder::new()
129            .settings(request_limits, http_response_retry_logic())
130            .service(service);
131
132        let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder);
133
134        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
135    }
136
137    fn input(&self) -> Input {
138        Input::metric()
139    }
140
141    fn acknowledgements(&self) -> &AcknowledgementsConfig {
142        &self.acknowledgements
143    }
144}
145
146#[derive(Clone, Copy, Debug, Default)]
147pub struct StackdriverMetricsDefaultBatchSettings;
148
149impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings {
150    const MAX_EVENTS: Option<usize> = Some(1);
151    const MAX_BYTES: Option<usize> = None;
152    const TIMEOUT_SECS: f64 = 1.0;
153}
154
155#[derive(Debug, Clone)]
156pub(super) struct StackdriverMetricsServiceRequestBuilder {
157    pub(super) uri: Uri,
158    pub(super) auth: GcpAuthenticator,
159}
160
161impl HttpServiceRequestBuilder<()> for StackdriverMetricsServiceRequestBuilder {
162    fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
163        let builder = Request::post(self.uri.clone()).header(CONTENT_TYPE, "application/json");
164
165        let mut request = builder
166            .body(request.take_payload())
167            .context(HTTPRequestBuilderSnafu)
168            .map_err(Into::<crate::Error>::into)?;
169
170        self.auth.apply(&mut request);
171
172        Ok(request)
173    }
174}
175
176async fn healthcheck() -> crate::Result<()> {
177    Ok(())
178}