1use std::{
2 convert::Infallible,
3 hash::Hash,
4 mem::{Discriminant, discriminant},
5 net::{IpAddr, Ipv4Addr, SocketAddr},
6 sync::{Arc, RwLock},
7 time::{Duration, Instant},
8};
9
10use async_trait::async_trait;
11use base64::prelude::{BASE64_STANDARD, Engine as _};
12use futures::{FutureExt, StreamExt, future, stream::BoxStream};
13use hyper::{
14 Body, Method, Request, Response, Server, StatusCode,
15 body::HttpBody,
16 header::HeaderValue,
17 service::{make_service_fn, service_fn},
18};
19use indexmap::{IndexMap, map::Entry};
20use serde_with::serde_as;
21use snafu::Snafu;
22use stream_cancel::{Trigger, Tripwire};
23use tower::ServiceBuilder;
24use tower_http::compression::CompressionLayer;
25use tracing::{Instrument, Span};
26use vector_lib::{
27 ByteSizeOf, EstimatedJsonEncodedSizeOf,
28 configurable::configurable_component,
29 internal_event::{
30 ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle as _, Output, Protocol,
31 Registered,
32 },
33};
34
35use super::collector::{MetricCollector, StringCollector};
36use crate::{
37 config::{AcknowledgementsConfig, GenerateConfig, Input, Resource, SinkConfig, SinkContext},
38 event::{
39 Event, EventStatus, Finalizable,
40 metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue},
41 },
42 http::{Auth, build_http_trace_layer},
43 internal_events::PrometheusNormalizationError,
44 sinks::{
45 Healthcheck, VectorSink,
46 util::{StreamSink, statistic::validate_quantiles},
47 },
48 tls::{MaybeTlsSettings, TlsEnableableConfig},
49};
50
/// Smallest flush period, in whole seconds, that `SinkConfig::build` accepts.
const MIN_FLUSH_PERIOD_SECS: u64 = 1;

/// Panic message used when the shared metrics lock cannot be acquired because it is poisoned.
const LOCK_FAILED: &str = "Prometheus exporter data lock is poisoned";
54
/// Errors reported while building the sink from its configuration.
#[derive(Debug, Snafu)]
enum BuildError {
    /// The configured flush period is below the minimum of `min` seconds.
    #[snafu(display("Flush period for sets must be greater or equal to {} secs", min))]
    FlushPeriodTooShort { min: u64 },
}
60
/// Configuration for the `prometheus_exporter` sink.
#[serde_as]
#[configurable_component(sink(
    "prometheus_exporter",
    "Expose metric events on a Prometheus compatible endpoint."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrometheusExporterConfig {
    /// The default namespace handed to the metric encoder for every exposed metric.
    ///
    /// This option is also accepted under the name `namespace`.
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::advanced))]
    pub default_namespace: Option<String>,

    /// The address the HTTP server listens on.
    ///
    /// Metrics are served in response to `GET` requests for the `/metrics` path.
    #[serde(default = "default_address")]
    #[configurable(metadata(docs::examples = "192.160.0.10:9598"))]
    pub address: SocketAddr,

    /// Credentials that a request must present in its `Authorization` header.
    ///
    /// When unset, every request is treated as authorized.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    /// TLS settings applied to the listening socket.
    #[configurable(derived)]
    pub tls: Option<TlsEnableableConfig>,

    /// Bucket upper limits used when distributions are converted to aggregated histograms.
    ///
    /// The same buckets are also passed to the metric encoder.
    #[serde(default = "super::default_histogram_buckets")]
    #[configurable(metadata(docs::advanced))]
    pub buckets: Vec<f64>,

    /// Quantiles passed to the metric encoder.
    ///
    /// The values are validated when the sink is built.
    #[serde(default = "super::default_summary_quantiles")]
    #[configurable(metadata(docs::advanced))]
    pub quantiles: Vec<f64>,

    /// Whether distributions are converted to a sketch rather than an aggregated histogram.
    #[serde(default = "default_distributions_as_summaries")]
    #[configurable(metadata(docs::advanced))]
    pub distributions_as_summaries: bool,

    /// The interval, in seconds, after which the stored metrics are checked for expiry.
    ///
    /// A metric that has not been updated within this period is removed on the next check.
    /// Building the sink fails when the value is below one second.
    #[serde(default = "default_flush_period_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub flush_period_secs: Duration,

    /// Removes the timestamp from every metric before it is stored for exposition.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub suppress_timestamp: bool,

    /// Acknowledgement settings reported for this sink.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
151
152impl Default for PrometheusExporterConfig {
153 fn default() -> Self {
154 Self {
155 default_namespace: None,
156 address: default_address(),
157 auth: None,
158 tls: None,
159 buckets: super::default_histogram_buckets(),
160 quantiles: super::default_summary_quantiles(),
161 distributions_as_summaries: default_distributions_as_summaries(),
162 flush_period_secs: default_flush_period_secs(),
163 suppress_timestamp: default_suppress_timestamp(),
164 acknowledgements: Default::default(),
165 }
166 }
167}
168
/// Default listen address: the unspecified IPv4 address (`0.0.0.0`) on port 9598.
const fn default_address() -> SocketAddr {
    const PORT: u16 = 9598;

    let any_interface = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
    SocketAddr::new(any_interface, PORT)
}
172
/// Default for `distributions_as_summaries`: disabled.
const fn default_distributions_as_summaries() -> bool {
    const ENABLED: bool = false;
    ENABLED
}
176
/// Default flush period: sixty seconds.
const fn default_flush_period_secs() -> Duration {
    const SECONDS: u64 = 60;
    Duration::from_secs(SECONDS)
}
180
/// Default for `suppress_timestamp`: timestamps are kept.
const fn default_suppress_timestamp() -> bool {
    const SUPPRESS: bool = false;
    SUPPRESS
}
184
185impl GenerateConfig for PrometheusExporterConfig {
186 fn generate_config() -> toml::Value {
187 toml::Value::try_from(Self::default()).unwrap()
188 }
189}
190
191#[async_trait::async_trait]
192#[typetag::serde(name = "prometheus_exporter")]
193impl SinkConfig for PrometheusExporterConfig {
194 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
195 if self.flush_period_secs.as_secs() < MIN_FLUSH_PERIOD_SECS {
196 return Err(Box::new(BuildError::FlushPeriodTooShort {
197 min: MIN_FLUSH_PERIOD_SECS,
198 }));
199 }
200
201 validate_quantiles(&self.quantiles)?;
202
203 let sink = PrometheusExporter::new(self.clone());
204 let healthcheck = future::ok(()).boxed();
205
206 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
207 }
208
209 fn input(&self) -> Input {
210 Input::metric()
211 }
212
213 fn resources(&self) -> Vec<Resource> {
214 vec![Resource::tcp(self.address)]
215 }
216
217 fn acknowledgements(&self) -> &AcknowledgementsConfig {
218 &self.acknowledgements
219 }
220}
221
/// Sink state: the configuration, the metrics currently exposed, and the HTTP server handle.
struct PrometheusExporter {
    /// Set once the HTTP server has been started; `None` until then.
    server_shutdown_trigger: Option<Trigger>,
    /// The configuration this exporter was built from.
    config: PrometheusExporterConfig,
    /// Latest value and expiry metadata per metric identity, shared with the HTTP handler.
    metrics: Arc<RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>>,
}
227
/// Expiry bookkeeping for a stored metric.
#[derive(Clone, Copy, Debug)]
struct MetricMetadata {
    /// How long the metric stays alive after each refresh.
    expiration_window: Duration,
    /// The instant from which the metric counts as expired.
    expires_at: Instant,
}

