vector/sinks/prometheus/remote_write/
config.rs1use std::{collections::BTreeMap, sync::Arc};
2
3use http::{HeaderValue, Uri, header::AUTHORIZATION};
4use snafu::prelude::*;
5
6#[cfg(feature = "aws-core")]
7use super::Errors;
8use super::{
9 service::{RemoteWriteService, build_request},
10 sink::{PrometheusRemoteWriteDefaultBatchSettings, RemoteWriteSink},
11};
12use crate::{
13 http::HttpClient,
14 sinks::{
15 UriParseSnafu,
16 prelude::*,
17 prometheus::PrometheusRemoteWriteAuth,
18 util::{
19 auth::Auth,
20 http::{OrderedHeaderName, http_response_retry_logic},
21 service::TowerRequestConfig,
22 },
23 },
24};
25
26#[configurable_component]
28#[derive(Clone, Copy, Debug, Derivative)]
29#[derivative(Default)]
30pub struct RemoteWriteBatchConfig {
31 #[configurable(derived)]
32 #[serde(flatten)]
33 pub batch_settings: BatchConfig<PrometheusRemoteWriteDefaultBatchSettings>,
34
35 #[serde(default = "crate::serde::default_true")]
37 #[derivative(Default(value = "true"))]
38 pub aggregate: bool,
39}
40
41#[configurable_component(sink(
43 "prometheus_remote_write",
44 "Deliver metric data to a Prometheus remote write endpoint."
45))]
46#[derive(Clone, Debug, Derivative)]
47#[derivative(Default)]
48#[serde(deny_unknown_fields)]
49pub struct RemoteWriteConfig {
50 #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))]
54 pub endpoint: String,
55
56 #[configurable(metadata(docs::examples = "service"))]
65 #[configurable(metadata(docs::advanced))]
66 pub default_namespace: Option<String>,
67
68 #[serde(default = "crate::sinks::prometheus::default_histogram_buckets")]
72 #[configurable(metadata(docs::advanced))]
73 pub buckets: Vec<f64>,
74
75 #[serde(default = "crate::sinks::prometheus::default_summary_quantiles")]
79 #[configurable(metadata(docs::advanced))]
80 pub quantiles: Vec<f64>,
81
82 #[configurable(derived)]
83 #[serde(default)]
84 pub batch: RemoteWriteBatchConfig,
85
86 #[configurable(derived)]
87 #[serde(default)]
88 pub request: RemoteWriteRequestConfig,
89
90 #[serde(default)]
96 #[configurable(metadata(docs::examples = "my-domain"))]
97 #[configurable(metadata(docs::advanced))]
98 pub tenant_id: Option<Template>,
99
100 #[serde(skip_serializing_if = "crate::serde::is_default")]
105 #[configurable(metadata(docs::common = false, docs::required = false))]
106 pub expire_metrics_secs: Option<f64>,
107
108 #[configurable(derived)]
109 pub tls: Option<TlsConfig>,
110
111 #[configurable(derived)]
112 pub auth: Option<PrometheusRemoteWriteAuth>,
113
114 #[cfg(feature = "aws-config")]
115 #[configurable(derived)]
116 #[configurable(metadata(docs::advanced))]
117 pub aws: Option<crate::aws::RegionOrEndpoint>,
118
119 #[configurable(derived)]
120 #[serde(
121 default,
122 deserialize_with = "crate::serde::bool_or_struct",
123 skip_serializing_if = "crate::serde::is_default"
124 )]
125 pub acknowledgements: AcknowledgementsConfig,
126
127 #[configurable(derived)]
128 #[configurable(metadata(docs::advanced))]
129 #[serde(default = "default_compression")]
130 #[derivative(Default(value = "default_compression()"))]
131 pub compression: Compression,
132}
133
134const fn default_compression() -> Compression {
135 Compression::Snappy
136}
137
138impl_generate_config_from_default!(RemoteWriteConfig);
139
140#[configurable_component]
142#[derive(Clone, Debug, Default)]
143#[serde(default)]
144pub struct RemoteWriteRequestConfig {
145 #[serde(flatten)]
146 pub tower: TowerRequestConfig,
147
148 #[serde(default)]
152 #[configurable(metadata(
153 docs::additional_props_description = "An HTTP request header and its static value."
154 ))]
155 #[configurable(metadata(docs::examples = "remote_write_headers_examples()"))]
156 pub headers: BTreeMap<String, String>,
157}
158
/// Example header map referenced by the `docs::examples` metadata of
/// `RemoteWriteRequestConfig::headers`.
fn remote_write_headers_examples() -> BTreeMap<String, String> {
    [("Accept", "text/plain"), ("X-My-Custom-Header", "A-Value")]
        .into_iter()
        .map(|(name, value)| (name.to_owned(), value.to_owned()))
        .collect()
}
165
166fn validate_headers(
167 headers: &BTreeMap<String, String>,
168 configures_auth: bool,
169) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
170 let headers = crate::sinks::util::http::validate_headers(headers)?;
171
172 for name in headers.keys() {
173 if configures_auth && name.inner() == AUTHORIZATION {
174 return Err("Authorization header can not be used with defined auth options".into());
175 }
176 }
177
178 Ok(headers)
179}
180
181#[async_trait::async_trait]
182#[typetag::serde(name = "prometheus_remote_write")]
183impl SinkConfig for RemoteWriteConfig {
184 fn acknowledgements(&self) -> &AcknowledgementsConfig {
185 &self.acknowledgements
186 }
187
188 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
189 let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
190 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
191 let request_settings = self.request.tower.into_settings();
192 let validated_headers = Arc::new(validate_headers(
193 &self.request.headers,
194 self.auth.is_some(),
195 )?);
196 let buckets = self.buckets.clone();
197 let quantiles = self.quantiles.clone();
198 let default_namespace = self.default_namespace.clone();
199
200 let client = HttpClient::new(tls_settings, cx.proxy())?;
201
202 let auth = match &self.auth {
203 Some(PrometheusRemoteWriteAuth::Basic { user, password }) => {
204 Some(Auth::Basic(crate::http::Auth::Basic {
205 user: user.clone(),
206 password: password.clone().into(),
207 }))
208 }
209 Some(PrometheusRemoteWriteAuth::Bearer { token }) => {
210 Some(Auth::Basic(crate::http::Auth::Bearer {
211 token: token.clone(),
212 }))
213 }
214 #[cfg(feature = "aws-core")]
215 Some(PrometheusRemoteWriteAuth::Aws(aws_auth)) => {
216 let region = self
217 .aws
218 .as_ref()
219 .map(|config| config.region())
220 .ok_or(Errors::AwsRegionRequired)?
221 .ok_or(Errors::AwsRegionRequired)?;
222 Some(Auth::Aws {
223 credentials_provider: aws_auth
224 .credentials_provider(region.clone(), cx.proxy(), self.tls.as_ref())
225 .await?,
226 region,
227 })
228 }
229 None => None,
230 };
231
232 let healthcheck_endpoint = match cx.healthcheck.uri {
233 Some(uri) => uri.uri,
234 None => endpoint.clone(),
235 };
236
237 let healthcheck = healthcheck(
238 client.clone(),
239 healthcheck_endpoint,
240 self.compression,
241 auth.clone(),
242 Arc::clone(&validated_headers),
243 )
244 .boxed();
245
246 let service = RemoteWriteService {
247 endpoint,
248 client,
249 auth,
250 compression: self.compression,
251 headers: validated_headers,
252 };
253 let service = ServiceBuilder::new()
254 .settings(request_settings, http_response_retry_logic())
255 .service(service);
256
257 let sink = RemoteWriteSink {
258 tenant_id: self.tenant_id.clone(),
259 compression: self.compression,
260 aggregate: self.batch.aggregate,
261 batch_settings: self
262 .batch
263 .batch_settings
264 .validate()?
265 .into_batcher_settings()?,
266 buckets,
267 quantiles,
268 default_namespace,
269 expire_metrics_secs: self.expire_metrics_secs,
270 service,
271 };
272
273 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
274 }
275
276 fn input(&self) -> Input {
277 Input::metric()
278 }
279}
280
281async fn healthcheck(
282 client: HttpClient,
283 endpoint: Uri,
284 compression: Compression,
285 auth: Option<Auth>,
286 headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
287) -> crate::Result<()> {
288 let body = bytes::Bytes::new();
289 let request = build_request(
290 http::Method::GET,
291 &endpoint,
292 compression,
293 body,
294 None,
295 auth,
296 headers,
297 )
298 .await?;
299 let response = client.send(request).await?;
300
301 match response.status() {
302 http::StatusCode::OK => Ok(()),
303 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
304 }
305}