1use base64::prelude::BASE64_URL_SAFE;
15use base64::Engine;
16use std::{collections::HashMap, net::SocketAddr};
17
18use bytes::Bytes;
19use itertools::Itertools;
20use vector_lib::config::LogNamespace;
21use vector_lib::configurable::configurable_component;
22use warp::http::HeaderMap;
23
24use super::parser;
25use crate::common::http::server_auth::HttpServerAuthConfig;
26use crate::common::http::ErrorMessage;
27use crate::http::KeepaliveConfig;
28use crate::{
29 config::{
30 GenerateConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
31 },
32 event::Event,
33 serde::bool_or_struct,
34 sources::{
35 self,
36 util::{http::HttpMethod, HttpSource},
37 },
38 tls::TlsEnableableConfig,
39};
40
/// Configuration for the `prometheus_pushgateway` source.
#[configurable_component(source(
    "prometheus_pushgateway",
    "Receive metrics via the Prometheus Pushgateway protocol."
))]
#[derive(Clone, Debug)]
pub struct PrometheusPushgatewayConfig {
    /// The socket address to accept connections on.
    ///
    /// The address _must_ include a port.
    #[configurable(metadata(docs::examples = "0.0.0.0:9091"))]
    address: SocketAddr,

    // Optional TLS settings; handed to the HTTP server runner by reference in `build`.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Optional HTTP server authentication settings; handed to the runner by
    // reference in `build`.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<HttpServerAuthConfig>,

    // Accepts either a bare boolean or the full struct when deserialized
    // (`bool_or_struct`); falls back to the type's default when omitted.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    // Keepalive settings; falls back to `KeepaliveConfig::default()` when omitted.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,

    /// Whether metric aggregation is enabled.
    ///
    /// The flag is forwarded unchanged to the text parser
    /// (`parser::parse_text_with_overrides`) for every request, which defines the
    /// precise aggregation semantics. When omitted, the value comes from
    /// `crate::serde::default_false`.
    #[serde(default = "crate::serde::default_false")]
    aggregate_metrics: bool,
}
76
77impl GenerateConfig for PrometheusPushgatewayConfig {
78 fn generate_config() -> toml::Value {
79 toml::Value::try_from(Self {
80 address: "127.0.0.1:9091".parse().unwrap(),
81 tls: None,
82 auth: None,
83 acknowledgements: SourceAcknowledgementsConfig::default(),
84 aggregate_metrics: false,
85 keepalive: KeepaliveConfig::default(),
86 })
87 .unwrap()
88 }
89}
90
91#[async_trait::async_trait]
92#[typetag::serde(name = "prometheus_pushgateway")]
93impl SourceConfig for PrometheusPushgatewayConfig {
94 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
95 let source = PushgatewaySource {
96 aggregate_metrics: self.aggregate_metrics,
97 };
98 source.run(
99 self.address,
100 "",
101 HttpMethod::Post,
102 http::StatusCode::OK,
103 false,
104 self.tls.as_ref(),
105 self.auth.as_ref(),
106 cx,
107 self.acknowledgements,
108 self.keepalive.clone(),
109 )
110 }
111
112 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
113 vec![SourceOutput::new_metrics()]
114 }
115
116 fn can_acknowledge(&self) -> bool {
117 true
118 }
119}
120
/// Per-source request handler state for the Pushgateway HTTP endpoint.
#[derive(Clone)]
struct PushgatewaySource {
    /// Copied from the source configuration's `aggregate_metrics` setting.
    aggregate_metrics: bool,
}

