1use std::{
2 convert::Infallible,
3 hash::Hash,
4 mem::{discriminant, Discriminant},
5 net::{IpAddr, Ipv4Addr, SocketAddr},
6 sync::{Arc, RwLock},
7 time::{Duration, Instant},
8};
9
10use async_trait::async_trait;
11use base64::prelude::{Engine as _, BASE64_STANDARD};
12use futures::{future, stream::BoxStream, FutureExt, StreamExt};
13use hyper::{
14 body::HttpBody,
15 header::HeaderValue,
16 service::{make_service_fn, service_fn},
17 Body, Method, Request, Response, Server, StatusCode,
18};
19use indexmap::{map::Entry, IndexMap};
20use serde_with::serde_as;
21use snafu::Snafu;
22use stream_cancel::{Trigger, Tripwire};
23use tower::ServiceBuilder;
24use tower_http::compression::CompressionLayer;
25use tracing::{Instrument, Span};
26use vector_lib::configurable::configurable_component;
27use vector_lib::{
28 internal_event::{
29 ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle as _, Output, Protocol,
30 Registered,
31 },
32 ByteSizeOf, EstimatedJsonEncodedSizeOf,
33};
34
35use super::collector::{MetricCollector, StringCollector};
36use crate::{
37 config::{AcknowledgementsConfig, GenerateConfig, Input, Resource, SinkConfig, SinkContext},
38 event::{
39 metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue},
40 Event, EventStatus, Finalizable,
41 },
42 http::{build_http_trace_layer, Auth},
43 internal_events::PrometheusNormalizationError,
44 sinks::{
45 util::{statistic::validate_quantiles, StreamSink},
46 Healthcheck, VectorSink,
47 },
48 tls::{MaybeTlsSettings, TlsEnableableConfig},
49};
50
/// Smallest flush period, in whole seconds, that `build` accepts for this sink.
const MIN_FLUSH_PERIOD_SECS: u64 = 1;

/// Panic message used whenever the shared metrics lock cannot be acquired because it is poisoned.
const LOCK_FAILED: &str = "Prometheus exporter data lock is poisoned";
54
/// Errors raised while building the sink from its configuration.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The configured flush period is shorter than the minimum carried in `min` (seconds).
    #[snafu(display("Flush period for sets must be greater or equal to {} secs", min))]
    FlushPeriodTooShort { min: u64 },
}

/// Configuration for the `prometheus_exporter` sink.
#[serde_as]
#[configurable_component(sink(
    "prometheus_exporter",
    "Expose metric events on a Prometheus compatible endpoint."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrometheusExporterConfig {
    /// Namespace handed to the metric encoder each time the scrape response is rendered.
    ///
    /// NOTE(review): presumably only applied to metrics that carry no namespace of their own —
    /// confirm against the collector implementation.
    ///
    /// The key `namespace` is accepted as an alias.
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::advanced))]
    pub default_namespace: Option<String>,

    /// The address the HTTP server binds to; metrics are served under `/metrics`.
    ///
    /// Defaults to `0.0.0.0:9598`.
    #[serde(default = "default_address")]
    #[configurable(metadata(docs::examples = "192.160.0.10:9598"))]
    pub address: SocketAddr,

    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    pub tls: Option<TlsEnableableConfig>,

    /// Bucket upper limits used when a distribution is converted into an aggregated histogram.
    ///
    /// The same list is also handed to the metric encoder on every scrape.
    #[serde(default = "super::default_histogram_buckets")]
    #[configurable(metadata(docs::advanced))]
    pub buckets: Vec<f64>,

    /// Quantiles handed to the metric encoder on every scrape.
    ///
    /// The list is validated when the sink is built.
    #[serde(default = "super::default_summary_quantiles")]
    #[configurable(metadata(docs::advanced))]
    pub quantiles: Vec<f64>,

    /// Whether distributions are converted to sketches (`true`) instead of aggregated
    /// histograms (`false`, the default) before being stored.
    #[serde(default = "default_distributions_as_summaries")]
    #[configurable(metadata(docs::advanced))]
    pub distributions_as_summaries: bool,

    /// How long, in seconds, a metric is kept without being updated before it may be dropped.
    ///
    /// Expired metrics are removed when an event arrives and more than this period has elapsed
    /// since the previous sweep. Must be at least 1 second; defaults to 60.
    #[serde(default = "default_flush_period_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub flush_period_secs: Duration,

    /// When `true`, the timestamp is removed from every metric before it is stored for scraping.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub suppress_timestamp: bool,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
151
/// Builds a configuration from the `default_*` helpers: no namespace, no auth, no TLS, and
/// default acknowledgement settings.
impl Default for PrometheusExporterConfig {
    fn default() -> Self {
        Self {
            default_namespace: None,
            address: default_address(),
            auth: None,
            tls: None,
            buckets: super::default_histogram_buckets(),
            quantiles: super::default_summary_quantiles(),
            distributions_as_summaries: default_distributions_as_summaries(),
            flush_period_secs: default_flush_period_secs(),
            suppress_timestamp: default_suppress_timestamp(),
            acknowledgements: Default::default(),
        }
    }
}
168
/// Default listen address: the unspecified IPv4 address (`0.0.0.0`) on port 9598.
const fn default_address() -> SocketAddr {
    const PORT: u16 = 9598;
    let any_interface = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
    SocketAddr::new(any_interface, PORT)
}

/// By default distributions are not exported as summaries.
const fn default_distributions_as_summaries() -> bool {
    false
}

/// Default flush period: sixty seconds.
const fn default_flush_period_secs() -> Duration {
    const ONE_MINUTE: Duration = Duration::from_secs(60);
    ONE_MINUTE
}