impl MetricMetadata {
    /// Creates metadata whose deadline lies `expiration_window` from now.
    pub fn new(expiration_window: Duration) -> Self {
        let expires_at = Instant::now() + expiration_window;

        Self {
            expiration_window,
            expires_at,
        }
    }

    /// Pushes the deadline out to one full expiration window from now.
    pub fn refresh(&mut self) {
        let restarted = Instant::now() + self.expiration_window;
        self.expires_at = restarted;
    }

    /// Returns `true` when `now` is at or past the deadline.
    pub fn has_expired(&self, now: Instant) -> bool {
        self.expires_at <= now
    }
}
253
254#[derive(Clone, Debug)]
263struct MetricRef {
264 series: MetricSeries,
265 value: Discriminant<MetricValue>,
266 bounds: Option<Vec<f64>>,
267}
268
269impl MetricRef {
270 pub fn from_metric(metric: &Metric) -> Self {
272 let bounds = match metric.value() {
274 MetricValue::AggregatedHistogram { buckets, .. } => {
275 Some(buckets.iter().map(|b| b.upper_limit).collect())
276 }
277 MetricValue::AggregatedSummary { quantiles, .. } => {
278 Some(quantiles.iter().map(|q| q.quantile).collect())
279 }
280 _ => None,
281 };
282
283 Self {
284 series: metric.series().clone(),
285 value: discriminant(metric.value()),
286 bounds,
287 }
288 }
289}
290
291impl PartialEq for MetricRef {
292 fn eq(&self, other: &Self) -> bool {
293 self.series == other.series && self.value == other.value && self.bounds == other.bounds
294 }
295}
296
297impl Eq for MetricRef {}
298
299impl Hash for MetricRef {
300 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
301 self.series.hash(state);
302 self.value.hash(state);
303 if let Some(bounds) = &self.bounds {
304 for bound in bounds {
305 bound.to_bits().hash(state);
306 }
307 }
308 }
309}
310
311fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
312 if let Some(auth) = auth {
313 let headers = req.headers();
314 if let Some(auth_header) = headers.get(hyper::header::AUTHORIZATION) {
315 let encoded_credentials = match auth {
316 Auth::Basic { user, password } => Some(HeaderValue::from_str(
317 format!(
318 "Basic {}",
319 BASE64_STANDARD.encode(format!("{}:{}", user, password.inner()))
320 )
321 .as_str(),
322 )),
323 Auth::Bearer { token } => Some(HeaderValue::from_str(
324 format!("Bearer {}", token.inner()).as_str(),
325 )),
326 #[cfg(feature = "aws-core")]
327 _ => None,
328 };
329
330 if let Some(Ok(encoded_credentials)) = encoded_credentials
331 && auth_header == encoded_credentials
332 {
333 return true;
334 }
335 }
336 } else {
337 return true;
338 }
339
340 false
341}
342
343#[derive(Clone)]
344struct Handler {
345 auth: Option<Auth>,
346 default_namespace: Option<String>,
347 buckets: Box<[f64]>,
348 quantiles: Box<[f64]>,
349 bytes_sent: Registered<BytesSent>,
350 events_sent: Registered<EventsSent>,
351}
352
353impl Handler {
354 fn handle<T: HttpBody>(
355 &self,
356 req: Request<T>,
357 metrics: &RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>,
358 ) -> Response<Body> {
359 let mut response = Response::new(Body::empty());
360
361 match (authorized(&req, &self.auth), req.method(), req.uri().path()) {
362 (false, _, _) => {
363 *response.status_mut() = StatusCode::UNAUTHORIZED;
364 response.headers_mut().insert(
365 http::header::WWW_AUTHENTICATE,
366 HeaderValue::from_static("Basic, Bearer"),
367 );
368 }
369
370 (true, &Method::GET, "/metrics") => {
371 let metrics = metrics.read().expect(LOCK_FAILED);
372
373 let count = metrics.len();
374 let byte_size = metrics
375 .iter()
376 .map(|(_, (metric, _))| metric.estimated_json_encoded_size_of())
377 .sum();
378
379 let mut collector = StringCollector::new();
380
381 for (_, (metric, _)) in metrics.iter() {
382 collector.encode_metric(
383 self.default_namespace.as_deref(),
384 &self.buckets,
385 &self.quantiles,
386 metric,
387 );
388 }
389
390 drop(metrics);
391
392 let body = collector.finish();
393 let body_size = body.size_of();
394
395 *response.body_mut() = body.into();
396
397 response.headers_mut().insert(
398 "Content-Type",
399 HeaderValue::from_static("text/plain; version=0.0.4"),
400 );
401
402 self.events_sent.emit(CountByteSize(count, byte_size));
403 self.bytes_sent.emit(ByteSize(body_size));
404 }
405
406 (true, _, _) => {
407 *response.status_mut() = StatusCode::NOT_FOUND;
408 }
409 }
410
411 response
412 }
413}
414
415impl PrometheusExporter {
416 fn new(config: PrometheusExporterConfig) -> Self {
417 Self {
418 server_shutdown_trigger: None,
419 config,
420 metrics: Arc::new(RwLock::new(IndexMap::new())),
421 }
422 }
423
424 async fn start_server_if_needed(&mut self) -> crate::Result<()> {
425 if self.server_shutdown_trigger.is_some() {
426 return Ok(());
427 }
428
429 let handler = Handler {
430 bytes_sent: register!(BytesSent::from(Protocol::HTTP)),
431 events_sent: register!(EventsSent::from(Output(None))),
432 default_namespace: self.config.default_namespace.clone(),
433 buckets: self.config.buckets.clone().into(),
434 quantiles: self.config.quantiles.clone().into(),
435 auth: self.config.auth.clone(),
436 };
437
438 let span = Span::current();
439 let metrics = Arc::clone(&self.metrics);
440
441 let new_service = make_service_fn(move |_| {
442 let span = Span::current();
443 let metrics = Arc::clone(&metrics);
444 let handler = handler.clone();
445
446 let inner = service_fn(move |req| {
447 let response = handler.handle(req, &metrics);
448
449 future::ok::<_, Infallible>(response)
450 });
451
452 let service = ServiceBuilder::new()
453 .layer(build_http_trace_layer(span.clone()))
454 .layer(CompressionLayer::new())
455 .service(inner);
456
457 async move { Ok::<_, Infallible>(service) }
458 });
459
460 let (trigger, tripwire) = Tripwire::new();
461
462 let tls = self.config.tls.clone();
463 let address = self.config.address;
464
465 let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?;
466 let listener = tls.bind(&address).await?;
467
468 tokio::spawn(async move {
469 info!(message = "Building HTTP server.", address = %address);
470
471 Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
472 .serve(new_service)
473 .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler))
474 .instrument(span)
475 .await
476 .map_err(|error| error!("Server error: {}.", error))?;
477
478 Ok::<(), ()>(())
479 });
480
481 self.server_shutdown_trigger = Some(trigger);
482 Ok(())
483 }
484
485 fn normalize(&mut self, metric: Metric) -> Option<Metric> {
486 let new_metric = match metric.value() {
487 MetricValue::Distribution { .. } => {
488 let (series, data, metadata) = metric.into_parts();
490 let (time, kind, value) = data.into_parts();
491
492 let new_value = if self.config.distributions_as_summaries {
493 value
498 .distribution_to_sketch()
499 .expect("value should be distribution already")
500 } else {
501 value
502 .distribution_to_agg_histogram(&self.config.buckets)
503 .expect("value should be distribution already")
504 };
505
506 let data = MetricData::from_parts(time, kind, new_value);
507 Metric::from_parts(series, data, metadata)
508 }
509 _ => metric,
510 };
511
512 match new_metric.kind() {
513 MetricKind::Absolute => Some(new_metric),
514 MetricKind::Incremental => {
515 let metrics = self.metrics.read().expect(LOCK_FAILED);
516 let metric_ref = MetricRef::from_metric(&new_metric);
517
518 if let Some(existing) = metrics.get(&metric_ref) {
519 let mut current = existing.0.value().clone();
520 if current.add(new_metric.value()) {
521 return Some(new_metric.with_value(current).into_absolute());
524 }
525 }
526
527 Some(new_metric.into_absolute())
530 }
531 }
532 }
533}
534
535#[async_trait]
536impl StreamSink<Event> for PrometheusExporter {
537 async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
538 self.start_server_if_needed()
539 .await
540 .map_err(|error| error!("Failed to start Prometheus exporter: {}.", error))?;
541
542 let mut last_flush = Instant::now();
543 let flush_period = self.config.flush_period_secs;
544
545 while let Some(event) = input.next().await {
546 if last_flush.elapsed() > self.config.flush_period_secs {
555 last_flush = Instant::now();
556
557 let mut metrics = self.metrics.write().expect(LOCK_FAILED);
558
559 metrics.retain(|_metric_ref, (_, metadata)| !metadata.has_expired(last_flush));
560 }
561
562 let mut metric = event.into_metric();
564 let finalizers = metric.take_finalizers();
565
566 match self.normalize(metric) {
567 Some(normalized) => {
568 let normalized = if self.config.suppress_timestamp {
569 normalized.with_timestamp(None)
570 } else {
571 normalized
572 };
573
574 let mut metrics = self.metrics.write().expect(LOCK_FAILED);
577
578 match metrics.entry(MetricRef::from_metric(&normalized)) {
579 Entry::Occupied(mut entry) => {
580 let (data, metadata) = entry.get_mut();
581 *data = normalized;
582 metadata.refresh();
583 }
584 Entry::Vacant(entry) => {
585 entry.insert((normalized, MetricMetadata::new(flush_period)));
586 }
587 }
588 finalizers.update_status(EventStatus::Delivered);
589 }
590 _ => {
591 emit!(PrometheusNormalizationError {});
592 finalizers.update_status(EventStatus::Errored);
593 }
594 }
595 }
596
597 Ok(())
598 }
599}
600
601#[cfg(test)]
602mod tests {
603 use std::io::Read;
604
605 use chrono::{Duration, Utc};
606 use flate2::read::GzDecoder;
607 use futures::stream;
608 use indoc::indoc;
609 use similar_asserts::assert_eq;
610 use tokio::{sync::oneshot::error::TryRecvError, time};
611 use vector_lib::{
612 event::{MetricTags, StatisticKind},
613 finalization::{BatchNotifier, BatchStatus},
614 metric_tags, samples,
615 sensitive_string::SensitiveString,
616 };
617
618 use super::*;
619 use crate::{
620 config::ProxyConfig,
621 event::metric::{Metric, MetricValue},
622 http::HttpClient,
623 sinks::prometheus::{distribution_to_agg_histogram, distribution_to_ddsketch},
624 test_util::{
625 components::{SINK_TAGS, run_and_assert_sink_compliance},
626 next_addr, random_string, trace_init,
627 },
628 tls::MaybeTlsSettings,
629 };
630
631 #[test]
632 fn generate_config() {
633 crate::test_util::test_generate_config::<PrometheusExporterConfig>();
634 }
635
636 #[tokio::test]
637 async fn prometheus_notls() {
638 export_and_fetch_simple(None).await;
639 }
640
641 #[tokio::test]
642 async fn prometheus_tls() {
643 let mut tls_config = TlsEnableableConfig::test_config();
644 tls_config.options.verify_hostname = Some(false);
645 export_and_fetch_simple(Some(tls_config)).await;
646 }
647
648 #[tokio::test]
649 async fn prometheus_noauth() {
650 let (name1, event1) = create_metric_gauge(None, 123.4);
651 let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
652 let events = vec![event1, event2];
653
654 let response_result = export_and_fetch_with_auth(None, None, events, false).await;
655
656 assert!(response_result.is_ok());
657
658 let body = response_result.expect("Cannot extract body from the response");
659
660 assert!(body.contains(&format!(
661 indoc! {r#"
662 # HELP {name} {name}
663 # TYPE {name} gauge
664 {name}{{some_tag="some_value"}} 123.4
665 "#},
666 name = name1
667 )));
668 assert!(body.contains(&format!(
669 indoc! {r#"
670 # HELP {name} {name}
671 # TYPE {name} gauge
672 {name}{{some_tag="some_value"}} 3
673 "#},
674 name = name2
675 )));
676 }
677
678 #[tokio::test]
679 async fn prometheus_successful_basic_auth() {
680 let (name1, event1) = create_metric_gauge(None, 123.4);
681 let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
682 let events = vec![event1, event2];
683
684 let auth_config = Auth::Basic {
685 user: "user".to_string(),
686 password: SensitiveString::from("password".to_string()),
687 };
688
689 let response_result =
690 export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
691 .await;
692
693 assert!(response_result.is_ok());
694
695 let body = response_result.expect("Cannot extract body from the response");
696
697 assert!(body.contains(&format!(
698 indoc! {r#"
699 # HELP {name} {name}
700 # TYPE {name} gauge
701 {name}{{some_tag="some_value"}} 123.4
702 "#},
703 name = name1
704 )));
705 assert!(body.contains(&format!(
706 indoc! {r#"
707 # HELP {name} {name}
708 # TYPE {name} gauge
709 {name}{{some_tag="some_value"}} 3
710 "#},
711 name = name2
712 )));
713 }
714
715 #[tokio::test]
716 async fn prometheus_successful_token_auth() {
717 let (name1, event1) = create_metric_gauge(None, 123.4);
718 let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
719 let events = vec![event1, event2];
720
721 let auth_config = Auth::Bearer {
722 token: SensitiveString::from("token".to_string()),
723 };
724
725 let response_result =
726 export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
727 .await;
728
729 assert!(response_result.is_ok());
730
731 let body = response_result.expect("Cannot extract body from the response");
732
733 assert!(body.contains(&format!(
734 indoc! {r#"
735 # HELP {name} {name}
736 # TYPE {name} gauge
737 {name}{{some_tag="some_value"}} 123.4
738 "#},
739 name = name1
740 )));
741 assert!(body.contains(&format!(
742 indoc! {r#"
743 # HELP {name} {name}
744 # TYPE {name} gauge
745 {name}{{some_tag="some_value"}} 3
746 "#},
747 name = name2
748 )));
749 }
750
751 #[tokio::test]
752 async fn prometheus_missing_auth() {
753 let (_, event1) = create_metric_gauge(None, 123.4);
754 let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
755 let events = vec![event1, event2];
756
757 let server_auth_config = Auth::Bearer {
758 token: SensitiveString::from("token".to_string()),
759 };
760
761 let response_result =
762 export_and_fetch_with_auth(Some(server_auth_config), None, events, false).await;
763
764 assert!(response_result.is_err());
765 assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
766 }
767
768 #[tokio::test]
769 async fn prometheus_wrong_auth() {
770 let (_, event1) = create_metric_gauge(None, 123.4);
771 let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
772 let events = vec![event1, event2];
773
774 let server_auth_config = Auth::Bearer {
775 token: SensitiveString::from("token".to_string()),
776 };
777
778 let client_auth_config = Auth::Basic {
779 user: "user".to_string(),
780 password: SensitiveString::from("password".to_string()),
781 };
782
783 let response_result = export_and_fetch_with_auth(
784 Some(server_auth_config),
785 Some(client_auth_config),
786 events,
787 false,
788 )
789 .await;
790
791 assert!(response_result.is_err());
792 assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
793 }
794
795 #[tokio::test]
796 async fn encoding_gzip() {
797 let (name1, event1) = create_metric_gauge(None, 123.4);
798 let events = vec![event1];
799
800 let body_raw = export_and_fetch_raw(None, events, false, Some(String::from("gzip"))).await;
801 let expected = format!(
802 indoc! {r#"
803 # HELP {name} {name}
804 # TYPE {name} gauge
805 {name}{{some_tag="some_value"}} 123.4
806 "#},
807 name = name1,
808 );
809
810 let mut gz = GzDecoder::new(&body_raw[..]);
811 let mut body_decoded = String::new();
812 let _ = gz.read_to_string(&mut body_decoded);
813
814 assert!(body_raw.len() < expected.len());
815 assert_eq!(body_decoded, expected);
816 }
817
818 #[tokio::test]
819 async fn updates_timestamps() {
820 let timestamp1 = Utc::now();
821 let (name, event1) = create_metric_gauge(None, 123.4);
822 let event1 = Event::from(event1.into_metric().with_timestamp(Some(timestamp1)));
823 let (_, event2) = create_metric_gauge(Some(name.clone()), 12.0);
824 let timestamp2 = timestamp1 + Duration::seconds(1);
825 let event2 = Event::from(event2.into_metric().with_timestamp(Some(timestamp2)));
826 let events = vec![event1, event2];
827
828 let body = export_and_fetch(None, events, false).await;
829 let timestamp = timestamp2.timestamp_millis();
830 assert_eq!(
831 body,
832 format!(
833 indoc! {r#"
834 # HELP {name} {name}
835 # TYPE {name} gauge
836 {name}{{some_tag="some_value"}} 135.4 {timestamp}
837 "#},
838 name = name,
839 timestamp = timestamp
840 )
841 );
842 }
843
844 #[tokio::test]
845 async fn suppress_timestamp() {
846 let timestamp = Utc::now();
847 let (name, event) = create_metric_gauge(None, 123.4);
848 let event = Event::from(event.into_metric().with_timestamp(Some(timestamp)));
849 let events = vec![event];
850
851 let body = export_and_fetch(None, events, true).await;
852 assert_eq!(
853 body,
854 format!(
855 indoc! {r#"
856 # HELP {name} {name}
857 # TYPE {name} gauge
858 {name}{{some_tag="some_value"}} 123.4
859 "#},
860 name = name,
861 )
862 );
863 }
864
865 #[tokio::test]
870 async fn prometheus_duplicate_labels() {
871 let (name, event) = create_metric_with_tags(
872 None,
873 MetricValue::Gauge { value: 123.4 },
874 Some(metric_tags!("code" => "200", "code" => "success")),
875 );
876 let events = vec![event];
877
878 let response_result = export_and_fetch_with_auth(None, None, events, false).await;
879
880 assert!(response_result.is_ok());
881
882 let body = response_result.expect("Cannot extract body from the response");
883
884 assert!(body.contains(&format!(
885 indoc! {r#"
886 # HELP {name} {name}
887 # TYPE {name} gauge
888 {name}{{code="success"}} 123.4
889 "# },
890 name = name
891 )));
892 }
893
894 async fn export_and_fetch_raw(
895 tls_config: Option<TlsEnableableConfig>,
896 mut events: Vec<Event>,
897 suppress_timestamp: bool,
898 encoding: Option<String>,
899 ) -> hyper::body::Bytes {
900 trace_init();
901
902 let client_settings = MaybeTlsSettings::from_config(tls_config.as_ref(), false).unwrap();
903 let proto = client_settings.http_protocol_name();
904
905 let address = next_addr();
906 let config = PrometheusExporterConfig {
907 address,
908 tls: tls_config,
909 suppress_timestamp,
910 ..Default::default()
911 };
912
913 let mut receiver = BatchNotifier::apply_to(&mut events[..]);
915 assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
916
917 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
918 let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
919 let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
920 sink,
921 stream::iter(events).chain(stream::once(async move {
922 time::sleep(time::Duration::from_millis(500)).await;
924 delayed_event
925 })),
926 &SINK_TAGS,
927 ));
928
929 time::sleep(time::Duration::from_millis(100)).await;
930
931 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
933
934 let mut request = Request::get(format!("{proto}://{address}/metrics"))
935 .body(Body::empty())
936 .expect("Error creating request.");
937
938 if let Some(ref encoding) = encoding {
939 request.headers_mut().insert(
940 http::header::ACCEPT_ENCODING,
941 HeaderValue::from_str(encoding.as_str()).unwrap(),
942 );
943 }
944
945 let proxy = ProxyConfig::default();
946 let result = HttpClient::new(client_settings, &proxy)
947 .unwrap()
948 .send(request)
949 .await
950 .expect("Could not fetch query");
951
952 assert!(result.status().is_success());
953
954 if encoding.is_some() {
955 assert!(
956 result
957 .headers()
958 .contains_key(http::header::CONTENT_ENCODING)
959 );
960 }
961
962 let body = result.into_body();
963 let bytes = hyper::body::to_bytes(body)
964 .await
965 .expect("Reading body failed");
966
967 sink_handle.await.unwrap();
968
969 bytes
970 }
971
972 async fn export_and_fetch(
973 tls_config: Option<TlsEnableableConfig>,
974 events: Vec<Event>,
975 suppress_timestamp: bool,
976 ) -> String {
977 let bytes = export_and_fetch_raw(tls_config, events, suppress_timestamp, None);
978 String::from_utf8(bytes.await.to_vec()).unwrap()
979 }
980
981 async fn export_and_fetch_with_auth(
982 server_auth_config: Option<Auth>,
983 client_auth_config: Option<Auth>,
984 mut events: Vec<Event>,
985 suppress_timestamp: bool,
986 ) -> Result<String, http::status::StatusCode> {
987 trace_init();
988
989 let client_settings = MaybeTlsSettings::from_config(None, false).unwrap();
990 let proto = client_settings.http_protocol_name();
991
992 let address = next_addr();
993 let config = PrometheusExporterConfig {
994 address,
995 auth: server_auth_config,
996 tls: None,
997 suppress_timestamp,
998 ..Default::default()
999 };
1000
1001 let mut receiver = BatchNotifier::apply_to(&mut events[..]);
1003 assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
1004
1005 let (sink, _) = config.build(SinkContext::default()).await.unwrap();
1006 let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
1007 let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
1008 sink,
1009 stream::iter(events).chain(stream::once(async move {
1010 time::sleep(time::Duration::from_millis(500)).await;
1012 delayed_event
1013 })),
1014 &SINK_TAGS,
1015 ));
1016
1017 time::sleep(time::Duration::from_millis(100)).await;
1018
1019 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
1021
1022 let mut request = Request::get(format!("{proto}://{address}/metrics"))
1023 .body(Body::empty())
1024 .expect("Error creating request.");
1025
1026 if let Some(client_auth_config) = client_auth_config {
1027 client_auth_config.apply(&mut request);
1028 }
1029
1030 let proxy = ProxyConfig::default();
1031 let result = HttpClient::new(client_settings, &proxy)
1032 .unwrap()
1033 .send(request)
1034 .await
1035 .expect("Could not fetch query");
1036
1037 if !result.status().is_success() {
1038 return Err(result.status());
1039 }
1040
1041 let body = result.into_body();
1042 let bytes = hyper::body::to_bytes(body)
1043 .await
1044 .expect("Reading body failed");
1045 let result = String::from_utf8(bytes.to_vec()).unwrap();
1046
1047 sink_handle.await.unwrap();
1048
1049 Ok(result)
1050 }
1051
1052 async fn export_and_fetch_simple(tls_config: Option<TlsEnableableConfig>) {
1053 let (name1, event1) = create_metric_gauge(None, 123.4);
1054 let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
1055 let events = vec![event1, event2];
1056
1057 let body = export_and_fetch(tls_config, events, false).await;
1058
1059 assert!(body.contains(&format!(
1060 indoc! {r#"
1061 # HELP {name} {name}
1062 # TYPE {name} gauge
1063 {name}{{some_tag="some_value"}} 123.4
1064 "#},
1065 name = name1
1066 )));
1067 assert!(body.contains(&format!(
1068 indoc! {r#"
1069 # HELP {name} {name}
1070 # TYPE {name} gauge
1071 {name}{{some_tag="some_value"}} 3
1072 "#},
1073 name = name2
1074 )));
1075 }
1076
1077 pub fn create_metric_gauge(name: Option<String>, value: f64) -> (String, Event) {
1078 create_metric(name, MetricValue::Gauge { value })
1079 }
1080
1081 pub fn create_metric_set(name: Option<String>, values: Vec<&'static str>) -> (String, Event) {
1082 create_metric(
1083 name,
1084 MetricValue::Set {
1085 values: values.into_iter().map(Into::into).collect(),
1086 },
1087 )
1088 }
1089
1090 fn create_metric(name: Option<String>, value: MetricValue) -> (String, Event) {
1091 create_metric_with_tags(name, value, Some(metric_tags!("some_tag" => "some_value")))
1092 }
1093
1094 fn create_metric_with_tags(
1095 name: Option<String>,
1096 value: MetricValue,
1097 tags: Option<MetricTags>,
1098 ) -> (String, Event) {
1099 let name = name.unwrap_or_else(|| format!("vector_set_{}", random_string(16)));
1100 let event = Metric::new(name.clone(), MetricKind::Incremental, value)
1101 .with_tags(tags)
1102 .into();
1103 (name, event)
1104 }
1105
1106 #[tokio::test]
1107 async fn sink_absolute() {
1108 let config = PrometheusExporterConfig {
1109 address: next_addr(), tls: None,
1111 ..Default::default()
1112 };
1113
1114 let sink = PrometheusExporter::new(config);
1115
1116 let m1 = Metric::new(
1117 "absolute",
1118 MetricKind::Absolute,
1119 MetricValue::Counter { value: 32. },
1120 )
1121 .with_tags(Some(metric_tags!("tag1" => "value1")));
1122
1123 let m2 = m1.clone().with_tags(Some(metric_tags!("tag1" => "value2")));
1124
1125 let events = vec![
1126 Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 32. })),
1127 Event::Metric(m2.clone().with_value(MetricValue::Counter { value: 33. })),
1128 Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 40. })),
1129 ];
1130
1131 let metrics_handle = Arc::clone(&sink.metrics);
1132
1133 let sink = VectorSink::from_event_streamsink(sink);
1134 let input_events = stream::iter(events).map(Into::into);
1135 sink.run(input_events).await.unwrap();
1136
1137 let metrics_after = metrics_handle.read().unwrap();
1138
1139 let expected_m1 = metrics_after
1140 .get(&MetricRef::from_metric(&m1))
1141 .expect("m1 should exist");
1142 let expected_m1_value = MetricValue::Counter { value: 40. };
1143 assert_eq!(expected_m1.0.value(), &expected_m1_value);
1144
1145 let expected_m2 = metrics_after
1146 .get(&MetricRef::from_metric(&m2))
1147 .expect("m2 should exist");
1148 let expected_m2_value = MetricValue::Counter { value: 33. };
1149 assert_eq!(expected_m2.0.value(), &expected_m2_value);
1150 }
1151
1152 #[tokio::test]
1153 async fn sink_distributions_as_histograms() {
1154 let config = PrometheusExporterConfig {
1162 address: next_addr(), tls: None,
1164 ..Default::default()
1165 };
1166 let buckets = config.buckets.clone();
1167
1168 let sink = PrometheusExporter::new(config);
1169
1170 let base_summary_metric = Metric::new(
1172 "distrib_summary",
1173 MetricKind::Incremental,
1174 MetricValue::Distribution {
1175 statistic: StatisticKind::Summary,
1176 samples: samples!(1.0 => 1, 3.0 => 2),
1177 },
1178 );
1179
1180 let base_histogram_metric = Metric::new(
1181 "distrib_histo",
1182 MetricKind::Incremental,
1183 MetricValue::Distribution {
1184 statistic: StatisticKind::Histogram,
1185 samples: samples!(7.0 => 1, 9.0 => 2),
1186 },
1187 );
1188
1189 let metrics = vec![
1190 base_summary_metric.clone(),
1191 base_summary_metric
1192 .clone()
1193 .with_value(MetricValue::Distribution {
1194 statistic: StatisticKind::Summary,
1195 samples: samples!(1.0 => 2, 2.9 => 1),
1196 }),
1197 base_summary_metric
1198 .clone()
1199 .with_value(MetricValue::Distribution {
1200 statistic: StatisticKind::Summary,
1201 samples: samples!(1.0 => 4, 3.2 => 1),
1202 }),
1203 base_histogram_metric.clone(),
1204 base_histogram_metric
1205 .clone()
1206 .with_value(MetricValue::Distribution {
1207 statistic: StatisticKind::Histogram,
1208 samples: samples!(7.0 => 2, 9.9 => 1),
1209 }),
1210 base_histogram_metric
1211 .clone()
1212 .with_value(MetricValue::Distribution {
1213 statistic: StatisticKind::Histogram,
1214 samples: samples!(7.0 => 4, 10.2 => 1),
1215 }),
1216 ];
1217
1218 let mut merged_summary = base_summary_metric.clone();
1220 assert!(merged_summary.update(&metrics[1]));
1221 assert!(merged_summary.update(&metrics[2]));
1222 let expected_summary = distribution_to_agg_histogram(merged_summary, &buckets)
1223 .expect("input summary metric should have been distribution")
1224 .into_absolute();
1225
1226 let mut merged_histogram = base_histogram_metric.clone();
1227 assert!(merged_histogram.update(&metrics[4]));
1228 assert!(merged_histogram.update(&metrics[5]));
1229 let expected_histogram = distribution_to_agg_histogram(merged_histogram, &buckets)
1230 .expect("input histogram metric should have been distribution")
1231 .into_absolute();
1232
1233 let metrics_handle = Arc::clone(&sink.metrics);
1238
1239 let events = metrics
1240 .iter()
1241 .cloned()
1242 .map(Event::Metric)
1243 .collect::<Vec<_>>();
1244
1245 let sink = VectorSink::from_event_streamsink(sink);
1246 let input_events = stream::iter(events).map(Into::into);
1247 sink.run(input_events).await.unwrap();
1248
1249 let metrics_after = metrics_handle.read().unwrap();
1250
1251 assert_eq!(metrics_after.len(), 2);
1253
1254 let actual_summary = metrics_after
1255 .get(&MetricRef::from_metric(&expected_summary))
1256 .expect("summary metric should exist");
1257 assert_eq!(actual_summary.0.value(), expected_summary.value());
1258
1259 let actual_histogram = metrics_after
1260 .get(&MetricRef::from_metric(&expected_histogram))
1261 .expect("histogram metric should exist");
1262 assert_eq!(actual_histogram.0.value(), expected_histogram.value());
1263 }
1264
1265 #[tokio::test]
1266 async fn sink_distributions_as_summaries() {
1267 let config = PrometheusExporterConfig {
1281 address: next_addr(), tls: None,
1283 distributions_as_summaries: true,
1284 ..Default::default()
1285 };
1286
1287 let sink = PrometheusExporter::new(config);
1288
1289 let base_summary_metric = Metric::new(
1291 "distrib_summary",
1292 MetricKind::Incremental,
1293 MetricValue::Distribution {
1294 statistic: StatisticKind::Summary,
1295 samples: samples!(1.0 => 1, 3.0 => 2),
1296 },
1297 );
1298
1299 let base_histogram_metric = Metric::new(
1300 "distrib_histo",
1301 MetricKind::Incremental,
1302 MetricValue::Distribution {
1303 statistic: StatisticKind::Histogram,
1304 samples: samples!(7.0 => 1, 9.0 => 2),
1305 },
1306 );
1307
1308 let metrics = vec![
1309 base_summary_metric.clone(),
1310 base_summary_metric
1311 .clone()
1312 .with_value(MetricValue::Distribution {
1313 statistic: StatisticKind::Summary,
1314 samples: samples!(1.0 => 2, 2.9 => 1),
1315 }),
1316 base_summary_metric
1317 .clone()
1318 .with_value(MetricValue::Distribution {
1319 statistic: StatisticKind::Summary,
1320 samples: samples!(1.0 => 4, 3.2 => 1),
1321 }),
1322 base_histogram_metric.clone(),
1323 base_histogram_metric
1324 .clone()
1325 .with_value(MetricValue::Distribution {
1326 statistic: StatisticKind::Histogram,
1327 samples: samples!(7.0 => 2, 9.9 => 1),
1328 }),
1329 base_histogram_metric
1330 .clone()
1331 .with_value(MetricValue::Distribution {
1332 statistic: StatisticKind::Histogram,
1333 samples: samples!(7.0 => 4, 10.2 => 1),
1334 }),
1335 ];
1336
1337 let mut merged_summary = base_summary_metric.clone();
1339 assert!(merged_summary.update(&metrics[1]));
1340 assert!(merged_summary.update(&metrics[2]));
1341 let expected_summary = distribution_to_ddsketch(merged_summary)
1342 .expect("input summary metric should have been distribution")
1343 .into_absolute();
1344
1345 let mut merged_histogram = base_histogram_metric.clone();
1346 assert!(merged_histogram.update(&metrics[4]));
1347 assert!(merged_histogram.update(&metrics[5]));
1348 let expected_histogram = distribution_to_ddsketch(merged_histogram)
1349 .expect("input histogram metric should have been distribution")
1350 .into_absolute();
1351
1352 let metrics_handle = Arc::clone(&sink.metrics);
1354
1355 let events = metrics
1356 .iter()
1357 .cloned()
1358 .map(Event::Metric)
1359 .collect::<Vec<_>>();
1360
1361 let sink = VectorSink::from_event_streamsink(sink);
1362 let input_events = stream::iter(events).map(Into::into);
1363 sink.run(input_events).await.unwrap();
1364
1365 let metrics_after = metrics_handle.read().unwrap();
1366
1367 assert_eq!(metrics_after.len(), 2);
1369
1370 let actual_summary = metrics_after
1371 .get(&MetricRef::from_metric(&expected_summary))
1372 .expect("summary metric should exist");
1373 assert_eq!(actual_summary.0.value(), expected_summary.value());
1374
1375 let actual_histogram = metrics_after
1376 .get(&MetricRef::from_metric(&expected_histogram))
1377 .expect("histogram metric should exist");
1378 assert_eq!(actual_histogram.0.value(), expected_histogram.value());
1379 }
1380
1381 #[tokio::test]
1382 async fn sink_gauge_incremental_absolute_mix() {
1383 let config = PrometheusExporterConfig {
1390 address: next_addr(), tls: None,
1392 ..Default::default()
1393 };
1394
1395 let sink = PrometheusExporter::new(config);
1396
1397 let base_absolute_gauge_metric = Metric::new(
1398 "gauge",
1399 MetricKind::Absolute,
1400 MetricValue::Gauge { value: 100.0 },
1401 );
1402
1403 let base_incremental_gauge_metric = Metric::new(
1404 "gauge",
1405 MetricKind::Incremental,
1406 MetricValue::Gauge { value: -10.0 },
1407 );
1408
1409 let metrics = vec![
1410 base_absolute_gauge_metric.clone(),
1411 base_absolute_gauge_metric
1412 .clone()
1413 .with_value(MetricValue::Gauge { value: 333.0 }),
1414 base_incremental_gauge_metric.clone(),
1415 base_incremental_gauge_metric
1416 .clone()
1417 .with_value(MetricValue::Gauge { value: 4.0 }),
1418 ];
1419
1420 let metrics_handle = Arc::clone(&sink.metrics);
1422
1423 let events = metrics
1424 .iter()
1425 .cloned()
1426 .map(Event::Metric)
1427 .collect::<Vec<_>>();
1428
1429 let sink = VectorSink::from_event_streamsink(sink);
1430 let input_events = stream::iter(events).map(Into::into);
1431 sink.run(input_events).await.unwrap();
1432
1433 let metrics_after = metrics_handle.read().unwrap();
1434
1435 assert_eq!(metrics_after.len(), 1);
1437
1438 let expected_gauge = Metric::new(
1439 "gauge",
1440 MetricKind::Absolute,
1441 MetricValue::Gauge { value: 327.0 },
1442 );
1443
1444 let actual_gauge = metrics_after
1445 .get(&MetricRef::from_metric(&expected_gauge))
1446 .expect("gauge metric should exist");
1447 assert_eq!(actual_gauge.0.value(), expected_gauge.value());
1448 }
1449}
1450
#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    // NOTE(review): these lint allowances are scoped to this test-only module; the reason they
    // are needed is not visible from here — confirm before removing any of them.
    #![allow(clippy::print_stdout, clippy::print_stderr, clippy::dbg_macro)]

    use chrono::Utc;
    use futures::{future::ready, stream};
    use serde_json::Value;
    use tokio::{sync::mpsc, time};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use super::*;
    use crate::{
        config::ProxyConfig,
        http::HttpClient,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            trace_init,
        },
    };

    /// Reads `key` from the environment, falling back to `default` when the lookup fails.
    fn env_or(key: &str, default: &str) -> String {
        match std::env::var(key) {
            Ok(value) => value,
            Err(_) => default.to_owned(),
        }
    }

    /// `host:port` the exporter under test is configured with and scraped at
    /// (`SINK_EXPORTER_ADDRESS`, else `127.0.0.1:9101`).
    fn sink_exporter_address() -> String {
        env_or("SINK_EXPORTER_ADDRESS", "127.0.0.1:9101")
    }

    /// `host:port` of the Prometheus server that is queried
    /// (`PROMETHEUS_ADDRESS`, else `localhost:9090`).
    fn prometheus_address() -> String {
        env_or("PROMETHEUS_ADDRESS", "localhost:9090")
    }

    /// Builds an exporter sink on `sink_exporter_address()` with the given flush period and
    /// every other setting at its default. The healthcheck returned by `build` is discarded.
    async fn build_exporter_sink(flush_period: Duration) -> VectorSink {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: flush_period,
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        sink
    }

    /// Picks `data.result[0].value[1]` out of a query response. Indexing a `Value` never
    /// panics, so a missing element simply comes back as `Value::Null`.
    fn first_sample_value(response: &Value) -> &Value {
        &response["data"]["result"][0]["value"][1]
    }

    /// GETs `/metrics` from the exporter and returns the body as (lossily decoded) text.
    async fn fetch_exporter_body() -> String {
        let request = Request::get(format!("http://{}/metrics", sink_exporter_address()))
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let client = HttpClient::new(None, &proxy).unwrap();
        let response = client
            .send(request)
            .await
            .expect("Could not send request");
        let bytes = hyper::body::to_bytes(response.into_body())
            .await
            .expect("Error fetching body");
        String::from_utf8_lossy(&bytes).into_owned()
    }

    /// POSTs to `/api/v1/query` on the Prometheus server, with `query` placed verbatim in the
    /// query string, and parses the response body as JSON.
    async fn prometheus_query(query: &str) -> Value {
        let request = Request::post(format!(
            "http://{}/api/v1/query?query={}",
            prometheus_address(),
            query
        ))
        .body(Body::empty())
        .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let client = HttpClient::new(None, &proxy).unwrap();
        let response = client
            .send(request)
            .await
            .expect("Could not fetch query");
        let bytes = hyper::body::to_bytes(response.into_body())
            .await
            .expect("Error fetching body");
        serde_json::from_str(&String::from_utf8_lossy(&bytes))
            .expect("Invalid JSON from prometheus")
    }

    #[tokio::test]
    async fn prometheus_metrics() {
        trace_init();

        // Every scenario builds its exporter on the same address, so they are driven one after
        // another from this single test.
        // NOTE(review): the 500ms pause presumably lets the previous exporter release the
        // address — confirm.
        prometheus_scrapes_metrics().await;
        time::sleep(Duration::from_millis(500)).await;
        reset_on_flush_period().await;
        expire_on_flush_period().await;
    }

    /// Sends one gauge right away and a second one two seconds later, then checks the first
    /// gauge's name, labels, timestamp and value as reported by a Prometheus query.
    async fn prometheus_scrapes_metrics() {
        let started_at = Utc::now().timestamp();

        let sink = build_exporter_sink(Duration::from_secs(2)).await;
        let (name, event) = tests::create_metric_gauge(None, 123.4);
        let (_, delayed_event) = tests::create_metric_gauge(Some("delayed".to_string()), 123.4);

        // The delayed event keeps the input stream, and with it the sink, open for two more
        // seconds.
        // NOTE(review): presumably long enough for Prometheus to scrape at least once — confirm.
        let immediate = stream::once(ready(event));
        let delayed = stream::once(async move {
            time::sleep(Duration::from_secs(2)).await;
            delayed_event
        });
        run_and_assert_sink_compliance(sink, immediate.chain(delayed), &SINK_TAGS).await;

        let response = prometheus_query(&name).await;

        let series = &response["data"]["result"][0];
        let labels = &series["metric"];
        assert_eq!(labels["__name__"], Value::String(name));
        assert_eq!(labels["instance"], Value::String(sink_exporter_address()));
        assert_eq!(labels["some_tag"], Value::String("some_value".into()));

        // `value` is a `[timestamp, "value"]` pair in the response.
        let sampled_at = series["value"][0].as_f64().unwrap();
        assert!(sampled_at >= started_at as f64);
        assert_eq!(series["value"][1], Value::String("123.4".into()));
    }

    /// Checks that set metrics start over once more than a flush period (3s) has elapsed.
    async fn reset_on_flush_period() {
        let sink = build_exporter_sink(Duration::from_secs(3)).await;
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx).map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        // Two sets with three distinct members each.
        let (name1, first_set) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        tx.send(first_set).expect("Failed to send.");
        let (name2, second_set) = tests::create_metric_set(None, vec!["3", "4", "5"]);
        tx.send(second_set).expect("Failed to send.");

        // NOTE(review): presumably gives Prometheus time to scrape — confirm.
        time::sleep(Duration::from_secs(2)).await;

        // Both sets report three members.
        let response = prometheus_query(&name1).await;
        assert_eq!(
            first_sample_value(&response),
            &Value::String("3".into())
        );
        let response = prometheus_query(&name2).await;
        assert_eq!(
            first_sample_value(&response),
            &Value::String("3".into())
        );

        // Together with the pause above, this exceeds the three-second flush period.
        time::sleep(Duration::from_secs(3)).await;

        // Only the second set receives new members.
        let (name2, refreshed_set) = tests::create_metric_set(Some(name2), vec!["8", "9"]);
        tx.send(refreshed_set).expect("Failed to send.");

        time::sleep(Duration::from_secs(2)).await;

        // The first set no longer yields a sample, and the second one counts only the two
        // members sent after the pause.
        let response = prometheus_query(&name1).await;
        assert_eq!(first_sample_value(&response), &Value::Null);
        let response = prometheus_query(&name2).await;
        assert_eq!(
            first_sample_value(&response),
            &Value::String("2".into())
        );

        // Closing the channel ends the input stream so the sink task can finish.
        drop(tx);
        sink_handle.await.unwrap();
    }

    /// Checks that a metric which stops being updated disappears from the exporter's output,
    /// while one that keeps being updated stays.
    async fn expire_on_flush_period() {
        let sink = build_exporter_sink(Duration::from_secs(3)).await;
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx).map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        let (name1, set_event) = tests::create_metric_set(None, vec!["42"]);
        tx.send(set_event).expect("Failed to send.");
        let (name2, gauge_event) = tests::create_metric_gauge(None, 100.0);
        tx.send(gauge_event).expect("Failed to send.");

        time::sleep(Duration::from_secs(1)).await;

        // Both metrics are exposed at first.
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(body.contains(&name2));

        // Keep refreshing only the set, once per second for seven seconds; the gauge receives
        // nothing during that time.
        for _ in 0..7 {
            let (_, refresh) = tests::create_metric_set(Some(name1.clone()), vec!["43"]);
            tx.send(refresh).expect("Failed to send.");

            time::sleep(Duration::from_secs(1)).await;
        }

        // The refreshed set is still exposed; the untouched gauge is gone.
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(!body.contains(&name2));

        // Closing the channel ends the input stream so the sink task can finish.
        drop(tx);
        sink_handle.await.unwrap();
    }
}