vector/sources/prometheus/
pushgateway.rs

1//! A metrics source that emulates the behaviour of a Prometheus Pushgateway.
2//!
3//! The protocol
4//! [is described](https://github.com/prometheus/pushgateway/blob/master/README.md)
5//! in the original Pushgateway repo, though there are some important caveats
6//! to our implementation:
7//!
8//!   - We only support `POST` requests, not `PUT`, as the semantics of `PUT`
9//!     requests in the spec aren't possible to replicate within Vector.
10//!   - We don't support protobuf requests, only the Prometheus text format.
11//!   - Only counters and histograms can be aggregated as there is no meaningful
12//!     way to aggregate gauges or summaries.
13
14use base64::prelude::BASE64_URL_SAFE;
15use base64::Engine;
16use std::{collections::HashMap, net::SocketAddr};
17
18use bytes::Bytes;
19use itertools::Itertools;
20use vector_lib::config::LogNamespace;
21use vector_lib::configurable::configurable_component;
22use warp::http::HeaderMap;
23
24use super::parser;
25use crate::common::http::server_auth::HttpServerAuthConfig;
26use crate::common::http::ErrorMessage;
27use crate::http::KeepaliveConfig;
28use crate::{
29    config::{
30        GenerateConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
31    },
32    event::Event,
33    serde::bool_or_struct,
34    sources::{
35        self,
36        util::{http::HttpMethod, HttpSource},
37    },
38    tls::TlsEnableableConfig,
39};
40
/// Configuration for the `prometheus_pushgateway` source.
// NOTE(review): the `///` comments on this struct and its fields presumably
// feed the generated configuration docs via `configurable_component` — confirm
// before rewording them. Maintainer notes below use `//` for that reason.
#[configurable_component(source(
    "prometheus_pushgateway",
    "Receive metrics via the Prometheus Pushgateway protocol."
))]
#[derive(Clone, Debug)]
pub struct PrometheusPushgatewayConfig {
    /// The socket address to accept connections on.
    ///
    /// The address _must_ include a port.
    #[configurable(metadata(docs::examples = "0.0.0.0:9091"))]
    address: SocketAddr,

    // Optional TLS settings; `build` hands them to `HttpSource::run` as-is.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Optional HTTP server authentication; `build` hands it to
    // `HttpSource::run` as-is.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<HttpServerAuthConfig>,

    // Deserialized through `bool_or_struct`, so presumably either a bare
    // boolean or a full table is accepted here — confirm against that helper.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    // Falls back to `KeepaliveConfig::default()` when omitted.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,

