1use std::{collections::HashMap, net::SocketAddr};
15
16use base64::{Engine, prelude::BASE64_URL_SAFE};
17use bytes::Bytes;
18use itertools::Itertools;
19use vector_lib::{config::LogNamespace, configurable::configurable_component};
20use warp::http::HeaderMap;
21
22use super::parser;
23use crate::{
24 common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
25 config::{
26 GenerateConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
27 },
28 event::Event,
29 http::KeepaliveConfig,
30 serde::bool_or_struct,
31 sources::{
32 self,
33 util::{HttpSource, http::HttpMethod},
34 },
35 tls::TlsEnableableConfig,
36};
37
38#[configurable_component(source(
40 "prometheus_pushgateway",
41 "Receive metrics via the Prometheus Pushgateway protocol."
42))]
43#[derive(Clone, Debug)]
44pub struct PrometheusPushgatewayConfig {
45 #[configurable(metadata(docs::examples = "0.0.0.0:9091"))]
49 address: SocketAddr,
50
51 #[configurable(derived)]
52 tls: Option<TlsEnableableConfig>,
53
54 #[configurable(derived)]
55 #[configurable(metadata(docs::advanced))]
56 auth: Option<HttpServerAuthConfig>,
57
58 #[configurable(derived)]
59 #[serde(default, deserialize_with = "bool_or_struct")]
60 acknowledgements: SourceAcknowledgementsConfig,
61
62 #[configurable(derived)]
63 #[serde(default)]
64 keepalive: KeepaliveConfig,
65
66 #[serde(default = "crate::serde::default_false")]
71 aggregate_metrics: bool,
72}
73
74impl GenerateConfig for PrometheusPushgatewayConfig {
75 fn generate_config() -> toml::Value {
76 toml::Value::try_from(Self {
77 address: "127.0.0.1:9091".parse().unwrap(),
78 tls: None,
79 auth: None,
80 acknowledgements: SourceAcknowledgementsConfig::default(),
81 aggregate_metrics: false,
82 keepalive: KeepaliveConfig::default(),
83 })
84 .unwrap()
85 }
86}
87
88#[async_trait::async_trait]
89#[typetag::serde(name = "prometheus_pushgateway")]
90impl SourceConfig for PrometheusPushgatewayConfig {
91 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
92 let source = PushgatewaySource {
93 aggregate_metrics: self.aggregate_metrics,
94 };
95 source.run(
96 self.address,
97 "",
98 HttpMethod::Post,
99 http::StatusCode::OK,
100 false,
101 self.tls.as_ref(),
102 self.auth.as_ref(),
103 cx,
104 self.acknowledgements,
105 self.keepalive.clone(),
106 )
107 }
108
109 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
110 vec![SourceOutput::new_metrics()]
111 }
112
113 fn can_acknowledge(&self) -> bool {
114 true
115 }
116}
117
/// Request handler state for the Pushgateway source.
#[derive(Clone)]
struct PushgatewaySource {
    /// Aggregation flag copied from the source configuration.
    aggregate_metrics: bool,
}

