vector/sources/prometheus/
remote_write.rs1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::Bytes;
4use prost::Message;
5use vector_lib::config::LogNamespace;
6use vector_lib::configurable::configurable_component;
7use vector_lib::prometheus::parser::proto;
8use warp::http::{HeaderMap, StatusCode};
9
10use super::parser;
11use crate::{
12 common::http::{server_auth::HttpServerAuthConfig, ErrorMessage},
13 config::{
14 GenerateConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
15 },
16 event::Event,
17 http::KeepaliveConfig,
18 internal_events::PrometheusRemoteWriteParseError,
19 serde::bool_or_struct,
20 sources::{
21 self,
22 util::{decode, http::HttpMethod, HttpSource},
23 },
24 tls::TlsEnableableConfig,
25};
26
27#[configurable_component(source(
29 "prometheus_remote_write",
30 "Receive metric via the Prometheus Remote Write protocol."
31))]
32#[derive(Clone, Debug)]
33pub struct PrometheusRemoteWriteConfig {
34 #[configurable(metadata(docs::examples = "0.0.0.0:9090"))]
38 address: SocketAddr,
39
40 #[configurable(derived)]
41 tls: Option<TlsEnableableConfig>,
42
43 #[configurable(derived)]
44 #[configurable(metadata(docs::advanced))]
45 auth: Option<HttpServerAuthConfig>,
46
47 #[configurable(derived)]
48 #[serde(default, deserialize_with = "bool_or_struct")]
49 acknowledgements: SourceAcknowledgementsConfig,
50
51 #[configurable(derived)]
52 #[serde(default)]
53 keepalive: KeepaliveConfig,
54}
55
56impl PrometheusRemoteWriteConfig {
57 #[cfg(test)]
58 pub fn from_address(address: SocketAddr) -> Self {
59 Self {
60 address,
61 tls: None,
62 auth: None,
63 acknowledgements: false.into(),
64 keepalive: KeepaliveConfig::default(),
65 }
66 }
67}
68
69impl GenerateConfig for PrometheusRemoteWriteConfig {
70 fn generate_config() -> toml::Value {
71 toml::Value::try_from(Self {
72 address: "127.0.0.1:9090".parse().unwrap(),
73 tls: None,
74 auth: None,
75 acknowledgements: SourceAcknowledgementsConfig::default(),
76 keepalive: KeepaliveConfig::default(),
77 })
78 .unwrap()
79 }
80}
81
82#[async_trait::async_trait]
83#[typetag::serde(name = "prometheus_remote_write")]
84impl SourceConfig for PrometheusRemoteWriteConfig {
85 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
86 let source = RemoteWriteSource;
87 source.run(
88 self.address,
89 "",
90 HttpMethod::Post,
91 StatusCode::OK,
92 true,
93 self.tls.as_ref(),
94 self.auth.as_ref(),
95 cx,
96 self.acknowledgements,
97 self.keepalive.clone(),
98 )
99 }
100
101 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
102 vec![SourceOutput::new_metrics()]
103 }
104
105 fn can_acknowledge(&self) -> bool {
106 true
107 }
108}
109
110#[derive(Clone)]
111struct RemoteWriteSource;
112
113impl RemoteWriteSource {
114 fn decode_body(&self, body: Bytes) -> Result<Vec<Event>, ErrorMessage> {
115 let request = proto::WriteRequest::decode(body).map_err(|error| {
116 emit!(PrometheusRemoteWriteParseError {
117 error: error.clone()
118 });
119 ErrorMessage::new(
120 StatusCode::BAD_REQUEST,
121 format!("Could not decode write request: {error}"),
122 )
123 })?;
124 parser::parse_request(request).map_err(|error| {
125 ErrorMessage::new(
126 StatusCode::BAD_REQUEST,
127 format!("Could not decode write request: {error}"),
128 )
129 })
130 }
131}
132
133impl HttpSource for RemoteWriteSource {
134 fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
135 decode(encoding_header.or(Some("snappy")), body)
137 }
138
139 fn build_events(
140 &self,
141 body: Bytes,
142 _header_map: &HeaderMap,
143 _query_parameters: &HashMap<String, String>,
144 _full_path: &str,
145 ) -> Result<Vec<Event>, ErrorMessage> {
146 let events = self.decode_body(body)?;
147 Ok(events)
148 }
149}
150
#[cfg(test)]
mod test {
    use chrono::{SubsecRound as _, Utc};
    use vector_lib::{
        event::{EventStatus, Metric, MetricKind, MetricValue},
        metric_tags,
    };

    use super::*;
    use crate::{
        config::{SinkConfig, SinkContext},
        sinks::prometheus::remote_write::RemoteWriteConfig,
        test_util::{self, wait_for_tcp},
        tls::MaybeTlsSettings,
        SourceSender,
    };

    /// Builds the source under test on `address`, spawns it onto the runtime
    /// and waits until the address accepts TCP connections.
    async fn start_source(
        address: SocketAddr,
        tls: Option<TlsEnableableConfig>,
        tx: SourceSender,
    ) {
        let config = PrometheusRemoteWriteConfig {
            address,
            auth: None,
            tls,
            acknowledgements: SourceAcknowledgementsConfig::default(),
            keepalive: KeepaliveConfig::default(),
        };
        let server = config
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(server);
        wait_for_tcp(address).await;
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusRemoteWriteConfig>();
    }

    #[tokio::test]
    async fn receives_metrics_over_http() {
        receives_metrics(None).await;
    }

