vector/sources/prometheus/
pushgateway.rs

1//! A metrics source that emulates the behaviour of a Prometheus Pushgateway.
2//!
3//! The protocol
4//! [is described](https://github.com/prometheus/pushgateway/blob/master/README.md)
5//! in the original Pushgateway repo, though there are some important caveats
6//! to our implementation:
7//!
8//!   - We only support `POST` requests, not `PUT`, as the semantics of `PUT`
9//!     requests in the spec aren't possible to replicate within Vector.
10//!   - We don't support protobuf requests, only the Prometheus text format.
11//!   - Only counters and histograms can be aggregated as there is no meaningful
12//!     way to aggregate gauges or summaries.
13
14use std::{collections::HashMap, net::SocketAddr};
15
16use base64::{Engine, prelude::BASE64_URL_SAFE};
17use bytes::Bytes;
18use itertools::Itertools;
19use vector_lib::{config::LogNamespace, configurable::configurable_component};
20use warp::http::HeaderMap;
21
22use super::parser;
23use crate::{
24    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
25    config::{
26        GenerateConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
27    },
28    event::Event,
29    http::KeepaliveConfig,
30    serde::bool_or_struct,
31    sources::{
32        self,
33        util::{HttpSource, http::HttpMethod},
34    },
35    tls::TlsEnableableConfig,
36};
37
// NOTE(review): the `///` comments on this struct and its fields are presumably
// picked up by `configurable_component` as user-facing descriptions — confirm
// before rewording them.
/// Configuration for the `prometheus_pushgateway` source.
#[configurable_component(source(
    "prometheus_pushgateway",
    "Receive metrics via the Prometheus Pushgateway protocol."
))]
#[derive(Clone, Debug)]
pub struct PrometheusPushgatewayConfig {
    /// The socket address to accept connections on.
    ///
    /// The address _must_ include a port.
    #[configurable(metadata(docs::examples = "0.0.0.0:9091"))]
    address: SocketAddr,

    // Optional; `build` forwards it by reference (`as_ref`) to `run`.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Optional; `build` forwards it by reference (`as_ref`) to `run`.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<HttpServerAuthConfig>,

    // Deserialized through `bool_or_struct` — presumably accepts either a bare
    // boolean or the full struct form (verify against `crate::serde`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    // Falls back to `KeepaliveConfig::default()` when absent from the config.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,

    /// Whether to aggregate values across pushes.
    ///
    /// Only applies to counters and histograms as gauges and summaries can't be
    /// meaningfully aggregated.
    // When absent from the config the value comes from
    // `crate::serde::default_false` (presumably `false`). `build` copies it
    // into `PushgatewaySource`.
    #[serde(default = "crate::serde::default_false")]
    aggregate_metrics: bool,
}
73
74impl GenerateConfig for PrometheusPushgatewayConfig {
75    fn generate_config() -> toml::Value {
76        toml::Value::try_from(Self {
77            address: "127.0.0.1:9091".parse().unwrap(),
78            tls: None,
79            auth: None,
80            acknowledgements: SourceAcknowledgementsConfig::default(),
81            aggregate_metrics: false,
82            keepalive: KeepaliveConfig::default(),
83        })
84        .unwrap()
85    }
86}
87
88#[async_trait::async_trait]
89#[typetag::serde(name = "prometheus_pushgateway")]
90impl SourceConfig for PrometheusPushgatewayConfig {
91    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
92        let source = PushgatewaySource {
93            aggregate_metrics: self.aggregate_metrics,
94        };
95        source.run(
96            self.address,
97            "",
98            HttpMethod::Post,
99            http::StatusCode::OK,
100            false,
101            self.tls.as_ref(),
102            self.auth.as_ref(),
103            cx,
104            self.acknowledgements,
105            self.keepalive.clone(),
106        )
107    }
108
109    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
110        vec![SourceOutput::new_metrics()]
111    }
112
113    fn can_acknowledge(&self) -> bool {
114        true
115    }
116}
117
/// Request-handling state for the `prometheus_pushgateway` source.
///
/// Holds only the setting that `build_events` consults when parsing a push.
#[derive(Clone)]
struct PushgatewaySource {
    /// Copied from `PrometheusPushgatewayConfig::aggregate_metrics` when the
    /// source is built.
    aggregate_metrics: bool,
}

