use std::{collections::HashMap, time::Duration};

use bytes::Bytes;
use futures_util::FutureExt;
use http::{Uri, response::Parts};
use serde_with::serde_as;
use snafu::ResultExt;
use vector_lib::{config::LogNamespace, configurable::configurable_component, event::Event};

use super::parser;
use crate::{
    Result,
    config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
    http::{Auth, QueryParameters},
    internal_events::PrometheusParseError,
    sources::{
        self,
        util::{
            http::HttpMethod,
            http_client::{
                GenericHttpClientInputs, HttpClientBuilder, HttpClientContext, build_url, call,
                default_interval, default_timeout, warn_if_interval_too_low,
            },
        },
    },
    tls::{TlsConfig, TlsSettings},
};

static PARSE_ERROR_NO_PATH: &str = "No path is set on the endpoint and we got a parse error, \
    did you mean to use /metrics? This behavior changed in version 0.11.";
static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 404, \
    did you mean to use /metrics? \
    This behavior changed in version 0.11.";

#[serde_as]
#[configurable_component(source(
    "prometheus_scrape",
    "Collect metrics from Prometheus exporters."
))]
#[derive(Clone, Debug)]
pub struct PrometheusScrapeConfig {
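    /// Endpoints to scrape metrics from.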
    #[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
    #[serde(alias = "hosts")]
    endpoints: Vec<String>,

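    /// The interval between scrapes, in seconds.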
    #[serde(default = "default_interval")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "scrape_interval_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    interval: Duration,

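    /// The timeout for each scrape request, in seconds.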
    #[serde(default = "default_timeout")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(rename = "scrape_timeout_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
    timeout: Duration,

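    /// The tag name added to each event representing the scraped instance's `host:port`.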
    #[configurable(metadata(docs::advanced))]
    instance_tag: Option<String>,

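    /// The tag name added to each event representing the scraped endpoint's URL.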
    #[configurable(metadata(docs::advanced))]
    endpoint_tag: Option<String>,

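    /// Controls how tag conflicts are handled if the scraped source has tags to be added.
    ///
    /// If `true`, the tag is not added when the scraped metric already has it. If `false`, the
    /// conflicting tag is renamed by prepending `exported_` to its name, and the new tag is added.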
    #[serde(default = "crate::serde::default_false")]
    #[configurable(metadata(docs::advanced))]
    honor_labels: bool,

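    /// Custom parameters for the scrape request query string.
    ///
    /// One or more values can be provided for the same parameter key; they are added alongside any
    /// parameters already present in the configured `endpoints` URLs.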
    #[serde(default)]
    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
    #[configurable(metadata(docs::examples = "query_example()"))]
    query: QueryParameters,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<Auth>,
}

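/// Example value for the `query` option, referenced by the `docs::examples` metadata above.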
fn query_example() -> serde_json::Value {
    serde_json::json!({
        "match[]": [
            "{job=\"somejob\"}",
            "{__name__=~\"job:.*\"}"
        ]
    })
}

impl GenerateConfig for PrometheusScrapeConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            endpoints: vec!["http://localhost:9090/metrics".to_string()],
            interval: default_interval(),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            tls: None,
            auth: None,
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_scrape")]
impl SourceConfig for PrometheusScrapeConfig {
    async fn build(&self, cx: SourceContext) -> Result<sources::Source> {
        let urls = self
            .endpoints
            .iter()
            .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
            .map(|r| r.map(|uri| build_url(&uri, &self.query)))
            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
        let tls = TlsSettings::from_options(self.tls.as_ref())?;

        let builder = PrometheusScrapeBuilder {
            honor_labels: self.honor_labels,
            instance_tag: self.instance_tag.clone(),
            endpoint_tag: self.endpoint_tag.clone(),
        };

        warn_if_interval_too_low(self.timeout, self.interval);

        let inputs = GenericHttpClientInputs {
            urls,
            interval: self.interval,
            timeout: self.timeout,
            headers: HashMap::new(),
            content_type: "text/plain".to_string(),
            auth: self.auth.clone(),
            tls,
            proxy: cx.proxy.clone(),
            shutdown: cx.shutdown,
        };

        Ok(call(inputs, builder, cx.out, HttpMethod::Get).boxed())
    }

    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }

    fn can_acknowledge(&self) -> bool {
        false
    }
}

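/// The `instance` tag settings resolved for a single scraped URL: the tag name, the
/// `host:port` value to apply, and whether labels already on the metric are honored.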
#[derive(Clone)]
struct InstanceInfo {
    tag: String,
    instance: String,
    honor_label: bool,
}

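/// The `endpoint` tag settings resolved for a single scraped URL: the tag name, the endpoint
/// URL value to apply, and whether labels already on the metric are honored.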
#[derive(Clone)]
struct EndpointInfo {
    tag: String,
    endpoint: String,
    honor_label: bool,
}

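/// Builds one `PrometheusScrapeContext` per configured endpoint from the shared source options.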
#[derive(Clone)]
struct PrometheusScrapeBuilder {
    honor_labels: bool,
    instance_tag: Option<String>,
    endpoint_tag: Option<String>,
}

impl HttpClientBuilder for PrometheusScrapeBuilder {
    type Context = PrometheusScrapeContext;

    fn build(&self, url: &Uri) -> Self::Context {
        let instance_info = self.instance_tag.as_ref().map(|tag| {
            let instance = format!(
                "{}:{}",
                url.host().unwrap_or_default(),
                url.port_u16().unwrap_or_else(|| match url.scheme() {
                    Some(scheme) if scheme == &http::uri::Scheme::HTTP => 80,
                    Some(scheme) if scheme == &http::uri::Scheme::HTTPS => 443,
                    _ => 0,
                })
            );
            InstanceInfo {
                tag: tag.to_string(),
                instance,
                honor_label: self.honor_labels,
            }
        });
        let endpoint_info = self.endpoint_tag.as_ref().map(|tag| EndpointInfo {
            tag: tag.to_string(),
            endpoint: url.to_string(),
            honor_label: self.honor_labels,
        });
        PrometheusScrapeContext {
            instance_info,
            endpoint_info,
        }
    }
}

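/// Per-endpoint scrape context that parses responses and enriches the resulting metrics with
/// the configured `instance` and `endpoint` tags.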
struct PrometheusScrapeContext {
    instance_info: Option<InstanceInfo>,
    endpoint_info: Option<EndpointInfo>,
}

impl HttpClientContext for PrometheusScrapeContext {
    fn enrich_events(&mut self, events: &mut Vec<Event>) {
        for event in events.iter_mut() {
            let metric = event.as_mut_metric();
            if let Some(InstanceInfo {
                tag,
                instance,
                honor_label,
            }) = &self.instance_info
            {
                match (honor_label, metric.tag_value(tag)) {
                    (false, Some(old_instance)) => {
                        metric.replace_tag(format!("exported_{tag}"), old_instance);
                        metric.replace_tag(tag.clone(), instance.clone());
                    }
                    (true, Some(_)) => {}
                    (_, None) => {
                        metric.replace_tag(tag.clone(), instance.clone());
                    }
                }
            }
            if let Some(EndpointInfo {
                tag,
                endpoint,
                honor_label,
            }) = &self.endpoint_info
            {
                match (honor_label, metric.tag_value(tag)) {
                    (false, Some(old_endpoint)) => {
                        metric.replace_tag(format!("exported_{tag}"), old_endpoint);
                        metric.replace_tag(tag.clone(), endpoint.clone());
                    }
                    (true, Some(_)) => {}
                    (_, None) => {
                        metric.replace_tag(tag.clone(), endpoint.clone());
                    }
                }
            }
        }
    }

    fn on_response(&mut self, url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
        let body = String::from_utf8_lossy(body);

        match parser::parse_text(&body) {
            Ok(events) => Some(events),
            Err(error) => {
                if url.path() == "/" {
                    warn!(
                        message = PARSE_ERROR_NO_PATH,
                        endpoint = %url,
                    );
                }
                emit!(PrometheusParseError {
                    error,
                    url: url.clone(),
                    body,
                });
                None
            }
        }
    }

    fn on_http_response_error(&self, url: &Uri, header: &Parts) {
        if header.status == hyper::StatusCode::NOT_FOUND && url.path() == "/" {
            warn!(
                message = NOT_FOUND_NO_PATH,
                endpoint = %url,
            );
        }
    }
}

#[cfg(all(test, feature = "sinks-prometheus"))]
mod test {
    use hyper::{
        Body, Client, Response, Server,
        service::{make_service_fn, service_fn},
    };
    use similar_asserts::assert_eq;
    use tokio::time::{Duration, sleep};
    use warp::Filter;

    use super::*;
    use crate::{
        Error, config,
        http::{ParameterValue, QueryParameterValue},
        sinks::prometheus::exporter::PrometheusExporterConfig,
        test_util::{
            components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
            next_addr, start_topology, trace_init, wait_for_tcp,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusScrapeConfig>();
    }

    #[tokio::test]
    async fn test_prometheus_sets_headers() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics")
            .and(warp::header::exact("Accept", "text/plain"))
            .map(|| {
                r#"
                promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
                "#
            });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());
    }

    #[tokio::test]
    async fn test_prometheus_honor_labels() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
            promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
            "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(String::from("http://example.com"))
            );
            assert_eq!(metric.tag_value("exported_instance"), None);
            assert_eq!(metric.tag_value("exported_endpoint"), None);
        }
    }

    #[tokio::test]
    async fn test_prometheus_do_not_honor_labels() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
            promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
            "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(format!("{}:{}", in_addr.ip(), in_addr.port()))
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(format!(
                    "http://{}:{}/metrics",
                    in_addr.ip(),
                    in_addr.port()
                ))
            );
            assert_eq!(
                metric.tag_value("exported_instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("exported_endpoint"),
                Some(String::from("http://example.com"))
            );
        }
    }

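    // The scraped payload deliberately repeats the `code` label; only a single `code` tag with
    // the last value ("success") should survive parsing.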
    #[tokio::test]
    async fn test_prometheus_duplicate_tags() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
            metric_label{code="200",code="success"} 100 1612411516789
            "#
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: true,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<vector_lib::event::Metric> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();
        let metric = &metrics[0];

        assert_eq!(metric.name(), "metric_label");

        let code_tag = metric
            .tags()
            .unwrap()
            .iter_all()
            .filter(|(name, _value)| *name == "code")
            .map(|(_name, value)| value)
            .collect::<Vec<_>>();

        assert_eq!(1, code_tag.len());
        assert_eq!("success", code_tag[0].unwrap());
    }

    #[tokio::test]
    async fn test_prometheus_request_query() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").and(warp::query::raw()).map(|query| {
            format!(
                r#"
                promhttp_metric_handler_requests_total{{query="{query}"}} 100 1612411516789
                "#
            )
        });

        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let config = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::from([
                (
                    "key1".to_string(),
                    QueryParameterValue::MultiParams(vec![ParameterValue::String(
                        "val2".to_string(),
                    )]),
                ),
                (
                    "key2".to_string(),
                    QueryParameterValue::MultiParams(vec![
                        ParameterValue::String("val1".to_string()),
                        ParameterValue::String("val2".to_string()),
                    ]),
                ),
            ]),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let expected = HashMap::from([
            (
                "key1".to_string(),
                vec!["val1".to_string(), "val2".to_string()],
            ),
            (
                "key2".to_string(),
                vec!["val1".to_string(), "val2".to_string()],
            ),
        ]);

        for metric in metrics {
            let query = metric.tag_value("query").expect("query must be tagged");
            let mut got: HashMap<String, Vec<String>> = HashMap::new();
            for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
                got.entry(k.to_string()).or_default().push(v.to_string());
            }
            for v in got.values_mut() {
                v.sort();
            }
            assert_eq!(got, expected);
        }
    }

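    // End-to-end check: scrape a fake exporter with the `prometheus_scrape` source, route the
    // metrics through a `prometheus_exporter` sink, and compare the re-exported text output.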
    #[tokio::test]
    async fn test_prometheus_routing() {
        trace_init();
        let in_addr = next_addr();
        let out_addr = next_addr();

        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(
                    r#"
                    # HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
                    # TYPE promhttp_metric_handler_requests_total counter
                    promhttp_metric_handler_requests_total{code="200"} 100 1612411516789
                    promhttp_metric_handler_requests_total{code="404"} 7 1612411516789
                    prometheus_remote_storage_samples_in_total 57011636 1612411516789
                    # A histogram, which has a pretty complex representation in the text format:
                    # HELP http_request_duration_seconds A histogram of the request duration.
                    # TYPE http_request_duration_seconds histogram
                    http_request_duration_seconds_bucket{le="0.05"} 24054 1612411516789
                    http_request_duration_seconds_bucket{le="0.1"} 33444 1612411516789
                    http_request_duration_seconds_bucket{le="0.2"} 100392 1612411516789
                    http_request_duration_seconds_bucket{le="0.5"} 129389 1612411516789
                    http_request_duration_seconds_bucket{le="1"} 133988 1612411516789
                    http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411516789
                    http_request_duration_seconds_sum 53423 1612411516789
                    http_request_duration_seconds_count 144320 1612411516789
                    # Finally a summary, which has a complex representation, too:
                    # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
                    # TYPE rpc_duration_seconds summary
                    rpc_duration_seconds{code="200",quantile="0.01"} 3102 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.05"} 3272 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.5"} 4773 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.9"} 9001 1612411516789
                    rpc_duration_seconds{code="200",quantile="0.99"} 76656 1612411516789
                    rpc_duration_seconds_sum{code="200"} 1.7560473e+07 1612411516789
                    rpc_duration_seconds_count{code="200"} 2693 1612411516789
                    "#,
                )))
            }))
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        let mut config = config::Config::builder();
        config.add_source(
            "in",
            PrometheusScrapeConfig {
                endpoints: vec![format!("http://{}", in_addr)],
                instance_tag: None,
                endpoint_tag: None,
                honor_labels: false,
                query: HashMap::new(),
                interval: Duration::from_secs(1),
                timeout: default_timeout(),
                tls: None,
                auth: None,
            },
        );
        config.add_sink(
            "out",
            &["in"],
            PrometheusExporterConfig {
                address: out_addr,
                auth: None,
                tls: None,
                default_namespace: Some("vector".into()),
                buckets: vec![1.0, 2.0, 4.0],
                quantiles: vec![],
                distributions_as_summaries: false,
                flush_period_secs: Duration::from_secs(3),
                suppress_timestamp: false,
                acknowledgements: Default::default(),
            },
        );

        let (topology, _) = start_topology(config.build().unwrap(), false).await;
        sleep(Duration::from_secs(1)).await;

        let response = Client::new()
            .get(format!("http://{out_addr}/metrics").parse().unwrap())
            .await
            .unwrap();

        assert!(response.status().is_success());
        let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
        let lines = std::str::from_utf8(&body)
            .unwrap()
            .lines()
            .collect::<Vec<_>>();

        assert_eq!(
            lines,
            vec![
                "# HELP vector_http_request_duration_seconds http_request_duration_seconds",
                "# TYPE vector_http_request_duration_seconds histogram",
                "vector_http_request_duration_seconds_bucket{le=\"0.05\"} 24054 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.1\"} 33444 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.2\"} 100392 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"0.5\"} 129389 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"1\"} 133988 1612411516789",
                "vector_http_request_duration_seconds_bucket{le=\"+Inf\"} 144320 1612411516789",
                "vector_http_request_duration_seconds_sum 53423 1612411516789",
                "vector_http_request_duration_seconds_count 144320 1612411516789",
                "# HELP vector_prometheus_remote_storage_samples_in_total prometheus_remote_storage_samples_in_total",
                "# TYPE vector_prometheus_remote_storage_samples_in_total gauge",
                "vector_prometheus_remote_storage_samples_in_total 57011636 1612411516789",
                "# HELP vector_promhttp_metric_handler_requests_total promhttp_metric_handler_requests_total",
                "# TYPE vector_promhttp_metric_handler_requests_total counter",
                "vector_promhttp_metric_handler_requests_total{code=\"200\"} 100 1612411516789",
                "vector_promhttp_metric_handler_requests_total{code=\"404\"} 7 1612411516789",
                "# HELP vector_rpc_duration_seconds rpc_duration_seconds",
                "# TYPE vector_rpc_duration_seconds summary",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.01\"} 3102 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.05\"} 3272 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.5\"} 4773 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.9\"} 9001 1612411516789",
                "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.99\"} 76656 1612411516789",
                "vector_rpc_duration_seconds_sum{code=\"200\"} 17560473 1612411516789",
                "vector_rpc_duration_seconds_count{code=\"200\"} 2693 1612411516789",
            ],
        );

        topology.stop().await;
    }
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::{
        event::{MetricKind, MetricValue},
        test_util::components::{HTTP_PULL_SOURCE_TAGS, run_and_assert_source_compliance},
    };

    #[tokio::test]
    async fn scrapes_metrics() {
        let config = PrometheusScrapeConfig {
            endpoints: vec!["http://prometheus:9090/metrics".into()],
            interval: Duration::from_secs(1),
            timeout: Duration::from_secs(1),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events
            .into_iter()
            .map(|event| event.into_metric())
            .collect();

        let find_metric = |name: &str| {
            metrics
                .iter()
                .find(|metric| metric.name() == name)
                .unwrap_or_else(|| panic!("Missing metric {name:?}"))
        };

        let build = find_metric("prometheus_build_info");
        assert!(matches!(build.kind(), MetricKind::Absolute));
        assert!(matches!(build.value(), &MetricValue::Gauge { .. }));
        assert!(build.tags().unwrap().contains_key("branch"));
        assert!(build.tags().unwrap().contains_key("version"));
        assert_eq!(
            build.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            build.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );

        let queries = find_metric("prometheus_engine_queries");
        assert!(matches!(queries.kind(), MetricKind::Absolute));
        assert!(matches!(queries.value(), &MetricValue::Gauge { .. }));
        assert_eq!(
            queries.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            queries.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );

        let go_info = find_metric("go_info");
        assert!(matches!(go_info.kind(), MetricKind::Absolute));
        assert!(matches!(go_info.value(), &MetricValue::Gauge { .. }));
        assert!(go_info.tags().unwrap().contains_key("version"));
        assert_eq!(
            go_info.tag_value("instance"),
            Some("prometheus:9090".to_string())
        );
        assert_eq!(
            go_info.tag_value("endpoint"),
            Some("http://prometheus:9090/metrics".to_string())
        );
    }
}