impl PushgatewaySource {
    /// Reports whether aggregation was requested for this source.
    const fn aggregation_enabled(&self) -> bool {
        let Self {
            aggregate_metrics: enabled,
        } = *self;
        enabled
    }
}
128
129impl HttpSource for PushgatewaySource {
130 fn build_events(
131 &self,
132 body: Bytes,
133 _header_map: &HeaderMap,
134 _query_parameters: &HashMap<String, String>,
135 full_path: &str,
136 ) -> Result<Vec<Event>, ErrorMessage> {
137 let body = String::from_utf8_lossy(&body);
138
139 let path_labels = parse_path_labels(full_path)?;
140
141 parser::parse_text_with_overrides(&body, path_labels, self.aggregation_enabled()).map_err(
142 |error| {
143 ErrorMessage::new(
144 http::StatusCode::UNPROCESSABLE_ENTITY,
145 format!("Failed to parse metrics body: {error}"),
146 )
147 },
148 )
149 }
150}
151
152fn parse_path_labels(path: &str) -> Result<Vec<(String, String)>, ErrorMessage> {
153 if !path.starts_with("/metrics/job") {
154 return Err(ErrorMessage::new(
155 http::StatusCode::BAD_REQUEST,
156 "Path must begin with '/metrics/job'".to_owned(),
157 ));
158 }
159
160 path.split('/')
161 .skip(2)
164 .chunks(2)
165 .into_iter()
166 .map(|mut c| {
173 c.next().zip(c.next()).ok_or_else(|| {
174 ErrorMessage::new(
175 http::StatusCode::BAD_REQUEST,
176 "Request path must have an even number of segments to form grouping key"
177 .to_string(),
178 )
179 })
180 })
181 .map(|res| res.and_then(|(k, v)| decode_label_pair(k, v)))
185 .collect()
186}
187
188fn decode_label_pair(k: &str, v: &str) -> Result<(String, String), ErrorMessage> {
189 let Some(stripped_key) = k.strip_suffix("@base64") else {
191 return Ok((k.to_owned(), v.to_owned()));
192 };
193
194 if v.chars().all(|c| c == '=') {
205 if stripped_key == "job" {
207 return Err(ErrorMessage::new(
208 http::StatusCode::BAD_REQUEST,
209 "Job must not have an empty value".to_owned(),
210 ));
211 }
212
213 return Ok((stripped_key.to_owned(), "".to_owned()));
214 }
215
216 let missing_padding = v.len() % 4;
224 let padded_value = if missing_padding == 0 {
225 v.to_owned()
226 } else {
227 let padding = "=".repeat(missing_padding);
228 v.to_owned() + &padding
229 };
230
231 let decoded_bytes = BASE64_URL_SAFE.decode(padded_value).map_err(|_| {
232 ErrorMessage::new(
233 http::StatusCode::BAD_REQUEST,
234 format!("Grouping key invalid - invalid base64 value for key {k}: {v}"),
235 )
236 })?;
237
238 let decoded = String::from_utf8(decoded_bytes).map_err(|_| {
239 ErrorMessage::new(
240 http::StatusCode::BAD_REQUEST,
241 format!("Grouping key invalid - invalid UTF-8 in decoded base64 value for key {k}"),
242 )
243 })?;
244
245 Ok((stripped_key.to_owned(), decoded))
246}
247
#[cfg(test)]
mod test {
    use chrono::{TimeZone, Timelike, Utc};
    use vector_lib::{
        event::{EventStatus, Metric, MetricKind, MetricValue},
        tls::MaybeTlsSettings,
    };