impl PushgatewaySource {
    /// Returns the stored `aggregate_metrics` flag.
    const fn aggregation_enabled(&self) -> bool {
        let Self { aggregate_metrics } = *self;
        aggregate_metrics
    }
}
128
129impl HttpSource for PushgatewaySource {
130    fn build_events(
131        &self,
132        body: Bytes,
133        _header_map: &HeaderMap,
134        _query_parameters: &HashMap<String, String>,
135        full_path: &str,
136    ) -> Result<Vec<Event>, ErrorMessage> {
137        let body = String::from_utf8_lossy(&body);
138
139        let path_labels = parse_path_labels(full_path)?;
140
141        parser::parse_text_with_overrides(&body, path_labels, self.aggregation_enabled()).map_err(
142            |error| {
143                ErrorMessage::new(
144                    http::StatusCode::UNPROCESSABLE_ENTITY,
145                    format!("Failed to parse metrics body: {error}"),
146                )
147            },
148        )
149    }
150}
151
152fn parse_path_labels(path: &str) -> Result<Vec<(String, String)>, ErrorMessage> {
153    if !path.starts_with("/metrics/job") {
154        return Err(ErrorMessage::new(
155            http::StatusCode::BAD_REQUEST,
156            "Path must begin with '/metrics/job'".to_owned(),
157        ));
158    }
159
160    path.split('/')
161        // Skip the first two segments as they're the empty string and
162        // "metrics", which is always there as a path prefix
163        .skip(2)
164        .chunks(2)
165        .into_iter()
166        // If we get a chunk that only has 1 item, return an error
167        // The path has to be made up of key-value pairs to be valid
168        //
169        // This includes the trailing slash case (where the single item
170        // is the empty string ("") to match the real Prometheus
171        // Pushgateway
172        .map(|mut c| {
173            c.next().zip(c.next()).ok_or_else(|| {
174                ErrorMessage::new(
175                    http::StatusCode::BAD_REQUEST,
176                    "Request path must have an even number of segments to form grouping key"
177                        .to_string(),
178                )
179            })
180        })
181        // Decode any values that have been base64 encoded per the Pushgateway spec
182        //
183        // See: https://github.com/prometheus/pushgateway#url
184        .map(|res| res.and_then(|(k, v)| decode_label_pair(k, v)))
185        .collect()
186}
187
188fn decode_label_pair(k: &str, v: &str) -> Result<(String, String), ErrorMessage> {
189    // Return early if we're not dealing with a base64-encoded label
190    let Some(stripped_key) = k.strip_suffix("@base64") else {
191        return Ok((k.to_owned(), v.to_owned()));
192    };
193
194    // The Prometheus Pushgateway spec explicitly uses one or more `=` characters
195    // (the padding character in base64) to represent an empty string in a path
196    // segment:
197    //
198    // https://github.com/prometheus/pushgateway/blob/ec7afda4eef288bd9b9c43d063e4df54c8961272/README.md#url
199    //
200    // Unfortunately, the Rust base64 crate doesn't treat an encoded string that
201    // only contains padding characters as valid and returns an error.
202    //
203    // Let's handle this case manually, before handing over to the base64 decoder.
204    if v.chars().all(|c| c == '=') {
205        // An empty job label isn't valid, so return an error if that's the key
206        if stripped_key == "job" {
207            return Err(ErrorMessage::new(
208                http::StatusCode::BAD_REQUEST,
209                "Job must not have an empty value".to_owned(),
210            ));
211        }
212
213        return Ok((stripped_key.to_owned(), "".to_owned()));
214    }
215
216    // The Prometheus Pushgateway has a fairly permissive base64 implementation
217    // that allows padding to be missing. We need to fake that by adding in
218    // any missing padding before we pass the value to the base64 decoder.
219    //
220    // This is documented, as examples in their README don't use padding:
221    //
222    // https://github.com/prometheus/pushgateway/blob/ec7afda4eef288bd9b9c43d063e4df54c8961272/README.md#url
223    let missing_padding = v.len() % 4;
224    let padded_value = if missing_padding == 0 {
225        v.to_owned()
226    } else {
227        let padding = "=".repeat(missing_padding);
228        v.to_owned() + &padding
229    };
230
231    let decoded_bytes = BASE64_URL_SAFE.decode(padded_value).map_err(|_| {
232        ErrorMessage::new(
233            http::StatusCode::BAD_REQUEST,
234            format!("Grouping key invalid - invalid base64 value for key {k}: {v}"),
235        )
236    })?;
237
238    let decoded = String::from_utf8(decoded_bytes).map_err(|_| {
239        ErrorMessage::new(
240            http::StatusCode::BAD_REQUEST,
241            format!("Grouping key invalid - invalid UTF-8 in decoded base64 value for key {k}"),
242        )
243    })?;
244
245    Ok((stripped_key.to_owned(), decoded))
246}
247
#[cfg(test)]
mod test {
    use chrono::{TimeZone, Timelike, Utc};
    use vector_lib::{
        event::{EventStatus, Metric, MetricKind, MetricValue},
        tls::MaybeTlsSettings,
    };

