vector/sinks/prometheus/remote_write/
config.rs1use http::Uri;
2use snafu::prelude::*;
3
4use crate::{
5 http::HttpClient,
6 sinks::{
7 prelude::*,
8 prometheus::PrometheusRemoteWriteAuth,
9 util::{auth::Auth, http::http_response_retry_logic},
10 UriParseSnafu,
11 },
12};
13
14use super::{
15 service::{build_request, RemoteWriteService},
16 sink::{PrometheusRemoteWriteDefaultBatchSettings, RemoteWriteSink},
17};
18
19#[cfg(feature = "aws-core")]
20use super::Errors;
21
22#[configurable_component]
24#[derive(Clone, Copy, Debug, Derivative)]
25#[derivative(Default)]
26pub struct RemoteWriteBatchConfig {
27 #[configurable(derived)]
28 #[serde(flatten)]
29 pub batch_settings: BatchConfig<PrometheusRemoteWriteDefaultBatchSettings>,
30
31 #[serde(default = "crate::serde::default_true")]
33 #[derivative(Default(value = "true"))]
34 pub aggregate: bool,
35}
36
37#[configurable_component(sink(
39 "prometheus_remote_write",
40 "Deliver metric data to a Prometheus remote write endpoint."
41))]
42#[derive(Clone, Debug, Derivative)]
43#[derivative(Default)]
44#[serde(deny_unknown_fields)]
45pub struct RemoteWriteConfig {
46 #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))]
50 pub endpoint: String,
51
52 #[configurable(metadata(docs::examples = "service"))]
61 #[configurable(metadata(docs::advanced))]
62 pub default_namespace: Option<String>,
63
64 #[serde(default = "crate::sinks::prometheus::default_histogram_buckets")]
68 #[configurable(metadata(docs::advanced))]
69 pub buckets: Vec<f64>,
70
71 #[serde(default = "crate::sinks::prometheus::default_summary_quantiles")]
75 #[configurable(metadata(docs::advanced))]
76 pub quantiles: Vec<f64>,
77
78 #[configurable(derived)]
79 #[serde(default)]
80 pub batch: RemoteWriteBatchConfig,
81
82 #[configurable(derived)]
83 #[serde(default)]
84 pub request: TowerRequestConfig,
85
86 #[serde(default)]
92 #[configurable(metadata(docs::examples = "my-domain"))]
93 #[configurable(metadata(docs::advanced))]
94 pub tenant_id: Option<Template>,
95
96 #[serde(skip_serializing_if = "crate::serde::is_default")]
101 #[configurable(metadata(docs::common = false, docs::required = false))]
102 pub expire_metrics_secs: Option<f64>,
103
104 #[configurable(derived)]
105 pub tls: Option<TlsConfig>,
106
107 #[configurable(derived)]
108 pub auth: Option<PrometheusRemoteWriteAuth>,
109
110 #[cfg(feature = "aws-config")]
111 #[configurable(derived)]
112 #[configurable(metadata(docs::advanced))]
113 pub aws: Option<crate::aws::RegionOrEndpoint>,
114
115 #[configurable(derived)]
116 #[serde(
117 default,
118 deserialize_with = "crate::serde::bool_or_struct",
119 skip_serializing_if = "crate::serde::is_default"
120 )]
121 pub acknowledgements: AcknowledgementsConfig,
122
123 #[configurable(derived)]
124 #[configurable(metadata(docs::advanced))]
125 #[serde(default = "default_compression")]
126 #[derivative(Default(value = "default_compression()"))]
127 pub compression: Compression,
128}
129
130const fn default_compression() -> Compression {
131 Compression::Snappy
132}
133
134impl_generate_config_from_default!(RemoteWriteConfig);
135
136#[async_trait::async_trait]
137#[typetag::serde(name = "prometheus_remote_write")]
138impl SinkConfig for RemoteWriteConfig {
139 fn acknowledgements(&self) -> &AcknowledgementsConfig {
140 &self.acknowledgements
141 }
142
143 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
144 let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
145 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
146 let request_settings = self.request.into_settings();
147 let buckets = self.buckets.clone();
148 let quantiles = self.quantiles.clone();
149 let default_namespace = self.default_namespace.clone();
150
151 let client = HttpClient::new(tls_settings, cx.proxy())?;
152
153 let auth = match &self.auth {
154 Some(PrometheusRemoteWriteAuth::Basic { user, password }) => {
155 Some(Auth::Basic(crate::http::Auth::Basic {
156 user: user.clone(),
157 password: password.clone().into(),
158 }))
159 }
160 Some(PrometheusRemoteWriteAuth::Bearer { token }) => {
161 Some(Auth::Basic(crate::http::Auth::Bearer {
162 token: token.clone(),
163 }))
164 }
165 #[cfg(feature = "aws-core")]
166 Some(PrometheusRemoteWriteAuth::Aws(aws_auth)) => {
167 let region = self
168 .aws
169 .as_ref()
170 .map(|config| config.region())
171 .ok_or(Errors::AwsRegionRequired)?
172 .ok_or(Errors::AwsRegionRequired)?;
173 Some(Auth::Aws {
174 credentials_provider: aws_auth
175 .credentials_provider(region.clone(), cx.proxy(), self.tls.as_ref())
176 .await?,
177 region,
178 })
179 }
180 None => None,
181 };
182
183 let healthcheck = healthcheck(
184 client.clone(),
185 endpoint.clone(),
186 self.compression,
187 auth.clone(),
188 )
189 .boxed();
190
191 let service = RemoteWriteService {
192 endpoint,
193 client,
194 auth,
195 compression: self.compression,
196 };
197 let service = ServiceBuilder::new()
198 .settings(request_settings, http_response_retry_logic())
199 .service(service);
200
201 let sink = RemoteWriteSink {
202 tenant_id: self.tenant_id.clone(),
203 compression: self.compression,
204 aggregate: self.batch.aggregate,
205 batch_settings: self
206 .batch
207 .batch_settings
208 .validate()?
209 .into_batcher_settings()?,
210 buckets,
211 quantiles,
212 default_namespace,
213 expire_metrics_secs: self.expire_metrics_secs,
214 service,
215 };
216
217 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
218 }
219
220 fn input(&self) -> Input {
221 Input::metric()
222 }
223}
224
225async fn healthcheck(
226 client: HttpClient,
227 endpoint: Uri,
228 compression: Compression,
229 auth: Option<Auth>,
230) -> crate::Result<()> {
231 let body = bytes::Bytes::new();
232 let request =
233 build_request(http::Method::GET, &endpoint, compression, body, None, auth).await?;
234 let response = client.send(request).await?;
235
236 match response.status() {
237 http::StatusCode::OK => Ok(()),
238 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
239 }
240}