/// By default timestamps are kept on exported metrics.
const fn default_suppress_timestamp() -> bool {
    false
}
184
185impl GenerateConfig for PrometheusExporterConfig {
186 fn generate_config() -> toml::Value {
187 toml::Value::try_from(Self::default()).unwrap()
188 }
189}
190
191#[async_trait::async_trait]
192#[typetag::serde(name = "prometheus_exporter")]
193impl SinkConfig for PrometheusExporterConfig {
194 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
195 if self.flush_period_secs.as_secs() < MIN_FLUSH_PERIOD_SECS {
196 return Err(Box::new(BuildError::FlushPeriodTooShort {
197 min: MIN_FLUSH_PERIOD_SECS,
198 }));
199 }
200
201 validate_quantiles(&self.quantiles)?;
202
203 let sink = PrometheusExporter::new(self.clone());
204 let healthcheck = future::ok(()).boxed();
205
206 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
207 }
208
209 fn input(&self) -> Input {
210 Input::metric()
211 }
212
213 fn resources(&self) -> Vec<Resource> {
214 vec![Resource::tcp(self.address)]
215 }
216
217 fn acknowledgements(&self) -> &AcknowledgementsConfig {
218 &self.acknowledgements
219 }
220}
221
/// Sink state: the configuration, the metrics currently exposed for scraping, and the handle
/// that marks the HTTP server as started.
struct PrometheusExporter {
    // `Some` once `start_server_if_needed` has spawned the HTTP server.
    server_shutdown_trigger: Option<Trigger>,
    config: PrometheusExporterConfig,
    // Shared between the sink's event loop (writer) and the HTTP request handler (reader).
    metrics: Arc<RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>>,
}
227
/// Expiration bookkeeping stored next to every tracked metric.
#[derive(Clone, Copy, Debug)]
struct MetricMetadata {
    /// How long the metric stays alive after its most recent update.
    expiration_window: Duration,
    /// Deadline after which the metric counts as expired.
    ///
    /// `None` when `now + expiration_window` cannot be represented as an `Instant` (an extremely
    /// large window); such a metric never expires. Adding the two with `+` would panic instead.
    expires_at: Option<Instant>,
}

impl MetricMetadata {
    /// Creates metadata whose deadline lies `expiration_window` from now.
    pub fn new(expiration_window: Duration) -> Self {
        Self {
            expiration_window,
            expires_at: Instant::now().checked_add(expiration_window),
        }
    }

    /// Pushes the deadline out to `expiration_window` from now.
    pub fn refresh(&mut self) {
        self.expires_at = Instant::now().checked_add(self.expiration_window);
    }

    /// Returns `true` when `now` is at or past the deadline.
    pub fn has_expired(&self, now: Instant) -> bool {
        match self.expires_at {
            Some(deadline) => now >= deadline,
            // No representable deadline: the window is effectively infinite.
            None => false,
        }
    }
}
253
/// Key under which a metric is stored in the exporter's map.
///
/// Two metrics share a key only when they agree on all three parts below, so the same series
/// reported with a different value type or different bucket/quantile bounds is tracked separately.
#[derive(Clone, Debug)]
struct MetricRef {
    // The metric's series, cloned from the metric.
    series: MetricSeries,
    // Which `MetricValue` variant the metric holds (variant only, not its contents).
    value: Discriminant<MetricValue>,
    // Bucket upper limits (aggregated histogram) or quantiles (aggregated summary); `None` for
    // every other value type.
    bounds: Option<Vec<f64>>,
}
268
269impl MetricRef {
270 pub fn from_metric(metric: &Metric) -> Self {
272 let bounds = match metric.value() {
274 MetricValue::AggregatedHistogram { buckets, .. } => {
275 Some(buckets.iter().map(|b| b.upper_limit).collect())
276 }
277 MetricValue::AggregatedSummary { quantiles, .. } => {
278 Some(quantiles.iter().map(|q| q.quantile).collect())
279 }
280 _ => None,
281 };
282
283 Self {
284 series: metric.series().clone(),
285 value: discriminant(metric.value()),
286 bounds,
287 }
288 }
289}
290
291impl PartialEq for MetricRef {
292 fn eq(&self, other: &Self) -> bool {
293 self.series == other.series && self.value == other.value && self.bounds == other.bounds
294 }
295}
296
// Marker impl: `MetricRef` is used as the key type of the exporter's `IndexMap`.
impl Eq for MetricRef {}
298
299impl Hash for MetricRef {
300 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
301 self.series.hash(state);
302 self.value.hash(state);
303 if let Some(bounds) = &self.bounds {
304 for bound in bounds {
305 bound.to_bits().hash(state);
306 }
307 }
308 }
309}
310
311fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
312 if let Some(auth) = auth {
313 let headers = req.headers();
314 if let Some(auth_header) = headers.get(hyper::header::AUTHORIZATION) {
315 let encoded_credentials = match auth {
316 Auth::Basic { user, password } => Some(HeaderValue::from_str(
317 format!(
318 "Basic {}",
319 BASE64_STANDARD.encode(format!("{}:{}", user, password.inner()))
320 )
321 .as_str(),
322 )),
323 Auth::Bearer { token } => Some(HeaderValue::from_str(
324 format!("Bearer {}", token.inner()).as_str(),
325 )),
326 #[cfg(feature = "aws-core")]
327 _ => None,
328 };
329
330 if let Some(Ok(encoded_credentials)) = encoded_credentials {
331 if auth_header == encoded_credentials {
332 return true;
333 }
334 }
335 }
336 } else {
337 return true;
338 }
339
340 false
341}
342
/// Per-server state needed to answer an HTTP request; cloned for each service instance built
/// by the server.
#[derive(Clone)]
struct Handler {
    // Credentials a request must present; `None` disables the check.
    auth: Option<Auth>,
    // The next three fields are passed to the metric encoder for every metric.
    default_namespace: Option<String>,
    buckets: Box<[f64]>,
    quantiles: Box<[f64]>,
    // Handles for the internal events emitted after a successful scrape.
    bytes_sent: Registered<BytesSent>,
    events_sent: Registered<EventsSent>,
}
352
353impl Handler {
354 fn handle<T: HttpBody>(
355 &self,
356 req: Request<T>,
357 metrics: &RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>,
358 ) -> Response<Body> {
359 let mut response = Response::new(Body::empty());
360
361 match (authorized(&req, &self.auth), req.method(), req.uri().path()) {
362 (false, _, _) => {
363 *response.status_mut() = StatusCode::UNAUTHORIZED;
364 response.headers_mut().insert(
365 http::header::WWW_AUTHENTICATE,
366 HeaderValue::from_static("Basic, Bearer"),
367 );
368 }
369
370 (true, &Method::GET, "/metrics") => {
371 let metrics = metrics.read().expect(LOCK_FAILED);
372
373 let count = metrics.len();
374 let byte_size = metrics
375 .iter()
376 .map(|(_, (metric, _))| metric.estimated_json_encoded_size_of())
377 .sum();
378
379 let mut collector = StringCollector::new();
380
381 for (_, (metric, _)) in metrics.iter() {
382 collector.encode_metric(
383 self.default_namespace.as_deref(),
384 &self.buckets,
385 &self.quantiles,
386 metric,
387 );
388 }
389
390 drop(metrics);
391
392 let body = collector.finish();
393 let body_size = body.size_of();
394
395 *response.body_mut() = body.into();
396
397 response.headers_mut().insert(
398 "Content-Type",
399 HeaderValue::from_static("text/plain; version=0.0.4"),
400 );
401
402 self.events_sent.emit(CountByteSize(count, byte_size));
403 self.bytes_sent.emit(ByteSize(body_size));
404 }
405
406 (true, _, _) => {
407 *response.status_mut() = StatusCode::NOT_FOUND;
408 }
409 }
410
411 response
412 }
413}
414
impl PrometheusExporter {
    /// Creates an exporter with an empty metric map; the HTTP server is not started yet.
    fn new(config: PrometheusExporterConfig) -> Self {
        Self {
            server_shutdown_trigger: None,
            config,
            metrics: Arc::new(RwLock::new(IndexMap::new())),
        }
    }

