vector/sources/prometheus/scrape.rs

use std::collections::HashMap;
use std::time::Duration;

use bytes::Bytes;
use futures_util::FutureExt;
use http::{response::Parts, Uri};
use serde_with::serde_as;
use snafu::ResultExt;
use vector_lib::configurable::configurable_component;
use vector_lib::{config::LogNamespace, event::Event};

use super::parser;
use crate::http::QueryParameters;
use crate::sources::util::http::HttpMethod;
use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low};
use crate::{
    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
    http::Auth,
    internal_events::PrometheusParseError,
    sources::{
        self,
        util::http_client::{
            build_url, call, default_interval, GenericHttpClientInputs, HttpClientBuilder,
            HttpClientContext,
        },
    },
    tls::{TlsConfig, TlsSettings},
    Result,
};

// pulled up, and split over multiple lines, because the long lines trip up rustfmt such that it
// gave up trying to format, but reported no error
static PARSE_ERROR_NO_PATH: &str = "No path is set on the endpoint and we got a parse error,\
                                    did you mean to use /metrics? This behavior changed in version 0.11.";
static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 404,\
                                  did you mean to use /metrics?\
                                  This behavior changed in version 0.11.";

/// Configuration for the `prometheus_scrape` source.
#[serde_as]
#[configurable_component(source(
    "prometheus_scrape",
    "Collect metrics from Prometheus exporters."
))]
#[derive(Clone, Debug)]
pub struct PrometheusScrapeConfig {
    /// Endpoints to scrape metrics from.
    #[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
    #[serde(alias = "hosts")]
    endpoints: Vec<String>,

    /// The interval between scrapes. Requests run concurrently, so if a scrape takes longer than
    /// the interval, a new scrape is started while the previous one is still in flight. This can
    /// consume extra resources; set the timeout to a value lower than the scrape interval to
    /// prevent this from happening.
    #[serde(default = "default_interval")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "scrape_interval_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    interval: Duration,

    /// The timeout for each scrape request.
    #[serde(default = "default_timeout")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(rename = "scrape_timeout_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
    timeout: Duration,

    /// The tag name added to each event representing the scraped instance's `host:port`.
    ///
    /// The tag value is the host and port of the scraped instance.
    #[configurable(metadata(docs::advanced))]
    instance_tag: Option<String>,

    /// The tag name added to each event representing the scraped instance's endpoint.
    ///
    /// The tag value is the endpoint of the scraped instance.
    #[configurable(metadata(docs::advanced))]
    endpoint_tag: Option<String>,

    /// Controls how tag conflicts are handled if the scraped source has tags to be added.
    ///
    /// If `true`, the new tag is not added if the scraped metric has the tag already. If `false`, the conflicting tag
    /// is renamed by prepending `exported_` to the original name.
    ///
    /// This matches Prometheus’ `honor_labels` configuration.
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::advanced))]
    honor_labels: bool,

    /// Custom parameters for the scrape request query string.
    ///
    /// One or more values for the same parameter key can be provided. The parameters provided in this option are
    /// appended to any parameters manually provided in the `endpoints` option. This option is especially useful when
    /// scraping the `/federate` endpoint.
    #[serde(default)]
    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
    #[configurable(metadata(docs::examples = "query_example()"))]
    query: QueryParameters,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<Auth>,
}
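// An illustrative sketch of how this source might be configured in TOML; the keys mirror the
// serde attributes above (`scrape_interval_secs`/`scrape_timeout_secs` are the renamed
// `interval`/`timeout` fields), and the component ID and values are examples only:
//
//   [sources.my_prometheus_scrape]
//   type = "prometheus_scrape"
//   endpoints = ["http://localhost:9090/metrics"]
//   scrape_interval_secs = 15
//   scrape_timeout_secs = 5.0
//   instance_tag = "instance"
//   endpoint_tag = "endpoint"
//   honor_labels = false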

fn query_example() -> serde_json::Value {
    serde_json::json!({
        "match[]": [
            "{job=\"somejob\"}",
            "{__name__=~\"job:.*\"}"
        ]
    })
}
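// Rendered onto a scrape request, the example above would append repeated query parameters,
// roughly `?match[]={job="somejob"}&match[]={__name__=~"job:.*"}` (URL-encoded), which is the
// shape the Prometheus `/federate` endpoint expects.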

impl GenerateConfig for PrometheusScrapeConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            endpoints: vec!["http://localhost:9090/metrics".to_string()],
            interval: default_interval(),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            tls: None,
            auth: None,
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_scrape")]
impl SourceConfig for PrometheusScrapeConfig {
    async fn build(&self, cx: SourceContext) -> Result<sources::Source> {
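        // Parse each configured endpoint into a `Uri` and append the extra query parameters from
        // the `query` option onto it before scraping.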
        let urls = self
            .endpoints
            .iter()
            .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
            .map(|r| r.map(|uri| build_url(&uri, &self.query)))
            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
        let tls = TlsSettings::from_options(self.tls.as_ref())?;

        let builder = PrometheusScrapeBuilder {
            honor_labels: self.honor_labels,
            instance_tag: self.instance_tag.clone(),
            endpoint_tag: self.endpoint_tag.clone(),
        };

        warn_if_interval_too_low(self.timeout, self.interval);

        let inputs = GenericHttpClientInputs {
            urls,
            interval: self.interval,
            timeout: self.timeout,
            headers: HashMap::new(),
            content_type: "text/plain".to_string(),
            auth: self.auth.clone(),
            tls,
            proxy: cx.proxy.clone(),
            shutdown: cx.shutdown,
        };

        Ok(call(inputs, builder, cx.out, HttpMethod::Get).boxed())
    }

    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }

    fn can_acknowledge(&self) -> bool {
        false
    }
}

// InstanceInfo stores the scraped instance info along with the tag name under which it is
// inserted into the event. Keeping them together avoids storing the instance when `instance_tag`
// is not configured.
#[derive(Clone)]
struct InstanceInfo {
    tag: String,
    instance: String,
    honor_label: bool,
}

// EndpointInfo stores the scraped endpoint info along with the tag name under which it is
// inserted into the event. Keeping them together avoids storing the endpoint when `endpoint_tag`
// is not configured.
#[derive(Clone)]
struct EndpointInfo {
    tag: String,
    endpoint: String,
    honor_label: bool,
}

/// Captures the configuration options required to build request-specific context.
#[derive(Clone)]
struct PrometheusScrapeBuilder {
    honor_labels: bool,
    instance_tag: Option<String>,
    endpoint_tag: Option<String>,
}

impl HttpClientBuilder for PrometheusScrapeBuilder {
    type Context = PrometheusScrapeContext;

    /// Expands the context with the instance info and endpoint info for the current request.
    fn build(&self, url: &Uri) -> Self::Context {
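        // The instance tag value is `host:port`; when the endpoint URL has no explicit port, fall
        // back to the scheme default (80 for HTTP, 443 for HTTPS), or 0 if the scheme is unknown.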
        let instance_info = self.instance_tag.as_ref().map(|tag| {
            let instance = format!(
                "{}:{}",
                url.host().unwrap_or_default(),
                url.port_u16().unwrap_or_else(|| match url.scheme() {
                    Some(scheme) if scheme == &http::uri::Scheme::HTTP => 80,
                    Some(scheme) if scheme == &http::uri::Scheme::HTTPS => 443,
                    _ => 0,
                })
            );
            InstanceInfo {
                tag: tag.to_string(),
                instance,
                honor_label: self.honor_labels,
            }
        });
        let endpoint_info = self.endpoint_tag.as_ref().map(|tag| EndpointInfo {
            tag: tag.to_string(),
            endpoint: url.to_string(),
            honor_label: self.honor_labels,
        });
        PrometheusScrapeContext {
            instance_info,
            endpoint_info,
        }
    }
}

/// Request-specific context required for decoding into events.
struct PrometheusScrapeContext {
    instance_info: Option<InstanceInfo>,
    endpoint_info: Option<EndpointInfo>,
}

impl HttpClientContext for PrometheusScrapeContext {
    fn enrich_events(&mut self, events: &mut Vec<Event>) {
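        // Attach the configured instance/endpoint tags. If the scraped metric already carries the
        // tag, `honor_labels: true` keeps the scraped value untouched, while `honor_labels: false`
        // preserves the original value under `exported_<tag>` and overwrites the tag with the
        // scrape-side value.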
        for event in events.iter_mut() {
            let metric = event.as_mut_metric();
            if let Some(InstanceInfo {
                tag,
                instance,
                honor_label,
            }) = &self.instance_info
            {
                match (honor_label, metric.tag_value(tag)) {
                    (false, Some(old_instance)) => {
                        metric.replace_tag(format!("exported_{tag}"), old_instance);
                        metric.replace_tag(tag.clone(), instance.clone());
                    }
                    (true, Some(_)) => {}
                    (_, None) => {
                        metric.replace_tag(tag.clone(), instance.clone());
                    }
                }
            }
            if let Some(EndpointInfo {
                tag,
                endpoint,
                honor_label,
            }) = &self.endpoint_info
            {
                match (honor_label, metric.tag_value(tag)) {
                    (false, Some(old_endpoint)) => {
                        metric.replace_tag(format!("exported_{tag}"), old_endpoint);
                        metric.replace_tag(tag.clone(), endpoint.clone());
                    }
                    (true, Some(_)) => {}
                    (_, None) => {
                        metric.replace_tag(tag.clone(), endpoint.clone());
                    }
                }
            }
        }
    }

    /// Parses the Prometheus HTTP response into metric events
    fn on_response(&mut self, url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
        let body = String::from_utf8_lossy(body);

        match parser::parse_text(&body) {
            Ok(events) => Some(events),
            Err(error) => {
                if url.path() == "/" {
                    // https://github.com/vectordotdev/vector/pull/3801#issuecomment-700723178
                    warn!(
                        message = PARSE_ERROR_NO_PATH,
                        endpoint = %url,
                    );
                }
                emit!(PrometheusParseError {
                    error,
                    url: url.clone(),
                    body,
                });
                None
            }
        }
    }

    fn on_http_response_error(&self, url: &Uri, header: &Parts) {
        if header.status == hyper::StatusCode::NOT_FOUND && url.path() == "/" {
            // https://github.com/vectordotdev/vector/pull/3801#issuecomment-700723178
            warn!(
                message = NOT_FOUND_NO_PATH,
                endpoint = %url,
            );
        }
    }
}

#[cfg(all(test, feature = "sinks-prometheus"))]
mod test {
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Client, Response, Server,
    };
    use similar_asserts::assert_eq;
    use tokio::time::{sleep, Duration};
    use warp::Filter;

    use super::*;
    use crate::{
        config,
        http::{ParameterValue, QueryParameterValue},
        sinks::prometheus::exporter::PrometheusExporterConfig,
        test_util::{
            components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
            next_addr, start_topology, trace_init, wait_for_tcp,
        },
        Error,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusScrapeConfig>();
    }

    #[tokio::test]
    async fn test_prometheus_sets_headers() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics")
            .and(warp::header::exact("Accept", "text/plain"))
            .map(|| {
                r#"
                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                    "#
            });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn test_prometheus_honor_labels() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                    "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(String::from("http://example.com"))
            );
            assert_eq!(metric.tag_value("exported_instance"), None);
            assert_eq!(metric.tag_value("exported_endpoint"), None);
        }
    }
433
434    #[tokio::test]
435    async fn test_prometheus_do_not_honor_labels() {
436        let in_addr = next_addr();
437
438        let dummy_endpoint = warp::path!("metrics").map(|| {
439                r#"
440                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
441                "#
442        });
443
444        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
445        wait_for_tcp(in_addr).await;
446
447        let config = PrometheusScrapeConfig {
448            endpoints: vec![format!("http://{}/metrics", in_addr)],
449            interval: Duration::from_secs(1),
450            timeout: default_timeout(),
451            instance_tag: Some("instance".to_string()),
452            endpoint_tag: Some("endpoint".to_string()),
453            honor_labels: false,
454            query: HashMap::new(),
455            auth: None,
456            tls: None,
457        };
458
459        let events = run_and_assert_source_compliance(
460            config,
461            Duration::from_secs(3),
462            &HTTP_PULL_SOURCE_TAGS,
463        )
464        .await;
465        assert!(!events.is_empty());
466
467        let metrics: Vec<_> = events
468            .into_iter()
469            .map(|event| event.into_metric())
470            .collect();
471
472        for metric in metrics {
473            assert_eq!(
474                metric.tag_value("instance"),
475                Some(format!("{}:{}", in_addr.ip(), in_addr.port()))
476            );
477            assert_eq!(
478                metric.tag_value("endpoint"),
479                Some(format!(
480                    "http://{}:{}/metrics",
481                    in_addr.ip(),
482                    in_addr.port()
483                ))
484            );
485            assert_eq!(
486                metric.tag_value("exported_instance"),
487                Some(String::from("localhost:9999"))
488            );
489            assert_eq!(
490                metric.tag_value("exported_endpoint"),
491                Some(String::from("http://example.com"))
492            );
493        }
494    }

    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
    /// > Label names MUST be unique within a LabelSet.
    /// Prometheus itself will reject the metric with an error. Largely to remain backward compatible with older versions of Vector,
    /// we accept the metric, but take the last label in the list.
    #[tokio::test]
    async fn test_prometheus_duplicate_tags() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
                    metric_label{code="200",code="success"} 100 1612411516789
            "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<vector_lib::event::Metric> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();
        let metric = &metrics[0];

        assert_eq!(metric.name(), "metric_label");

        let code_tag = metric
            .tags()
            .unwrap()
            .iter_all()
            .filter(|(name, _value)| *name == "code")
            .map(|(_name, value)| value)
            .collect::<Vec<_>>();

        assert_eq!(1, code_tag.len());
        assert_eq!("success", code_tag[0].unwrap());
    }

    #[tokio::test]
    async fn test_prometheus_request_query() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").and(warp::query::raw()).map(|query| {
            format!(
                r#"
                    promhttp_metric_handler_requests_total{{query="{query}"}} 100 1612411516789
                "#
            )
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::from([
                (
                    "key1".to_string(),
                    QueryParameterValue::MultiParams(vec![ParameterValue::String(
                        "val2".to_string(),
                    )]),
                ),
                (
                    "key2".to_string(),
                    QueryParameterValue::MultiParams(vec![
                        ParameterValue::String("val1".to_string()),
                        ParameterValue::String("val2".to_string()),
                    ]),
                ),
            ]),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let expected = HashMap::from([
            (
                "key1".to_string(),
                vec!["val1".to_string(), "val2".to_string()],
            ),
            (
                "key2".to_string(),
                vec!["val1".to_string(), "val2".to_string()],
            ),
        ]);

        for metric in metrics {
            let query = metric.tag_value("query").expect("query must be tagged");
            let mut got: HashMap<String, Vec<String>> = HashMap::new();
            for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
                got.entry(k.to_string()).or_default().push(v.to_string());
            }
            for v in got.values_mut() {
                v.sort();
            }
            assert_eq!(got, expected);
        }
    }

    // Intentionally not using assert_source_compliance here because this is a round-trip test,
    // which means the source and the sink both emit `EventsSent`, triggering the multi-emission
    // check.
    #[tokio::test]
    async fn test_prometheus_routing() {
        trace_init();
        let in_addr = next_addr();
        let out_addr = next_addr();

        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(
                    r#"
                    # HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
                    # TYPE promhttp_metric_handler_requests_total counter
                    promhttp_metric_handler_requests_total{code="200"} 100 1612411516789
                    promhttp_metric_handler_requests_total{code="404"} 7 1612411516789
                    prometheus_remote_storage_samples_in_total 57011636 1612411516789
                    # A histogram, which has a pretty complex representation in the text format:
                    # HELP http_request_duration_seconds A histogram of the request duration.
                    # TYPE http_request_duration_seconds histogram
                    http_request_duration_seconds_bucket{le="0.05"} 24054 1612411516789
                    http_request_duration_seconds_bucket{le="0.1"} 33444 1612411516789
                    http_request_duration_seconds_bucket{le="0.2"} 100392 1612411516789
                    http_request_duration_seconds_bucket{le="0.5"} 129389 1612411516789
                    http_request_duration_seconds_bucket{le="1"} 133988 1612411516789
                    http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411516789
                    http_request_duration_seconds_sum 53423 1612411516789
                    http_request_duration_seconds_count 144320 1612411516789
                    # Finally a summary, which has a complex representation, too:
                    # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
                    # TYPE rpc_duration_seconds summary
                    rpc_duration_seconds{code="200",quantile="0.01"} 3102 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.05"} 3272 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.5"} 4773 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.9"} 9001 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.99"} 76656 1612411516789
                    rpc_duration_seconds_sum{code="200"} 1.7560473e+07 1612411516789
                    rpc_duration_seconds_count{code="200"} 2693 1612411516789
                    "#,
                )))
            }))
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        let mut config = config::Config::builder();
        config.add_source(
            "in",
            PrometheusScrapeConfig {
                endpoints: vec![format!("http://{}", in_addr)],
                instance_tag: None,
                endpoint_tag: None,
                honor_labels: false,
                query: HashMap::new(),
                interval: Duration::from_secs(1),
                timeout: default_timeout(),
                tls: None,
                auth: None,
            },
        );
        config.add_sink(
            "out",
            &["in"],
            PrometheusExporterConfig {
                address: out_addr,
                auth: None,
                tls: None,
                default_namespace: Some("vector".into()),
                buckets: vec![1.0, 2.0, 4.0],
                quantiles: vec![],
                distributions_as_summaries: false,
                flush_period_secs: Duration::from_secs(3),
                suppress_timestamp: false,
                acknowledgements: Default::default(),
            },
        );

        let (topology, _) = start_topology(config.build().unwrap(), false).await;
        sleep(Duration::from_secs(1)).await;

        let response = Client::new()
            .get(format!("http://{out_addr}/metrics").parse().unwrap())
            .await
            .unwrap();

        assert!(response.status().is_success());
        let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
        let lines = std::str::from_utf8(&body)
            .unwrap()
            .lines()
            .collect::<Vec<_>>();

        assert_eq!(lines, vec![
                "# HELP vector_http_request_duration_seconds http_request_duration_seconds",
                "# TYPE vector_http_request_duration_seconds histogram",
                "vector_http_request_duration_seconds_bucket{le=\"0.05\"} 24054 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.1\"} 33444 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.2\"} 100392 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.5\"} 129389 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"1\"} 133988 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"+Inf\"} 144320 1612411516789",
                "vector_http_request_duration_seconds_sum 53423 1612411516789",
                "vector_http_request_duration_seconds_count 144320 1612411516789",
                "# HELP vector_prometheus_remote_storage_samples_in_total prometheus_remote_storage_samples_in_total",
                "# TYPE vector_prometheus_remote_storage_samples_in_total gauge",
                "vector_prometheus_remote_storage_samples_in_total 57011636 1612411516789",
                "# HELP vector_promhttp_metric_handler_requests_total promhttp_metric_handler_requests_total",
                "# TYPE vector_promhttp_metric_handler_requests_total counter",
                "vector_promhttp_metric_handler_requests_total{code=\"200\"} 100 1612411516789",
                "vector_promhttp_metric_handler_requests_total{code=\"404\"} 7 1612411516789",
                "# HELP vector_rpc_duration_seconds rpc_duration_seconds",
                "# TYPE vector_rpc_duration_seconds summary",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.01\"} 3102 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.05\"} 3272 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.5\"} 4773 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.9\"} 9001 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.99\"} 76656 1612411516789",
                "vector_rpc_duration_seconds_sum{code=\"200\"} 17560473 1612411516789",
                "vector_rpc_duration_seconds_count{code=\"200\"} 2693 1612411516789",
                ],
            );

        topology.stop().await;
    }
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::{
        event::{MetricKind, MetricValue},
        test_util::components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
    };

    #[tokio::test]
    async fn scrapes_metrics() {
        let config = PrometheusScrapeConfig {
            endpoints: vec!["http://prometheus:9090/metrics".into()],
            interval: Duration::from_secs(1),
            timeout: Duration::from_secs(1),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let find_metric = |name: &str| {
            metrics
                .iter()
                .find(|metric| metric.name() == name)
                .unwrap_or_else(|| panic!("Missing metric {name:?}"))
        };

        // Sample some well-known metrics
        let build = find_metric("prometheus_build_info");
        assert!(matches!(build.kind(), MetricKind::Absolute));
        assert!(matches!(build.value(), &MetricValue::Gauge { .. }));
        assert!(build.tags().unwrap().contains_key("branch"));
        assert!(build.tags().unwrap().contains_key("version"));
        assert_eq!(
            build.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            build.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );

        let queries = find_metric("prometheus_engine_queries");
        assert!(matches!(queries.kind(), MetricKind::Absolute));
        assert!(matches!(queries.value(), &MetricValue::Gauge { .. }));
        assert_eq!(
            queries.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            queries.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );

        let go_info = find_metric("go_info");
        assert!(matches!(go_info.kind(), MetricKind::Absolute));
        assert!(matches!(go_info.value(), &MetricValue::Gauge { .. }));
        assert!(go_info.tags().unwrap().contains_key("version"));
        assert_eq!(
            go_info.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            go_info.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );
    }
}