use std::collections::HashMap;
use std::time::Duration;
use bytes::Bytes;
use futures_util::FutureExt;
use http::{response::Parts, Uri};
use serde_with::serde_as;
use snafu::ResultExt;
use vector_lib::configurable::configurable_component;
use vector_lib::{config::LogNamespace, event::Event};
use super::parser;
use crate::sources::util::http::HttpMethod;
use crate::sources::util::http_client::{default_timeout, warn_if_interval_too_low};
use crate::{
config::{GenerateConfig, SourceConfig, SourceContext, SourceOutput},
http::Auth,
internal_events::PrometheusParseError,
sources::{
self,
util::http_client::{
build_url, call, default_interval, GenericHttpClientInputs, HttpClientBuilder,
HttpClientContext,
},
},
tls::{TlsConfig, TlsSettings},
Result,
};
// Warning texts are hoisted to statics and split over several source lines.
//
// A trailing `\` inside a string literal removes the newline AND all leading
// whitespace of the following line, so the separating space has to be written
// before the backslash; otherwise the sentences are glued together
// ("parse error,did you mean ...").

/// Logged when a scrape body fails to parse and the endpoint URL has no path.
static PARSE_ERROR_NO_PATH: &str = "No path is set on the endpoint and we got a parse error, \
    did you mean to use /metrics? This behavior changed in version 0.11.";

/// Logged when a scrape returns HTTP 404 and the endpoint URL has no path.
static NOT_FOUND_NO_PATH: &str = "No path is set on the endpoint and we got a 404, \
    did you mean to use /metrics? \
    This behavior changed in version 0.11.";