    /// Binds the configured address and spawns the HTTP server task, unless an earlier call
    /// already did so.
    ///
    /// # Errors
    ///
    /// Fails when the TLS settings cannot be built from the configuration or the listener
    /// cannot be bound.
    async fn start_server_if_needed(&mut self) -> crate::Result<()> {
        // A stored trigger means the server was started by a previous call.
        if self.server_shutdown_trigger.is_some() {
            return Ok(());
        }

        let handler = Handler {
            bytes_sent: register!(BytesSent::from(Protocol::HTTP)),
            events_sent: register!(EventsSent::from(Output(None))),
            default_namespace: self.config.default_namespace.clone(),
            buckets: self.config.buckets.clone().into(),
            quantiles: self.config.quantiles.clone().into(),
            auth: self.config.auth.clone(),
        };

        let span = Span::current();
        let metrics = Arc::clone(&self.metrics);

        // Service factory handed to hyper: every service it builds gets its own handler clone
        // and its own handle to the shared metric map.
        let new_service = make_service_fn(move |_| {
            let span = Span::current();
            let metrics = Arc::clone(&metrics);
            let handler = handler.clone();

            let inner = service_fn(move |req| {
                let response = handler.handle(req, &metrics);

                future::ok::<_, Infallible>(response)
            });

            // Wrap the handler with HTTP tracing and response compression.
            let service = ServiceBuilder::new()
                .layer(build_http_trace_layer(span.clone()))
                .layer(CompressionLayer::new())
                .service(inner);

            async move { Ok::<_, Infallible>(service) }
        });

        let (trigger, tripwire) = Tripwire::new();

        let tls = self.config.tls.clone();
        let address = self.config.address;

        // Bind before spawning so that configuration and bind errors reach the caller.
        let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?;
        let listener = tls.bind(&address).await?;

        tokio::spawn(async move {
            info!(message = "Building HTTP server.", address = %address);

            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(new_service)
                .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler))
                .instrument(span)
                .await
                .map_err(|error| error!("Server error: {}.", error))?;

            Ok::<(), ()>(())
        });

        // NOTE(review): presumably dropping the trigger fires the tripwire and shuts the
        // server down, which is why it is kept in `self` — confirm with `stream_cancel`.
        self.server_shutdown_trigger = Some(trigger);
        Ok(())
    }

    /// Converts an incoming metric into the absolute form that is stored for scraping.
    ///
    /// Distributions are first converted to a sketch or an aggregated histogram, depending on
    /// `distributions_as_summaries`. Incremental metrics are then added onto the currently
    /// stored value for the same `MetricRef`, when there is one and the values can be added.
    /// Every path through this function returns `Some`.
    fn normalize(&mut self, metric: Metric) -> Option<Metric> {
        let new_metric = match metric.value() {
            MetricValue::Distribution { .. } => {
                // Take the metric apart so only its value is replaced.
                let (series, data, metadata) = metric.into_parts();
                let (time, kind, value) = data.into_parts();

                let new_value = if self.config.distributions_as_summaries {
                    value
                        .distribution_to_sketch()
                        .expect("value should be distribution already")
                } else {
                    value
                        .distribution_to_agg_histogram(&self.config.buckets)
                        .expect("value should be distribution already")
                };

                let data = MetricData::from_parts(time, kind, new_value);
                Metric::from_parts(series, data, metadata)
            }
            _ => metric,
        };

        match new_metric.kind() {
            MetricKind::Absolute => Some(new_metric),
            MetricKind::Incremental => {
                let metrics = self.metrics.read().expect(LOCK_FAILED);
                let metric_ref = MetricRef::from_metric(&new_metric);

                if let Some(existing) = metrics.get(&metric_ref) {
                    let mut current = existing.0.value().clone();
                    // `add` reports whether the two values could be combined.
                    if current.add(new_metric.value()) {
                        return Some(new_metric.with_value(current).into_absolute());
                    }
                }

                // Nothing stored yet, or the stored value could not be added to: the new value
                // is used as the absolute value on its own.
                Some(new_metric.into_absolute())
            }
        }
    }
}
534
535#[async_trait]
536impl StreamSink<Event> for PrometheusExporter {
537 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
538 self.start_server_if_needed()
539 .await
540 .map_err(|error| error!("Failed to start Prometheus exporter: {}.", error))?;
541
542 let mut last_flush = Instant::now();
543 let flush_period = self.config.flush_period_secs;
544
545 while let Some(event) = input.next().await {
546 if last_flush.elapsed() > self.config.flush_period_secs {
555 last_flush = Instant::now();
556
557 let mut metrics = self.metrics.write().expect(LOCK_FAILED);
558
559 metrics.retain(|_metric_ref, (_, metadata)| !metadata.has_expired(last_flush));
560 }
561
562 let mut metric = event.into_metric();
564 let finalizers = metric.take_finalizers();
565
566 match self.normalize(metric) {
567 Some(normalized) => {
568 let normalized = if self.config.suppress_timestamp {
569 normalized.with_timestamp(None)
570 } else {
571 normalized
572 };
573
574 let mut metrics = self.metrics.write().expect(LOCK_FAILED);
577
578 match metrics.entry(MetricRef::from_metric(&normalized)) {
579 Entry::Occupied(mut entry) => {
580 let (data, metadata) = entry.get_mut();
581 *data = normalized;
582 metadata.refresh();
583 }
584 Entry::Vacant(entry) => {
585 entry.insert((normalized, MetricMetadata::new(flush_period)));
586 }
587 }
588 finalizers.update_status(EventStatus::Delivered);
589 }
590 _ => {
591 emit!(PrometheusNormalizationError {});
592 finalizers.update_status(EventStatus::Errored);
593 }
594 }
595 }
596
597 Ok(())
598 }
599}
600
601#[cfg(test)]
602mod tests {
603 use chrono::{Duration, Utc};
604 use flate2::read::GzDecoder;
605 use futures::stream;
606 use indoc::indoc;
607 use similar_asserts::assert_eq;
608 use std::io::Read;
609 use tokio::{sync::oneshot::error::TryRecvError, time};
610 use vector_lib::{
611 event::{MetricTags, StatisticKind},
612 finalization::{BatchNotifier, BatchStatus},
613 metric_tags, samples,
614 sensitive_string::SensitiveString,
615 };
616
617 use super::*;
618 use crate::{
619 config::ProxyConfig,
620 event::metric::{Metric, MetricValue},
621 http::HttpClient,
622 sinks::prometheus::{distribution_to_agg_histogram, distribution_to_ddsketch},
623 test_util::{
624 components::{run_and_assert_sink_compliance, SINK_TAGS},
625 next_addr, random_string, trace_init,
626 },
627 tls::MaybeTlsSettings,
628 };
629
    /// Runs the shared `test_generate_config` helper against this sink's configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusExporterConfig>();
    }
