vector/sinks/prometheus/remote_write/
config.rs1use http::Uri;
2use snafu::prelude::*;
3
4#[cfg(feature = "aws-core")]
5use super::Errors;
6use super::{
7 service::{RemoteWriteService, build_request},
8 sink::{PrometheusRemoteWriteDefaultBatchSettings, RemoteWriteSink},
9};
10use crate::{
11 http::HttpClient,
12 sinks::{
13 UriParseSnafu,
14 prelude::*,
15 prometheus::PrometheusRemoteWriteAuth,
16 util::{auth::Auth, http::http_response_retry_logic},
17 },
18};
19
20#[configurable_component]
22#[derive(Clone, Copy, Debug, Derivative)]
23#[derivative(Default)]
24pub struct RemoteWriteBatchConfig {
25 #[configurable(derived)]
26 #[serde(flatten)]
27 pub batch_settings: BatchConfig<PrometheusRemoteWriteDefaultBatchSettings>,
28
29 #[serde(default = "crate::serde::default_true")]
31 #[derivative(Default(value = "true"))]
32 pub aggregate: bool,
33}
34
35#[configurable_component(sink(
37 "prometheus_remote_write",
38 "Deliver metric data to a Prometheus remote write endpoint."
39))]
40#[derive(Clone, Debug, Derivative)]
41#[derivative(Default)]
42#[serde(deny_unknown_fields)]
43pub struct RemoteWriteConfig {
44 #[configurable(metadata(docs::examples = "https://localhost:8087/api/v1/write"))]
48 pub endpoint: String,
49
50 #[configurable(metadata(docs::examples = "service"))]
59 #[configurable(metadata(docs::advanced))]
60 pub default_namespace: Option<String>,
61
62 #[serde(default = "crate::sinks::prometheus::default_histogram_buckets")]
66 #[configurable(metadata(docs::advanced))]
67 pub buckets: Vec<f64>,
68
69 #[serde(default = "crate::sinks::prometheus::default_summary_quantiles")]
73 #[configurable(metadata(docs::advanced))]
74 pub quantiles: Vec<f64>,
75
76 #[configurable(derived)]
77 #[serde(default)]
78 pub batch: RemoteWriteBatchConfig,
79
80 #[configurable(derived)]
81 #[serde(default)]
82 pub request: TowerRequestConfig,
83
84 #[serde(default)]
90 #[configurable(metadata(docs::examples = "my-domain"))]
91 #[configurable(metadata(docs::advanced))]
92 pub tenant_id: Option<Template>,
93
94 #[serde(skip_serializing_if = "crate::serde::is_default")]
99 #[configurable(metadata(docs::common = false, docs::required = false))]
100 pub expire_metrics_secs: Option<f64>,
101
102 #[configurable(derived)]
103 pub tls: Option<TlsConfig>,
104
105 #[configurable(derived)]
106 pub auth: Option<PrometheusRemoteWriteAuth>,
107
108 #[cfg(feature = "aws-config")]
109 #[configurable(derived)]
110 #[configurable(metadata(docs::advanced))]
111 pub aws: Option<crate::aws::RegionOrEndpoint>,
112
113 #[configurable(derived)]
114 #[serde(
115 default,
116 deserialize_with = "crate::serde::bool_or_struct",
117 skip_serializing_if = "crate::serde::is_default"
118 )]
119 pub acknowledgements: AcknowledgementsConfig,
120
121 #[configurable(derived)]
122 #[configurable(metadata(docs::advanced))]
123 #[serde(default = "default_compression")]
124 #[derivative(Default(value = "default_compression()"))]
125 pub compression: Compression,
126}
127
128const fn default_compression() -> Compression {
129 Compression::Snappy
130}
131
132impl_generate_config_from_default!(RemoteWriteConfig);
133
134#[async_trait::async_trait]
135#[typetag::serde(name = "prometheus_remote_write")]
136impl SinkConfig for RemoteWriteConfig {
137 fn acknowledgements(&self) -> &AcknowledgementsConfig {
138 &self.acknowledgements
139 }
140
141 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
142 let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
143 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
144 let request_settings = self.request.into_settings();
145 let buckets = self.buckets.clone();
146 let quantiles = self.quantiles.clone();
147 let default_namespace = self.default_namespace.clone();
148
149 let client = HttpClient::new(tls_settings, cx.proxy())?;
150
151 let auth = match &self.auth {
152 Some(PrometheusRemoteWriteAuth::Basic { user, password }) => {
153 Some(Auth::Basic(crate::http::Auth::Basic {
154 user: user.clone(),
155 password: password.clone().into(),
156 }))
157 }
158 Some(PrometheusRemoteWriteAuth::Bearer { token }) => {
159 Some(Auth::Basic(crate::http::Auth::Bearer {
160 token: token.clone(),
161 }))
162 }
163 #[cfg(feature = "aws-core")]
164 Some(PrometheusRemoteWriteAuth::Aws(aws_auth)) => {
165 let region = self
166 .aws
167 .as_ref()
168 .map(|config| config.region())
169 .ok_or(Errors::AwsRegionRequired)?
170 .ok_or(Errors::AwsRegionRequired)?;
171 Some(Auth::Aws {
172 credentials_provider: aws_auth
173 .credentials_provider(region.clone(), cx.proxy(), self.tls.as_ref())
174 .await?,
175 region,
176 })
177 }
178 None => None,
179 };
180
181 let healthcheck = healthcheck(
182 client.clone(),
183 endpoint.clone(),
184 self.compression,
185 auth.clone(),
186 )
187 .boxed();
188
189 let service = RemoteWriteService {
190 endpoint,
191 client,
192 auth,
193 compression: self.compression,
194 };
195 let service = ServiceBuilder::new()
196 .settings(request_settings, http_response_retry_logic())
197 .service(service);
198
199 let sink = RemoteWriteSink {
200 tenant_id: self.tenant_id.clone(),
201 compression: self.compression,
202 aggregate: self.batch.aggregate,
203 batch_settings: self
204 .batch
205 .batch_settings
206 .validate()?
207 .into_batcher_settings()?,
208 buckets,
209 quantiles,
210 default_namespace,
211 expire_metrics_secs: self.expire_metrics_secs,
212 service,
213 };
214
215 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
216 }
217
218 fn input(&self) -> Input {
219 Input::metric()
220 }
221}
222
223async fn healthcheck(
224 client: HttpClient,
225 endpoint: Uri,
226 compression: Compression,
227 auth: Option<Auth>,
228) -> crate::Result<()> {
229 let body = bytes::Bytes::new();
230 let request =
231 build_request(http::Method::GET, &endpoint, compression, body, None, auth).await?;
232 let response = client.send(request).await?;
233
234 match response.status() {
235 http::StatusCode::OK => Ok(()),
236 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
237 }
238}