vector/sinks/prometheus/remote_write/
config.rs

1use http::Uri;
2use snafu::prelude::*;
3
4#[cfg(feature = "aws-core")]
5use super::Errors;
6use super::{
7    service::{RemoteWriteService, build_request},
8    sink::{PrometheusRemoteWriteDefaultBatchSettings, RemoteWriteSink},
9};
10use crate::{
11    http::HttpClient,
12    sinks::{
13        UriParseSnafu,
14        prelude::*,
15        prometheus::PrometheusRemoteWriteAuth,
16        util::{auth::Auth, http::http_response_retry_logic},
17    },
18};
19
// Batch-related options of the remote write sink: the generic batch settings plus the
// `aggregate` switch. `RemoteWriteConfig::build` consumes both: `batch_settings` is
// validated and converted into batcher settings, and `aggregate` is copied into
// `RemoteWriteSink` unchanged.
20/// The batch config for remote write.
21#[configurable_component]
22#[derive(Clone, Copy, Debug, Derivative)]
23#[derivative(Default)]
24pub struct RemoteWriteBatchConfig {
25    #[configurable(derived)]
    // `flatten` places the batch settings' keys at the same level as `aggregate`
    // instead of nesting them under a `batch_settings` key.
26    #[serde(flatten)]
27    pub batch_settings: BatchConfig<PrometheusRemoteWriteDefaultBatchSettings>,
28
29    /// Whether or not to aggregate metrics within a batch.
    // Two separate defaults: one used by serde when the key is missing, and one used by
    // the derived `Default` impl (explicitly `true`).
    // NOTE(review): `crate::serde::default_true` is defined elsewhere — presumably it
    // returns `true`, so both paths should agree; confirm.
30    #[serde(default = "crate::serde::default_true")]
31    #[derivative(Default(value = "true"))]
32    pub aggregate: bool,
33}
34
// User-facing configuration of the `prometheus_remote_write` sink. `build` (in the
// `SinkConfig` impl in this file) turns it into an HTTP client, a `RemoteWriteService`
// and a `RemoteWriteSink`.
// NOTE(review): the `///` comments below are presumably picked up by
// `#[configurable_component]` as the published field descriptions — confirm before
// rewording any of them.
35/// Configuration for the `prometheus_remote_write` sink.
36#[configurable_component(sink(
37    "prometheus_remote_write",
38    "Deliver metric data to a Prometheus remote write endpoint."
39))]
40#[derive(Clone, Debug, Derivative)]
41#[derivative(Default)]
// Unknown keys in the user's configuration are rejected during deserialization.
42#[serde(deny_unknown_fields)]
43pub struct RemoteWriteConfig {
44    /// The endpoint to send data to.
45    ///
46    /// The endpoint should include the scheme and the path to write to.
    // Stored as a plain `String`; `build` parses it into an `http::Uri` and returns an
    // error (with `UriParseSnafu` context) when it is not a valid URI.
47    #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))]
48    pub endpoint: String,
49
50    /// The default namespace for any metrics sent.
51    ///
52    /// This namespace is only used if a metric has no existing namespace. When a namespace is
53    /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`).
54    ///
55    /// It should follow the Prometheus [naming conventions][prom_naming_docs].
56    ///
57    /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names
58    #[configurable(metadata(docs::examples = "service"))]
59    #[configurable(metadata(docs::advanced))]
60    pub default_namespace: Option<String>,
61
62    /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms.
63    ///
64    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    // Cloned into `RemoteWriteSink` by `build`; no validation happens in this file.
65    #[serde(default = "crate::sinks::prometheus::default_histogram_buckets")]
66    #[configurable(metadata(docs::advanced))]
67    pub buckets: Vec<f64>,
68
69    /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary.
70    ///
71    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    // Cloned into `RemoteWriteSink` by `build`; no validation happens in this file.
72    #[serde(default = "crate::sinks::prometheus::default_summary_quantiles")]
73    #[configurable(metadata(docs::advanced))]
74    pub quantiles: Vec<f64>,
75
76    #[configurable(derived)]
77    #[serde(default)]
78    pub batch: RemoteWriteBatchConfig,
79
    // Converted with `into_settings()` in `build` and handed to the `ServiceBuilder`
    // together with the HTTP response retry logic.
80    #[configurable(derived)]
81    #[serde(default)]
82    pub request: TowerRequestConfig,
83
84    /// The tenant ID to send.
85    ///
86    /// If set, a header named `X-Scope-OrgID` is added to outgoing requests with the value of this setting.
87    ///
88    /// This may be used by Cortex or other remote services to identify the tenant making the request.
    // A `Template`, not a plain string; it is cloned into `RemoteWriteSink`, which is
    // where any rendering must take place (not visible in this file).
89    #[serde(default)]
90    #[configurable(metadata(docs::examples = "my-domain"))]
91    #[configurable(metadata(docs::advanced))]
92    pub tenant_id: Option<Template>,
93
94    /// The amount of time, in seconds, that incremental metrics will persist in the internal metrics cache
95    /// after having not been updated before they expire and are removed.
96    ///
97    /// If unset, sending unique incremental metrics to this sink will cause indefinite memory growth.
    // Passed to `RemoteWriteSink` as-is; the expiry itself is implemented by the sink.
98    #[serde(skip_serializing_if = "crate::serde::is_default")]
99    #[configurable(metadata(docs::common = false, docs::required = false))]
100    pub expire_metrics_secs: Option<f64>,
101
    // Used twice in `build`: for the HTTP client's TLS settings and, with AWS auth, for
    // the credentials provider.
102    #[configurable(derived)]
103    pub tls: Option<TlsConfig>,
104
105    #[configurable(derived)]
106    pub auth: Option<PrometheusRemoteWriteAuth>,
107
    // Only read by the AWS auth arm in `build`, which requires a region from it.
    // NOTE(review): this field is gated on feature `aws-config`, while the code that
    // reads it in `build` is gated on `aws-core` — confirm that `aws-core` always
    // implies `aws-config`, otherwise an `aws-core`-only build cannot compile.
108    #[cfg(feature = "aws-config")]
109    #[configurable(derived)]
110    #[configurable(metadata(docs::advanced))]
111    pub aws: Option<crate::aws::RegionOrEndpoint>,
112
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or the
    // full struct form — confirm against `crate::serde`.
113    #[configurable(derived)]
114    #[serde(
115        default,
116        deserialize_with = "crate::serde::bool_or_struct",
117        skip_serializing_if = "crate::serde::is_default"
118    )]
119    pub acknowledgements: AcknowledgementsConfig,
120
    // `default_compression()` (Snappy) backs both the serde default and the derived
    // `Default` impl, so the two cannot drift apart.
121    #[configurable(derived)]
122    #[configurable(metadata(docs::advanced))]
123    #[serde(default = "default_compression")]
124    #[derivative(Default(value = "default_compression()"))]
125    pub compression: Compression,
126}
127
128const fn default_compression() -> Compression {
129    Compression::Snappy
130}
131
// NOTE(review): presumably derives the sink's generated example configuration from
// `RemoteWriteConfig::default()`; the macro is defined elsewhere — confirm there.
132impl_generate_config_from_default!(RemoteWriteConfig);
133
134#[async_trait::async_trait]
135#[typetag::serde(name = "prometheus_remote_write")]
136impl SinkConfig for RemoteWriteConfig {
137    fn acknowledgements(&self) -> &AcknowledgementsConfig {
138        &self.acknowledgements
139    }
140
141    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
142        let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
143        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
144        let request_settings = self.request.into_settings();
145        let buckets = self.buckets.clone();
146        let quantiles = self.quantiles.clone();
147        let default_namespace = self.default_namespace.clone();
148
149        let client = HttpClient::new(tls_settings, cx.proxy())?;
150
151        let auth = match &self.auth {
152            Some(PrometheusRemoteWriteAuth::Basic { user, password }) => {
153                Some(Auth::Basic(crate::http::Auth::Basic {
154                    user: user.clone(),
155                    password: password.clone().into(),
156                }))
157            }
158            Some(PrometheusRemoteWriteAuth::Bearer { token }) => {
159                Some(Auth::Basic(crate::http::Auth::Bearer {
160                    token: token.clone(),
161                }))
162            }
163            #[cfg(feature = "aws-core")]
164            Some(PrometheusRemoteWriteAuth::Aws(aws_auth)) => {
165                let region = self
166                    .aws
167                    .as_ref()
168                    .map(|config| config.region())
169                    .ok_or(Errors::AwsRegionRequired)?
170                    .ok_or(Errors::AwsRegionRequired)?;
171                Some(Auth::Aws {
172                    credentials_provider: aws_auth
173                        .credentials_provider(region.clone(), cx.proxy(), self.tls.as_ref())
174                        .await?,
175                    region,
176                })
177            }
178            None => None,
179        };
180
181        let healthcheck = healthcheck(
182            client.clone(),
183            endpoint.clone(),
184            self.compression,
185            auth.clone(),
186        )
187        .boxed();
188
189        let service = RemoteWriteService {
190            endpoint,
191            client,
192            auth,
193            compression: self.compression,
194        };
195        let service = ServiceBuilder::new()
196            .settings(request_settings, http_response_retry_logic())
197            .service(service);
198
199        let sink = RemoteWriteSink {
200            tenant_id: self.tenant_id.clone(),
201            compression: self.compression,
202            aggregate: self.batch.aggregate,
203            batch_settings: self
204                .batch
205                .batch_settings
206                .validate()?
207                .into_batcher_settings()?,
208            buckets,
209            quantiles,
210            default_namespace,
211            expire_metrics_secs: self.expire_metrics_secs,
212            service,
213        };
214
215        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
216    }
217
218    fn input(&self) -> Input {
219        Input::metric()
220    }
221}
222
223async fn healthcheck(
224    client: HttpClient,
225    endpoint: Uri,
226    compression: Compression,
227    auth: Option<Auth>,
228) -> crate::Result<()> {
229    let body = bytes::Bytes::new();
230    let request =
231        build_request(http::Method::GET, &endpoint, compression, body, None, auth).await?;
232    let response = client.send(request).await?;
233
234    match response.status() {
235        http::StatusCode::OK => Ok(()),
236        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
237    }
238}