    use super::*;
    use crate::{
        SourceSender, test_util,
        test_util::{
            components::{HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
            wait_for_tcp,
        },
    };

    /// Unwraps every collected event into the metric it carries.
    fn events_to_metrics(events: Vec<Event>) -> Vec<Metric> {
        let mut metrics = Vec::with_capacity(events.len());
        for event in events {
            metrics.push(event.into_metric());
        }
        metrics
    }

    /// Converts borrowed label pairs into the owned form that
    /// `parse_path_labels` returns.
    fn owned_labels(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|&(name, value)| (name.to_owned(), value.to_owned()))
            .collect()
    }

    #[test]
    fn generate_config() {
        test_util::test_generate_config::<PrometheusPushgatewayConfig>();
    }

    #[test]
    fn test_parse_simple_path() {
        let labels = parse_path_labels("/metrics/job/foo/instance/bar")
            .expect("a well-formed path should parse");

        assert_eq!(labels, owned_labels(&[("job", "foo"), ("instance", "bar")]));
    }

    #[test]
    fn test_parse_path_wrong_number_of_segments() {
        let error = parse_path_labels("/metrics/job/foo/instance")
            .expect_err("a label name without a value should be rejected");

        assert!(error.message().contains("number of segments"));
    }

    #[test]
    fn test_parse_path_with_base64_segment() {
        let labels = parse_path_labels("/metrics/job/foo/instance@base64/YmFyL2Jheg==")
            .expect("a padded base64 value should parse");

        assert_eq!(
            labels,
            owned_labels(&[("job", "foo"), ("instance", "bar/baz")])
        );
    }

    #[test]
    fn test_parse_path_with_base64_segment_missing_padding() {
        let labels = parse_path_labels("/metrics/job/foo/instance@base64/YmFyL2Jheg")
            .expect("an unpadded base64 value should parse");

        assert_eq!(
            labels,
            owned_labels(&[("job", "foo"), ("instance", "bar/baz")])
        );
    }

    #[test]
    fn test_parse_path_empty_job_name_invalid() {
        let error = parse_path_labels("/metrics/job@base64/=")
            .expect_err("an empty job name should be rejected");

        assert!(error.message().contains("Job must not"));
    }

    #[test]
    fn test_parse_path_empty_path_invalid() {
        let error = parse_path_labels("/").expect_err("the bare root path should be rejected");

        assert!(error.message().contains("Path must begin"));
    }

    // This is to ensure that the last value for a given key is the one used when we
    // pass the grouping key into the Prometheus text parser to override label values
    // on individual metrics
    #[test]
    fn test_parse_path_duplicate_labels_preserves_order() {
        let labels = parse_path_labels("/metrics/job/foo/instance/bar/instance/baz")
            .expect("duplicate label names should still parse");

        assert_eq!(
            labels,
            owned_labels(&[("job", "foo"), ("instance", "bar"), ("instance", "baz")])
        );
    }