    use super::*;
    use crate::{
        SourceSender, test_util,
        test_util::{
            components::{HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
            wait_for_tcp,
        },
    };

    /// Unwraps every collected event into its metric.
    fn events_to_metrics(events: Vec<Event>) -> Vec<Metric> {
        let mut metrics = Vec::with_capacity(events.len());
        for event in events {
            metrics.push(event.into_metric());
        }
        metrics
    }

    /// Converts borrowed label pairs into the owned form `parse_path_labels` returns.
    fn owned_labels(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Asserts that `path` parses and yields exactly `expected`, in order.
    fn assert_path_labels(path: &str, expected: &[(&str, &str)]) {
        let actual = parse_path_labels(path);

        assert!(actual.is_ok());
        assert_eq!(actual.unwrap(), owned_labels(expected));
    }

    /// Asserts that `path` is rejected with a message containing `fragment`.
    fn assert_path_rejected(path: &str, fragment: &str) {
        let result = parse_path_labels(path);

        assert!(result.is_err());
        assert!(result.unwrap_err().message().contains(fragment));
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusPushgatewayConfig>();
    }

    #[test]
    fn test_parse_simple_path() {
        assert_path_labels(
            "/metrics/job/foo/instance/bar",
            &[("job", "foo"), ("instance", "bar")],
        );
    }

    #[test]
    fn test_parse_path_wrong_number_of_segments() {
        assert_path_rejected("/metrics/job/foo/instance", "number of segments");
    }

    #[test]
    fn test_parse_path_with_base64_segment() {
        assert_path_labels(
            "/metrics/job/foo/instance@base64/YmFyL2Jheg==",
            &[("job", "foo"), ("instance", "bar/baz")],
        );
    }

    #[test]
    fn test_parse_path_with_base64_segment_missing_padding() {
        assert_path_labels(
            "/metrics/job/foo/instance@base64/YmFyL2Jheg",
            &[("job", "foo"), ("instance", "bar/baz")],
        );
    }

    #[test]
    fn test_parse_path_empty_job_name_invalid() {
        assert_path_rejected("/metrics/job@base64/=", "Job must not");
    }

    #[test]
    fn test_parse_path_empty_path_invalid() {
        assert_path_rejected("/", "Path must begin");
    }

    #[test]
    fn test_parse_path_duplicate_labels_preserves_order() {
        assert_path_labels(
            "/metrics/job/foo/instance/bar/instance/baz",
            &[("job", "foo"), ("instance", "bar"), ("instance", "baz")],
        );
    }

    #[tokio::test]
    async fn test_whole_request_happy_path_http() {
        whole_request_happy_path(None).await;
    }

    #[tokio::test]
    async fn test_whole_request_happy_path_https() {
        whole_request_happy_path(Some(TlsEnableableConfig::test_config())).await;
    }

    /// Starts the source with aggregation enabled, pushes one body containing a
    /// counter, a gauge, a histogram and a summary, and compares the collected
    /// metrics with the expected ones. `tls` selects plain HTTP (`None`) or HTTPS.
    async fn whole_request_happy_path(tls: Option<TlsEnableableConfig>) {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let address = test_util::next_addr();
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

            let config = PrometheusPushgatewayConfig {
                address,
                auth: None,
                tls: tls.clone(),
                acknowledgements: SourceAcknowledgementsConfig::default(),
                keepalive: KeepaliveConfig::default(),
                aggregate_metrics: true,
            };
            let server = config
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            wait_for_tcp(address).await;

            let scheme = MaybeTlsSettings::from_config(tls.as_ref(), true)
                .unwrap()
                .http_protocol_name();
            let push_path = "metrics/job/async_worker";
            let push_url = format!(
                "{scheme}://{}:{}/{push_path}",
                address.ip(),
                address.port()
            );
            let push_body = r#"
 # TYPE jobs_total counter
 # HELP jobs_total Total number of jobs
 jobs_total{type="a"} 1.0 1612411506789
 # TYPE jobs_current gauge
 # HELP jobs_current Current number of jobs
 jobs_current{type="a"} 5.0 1612411506789
 # TYPE jobs_distribution histogram
 # HELP jobs_distribution Distribution of jobs
 jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
 jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
 jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
 jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
 jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
 jobs_distribution_sum{type="a"} 8.0 1612411506789
 jobs_distribution_count{type="a"} 1.0 1612411506789
 # TYPE jobs_summary summary
 # HELP jobs_summary Summary of jobs
 jobs_summary_sum{type="a"} 8.0 1612411506789
 jobs_summary_count{type="a"} 1.0 1612411506789
 "#;

            // 1612411506789 ms since the epoch is 2021-02-04T04:05:06.789Z.
            let timestamp = Utc
                .with_ymd_and_hms(2021, 2, 4, 4, 5, 6)
                .single()
                .and_then(|t| t.with_nanosecond(789_000_000))
                .expect("invalid timestamp");

            // Every expected metric carries the path label plus the body label.
            let tags = || metric_tags!("job" => "async_worker", "type" => "a");

            let expected = vec![
                Metric::new(
                    "jobs_total",
                    MetricKind::Incremental,
                    MetricValue::Counter { value: 1.0 },
                )
                .with_tags(Some(tags()))
                .with_timestamp(Some(timestamp)),
                Metric::new(
                    "jobs_current",
                    MetricKind::Absolute,
                    MetricValue::Gauge { value: 5.0 },
                )
                .with_tags(Some(tags()))
                .with_timestamp(Some(timestamp)),
                Metric::new(
                    "jobs_distribution",
                    MetricKind::Incremental,
                    MetricValue::AggregatedHistogram {
                        buckets: vector_lib::buckets![
                            1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
                        ],
                        count: 1,
                        sum: 8.0,
                    },
                )
                .with_tags(Some(tags()))
                .with_timestamp(Some(timestamp)),
                Metric::new(
                    "jobs_summary",
                    MetricKind::Absolute,
                    MetricValue::AggregatedSummary {
                        quantiles: vector_lib::quantiles![],
                        count: 1,
                        sum: 8.0,
                    },
                )
                .with_tags(Some(tags()))
                .with_timestamp(Some(timestamp)),
            ];

            // The client skips certificate validation so the HTTPS variant can use
            // the test TLS configuration.
            let push = async move {
                let client = reqwest::Client::builder()
                    .danger_accept_invalid_certs(true)
                    .build()
                    .unwrap();
                client.post(push_url).body(push_body).send().await.unwrap();
            };
            // NOTE(review): the trailing `1` is presumably a collection timeout in
            // seconds — confirm against `spawn_collect_ready`.
            let output = test_util::spawn_collect_ready(push, rx, 1).await;

            vector_lib::assert_event_data_eq!(expected, events_to_metrics(output));
        })
        .await;
    }
}