/// Configuration for the `prometheus_scrape` source.
#[serde_as]
#[configurable_component(source(
"prometheus_scrape",
"Collect metrics from Prometheus exporters."
))]
#[derive(Clone, Debug)]
pub struct PrometheusScrapeConfig {
/// Endpoints to scrape metrics from.
///
/// Each entry is parsed as a URI when the source is built. The option is also accepted
/// under the `hosts` alias.
#[configurable(metadata(docs::examples = "http://localhost:9090/metrics"))]
#[serde(alias = "hosts")]
endpoints: Vec<String>,
/// The interval between scrapes, in whole seconds.
#[serde(default = "default_interval")]
#[serde_as(as = "serde_with::DurationSeconds<u64>")]
#[serde(rename = "scrape_interval_secs")]
#[configurable(metadata(docs::human_name = "Scrape Interval"))]
interval: Duration,
/// The timeout for each scrape request, in (possibly fractional) seconds.
#[serde(default = "default_timeout")]
#[serde_as(as = "serde_with:: DurationSecondsWithFrac<f64>")]
#[serde(rename = "scrape_timeout_secs")]
#[configurable(metadata(docs::human_name = "Scrape Timeout"))]
timeout: Duration,
/// The tag name added to each event representing the scraped instance.
///
/// The tag value is the `host:port` of the scraped endpoint. When the URL has no explicit
/// port, `80` is used for `http`, `443` for `https`, and `0` otherwise.
#[configurable(metadata(docs::advanced))]
instance_tag: Option<String>,
/// The tag name added to each event representing the scraped endpoint.
///
/// The tag value is the full URL that was scraped.
#[configurable(metadata(docs::advanced))]
endpoint_tag: Option<String>,
/// Controls how tag conflicts are handled when a scraped metric already carries the
/// instance or endpoint tag.
///
/// If `true`, the tag already present on the scraped metric is kept. If `false`, the
/// existing value is moved to a tag named `exported_<tag>` and the tag is overwritten.
#[serde(default = "crate::serde::default_false")]
#[configurable(metadata(docs::advanced))]
honor_labels: bool,
/// Custom parameters for the scrape request query string.
///
/// One or more values can be given per key. They are combined with any query string
/// already present in an `endpoints` entry.
#[serde(default)]
#[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
#[configurable(metadata(docs::examples = "query_example()"))]
query: HashMap<String, Vec<String>>,
// TLS options converted into `TlsSettings` when the source is built.
#[configurable(derived)]
tls: Option<TlsConfig>,
// Authentication settings handed to the shared HTTP client for every scrape request.
#[configurable(derived)]
#[configurable(metadata(docs::advanced))]
auth: Option<Auth>,
}
/// Example value for the `query` option: a `match[]` key holding two series selectors.
///
/// Referenced by name from the `docs::examples` metadata on the `query` field.
fn query_example() -> serde_json::Value {
    let selectors = [r#"{job="somejob"}"#, r#"{__name__=~"job:.*"}"#];
    serde_json::json!({ "match[]": selectors })
}
impl GenerateConfig for PrometheusScrapeConfig {
    /// Produces a sample configuration as TOML: a single local endpoint, default interval
    /// and timeout, `instance`/`endpoint` tags enabled, and no query, TLS or auth settings.
    fn generate_config() -> toml::Value {
        let example = Self {
            endpoints: vec![String::from("http://localhost:9090/metrics")],
            interval: default_interval(),
            timeout: default_timeout(),
            instance_tag: Some(String::from("instance")),
            endpoint_tag: Some(String::from("endpoint")),
            honor_labels: false,
            query: HashMap::new(),
            tls: None,
            auth: None,
        };

        toml::Value::try_from(example).unwrap()
    }
}
#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_scrape")]
impl SourceConfig for PrometheusScrapeConfig {
    /// Builds the scrape source.
    ///
    /// Every configured endpoint is parsed as a URI and combined with the configured query
    /// parameters; the first endpoint that fails to parse aborts the build. TLS settings are
    /// resolved next, and the result is handed to the shared HTTP client runner using `GET`.
    async fn build(&self, cx: SourceContext) -> Result<sources::Source> {
        let urls = self
            .endpoints
            .iter()
            .map(|endpoint| {
                endpoint
                    .parse::<Uri>()
                    .context(sources::UriParseSnafu)
                    .map(|uri| build_url(&uri, &self.query))
            })
            .collect::<std::result::Result<Vec<Uri>, sources::BuildError>>()?;
        let tls = TlsSettings::from_options(&self.tls)?;

        let context_builder = PrometheusScrapeBuilder {
            honor_labels: self.honor_labels,
            instance_tag: self.instance_tag.clone(),
            endpoint_tag: self.endpoint_tag.clone(),
        };

        warn_if_interval_too_low(self.timeout, self.interval);

        let client_inputs = GenericHttpClientInputs {
            urls,
            interval: self.interval,
            timeout: self.timeout,
            headers: HashMap::new(),
            content_type: String::from("text/plain"),
            auth: self.auth.clone(),
            tls,
            proxy: cx.proxy.clone(),
            shutdown: cx.shutdown,
        };

        let source = call(client_inputs, context_builder, cx.out, HttpMethod::Get);
        Ok(source.boxed())
    }

    /// This source has a single output that carries metrics.
    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }

    /// Reports that this source does not support acknowledgements.
    fn can_acknowledge(&self) -> bool {
        false
    }
}
/// Instance tagging data for one scraped URL, applied to every metric from that URL.
#[derive(Clone)]
struct InstanceInfo {
/// Name of the tag to set.
tag: String,
/// Tag value: the `host:port` string derived from the scraped URL.
instance: String,
/// When `true`, a metric that already has `tag` keeps its own value.
honor_label: bool,
}
/// Endpoint tagging data for one scraped URL, applied to every metric from that URL.
#[derive(Clone)]
struct EndpointInfo {
/// Name of the tag to set.
tag: String,
/// Tag value: the scraped URL rendered as a string.
endpoint: String,
/// When `true`, a metric that already has `tag` keeps its own value.
honor_label: bool,
}
/// Tagging options copied out of `PrometheusScrapeConfig`, used to create one
/// `PrometheusScrapeContext` per scraped URL.
#[derive(Clone)]
struct PrometheusScrapeBuilder {
/// Whether tags already present on scraped metrics win over the ones this source adds.
honor_labels: bool,
/// Name of the instance tag, or `None` to skip instance tagging.
instance_tag: Option<String>,
/// Name of the endpoint tag, or `None` to skip endpoint tagging.
endpoint_tag: Option<String>,
}
impl HttpClientBuilder for PrometheusScrapeBuilder {
    type Context = PrometheusScrapeContext;

