1use std::collections::HashMap;
2use std::time::Duration;
3
4use bytes::Bytes;
5use futures_util::FutureExt;
6use http::{response::Parts, Uri};
7use serde_with::serde_as;
8use snafu::ResultExt;
9use vector_lib::configurable::configurable_component;
10use vector_lib::{config::LogNamespace, event::Event};
11
12use super::parser;
13use crate::http::QueryParameters;
14use crate::sources::util::http::HttpMethod;
15use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low};
16use crate::{
17 config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
18 http::Auth,
19 internal_events::PrometheusParseError,
20 sources::{
21 self,
22 util::http_client::{
23 build_url, call, default_interval, GenericHttpClientInputs, HttpClientBuilder,
24 HttpClientContext,
25 },
26 },
27 tls::{TlsConfig, TlsSettings},
28 Result,
29};
30
31static PARSE_ERROR_NO_PATH: &str = "No path is set on the endpoint and we got a parse error,\
34 did you mean to use /metrics? This behavior changed in version 0.11.";
35static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 404,\
36 did you mean to use /metrics?\
37 This behavior changed in version 0.11.";
38
39#[serde_as]
41#[configurable_component(source(
42 "prometheus_scrape",
43 "Collect metrics from Prometheus exporters."
44))]
45#[derive(Clone, Debug)]
46pub struct PrometheusScrapeConfig {
47 #[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
49 #[serde(alias = "hosts")]
50 endpoints: Vec<String>,
51
52 #[serde(default = "default_interval")]
56 #[serde_as(as = "serde_with::DurationSeconds<u64>")]
57 #[serde(rename = "scrape_interval_secs")]
58 #[configurable(metadata(docs::human_name = "Scrape Interval"))]
59 interval: Duration,
60
61 #[serde(default = "default_timeout")]
63 #[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
64 #[serde(rename = "scrape_timeout_secs")]
65 #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
66 timeout: Duration,
67
68 #[configurable(metadata(docs::advanced))]
72 instance_tag: Option<String>,
73
74 #[configurable(metadata(docs::advanced))]
78 endpoint_tag: Option<String>,
79
80 #[serde(default = "crate::serde::default_false")]
87 #[configurable(metadata(docs::advanced))]
88 honor_labels: bool,
89
90 #[serde(default)]
96 #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
97 #[configurable(metadata(docs::examples = "query_example()"))]
98 query: QueryParameters,
99
100 #[configurable(derived)]
101 tls: Option<TlsConfig>,
102
103 #[configurable(derived)]
104 #[configurable(metadata(docs::advanced))]
105 auth: Option<Auth>,
106}
107
108fn query_example() -> serde_json::Value {
109 serde_json::json! ({
110 "match[]": [
111 "{job=\"somejob\"}",
112 "{__name__=~\"job:.*\"}"
113 ]
114 })
115}
116
117impl GenerateConfig for PrometheusScrapeConfig {
118 fn generate_config() -> toml::Value {
119 toml::Value::try_from(Self {
120 endpoints: vec!["http://localhost:9090/metrics".to_string()],
121 interval: default_interval(),
122 timeout: default_timeout(),
123 instance_tag: Some("instance".to_string()),
124 endpoint_tag: Some("endpoint".to_string()),
125 honor_labels: false,
126 query: HashMap::new(),
127 tls: None,
128 auth: None,
129 })
130 .unwrap()
131 }
132}
133
134#[async_trait::async_trait]
135#[typetag::serde(name = "prometheus_scrape")]
136impl SourceConfig for PrometheusScrapeConfig {
137 async fn build(&self, cx: SourceContext) -> Result<sources::Source> {
138 let urls = self
139 .endpoints
140 .iter()
141 .map(|s| s.parse::<Uri>().context(sources::UriParseSnafu))
142 .map(|r| r.map(|uri| build_url(&uri, &self.query)))
143 .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
144 let tls = TlsSettings::from_options(self.tls.as_ref())?;
145
146 let builder = PrometheusScrapeBuilder {
147 honor_labels: self.honor_labels,
148 instance_tag: self.instance_tag.clone(),
149 endpoint_tag: self.endpoint_tag.clone(),
150 };
151
152 warn_if_interval_too_low(self.timeout, self.interval);
153
154 let inputs = GenericHttpClientInputs {
155 urls,
156 interval: self.interval,
157 timeout: self.timeout,
158 headers: HashMap::new(),
159 content_type: "text/plain".to_string(),
160 auth: self.auth.clone(),
161 tls,
162 proxy: cx.proxy.clone(),
163 shutdown: cx.shutdown,
164 };
165
166 Ok(call(inputs, builder, cx.out, HttpMethod::Get).boxed())
167 }
168
169 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
170 vec![SourceOutput::new_metrics()]
171 }
172
173 fn can_acknowledge(&self) -> bool {
174 false
175 }
176}
177
178#[derive(Clone)]
182struct InstanceInfo {
183 tag: String,
184 instance: String,
185 honor_label: bool,
186}
187
188#[derive(Clone)]
192struct EndpointInfo {
193 tag: String,
194 endpoint: String,
195 honor_label: bool,
196}
197
198#[derive(Clone)]
200struct PrometheusScrapeBuilder {
201 honor_labels: bool,
202 instance_tag: Option<String>,
203 endpoint_tag: Option<String>,
204}
205
206impl HttpClientBuilder for PrometheusScrapeBuilder {
207 type Context = PrometheusScrapeContext;
208
209 fn build(&self, url: &Uri) -> Self::Context {
211 let instance_info = self.instance_tag.as_ref().map(|tag| {
212 let instance = format!(
213 "{}:{}",
214 url.host().unwrap_or_default(),
215 url.port_u16().unwrap_or_else(|| match url.scheme() {
216 Some(scheme) if scheme == &http::uri::Scheme::HTTP => 80,
217 Some(scheme) if scheme == &http::uri::Scheme::HTTPS => 443,
218 _ => 0,
219 })
220 );
221 InstanceInfo {
222 tag: tag.to_string(),
223 instance,
224 honor_label: self.honor_labels,
225 }
226 });
227 let endpoint_info = self.endpoint_tag.as_ref().map(|tag| EndpointInfo {
228 tag: tag.to_string(),
229 endpoint: url.to_string(),
230 honor_label: self.honor_labels,
231 });
232 PrometheusScrapeContext {
233 instance_info,
234 endpoint_info,
235 }
236 }
237}
238
239struct PrometheusScrapeContext {
241 instance_info: Option<InstanceInfo>,
242 endpoint_info: Option<EndpointInfo>,
243}
244
245impl HttpClientContext for PrometheusScrapeContext {
246 fn enrich_events(&mut self, events: &mut Vec<Event>) {
247 for event in events.iter_mut() {
248 let metric = event.as_mut_metric();
249 if let Some(InstanceInfo {
250 tag,
251 instance,
252 honor_label,
253 }) = &self.instance_info
254 {
255 match (honor_label, metric.tag_value(tag)) {
256 (false, Some(old_instance)) => {
257 metric.replace_tag(format!("exported_{tag}"), old_instance);
258 metric.replace_tag(tag.clone(), instance.clone());
259 }
260 (true, Some(_)) => {}
261 (_, None) => {
262 metric.replace_tag(tag.clone(), instance.clone());
263 }
264 }
265 }
266 if let Some(EndpointInfo {
267 tag,
268 endpoint,
269 honor_label,
270 }) = &self.endpoint_info
271 {
272 match (honor_label, metric.tag_value(tag)) {
273 (false, Some(old_endpoint)) => {
274 metric.replace_tag(format!("exported_{tag}"), old_endpoint);
275 metric.replace_tag(tag.clone(), endpoint.clone());
276 }
277 (true, Some(_)) => {}
278 (_, None) => {
279 metric.replace_tag(tag.clone(), endpoint.clone());
280 }
281 }
282 }
283 }
284 }
285
286 fn on_response(&mut self, url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
288 let body = String::from_utf8_lossy(body);
289
290 match parser::parse_text(&body) {
291 Ok(events) => Some(events),
292 Err(error) => {
293 if url.path() == "/" {
294 warn!(
296 message = PARSE_ERROR_NO_PATH,
297 endpoint = %url,
298 );
299 }
300 emit!(PrometheusParseError {
301 error,
302 url: url.clone(),
303 body,
304 });
305 None
306 }
307 }
308 }
309
310 fn on_http_response_error(&self, url: &Uri, header: &Parts) {
311 if header.status == hyper::StatusCode::NOT_FOUND && url.path() == "/" {
312 warn!(
314 message = NOT_FOUND_NO_PATH,
315 endpoint = %url,
316 );
317 }
318 }
319}
320
321#[cfg(all(test, feature = "sinks-prometheus"))]
322mod test {
323 use hyper::{
324 service::{make_service_fn, service_fn},
325 Body, Client, Response, Server,
326 };
327 use similar_asserts::assert_eq;
328 use tokio::time::{sleep, Duration};
329 use warp::Filter;
330
331 use super::*;
332 use crate::{
333 config,
334 http::{ParameterValue, QueryParameterValue},
335 sinks::prometheus::exporter::PrometheusExporterConfig,
336 test_util::{
337 components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
338 next_addr, start_topology, trace_init, wait_for_tcp,
339 },
340 Error,
341 };
342
343 #[test]
344 fn generate_config() {
345 crate::test_util::test_generate_config::<PrometheusScrapeConfig>();
346 }
347
348 #[tokio::test]
349 async fn test_prometheus_sets_headers() {
350 let in_addr = next_addr();
351
352 let dummy_endpoint = warp::path!("metrics").and(warp::header::exact("Accept", "text/plain")).map(|| {
353 r#"
354 promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
355 "#
356 });
357
358 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
359 wait_for_tcp(in_addr).await;
360
361 let config = PrometheusScrapeConfig {
362 endpoints: vec![format!("http://{}/metrics", in_addr)],
363 interval: Duration::from_secs(1),
364 timeout: default_timeout(),
365 instance_tag: Some("instance".to_string()),
366 endpoint_tag: Some("endpoint".to_string()),
367 honor_labels: true,
368 query: HashMap::new(),
369 auth: None,
370 tls: None,
371 };
372
373 let events = run_and_assert_source_compliance(
374 config,
375 Duration::from_secs(3),
376 &HTTP_PULL_SOURCE_TAGS,
377 )
378 .await;
379 assert!(!events.is_empty());
380 }
381
382 #[tokio::test]
383 async fn test_prometheus_honor_labels() {
384 let in_addr = next_addr();
385
386 let dummy_endpoint = warp::path!("metrics").map(|| {
387 r#"
388 promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
389 "#
390 });
391
392 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
393 wait_for_tcp(in_addr).await;
394
395 let config = PrometheusScrapeConfig {
396 endpoints: vec![format!("http://{}/metrics", in_addr)],
397 interval: Duration::from_secs(1),
398 timeout: default_timeout(),
399 instance_tag: Some("instance".to_string()),
400 endpoint_tag: Some("endpoint".to_string()),
401 honor_labels: true,
402 query: HashMap::new(),
403 auth: None,
404 tls: None,
405 };
406
407 let events = run_and_assert_source_compliance(
408 config,
409 Duration::from_secs(3),
410 &HTTP_PULL_SOURCE_TAGS,
411 )
412 .await;
413 assert!(!events.is_empty());
414
415 let metrics: Vec<_> = events
416 .into_iter()
417 .map(|event| event.into_metric())
418 .collect();
419
420 for metric in metrics {
421 assert_eq!(
422 metric.tag_value("instance"),
423 Some(String::from("localhost:9999"))
424 );
425 assert_eq!(
426 metric.tag_value("endpoint"),
427 Some(String::from("http://example.com"))
428 );
429 assert_eq!(metric.tag_value("exported_instance"), None,);
430 assert_eq!(metric.tag_value("exported_endpoint"), None,);
431 }
432 }
433
434 #[tokio::test]
435 async fn test_prometheus_do_not_honor_labels() {
436 let in_addr = next_addr();
437
438 let dummy_endpoint = warp::path!("metrics").map(|| {
439 r#"
440 promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
441 "#
442 });
443
444 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
445 wait_for_tcp(in_addr).await;
446
447 let config = PrometheusScrapeConfig {
448 endpoints: vec![format!("http://{}/metrics", in_addr)],
449 interval: Duration::from_secs(1),
450 timeout: default_timeout(),
451 instance_tag: Some("instance".to_string()),
452 endpoint_tag: Some("endpoint".to_string()),
453 honor_labels: false,
454 query: HashMap::new(),
455 auth: None,
456 tls: None,
457 };
458
459 let events = run_and_assert_source_compliance(
460 config,
461 Duration::from_secs(3),
462 &HTTP_PULL_SOURCE_TAGS,
463 )
464 .await;
465 assert!(!events.is_empty());
466
467 let metrics: Vec<_> = events
468 .into_iter()
469 .map(|event| event.into_metric())
470 .collect();
471
472 for metric in metrics {
473 assert_eq!(
474 metric.tag_value("instance"),
475 Some(format!("{}:{}", in_addr.ip(), in_addr.port()))
476 );
477 assert_eq!(
478 metric.tag_value("endpoint"),
479 Some(format!(
480 "http://{}:{}/metrics",
481 in_addr.ip(),
482 in_addr.port()
483 ))
484 );
485 assert_eq!(
486 metric.tag_value("exported_instance"),
487 Some(String::from("localhost:9999"))
488 );
489 assert_eq!(
490 metric.tag_value("exported_endpoint"),
491 Some(String::from("http://example.com"))
492 );
493 }
494 }
495
496 #[tokio::test]
501 async fn test_prometheus_duplicate_tags() {
502 let in_addr = next_addr();
503
504 let dummy_endpoint = warp::path!("metrics").map(|| {
505 r#"
506 metric_label{code="200",code="success"} 100 1612411516789
507 "#
508 });
509
510 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
511 wait_for_tcp(in_addr).await;
512
513 let config = PrometheusScrapeConfig {
514 endpoints: vec![format!("http://{}/metrics", in_addr)],
515 interval: Duration::from_secs(1),
516 timeout: default_timeout(),
517 instance_tag: Some("instance".to_string()),
518 endpoint_tag: Some("endpoint".to_string()),
519 honor_labels: true,
520 query: HashMap::new(),
521 auth: None,
522 tls: None,
523 };
524
525 let events = run_and_assert_source_compliance(
526 config,
527 Duration::from_secs(3),
528 &HTTP_PULL_SOURCE_TAGS,
529 )
530 .await;
531 assert!(!events.is_empty());
532
533 let metrics: Vec<vector_lib::event::Metric> = events
534 .into_iter()
535 .map(|event| event.into_metric())
536 .collect();
537 let metric = &metrics[0];
538
539 assert_eq!(metric.name(), "metric_label");
540
541 let code_tag = metric
542 .tags()
543 .unwrap()
544 .iter_all()
545 .filter(|(name, _value)| *name == "code")
546 .map(|(_name, value)| value)
547 .collect::<Vec<_>>();
548
549 assert_eq!(1, code_tag.len());
550 assert_eq!("success", code_tag[0].unwrap());
551 }
552
553 #[tokio::test]
554 async fn test_prometheus_request_query() {
555 let in_addr = next_addr();
556
557 let dummy_endpoint = warp::path!("metrics").and(warp::query::raw()).map(|query| {
558 format!(
559 r#"
560 promhttp_metric_handler_requests_total{{query="{query}"}} 100 1612411516789
561 "#
562 )
563 });
564
565 tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
566 wait_for_tcp(in_addr).await;
567
568 let config = PrometheusScrapeConfig {
569 endpoints: vec![format!("http://{}/metrics?key1=val1", in_addr)],
570 interval: Duration::from_secs(1),
571 timeout: default_timeout(),
572 instance_tag: Some("instance".to_string()),
573 endpoint_tag: Some("endpoint".to_string()),
574 honor_labels: false,
575 query: HashMap::from([
576 (
577 "key1".to_string(),
578 QueryParameterValue::MultiParams(vec![ParameterValue::String(
579 "val2".to_string(),
580 )]),
581 ),
582 (
583 "key2".to_string(),
584 QueryParameterValue::MultiParams(vec![
585 ParameterValue::String("val1".to_string()),
586 ParameterValue::String("val2".to_string()),
587 ]),
588 ),
589 ]),
590 auth: None,
591 tls: None,
592 };
593
594 let events = run_and_assert_source_compliance(
595 config,
596 Duration::from_secs(3),
597 &HTTP_PULL_SOURCE_TAGS,
598 )
599 .await;
600 assert!(!events.is_empty());
601
602 let metrics: Vec<_> = events
603 .into_iter()
604 .map(|event| event.into_metric())
605 .collect();
606
607 let expected = HashMap::from([
608 (
609 "key1".to_string(),
610 vec!["val1".to_string(), "val2".to_string()],
611 ),
612 (
613 "key2".to_string(),
614 vec!["val1".to_string(), "val2".to_string()],
615 ),
616 ]);
617
618 for metric in metrics {
619 let query = metric.tag_value("query").expect("query must be tagged");
620 let mut got: HashMap<String, Vec<String>> = HashMap::new();
621 for (k, v) in url::form_urlencoded::parse(query.as_bytes()) {
622 got.entry(k.to_string()).or_default().push(v.to_string());
623 }
624 for v in got.values_mut() {
625 v.sort();
626 }
627 assert_eq!(got, expected);
628 }
629 }
630
631 #[tokio::test]
634 async fn test_prometheus_routing() {
635 trace_init();
636 let in_addr = next_addr();
637 let out_addr = next_addr();
638
639 let make_svc = make_service_fn(|_| async {
640 Ok::<_, Error>(service_fn(|_| async {
641 Ok::<_, Error>(Response::new(Body::from(
642 r#"
643 # HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
644 # TYPE promhttp_metric_handler_requests_total counter
645 promhttp_metric_handler_requests_total{code="200"} 100 1612411516789
646 promhttp_metric_handler_requests_total{code="404"} 7 1612411516789
647 prometheus_remote_storage_samples_in_total 57011636 1612411516789
648 # A histogram, which has a pretty complex representation in the text format:
649 # HELP http_request_duration_seconds A histogram of the request duration.
650 # TYPE http_request_duration_seconds histogram
651 http_request_duration_seconds_bucket{le="0.05"} 24054 1612411516789
652 http_request_duration_seconds_bucket{le="0.1"} 33444 1612411516789
653 http_request_duration_seconds_bucket{le="0.2"} 100392 1612411516789
654 http_request_duration_seconds_bucket{le="0.5"} 129389 1612411516789
655 http_request_duration_seconds_bucket{le="1"} 133988 1612411516789
656 http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411516789
657 http_request_duration_seconds_sum 53423 1612411516789
658 http_request_duration_seconds_count 144320 1612411516789
659 # Finally a summary, which has a complex representation, too:
660 # HELP rpc_duration_seconds A summary of the RPC duration in seconds.
661 # TYPE rpc_duration_seconds summary
662 rpc_duration_seconds{code="200",quantile="0.01"} 3102 1612411516789
663 rpc_duration_seconds{code="200",quantile="0.05"} 3272 1612411516789
664 rpc_duration_seconds{code="200",quantile="0.5"} 4773 1612411516789
665 rpc_duration_seconds{code="200",quantile="0.9"} 9001 1612411516789
666 rpc_duration_seconds{code="200",quantile="0.99"} 76656 1612411516789
667 rpc_duration_seconds_sum{code="200"} 1.7560473e+07 1612411516789
668 rpc_duration_seconds_count{code="200"} 2693 1612411516789
669 "#,
670 )))
671 }))
672 });
673
674 tokio::spawn(async move {
675 if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
676 error!(message = "Server error.", %error);
677 }
678 });
679 wait_for_tcp(in_addr).await;
680
681 let mut config = config::Config::builder();
682 config.add_source(
683 "in",
684 PrometheusScrapeConfig {
685 endpoints: vec![format!("http://{}", in_addr)],
686 instance_tag: None,
687 endpoint_tag: None,
688 honor_labels: false,
689 query: HashMap::new(),
690 interval: Duration::from_secs(1),
691 timeout: default_timeout(),
692 tls: None,
693 auth: None,
694 },
695 );
696 config.add_sink(
697 "out",
698 &["in"],
699 PrometheusExporterConfig {
700 address: out_addr,
701 auth: None,
702 tls: None,
703 default_namespace: Some("vector".into()),
704 buckets: vec![1.0, 2.0, 4.0],
705 quantiles: vec![],
706 distributions_as_summaries: false,
707 flush_period_secs: Duration::from_secs(3),
708 suppress_timestamp: false,
709 acknowledgements: Default::default(),
710 },
711 );
712
713 let (topology, _) = start_topology(config.build().unwrap(), false).await;
714 sleep(Duration::from_secs(1)).await;
715
716 let response = Client::new()
717 .get(format!("http://{out_addr}/metrics").parse().unwrap())
718 .await
719 .unwrap();
720
721 assert!(response.status().is_success());
722 let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
723 let lines = std::str::from_utf8(&body)
724 .unwrap()
725 .lines()
726 .collect::<Vec<_>>();
727
728 assert_eq!(lines, vec![
729 "# HELP vector_http_request_duration_seconds http_request_duration_seconds",
730 "# TYPE vector_http_request_duration_seconds histogram",
731 "vector_http_request_duration_seconds_bucket{le=\"0.05\"} 24054 1612411516789",
732 "vector_http_request_duration_seconds_bucket{le=\"0.1\"} 33444 1612411516789",
733 "vector_http_request_duration_seconds_bucket{le=\"0.2\"} 100392 1612411516789",
734 "vector_http_request_duration_seconds_bucket{le=\"0.5\"} 129389 1612411516789",
735 "vector_http_request_duration_seconds_bucket{le=\"1\"} 133988 1612411516789",
736 "vector_http_request_duration_seconds_bucket{le=\"+Inf\"} 144320 1612411516789",
737 "vector_http_request_duration_seconds_sum 53423 1612411516789",
738 "vector_http_request_duration_seconds_count 144320 1612411516789",
739 "# HELP vector_prometheus_remote_storage_samples_in_total prometheus_remote_storage_samples_in_total",
740 "# TYPE vector_prometheus_remote_storage_samples_in_total gauge",
741 "vector_prometheus_remote_storage_samples_in_total 57011636 1612411516789",
742 "# HELP vector_promhttp_metric_handler_requests_total promhttp_metric_handler_requests_total",
743 "# TYPE vector_promhttp_metric_handler_requests_total counter",
744 "vector_promhttp_metric_handler_requests_total{code=\"200\"} 100 1612411516789",
745 "vector_promhttp_metric_handler_requests_total{code=\"404\"} 7 1612411516789",
746 "# HELP vector_rpc_duration_seconds rpc_duration_seconds",
747 "# TYPE vector_rpc_duration_seconds summary",
748 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.01\"} 3102 1612411516789",
749 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.05\"} 3272 1612411516789",
750 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.5\"} 4773 1612411516789",
751 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.9\"} 9001 1612411516789",
752 "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.99\"} 76656 1612411516789",
753 "vector_rpc_duration_seconds_sum{code=\"200\"} 17560473 1612411516789",
754 "vector_rpc_duration_seconds_count{code=\"200\"} 2693 1612411516789",
755 ],
756 );
757
758 topology.stop().await;
759 }
760}
761
762#[cfg(all(test, feature = "prometheus-integration-tests"))]
763mod integration_tests {
764 use tokio::time::Duration;
765
766 use super::*;
767 use crate::{
768 event::{MetricKind, MetricValue},
769 test_util::components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
770 };
771
772 #[tokio::test]
773 async fn scrapes_metrics() {
774 let config = PrometheusScrapeConfig {
775 endpoints: vec!["http://prometheus:9090/metrics".into()],
776 interval: Duration::from_secs(1),
777 timeout: Duration::from_secs(1),
778 instance_tag: Some("instance".to_string()),
779 endpoint_tag: Some("endpoint".to_string()),
780 honor_labels: false,
781 query: HashMap::new(),
782 auth: None,
783 tls: None,
784 };
785
786 let events = run_and_assert_source_compliance(
787 config,
788 Duration::from_secs(3),
789 &HTTP_PULL_SOURCE_TAGS,
790 )
791 .await;
792 assert!(!events.is_empty());
793
794 let metrics: Vec<_> = events
795 .into_iter()
796 .map(|event| event.into_metric())
797 .collect();
798
799 let find_metric = |name: &str| {
800 metrics
801 .iter()
802 .find(|metric| metric.name() == name)
803 .unwrap_or_else(|| panic!("Missing metric {name:?}"))
804 };
805
806 let build = find_metric("prometheus_build_info");
808 assert!(matches!(build.kind(), MetricKind::Absolute));
809 assert!(matches!(build.value(), &MetricValue::Gauge { .. }));
810 assert!(build.tags().unwrap().contains_key("branch"));
811 assert!(build.tags().unwrap().contains_key("version"));
812 assert_eq!(
813 build.tag_value("instance"),
814 Some("prometheus:9090".to_string())
815 );
816 assert_eq!(
817 build.tag_value("endpoint"),
818 Some("http://prometheus:9090/metrics".to_string())
819 );
820
821 let queries = find_metric("prometheus_engine_queries");
822 assert!(matches!(queries.kind(), MetricKind::Absolute));
823 assert!(matches!(queries.value(), &MetricValue::Gauge { .. }));
824 assert_eq!(
825 queries.tag_value("instance"),
826 Some("prometheus:9090".to_string())
827 );
828 assert_eq!(
829 queries.tag_value("endpoint"),
830 Some("http://prometheus:9090/metrics".to_string())
831 );
832
833 let go_info = find_metric("go_info");
834 assert!(matches!(go_info.kind(), MetricKind::Absolute));
835 assert!(matches!(go_info.value(), &MetricValue::Gauge { .. }));
836 assert!(go_info.tags().unwrap().contains_key("version"));
837 assert_eq!(
838 go_info.tag_value("instance"),
839 Some("prometheus:9090".to_string())
840 );
841 assert_eq!(
842 go_info.tag_value("endpoint"),
843 Some("http://prometheus:9090/metrics".to_string())
844 );
845 }
846}