    // Copied into `PushgatewaySource` by `build` and passed to the text parser
    // on every request. Defaults via `crate::serde::default_false`.
    /// Whether to aggregate values across pushes.
    ///
    /// Only applies to counters and histograms as gauges and summaries can't be
    /// meaningfully aggregated.
    #[serde(default = "crate::serde::default_false")]
    aggregate_metrics: bool,
}
76
77impl GenerateConfig for PrometheusPushgatewayConfig {
78    fn generate_config() -> toml::Value {
79        toml::Value::try_from(Self {
80            address: "127.0.0.1:9091".parse().unwrap(),
81            tls: None,
82            auth: None,
83            acknowledgements: SourceAcknowledgementsConfig::default(),
84            aggregate_metrics: false,
85            keepalive: KeepaliveConfig::default(),
86        })
87        .unwrap()
88    }
89}
90
91#[async_trait::async_trait]
92#[typetag::serde(name = "prometheus_pushgateway")]
93impl SourceConfig for PrometheusPushgatewayConfig {
94    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
95        let source = PushgatewaySource {
96            aggregate_metrics: self.aggregate_metrics,
97        };
98        source.run(
99            self.address,
100            "",
101            HttpMethod::Post,
102            http::StatusCode::OK,
103            false,
104            self.tls.as_ref(),
105            self.auth.as_ref(),
106            cx,
107            self.acknowledgements,
108            self.keepalive.clone(),
109        )
110    }
111
112    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
113        vec![SourceOutput::new_metrics()]
114    }
115
116    fn can_acknowledge(&self) -> bool {
117        true
118    }
119}
120
/// Request-handling state for the Pushgateway source.
///
/// Created by `PrometheusPushgatewayConfig::build` and driven through the
/// `HttpSource` implementation below.
#[derive(Clone)]
struct PushgatewaySource {
    /// Copied from the `aggregate_metrics` config option and passed to the
    /// Prometheus text parser for every request body.
    aggregate_metrics: bool,
}
125
126impl PushgatewaySource {
127    const fn aggregation_enabled(&self) -> bool {
128        self.aggregate_metrics
129    }
130}
131
132impl HttpSource for PushgatewaySource {
133    fn build_events(
134        &self,
135        body: Bytes,
136        _header_map: &HeaderMap,
137        _query_parameters: &HashMap<String, String>,
138        full_path: &str,
139    ) -> Result<Vec<Event>, ErrorMessage> {
140        let body = String::from_utf8_lossy(&body);
141
142        let path_labels = parse_path_labels(full_path)?;
143
144        parser::parse_text_with_overrides(&body, path_labels, self.aggregation_enabled()).map_err(
145            |error| {
146                ErrorMessage::new(
147                    http::StatusCode::UNPROCESSABLE_ENTITY,
148                    format!("Failed to parse metrics body: {error}"),
149                )
150            },
151        )
152    }
153}
154
155fn parse_path_labels(path: &str) -> Result<Vec<(String, String)>, ErrorMessage> {
156    if !path.starts_with("/metrics/job") {
157        return Err(ErrorMessage::new(
158            http::StatusCode::BAD_REQUEST,
159            "Path must begin with '/metrics/job'".to_owned(),
160        ));
161    }
162
163    path.split('/')
164        // Skip the first two segments as they're the empty string and
165        // "metrics", which is always there as a path prefix
166        .skip(2)
167        .chunks(2)
168        .into_iter()
169        // If we get a chunk that only has 1 item, return an error
170        // The path has to be made up of key-value pairs to be valid
171        //
172        // This includes the trailing slash case (where the single item
173        // is the empty string ("") to match the real Prometheus
174        // Pushgateway
175        .map(|mut c| {
176            c.next().zip(c.next()).ok_or_else(|| {
177                ErrorMessage::new(
178                    http::StatusCode::BAD_REQUEST,
179                    "Request path must have an even number of segments to form grouping key"
180                        .to_string(),
181                )
182            })
183        })
184        // Decode any values that have been base64 encoded per the Pushgateway spec
185        //
186        // See: https://github.com/prometheus/pushgateway#url
187        .map(|res| res.and_then(|(k, v)| decode_label_pair(k, v)))
188        .collect()
189}
190
191fn decode_label_pair(k: &str, v: &str) -> Result<(String, String), ErrorMessage> {
192    // Return early if we're not dealing with a base64-encoded label
193    let Some(stripped_key) = k.strip_suffix("@base64") else {
194        return Ok((k.to_owned(), v.to_owned()));
195    };
196
197    // The Prometheus Pushgateway spec explicitly uses one or more `=` characters
198    // (the padding character in base64) to represent an empty string in a path
199    // segment:
200    //
201    // https://github.com/prometheus/pushgateway/blob/ec7afda4eef288bd9b9c43d063e4df54c8961272/README.md#url
202    //
203    // Unfortunately, the Rust base64 crate doesn't treat an encoded string that
204    // only contains padding characters as valid and returns an error.
205    //
206    // Let's handle this case manually, before handing over to the base64 decoder.
207    if v.chars().all(|c| c == '=') {
208        // An empty job label isn't valid, so return an error if that's the key
209        if stripped_key == "job" {
210            return Err(ErrorMessage::new(
211                http::StatusCode::BAD_REQUEST,
212                "Job must not have an empty value".to_owned(),
213            ));
214        }
215
216        return Ok((stripped_key.to_owned(), "".to_owned()));
217    }
218
219    // The Prometheus Pushgateway has a fairly permissive base64 implementation
220    // that allows padding to be missing. We need to fake that by adding in
221    // any missing padding before we pass the value to the base64 decoder.
222    //
223    // This is documented, as examples in their README don't use padding:
224    //
225    // https://github.com/prometheus/pushgateway/blob/ec7afda4eef288bd9b9c43d063e4df54c8961272/README.md#url
226    let missing_padding = v.len() % 4;
227    let padded_value = if missing_padding == 0 {
228        v.to_owned()
229    } else {
230        let padding = "=".repeat(missing_padding);
231        v.to_owned() + &padding
232    };
233
234    let decoded_bytes = BASE64_URL_SAFE.decode(padded_value).map_err(|_| {
235        ErrorMessage::new(
236            http::StatusCode::BAD_REQUEST,
237            format!("Grouping key invalid - invalid base64 value for key {k}: {v}"),
238        )
239    })?;
240
241    let decoded = String::from_utf8(decoded_bytes).map_err(|_| {
242        ErrorMessage::new(
243            http::StatusCode::BAD_REQUEST,
244            format!("Grouping key invalid - invalid UTF-8 in decoded base64 value for key {k}"),
245        )
246    })?;
247
248    Ok((stripped_key.to_owned(), decoded))
249}
250
#[cfg(test)]
mod test {
    use super::*;
    use crate::test_util::components::{assert_source_compliance, HTTP_PUSH_SOURCE_TAGS};
    use crate::test_util::wait_for_tcp;
    use crate::{test_util, SourceSender};
    use chrono::{TimeZone, Timelike, Utc};
    use vector_lib::event::{EventStatus, Metric, MetricKind, MetricValue};
    use vector_lib::tls::MaybeTlsSettings;