    /// Creates the per-URL context holding the instance/endpoint tag values for `url`.
    ///
    /// The instance value is `host:port`. A missing host renders as an empty string, and a
    /// missing port falls back to the scheme default (`80` for HTTP, `443` for HTTPS) or `0`
    /// for any other or absent scheme. The endpoint value is the URL itself.
    fn build(&self, url: &Uri) -> Self::Context {
        let honor_label = self.honor_labels;

        let instance_info = self.instance_tag.as_ref().map(|tag| {
            let host = url.host().unwrap_or_default();
            let port = match url.port_u16() {
                Some(explicit) => explicit,
                None => {
                    let scheme = url.scheme();
                    if scheme == Some(&http::uri::Scheme::HTTP) {
                        80
                    } else if scheme == Some(&http::uri::Scheme::HTTPS) {
                        443
                    } else {
                        0
                    }
                }
            };

            InstanceInfo {
                tag: tag.clone(),
                instance: format!("{}:{}", host, port),
                honor_label,
            }
        });

        let endpoint_info = self.endpoint_tag.as_ref().map(|tag| EndpointInfo {
            tag: tag.clone(),
            endpoint: url.to_string(),
            honor_label,
        });

        PrometheusScrapeContext {
            instance_info,
            endpoint_info,
        }
    }
}
/// Per-URL scrape state: the tags to stamp onto every metric parsed from that URL.
struct PrometheusScrapeContext {
/// Instance tag to apply, if instance tagging is configured.
instance_info: Option<InstanceInfo>,
/// Endpoint tag to apply, if endpoint tagging is configured.
endpoint_info: Option<EndpointInfo>,
}
impl HttpClientContext for PrometheusScrapeContext {
fn enrich_events(&mut self, events: &mut Vec<Event>) {
for event in events.iter_mut() {
let metric = event.as_mut_metric();
if let Some(InstanceInfo {
tag,
instance,
honor_label,
}) = &self.instance_info
{
match (honor_label, metric.tag_value(tag)) {
(false, Some(old_instance)) => {
metric.replace_tag(format!("exported_{}", tag), old_instance);
metric.replace_tag(tag.clone(), instance.clone());
}
(true, Some(_)) => {}
(_, None) => {
metric.replace_tag(tag.clone(), instance.clone());
}
}
}
if let Some(EndpointInfo {
tag,
endpoint,
honor_label,
}) = &self.endpoint_info
{
match (honor_label, metric.tag_value(tag)) {
(false, Some(old_endpoint)) => {
metric.replace_tag(format!("exported_{}", tag), old_endpoint);
metric.replace_tag(tag.clone(), endpoint.clone());
}
(true, Some(_)) => {}
(_, None) => {
metric.replace_tag(tag.clone(), endpoint.clone());
}
}
}
}
}
fn on_response(&mut self, url: &Uri, _header: &Parts, body: &Bytes) -> Option<Vec<Event>> {
let body = String::from_utf8_lossy(body);
match parser::parse_text(&body) {
Ok(events) => Some(events),
Err(error) => {
if url.path() == "/" {
warn!(
message = PARSE_ERROR_NO_PATH,
endpoint = %url,
);
}
emit!(PrometheusParseError {
error,
url: url.clone(),
body,
});
None
}
}
}
fn on_http_response_error(&self, url: &Uri, header: &Parts) {
if header.status == hyper::StatusCode::NOT_FOUND && url.path() == "/" {
warn!(
message = NOT_FOUND_NO_PATH,
endpoint = %url,
);
}
}
}
#[cfg(all(test, feature = "sinks-prometheus"))]
mod test {
    use hyper::{
        service::{make_service_fn, service_fn},
        Body, Client, Response, Server,
    };
    use similar_asserts::assert_eq;
    use tokio::time::{sleep, Duration};
    use warp::Filter;

    use super::*;
    use crate::{
        config,
        sinks::prometheus::exporter::PrometheusExporterConfig,
        test_util::{
            components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
            next_addr, start_topology, trace_init, wait_for_tcp,
        },
        Error,
    };

    /// Exposition whose `endpoint` and `instance` labels collide with the tags the source adds.
    const CONFLICTING_LABELS_BODY: &str = r#"
promhttp_metric_handler_requests_total{endpoint="http://example.com", instance="localhost:9999", code="200"} 100 1612411516789
"#;