    #[tokio::test]
    async fn test_whole_request_happy_path_http() {
        whole_request_happy_path(None).await;
    }

    #[tokio::test]
    async fn test_whole_request_happy_path_https() {
        whole_request_happy_path(Some(TlsEnableableConfig::test_config())).await;
    }

    /// Starts the source, pushes one text-format payload containing a counter,
    /// gauge, histogram and summary, and checks the metrics that come out.
    async fn whole_request_happy_path(tls: Option<TlsEnableableConfig>) {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let address = test_util::next_addr();
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

            let config = PrometheusPushgatewayConfig {
                address,
                tls: tls.clone(),
                auth: None,
                acknowledgements: SourceAcknowledgementsConfig::default(),
                keepalive: KeepaliveConfig::default(),
                aggregate_metrics: true,
            };
            let server = config
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            wait_for_tcp(address).await;

            let scheme = MaybeTlsSettings::from_config(tls.as_ref(), true)
                .unwrap()
                .http_protocol_name();
            let push_url = format!(
                "{scheme}://{}:{}/metrics/job/async_worker",
                address.ip(),
                address.port()
            );
            let push_body = r#"
                # TYPE jobs_total counter
                # HELP jobs_total Total number of jobs
                jobs_total{type="a"} 1.0 1612411506789
                # TYPE jobs_current gauge
                # HELP jobs_current Current number of jobs
                jobs_current{type="a"} 5.0 1612411506789
                # TYPE jobs_distribution histogram
                # HELP jobs_distribution Distribution of jobs
                jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
                jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
                jobs_distribution_sum{type="a"} 8.0 1612411506789
                jobs_distribution_count{type="a"} 1.0 1612411506789
                # TYPE jobs_summary summary
                # HELP jobs_summary Summary of jobs
                jobs_summary_sum{type="a"} 8.0 1612411506789
                jobs_summary_count{type="a"} 1.0 1612411506789
                "#;

            // 1612411506789 ms since the epoch, as used on every sample above.
            let timestamp = Utc
                .with_ymd_and_hms(2021, 2, 4, 4, 5, 6)
                .single()
                .and_then(|whole_second| whole_second.with_nanosecond(789 * 1_000_000))
                .expect("invalid timestamp");

            // Every expected metric shares the same tags and timestamp: the
            // `job` tag comes from the push path, `type` from the sample.
            let finish = |metric: Metric| {
                metric
                    .with_tags(Some(
                        metric_tags! { "job" => "async_worker", "type" => "a" },
                    ))
                    .with_timestamp(Some(timestamp))
            };

            let expected = vec![
                finish(Metric::new(
                    "jobs_total",
                    MetricKind::Incremental,
                    MetricValue::Counter { value: 1.0 },
                )),
                finish(Metric::new(
                    "jobs_current",
                    MetricKind::Absolute,
                    MetricValue::Gauge { value: 5.0 },
                )),
                finish(Metric::new(
                    "jobs_distribution",
                    MetricKind::Incremental,
                    MetricValue::AggregatedHistogram {
                        buckets: vector_lib::buckets![
                            1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
                        ],
                        count: 1,
                        sum: 8.0,
                    },
                )),
                finish(Metric::new(
                    "jobs_summary",
                    MetricKind::Absolute,
                    MetricValue::AggregatedSummary {
                        quantiles: vector_lib::quantiles![],
                        count: 1,
                        sum: 8.0,
                    },
                )),
            ];

            let push = async move {
                // The HTTPS variant runs against a test certificate, so
                // certificate validation is switched off.
                let client = reqwest::Client::builder()
                    .danger_accept_invalid_certs(true)
                    .build()
                    .unwrap();
                client.post(push_url).body(push_body).send().await.unwrap();
            };
            let output = test_util::spawn_collect_ready(push, rx, 1).await;

            vector_lib::assert_event_data_eq!(expected, events_to_metrics(output));
        })
        .await;
    }
}