impl PushgatewaySource {
    /// Returns the stored `aggregate_metrics` flag.
    const fn aggregation_enabled(&self) -> bool {
        let Self { aggregate_metrics } = self;
        *aggregate_metrics
    }
}
131
132impl HttpSource for PushgatewaySource {
133 fn build_events(
134 &self,
135 body: Bytes,
136 _header_map: &HeaderMap,
137 _query_parameters: &HashMap<String, String>,
138 full_path: &str,
139 ) -> Result<Vec<Event>, ErrorMessage> {
140 let body = String::from_utf8_lossy(&body);
141
142 let path_labels = parse_path_labels(full_path)?;
143
144 parser::parse_text_with_overrides(&body, path_labels, self.aggregation_enabled()).map_err(
145 |error| {
146 ErrorMessage::new(
147 http::StatusCode::UNPROCESSABLE_ENTITY,
148 format!("Failed to parse metrics body: {error}"),
149 )
150 },
151 )
152 }
153}
154
155fn parse_path_labels(path: &str) -> Result<Vec<(String, String)>, ErrorMessage> {
156 if !path.starts_with("/metrics/job") {
157 return Err(ErrorMessage::new(
158 http::StatusCode::BAD_REQUEST,
159 "Path must begin with '/metrics/job'".to_owned(),
160 ));
161 }
162
163 path.split('/')
164 .skip(2)
167 .chunks(2)
168 .into_iter()
169 .map(|mut c| {
176 c.next().zip(c.next()).ok_or_else(|| {
177 ErrorMessage::new(
178 http::StatusCode::BAD_REQUEST,
179 "Request path must have an even number of segments to form grouping key"
180 .to_string(),
181 )
182 })
183 })
184 .map(|res| res.and_then(|(k, v)| decode_label_pair(k, v)))
188 .collect()
189}
190
191fn decode_label_pair(k: &str, v: &str) -> Result<(String, String), ErrorMessage> {
192 let Some(stripped_key) = k.strip_suffix("@base64") else {
194 return Ok((k.to_owned(), v.to_owned()));
195 };
196
197 if v.chars().all(|c| c == '=') {
208 if stripped_key == "job" {
210 return Err(ErrorMessage::new(
211 http::StatusCode::BAD_REQUEST,
212 "Job must not have an empty value".to_owned(),
213 ));
214 }
215
216 return Ok((stripped_key.to_owned(), "".to_owned()));
217 }
218
219 let missing_padding = v.len() % 4;
227 let padded_value = if missing_padding == 0 {
228 v.to_owned()
229 } else {
230 let padding = "=".repeat(missing_padding);
231 v.to_owned() + &padding
232 };
233
234 let decoded_bytes = BASE64_URL_SAFE.decode(padded_value).map_err(|_| {
235 ErrorMessage::new(
236 http::StatusCode::BAD_REQUEST,
237 format!("Grouping key invalid - invalid base64 value for key {k}: {v}"),
238 )
239 })?;
240
241 let decoded = String::from_utf8(decoded_bytes).map_err(|_| {
242 ErrorMessage::new(
243 http::StatusCode::BAD_REQUEST,
244 format!("Grouping key invalid - invalid UTF-8 in decoded base64 value for key {k}"),
245 )
246 })?;
247
248 Ok((stripped_key.to_owned(), decoded))
249}
250
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_util::components::{assert_source_compliance, HTTP_PUSH_SOURCE_TAGS};
    use crate::test_util::wait_for_tcp;
    use crate::{test_util, SourceSender};
    use chrono::{TimeZone, Timelike, Utc};
    use vector_lib::event::{EventStatus, Metric, MetricKind, MetricValue};
    use vector_lib::tls::MaybeTlsSettings;

    /// Unwraps every collected event into its metric.
    fn events_to_metrics(events: Vec<Event>) -> Vec<Metric> {
        events
            .into_iter()
            .map(|event| event.into_metric())
            .collect()
    }

    /// Builds the owned label pairs `parse_path_labels` is expected to return.
    fn owned_labels(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Asserts that `path` parses successfully into exactly `expected`.
    fn assert_parses_to(path: &str, expected: &[(&str, &str)]) {
        let parsed = parse_path_labels(path);

        assert!(parsed.is_ok());
        assert_eq!(parsed.unwrap(), owned_labels(expected));
    }

    /// Asserts that `path` is rejected with a message containing `fragment`.
    fn assert_rejected_with(path: &str, fragment: &str) {
        let outcome = parse_path_labels(path);

        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().message().contains(fragment));
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusPushgatewayConfig>();
    }

    #[test]
    fn test_parse_simple_path() {
        assert_parses_to(
            "/metrics/job/foo/instance/bar",
            &[("job", "foo"), ("instance", "bar")],
        );
    }

    #[test]
    fn test_parse_path_wrong_number_of_segments() {
        assert_rejected_with("/metrics/job/foo/instance", "number of segments");
    }

    #[test]
    fn test_parse_path_with_base64_segment() {
        assert_parses_to(
            "/metrics/job/foo/instance@base64/YmFyL2Jheg==",
            &[("job", "foo"), ("instance", "bar/baz")],
        );
    }

    #[test]
    fn test_parse_path_with_base64_segment_missing_padding() {
        assert_parses_to(
            "/metrics/job/foo/instance@base64/YmFyL2Jheg",
            &[("job", "foo"), ("instance", "bar/baz")],
        );
    }

    #[test]
    fn test_parse_path_empty_job_name_invalid() {
        assert_rejected_with("/metrics/job@base64/=", "Job must not");
    }

    #[test]
    fn test_parse_path_empty_path_invalid() {
        assert_rejected_with("/", "Path must begin");
    }

    // Repeated label names are returned as separate entries, in path order.
    #[test]
    fn test_parse_path_duplicate_labels_preserves_order() {
        assert_parses_to(
            "/metrics/job/foo/instance/bar/instance/baz",
            &[("job", "foo"), ("instance", "bar"), ("instance", "baz")],
        );
    }

    #[tokio::test]
    async fn test_whole_request_happy_path_http() {
        whole_request_happy_path(None).await;
    }

    #[tokio::test]
    async fn test_whole_request_happy_path_https() {
        whole_request_happy_path(Some(TlsEnableableConfig::test_config())).await;
    }

    /// Starts the source, pushes one text-format payload over HTTP(S) and checks
    /// the metrics that come out the other end.
    async fn whole_request_happy_path(tls: Option<TlsEnableableConfig>) {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let address = test_util::next_addr();
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

            let config = PrometheusPushgatewayConfig {
                address,
                auth: None,
                tls: tls.clone(),
                acknowledgements: SourceAcknowledgementsConfig::default(),
                keepalive: KeepaliveConfig::default(),
                aggregate_metrics: true,
            };
            let server = config
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            wait_for_tcp(address).await;

            let scheme = MaybeTlsSettings::from_config(tls.as_ref(), true)
                .unwrap()
                .http_protocol_name();
            let push_path = "metrics/job/async_worker";
            let push_url = format!(
                "{scheme}://{}:{}/{push_path}",
                address.ip(),
                address.port()
            );
            let push_body = r#"
                # TYPE jobs_total counter
                # HELP jobs_total Total number of jobs
                jobs_total{type="a"} 1.0 1612411506789
                # TYPE jobs_current gauge
                # HELP jobs_current Current number of jobs
                jobs_current{type="a"} 5.0 1612411506789
                # TYPE jobs_distribution histogram
                # HELP jobs_distribution Distribution of jobs
                jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
                jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
                jobs_distribution_sum{type="a"} 8.0 1612411506789
                jobs_distribution_count{type="a"} 1.0 1612411506789
                # TYPE jobs_summary summary
                # HELP jobs_summary Summary of jobs
                jobs_summary_sum{type="a"} 8.0 1612411506789
                jobs_summary_count{type="a"} 1.0 1612411506789
                "#;

            // 1612411506789 ms since the epoch, as used by every sample above.
            let timestamp = Utc
                .with_ymd_and_hms(2021, 2, 4, 4, 5, 6)
                .single()
                .and_then(|t| t.with_nanosecond(789_000_000))
                .expect("invalid timestamp");

            // Every expected metric carries the same tags and timestamp.
            let pushed_metric = |name: &str, kind: MetricKind, value: MetricValue| {
                Metric::new(name, kind, value)
                    .with_tags(Some(
                        metric_tags! { "job" => "async_worker", "type" => "a" },
                    ))
                    .with_timestamp(Some(timestamp))
            };

            let expected = vec![
                pushed_metric(
                    "jobs_total",
                    MetricKind::Incremental,
                    MetricValue::Counter { value: 1.0 },
                ),
                pushed_metric(
                    "jobs_current",
                    MetricKind::Absolute,
                    MetricValue::Gauge { value: 5.0 },
                ),
                pushed_metric(
                    "jobs_distribution",
                    MetricKind::Incremental,
                    MetricValue::AggregatedHistogram {
                        buckets: vector_lib::buckets![
                            1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
                        ],
                        count: 1,
                        sum: 8.0,
                    },
                ),
                pushed_metric(
                    "jobs_summary",
                    MetricKind::Absolute,
                    MetricValue::AggregatedSummary {
                        quantiles: vector_lib::quantiles![],
                        count: 1,
                        sum: 8.0,
                    },
                ),
            ];

            let output = test_util::spawn_collect_ready(
                async move {
                    // The HTTPS variant uses a test certificate, so verification is disabled.
                    let client = reqwest::Client::builder()
                        .danger_accept_invalid_certs(true)
                        .build()
                        .unwrap();
                    client.post(push_url).body(push_body).send().await.unwrap();
                },
                rx,
                1,
            )
            .await;

            vector_lib::assert_event_data_eq!(expected, events_to_metrics(output));
        })
        .await;
    }
}
486}