    /// Exposition served by the fake exporter in the routing test.
    const ROUTING_BODY: &str = r#"
# HELP promhttp_metric_handler_requests_total Total number of scrapes by HTTP status code.
# TYPE promhttp_metric_handler_requests_total counter
promhttp_metric_handler_requests_total{code="200"} 100 1612411516789
promhttp_metric_handler_requests_total{code="404"} 7 1612411516789
prometheus_remote_storage_samples_in_total 57011636 1612411516789
# A histogram, which has a pretty complex representation in the text format:
# HELP http_request_duration_seconds A histogram of the request duration.
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{le="0.05"} 24054 1612411516789
http_request_duration_seconds_bucket{le="0.1"} 33444 1612411516789
http_request_duration_seconds_bucket{le="0.2"} 100392 1612411516789
http_request_duration_seconds_bucket{le="0.5"} 129389 1612411516789
http_request_duration_seconds_bucket{le="1"} 133988 1612411516789
http_request_duration_seconds_bucket{le="+Inf"} 144320 1612411516789
http_request_duration_seconds_sum 53423 1612411516789
http_request_duration_seconds_count 144320 1612411516789
# Finally a summary, which has a complex representation, too:
# HELP rpc_duration_seconds A summary of the RPC duration in seconds.
# TYPE rpc_duration_seconds summary
rpc_duration_seconds{code="200",quantile="0.01"} 3102 1612411516789
rpc_duration_seconds{code="200",quantile="0.05"} 3272 1612411516789
rpc_duration_seconds{code="200",quantile="0.5"} 4773 1612411516789
rpc_duration_seconds{code="200",quantile="0.9"} 9001 1612411516789
rpc_duration_seconds{code="200",quantile="0.99"} 76656 1612411516789
rpc_duration_seconds_sum{code="200"} 1.7560473e+07 1612411516789
rpc_duration_seconds_count{code="200"} 2693 1612411516789
"#;

    /// Scrape config shared by the single-endpoint tests: one-second interval, default
    /// timeout, `instance`/`endpoint` tagging enabled, no auth or TLS.
    fn scrape_config(
        endpoint: String,
        honor_labels: bool,
        query: HashMap<String, Vec<String>>,
    ) -> PrometheusScrapeConfig {
        PrometheusScrapeConfig {
            endpoints: vec![endpoint],
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels,
            query,
            auth: None,
            tls: None,
        }
    }

