vector/sinks/gcp/stackdriver/metrics/
config.rs1use bytes::Bytes;
2use goauth::scopes::Scope;
3use http::{Request, Uri, header::CONTENT_TYPE};
4use snafu::ResultExt;
5
6use super::{
7 request_builder::{StackdriverMetricsEncoder, StackdriverMetricsRequestBuilder},
8 sink::StackdriverMetricsSink,
9};
10use crate::{
11 gcp::{GcpAuthConfig, GcpAuthenticator},
12 http::HttpClient,
13 sinks::{
14 HTTPRequestBuilderSnafu, gcp,
15 prelude::*,
16 util::{
17 http::{
18 HttpRequest, HttpService, HttpServiceRequestBuilder, http_response_retry_logic,
19 },
20 service::TowerRequestConfigDefaults,
21 },
22 },
23};
24
25#[derive(Clone, Copy, Debug)]
26pub struct StackdriverMetricsTowerRequestConfigDefaults;
27
28impl TowerRequestConfigDefaults for StackdriverMetricsTowerRequestConfigDefaults {
29 const RATE_LIMIT_NUM: u64 = 1_000;
30}
31
32#[configurable_component(sink(
34 "gcp_stackdriver_metrics",
35 "Deliver metrics to GCP's Cloud Monitoring system."
36))]
37#[derive(Clone, Debug, Default)]
38pub struct StackdriverConfig {
39 #[serde(skip, default = "default_endpoint")]
40 pub(super) endpoint: String,
41
42 pub(super) project_id: String,
48
49 pub(super) resource: gcp::GcpTypedResource,
51
52 #[serde(flatten)]
53 pub(super) auth: GcpAuthConfig,
54
55 #[serde(default = "default_metric_namespace_value")]
60 pub(super) default_namespace: String,
61
62 #[configurable(derived)]
63 #[serde(default)]
64 pub(super) request: TowerRequestConfig<StackdriverMetricsTowerRequestConfigDefaults>,
65
66 #[configurable(derived)]
67 #[serde(default)]
68 pub(super) batch: BatchConfig<StackdriverMetricsDefaultBatchSettings>,
69
70 #[configurable(derived)]
71 pub(super) tls: Option<TlsConfig>,
72
73 #[configurable(derived)]
74 #[serde(
75 default,
76 deserialize_with = "crate::serde::bool_or_struct",
77 skip_serializing_if = "crate::serde::is_default"
78 )]
79 pub(super) acknowledgements: AcknowledgementsConfig,
80}
81
/// Serde default for `StackdriverConfig::default_namespace`: the literal `namespace`.
fn default_metric_namespace_value() -> String {
    String::from("namespace")
}
85
/// Serde default for `StackdriverConfig::endpoint`: the base URL (scheme and
/// host, no trailing slash) to which `build` appends the `/v3/projects/...` path.
fn default_endpoint() -> String {
    String::from("https://monitoring.googleapis.com")
}
89
90impl_generate_config_from_default!(StackdriverConfig);
91
92#[async_trait::async_trait]
93#[typetag::serde(name = "gcp_stackdriver_metrics")]
94impl SinkConfig for StackdriverConfig {
95 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
96 let auth = self.auth.build(Scope::MonitoringWrite).await?;
97
98 let healthcheck = healthcheck().boxed();
99 let started = chrono::Utc::now();
100 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
101 let client = HttpClient::new(tls_settings, cx.proxy())?;
102
103 let batch_settings = self.batch.validate()?.into_batcher_settings()?;
104
105 let request_builder = StackdriverMetricsRequestBuilder {
106 encoder: StackdriverMetricsEncoder {
107 default_namespace: self.default_namespace.clone(),
108 started,
109 resource: self.resource.clone(),
110 },
111 };
112
113 let request_limits = self.request.into_settings();
114
115 let uri: Uri = format!(
116 "{}/v3/projects/{}/timeSeries",
117 self.endpoint, self.project_id
118 )
119 .parse()?;
120
121 auth.spawn_regenerate_token();
122
123 let stackdriver_metrics_service_request_builder =
124 StackdriverMetricsServiceRequestBuilder { uri, auth };
125
126 let service = HttpService::new(client, stackdriver_metrics_service_request_builder);
127
128 let service = ServiceBuilder::new()
129 .settings(request_limits, http_response_retry_logic())
130 .service(service);
131
132 let sink = StackdriverMetricsSink::new(service, batch_settings, request_builder);
133
134 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
135 }
136
137 fn input(&self) -> Input {
138 Input::metric()
139 }
140
141 fn acknowledgements(&self) -> &AcknowledgementsConfig {
142 &self.acknowledgements
143 }
144}
145
146#[derive(Clone, Copy, Debug, Default)]
147pub struct StackdriverMetricsDefaultBatchSettings;
148
149impl SinkBatchSettings for StackdriverMetricsDefaultBatchSettings {
150 const MAX_EVENTS: Option<usize> = Some(1);
151 const MAX_BYTES: Option<usize> = None;
152 const TIMEOUT_SECS: f64 = 1.0;
153}
154
155#[derive(Debug, Clone)]
156pub(super) struct StackdriverMetricsServiceRequestBuilder {
157 pub(super) uri: Uri,
158 pub(super) auth: GcpAuthenticator,
159}
160
161impl HttpServiceRequestBuilder<()> for StackdriverMetricsServiceRequestBuilder {
162 fn build(&self, mut request: HttpRequest<()>) -> Result<Request<Bytes>, crate::Error> {
163 let builder = Request::post(self.uri.clone()).header(CONTENT_TYPE, "application/json");
164
165 let mut request = builder
166 .body(request.take_payload())
167 .context(HTTPRequestBuilderSnafu)
168 .map_err(Into::<crate::Error>::into)?;
169
170 self.auth.apply(&mut request);
171
172 Ok(request)
173 }
174}
175
176async fn healthcheck() -> crate::Result<()> {
177 Ok(())
178}