vector/sources/prometheus/scrape.rs

use std::{collections::HashMap, time::Duration};

use bytes::Bytes;
use futures_util::FutureExt;
use http::{Uri, response::Parts};
use serde_with::serde_as;
use snafu::ResultExt;
use vector_lib::{config::LogNamespace, configurable::configurable_component, event::Event};

use super::parser;
use crate::{
    Result,
    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
    http::{Auth, QueryParameters},
    internal_events::PrometheusParseError,
    sources::{
        self,
        util::{
            http::HttpMethod,
            http_client::{
                GenericHttpClientInputs, HttpClientBuilder, HttpClientContext, build_url, call,
                default_interval, default_timeout, warn_if_interval_too_low,
            },
        },
    },
    tls::{TlsConfig, TlsSettings},
};

// Pulled up and split over multiple lines because the long lines trip up rustfmt such that it
// gives up trying to format but reports no error.
static PARSE_ERROR_NO_PATH: &str = "No path is set on the endpoint and we got a parse error, \
                                    did you mean to use /metrics? This behavior changed in version 0.11.";
static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 404, \
                                  did you mean to use /metrics? \
                                  This behavior changed in version 0.11.";

/// Configuration for the `prometheus_scrape` source.
#[serde_as]
#[configurable_component(source(
    "prometheus_scrape",
    "Collect metrics from Prometheus exporters."
))]
#[derive(Clone, Debug)]
pub struct PrometheusScrapeConfig {
    /// Endpoints to scrape metrics from.
    #[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
    #[serde(alias = "hosts")]
    endpoints: Vec<String>,

    /// The interval between scrapes. Requests run concurrently, so if a scrape takes longer than
    /// the interval, a new scrape is started while the previous one is still in flight. This can
    /// consume extra resources; set the timeout to a value lower than the scrape interval to
    /// prevent this from happening.
    #[serde(default = "default_interval")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "scrape_interval_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    interval: Duration,

    /// The timeout for each scrape request.
    #[serde(default = "default_timeout")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(rename = "scrape_timeout_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
    timeout: Duration,

    /// The tag name added to each event representing the scraped instance's `host:port`.
    ///
    /// The tag value is the host and port of the scraped instance.
    #[configurable(metadata(docs::advanced))]
    instance_tag: Option<String>,

    /// The tag name added to each event representing the scraped instance's endpoint.
    ///
    /// The tag value is the endpoint of the scraped instance.
    #[configurable(metadata(docs::advanced))]
    endpoint_tag: Option<String>,

    /// Controls how tag conflicts are handled if the scraped source has tags to be added.
    ///
    /// If `true`, the new tag is not added if the scraped metric has the tag already. If `false`, the conflicting tag
    /// is renamed by prepending `exported_` to the original name.
    ///
    /// This matches Prometheus’ `honor_labels` configuration.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::advanced))]
    honor_labels: bool,

    /// Custom parameters for the scrape request query string.
    ///
    /// One or more values for the same parameter key can be provided. The parameters provided in this option are
    /// appended to any parameters manually provided in the `endpoints` option. This option is especially useful when
    /// scraping the `/federate` endpoint.
    #[serde(default)]
    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
    #[configurable(metadata(docs::examples = "query_example()"))]
    query: QueryParameters,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<Auth>,
}
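
// A minimal sketch (not part of the original file): deserializing a typical TOML configuration
// into this struct via the `scrape_interval_secs`/`scrape_timeout_secs` renames declared above.
// The endpoint and the 5-second timeout are illustrative values, not project defaults.
#[cfg(test)]
#[test]
fn example_toml_config_deserializes() {
    let config: PrometheusScrapeConfig = toml::from_str(
        r#"
        endpoints = ["http://localhost:9090/metrics"]
        scrape_interval_secs = 15
        scrape_timeout_secs = 5.0
        instance_tag = "instance"
        endpoint_tag = "endpoint"
        honor_labels = false
        "#,
    )
    .expect("example config should deserialize");

    assert_eq!(
        config.endpoints,
        vec!["http://localhost:9090/metrics".to_string()]
    );
    assert_eq!(config.interval, Duration::from_secs(15));
    assert_eq!(config.timeout, Duration::from_secs(5));
}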

fn query_example() -> serde_json::Value {
    serde_json::json!({
        "match[]": [
            "{job=\"somejob\"}",
            "{__name__=~\"job:.*\"}"
        ]
    })
}

impl GenerateConfig for PrometheusScrapeConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            endpoints: vec!["http://localhost:9090/metrics".to_string()],
            interval: default_interval(),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            tls: None,
            auth: None,
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_scrape")]
impl SourceConfig for PrometheusScrapeConfig {
    async fn build(&self, cx: SourceContext) -> Result<sources::Source> {
        let urls = self
            .endpoints
            .iter()
            .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
            .map(|r| r.map(|uri| build_url(&uri, &self.query)))
            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
        let tls = TlsSettings::from_options(self.tls.as_ref())?;

        let builder = PrometheusScrapeBuilder {
            honor_labels: self.honor_labels,
            instance_tag: self.instance_tag.clone(),
            endpoint_tag: self.endpoint_tag.clone(),
        };

        warn_if_interval_too_low(self.timeout, self.interval);

        let inputs = GenericHttpClientInputs {
            urls,
            interval: self.interval,
            timeout: self.timeout,
            headers: HashMap::new(),
            content_type: "text/plain".to_string(),
            auth: self.auth.clone(),
            tls,
            proxy: cx.proxy.clone(),
            shutdown: cx.shutdown,
        };

        Ok(call(inputs, builder, cx.out, HttpMethod::Get).boxed())
    }

    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }

    fn can_acknowledge(&self) -> bool {
        false
    }
}

// InstanceInfo stores the scraped instance and the tag name to insert it under in the metric
// event. Keeping the two together avoids storing the instance at all when `instance_tag` is not
// configured.
#[derive(Clone)]
struct InstanceInfo {
    tag: String,
    instance: String,
    honor_label: bool,
}

// EndpointInfo stores the scraped endpoint and the tag name to insert it under in the metric
// event. Keeping the two together avoids storing the endpoint at all when `endpoint_tag` is not
// configured.
#[derive(Clone)]
struct EndpointInfo {
    tag: String,
    endpoint: String,
    honor_label: bool,
}

/// Captures the configuration options required to build request-specific context.
#[derive(Clone)]
struct PrometheusScrapeBuilder {
    honor_labels: bool,
    instance_tag: Option<String>,
    endpoint_tag: Option<String>,
}

impl HttpClientBuilder for PrometheusScrapeBuilder {
    type Context = PrometheusScrapeContext;

    /// Expands the context with the instance info and endpoint info for the current request.
    fn build(&self, url: &Uri) -> Self::Context {
        let instance_info = self.instance_tag.as_ref().map(|tag| {
            let instance = format!(
                "{}:{}",
                url.host().unwrap_or_default(),
                url.port_u16().unwrap_or_else(|| match url.scheme() {
                    Some(scheme) if scheme == &http::uri::Scheme::HTTP => 80,
                    Some(scheme) if scheme == &http::uri::Scheme::HTTPS => 443,
                    _ => 0,
                })
            );
            InstanceInfo {
                tag: tag.to_string(),
                instance,
                honor_label: self.honor_labels,
            }
        });
        let endpoint_info = self.endpoint_tag.as_ref().map(|tag| EndpointInfo {
            tag: tag.to_string(),
            endpoint: url.to_string(),
            honor_label: self.honor_labels,
        });
        PrometheusScrapeContext {
            instance_info,
            endpoint_info,
        }
    }
}
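
// A minimal sketch (not part of the original file) exercising the scheme-based port fallback in
// `PrometheusScrapeBuilder::build` above; the tag names and URLs are illustrative.
#[cfg(test)]
#[test]
fn builder_derives_instance_from_url() {
    let builder = PrometheusScrapeBuilder {
        honor_labels: false,
        instance_tag: Some("instance".to_string()),
        endpoint_tag: Some("endpoint".to_string()),
    };

    // No explicit port: fall back to the default port for the scheme.
    let ctx = builder.build(&"http://example.com/metrics".parse::<Uri>().unwrap());
    assert_eq!(ctx.instance_info.unwrap().instance, "example.com:80");

    // An explicit port is used as-is, and the endpoint records the full URL.
    let ctx = builder.build(&"https://example.com:9443/metrics".parse::<Uri>().unwrap());
    assert_eq!(ctx.instance_info.unwrap().instance, "example.com:9443");
    assert_eq!(
        ctx.endpoint_info.unwrap().endpoint,
        "https://example.com:9443/metrics"
    );
}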

/// Request-specific context required for decoding into events.
struct PrometheusScrapeContext {
    instance_info: Option<InstanceInfo>,
    endpoint_info: Option<EndpointInfo>,
}

impl HttpClientContext for PrometheusScrapeContext {
    fn enrich_events(&mut self, events: &mut Vec<Event>) {
        for event in events.iter_mut() {
            let metric = event.as_mut_metric();
            if let Some(InstanceInfo {
                tag,
                instance,
                honor_label,
            }) = &self.instance_info
            {
                match (honor_label, metric.tag_value(tag)) {
                    (false, Some(old_instance)) => {
                        metric.replace_tag(format!("exported_{tag}"), old_instance);
                        metric.replace_tag(tag.clone(), instance.clone());
                    }
                    (true, Some(_)) => {}
                    (_, None) => {
                        metric.replace_tag(tag.clone(), instance.clone());
                    }
                }
            }
            if let Some(EndpointInfo {
                tag,
                endpoint,
                honor_label,
            }) = &self.endpoint_info
            {
                match (honor_label, metric.tag_value(tag)) {
                    (false, Some(old_endpoint)) => {
                        metric.replace_tag(format!("exported_{tag}"), old_endpoint);
                        metric.replace_tag(tag.clone(), endpoint.clone());
                    }
                    (true, Some(_)) => {}
                    (_, None) => {
                        metric.replace_tag(tag.clone(), endpoint.clone());
                    }
                }
            }
        }
    }

    /// Parses the Prometheus HTTP response into metric events
    fn on_response(&mut self, url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
        let body = String::from_utf8_lossy(body);

        match parser::parse_text(&body) {
            Ok(events) => Some(events),
            Err(error) => {
                if url.path() == "/" {
                    // https://github.com/vectordotdev/vector/pull/3801#issuecomment-700723178
                    warn!(
                        message = PARSE_ERROR_NO_PATH,
                        endpoint = %url,
                    );
                }
                emit!(PrometheusParseError {
                    error,
                    url: url.clone(),
                    body,
                });
                None
            }
        }
    }

    fn on_http_response_error(&self, url: &Uri, header: &Parts) {
        if header.status == hyper::StatusCode::NOT_FOUND && url.path() == "/" {
            // https://github.com/vectordotdev/vector/pull/3801#issuecomment-700723178
            warn!(
                message = NOT_FOUND_NO_PATH,
                endpoint = %url,
            );
        }
    }
}
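
// A minimal sketch (not part of the original file) of the enrichment behavior above: with
// `honor_labels: false`, a conflicting tag from the scraped payload is preserved under an
// `exported_`-prefixed name while the scrape-side value wins. The metric name, tag values, and
// gauge value are illustrative.
#[cfg(test)]
#[test]
fn enrich_events_renames_conflicting_instance_tag() {
    use crate::event::{Metric, MetricKind, MetricValue};

    let mut ctx = PrometheusScrapeContext {
        instance_info: Some(InstanceInfo {
            tag: "instance".to_string(),
            instance: "localhost:9090".to_string(),
            honor_label: false,
        }),
        endpoint_info: None,
    };

    // Simulate a scraped metric that already carries its own `instance` label.
    let mut metric = Metric::new("up", MetricKind::Absolute, MetricValue::Gauge { value: 1.0 });
    metric.replace_tag("instance".to_string(), "remote:9999".to_string());

    let mut events = vec![Event::from(metric)];
    ctx.enrich_events(&mut events);

    let metric = events.remove(0).into_metric();
    assert_eq!(metric.tag_value("instance"), Some("localhost:9090".to_string()));
    assert_eq!(
        metric.tag_value("exported_instance"),
        Some("remote:9999".to_string())
    );
}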

#[cfg(all(test, feature = "sinks-prometheus"))]
mod test {
    use http_body::Body as _;
    use hyper::{
        Body, Client, Response, Server,
        service::{make_service_fn, service_fn},
    };
    use similar_asserts::assert_eq;
    use tokio::time::{Duration, sleep};
    use warp::Filter;

    use super::*;
    use crate::{
        Error, config,
        http::{ParameterValue, QueryParameterValue},
        sinks::prometheus::exporter::PrometheusExporterConfig,
        test_util::{
            addr::next_addr,
            components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
            start_topology, trace_init, wait_for_tcp,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusScrapeConfig>();
    }

    #[tokio::test]
    async fn test_prometheus_sets_headers() {
        let (_guard, in_addr) = next_addr();

        let dummy_endpoint = warp::path!("metrics")
            .and(warp::header::exact("Accept", "text/plain"))
            .map(|| {
                r#"
                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                    "#
            });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn test_prometheus_honor_labels() {
        let (_guard, in_addr) = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                    "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(String::from("http://example.com"))
            );
            assert_eq!(metric.tag_value("exported_instance"), None);
            assert_eq!(metric.tag_value("exported_endpoint"), None);
        }
    }

    #[tokio::test]
    async fn test_prometheus_do_not_honor_labels() {
        let (_guard, in_addr) = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(format!("{}:{}", in_addr.ip(), in_addr.port()))
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(format!(
                    "http://{}:{}/metrics",
                    in_addr.ip(),
                    in_addr.port()
                ))
            );
            assert_eq!(
                metric.tag_value("exported_instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("exported_endpoint"),
                Some(String::from("http://example.com"))
            );
        }
    }

    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115):
    ///
    /// > Label names MUST be unique within a LabelSet.
    ///
    /// Prometheus itself will reject the metric with an error. Largely to remain backward
    /// compatible with older versions of Vector, we accept the metric, but take the last label in
    /// the list.
    #[tokio::test]
    async fn test_prometheus_duplicate_tags() {
        let (_guard, in_addr) = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
                    metric_label{code="200",code="success"} 100 1612411516789
            "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<vector_lib::event::Metric> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();
        let metric = &metrics[0];

        assert_eq!(metric.name(), "metric_label");

        let code_tag = metric
            .tags()
            .unwrap()
            .iter_all()
            .filter(|(name, _value)| *name == "code")
            .map(|(_name, value)| value)
            .collect::<Vec<_>>();

        assert_eq!(1, code_tag.len());
        assert_eq!("success", code_tag[0].unwrap());
    }

    #[tokio::test]
    async fn test_prometheus_request_query() {
        let (_guard, in_addr) = next_addr();

        let dummy_endpoint = warp::path!("metrics").and(warp::query::raw()).map(|query| {
            format!(
                r#"
                    promhttp_metric_handler_requests_total{{query="{query}"}} 100 1612411516789
                "#
            )
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::from([
                (
                    "key1".to_string(),
                    QueryParameterValue::MultiParams(vec![ParameterValue::String(
                        "val2".to_string(),
                    )]),
                ),
                (
                    "key2".to_string(),
                    QueryParameterValue::MultiParams(vec![
                        ParameterValue::String("val1".to_string()),
                        ParameterValue::String("val2".to_string()),
                    ]),
                ),
            ]),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let expected = HashMap::from([
            (
                "key1".to_string(),
                vec!["val1".to_string(), "val2".to_string()],
            ),
            (
                "key2".to_string(),
                vec!["val1".to_string(), "val2".to_string()],
            ),
        ]);

        for metric in metrics {
            let query = metric.tag_value("query").expect("query must be tagged");
            let mut got: HashMap<String, Vec<String>> = HashMap::new();
            for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
                got.entry(k.to_string()).or_default().push(v.to_string());
            }
            for v in got.values_mut() {
                v.sort();
            }
            assert_eq!(got, expected);
        }
    }

    // Intentionally not using assert_source_compliance here because this is a round-trip test,
    // which means source and sink will both emit `EventsSent`, triggering the multi-emission check.
    #[tokio::test]
    async fn test_prometheus_routing() {
        trace_init();
        let (_in_guard, in_addr) = next_addr();
        let (_out_guard, out_addr) = next_addr();

        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(
                    r#"
                    # HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
                    # TYPE promhttp_metric_handler_requests_total counter
                    promhttp_metric_handler_requests_total{code="200"} 100 1612411516789
                    promhttp_metric_handler_requests_total{code="404"} 7 1612411516789
                    prometheus_remote_storage_samples_in_total 57011636 1612411516789
                    # A histogram, which has a pretty complex representation in the text format:
                    # HELP http_request_duration_seconds A histogram of the request duration.
                    # TYPE http_request_duration_seconds histogram
                    http_request_duration_seconds_bucket{le="0.05"} 24054 1612411516789
                    http_request_duration_seconds_bucket{le="0.1"} 33444 1612411516789
                    http_request_duration_seconds_bucket{le="0.2"} 100392 1612411516789
                    http_request_duration_seconds_bucket{le="0.5"} 129389 1612411516789
                    http_request_duration_seconds_bucket{le="1"} 133988 1612411516789
                    http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411516789
                    http_request_duration_seconds_sum 53423 1612411516789
                    http_request_duration_seconds_count 144320 1612411516789
                    # Finally a summary, which has a complex representation, too:
                    # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
                    # TYPE rpc_duration_seconds summary
                    rpc_duration_seconds{code="200",quantile="0.01"} 3102 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.05"} 3272 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.5"} 4773 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.9"} 9001 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.99"} 76656 1612411516789
                    rpc_duration_seconds_sum{code="200"} 1.7560473e+07 1612411516789
                    rpc_duration_seconds_count{code="200"} 2693 1612411516789
                    "#,
                )))
            }))
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        let mut config = config::Config::builder();
        config.add_source(
            "in",
            PrometheusScrapeConfig {
                endpoints: vec![format!("http://{}", in_addr)],
                instance_tag: None,
                endpoint_tag: None,
                honor_labels: false,
                query: HashMap::new(),
                interval: Duration::from_secs(1),
                timeout: default_timeout(),
                tls: None,
                auth: None,
            },
        );
        config.add_sink(
            "out",
            &["in"],
            PrometheusExporterConfig {
                address: out_addr,
                auth: None,
                tls: None,
                default_namespace: Some("vector".into()),
                buckets: vec![1.0, 2.0, 4.0],
                quantiles: vec![],
                distributions_as_summaries: false,
                flush_period_secs: Duration::from_secs(3),
                suppress_timestamp: false,
                acknowledgements: Default::default(),
            },
        );

        let (topology, _) = start_topology(config.build().unwrap(), false).await;
        sleep(Duration::from_secs(1)).await;

        let response = Client::new()
            .get(format!("http://{out_addr}/metrics").parse().unwrap())
            .await
            .unwrap();

        assert!(response.status().is_success());
        let body = response.into_body().collect().await.unwrap().to_bytes();
        let lines = std::str::from_utf8(&body)
            .unwrap()
            .lines()
            .collect::<Vec<_>>();

        assert_eq!(
            lines,
            vec![
                "# HELP vector_http_request_duration_seconds http_request_duration_seconds",
                "# TYPE vector_http_request_duration_seconds histogram",
                "vector_http_request_duration_seconds_bucket{le=\"0.05\"} 24054 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.1\"} 33444 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.2\"} 100392 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.5\"} 129389 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"1\"} 133988 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"+Inf\"} 144320 1612411516789",
                "vector_http_request_duration_seconds_sum 53423 1612411516789",
                "vector_http_request_duration_seconds_count 144320 1612411516789",
                "# HELP vector_prometheus_remote_storage_samples_in_total prometheus_remote_storage_samples_in_total",
                "# TYPE vector_prometheus_remote_storage_samples_in_total gauge",
                "vector_prometheus_remote_storage_samples_in_total 57011636 1612411516789",
                "# HELP vector_promhttp_metric_handler_requests_total promhttp_metric_handler_requests_total",
                "# TYPE vector_promhttp_metric_handler_requests_total counter",
                "vector_promhttp_metric_handler_requests_total{code=\"200\"} 100 1612411516789",
                "vector_promhttp_metric_handler_requests_total{code=\"404\"} 7 1612411516789",
                "# HELP vector_rpc_duration_seconds rpc_duration_seconds",
                "# TYPE vector_rpc_duration_seconds summary",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.01\"} 3102 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.05\"} 3272 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.5\"} 4773 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.9\"} 9001 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.99\"} 76656 1612411516789",
                "vector_rpc_duration_seconds_sum{code=\"200\"} 17560473 1612411516789",
                "vector_rpc_duration_seconds_count{code=\"200\"} 2693 1612411516789",
            ],
        );

        topology.stop().await;
    }
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::{
        event::{MetricKind, MetricValue},
        test_util::components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
    };

    #[tokio::test]
    async fn scrapes_metrics() {
        let config = PrometheusScrapeConfig {
            endpoints: vec!["http://prometheus:9090/metrics".into()],
            interval: Duration::from_secs(1),
            timeout: Duration::from_secs(1),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let find_metric = |name: &str| {
            metrics
                .iter()
                .find(|metric| metric.name() == name)
                .unwrap_or_else(|| panic!("Missing metric {name:?}"))
        };

        // Sample some well-known metrics
        let build = find_metric("prometheus_build_info");
        assert!(matches!(build.kind(), MetricKind::Absolute));
        assert!(matches!(build.value(), &MetricValue::Gauge { .. }));
        assert!(build.tags().unwrap().contains_key("branch"));
        assert!(build.tags().unwrap().contains_key("version"));
        assert_eq!(
            build.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            build.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );

        let queries = find_metric("prometheus_engine_queries");
        assert!(matches!(queries.kind(), MetricKind::Absolute));
        assert!(matches!(queries.value(), &MetricValue::Gauge { .. }));
        assert_eq!(
            queries.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            queries.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );

        let go_info = find_metric("go_info");
        assert!(matches!(go_info.kind(), MetricKind::Absolute));
        assert!(matches!(go_info.value(), &MetricValue::Gauge { .. }));
        assert!(go_info.tags().unwrap().contains_key("version"));
        assert_eq!(
            go_info.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            go_info.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );
    }
}