use std::{
    convert::Infallible,
    hash::Hash,
    mem::{Discriminant, discriminant},
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::{Arc, RwLock},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use base64::prelude::{BASE64_STANDARD, Engine as _};
use futures::{FutureExt, StreamExt, future, stream::BoxStream};
use hyper::{
    Body, Method, Request, Response, Server, StatusCode,
    body::HttpBody,
    header::HeaderValue,
    service::{make_service_fn, service_fn},
};
use indexmap::{IndexMap, map::Entry};
use serde_with::serde_as;
use snafu::Snafu;
use stream_cancel::{Trigger, Tripwire};
use tower::ServiceBuilder;
use tower_http::compression::CompressionLayer;
use tracing::{Instrument, Span};
use vector_lib::{
    ByteSizeOf, EstimatedJsonEncodedSizeOf,
    configurable::configurable_component,
    internal_event::{
        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle as _, Output, Protocol,
        Registered,
    },
};

use super::collector::{MetricCollector, StringCollector};
use crate::{
    config::{AcknowledgementsConfig, GenerateConfig, Input, Resource, SinkConfig, SinkContext},
    event::{
        Event, EventStatus, Finalizable,
        metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue},
    },
    http::{Auth, build_http_trace_layer},
    internal_events::PrometheusNormalizationError,
    sinks::{
        Healthcheck, VectorSink,
        util::{StreamSink, statistic::validate_quantiles},
    },
    tls::{MaybeTlsSettings, TlsEnableableConfig},
};

const MIN_FLUSH_PERIOD_SECS: u64 = 1;

const LOCK_FAILED: &str = "Prometheus exporter data lock is poisoned";

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("Flush period for sets must be greater than or equal to {} secs", min))]
    FlushPeriodTooShort { min: u64 },
}

#[serde_as]
#[configurable_component(sink(
    "prometheus_exporter",
    "Expose metric events on a Prometheus-compatible endpoint."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrometheusExporterConfig {
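    /// The default namespace for any metrics sent.
    ///
    /// This namespace is only used if a metric has no existing namespace. When a namespace is
    /// present, it is used as a prefix to the metric name, separated with an underscore (`_`).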
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::advanced))]
    pub default_namespace: Option<String>,

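    /// The address to expose for scraping.
    ///
    /// The metrics are exposed at the typical Prometheus exporter path, `/metrics`.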
    #[serde(default = "default_address")]
    #[configurable(metadata(docs::examples = "192.160.0.10:9598"))]
    pub address: SocketAddr,

    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    pub tls: Option<TlsEnableableConfig>,

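    /// Default buckets to use for aggregating distribution metrics into histograms.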
    #[serde(default = "super::default_histogram_buckets")]
    #[configurable(metadata(docs::advanced))]
    pub buckets: Vec<f64>,

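    /// Quantiles to use for aggregating distribution metrics into a summary.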
    #[serde(default = "super::default_summary_quantiles")]
    #[configurable(metadata(docs::advanced))]
    pub quantiles: Vec<f64>,

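    /// Whether to render distributions as an aggregated summary instead of an aggregated
    /// histogram.
    ///
    /// Prometheus cannot represent distributions directly, so they must be aggregated locally
    /// into either an aggregated histogram (the default) or an aggregated summary.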
    #[serde(default = "default_distributions_as_summaries")]
    #[configurable(metadata(docs::advanced))]
    pub distributions_as_summaries: bool,

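    /// The interval, in seconds, on which metrics are flushed.
    ///
    /// On each flush interval, if a metric has not been seen since the last flush, it is
    /// considered expired and is removed. Configure this value to be higher than the client's
    /// scrape interval.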
    #[serde(default = "default_flush_period_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub flush_period_secs: Duration,

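    /// Suppresses timestamps on the Prometheus output.
    ///
    /// This can sometimes be useful when the source of metrics leads to their timestamps being
    /// too far in the past for Prometheus to allow them, such as when aggregating metrics over
    /// long time periods, or when replaying old metrics from a disk buffer.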
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub suppress_timestamp: bool,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}

impl Default for PrometheusExporterConfig {
    fn default() -> Self {
        Self {
            default_namespace: None,
            address: default_address(),
            auth: None,
            tls: None,
            buckets: super::default_histogram_buckets(),
            quantiles: super::default_summary_quantiles(),
            distributions_as_summaries: default_distributions_as_summaries(),
            flush_period_secs: default_flush_period_secs(),
            suppress_timestamp: default_suppress_timestamp(),
            acknowledgements: Default::default(),
        }
    }
}

const fn default_address() -> SocketAddr {
    SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9598)
}

const fn default_distributions_as_summaries() -> bool {
    false
}

const fn default_flush_period_secs() -> Duration {
    Duration::from_secs(60)
}

const fn default_suppress_timestamp() -> bool {
    false
}

impl GenerateConfig for PrometheusExporterConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self::default()).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_exporter")]
impl SinkConfig for PrometheusExporterConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        if self.flush_period_secs.as_secs() < MIN_FLUSH_PERIOD_SECS {
            return Err(Box::new(BuildError::FlushPeriodTooShort {
                min: MIN_FLUSH_PERIOD_SECS,
            }));
        }

        validate_quantiles(&self.quantiles)?;

        let sink = PrometheusExporter::new(self.clone());
        let healthcheck = future::ok(()).boxed();

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    fn input(&self) -> Input {
        Input::metric()
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

struct PrometheusExporter {
    server_shutdown_trigger: Option<Trigger>,
    config: PrometheusExporterConfig,
    metrics: Arc<RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>>,
}

#[derive(Clone, Copy, Debug)]
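/// Expiration metadata for a stored metric.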
struct MetricMetadata {
    expiration_window: Duration,
    expires_at: Instant,
}

impl MetricMetadata {
    pub fn new(expiration_window: Duration) -> Self {
        Self {
            expiration_window,
            expires_at: Instant::now() + expiration_window,
        }
    }

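    /// Resets the expiration deadline to a full window from now.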
    pub fn refresh(&mut self) {
        self.expires_at = Instant::now() + self.expiration_window;
    }

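    /// Whether or not the metric has expired, relative to the given instant.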
    pub fn has_expired(&self, now: Instant) -> bool {
        now >= self.expires_at
    }
}

#[derive(Clone, Debug)]
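/// Identifies a stored metric for lookup purposes.
///
/// Beyond the metric series, the identity includes the type of the metric value and, for
/// aggregated histograms and summaries, the bucket or quantile bounds, so that structurally
/// incompatible updates to the same series do not collide.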
struct MetricRef {
    series: MetricSeries,
    value: Discriminant<MetricValue>,
    bounds: Option<Vec<f64>>,
}

impl MetricRef {
    pub fn from_metric(metric: &Metric) -> Self {
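        // The buckets of an aggregated histogram, or the quantiles of an aggregated summary, are
        // part of the metric's identity.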
        let bounds = match metric.value() {
            MetricValue::AggregatedHistogram { buckets, .. } => {
                Some(buckets.iter().map(|b| b.upper_limit).collect())
            }
            MetricValue::AggregatedSummary { quantiles, .. } => {
                Some(quantiles.iter().map(|q| q.quantile).collect())
            }
            _ => None,
        };

        Self {
            series: metric.series().clone(),
            value: discriminant(metric.value()),
            bounds,
        }
    }
}

impl PartialEq for MetricRef {
    fn eq(&self, other: &Self) -> bool {
        self.series == other.series && self.value == other.value && self.bounds == other.bounds
    }
}

impl Eq for MetricRef {}

impl Hash for MetricRef {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.series.hash(state);
        self.value.hash(state);
        if let Some(bounds) = &self.bounds {
            for bound in bounds {
                bound.to_bits().hash(state);
            }
        }
    }
}

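/// Checks whether the request is authorized.
///
/// Requests always pass when no authentication is configured; otherwise, the `Authorization`
/// header must match the configured credentials exactly.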
fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
    if let Some(auth) = auth {
        let headers = req.headers();
        if let Some(auth_header) = headers.get(hyper::header::AUTHORIZATION) {
            let encoded_credentials = match auth {
                Auth::Basic { user, password } => Some(HeaderValue::from_str(
                    format!(
                        "Basic {}",
                        BASE64_STANDARD.encode(format!("{}:{}", user, password.inner()))
                    )
                    .as_str(),
                )),
                Auth::Bearer { token } => Some(HeaderValue::from_str(
                    format!("Bearer {}", token.inner()).as_str(),
                )),
                Auth::Custom { value } => Some(HeaderValue::from_str(value)),
                #[cfg(feature = "aws-core")]
                _ => None,
            };

            if let Some(Ok(encoded_credentials)) = encoded_credentials
                && auth_header == encoded_credentials
            {
                return true;
            }
        }
    } else {
        return true;
    }

    false
}

#[derive(Clone)]
struct Handler {
    auth: Option<Auth>,
    default_namespace: Option<String>,
    buckets: Box<[f64]>,
    quantiles: Box<[f64]>,
    bytes_sent: Registered<BytesSent>,
    events_sent: Registered<EventsSent>,
}

impl Handler {
    fn handle<T: HttpBody>(
        &self,
        req: Request<T>,
        metrics: &RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>,
    ) -> Response<Body> {
        let mut response = Response::new(Body::empty());

        match (authorized(&req, &self.auth), req.method(), req.uri().path()) {
            (false, _, _) => {
                *response.status_mut() = StatusCode::UNAUTHORIZED;
                response.headers_mut().insert(
                    http::header::WWW_AUTHENTICATE,
                    HeaderValue::from_static("Basic, Bearer"),
                );
            }

            (true, &Method::GET, "/metrics") => {
                let metrics = metrics.read().expect(LOCK_FAILED);

                let count = metrics.len();
                let byte_size = metrics
                    .iter()
                    .map(|(_, (metric, _))| metric.estimated_json_encoded_size_of())
                    .sum();

                let mut collector = StringCollector::new();

                for (_, (metric, _)) in metrics.iter() {
                    collector.encode_metric(
                        self.default_namespace.as_deref(),
                        &self.buckets,
                        &self.quantiles,
                        metric,
                    );
                }

                drop(metrics);

                let body = collector.finish();
                let body_size = body.size_of();

                *response.body_mut() = body.into();

                response.headers_mut().insert(
                    "Content-Type",
                    HeaderValue::from_static("text/plain; version=0.0.4"),
                );

                self.events_sent.emit(CountByteSize(count, byte_size));
                self.bytes_sent.emit(ByteSize(body_size));
            }

            (true, _, _) => {
                *response.status_mut() = StatusCode::NOT_FOUND;
            }
        }

        response
    }
}

impl PrometheusExporter {
    fn new(config: PrometheusExporterConfig) -> Self {
        Self {
            server_shutdown_trigger: None,
            config,
            metrics: Arc::new(RwLock::new(IndexMap::new())),
        }
    }

    async fn start_server_if_needed(&mut self) -> crate::Result<()> {
        if self.server_shutdown_trigger.is_some() {
            return Ok(());
        }

        let handler = Handler {
            bytes_sent: register!(BytesSent::from(Protocol::HTTP)),
            events_sent: register!(EventsSent::from(Output(None))),
            default_namespace: self.config.default_namespace.clone(),
            buckets: self.config.buckets.clone().into(),
            quantiles: self.config.quantiles.clone().into(),
            auth: self.config.auth.clone(),
        };

        let span = Span::current();
        let metrics = Arc::clone(&self.metrics);

        let new_service = make_service_fn(move |_| {
            let span = Span::current();
            let metrics = Arc::clone(&metrics);
            let handler = handler.clone();

            let inner = service_fn(move |req| {
                let response = handler.handle(req, &metrics);

                future::ok::<_, Infallible>(response)
            });

            let service = ServiceBuilder::new()
                .layer(build_http_trace_layer(span.clone()))
                .layer(CompressionLayer::new())
                .service(inner);

            async move { Ok::<_, Infallible>(service) }
        });

        let (trigger, tripwire) = Tripwire::new();

        let tls = self.config.tls.clone();
        let address = self.config.address;

        let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?;
        let listener = tls.bind(&address).await?;

        tokio::spawn(async move {
            info!(message = "Building HTTP server.", address = %address);

            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(new_service)
                .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler))
                .instrument(span)
                .await
                .map_err(|error| error!("Server error: {}.", error))?;

            Ok::<(), ()>(())
        });

        self.server_shutdown_trigger = Some(trigger);
        Ok(())
    }

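    /// Normalizes the given metric into an exportable form: distributions are aggregated, and
    /// incremental updates are merged into the stored absolute value for the same series.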
    fn normalize(&mut self, metric: Metric) -> Option<Metric> {
        let new_metric = match metric.value() {
            MetricValue::Distribution { .. } => {
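                // Aggregate the distribution's samples before exporting.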
                let (series, data, metadata) = metric.into_parts();
                let (time, kind, value) = data.into_parts();

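                // Sketches are rendered as aggregated summaries when scraped, while the
                // alternative buckets the samples into an aggregated histogram.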
                let new_value = if self.config.distributions_as_summaries {
                    value
                        .distribution_to_sketch()
                        .expect("value should be distribution already")
                } else {
                    value
                        .distribution_to_agg_histogram(&self.config.buckets)
                        .expect("value should be distribution already")
                };

                let data = MetricData::from_parts(time, kind, new_value);
                Metric::from_parts(series, data, metadata)
            }
            _ => metric,
        };

        match new_metric.kind() {
            MetricKind::Absolute => Some(new_metric),
            MetricKind::Incremental => {
                let metrics = self.metrics.read().expect(LOCK_FAILED);
                let metric_ref = MetricRef::from_metric(&new_metric);

                if let Some(existing) = metrics.get(&metric_ref) {
                    let mut current = existing.0.value().clone();
                    if current.add(new_metric.value()) {
                        return Some(new_metric.with_value(current).into_absolute());
                    }
                }

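                // Either the metric has not been seen before, or it was incompatible with the
                // stored value, so start from the incremental value itself.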
                Some(new_metric.into_absolute())
            }
        }
    }
}

536#[async_trait]
537impl StreamSink<Event> for PrometheusExporter {
538 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
539 self.start_server_if_needed()
540 .await
541 .map_err(|error| error!("Failed to start Prometheus exporter: {}.", error))?;
542
543 let mut last_flush = Instant::now();
544 let flush_period = self.config.flush_period_secs;
545
546 while let Some(event) = input.next().await {
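            // If the flush period has elapsed, drop any metrics that have not been updated
            // within their expiration window.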
            if last_flush.elapsed() > flush_period {
                last_flush = Instant::now();

                let mut metrics = self.metrics.write().expect(LOCK_FAILED);

                metrics.retain(|_metric_ref, (_, metadata)| !metadata.has_expired(last_flush));
            }

            let mut metric = event.into_metric();
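            // Take the finalizers now so the event's delivery status can be updated once
            // normalization succeeds or fails.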
            let finalizers = metric.take_finalizers();

            match self.normalize(metric) {
                Some(normalized) => {
                    let normalized = if self.config.suppress_timestamp {
                        normalized.with_timestamp(None)
                    } else {
                        normalized
                    };

                    let mut metrics = self.metrics.write().expect(LOCK_FAILED);

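                    // Existing entries are replaced wholesale and get a refreshed expiration
                    // deadline; new entries start with a full expiration window.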
                    match metrics.entry(MetricRef::from_metric(&normalized)) {
                        Entry::Occupied(mut entry) => {
                            let (data, metadata) = entry.get_mut();
                            *data = normalized;
                            metadata.refresh();
                        }
                        Entry::Vacant(entry) => {
                            entry.insert((normalized, MetricMetadata::new(flush_period)));
                        }
                    }
                    finalizers.update_status(EventStatus::Delivered);
                }
                None => {
                    emit!(PrometheusNormalizationError {});
                    finalizers.update_status(EventStatus::Errored);
                }
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::io::Read;

    use chrono::{Duration, Utc};
    use flate2::read::GzDecoder;
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use tokio::{sync::oneshot::error::TryRecvError, time};
    use vector_lib::{
        event::{MetricTags, StatisticKind},
        finalization::{BatchNotifier, BatchStatus},
        metric_tags, samples,
        sensitive_string::SensitiveString,
    };

    use super::*;
    use crate::{
        config::ProxyConfig,
        event::metric::{Metric, MetricValue},
        http::HttpClient,
        sinks::prometheus::{distribution_to_agg_histogram, distribution_to_ddsketch},
        test_util::{
            addr::next_addr,
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            random_string, trace_init,
        },
        tls::MaybeTlsSettings,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusExporterConfig>();
    }

    #[tokio::test]
    async fn prometheus_notls() {
        export_and_fetch_simple(None).await;
    }

    #[tokio::test]
    async fn prometheus_tls() {
        let mut tls_config = TlsEnableableConfig::test_config();
        tls_config.options.verify_hostname = Some(false);
        export_and_fetch_simple(Some(tls_config)).await;
    }

    #[tokio::test]
    async fn prometheus_noauth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let response_result = export_and_fetch_with_auth(None, None, events, false).await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_successful_basic_auth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let auth_config = Auth::Basic {
            user: "user".to_string(),
            password: SensitiveString::from("password".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
                .await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_successful_token_auth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
                .await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_missing_auth() {
        let (_, event1) = create_metric_gauge(None, 123.4);
        let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let server_auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(server_auth_config), None, events, false).await;

        assert!(response_result.is_err());
        assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn prometheus_wrong_auth() {
        let (_, event1) = create_metric_gauge(None, 123.4);
        let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let server_auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let client_auth_config = Auth::Basic {
            user: "user".to_string(),
            password: SensitiveString::from("password".to_string()),
        };

        let response_result = export_and_fetch_with_auth(
            Some(server_auth_config),
            Some(client_auth_config),
            events,
            false,
        )
        .await;

        assert!(response_result.is_err());
        assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn encoding_gzip() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let events = vec![event1];

        let body_raw = export_and_fetch_raw(None, events, false, Some(String::from("gzip"))).await;
        let expected = format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1,
        );

        let mut gz = GzDecoder::new(&body_raw[..]);
        let mut body_decoded = String::new();
        let _ = gz.read_to_string(&mut body_decoded);

        assert!(body_raw.len() < expected.len());
        assert_eq!(body_decoded, expected);
    }

    #[tokio::test]
    async fn updates_timestamps() {
        let timestamp1 = Utc::now();
        let (name, event1) = create_metric_gauge(None, 123.4);
        let event1 = Event::from(event1.into_metric().with_timestamp(Some(timestamp1)));
        let (_, event2) = create_metric_gauge(Some(name.clone()), 12.0);
        let timestamp2 = timestamp1 + Duration::seconds(1);
        let event2 = Event::from(event2.into_metric().with_timestamp(Some(timestamp2)));
        let events = vec![event1, event2];

        let body = export_and_fetch(None, events, false).await;
        let timestamp = timestamp2.timestamp_millis();
        assert_eq!(
            body,
            format!(
                indoc! {r#"
                    # HELP {name} {name}
                    # TYPE {name} gauge
                    {name}{{some_tag="some_value"}} 135.4 {timestamp}
                "#},
                name = name,
                timestamp = timestamp
            )
        );
    }

    #[tokio::test]
    async fn suppress_timestamp() {
        let timestamp = Utc::now();
        let (name, event) = create_metric_gauge(None, 123.4);
        let event = Event::from(event.into_metric().with_timestamp(Some(timestamp)));
        let events = vec![event];

        let body = export_and_fetch(None, events, true).await;
        assert_eq!(
            body,
            format!(
                indoc! {r#"
                    # HELP {name} {name}
                    # TYPE {name} gauge
                    {name}{{some_tag="some_value"}} 123.4
                "#},
                name = name,
            )
        );
    }

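    // Prometheus does not allow duplicate labels: when a tag carries multiple values, only the
    // last value is exported.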
    #[tokio::test]
    async fn prometheus_duplicate_labels() {
        let (name, event) = create_metric_with_tags(
            None,
            MetricValue::Gauge { value: 123.4 },
            Some(metric_tags!("code" => "200", "code" => "success")),
        );
        let events = vec![event];

        let response_result = export_and_fetch_with_auth(None, None, events, false).await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{code="success"}} 123.4
            "#},
            name = name
        )));
    }

    async fn export_and_fetch_raw(
        tls_config: Option<TlsEnableableConfig>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
        encoding: Option<String>,
    ) -> hyper::body::Bytes {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(tls_config.as_ref(), false).unwrap();
        let proto = client_settings.http_protocol_name();

        let (_guard, address) = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: tls_config,
            suppress_timestamp,
            ..Default::default()
        };

        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
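        // The sink has not processed anything yet, so no batch status should be available.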
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            stream::iter(events).chain(stream::once(async move {
                time::sleep(time::Duration::from_millis(500)).await;
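                // The delayed event keeps the sink (and its HTTP server) running long enough
                // for the scrape below to complete.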
                delayed_event
            })),
            &SINK_TAGS,
        ));

        time::sleep(time::Duration::from_millis(100)).await;

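        // By now the initial events should have been processed and acknowledged.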
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(ref encoding) = encoding {
            request.headers_mut().insert(
                http::header::ACCEPT_ENCODING,
                HeaderValue::from_str(encoding.as_str()).unwrap(),
            );
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        assert!(result.status().is_success());

        if encoding.is_some() {
            assert!(
                result
                    .headers()
                    .contains_key(http::header::CONTENT_ENCODING)
            );
        }

        let body = result.into_body();
        let bytes = http_body::Body::collect(body)
            .await
            .expect("Reading body failed")
            .to_bytes();

        sink_handle.await.unwrap();

        bytes
    }

    async fn export_and_fetch(
        tls_config: Option<TlsEnableableConfig>,
        events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> String {
        let bytes = export_and_fetch_raw(tls_config, events, suppress_timestamp, None).await;
        String::from_utf8(bytes.to_vec()).unwrap()
    }

    async fn export_and_fetch_with_auth(
        server_auth_config: Option<Auth>,
        client_auth_config: Option<Auth>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> Result<String, http::status::StatusCode> {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(None, false).unwrap();
        let proto = client_settings.http_protocol_name();

        let (_guard, address) = next_addr();
        let config = PrometheusExporterConfig {
            address,
            auth: server_auth_config,
            tls: None,
            suppress_timestamp,
            ..Default::default()
        };

        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
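        // The sink has not processed anything yet, so no batch status should be available.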
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            stream::iter(events).chain(stream::once(async move {
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        time::sleep(time::Duration::from_millis(100)).await;

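        // By now the initial events should have been processed and acknowledged.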
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(client_auth_config) = client_auth_config {
            client_auth_config.apply(&mut request);
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        if !result.status().is_success() {
            return Err(result.status());
        }

        let body = result.into_body();
        let bytes = http_body::Body::collect(body)
            .await
            .expect("Reading body failed")
            .to_bytes();
        let result = String::from_utf8(bytes.to_vec()).unwrap();

        sink_handle.await.unwrap();

        Ok(result)
    }

    async fn export_and_fetch_simple(tls_config: Option<TlsEnableableConfig>) {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let body = export_and_fetch(tls_config, events, false).await;

        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    pub fn create_metric_gauge(name: Option<String>, value: f64) -> (String, Event) {
        create_metric(name, MetricValue::Gauge { value })
    }

    pub fn create_metric_set(name: Option<String>, values: Vec<&'static str>) -> (String, Event) {
        create_metric(
            name,
            MetricValue::Set {
                values: values.into_iter().map(Into::into).collect(),
            },
        )
    }

    fn create_metric(name: Option<String>, value: MetricValue) -> (String, Event) {
        create_metric_with_tags(name, value, Some(metric_tags!("some_tag" => "some_value")))
    }

    fn create_metric_with_tags(
        name: Option<String>,
        value: MetricValue,
        tags: Option<MetricTags>,
    ) -> (String, Event) {
        let name = name.unwrap_or_else(|| format!("vector_set_{}", random_string(16)));
        let event = Metric::new(name.clone(), MetricKind::Incremental, value)
            .with_tags(tags)
            .into();
        (name, event)
    }

    #[tokio::test]
    async fn sink_absolute() {
        let (_guard, address) = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: None,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        let m1 = Metric::new(
            "absolute",
            MetricKind::Absolute,
            MetricValue::Counter { value: 32. },
        )
        .with_tags(Some(metric_tags!("tag1" => "value1")));

        let m2 = m1.clone().with_tags(Some(metric_tags!("tag1" => "value2")));

        let events = vec![
            Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 32. })),
            Event::Metric(m2.clone().with_value(MetricValue::Counter { value: 33. })),
            Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 40. })),
        ];

        let metrics_handle = Arc::clone(&sink.metrics);

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        let expected_m1 = metrics_after
            .get(&MetricRef::from_metric(&m1))
            .expect("m1 should exist");
        let expected_m1_value = MetricValue::Counter { value: 40. };
        assert_eq!(expected_m1.0.value(), &expected_m1_value);

        let expected_m2 = metrics_after
            .get(&MetricRef::from_metric(&m2))
            .expect("m2 should exist");
        let expected_m2_value = MetricValue::Counter { value: 33. };
        assert_eq!(expected_m2.0.value(), &expected_m2_value);
    }

    #[tokio::test]
    async fn sink_distributions_as_histograms() {
        let (_guard, address) = next_addr();
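        // By default, incremental distributions should be merged per series and then aggregated
        // into histograms using the configured buckets.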
        let config = PrometheusExporterConfig {
            address,
            tls: None,
            ..Default::default()
        };
        let buckets = config.buckets.clone();

        let sink = PrometheusExporter::new(config);

        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = [
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

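        // Compute the expected merged-and-aggregated values to compare against.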
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_agg_histogram(merged_summary, &buckets)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_agg_histogram(merged_histogram, &buckets)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

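        // Hold a handle to the internal metric map so the stored state can be inspected after
        // the sink finishes running.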
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        assert_eq!(metrics_after.len(), 2);

        let actual_summary = metrics_after
            .get(&MetricRef::from_metric(&expected_summary))
            .expect("summary metric should exist");
        assert_eq!(actual_summary.0.value(), expected_summary.value());

        let actual_histogram = metrics_after
            .get(&MetricRef::from_metric(&expected_histogram))
            .expect("histogram metric should exist");
        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
    }

    #[tokio::test]
    async fn sink_distributions_as_summaries() {
        let (_guard, address) = next_addr();
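        // With `distributions_as_summaries` enabled, incremental distributions should be merged
        // per series and converted into sketches, which are rendered as aggregated summaries.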
        let config = PrometheusExporterConfig {
            address,
            tls: None,
            distributions_as_summaries: true,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = [
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

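        // Compute the expected merged-and-sketched values to compare against.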
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_ddsketch(merged_summary)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_ddsketch(merged_histogram)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        assert_eq!(metrics_after.len(), 2);

        let actual_summary = metrics_after
            .get(&MetricRef::from_metric(&expected_summary))
            .expect("summary metric should exist");
        assert_eq!(actual_summary.0.value(), expected_summary.value());

        let actual_histogram = metrics_after
            .get(&MetricRef::from_metric(&expected_histogram))
            .expect("histogram metric should exist");
        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
    }

    #[tokio::test]
    async fn sink_gauge_incremental_absolute_mix() {
        let (_guard, address) = next_addr();
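        // Gauges can mix absolute and incremental updates: incremental values are applied on top
        // of the most recent absolute value (here, 333.0 - 10.0 + 4.0 = 327.0).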
        let config = PrometheusExporterConfig {
            address,
            tls: None,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        let base_absolute_gauge_metric = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 100.0 },
        );

        let base_incremental_gauge_metric = Metric::new(
            "gauge",
            MetricKind::Incremental,
            MetricValue::Gauge { value: -10.0 },
        );

        let metrics = [
            base_absolute_gauge_metric.clone(),
            base_absolute_gauge_metric
                .clone()
                .with_value(MetricValue::Gauge { value: 333.0 }),
            base_incremental_gauge_metric.clone(),
            base_incremental_gauge_metric
                .clone()
                .with_value(MetricValue::Gauge { value: 4.0 }),
        ];

        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        assert_eq!(metrics_after.len(), 1);

        let expected_gauge = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 327.0 },
        );

        let actual_gauge = metrics_after
            .get(&MetricRef::from_metric(&expected_gauge))
            .expect("gauge metric should exist");
        assert_eq!(actual_gauge.0.value(), expected_gauge.value());
    }
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    #![allow(clippy::print_stdout)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::dbg_macro)]

    use chrono::Utc;
    use futures::{future::ready, stream};
    use serde_json::Value;
    use tokio::{sync::mpsc, time};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use super::*;
    use crate::{
        config::ProxyConfig,
        http::HttpClient,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            trace_init,
        },
    };

    fn sink_exporter_address() -> String {
        std::env::var("SINK_EXPORTER_ADDRESS").unwrap_or_else(|_| "127.0.0.1:9101".into())
    }

    fn prometheus_address() -> String {
        std::env::var("PROMETHEUS_ADDRESS").unwrap_or_else(|_| "localhost:9090".into())
    }

    async fn fetch_exporter_body() -> String {
        let url = format!("http://{}/metrics", sink_exporter_address());
        let request = Request::get(url)
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let result = HttpClient::new(None, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not send request");
        let result = http_body::Body::collect(result.into_body())
            .await
            .expect("Error fetching body")
            .to_bytes();
        String::from_utf8_lossy(&result).to_string()
    }

    async fn prometheus_query(query: &str) -> Value {
        let url = format!(
            "http://{}/api/v1/query?query={}",
            prometheus_address(),
            query
        );
        let request = Request::post(url)
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let result = HttpClient::new(None, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");
        let result = http_body::Body::collect(result.into_body())
            .await
            .expect("Error fetching body")
            .to_bytes();
        let result = String::from_utf8_lossy(&result);
        serde_json::from_str(result.as_ref()).expect("Invalid JSON from prometheus")
    }

    #[tokio::test]
    async fn prometheus_metrics() {
        trace_init();

        prometheus_scrapes_metrics().await;
        time::sleep(time::Duration::from_millis(500)).await;
        reset_on_flush_period().await;
        expire_on_flush_period().await;
    }

    async fn prometheus_scrapes_metrics() {
        let start = Utc::now().timestamp();

        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(2),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (name, event) = tests::create_metric_gauge(None, 123.4);
        let (_, delayed_event) = tests::create_metric_gauge(Some("delayed".to_string()), 123.4);

        run_and_assert_sink_compliance(
            sink,
            stream::once(ready(event)).chain(stream::once(async move {
                time::sleep(time::Duration::from_secs(2)).await;
                delayed_event
            })),
            &SINK_TAGS,
        )
        .await;

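        // The metric should now be scrapeable from Prometheus with the expected tags and value.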
        let result = prometheus_query(&name).await;

        let data = &result["data"]["result"][0];
        assert_eq!(data["metric"]["__name__"], Value::String(name));
        assert_eq!(
            data["metric"]["instance"],
            Value::String(sink_exporter_address())
        );
        assert_eq!(
            data["metric"]["some_tag"],
            Value::String("some_value".into())
        );
        assert!(data["value"][0].as_f64().unwrap() >= start as f64);
        assert_eq!(data["value"][1], Value::String("123.4".into()));
    }

    async fn reset_on_flush_period() {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(3),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx);

        let input_events = input_events.map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        let (name1, event) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        tx.send(event).expect("Failed to send.");
        let (name2, event) = tests::create_metric_set(None, vec!["3", "4", "5"]);
        tx.send(event).expect("Failed to send.");

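        // Give Prometheus a chance to scrape the initial values.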
        time::sleep(time::Duration::from_secs(2)).await;

        let result = prometheus_query(&name1).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("3".into())
        );
        let result = prometheus_query(&name2).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("3".into())
        );

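        // Wait out the flush period so the sets are reset, then send a fresh value for only one
        // of them.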
        time::sleep(time::Duration::from_secs(3)).await;

        let (name2, event) = tests::create_metric_set(Some(name2), vec!["8", "9"]);
        tx.send(event).expect("Failed to send.");

        time::sleep(time::Duration::from_secs(2)).await;
        let result = prometheus_query(&name1).await;
        assert_eq!(result["data"]["result"][0]["value"][1], Value::Null);
        let result = prometheus_query(&name2).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("2".into())
        );

        drop(tx);
        sink_handle.await.unwrap();
    }

    async fn expire_on_flush_period() {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(3),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx);

        let input_events = input_events.map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        let (name1, event) = tests::create_metric_set(None, vec!["42"]);
        tx.send(event).expect("Failed to send.");
        let (name2, event) = tests::create_metric_gauge(None, 100.0);
        tx.send(event).expect("Failed to send.");

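        // Give the sink a moment to process the events.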
        time::sleep(time::Duration::from_secs(1)).await;

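        // Both metrics should initially be present in the exporter output.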
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(body.contains(&name2));

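        // Keep refreshing the first metric while the flush period passes, so the second metric
        // expires but the first does not.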
        for _ in 0..7 {
            let (_, event) = tests::create_metric_set(Some(name1.clone()), vec!["43"]);
            tx.send(event).expect("Failed to send.");

            time::sleep(time::Duration::from_secs(1)).await;
        }

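        // Only the metric that kept receiving updates should remain.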
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(!body.contains(&name2));

        drop(tx);
        sink_handle.await.unwrap();
    }
}