vector/sources/prometheus/scrape.rs

use std::{collections::HashMap, time::Duration};

use bytes::Bytes;
use futures_util::FutureExt;
use http::{Uri, response::Parts};
use serde_with::serde_as;
use snafu::ResultExt;
use vector_lib::{config::LogNamespace, configurable::configurable_component, event::Event};

use super::parser;
use crate::{
    Result,
    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
    http::{Auth, QueryParameters},
    internal_events::PrometheusParseError,
    sources::{
        self,
        util::{
            http::HttpMethod,
            http_client::{
                GenericHttpClientInputs, HttpClientBuilder, HttpClientContext, build_url, call,
                default_interval, default_timeout, warn_if_interval_too_low,
            },
        },
    },
    tls::{TlsConfig, TlsSettings},
};

// Pulled up and split over multiple lines because the long lines tripped up rustfmt: it gave up
// formatting them but reported no error.
static PARSE_ERROR_NO_PATH: &str = "No path is set on the endpoint and we got a parse error, \
                                    did you mean to use /metrics? This behavior changed in version 0.11.";
static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 404, \
                                  did you mean to use /metrics? \
                                  This behavior changed in version 0.11.";

/// Configuration for the `prometheus_scrape` source.
#[serde_as]
#[configurable_component(source(
    "prometheus_scrape",
    "Collect metrics from Prometheus exporters."
))]
#[derive(Clone, Debug)]
pub struct PrometheusScrapeConfig {
    /// Endpoints to scrape metrics from.
    #[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
    #[serde(alias = "hosts")]
    endpoints: Vec<String>,

    /// The interval between scrapes. Requests are run concurrently, so if a scrape takes longer
    /// than the interval, a new scrape is started before the previous one finishes. This can consume
    /// extra resources; set the timeout to a value lower than the scrape interval to prevent it.
    #[serde(default = "default_interval")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "scrape_interval_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    interval: Duration,

    /// The timeout for each scrape request.
    #[serde(default = "default_timeout")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(rename = "scrape_timeout_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
    timeout: Duration,

    /// The tag name added to each event representing the scraped instance's `host:port`.
    ///
    /// The tag value is the host and port of the scraped instance.
    #[configurable(metadata(docs::advanced))]
    instance_tag: Option<String>,

    /// The tag name added to each event representing the scraped instance's endpoint.
    ///
    /// The tag value is the endpoint of the scraped instance.
    #[configurable(metadata(docs::advanced))]
    endpoint_tag: Option<String>,

    /// Controls how tag conflicts are handled when the scraped metric already carries a tag that
    /// this source is configured to add.
    ///
    /// If `true`, the new tag is not added if the scraped metric already has it. If `false`, the conflicting tag
    /// is renamed by prepending `exported_` to the original name.
    ///
    /// This matches Prometheus’ `honor_labels` configuration.
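    ///
    /// For example, with `instance_tag = "instance"` and `honor_labels` set to `false`, a scraped
    /// metric that already carries an `instance` label keeps its original value under
    /// `exported_instance`, while `instance` is overwritten with the scrape target's `host:port`.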
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::advanced))]
    honor_labels: bool,

    /// Custom parameters for the scrape request query string.
    ///
    /// One or more values for the same parameter key can be provided. The parameters provided in this option are
    /// appended to any parameters manually provided in the `endpoints` option. This option is especially useful when
    /// scraping the `/federate` endpoint.
    #[serde(default)]
    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
    #[configurable(metadata(docs::examples = "query_example()"))]
    query: QueryParameters,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<Auth>,
}
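
// A minimal illustrative configuration for this source; the component name and the values below
// are examples only, not defaults beyond those encoded in the struct above.
//
// ```toml
// [sources.my_prometheus_scrape]
// type = "prometheus_scrape"
// endpoints = ["http://localhost:9090/metrics"]
// scrape_interval_secs = 15
// scrape_timeout_secs = 5.0
// instance_tag = "instance"
// endpoint_tag = "endpoint"
// honor_labels = false
// ```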

fn query_example() -> serde_json::Value {
    serde_json::json!({
        "match[]": [
            "{job=\"somejob\"}",
            "{__name__=~\"job:.*\"}"
        ]
    })
}

impl GenerateConfig for PrometheusScrapeConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            endpoints: vec!["http://localhost:9090/metrics".to_string()],
            interval: default_interval(),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            tls: None,
            auth: None,
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_scrape")]
impl SourceConfig for PrometheusScrapeConfig {
    async fn build(&self, cx: SourceContext) -> Result<sources::Source> {
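        // Parse each configured endpoint into a URI and append the configured query parameters.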
        let urls = self
            .endpoints
            .iter()
            .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
            .map(|r| r.map(|uri| build_url(&uri, &self.query)))
            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
        let tls = TlsSettings::from_options(self.tls.as_ref())?;

        let builder = PrometheusScrapeBuilder {
            honor_labels: self.honor_labels,
            instance_tag: self.instance_tag.clone(),
            endpoint_tag: self.endpoint_tag.clone(),
        };

        warn_if_interval_too_low(self.timeout, self.interval);

        let inputs = GenericHttpClientInputs {
            urls,
            interval: self.interval,
            timeout: self.timeout,
            headers: HashMap::new(),
            content_type: "text/plain".to_string(),
            auth: self.auth.clone(),
            tls,
            proxy: cx.proxy.clone(),
            shutdown: cx.shutdown,
        };

        Ok(call(inputs, builder, cx.out, HttpMethod::Get).boxed())
    }

    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }

    fn can_acknowledge(&self) -> bool {
        false
    }
}

// InstanceInfo stores the scraped instance value and the tag name under which to insert it into
// the event. Joining the two avoids storing the instance when instance_tag is not configured.
#[derive(Clone)]
struct InstanceInfo {
    tag: String,
    instance: String,
    honor_label: bool,
}

// EndpointInfo stores the scraped endpoint value and the tag name under which to insert it into
// the event. Joining the two avoids storing the endpoint when endpoint_tag is not configured.
#[derive(Clone)]
struct EndpointInfo {
    tag: String,
    endpoint: String,
    honor_label: bool,
}

/// Captures the configuration options required to build request-specific context.
#[derive(Clone)]
struct PrometheusScrapeBuilder {
    honor_labels: bool,
    instance_tag: Option<String>,
    endpoint_tag: Option<String>,
}

impl HttpClientBuilder for PrometheusScrapeBuilder {
    type Context = PrometheusScrapeContext;

    /// Expands the context with the instance info and endpoint info for the current request.
    fn build(&self, url: &Uri) -> Self::Context {
        let instance_info = self.instance_tag.as_ref().map(|tag| {
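            // Derive the `host:port` instance value, defaulting the port from the scheme
            // (80 for HTTP, 443 for HTTPS) when the URL does not specify one.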
            let instance = format!(
                "{}:{}",
                url.host().unwrap_or_default(),
                url.port_u16().unwrap_or_else(|| match url.scheme() {
                    Some(scheme) if scheme == &http::uri::Scheme::HTTP => 80,
                    Some(scheme) if scheme == &http::uri::Scheme::HTTPS => 443,
                    _ => 0,
                })
            );
            InstanceInfo {
                tag: tag.to_string(),
                instance,
                honor_label: self.honor_labels,
            }
        });
        let endpoint_info = self.endpoint_tag.as_ref().map(|tag| EndpointInfo {
            tag: tag.to_string(),
            endpoint: url.to_string(),
            honor_label: self.honor_labels,
        });
        PrometheusScrapeContext {
            instance_info,
            endpoint_info,
        }
    }
}

/// Request-specific context required for decoding into events.
struct PrometheusScrapeContext {
    instance_info: Option<InstanceInfo>,
    endpoint_info: Option<EndpointInfo>,
}

impl HttpClientContext for PrometheusScrapeContext {
    fn enrich_events(&mut self, events: &mut Vec<Event>) {
        for event in events.iter_mut() {
            let metric = event.as_mut_metric();
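            // Instance tag: when `honor_label` is set, an existing tag on the scraped metric wins;
            // otherwise the original value is preserved under `exported_<tag>` before overwriting.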
            if let Some(InstanceInfo {
                tag,
                instance,
                honor_label,
            }) = &self.instance_info
            {
                match (honor_label, metric.tag_value(tag)) {
                    (false, Some(old_instance)) => {
                        metric.replace_tag(format!("exported_{tag}"), old_instance);
                        metric.replace_tag(tag.clone(), instance.clone());
                    }
                    (true, Some(_)) => {}
                    (_, None) => {
                        metric.replace_tag(tag.clone(), instance.clone());
                    }
                }
            }
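            // The endpoint tag follows the same conflict-handling policy.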
            if let Some(EndpointInfo {
                tag,
                endpoint,
                honor_label,
            }) = &self.endpoint_info
            {
                match (honor_label, metric.tag_value(tag)) {
                    (false, Some(old_endpoint)) => {
                        metric.replace_tag(format!("exported_{tag}"), old_endpoint);
                        metric.replace_tag(tag.clone(), endpoint.clone());
                    }
                    (true, Some(_)) => {}
                    (_, None) => {
                        metric.replace_tag(tag.clone(), endpoint.clone());
                    }
                }
            }
        }
    }

    /// Parses the Prometheus HTTP response into metric events.
    fn on_response(&mut self, url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
        let body = String::from_utf8_lossy(body);

        match parser::parse_text(&body) {
            Ok(events) => Some(events),
            Err(error) => {
                if url.path() == "/" {
                    // https://github.com/vectordotdev/vector/pull/3801#issuecomment-700723178
                    warn!(
                        message = PARSE_ERROR_NO_PATH,
                        endpoint = %url,
                    );
                }
                emit!(PrometheusParseError {
                    error,
                    url: url.clone(),
                    body,
                });
                None
            }
        }
    }

    fn on_http_response_error(&self, url: &Uri, header: &Parts) {
        if header.status == hyper::StatusCode::NOT_FOUND && url.path() == "/" {
            // https://github.com/vectordotdev/vector/pull/3801#issuecomment-700723178
            warn!(
                message = NOT_FOUND_NO_PATH,
                endpoint = %url,
            );
        }
    }
}

#[cfg(all(test, feature = "sinks-prometheus"))]
mod test {
    use hyper::{
        Body, Client, Response, Server,
        service::{make_service_fn, service_fn},
    };
    use similar_asserts::assert_eq;
    use tokio::time::{Duration, sleep};
    use warp::Filter;

    use super::*;
    use crate::{
        Error, config,
        http::{ParameterValue, QueryParameterValue},
        sinks::prometheus::exporter::PrometheusExporterConfig,
        test_util::{
            components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
            next_addr, start_topology, trace_init, wait_for_tcp,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusScrapeConfig>();
    }

    #[tokio::test]
    async fn test_prometheus_sets_headers() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics")
            .and(warp::header::exact("Accept", "text/plain"))
            .map(|| {
            r#"
                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                    "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn test_prometheus_honor_labels() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                    "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(String::from("http://example.com"))
            );
            assert_eq!(metric.tag_value("exported_instance"), None);
            assert_eq!(metric.tag_value("exported_endpoint"), None);
        }
    }

    #[tokio::test]
    async fn test_prometheus_do_not_honor_labels() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
                    promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(format!("{}:{}", in_addr.ip(), in_addr.port()))
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(format!(
                    "http://{}:{}/metrics",
                    in_addr.ip(),
                    in_addr.port()
                ))
            );
            assert_eq!(
                metric.tag_value("exported_instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("exported_endpoint"),
                Some(String::from("http://example.com"))
            );
        }
    }

    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
    /// > Label names MUST be unique within a LabelSet.
    /// Prometheus itself will reject the metric with an error. Largely to remain backward compatible with older versions of Vector,
    /// we accept the metric, but take the last label in the list.
    #[tokio::test]
    async fn test_prometheus_duplicate_tags() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
                    metric_label{code="200",code="success"} 100 1612411516789
            "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<vector_lib::event::Metric> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();
        let metric = &metrics[0];

        assert_eq!(metric.name(), "metric_label");

        let code_tag = metric
            .tags()
            .unwrap()
            .iter_all()
            .filter(|(name, _value)| *name == "code")
            .map(|(_name, value)| value)
            .collect::<Vec<_>>();

        assert_eq!(1, code_tag.len());
        assert_eq!("success", code_tag[0].unwrap());
    }

    #[tokio::test]
    async fn test_prometheus_request_query() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").and(warp::query::raw()).map(|query| {
            format!(
                r#"
                    promhttp_metric_handler_requests_total{{query="{query}"}} 100 1612411516789
                "#
            )
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::from([
                (
                    "key1".to_string(),
                    QueryParameterValue::MultiParams(vec![ParameterValue::String(
                        "val2".to_string(),
                    )]),
                ),
                (
                    "key2".to_string(),
                    QueryParameterValue::MultiParams(vec![
                        ParameterValue::String("val1".to_string()),
                        ParameterValue::String("val2".to_string()),
                    ]),
                ),
            ]),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

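        // The configured `query` parameters are appended to those already present on the endpoint
        // URL, so each scrape request should carry the merged set below.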
        let expected = HashMap::from([
            (
                "key1".to_string(),
                vec!["val1".to_string(), "val2".to_string()],
            ),
            (
                "key2".to_string(),
                vec!["val1".to_string(), "val2".to_string()],
            ),
        ]);

        for metric in metrics {
            let query = metric.tag_value("query").expect("query must be tagged");
            let mut got: HashMap<String, Vec<String>> = HashMap::new();
            for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
                got.entry(k.to_string()).or_default().push(v.to_string());
            }
            for v in got.values_mut() {
                v.sort();
            }
            assert_eq!(got, expected);
        }
    }

    // Intentionally not using assert_source_compliance here because this is a round-trip test,
    // which means the source and sink both emit `EventsSent`, triggering the multi-emission check.
    #[tokio::test]
    async fn test_prometheus_routing() {
        trace_init();
        let in_addr = next_addr();
        let out_addr = next_addr();

        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(
                    r#"
                    # HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
                    # TYPE promhttp_metric_handler_requests_total counter
                    promhttp_metric_handler_requests_total{code="200"} 100 1612411516789
                    promhttp_metric_handler_requests_total{code="404"} 7 1612411516789
                    prometheus_remote_storage_samples_in_total 57011636 1612411516789
                    # A histogram, which has a pretty complex representation in the text format:
                    # HELP http_request_duration_seconds A histogram of the request duration.
                    # TYPE http_request_duration_seconds histogram
                    http_request_duration_seconds_bucket{le="0.05"} 24054 1612411516789
                    http_request_duration_seconds_bucket{le="0.1"} 33444 1612411516789
                    http_request_duration_seconds_bucket{le="0.2"} 100392 1612411516789
                    http_request_duration_seconds_bucket{le="0.5"} 129389 1612411516789
                    http_request_duration_seconds_bucket{le="1"} 133988 1612411516789
                    http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411516789
                    http_request_duration_seconds_sum 53423 1612411516789
                    http_request_duration_seconds_count 144320 1612411516789
                    # Finally a summary, which has a complex representation, too:
                    # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
                    # TYPE rpc_duration_seconds summary
                    rpc_duration_seconds{code="200",quantile="0.01"} 3102 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.05"} 3272 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.5"} 4773 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.9"} 9001 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.99"} 76656 1612411516789
                    rpc_duration_seconds_sum{code="200"} 1.7560473e+07 1612411516789
                    rpc_duration_seconds_count{code="200"} 2693 1612411516789
                    "#,
                )))
            }))
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        let mut config = config::Config::builder();
        config.add_source(
            "in",
            PrometheusScrapeConfig {
                endpoints: vec![format!("http://{}", in_addr)],
                instance_tag: None,
                endpoint_tag: None,
                honor_labels: false,
                query: HashMap::new(),
                interval: Duration::from_secs(1),
                timeout: default_timeout(),
                tls: None,
                auth: None,
            },
        );
        config.add_sink(
            "out",
            &["in"],
            PrometheusExporterConfig {
                address: out_addr,
                auth: None,
                tls: None,
                default_namespace: Some("vector".into()),
                buckets: vec![1.0, 2.0, 4.0],
                quantiles: vec![],
                distributions_as_summaries: false,
                flush_period_secs: Duration::from_secs(3),
                suppress_timestamp: false,
                acknowledgements: Default::default(),
            },
        );

        let (topology, _) = start_topology(config.build().unwrap(), false).await;
        sleep(Duration::from_secs(1)).await;

        let response = Client::new()
            .get(format!("http://{out_addr}/metrics").parse().unwrap())
            .await
            .unwrap();

        assert!(response.status().is_success());
        let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
        let lines = std::str::from_utf8(&body)
            .unwrap()
            .lines()
            .collect::<Vec<_>>();

        assert_eq!(
            lines,
            vec![
                "# HELP vector_http_request_duration_seconds http_request_duration_seconds",
                "# TYPE vector_http_request_duration_seconds histogram",
                "vector_http_request_duration_seconds_bucket{le=\"0.05\"} 24054 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.1\"} 33444 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.2\"} 100392 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.5\"} 129389 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"1\"} 133988 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"+Inf\"} 144320 1612411516789",
                "vector_http_request_duration_seconds_sum 53423 1612411516789",
                "vector_http_request_duration_seconds_count 144320 1612411516789",
                "# HELP vector_prometheus_remote_storage_samples_in_total prometheus_remote_storage_samples_in_total",
                "# TYPE vector_prometheus_remote_storage_samples_in_total gauge",
                "vector_prometheus_remote_storage_samples_in_total 57011636 1612411516789",
                "# HELP vector_promhttp_metric_handler_requests_total promhttp_metric_handler_requests_total",
                "# TYPE vector_promhttp_metric_handler_requests_total counter",
                "vector_promhttp_metric_handler_requests_total{code=\"200\"} 100 1612411516789",
                "vector_promhttp_metric_handler_requests_total{code=\"404\"} 7 1612411516789",
                "# HELP vector_rpc_duration_seconds rpc_duration_seconds",
                "# TYPE vector_rpc_duration_seconds summary",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.01\"} 3102 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.05\"} 3272 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.5\"} 4773 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.9\"} 9001 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.99\"} 76656 1612411516789",
                "vector_rpc_duration_seconds_sum{code=\"200\"} 17560473 1612411516789",
                "vector_rpc_duration_seconds_count{code=\"200\"} 2693 1612411516789",
            ],
        );

        topology.stop().await;
    }
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::{
        event::{MetricKind, MetricValue},
        test_util::components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
    };

    #[tokio::test]
    async fn scrapes_metrics() {
        let config = PrometheusScrapeConfig {
            endpoints: vec!["http://prometheus:9090/metrics".into()],
            interval: Duration::from_secs(1),
            timeout: Duration::from_secs(1),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let find_metric = |name: &str| {
            metrics
                .iter()
                .find(|metric| metric.name() == name)
                .unwrap_or_else(|| panic!("Missing metric {name:?}"))
        };

        // Sample some well-known metrics
        let build = find_metric("prometheus_build_info");
        assert!(matches!(build.kind(), MetricKind::Absolute));
        assert!(matches!(build.value(), &MetricValue::Gauge { .. }));
        assert!(build.tags().unwrap().contains_key("branch"));
        assert!(build.tags().unwrap().contains_key("version"));
        assert_eq!(
            build.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            build.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );

        let queries = find_metric("prometheus_engine_queries");
        assert!(matches!(queries.kind(), MetricKind::Absolute));
        assert!(matches!(queries.value(), &MetricValue::Gauge { .. }));
        assert_eq!(
            queries.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            queries.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );

        let go_info = find_metric("go_info");
        assert!(matches!(go_info.kind(), MetricKind::Absolute));
        assert!(matches!(go_info.value(), &MetricValue::Gauge { .. }));
        assert!(go_info.tags().unwrap().contains_key("version"));
        assert_eq!(
            go_info.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            go_info.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );
    }
}