    /// Unwraps every collected event into the metric it carries.
    fn events_to_metrics(events: Vec<Event>) -> Vec<Metric> {
        events
            .into_iter()
            .map(|event| event.into_metric())
            .collect()
    }

    /// Builds the owned label pairs that `parse_path_labels` returns.
    fn owned_labels(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    #[test]
    fn generate_config() {
        test_util::test_generate_config::<PrometheusPushgatewayConfig>();
    }

    #[test]
    fn test_parse_simple_path() {
        let labels = parse_path_labels("/metrics/job/foo/instance/bar").unwrap();

        assert_eq!(labels, owned_labels(&[("job", "foo"), ("instance", "bar")]));
    }

    #[test]
    fn test_parse_path_wrong_number_of_segments() {
        let error = parse_path_labels("/metrics/job/foo/instance").unwrap_err();

        assert!(error.message().contains("number of segments"));
    }

    #[test]
    fn test_parse_path_with_base64_segment() {
        let labels = parse_path_labels("/metrics/job/foo/instance@base64/YmFyL2Jheg==").unwrap();

        assert_eq!(
            labels,
            owned_labels(&[("job", "foo"), ("instance", "bar/baz")])
        );
    }

    #[test]
    fn test_parse_path_with_base64_segment_missing_padding() {
        let labels = parse_path_labels("/metrics/job/foo/instance@base64/YmFyL2Jheg").unwrap();

        assert_eq!(
            labels,
            owned_labels(&[("job", "foo"), ("instance", "bar/baz")])
        );
    }

    #[test]
    fn test_parse_path_empty_job_name_invalid() {
        let error = parse_path_labels("/metrics/job@base64/=").unwrap_err();

        assert!(error.message().contains("Job must not"));
    }

    #[test]
    fn test_parse_path_empty_path_invalid() {
        let error = parse_path_labels("/").unwrap_err();

        assert!(error.message().contains("Path must begin"));
    }

    // Repeated keys must come back in path order: the grouping key is handed
    // to the Prometheus text parser as label overrides, and the last value
    // for a given key is the one that should be used there
    #[test]
    fn test_parse_path_duplicate_labels_preserves_order() {
        let labels = parse_path_labels("/metrics/job/foo/instance/bar/instance/baz").unwrap();

        assert_eq!(
            labels,
            owned_labels(&[("job", "foo"), ("instance", "bar"), ("instance", "baz")])
        );
    }

    #[tokio::test]
    async fn test_whole_request_happy_path_http() {
        whole_request_happy_path(None).await;
    }

    #[tokio::test]
    async fn test_whole_request_happy_path_https() {
        whole_request_happy_path(Some(TlsEnableableConfig::test_config())).await;
    }

    /// Starts the source, pushes one body containing a counter, a gauge, a
    /// histogram and a summary, and checks the metrics that come out.
    async fn whole_request_happy_path(tls: Option<TlsEnableableConfig>) {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let address = test_util::next_addr();
            let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

            let config = PrometheusPushgatewayConfig {
                address,
                tls: tls.clone(),
                auth: None,
                acknowledgements: SourceAcknowledgementsConfig::default(),
                keepalive: KeepaliveConfig::default(),
                aggregate_metrics: true,
            };
            let server = config
                .build(SourceContext::new_test(tx, None))
                .await
                .unwrap();
            tokio::spawn(server);
            wait_for_tcp(address).await;

            let proto = MaybeTlsSettings::from_config(tls.as_ref(), true)
                .unwrap()
                .http_protocol_name();
            let push_path = "metrics/job/async_worker";
            let push_url = format!(
                "{proto}://{}:{}/{push_path}",
                address.ip(),
                address.port()
            );
            let push_body = r#"
                # TYPE jobs_total counter
                # HELP jobs_total Total number of jobs
                jobs_total{type="a"} 1.0 1612411506789
                # TYPE jobs_current gauge
                # HELP jobs_current Current number of jobs
                jobs_current{type="a"} 5.0 1612411506789
                # TYPE jobs_distribution histogram
                # HELP jobs_distribution Distribution of jobs
                jobs_distribution_bucket{type="a",le="1"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="2.5"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="5"} 0.0 1612411506789
                jobs_distribution_bucket{type="a",le="10"} 1.0 1612411506789
                jobs_distribution_bucket{type="a",le="+Inf"} 1.0 1612411506789
                jobs_distribution_sum{type="a"} 8.0 1612411506789
                jobs_distribution_count{type="a"} 1.0 1612411506789
                # TYPE jobs_summary summary
                # HELP jobs_summary Summary of jobs
                jobs_summary_sum{type="a"} 8.0 1612411506789
                jobs_summary_count{type="a"} 1.0 1612411506789
                "#;

            // 1612411506789 ms since the epoch
            let timestamp = Utc
                .with_ymd_and_hms(2021, 2, 4, 4, 5, 6)
                .single()
                .and_then(|t| t.with_nanosecond(789_000_000))
                .expect("invalid timestamp");

            // Every metric carries the grouping key from the path plus the
            // label from the body
            let tags = || Some(metric_tags! { "job" => "async_worker", "type" => "a" });

            let counter = Metric::new(
                "jobs_total",
                MetricKind::Incremental,
                MetricValue::Counter { value: 1.0 },
            )
            .with_tags(tags())
            .with_timestamp(Some(timestamp));

            let gauge = Metric::new(
                "jobs_current",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 5.0 },
            )
            .with_tags(tags())
            .with_timestamp(Some(timestamp));

            let histogram = Metric::new(
                "jobs_distribution",
                MetricKind::Incremental,
                MetricValue::AggregatedHistogram {
                    buckets: vector_lib::buckets![
                        1.0 => 0, 2.5 => 0, 5.0 => 0, 10.0 => 1
                    ],
                    count: 1,
                    sum: 8.0,
                },
            )
            .with_tags(tags())
            .with_timestamp(Some(timestamp));

            let summary = Metric::new(
                "jobs_summary",
                MetricKind::Absolute,
                MetricValue::AggregatedSummary {
                    quantiles: vector_lib::quantiles![],
                    count: 1,
                    sum: 8.0,
                },
            )
            .with_tags(tags())
            .with_timestamp(Some(timestamp));

            let expected = vec![counter, gauge, histogram, summary];

            let output = test_util::spawn_collect_ready(
                async move {
                    let client = reqwest::Client::builder()
                        .danger_accept_invalid_certs(true)
                        .build()
                        .unwrap();
                    client.post(push_url).body(push_body).send().await.unwrap();
                },
                rx,
                1,
            )
            .await;

            vector_lib::assert_event_data_eq!(expected, events_to_metrics(output));
        })
        .await;
    }
}