1use std::{collections::HashMap, time::Duration};
2
3use bytes::Bytes;
4use futures_util::FutureExt;
5use http::{Uri, response::Parts};
6use serde_with::serde_as;
7use snafu::ResultExt;
8use vector_lib::{config::LogNamespace, configurable::configurable_component, event::Event};
9
10use super::parser;
11use crate::{
12 Result,
13 config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
14 http::{Auth, QueryParameters},
15 internal_events::PrometheusParseError,
16 sources::{
17 self,
18 util::{
19 http::HttpMethod,
20 http_client::{
21 GenericHttpClientInputs, HttpClientBuilder, HttpClientContext, build_url, call,
22 default_interval, default_timeout, warn_if_interval_too_low,
23 },
24 },
25 },
26 tls::{TlsConfig, TlsSettings},
27};
28
29static PARSE_ERROR_NO_PATH: &str = "No path is set on the endpoint and we got a parse error,\
32 did you mean to use /metrics? This behavior changed in version 0.11.";
33static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 404,\
34 did you mean to use /metrics?\
35 This behavior changed in version 0.11.";
36
37#[serde_as]
39#[configurable_component(source(
40 "prometheus_scrape",
41 "Collect metrics from Prometheus exporters."
42))]
43#[derive(Clone, Debug)]
44pub struct PrometheusScrapeConfig {
45 #[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
47 #[serde(alias = "hosts")]
48 endpoints: Vec<String>,
49
50 #[serde(default = "default_interval")]
54 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
55 #[serde(rename = "scrape_interval_secs")]
56 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
57 interval: Duration,
58
59 #[serde(default = "default_timeout")]
61 #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
62 #[serde(rename = "scrape_timeout_secs")]
63 #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
64 timeout: Duration,
65
66 #[configurable(metadata(docs::advanced))]
70 instance_tag: Option<String>,
71
72 #[configurable(metadata(docs::advanced))]
76 endpoint_tag: Option<String>,
77
78 #[serde(default = "crate::serde::default_false")]
85 #[configurable(metadata(docs::advanced))]
86 honor_labels: bool,
87
88 #[serde(default)]
94 #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
95 #[configurable(metadata(docs::examples = "query_example()"))]
96 query: QueryParameters,
97
98 #[configurable(derived)]
99 tls: Option<TlsConfig>,
100
101 #[configurable(derived)]
102 #[configurable(metadata(docs::advanced))]
103 auth: Option<Auth>,
104}
105
106fn query_example() -> serde_json::Value {
107 serde_json::json! ({
108 "match[]": [
109 "{job=\"somejob\"}",
110 "{__name__=~\"job:.*\"}"
111 ]
112 })
113}
114
115impl GenerateConfig for PrometheusScrapeConfig {
116 fn generate_config() -> toml::Value {
117 toml::Value::try_from(Self {
118 endpoints: vec!["http://localhost:9090/metrics".to_string()],
119 interval: default_interval(),
120 timeout: default_timeout(),
121 instance_tag: Some("instance".to_string()),
122 endpoint_tag: Some("endpoint".to_string()),
123 honor_labels: false,
124 query: HashMap::new(),
125 tls: None,
126 auth: None,
127 })
128 .unwrap()
129 }
130}
131
132#[async_trait::async_trait]
133#[typetag::serde(name = "prometheus_scrape")]
134impl SourceConfig for PrometheusScrapeConfig {
135 async fn build(&self, cx: SourceContext) -> Result<sources::Source> {
136 let urls = self
137 .endpoints
138 .iter()
139 .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
140 .map(|r| r.map(|uri| build_url(&uri, &self.query)))
141 .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
142 let tls = TlsSettings::from_options(self.tls.as_ref())?;
143
144 let builder = PrometheusScrapeBuilder {
145 honor_labels: self.honor_labels,
146 instance_tag: self.instance_tag.clone(),
147 endpoint_tag: self.endpoint_tag.clone(),
148 };
149
150 warn_if_interval_too_low(self.timeout, self.interval);
151
152 let inputs = GenericHttpClientInputs {
153 urls,
154 interval: self.interval,
155 timeout: self.timeout,
156 headers: HashMap::new(),
157 content_type: "text/plain".to_string(),
158 auth: self.auth.clone(),
159 tls,
160 proxy: cx.proxy.clone(),
161 shutdown: cx.shutdown,
162 };
163
164 Ok(call(inputs, builder, cx.out, HttpMethod::Get).boxed())
165 }
166
167 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
168 vec![SourceOutput::new_metrics()]
169 }
170
171 fn can_acknowledge(&self) -> bool {
172 false
173 }
174}
175
176#[derive(Clone)]
180struct InstanceInfo {
181 tag: String,
182 instance: String,
183 honor_label: bool,
184}
185
186#[derive(Clone)]
190struct EndpointInfo {
191 tag: String,
192 endpoint: String,
193 honor_label: bool,
194}
195
196#[derive(Clone)]
198struct PrometheusScrapeBuilder {
199 honor_labels: bool,
200 instance_tag: Option<String>,
201 endpoint_tag: Option<String>,
202}
203
204impl HttpClientBuilder for PrometheusScrapeBuilder {
205 type Context = PrometheusScrapeContext;
206
207 fn build(&self, url: &Uri) -> Self::Context {
209 let instance_info = self.instance_tag.as_ref().map(|tag| {
210 let instance = format!(
211 "{}:{}",
212 url.host().unwrap_or_default(),
213 url.port_u16().unwrap_or_else(|| match url.scheme() {
214 Some(scheme) if scheme == &http::uri::Scheme::HTTP => 80,
215 Some(scheme) if scheme == &http::uri::Scheme::HTTPS => 443,
216 _ => 0,
217 })
218 );
219 InstanceInfo {
220 tag: tag.to_string(),
221 instance,
222 honor_label: self.honor_labels,
223 }
224 });
225 let endpoint_info = self.endpoint_tag.as_ref().map(|tag| EndpointInfo {
226 tag: tag.to_string(),
227 endpoint: url.to_string(),
228 honor_label: self.honor_labels,
229 });
230 PrometheusScrapeContext {
231 instance_info,
232 endpoint_info,
233 }
234 }
235}
236
237struct PrometheusScrapeContext {
239 instance_info: Option<InstanceInfo>,
240 endpoint_info: Option<EndpointInfo>,
241}
242
243impl HttpClientContext for PrometheusScrapeContext {
244 fn enrich_events(&mut self, events: &mut Vec<Event>) {
245 for event in events.iter_mut() {
246 let metric = event.as_mut_metric();
247 if let Some(InstanceInfo {
248 tag,
249 instance,
250 honor_label,
251 }) = &self.instance_info
252 {
253 match (honor_label, metric.tag_value(tag)) {
254 (false, Some(old_instance)) => {
255 metric.replace_tag(format!("exported_{tag}"), old_instance);
256 metric.replace_tag(tag.clone(), instance.clone());
257 }
258 (true, Some(_)) => {}
259 (_, None) => {
260 metric.replace_tag(tag.clone(), instance.clone());
261 }
262 }
263 }
264 if let Some(EndpointInfo {
265 tag,
266 endpoint,
267 honor_label,
268 }) = &self.endpoint_info
269 {
270 match (honor_label, metric.tag_value(tag)) {
271 (false, Some(old_endpoint)) => {
272 metric.replace_tag(format!("exported_{tag}"), old_endpoint);
273 metric.replace_tag(tag.clone(), endpoint.clone());
274 }
275 (true, Some(_)) => {}
276 (_, None) => {
277 metric.replace_tag(tag.clone(), endpoint.clone());
278 }
279 }
280 }
281 }
282 }
283
284 fn on_response(&mut self, url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
286 let body = String::from_utf8_lossy(body);
287
288 match parser::parse_text(&body) {
289 Ok(events) => Some(events),
290 Err(error) => {
291 if url.path() == "/" {
292 warn!(
294 message = PARSE_ERROR_NO_PATH,
295 endpoint = %url,
296 );
297 }
298 emit!(PrometheusParseError {
299 error,
300 url: url.clone(),
301 body,
302 });
303 None
304 }
305 }
306 }
307
308 fn on_http_response_error(&self, url: &Uri, header: &Parts) {
309 if header.status == hyper::StatusCode::NOT_FOUND && url.path() == "/" {
310 warn!(
312 message = NOT_FOUND_NO_PATH,
313 endpoint = %url,
314 );
315 }
316 }
317}
318
319#[cfg(all(test, feature = "sinks-prometheus"))]
320mod test {
321 use http_body::Body as _;
322 use hyper::{
323 Body, Client, Response, Server,
324 service::{make_service_fn, service_fn},
325 };
326 use similar_asserts::assert_eq;
327 use tokio::time::{Duration, sleep};
328 use warp::Filter;
329
330 use super::*;
331 use crate::{
332 Error, config,
333 http::{ParameterValue, QueryParameterValue},
334 sinks::prometheus::exporter::PrometheusExporterConfig,
335 test_util::{
336 addr::next_addr,
337 components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
338 start_topology, trace_init, wait_for_tcp,
339 },
340 };
341
342 #[test]
343 fn generate_config() {
344 crate::test_util::test_generate_config::<PrometheusScrapeConfig>();
345 }
346
347 #[tokio::test]
348 async fn test_prometheus_sets_headers() {
349 let (_guard, in_addr) = next_addr();
350
351 let dummy_endpoint = warp::path!("metrics").and(warp::header::exact("Accept", "text/plain")).map(|| {
352 r#"
353 promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
354 "#
355 });
356
357 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
358 wait_for_tcp(in_addr).await;
359
360 let config = PrometheusScrapeConfig {
361 endpoints: vec![format!("http://{}/metrics", in_addr)],
362 interval: Duration::from_secs(1),
363 timeout: default_timeout(),
364 instance_tag: Some("instance".to_string()),
365 endpoint_tag: Some("endpoint".to_string()),
366 honor_labels: true,
367 query: HashMap::new(),
368 auth: None,
369 tls: None,
370 };
371
372 let events = run_and_assert_source_compliance(
373 config,
374 Duration::from_secs(3),
375 &HTTP_PULL_SOURCE_TAGS,
376 )
377 .await;
378 assert!(!events.is_empty());
379 }
380
381 #[tokio::test]
382 async fn test_prometheus_honor_labels() {
383 let (_guard, in_addr) = next_addr();
384
385 let dummy_endpoint = warp::path!("metrics").map(|| {
386 r#"
387 promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
388 "#
389 });
390
391 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
392 wait_for_tcp(in_addr).await;
393
394 let config = PrometheusScrapeConfig {
395 endpoints: vec![format!("http://{}/metrics", in_addr)],
396 interval: Duration::from_secs(1),
397 timeout: default_timeout(),
398 instance_tag: Some("instance".to_string()),
399 endpoint_tag: Some("endpoint".to_string()),
400 honor_labels: true,
401 query: HashMap::new(),
402 auth: None,
403 tls: None,
404 };
405
406 let events = run_and_assert_source_compliance(
407 config,
408 Duration::from_secs(3),
409 &HTTP_PULL_SOURCE_TAGS,
410 )
411 .await;
412 assert!(!events.is_empty());
413
414 let metrics: Vec<_> = events
415 .into_iter()
416 .map(|event| event.into_metric())
417 .collect();
418
419 for metric in metrics {
420 assert_eq!(
421 metric.tag_value("instance"),
422 Some(String::from("localhost:9999"))
423 );
424 assert_eq!(
425 metric.tag_value("endpoint"),
426 Some(String::from("http://example.com"))
427 );
428 assert_eq!(metric.tag_value("exported_instance"), None,);
429 assert_eq!(metric.tag_value("exported_endpoint"), None,);
430 }
431 }
432
433 #[tokio::test]
434 async fn test_prometheus_do_not_honor_labels() {
435 let (_guard, in_addr) = next_addr();
436
437 let dummy_endpoint = warp::path!("metrics").map(|| {
438 r#"
439 promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
440 "#
441 });
442
443 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
444 wait_for_tcp(in_addr).await;
445
446 let config = PrometheusScrapeConfig {
447 endpoints: vec![format!("http://{}/metrics", in_addr)],
448 interval: Duration::from_secs(1),
449 timeout: default_timeout(),
450 instance_tag: Some("instance".to_string()),
451 endpoint_tag: Some("endpoint".to_string()),
452 honor_labels: false,
453 query: HashMap::new(),
454 auth: None,
455 tls: None,
456 };
457
458 let events = run_and_assert_source_compliance(
459 config,
460 Duration::from_secs(3),
461 &HTTP_PULL_SOURCE_TAGS,
462 )
463 .await;
464 assert!(!events.is_empty());
465
466 let metrics: Vec<_> = events
467 .into_iter()
468 .map(|event| event.into_metric())
469 .collect();
470
471 for metric in metrics {
472 assert_eq!(
473 metric.tag_value("instance"),
474 Some(format!("{}:{}", in_addr.ip(), in_addr.port()))
475 );
476 assert_eq!(
477 metric.tag_value("endpoint"),
478 Some(format!(
479 "http://{}:{}/metrics",
480 in_addr.ip(),
481 in_addr.port()
482 ))
483 );
484 assert_eq!(
485 metric.tag_value("exported_instance"),
486 Some(String::from("localhost:9999"))
487 );
488 assert_eq!(
489 metric.tag_value("exported_endpoint"),
490 Some(String::from("http://example.com"))
491 );
492 }
493 }
494
495 #[tokio::test]
500 async fn test_prometheus_duplicate_tags() {
501 let (_guard, in_addr) = next_addr();
502
503 let dummy_endpoint = warp::path!("metrics").map(|| {
504 r#"
505 metric_label{code="200",code="success"} 100 1612411516789
506 "#
507 });
508
509 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
510 wait_for_tcp(in_addr).await;
511
512 let config = PrometheusScrapeConfig {
513 endpoints: vec![format!("http://{}/metrics", in_addr)],
514 interval: Duration::from_secs(1),
515 timeout: default_timeout(),
516 instance_tag: Some("instance".to_string()),
517 endpoint_tag: Some("endpoint".to_string()),
518 honor_labels: true,
519 query: HashMap::new(),
520 auth: None,
521 tls: None,
522 };
523
524 let events = run_and_assert_source_compliance(
525 config,
526 Duration::from_secs(3),
527 &HTTP_PULL_SOURCE_TAGS,
528 )
529 .await;
530 assert!(!events.is_empty());
531
532 let metrics: Vec<vector_lib::event::Metric> = events
533 .into_iter()
534 .map(|event| event.into_metric())
535 .collect();
536 let metric = &metrics[0];
537
538 assert_eq!(metric.name(), "metric_label");
539
540 let code_tag = metric
541 .tags()
542 .unwrap()
543 .iter_all()
544 .filter(|(name, _value)| *name == "code")
545 .map(|(_name, value)| value)
546 .collect::<Vec<_>>();
547
548 assert_eq!(1, code_tag.len());
549 assert_eq!("success", code_tag[0].unwrap());
550 }
551
552 #[tokio::test]
553 async fn test_prometheus_request_query() {
554 let (_guard, in_addr) = next_addr();
555
556 let dummy_endpoint = warp::path!("metrics").and(warp::query::raw()).map(|query| {
557 format!(
558 r#"
559 promhttp_metric_handler_requests_total{{query="{query}"}} 100 1612411516789
560 "#
561 )
562 });
563
564 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
565 wait_for_tcp(in_addr).await;
566
567 let config = PrometheusScrapeConfig {
568 endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
569 interval: Duration::from_secs(1),
570 timeout: default_timeout(),
571 instance_tag: Some("instance".to_string()),
572 endpoint_tag: Some("endpoint".to_string()),
573 honor_labels: false,
574 query: HashMap::from([
575 (
576 "key1".to_string(),
577 QueryParameterValue::MultiParams(vec![ParameterValue::String(
578 "val2".to_string(),
579 )]),
580 ),
581 (
582 "key2".to_string(),
583 QueryParameterValue::MultiParams(vec![
584 ParameterValue::String("val1".to_string()),
585 ParameterValue::String("val2".to_string()),
586 ]),
587 ),
588 ]),
589 auth: None,
590 tls: None,
591 };
592
593 let events = run_and_assert_source_compliance(
594 config,
595 Duration::from_secs(3),
596 &HTTP_PULL_SOURCE_TAGS,
597 )
598 .await;
599 assert!(!events.is_empty());
600
601 let metrics: Vec<_> = events
602 .into_iter()
603 .map(|event| event.into_metric())
604 .collect();
605
606 let expected = HashMap::from([
607 (
608 "key1".to_string(),
609 vec!["val1".to_string(), "val2".to_string()],
610 ),
611 (
612 "key2".to_string(),
613 vec!["val1".to_string(), "val2".to_string()],
614 ),
615 ]);
616
617 for metric in metrics {
618 let query = metric.tag_value("query").expect("query must be tagged");
619 let mut got: HashMap<String, Vec<String>> = HashMap::new();
620 for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
621 got.entry(k.to_string()).or_default().push(v.to_string());
622 }
623 for v in got.values_mut() {
624 v.sort();
625 }
626 assert_eq!(got, expected);
627 }
628 }
629
630 #[tokio::test]
633 async fn test_prometheus_routing() {
634 trace_init();
635 let (_in_guard, in_addr) = next_addr();
636 let (_out_guard, out_addr) = next_addr();
637
638 let make_svc = make_service_fn(|_| async {
639 Ok::<_, Error>(service_fn(|_| async {
640 Ok::<_, Error>(Response::new(Body::from(
641 r#"
642 # HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
643 # TYPE promhttp_metric_handler_requests_total counter
644 promhttp_metric_handler_requests_total{code="200"} 100 1612411516789
645 promhttp_metric_handler_requests_total{code="404"} 7 1612411516789
646 prometheus_remote_storage_samples_in_total 57011636 1612411516789
647 # A histogram, which has a pretty complex representation in the text format:
648 # HELP http_request_duration_seconds A histogram of the request duration.
649 # TYPE http_request_duration_seconds histogram
650 http_request_duration_seconds_bucket{le="0.05"} 24054 1612411516789
651 http_request_duration_seconds_bucket{le="0.1"} 33444 1612411516789
652 http_request_duration_seconds_bucket{le="0.2"} 100392 1612411516789
653 http_request_duration_seconds_bucket{le="0.5"} 129389 1612411516789
654 http_request_duration_seconds_bucket{le="1"} 133988 1612411516789
655 http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411516789
656 http_request_duration_seconds_sum 53423 1612411516789
657 http_request_duration_seconds_count 144320 1612411516789
658 # Finally a summary, which has a complex representation, too:
659 # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
660 # TYPE rpc_duration_seconds summary
661 rpc_duration_seconds{code="200",quantile="0.01"} 3102 1612411516789
662 rpc_duration_seconds{code="200",quantile="0.05"} 3272 1612411516789
663 rpc_duration_seconds{code="200",quantile="0.5"} 4773 1612411516789
664 rpc_duration_seconds{code="200",quantile="0.9"} 9001 1612411516789
665 rpc_duration_seconds{code="200",quantile="0.99"} 76656 1612411516789
666 rpc_duration_seconds_sum{code="200"} 1.7560473e+07 1612411516789
667 rpc_duration_seconds_count{code="200"} 2693 1612411516789
668 "#,
669 )))
670 }))
671 });
672
673 tokio::spawn(async move {
674 if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
675 error!(message = "Server error.", %error);
676 }
677 });
678 wait_for_tcp(in_addr).await;
679
680 let mut config = config::Config::builder();
681 config.add_source(
682 "in",
683 PrometheusScrapeConfig {
684 endpoints: vec![format!("http://{}", in_addr)],
685 instance_tag: None,
686 endpoint_tag: None,
687 honor_labels: false,
688 query: HashMap::new(),
689 interval: Duration::from_secs(1),
690 timeout: default_timeout(),
691 tls: None,
692 auth: None,
693 },
694 );
695 config.add_sink(
696 "out",
697 &["in"],
698 PrometheusExporterConfig {
699 address: out_addr,
700 auth: None,
701 tls: None,
702 default_namespace: Some("vector".into()),
703 buckets: vec![1.0, 2.0, 4.0],
704 quantiles: vec![],
705 distributions_as_summaries: false,
706 flush_period_secs: Duration::from_secs(3),
707 suppress_timestamp: false,
708 acknowledgements: Default::default(),
709 },
710 );
711
712 let (topology, _) = start_topology(config.build().unwrap(), false).await;
713 sleep(Duration::from_secs(1)).await;
714
715 let response = Client::new()
716 .get(format!("http://{out_addr}/metrics").parse().unwrap())
717 .await
718 .unwrap();
719
720 assert!(response.status().is_success());
721 let body = response.into_body().collect().await.unwrap().to_bytes();
722 let lines = std::str::from_utf8(&body)
723 .unwrap()
724 .lines()
725 .collect::<Vec<_>>();
726
727 assert_eq!(
728 lines,
729 vec![
730 "# HELP vector_http_request_duration_seconds http_request_duration_seconds",
731 "# TYPE vector_http_request_duration_seconds histogram",
732 "vector_http_request_duration_seconds_bucket{le=\"0.05\"} 24054 1612411516789",
733 "vector_http_request_duration_seconds_bucket{le=\"0.1\"} 33444 1612411516789",
734 "vector_http_request_duration_seconds_bucket{le=\"0.2\"} 100392 1612411516789",
735 "vector_http_request_duration_seconds_bucket{le=\"0.5\"} 129389 1612411516789",
736 "vector_http_request_duration_seconds_bucket{le=\"1\"} 133988 1612411516789",
737 "vector_http_request_duration_seconds_bucket{le=\"+Inf\"} 144320 1612411516789",
738 "vector_http_request_duration_seconds_sum 53423 1612411516789",
739 "vector_http_request_duration_seconds_count 144320 1612411516789",
740 "# HELP vector_prometheus_remote_storage_samples_in_total prometheus_remote_storage_samples_in_total",
741 "# TYPE vector_prometheus_remote_storage_samples_in_total gauge",
742 "vector_prometheus_remote_storage_samples_in_total 57011636 1612411516789",
743 "# HELP vector_promhttp_metric_handler_requests_total promhttp_metric_handler_requests_total",
744 "# TYPE vector_promhttp_metric_handler_requests_total counter",
745 "vector_promhttp_metric_handler_requests_total{code=\"200\"} 100 1612411516789",
746 "vector_promhttp_metric_handler_requests_total{code=\"404\"} 7 1612411516789",
747 "# HELP vector_rpc_duration_seconds rpc_duration_seconds",
748 "# TYPE vector_rpc_duration_seconds summary",
749 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.01\"} 3102 1612411516789",
750 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.05\"} 3272 1612411516789",
751 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.5\"} 4773 1612411516789",
752 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.9\"} 9001 1612411516789",
753 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.99\"} 76656 1612411516789",
754 "vector_rpc_duration_seconds_sum{code=\"200\"} 17560473 1612411516789",
755 "vector_rpc_duration_seconds_count{code=\"200\"} 2693 1612411516789",
756 ],
757 );
758
759 topology.stop().await;
760 }
761}
762
763#[cfg(all(test, feature = "prometheus-integration-tests"))]
764mod integration_tests {
765 use tokio::time::Duration;
766
767 use super::*;
768 use crate::{
769 event::{MetricKind, MetricValue},
770 test_util::components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
771 };
772
773 #[tokio::test]
774 async fn scrapes_metrics() {
775 let config = PrometheusScrapeConfig {
776 endpoints: vec!["http://prometheus:9090/metrics".into()],
777 interval: Duration::from_secs(1),
778 timeout: Duration::from_secs(1),
779 instance_tag: Some("instance".to_string()),
780 endpoint_tag: Some("endpoint".to_string()),
781 honor_labels: false,
782 query: HashMap::new(),
783 auth: None,
784 tls: None,
785 };
786
787 let events = run_and_assert_source_compliance(
788 config,
789 Duration::from_secs(3),
790 &HTTP_PULL_SOURCE_TAGS,
791 )
792 .await;
793 assert!(!events.is_empty());
794
795 let metrics: Vec<_> = events
796 .into_iter()
797 .map(|event| event.into_metric())
798 .collect();
799
800 let find_metric = |name: &str| {
801 metrics
802 .iter()
803 .find(|metric| metric.name() == name)
804 .unwrap_or_else(|| panic!("Missing metric {name:?}"))
805 };
806
807 let build = find_metric("prometheus_build_info");
809 assert!(matches!(build.kind(), MetricKind::Absolute));
810 assert!(matches!(build.value(), &MetricValue::Gauge { .. }));
811 assert!(build.tags().unwrap().contains_key("branch"));
812 assert!(build.tags().unwrap().contains_key("version"));
813 assert_eq!(
814 build.tag_value("instance"),
815 Some("prometheus:9090".to_string())
816 );
817 assert_eq!(
818 build.tag_value("endpoint"),
819 Some("http://prometheus:9090/metrics".to_string())
820 );
821
822 let queries = find_metric("prometheus_engine_queries");
823 assert!(matches!(queries.kind(), MetricKind::Absolute));
824 assert!(matches!(queries.value(), &MetricValue::Gauge { .. }));
825 assert_eq!(
826 queries.tag_value("instance"),
827 Some("prometheus:9090".to_string())
828 );
829 assert_eq!(
830 queries.tag_value("endpoint"),
831 Some("http://prometheus:9090/metrics".to_string())
832 );
833
834 let go_info = find_metric("go_info");
835 assert!(matches!(go_info.kind(), MetricKind::Absolute));
836 assert!(matches!(go_info.value(), &MetricValue::Gauge { .. }));
837 assert!(go_info.tags().unwrap().contains_key("version"));
838 assert_eq!(
839 go_info.tag_value("instance"),
840 Some("prometheus:9090".to_string())
841 );
842 assert_eq!(
843 go_info.tag_value("endpoint"),
844 Some("http://prometheus:9090/metrics".to_string())
845 );
846 }
847}