vector/sinks/prometheus/remote_write/
config.rs

1use http::Uri;
2use snafu::prelude::*;
3
4use crate::{
5    http::HttpClient,
6    sinks::{
7        prelude::*,
8        prometheus::PrometheusRemoteWriteAuth,
9        util::{auth::Auth, http::http_response_retry_logic},
10        UriParseSnafu,
11    },
12};
13
14use super::{
15    service::{build_request, RemoteWriteService},
16    sink::{PrometheusRemoteWriteDefaultBatchSettings, RemoteWriteSink},
17};
18
19#[cfg(feature = "aws-core")]
20use super::Errors;
21
22/// The batch config for remote write.
23#[configurable_component]
24#[derive(Clone, Copy, Debug, Derivative)]
25#[derivative(Default)]
26pub struct RemoteWriteBatchConfig {
27    #[configurable(derived)]
28    #[serde(flatten)]
29    pub batch_settings: BatchConfig<PrometheusRemoteWriteDefaultBatchSettings>,
30
31    /// Whether or not to aggregate metrics within a batch.
32    #[serde(default = "crate::serde::default_true")]
33    #[derivative(Default(value = "true"))]
34    pub aggregate: bool,
35}
36
37/// Configuration for the `prometheus_remote_write` sink.
38#[configurable_component(sink(
39    "prometheus_remote_write",
40    "Deliver metric data to a Prometheus remote write endpoint."
41))]
42#[derive(Clone, Debug, Derivative)]
43#[derivative(Default)]
44#[serde(deny_unknown_fields)]
45pub struct RemoteWriteConfig {
46    /// The endpoint to send data to.
47    ///
48    /// The endpoint should include the scheme and the path to write to.
49    #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))]
50    pub endpoint: String,
51
52    /// The default namespace for any metrics sent.
53    ///
54    /// This namespace is only used if a metric has no existing namespace. When a namespace is
55    /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`).
56    ///
57    /// It should follow the Prometheus [naming conventions][prom_naming_docs].
58    ///
59    /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names
60    #[configurable(metadata(docs::examples = "service"))]
61    #[configurable(metadata(docs::advanced))]
62    pub default_namespace: Option<String>,
63
64    /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms.
65    ///
66    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
67    #[serde(default = "crate::sinks::prometheus::default_histogram_buckets")]
68    #[configurable(metadata(docs::advanced))]
69    pub buckets: Vec<f64>,
70
71    /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary.
72    ///
73    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
74    #[serde(default = "crate::sinks::prometheus::default_summary_quantiles")]
75    #[configurable(metadata(docs::advanced))]
76    pub quantiles: Vec<f64>,
77
78    #[configurable(derived)]
79    #[serde(default)]
80    pub batch: RemoteWriteBatchConfig,
81
82    #[configurable(derived)]
83    #[serde(default)]
84    pub request: TowerRequestConfig,
85
86    /// The tenant ID to send.
87    ///
88    /// If set, a header named `X-Scope-OrgID` is added to outgoing requests with the value of this setting.
89    ///
90    /// This may be used by Cortex or other remote services to identify the tenant making the request.
91    #[serde(default)]
92    #[configurable(metadata(docs::examples = "my-domain"))]
93    #[configurable(metadata(docs::advanced))]
94    pub tenant_id: Option<Template>,
95
96    /// The amount of time, in seconds, that incremental metrics will persist in the internal metrics cache
97    /// after having not been updated before they expire and are removed.
98    ///
99    /// If unset, sending unique incremental metrics to this sink will cause indefinite memory growth.
100    #[serde(skip_serializing_if = "crate::serde::is_default")]
101    #[configurable(metadata(docs::common = false, docs::required = false))]
102    pub expire_metrics_secs: Option<f64>,
103
104    #[configurable(derived)]
105    pub tls: Option<TlsConfig>,
106
107    #[configurable(derived)]
108    pub auth: Option<PrometheusRemoteWriteAuth>,
109
110    #[cfg(feature = "aws-config")]
111    #[configurable(derived)]
112    #[configurable(metadata(docs::advanced))]
113    pub aws: Option<crate::aws::RegionOrEndpoint>,
114
115    #[configurable(derived)]
116    #[serde(
117        default,
118        deserialize_with = "crate::serde::bool_or_struct",
119        skip_serializing_if = "crate::serde::is_default"
120    )]
121    pub acknowledgements: AcknowledgementsConfig,
122
123    #[configurable(derived)]
124    #[configurable(metadata(docs::advanced))]
125    #[serde(default = "default_compression")]
126    #[derivative(Default(value = "default_compression()"))]
127    pub compression: Compression,
128}
129
130const fn default_compression() -> Compression {
131    Compression::Snappy
132}
133
134impl_generate_config_from_default!(RemoteWriteConfig);
135
136#[async_trait::async_trait]
137#[typetag::serde(name = "prometheus_remote_write")]
138impl SinkConfig for RemoteWriteConfig {
139    fn acknowledgements(&self) -> &AcknowledgementsConfig {
140        &self.acknowledgements
141    }
142
143    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
144        let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
145        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
146        let request_settings = self.request.into_settings();
147        let buckets = self.buckets.clone();
148        let quantiles = self.quantiles.clone();
149        let default_namespace = self.default_namespace.clone();
150
151        let client = HttpClient::new(tls_settings, cx.proxy())?;
152
153        let auth = match &self.auth {
154            Some(PrometheusRemoteWriteAuth::Basic { user, password }) => {
155                Some(Auth::Basic(crate::http::Auth::Basic {
156                    user: user.clone(),
157                    password: password.clone().into(),
158                }))
159            }
160            Some(PrometheusRemoteWriteAuth::Bearer { token }) => {
161                Some(Auth::Basic(crate::http::Auth::Bearer {
162                    token: token.clone(),
163                }))
164            }
165            #[cfg(feature = "aws-core")]
166            Some(PrometheusRemoteWriteAuth::Aws(aws_auth)) => {
167                let region = self
168                    .aws
169                    .as_ref()
170                    .map(|config| config.region())
171                    .ok_or(Errors::AwsRegionRequired)?
172                    .ok_or(Errors::AwsRegionRequired)?;
173                Some(Auth::Aws {
174                    credentials_provider: aws_auth
175                        .credentials_provider(region.clone(), cx.proxy(), self.tls.as_ref())
176                        .await?,
177                    region,
178                })
179            }
180            None => None,
181        };
182
183        let healthcheck = healthcheck(
184            client.clone(),
185            endpoint.clone(),
186            self.compression,
187            auth.clone(),
188        )
189        .boxed();
190
191        let service = RemoteWriteService {
192            endpoint,
193            client,
194            auth,
195            compression: self.compression,
196        };
197        let service = ServiceBuilder::new()
198            .settings(request_settings, http_response_retry_logic())
199            .service(service);
200
201        let sink = RemoteWriteSink {
202            tenant_id: self.tenant_id.clone(),
203            compression: self.compression,
204            aggregate: self.batch.aggregate,
205            batch_settings: self
206                .batch
207                .batch_settings
208                .validate()?
209                .into_batcher_settings()?,
210            buckets,
211            quantiles,
212            default_namespace,
213            expire_metrics_secs: self.expire_metrics_secs,
214            service,
215        };
216
217        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
218    }
219
220    fn input(&self) -> Input {
221        Input::metric()
222    }
223}
224
225async fn healthcheck(
226    client: HttpClient,
227    endpoint: Uri,
228    compression: Compression,
229    auth: Option<Auth>,
230) -> crate::Result<()> {
231    let body = bytes::Bytes::new();
232    let request =
233        build_request(http::Method::GET, &endpoint, compression, body, None, auth).await?;
234    let response = client.send(request).await?;
235
236    match response.status() {
237        http::StatusCode::OK => Ok(()),
238        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
239    }
240}