634
    /// Runs the basic export-and-scrape scenario without TLS.
    #[tokio::test]
    async fn prometheus_notls() {
        export_and_fetch_simple(None).await;
    }
639
    /// Runs the basic export-and-scrape scenario with the TLS test configuration, with hostname
    /// verification switched off.
    #[tokio::test]
    async fn prometheus_tls() {
        let mut tls_config = TlsEnableableConfig::test_config();
        tls_config.options.verify_hostname = Some(false);
        export_and_fetch_simple(Some(tls_config)).await;
    }
646
647 #[tokio::test]
648 async fn prometheus_noauth() {
649 let (name1, event1) = create_metric_gauge(None, 123.4);
650 let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
651 let events = vec![event1, event2];
652
653 let response_result = export_and_fetch_with_auth(None, None, events, false).await;
654
655 assert!(response_result.is_ok());
656
657 let body = response_result.expect("Cannot extract body from the response");
658
659 assert!(body.contains(&format!(
660 indoc! {r#"
661 # HELP {name} {name}
662 # TYPE {name} gauge
663 {name}{{some_tag="some_value"}} 123.4
664 "#},
665 name = name1
666 )));
667 assert!(body.contains(&format!(
668 indoc! {r#"
669 # HELP {name} {name}
670 # TYPE {name} gauge
671 {name}{{some_tag="some_value"}} 3
672 "#},
673 name = name2
674 )));
675 }
676
677 #[tokio::test]
678 async fn prometheus_successful_basic_auth() {
679 let (name1, event1) = create_metric_gauge(None, 123.4);
680 let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
681 let events = vec![event1, event2];
682
683 let auth_config = Auth::Basic {
684 user: "user".to_string(),
685 password: SensitiveString::from("password".to_string()),
686 };
687
688 let response_result =
689 export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
690 .await;
691
692 assert!(response_result.is_ok());
693
694 let body = response_result.expect("Cannot extract body from the response");
695
696 assert!(body.contains(&format!(
697 indoc! {r#"
698 # HELP {name} {name}
699 # TYPE {name} gauge
700 {name}{{some_tag="some_value"}} 123.4
701 "#},
702 name = name1
703 )));
704 assert!(body.contains(&format!(
705 indoc! {r#"
706 # HELP {name} {name}
707 # TYPE {name} gauge
708 {name}{{some_tag="some_value"}} 3
709 "#},
710 name = name2
711 )));
712 }
713
714 #[tokio::test]
715 async fn prometheus_successful_token_auth() {
716 let (name1, event1) = create_metric_gauge(None, 123.4);
717 let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
718 let events = vec![event1, event2];
719
720 let auth_config = Auth::Bearer {
721 token: SensitiveString::from("token".to_string()),
722 };
723
724 let response_result =
725 export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
726 .await;
727
728 assert!(response_result.is_ok());
729
730 let body = response_result.expect("Cannot extract body from the response");
731
732 assert!(body.contains(&format!(
733 indoc! {r#"
734 # HELP {name} {name}
735 # TYPE {name} gauge
736 {name}{{some_tag="some_value"}} 123.4
737 "#},
738 name = name1
739 )));
740 assert!(body.contains(&format!(
741 indoc! {r#"
742 # HELP {name} {name}
743 # TYPE {name} gauge
744 {name}{{some_tag="some_value"}} 3
745 "#},
746 name = name2
747 )));
748 }
749
750 #[tokio::test]
751 async fn prometheus_missing_auth() {
752 let (_, event1) = create_metric_gauge(None, 123.4);
753 let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
754 let events = vec![event1, event2];
755
756 let server_auth_config = Auth::Bearer {
757 token: SensitiveString::from("token".to_string()),
758 };
759
760 let response_result =
761 export_and_fetch_with_auth(Some(server_auth_config), None, events, false).await;
762
763 assert!(response_result.is_err());
764 assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
765 }
766
767 #[tokio::test]
768 async fn prometheus_wrong_auth() {
769 let (_, event1) = create_metric_gauge(None, 123.4);
770 let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
771 let events = vec![event1, event2];
772
773 let server_auth_config = Auth::Bearer {
774 token: SensitiveString::from("token".to_string()),
775 };
776
777 let client_auth_config = Auth::Basic {
778 user: "user".to_string(),
779 password: SensitiveString::from("password".to_string()),
780 };
781
782 let response_result = export_and_fetch_with_auth(
783 Some(server_auth_config),
784 Some(client_auth_config),
785 events,
786 false,
787 )
788 .await;
789
790 assert!(response_result.is_err());
791 assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
792 }
793
794 #[tokio::test]
795 async fn encoding_gzip() {
796 let (name1, event1) = create_metric_gauge(None, 123.4);
797 let events = vec![event1];
798
799 let body_raw = export_and_fetch_raw(None, events, false, Some(String::from("gzip"))).await;
800 let expected = format!(
801 indoc! {r#"
802 # HELP {name} {name}
803 # TYPE {name} gauge
804 {name}{{some_tag="some_value"}} 123.4
805 "#},
806 name = name1,
807 );
808
809 let mut gz = GzDecoder::new(&body_raw[..]);
810 let mut body_decoded = String::new();
811 let _ = gz.read_to_string(&mut body_decoded);
812
813 assert!(body_raw.len() < expected.len());
814 assert_eq!(body_decoded, expected);
815 }
816
817 #[tokio::test]
818 async fn updates_timestamps() {
819 let timestamp1 = Utc::now();
820 let (name, event1) = create_metric_gauge(None, 123.4);
821 let event1 = Event::from(event1.into_metric().with_timestamp(Some(timestamp1)));
822 let (_, event2) = create_metric_gauge(Some(name.clone()), 12.0);
823 let timestamp2 = timestamp1 + Duration::seconds(1);
824 let event2 = Event::from(event2.into_metric().with_timestamp(Some(timestamp2)));
825 let events = vec![event1, event2];
826
827 let body = export_and_fetch(None, events, false).await;
828 let timestamp = timestamp2.timestamp_millis();
829 assert_eq!(
830 body,
831 format!(
832 indoc! {r#"
833 # HELP {name} {name}
834 # TYPE {name} gauge
835 {name}{{some_tag="some_value"}} 135.4 {timestamp}
836 "#},
837 name = name,
838 timestamp = timestamp
839 )
840 );
841 }
842
843 #[tokio::test]
844 async fn suppress_timestamp() {
845 let timestamp = Utc::now();
846 let (name, event) = create_metric_gauge(None, 123.4);
847 let event = Event::from(event.into_metric().with_timestamp(Some(timestamp)));
848 let events = vec![event];
849
850 let body = export_and_fetch(None, events, true).await;
851 assert_eq!(
852 body,
853 format!(
854 indoc! {r#"
855 # HELP {name} {name}
856 # TYPE {name} gauge
857 {name}{{some_tag="some_value"}} 123.4
858 "#},
859 name = name,
860 )
861 );
862 }
863
864 #[tokio::test]
869 async fn prometheus_duplicate_labels() {
870 let (name, event) = create_metric_with_tags(
871 None,
872 MetricValue::Gauge { value: 123.4 },
873 Some(metric_tags!("code" => "200", "code" => "success")),
874 );
875 let events = vec![event];
876
877 let response_result = export_and_fetch_with_auth(None, None, events, false).await;
878
879 assert!(response_result.is_ok());
880
881 let body = response_result.expect("Cannot extract body from the response");
882
883 assert!(body.contains(&format!(
884 indoc! {r#"
885 # HELP {name} {name}
886 # TYPE {name} gauge
887 {name}{{code="success"}} 123.4
888 "# },
889 name = name
890 )));
891 }
892
    /// Feeds `events` into a freshly built sink, scrapes `/metrics` once and returns the raw
    /// response body.
    ///
    /// When `encoding` is given it is sent as `Accept-Encoding`, and the response is required
    /// to carry a `Content-Encoding` header. Panics if the events are not acknowledged as
    /// delivered, if the scrape fails or is unsuccessful, or if the sink task panics.
    async fn export_and_fetch_raw(
        tls_config: Option<TlsEnableableConfig>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
        encoding: Option<String>,
    ) -> hyper::body::Bytes {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(tls_config.as_ref(), false).unwrap();
        let proto = client_settings.http_protocol_name();

        let address = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: tls_config,
            suppress_timestamp,
            ..Default::default()
        };

        // Attach a batch notifier to the events; nothing has been delivered yet.
        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            // The trailing event only resolves after 500ms, so the input stream (and with it
            // the sink) is still running when the scrape below happens.
            stream::iter(events).chain(stream::once(async move {
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        // Give the sink time to process the initial events.
        time::sleep(time::Duration::from_millis(100)).await;

        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(ref encoding) = encoding {
            request.headers_mut().insert(
                http::header::ACCEPT_ENCODING,
                HeaderValue::from_str(encoding.as_str()).unwrap(),
            );
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        assert!(result.status().is_success());

        if encoding.is_some() {
            assert!(result
                .headers()
                .contains_key(http::header::CONTENT_ENCODING));
        }

        let body = result.into_body();
        let bytes = hyper::body::to_bytes(body)
            .await
            .expect("Reading body failed");

        // Wait for the sink task so that a panic inside it fails the test.
        sink_handle.await.unwrap();

        bytes
    }
968
969 async fn export_and_fetch(
970 tls_config: Option<TlsEnableableConfig>,
971 events: Vec<Event>,
972 suppress_timestamp: bool,
973 ) -> String {
974 let bytes = export_and_fetch_raw(tls_config, events, suppress_timestamp, None);
975 String::from_utf8(bytes.await.to_vec()).unwrap()
976 }
977
    /// Feeds `events` into a sink configured with `server_auth_config`, scrapes `/metrics`
    /// once (applying `client_auth_config` to the request when given) and returns the body, or
    /// the HTTP status when the response is not successful.
    ///
    /// On the error path the function returns without waiting for the sink task.
    /// NOTE(review): presumably deliberate, since the task's compliance assertions would run
    /// even though no scrape succeeded — confirm before changing.
    async fn export_and_fetch_with_auth(
        server_auth_config: Option<Auth>,
        client_auth_config: Option<Auth>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> Result<String, http::status::StatusCode> {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(None, false).unwrap();
        let proto = client_settings.http_protocol_name();

        let address = next_addr();
        let config = PrometheusExporterConfig {
            address,
            auth: server_auth_config,
            tls: None,
            suppress_timestamp,
            ..Default::default()
        };

        // Attach a batch notifier to the events; nothing has been delivered yet.
        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            // The trailing event only resolves after 500ms, so the input stream (and with it
            // the sink) is still running when the scrape below happens.
            stream::iter(events).chain(stream::once(async move {
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        // Give the sink time to process the initial events.
        time::sleep(time::Duration::from_millis(100)).await;

        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(client_auth_config) = client_auth_config {
            client_auth_config.apply(&mut request);
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        if !result.status().is_success() {
            return Err(result.status());
        }

        let body = result.into_body();
        let bytes = hyper::body::to_bytes(body)
            .await
            .expect("Reading body failed");
        let result = String::from_utf8(bytes.to_vec()).unwrap();

        // Wait for the sink task so that a panic inside it fails the test.
        sink_handle.await.unwrap();

        Ok(result)
    }
1048
1049 async fn export_and_fetch_simple(tls_config: Option<TlsEnableableConfig>) {
1050 let (name1, event1) = create_metric_gauge(None, 123.4);
1051 let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
1052 let events = vec![event1, event2];
1053
1054 let body = export_and_fetch(tls_config, events, false).await;
1055
1056 assert!(body.contains(&format!(
1057 indoc! {r#"
1058 # HELP {name} {name}
1059 # TYPE {name} gauge
1060 {name}{{some_tag="some_value"}} 123.4
1061 "#},
1062 name = name1
1063 )));
1064 assert!(body.contains(&format!(
1065 indoc! {r#"
1066 # HELP {name} {name}
1067 # TYPE {name} gauge
1068 {name}{{some_tag="some_value"}} 3
1069 "#},
1070 name = name2
1071 )));
1072 }
1073
1074 pub fn create_metric_gauge(name: Option<String>, value: f64) -> (String, Event) {
1075 create_metric(name, MetricValue::Gauge { value })
1076 }
1077
1078 pub fn create_metric_set(name: Option<String>, values: Vec<&'static str>) -> (String, Event) {
1079 create_metric(
1080 name,
1081 MetricValue::Set {
1082 values: values.into_iter().map(Into::into).collect(),
1083 },
1084 )
1085 }
1086
1087 fn create_metric(name: Option<String>, value: MetricValue) -> (String, Event) {
1088 create_metric_with_tags(name, value, Some(metric_tags!("some_tag" => "some_value")))
1089 }
1090
1091 fn create_metric_with_tags(
1092 name: Option<String>,
1093 value: MetricValue,
1094 tags: Option<MetricTags>,
1095 ) -> (String, Event) {
1096 let name = name.unwrap_or_else(|| format!("vector_set_{}", random_string(16)));
1097 let event = Metric::new(name.clone(), MetricKind::Incremental, value)
1098 .with_tags(tags)
1099 .into();
1100 (name, event)
1101 }
1102
1103 #[tokio::test]
1104 async fn sink_absolute() {
1105 let config = PrometheusExporterConfig {
1106 address: next_addr(), tls: None,
1108 ..Default::default()
1109 };
1110
1111 let sink = PrometheusExporter::new(config);
1112
1113 let m1 = Metric::new(
1114 "absolute",
1115 MetricKind::Absolute,
1116 MetricValue::Counter { value: 32. },
1117 )
1118 .with_tags(Some(metric_tags!("tag1" => "value1")));
1119
1120 let m2 = m1.clone().with_tags(Some(metric_tags!("tag1" => "value2")));
1121
1122 let events = vec![
1123 Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 32. })),
1124 Event::Metric(m2.clone().with_value(MetricValue::Counter { value: 33. })),
1125 Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 40. })),
1126 ];
1127
1128 let metrics_handle = Arc::clone(&sink.metrics);
1129
1130 let sink = VectorSink::from_event_streamsink(sink);
1131 let input_events = stream::iter(events).map(Into::into);
1132 sink.run(input_events).await.unwrap();
1133
1134 let metrics_after = metrics_handle.read().unwrap();
1135
1136 let expected_m1 = metrics_after
1137 .get(&MetricRef::from_metric(&m1))
1138 .expect("m1 should exist");
1139 let expected_m1_value = MetricValue::Counter { value: 40. };
1140 assert_eq!(expected_m1.0.value(), &expected_m1_value);
1141
1142 let expected_m2 = metrics_after
1143 .get(&MetricRef::from_metric(&m2))
1144 .expect("m2 should exist");
1145 let expected_m2_value = MetricValue::Counter { value: 33. };
1146 assert_eq!(expected_m2.0.value(), &expected_m2_value);
1147 }
1148
    /// With the default configuration (`distributions_as_summaries` off), incremental
    /// distributions of either statistic kind are stored as aggregated histograms, and
    /// successive updates to the same series are merged into one entry.
    #[tokio::test]
    async fn sink_distributions_as_histograms() {
        let config = PrometheusExporterConfig {
            address: next_addr(),
            tls: None,
            ..Default::default()
        };
        let buckets = config.buckets.clone();

        let sink = PrometheusExporter::new(config);

        // Two series, one per statistic kind, each sent three times below.
        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = vec![
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

        // Build the expected values: merge the three updates of each series, then convert the
        // merged distribution into an absolute aggregated histogram.
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_agg_histogram(merged_summary, &buckets)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_agg_histogram(merged_histogram, &buckets)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

        // Keep a handle on the map before the sink is consumed by `run`.
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        // Six events, but only two distinct series.
        assert_eq!(metrics_after.len(), 2);

        let actual_summary = metrics_after
            .get(&MetricRef::from_metric(&expected_summary))
            .expect("summary metric should exist");
        assert_eq!(actual_summary.0.value(), expected_summary.value());

        let actual_histogram = metrics_after
            .get(&MetricRef::from_metric(&expected_histogram))
            .expect("histogram metric should exist");
        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
    }
1261
1262 #[tokio::test]
1263 async fn sink_distributions_as_summaries() {
1264 let config = PrometheusExporterConfig {
1278 address: next_addr(), tls: None,
1280 distributions_as_summaries: true,
1281 ..Default::default()
1282 };
1283
1284 let sink = PrometheusExporter::new(config);
1285
1286 let base_summary_metric = Metric::new(
1288 "distrib_summary",
1289 MetricKind::Incremental,
1290 MetricValue::Distribution {
1291 statistic: StatisticKind::Summary,
1292 samples: samples!(1.0 => 1, 3.0 => 2),
1293 },
1294 );
1295
1296 let base_histogram_metric = Metric::new(
1297 "distrib_histo",
1298 MetricKind::Incremental,
1299 MetricValue::Distribution {
1300 statistic: StatisticKind::Histogram,
1301 samples: samples!(7.0 => 1, 9.0 => 2),
1302 },
1303 );
1304
1305 let metrics = vec![
1306 base_summary_metric.clone(),
1307 base_summary_metric
1308 .clone()
1309 .with_value(MetricValue::Distribution {
1310 statistic: StatisticKind::Summary,
1311 samples: samples!(1.0 => 2, 2.9 => 1),
1312 }),
1313 base_summary_metric
1314 .clone()
1315 .with_value(MetricValue::Distribution {
1316 statistic: StatisticKind::Summary,
1317 samples: samples!(1.0 => 4, 3.2 => 1),
1318 }),
1319 base_histogram_metric.clone(),
1320 base_histogram_metric
1321 .clone()
1322 .with_value(MetricValue::Distribution {
1323 statistic: StatisticKind::Histogram,
1324 samples: samples!(7.0 => 2, 9.9 => 1),
1325 }),
1326 base_histogram_metric
1327 .clone()
1328 .with_value(MetricValue::Distribution {
1329 statistic: StatisticKind::Histogram,
1330 samples: samples!(7.0 => 4, 10.2 => 1),
1331 }),
1332 ];
1333
1334 let mut merged_summary = base_summary_metric.clone();
1336 assert!(merged_summary.update(&metrics[1]));
1337 assert!(merged_summary.update(&metrics[2]));
1338 let expected_summary = distribution_to_ddsketch(merged_summary)
1339 .expect("input summary metric should have been distribution")
1340 .into_absolute();
1341
1342 let mut merged_histogram = base_histogram_metric.clone();
1343 assert!(merged_histogram.update(&metrics[4]));
1344 assert!(merged_histogram.update(&metrics[5]));
1345 let expected_histogram = distribution_to_ddsketch(merged_histogram)
1346 .expect("input histogram metric should have been distribution")
1347 .into_absolute();
1348
1349 let metrics_handle = Arc::clone(&sink.metrics);
1351
1352 let events = metrics
1353 .iter()
1354 .cloned()
1355 .map(Event::Metric)
1356 .collect::<Vec<_>>();
1357
1358 let sink = VectorSink::from_event_streamsink(sink);
1359 let input_events = stream::iter(events).map(Into::into);
1360 sink.run(input_events).await.unwrap();
1361
1362 let metrics_after = metrics_handle.read().unwrap();
1363
1364 assert_eq!(metrics_after.len(), 2);
1366
1367 let actual_summary = metrics_after
1368 .get(&MetricRef::from_metric(&expected_summary))
1369 .expect("summary metric should exist");
1370 assert_eq!(actual_summary.0.value(), expected_summary.value());
1371
1372 let actual_histogram = metrics_after
1373 .get(&MetricRef::from_metric(&expected_histogram))
1374 .expect("histogram metric should exist");
1375 assert_eq!(actual_histogram.0.value(), expected_histogram.value());
1376 }
1377
1378 #[tokio::test]
1379 async fn sink_gauge_incremental_absolute_mix() {
1380 let config = PrometheusExporterConfig {
1387 address: next_addr(), tls: None,
1389 ..Default::default()
1390 };
1391
1392 let sink = PrometheusExporter::new(config);
1393
1394 let base_absolute_gauge_metric = Metric::new(
1395 "gauge",
1396 MetricKind::Absolute,
1397 MetricValue::Gauge { value: 100.0 },
1398 );
1399
1400 let base_incremental_gauge_metric = Metric::new(
1401 "gauge",
1402 MetricKind::Incremental,
1403 MetricValue::Gauge { value: -10.0 },
1404 );
1405
1406 let metrics = vec![
1407 base_absolute_gauge_metric.clone(),
1408 base_absolute_gauge_metric
1409 .clone()
1410 .with_value(MetricValue::Gauge { value: 333.0 }),
1411 base_incremental_gauge_metric.clone(),
1412 base_incremental_gauge_metric
1413 .clone()
1414 .with_value(MetricValue::Gauge { value: 4.0 }),
1415 ];
1416
1417 let metrics_handle = Arc::clone(&sink.metrics);
1419
1420 let events = metrics
1421 .iter()
1422 .cloned()
1423 .map(Event::Metric)
1424 .collect::<Vec<_>>();
1425
1426 let sink = VectorSink::from_event_streamsink(sink);
1427 let input_events = stream::iter(events).map(Into::into);
1428 sink.run(input_events).await.unwrap();
1429
1430 let metrics_after = metrics_handle.read().unwrap();
1431
1432 assert_eq!(metrics_after.len(), 1);
1434
1435 let expected_gauge = Metric::new(
1436 "gauge",
1437 MetricKind::Absolute,
1438 MetricValue::Gauge { value: 327.0 },
1439 );
1440
1441 let actual_gauge = metrics_after
1442 .get(&MetricRef::from_metric(&expected_gauge))
1443 .expect("gauge metric should exist");
1444 assert_eq!(actual_gauge.0.value(), expected_gauge.value());
1445 }
1446}
1447
1448#[cfg(all(test, feature = "prometheus-integration-tests"))]
1449mod integration_tests {
1450 #![allow(clippy::print_stdout)] #![allow(clippy::print_stderr)] #![allow(clippy::dbg_macro)] use chrono::Utc;
1455 use futures::{future::ready, stream};
1456 use serde_json::Value;
1457 use tokio::{sync::mpsc, time};
1458 use tokio_stream::wrappers::UnboundedReceiverStream;
1459
1460 use super::*;
1461 use crate::{
1462 config::ProxyConfig,
1463 http::HttpClient,
1464 test_util::{
1465 components::{run_and_assert_sink_compliance, SINK_TAGS},
1466 trace_init,
1467 },
1468 };
1469
1470 fn sink_exporter_address() -> String {
1471 std::env::var("SINK_EXPORTER_ADDRESS").unwrap_or_else(|_| "127.0.0.1:9101".into())
1472 }
1473
1474 fn prometheus_address() -> String {
1475 std::env::var("PROMETHEUS_ADDRESS").unwrap_or_else(|_| "localhost:9090".into())
1476 }
1477
1478 async fn fetch_exporter_body() -> String {
1479 let url = format!("http://{}/metrics", sink_exporter_address());
1480 let request = Request::get(url)
1481 .body(Body::empty())
1482 .expect("Error creating request.");
1483 let proxy = ProxyConfig::default();
1484 let result = HttpClient::new(None, &proxy)
1485 .unwrap()
1486 .send(request)
1487 .await
1488 .expect("Could not send request");
1489 let result = hyper::body::to_bytes(result.into_body())
1490 .await
1491 .expect("Error fetching body");
1492 String::from_utf8_lossy(&result).to_string()
1493 }
1494
1495 async fn prometheus_query(query: &str) -> Value {
1496 let url = format!(
1497 "http://{}/api/v1/query?query={}",
1498 prometheus_address(),
1499 query
1500 );
1501 let request = Request::post(url)
1502 .body(Body::empty())
1503 .expect("Error creating request.");
1504 let proxy = ProxyConfig::default();
1505 let result = HttpClient::new(None, &proxy)
1506 .unwrap()
1507 .send(request)
1508 .await
1509 .expect("Could not fetch query");
1510 let result = hyper::body::to_bytes(result.into_body())
1511 .await
1512 .expect("Error fetching body");
1513 let result = String::from_utf8_lossy(&result);
1514 serde_json::from_str(result.as_ref()).expect("Invalid JSON from prometheus")
1515 }
1516
1517 #[tokio::test]
1518 async fn prometheus_metrics() {
1519 trace_init();
1520
1521 prometheus_scrapes_metrics().await;
1522 time::sleep(time::Duration::from_millis(500)).await;
1523 reset_on_flush_period().await;
1524 expire_on_flush_period().await;
1525 }
1526
1527 async fn prometheus_scrapes_metrics() {
1528 let start = Utc::now().timestamp();
1529
1530 let config = PrometheusExporterConfig {
1531 address: sink_exporter_address().parse().unwrap(),
1532 flush_period_secs: Duration::from_secs(2),
1533 ..Default::default()
1534 };
1535 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
1536 let (name, event) = tests::create_metric_gauge(None, 123.4);
1537 let (_, delayed_event) = tests::create_metric_gauge(Some("delayed".to_string()), 123.4);
1538
1539 run_and_assert_sink_compliance(
1540 sink,
1541 stream::once(ready(event)).chain(stream::once(async move {
1542 time::sleep(time::Duration::from_secs(2)).await;
1544 delayed_event
1545 })),
1546 &SINK_TAGS,
1547 )
1548 .await;
1549
1550 let result = prometheus_query(&name).await;
1552
1553 let data = &result["data"]["result"][0];
1554 assert_eq!(data["metric"]["__name__"], Value::String(name));
1555 assert_eq!(
1556 data["metric"]["instance"],
1557 Value::String(sink_exporter_address())
1558 );
1559 assert_eq!(
1560 data["metric"]["some_tag"],
1561 Value::String("some_value".into())
1562 );
1563 assert!(data["value"][0].as_f64().unwrap() >= start as f64);
1564 assert_eq!(data["value"][1], Value::String("123.4".into()));
1565 }
1566
1567 async fn reset_on_flush_period() {
1568 let config = PrometheusExporterConfig {
1569 address: sink_exporter_address().parse().unwrap(),
1570 flush_period_secs: Duration::from_secs(3),
1571 ..Default::default()
1572 };
1573 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
1574 let (tx, rx) = mpsc::unbounded_channel();
1575 let input_events = UnboundedReceiverStream::new(rx);
1576
1577 let input_events = input_events.map(Into::into);
1578 let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });
1579
1580 let (name1, event) = tests::create_metric_set(None, vec!["0", "1", "2"]);
1582 tx.send(event).expect("Failed to send.");
1583 let (name2, event) = tests::create_metric_set(None, vec!["3", "4", "5"]);
1584 tx.send(event).expect("Failed to send.");
1585
1586 time::sleep(time::Duration::from_secs(2)).await;
1589
1590 let result = prometheus_query(&name1).await;
1592 assert_eq!(
1593 result["data"]["result"][0]["value"][1],
1594 Value::String("3".into())
1595 );
1596 let result = prometheus_query(&name2).await;
1597 assert_eq!(
1598 result["data"]["result"][0]["value"][1],
1599 Value::String("3".into())
1600 );
1601
1602 time::sleep(time::Duration::from_secs(3)).await;
1606
1607 let (name2, event) = tests::create_metric_set(Some(name2), vec!["8", "9"]);
1608 tx.send(event).expect("Failed to send.");
1609
1610 time::sleep(time::Duration::from_secs(2)).await;
1612 let result = prometheus_query(&name1).await;
1613 assert_eq!(result["data"]["result"][0]["value"][1], Value::Null);
1614 let result = prometheus_query(&name2).await;
1615 assert_eq!(
1616 result["data"]["result"][0]["value"][1],
1617 Value::String("2".into())
1618 );
1619
1620 drop(tx);
1621 sink_handle.await.unwrap();
1622 }
1623
1624 async fn expire_on_flush_period() {
1625 let config = PrometheusExporterConfig {
1626 address: sink_exporter_address().parse().unwrap(),
1627 flush_period_secs: Duration::from_secs(3),
1628 ..Default::default()
1629 };
1630 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
1631 let (tx, rx) = mpsc::unbounded_channel();
1632 let input_events = UnboundedReceiverStream::new(rx);
1633
1634 let input_events = input_events.map(Into::into);
1635 let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });
1636
1637 let (name1, event) = tests::create_metric_set(None, vec!["42"]);
1639 tx.send(event).expect("Failed to send.");
1640 let (name2, event) = tests::create_metric_gauge(None, 100.0);
1641 tx.send(event).expect("Failed to send.");
1642
1643 time::sleep(time::Duration::from_secs(1)).await;
1645
1646 let body = fetch_exporter_body().await;
1648 assert!(body.contains(&name1));
1649 assert!(body.contains(&name2));
1650
1651 for _ in 0..7 {
1653 let (_, event) = tests::create_metric_set(Some(name1.clone()), vec!["43"]);
1655 tx.send(event).expect("Failed to send.");
1656
1657 time::sleep(time::Duration::from_secs(1)).await;
1659 }
1660
1661 let body = fetch_exporter_body().await;
1663 assert!(body.contains(&name1));
1664 assert!(!body.contains(&name2));
1665
1666 drop(tx);
1667 sink_handle.await.unwrap();
1668 }
1669}