// vector/sinks/gcp/stackdriver/metrics/config.rs
use bytes::Bytes;
2use goauth::scopes::Scope;
3use http::{header::CONTENT_TYPE, Request, Uri};
4
5use super::{
6 request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
7 sink::StackdriverMetricsSink,
8};
9use crate::{
10 gcp::{GcpAuthConfig, GcpAuthenticator},
11 http::HttpClient,
12 sinks::{
13 gcp,
14 prelude::*,
15 util::{
16 http::{
17 http_response_retry_logic, HttpRequest, HttpService, HttpServiceRequestBuilder,
18 },
19 service::TowerRequestConfigDefaults,
20 },
21 HTTPRequestBuilderSnafu,
22 },
23};
24use snafu::ResultExt;
25
/// Zero-sized marker type that carries the Tower request defaults for this sink.
///
/// It is used as the defaults parameter of the `TowerRequestConfig` held in
/// `StackdriverConfig::request`.
#[derive(Debug, Clone, Copy)]
pub struct StackdriverMetricsTowerRequestConfigDefaults;
28
29impl TowerRequestConfigDefaults for StackdriverMetricsTowerRequestConfigDefaults {
30 const RATE_LIMIT_NUM: u64 = 1_000;
31}
32
33#[configurable_component(sink(
35 "gcp_stackdriver_metrics",
36 "Deliver metrics to GCP's Cloud Monitoring system."
37))]
38#[derive(Clone, Debug, Default)]
39pub struct StackdriverConfig {
40 #[serde(skip, default = "default_endpoint")]
41 pub(super) endpoint: String,
42
43 pub(super) project_id: String,
49
50 pub(super) resource: gcp::GcpTypedResource,
52
53 #[serde(flatten)]
54 pub(super) auth: GcpAuthConfig,
55
56 #[serde(default = "default_metric_namespace_value")]
61 pub(super) default_namespace: String,
62
63 #[configurable(derived)]
64 #[serde(default)]
65 pub(super) request: TowerRequestConfig<StackdriverMetricsTowerRequestConfigDefaults>,
66
67 #[configurable(derived)]
68 #[serde(default)]
69 pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>,
70
71 #[configurable(derived)]
72 pub(super) tls: Option<TlsConfig>,
73
74 #[configurable(derived)]
75 #[serde(
76 default,
77 deserialize_with = "crate::serde::bool_or_struct",
78 skip_serializing_if = "crate::serde::is_default"
79 )]
80 pub(super) acknowledgements: AcknowledgementsConfig,
81}
82
/// Serde default for `StackdriverConfig::default_namespace`: the literal `"namespace"`.
fn default_metric_namespace_value() -> String {
    String::from("namespace")
}
86
/// Serde default for `StackdriverConfig::endpoint`: the base URL of the monitoring API.
///
/// The value carries no trailing slash; the sink appends the `/v3/projects/...` path to it.
fn default_endpoint() -> String {
    String::from("https://monitoring.googleapis.com")
}
90
91impl_generate_config_from_default!(StackdriverConfig);
92
93#[async_trait::async_trait]
94#[typetag::serde(name = "gcp_stackdriver_metrics")]
95impl SinkConfig for StackdriverConfig {
96 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
97 let auth = self.auth.build(Scope::MonitoringWrite).await?;
98
99 let healthcheck = healthcheck().boxed();
100 let started = chrono::Utc::now();
101 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
102 let client = HttpClient::new(tls_settings, cx.proxy())?;
103
104 let batch_settings = self.batch.validate()?.into_batcher_settings()?;
105
106 let request_builder = StackdriverMetricsRequestBuilder {
107 encoder: StackdriverMetricsEncoder {
108 default_namespace: self.default_namespace.clone(),
109 started,
110 resource: self.resource.clone(),
111 },
112 };
113
114 let request_limits = self.request.into_settings();
115
116 let uri: Uri = format!(
117 "{}/v3/projects/{}/timeSeries",
118 self.endpoint, self.project_id
119 )
120 .parse()?;
121
122 auth.spawn_regenerate_token();
123
124 let stackdriver_metrics_service_request_builder =
125 StackdriverMetricsServiceRequestBuilder { uri, auth };
126
127 let service = HttpService::new(client, stackdriver_metrics_service_request_builder);
128
129 let service = ServiceBuilder::new()
130 .settings(request_limits, http_response_retry_logic())
131 .service(service);
132
133 let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder);
134
135 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
136 }
137
138 fn input(&self) -> Input {
139 Input::metric()
140 }
141
142 fn acknowledgements(&self) -> &AcknowledgementsConfig {
143 &self.acknowledgements
144 }
145}
146
/// Zero-sized marker type that carries the default batch settings for this sink.
///
/// It is used as the defaults parameter of the `BatchConfig` held in
/// `StackdriverConfig::batch`.
#[derive(Debug, Default, Clone, Copy)]
pub struct StackdriverMetricsDefaultBatchSettings;
149
150impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings {
151 const MAX_EVENTS: Option<usize> = Some(1);
152 const MAX_BYTES: Option<usize> = None;
153 const TIMEOUT_SECS: f64 = 1.0;
154}
155
156#[derive(Debug, Clone)]
157pub(super) struct StackdriverMetricsServiceRequestBuilder {
158 pub(super) uri: Uri,
159 pub(super) auth: GcpAuthenticator,
160}
161
162impl HttpServiceRequestBuilder<()> for StackdriverMetricsServiceRequestBuilder {
163 fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
164 let builder = Request::post(self.uri.clone()).header(CONTENT_TYPE, "application/json");
165
166 let mut request = builder
167 .body(request.take_payload())
168 .context(HTTPRequestBuilderSnafu)
169 .map_err(Into::<crate::Error>::into)?;
170
171 self.auth.apply(&mut request);
172
173 Ok(request)
174 }
175}
176
177async fn healthcheck() -> crate::Result<()> {
178 Ok(())
179}