vector/sinks/prometheus/remote_write/
config.rs

1use std::{collections::BTreeMap, sync::Arc};
2
3use http::{HeaderValue, Uri, header::AUTHORIZATION};
4use snafu::prelude::*;
5
6#[cfg(feature = "aws-core")]
7use super::Errors;
8use super::{
9    service::{RemoteWriteService, build_request},
10    sink::{PrometheusRemoteWriteDefaultBatchSettings, RemoteWriteSink},
11};
12use crate::{
13    http::HttpClient,
14    sinks::{
15        UriParseSnafu,
16        prelude::*,
17        prometheus::PrometheusRemoteWriteAuth,
18        util::{
19            auth::Auth,
20            http::{OrderedHeaderName, http_response_retry_logic},
21            service::TowerRequestConfig,
22        },
23    },
24};
25
26/// The batch config for remote write.
27#[configurable_component]
28#[derive(Clone, Copy, Debug, Derivative)]
29#[derivative(Default)]
30pub struct RemoteWriteBatchConfig {
31    #[configurable(derived)]
32    #[serde(flatten)]
33    pub batch_settings: BatchConfig<PrometheusRemoteWriteDefaultBatchSettings>,
34
35    /// Whether or not to aggregate metrics within a batch.
36    #[serde(default = "crate::serde::default_true")]
37    #[derivative(Default(value = "true"))]
38    pub aggregate: bool,
39}
40
41/// Configuration for the `prometheus_remote_write` sink.
42#[configurable_component(sink(
43    "prometheus_remote_write",
44    "Deliver metric data to a Prometheus remote write endpoint."
45))]
46#[derive(Clone, Debug, Derivative)]
47#[derivative(Default)]
48#[serde(deny_unknown_fields)]
49pub struct RemoteWriteConfig {
50    /// The endpoint to send data to.
51    ///
52    /// The endpoint should include the scheme and the path to write to.
53    #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))]
54    pub endpoint: String,
55
56    /// The default namespace for any metrics sent.
57    ///
58    /// This namespace is only used if a metric has no existing namespace. When a namespace is
59    /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`).
60    ///
61    /// It should follow the Prometheus [naming conventions][prom_naming_docs].
62    ///
63    /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names
64    #[configurable(metadata(docs::examples = "service"))]
65    #[configurable(metadata(docs::advanced))]
66    pub default_namespace: Option<String>,
67
68    /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms.
69    ///
70    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
71    #[serde(default = "crate::sinks::prometheus::default_histogram_buckets")]
72    #[configurable(metadata(docs::advanced))]
73    pub buckets: Vec<f64>,
74
75    /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary.
76    ///
77    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
78    #[serde(default = "crate::sinks::prometheus::default_summary_quantiles")]
79    #[configurable(metadata(docs::advanced))]
80    pub quantiles: Vec<f64>,
81
82    #[configurable(derived)]
83    #[serde(default)]
84    pub batch: RemoteWriteBatchConfig,
85
86    #[configurable(derived)]
87    #[serde(default)]
88    pub request: RemoteWriteRequestConfig,
89
90    /// The tenant ID to send.
91    ///
92    /// If set, a header named `X-Scope-OrgID` is added to outgoing requests with the value of this setting.
93    ///
94    /// This may be used by Cortex or other remote services to identify the tenant making the request.
95    #[serde(default)]
96    #[configurable(metadata(docs::examples = "my-domain"))]
97    #[configurable(metadata(docs::advanced))]
98    pub tenant_id: Option<Template>,
99
100    /// The amount of time, in seconds, that incremental metrics will persist in the internal metrics cache
101    /// after having not been updated before they expire and are removed.
102    ///
103    /// If unset, sending unique incremental metrics to this sink will cause indefinite memory growth.
104    #[serde(skip_serializing_if = "crate::serde::is_default")]
105    #[configurable(metadata(docs::common = false, docs::required = false))]
106    pub expire_metrics_secs: Option<f64>,
107
108    #[configurable(derived)]
109    pub tls: Option<TlsConfig>,
110
111    #[configurable(derived)]
112    pub auth: Option<PrometheusRemoteWriteAuth>,
113
114    #[cfg(feature = "aws-config")]
115    #[configurable(derived)]
116    #[configurable(metadata(docs::advanced))]
117    pub aws: Option<crate::aws::RegionOrEndpoint>,
118
119    #[configurable(derived)]
120    #[serde(
121        default,
122        deserialize_with = "crate::serde::bool_or_struct",
123        skip_serializing_if = "crate::serde::is_default"
124    )]
125    pub acknowledgements: AcknowledgementsConfig,
126
127    #[configurable(derived)]
128    #[configurable(metadata(docs::advanced))]
129    #[serde(default = "default_compression")]
130    #[derivative(Default(value = "default_compression()"))]
131    pub compression: Compression,
132}
133
134const fn default_compression() -> Compression {
135    Compression::Snappy
136}
137
138impl_generate_config_from_default!(RemoteWriteConfig);
139
140/// Outbound HTTP request settings for the Prometheus remote write sink.
141#[configurable_component]
142#[derive(Clone, Debug, Default)]
143#[serde(default)]
144pub struct RemoteWriteRequestConfig {
145    #[serde(flatten)]
146    pub tower: TowerRequestConfig,
147
148    /// Additional HTTP headers to add to every HTTP request.
149    ///
150    /// Values are applied verbatim; template expansion is not supported.
151    #[serde(default)]
152    #[configurable(metadata(
153        docs::additional_props_description = "An HTTP request header and its static value."
154    ))]
155    #[configurable(metadata(docs::examples = "remote_write_headers_examples()"))]
156    pub headers: BTreeMap<String, String>,
157}
158
159fn remote_write_headers_examples() -> BTreeMap<String, String> {
160    BTreeMap::from([
161        ("Accept".to_string(), "text/plain".to_string()),
162        ("X-My-Custom-Header".to_string(), "A-Value".to_string()),
163    ])
164}
165
166fn validate_headers(
167    headers: &BTreeMap<String, String>,
168    configures_auth: bool,
169) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
170    let headers = crate::sinks::util::http::validate_headers(headers)?;
171
172    for name in headers.keys() {
173        if configures_auth && name.inner() == AUTHORIZATION {
174            return Err("Authorization header can not be used with defined auth options".into());
175        }
176    }
177
178    Ok(headers)
179}
180
181#[async_trait::async_trait]
182#[typetag::serde(name = "prometheus_remote_write")]
183impl SinkConfig for RemoteWriteConfig {
184    fn acknowledgements(&self) -> &AcknowledgementsConfig {
185        &self.acknowledgements
186    }
187
188    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
189        let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
190        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
191        let request_settings = self.request.tower.into_settings();
192        let validated_headers = Arc::new(validate_headers(
193            &self.request.headers,
194            self.auth.is_some(),
195        )?);
196        let buckets = self.buckets.clone();
197        let quantiles = self.quantiles.clone();
198        let default_namespace = self.default_namespace.clone();
199
200        let client = HttpClient::new(tls_settings, cx.proxy())?;
201
202        let auth = match &self.auth {
203            Some(PrometheusRemoteWriteAuth::Basic { user, password }) => {
204                Some(Auth::Basic(crate::http::Auth::Basic {
205                    user: user.clone(),
206                    password: password.clone().into(),
207                }))
208            }
209            Some(PrometheusRemoteWriteAuth::Bearer { token }) => {
210                Some(Auth::Basic(crate::http::Auth::Bearer {
211                    token: token.clone(),
212                }))
213            }
214            #[cfg(feature = "aws-core")]
215            Some(PrometheusRemoteWriteAuth::Aws(aws_auth)) => {
216                let region = self
217                    .aws
218                    .as_ref()
219                    .map(|config| config.region())
220                    .ok_or(Errors::AwsRegionRequired)?
221                    .ok_or(Errors::AwsRegionRequired)?;
222                Some(Auth::Aws {
223                    credentials_provider: aws_auth
224                        .credentials_provider(region.clone(), cx.proxy(), self.tls.as_ref())
225                        .await?,
226                    region,
227                })
228            }
229            None => None,
230        };
231
232        let healthcheck_endpoint = match cx.healthcheck.uri {
233            Some(uri) => uri.uri,
234            None => endpoint.clone(),
235        };
236
237        let healthcheck = healthcheck(
238            client.clone(),
239            healthcheck_endpoint,
240            self.compression,
241            auth.clone(),
242            Arc::clone(&validated_headers),
243        )
244        .boxed();
245
246        let service = RemoteWriteService {
247            endpoint,
248            client,
249            auth,
250            compression: self.compression,
251            headers: validated_headers,
252        };
253        let service = ServiceBuilder::new()
254            .settings(request_settings, http_response_retry_logic())
255            .service(service);
256
257        let sink = RemoteWriteSink {
258            tenant_id: self.tenant_id.clone(),
259            compression: self.compression,
260            aggregate: self.batch.aggregate,
261            batch_settings: self
262                .batch
263                .batch_settings
264                .validate()?
265                .into_batcher_settings()?,
266            buckets,
267            quantiles,
268            default_namespace,
269            expire_metrics_secs: self.expire_metrics_secs,
270            service,
271        };
272
273        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
274    }
275
276    fn input(&self) -> Input {
277        Input::metric()
278    }
279}
280
281async fn healthcheck(
282    client: HttpClient,
283    endpoint: Uri,
284    compression: Compression,
285    auth: Option<Auth>,
286    headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
287) -> crate::Result<()> {
288    let body = bytes::Bytes::new();
289    let request = build_request(
290        http::Method::GET,
291        &endpoint,
292        compression,
293        body,
294        None,
295        auth,
296        headers,
297    )
298    .await?;
299    let response = client.send(request).await?;
300
301    match response.status() {
302        http::StatusCode::OK => Ok(()),
303        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
304    }
305}