    #[tokio::test]
    async fn receives_metrics_over_https() {
        receives_metrics(Some(TlsEnableableConfig::test_config())).await;
    }

    /// Round-trips the metrics from `make_events` through the remote-write
    /// sink into the source, optionally over TLS, and checks they arrive intact.
    async fn receives_metrics(tls: Option<TlsEnableableConfig>) {
        let address = test_util::next_addr();
        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

        let scheme = MaybeTlsSettings::from_config(tls.as_ref(), true)
            .unwrap()
            .http_protocol_name();
        start_source(address, tls.clone(), tx).await;

        let port = address.port();
        let sink_config = RemoteWriteConfig {
            endpoint: format!("{scheme}://localhost:{port}/"),
            tls: tls.map(|tls| tls.options),
            ..Default::default()
        };
        let (sink, _) = sink_config
            .build(SinkContext::default())
            .await
            .expect("Error building config.");

        let expected = make_events();
        let to_send = expected.clone();
        let mut received = test_util::spawn_collect_ready(
            async move {
                sink.run_events(to_send).await.unwrap();
            },
            rx,
            1,
        )
        .await;

        // Arrival order is not relied upon (presumably the sink may reorder
        // metrics), so sort by name before comparing.
        received.sort_unstable_by(|left, right| {
            left.as_metric().name().cmp(right.as_metric().name())
        });

        vector_lib::assert_event_data_eq!(expected, received);
    }

    /// One absolute metric of each kind exercised by the round-trip test, each
    /// stamped with the current time truncated to millisecond precision.
    fn make_events() -> Vec<Event> {
        let absolute = |name: &str, value: MetricValue| -> Event {
            Metric::new(name, MetricKind::Absolute, value)
                .with_timestamp(Some(Utc::now().trunc_subsecs(3)))
                .into()
        };

        vec![
            absolute("counter_1", MetricValue::Counter { value: 42.0 }),
            absolute("gauge_2", MetricValue::Gauge { value: 41.0 }),
            absolute(
                "histogram_3",
                MetricValue::AggregatedHistogram {
                    buckets: vector_lib::buckets![ 2.3 => 11, 4.2 => 85 ],
                    count: 96,
                    sum: 156.2,
                },
            ),
            absolute(
                "summary_4",
                MetricValue::AggregatedSummary {
                    quantiles: vector_lib::quantiles![ 0.1 => 1.2, 0.5 => 3.6, 0.9 => 5.2 ],
                    count: 23,
                    sum: 8.6,
                },
            ),
        ]
    }

    /// A metric sent with the `code` label given twice is expected to arrive
    /// with a single `code` label holding the later value.
    #[tokio::test]
    async fn receives_metrics_duplicate_labels() {
        let address = test_util::next_addr();
        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
        start_source(address, None, tx).await;

        let sink_config = RemoteWriteConfig {
            endpoint: format!("http://localhost:{}/", address.port()),
            ..Default::default()
        };
        let (sink, _) = sink_config
            .build(SinkContext::default())
            .await
            .expect("Error building config.");

        let timestamp = Utc::now().trunc_subsecs(3);
        // Both the sent and the expected metric share everything but the tags.
        let gauge = || {
            Metric::new(
                "gauge_2",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 41.0 },
            )
            .with_timestamp(Some(timestamp))
        };

        let sent = vec![gauge()
            .with_tags(Some(metric_tags! {
                "code" => "200".to_string(),
                "code" => "success".to_string(),
            }))
            .into()];

        let expected = vec![gauge()
            .with_tags(Some(metric_tags! {
                "code" => "success".to_string(),
            }))
            .into()];

        let received = test_util::spawn_collect_ready(
            async move {
                sink.run_events(sent).await.unwrap();
            },
            rx,
            1,
        )
        .await;

        vector_lib::assert_event_data_eq!(expected, received);
    }
}
343
#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    use std::net::{SocketAddr, ToSocketAddrs as _};
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{run_and_assert_source_compliance, HTTP_PUSH_SOURCE_TAGS};

    /// Resolves the address the source should listen on, taken from
    /// `REMOTE_WRITE_SOURCE_RECEIVE_ADDRESS` or `127.0.0.1:9102` when unset.
    ///
    /// # Panics
    ///
    /// Panics if the address cannot be resolved to at least one socket address.
    fn source_receive_address() -> SocketAddr {
        let address = match std::env::var("REMOTE_WRITE_SOURCE_RECEIVE_ADDRESS") {
            Ok(value) => value,
            Err(_) => "127.0.0.1:9102".into(),
        };

        // The value goes through socket-address resolution and the first
        // result is used.
        let mut resolved = address.to_socket_addrs().unwrap();
        match resolved.next() {
            Some(socket_addr) => socket_addr,
            None => panic!("Socket address {address:?} did not resolve"),
        }
    }

    /// Runs the source for five seconds and expects at least one event.
    ///
    /// NOTE(review): nothing in this test sends data, so it presumably relies
    /// on an external Prometheus instance remote-writing to the listen
    /// address -- confirm against the integration test environment.
    #[tokio::test]
    async fn receive_something() {
        let address = source_receive_address();
        let config = PrometheusRemoteWriteConfig {
            address,
            auth: None,
            tls: None,
            acknowledgements: SourceAcknowledgementsConfig::default(),
            keepalive: KeepaliveConfig::default(),
        };

        let duration = Duration::from_secs(5);
        let events =
            run_and_assert_source_compliance(config, duration, &HTTP_PUSH_SOURCE_TAGS).await;

        assert!(!events.is_empty());
    }
}