    /// Runs the source for three seconds under the compliance harness and returns the
    /// events, asserting that at least one was produced.
    async fn scrape_events(config: PrometheusScrapeConfig) -> Vec<Event> {
        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());
        events
    }

    /// Same as `scrape_events`, with every event converted into a metric.
    async fn scrape_metrics(config: PrometheusScrapeConfig) -> Vec<vector_lib::event::Metric> {
        scrape_events(config)
            .await
            .into_iter()
            .map(|event| event.into_metric())
            .collect()
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusScrapeConfig>();
    }

    #[tokio::test]
    async fn test_prometheus_sets_headers() {
        let in_addr = next_addr();

        // The route only matches when the scraper sends `Accept: text/plain`.
        let dummy_endpoint = warp::path!("metrics")
            .and(warp::header::exact("Accept", "text/plain"))
            .map(|| CONFLICTING_LABELS_BODY);
        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let endpoint = format!("http://{}/metrics", in_addr);
        scrape_events(scrape_config(endpoint, true, HashMap::new())).await;
    }

    #[tokio::test]
    async fn test_prometheus_honor_labels() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| CONFLICTING_LABELS_BODY);
        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let endpoint = format!("http://{}/metrics", in_addr);
        let metrics = scrape_metrics(scrape_config(endpoint, true, HashMap::new())).await;

        // The scraped labels win and nothing is moved to `exported_*`.
        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(String::from("http://example.com"))
            );
            assert_eq!(metric.tag_value("exported_instance"), None);
            assert_eq!(metric.tag_value("exported_endpoint"), None);
        }
    }

    #[tokio::test]
    async fn test_prometheus_do_not_honor_labels() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| CONFLICTING_LABELS_BODY);
        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let endpoint = format!("http://{}/metrics", in_addr);
        let metrics = scrape_metrics(scrape_config(endpoint, false, HashMap::new())).await;

        // The source's own values win; the scraped ones are kept under `exported_*`.
        let expected_instance = format!("{}:{}", in_addr.ip(), in_addr.port());
        let expected_endpoint = format!(
            "http://{}:{}/metrics",
            in_addr.ip(),
            in_addr.port()
        );
        for metric in metrics {
            assert_eq!(
                metric.tag_value("instance"),
                Some(expected_instance.clone())
            );
            assert_eq!(
                metric.tag_value("endpoint"),
                Some(expected_endpoint.clone())
            );
            assert_eq!(
                metric.tag_value("exported_instance"),
                Some(String::from("localhost:9999"))
            );
            assert_eq!(
                metric.tag_value("exported_endpoint"),
                Some(String::from("http://example.com"))
            );
        }
    }

    #[tokio::test]
    async fn test_prometheus_duplicate_tags() {
        let in_addr = next_addr();

        let dummy_endpoint = warp::path!("metrics").map(|| {
            r#"
metric_label{code="200",code="success"} 100 1612411516789
"#
        });
        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let endpoint = format!("http://{}/metrics", in_addr);
        let metrics = scrape_metrics(scrape_config(endpoint, true, HashMap::new())).await;

        let metric = &metrics[0];
        assert_eq!(metric.name(), "metric_label");

        // Only the last value of the repeated `code` label survives.
        let code_tag: Vec<_> = metric
            .tags()
            .unwrap()
            .iter_all()
            .filter_map(|(name, value)| if name == "code" { Some(value) } else { None })
            .collect();
        assert_eq!(1, code_tag.len());
        assert_eq!("success", code_tag[0].unwrap());
    }

    #[tokio::test]
    async fn test_prometheus_request_query() {
        let in_addr = next_addr();

        // Echo the raw query string back as the value of a `query` label.
        let dummy_endpoint = warp::path!("metrics").and(warp::query::raw()).map(|query| {
            format!(
                r#"
promhttp_metric_handler_requests_total{{query="{}"}} 100 1612411516789
"#,
                query
            )
        });
        tokio::spawn(warp::serve(dummy_endpoint).run(in_addr));
        wait_for_tcp(in_addr).await;

        let both_values = || vec!["val1".to_string(), "val2".to_string()];

        let endpoint = format!("http://{}/metrics?key1=val1", in_addr);
        let configured_query = HashMap::from([
            ("key1".to_string(), vec!["val2".to_string()]),
            ("key2".to_string(), both_values()),
        ]);
        let metrics = scrape_metrics(scrape_config(endpoint, false, configured_query)).await;

        let expected = HashMap::from([
            ("key1".to_string(), both_values()),
            ("key2".to_string(), both_values()),
        ]);

        for metric in metrics {
            let query = metric.tag_value("query").expect("query must be tagged");

            let mut got: HashMap<String, Vec<String>> = HashMap::new();
            for (key, value) in url::form_urlencoded::parse(query.as_bytes()) {
                got.entry(key.to_string())
                    .or_default()
                    .push(value.to_string());
            }
            // Value order within a key is not significant.
            got.values_mut().for_each(|values| values.sort());

            assert_eq!(got, expected);
        }
    }

    #[tokio::test]
    async fn test_prometheus_routing() {
        trace_init();
        let in_addr = next_addr();
        let out_addr = next_addr();

        // Fake exporter that answers every request with the same payload.
        let make_svc = make_service_fn(|_| async {
            Ok::<_, Error>(service_fn(|_| async {
                Ok::<_, Error>(Response::new(Body::from(ROUTING_BODY)))
            }))
        });
        tokio::spawn(async move {
            if let Err(error) = Server::bind(&in_addr).serve(make_svc).await {
                error!(message = "Server error.", %error);
            }
        });
        wait_for_tcp(in_addr).await;

        let source = PrometheusScrapeConfig {
            endpoints: vec![format!("http://{}", in_addr)],
            instance_tag: None,
            endpoint_tag: None,
            honor_labels: false,
            query: HashMap::new(),
            interval: Duration::from_secs(1),
            timeout: default_timeout(),
            tls: None,
            auth: None,
        };
        let sink = PrometheusExporterConfig {
            address: out_addr,
            auth: None,
            tls: None,
            default_namespace: Some("vector".into()),
            buckets: vec![1.0, 2.0, 4.0],
            quantiles: vec![],
            distributions_as_summaries: false,
            flush_period_secs: Duration::from_secs(3),
            suppress_timestamp: false,
            acknowledgements: Default::default(),
        };

        let mut config = config::Config::builder();
        config.add_source("in", source);
        config.add_sink("out", &["in"], sink);

        let (topology, _) = start_topology(config.build().unwrap(), false).await;
        sleep(Duration::from_secs(1)).await;

        let exporter_uri = format!("http://{}/metrics", out_addr).parse().unwrap();
        let response = Client::new().get(exporter_uri).await.unwrap();
        assert!(response.status().is_success());

        let body = hyper::body::to_bytes(response.into_body()).await.unwrap();
        let lines: Vec<_> = std::str::from_utf8(&body).unwrap().lines().collect();

        let expected_lines = vec![
            "# HELP vector_http_request_duration_seconds http_request_duration_seconds",
            "# TYPE vector_http_request_duration_seconds histogram",
            "vector_http_request_duration_seconds_bucket{le=\"0.05\"} 24054 1612411516789",
            "vector_http_request_duration_seconds_bucket{le=\"0.1\"} 33444 1612411516789",
            "vector_http_request_duration_seconds_bucket{le=\"0.2\"} 100392 1612411516789",
            "vector_http_request_duration_seconds_bucket{le=\"0.5\"} 129389 1612411516789",
            "vector_http_request_duration_seconds_bucket{le=\"1\"} 133988 1612411516789",
            "vector_http_request_duration_seconds_bucket{le=\"+Inf\"} 144320 1612411516789",
            "vector_http_request_duration_seconds_sum 53423 1612411516789",
            "vector_http_request_duration_seconds_count 144320 1612411516789",
            "# HELP vector_prometheus_remote_storage_samples_in_total prometheus_remote_storage_samples_in_total",
            "# TYPE vector_prometheus_remote_storage_samples_in_total gauge",
            "vector_prometheus_remote_storage_samples_in_total 57011636 1612411516789",
            "# HELP vector_promhttp_metric_handler_requests_total promhttp_metric_handler_requests_total",
            "# TYPE vector_promhttp_metric_handler_requests_total counter",
            "vector_promhttp_metric_handler_requests_total{code=\"200\"} 100 1612411516789",
            "vector_promhttp_metric_handler_requests_total{code=\"404\"} 7 1612411516789",
            "# HELP vector_rpc_duration_seconds rpc_duration_seconds",
            "# TYPE vector_rpc_duration_seconds summary",
            "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.01\"} 3102 1612411516789",
            "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.05\"} 3272 1612411516789",
            "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.5\"} 4773 1612411516789",
            "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.9\"} 9001 1612411516789",
            "vector_rpc_duration_seconds{code=\"200\",quantile=\"0.99\"} 76656 1612411516789",
            "vector_rpc_duration_seconds_sum{code=\"200\"} 17560473 1612411516789",
            "vector_rpc_duration_seconds_count{code=\"200\"} 2693 1612411516789",
        ];
        assert_eq!(lines, expected_lines);

        topology.stop().await;
    }
}
#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    use tokio::time::Duration;

    use super::*;
    use crate::{
        event::{MetricKind, MetricValue},
        test_util::components::{run_and_assert_source_compliance, HTTP_PULL_SOURCE_TAGS},
    };

    /// Scrapes the `prometheus:9090` endpoint and checks a few well-known metrics.
    #[tokio::test]
    async fn scrapes_metrics() {
        const INSTANCE: &str = "prometheus:9090";
        const ENDPOINT: &str = "http://prometheus:9090/metrics";

        let config = PrometheusScrapeConfig {
            endpoints: vec![ENDPOINT.into()],
            interval: Duration::from_secs(1),
            timeout: Duration::from_secs(1),
            instance_tag: Some("instance".to_string()),
            endpoint_tag: Some("endpoint".to_string()),
            honor_labels: false,
            query: HashMap::new(),
            auth: None,
            tls: None,
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(3),
            &HTTP_PULL_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());

        let metrics: Vec<_> = events.into_iter().map(Event::into_metric).collect();

        // Looks a metric up by name and checks it is an absolute gauge that carries the
        // listed tags plus the instance/endpoint tags added by the source.
        let assert_absolute_gauge = |name: &str, required_tags: &[&str]| {
            let metric = metrics
                .iter()
                .find(|candidate| candidate.name() == name)
                .unwrap_or_else(|| panic!("Missing metric {:?}", name));

            assert!(matches!(metric.kind(), MetricKind::Absolute));
            assert!(matches!(metric.value(), &MetricValue::Gauge { .. }));
            for key in required_tags {
                assert!(metric.tags().unwrap().contains_key(*key));
            }
            assert_eq!(metric.tag_value("instance"), Some(INSTANCE.to_string()));
            assert_eq!(metric.tag_value("endpoint"), Some(ENDPOINT.to_string()));
        };

        assert_absolute_gauge("prometheus_build_info", &["branch", "version"]);
        assert_absolute_gauge("prometheus_engine_queries", &[]);
        assert_absolute_gauge("go_info", &["version"]);
    }
}