vector/sinks/prometheus/exporter.rs

use std::{
    convert::Infallible,
    hash::Hash,
    mem::{Discriminant, discriminant},
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::{Arc, RwLock},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use base64::prelude::{BASE64_STANDARD, Engine as _};
use futures::{FutureExt, StreamExt, future, stream::BoxStream};
use hyper::{
    Body, Method, Request, Response, Server, StatusCode,
    body::HttpBody,
    header::HeaderValue,
    service::{make_service_fn, service_fn},
};
use indexmap::{IndexMap, map::Entry};
use serde_with::serde_as;
use snafu::Snafu;
use stream_cancel::{Trigger, Tripwire};
use tower::ServiceBuilder;
use tower_http::compression::CompressionLayer;
use tracing::{Instrument, Span};
use vector_lib::{
    ByteSizeOf, EstimatedJsonEncodedSizeOf,
    configurable::configurable_component,
    internal_event::{
        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle as _, Output, Protocol,
        Registered,
    },
};

use super::collector::{MetricCollector, StringCollector};
use crate::{
    config::{AcknowledgementsConfig, GenerateConfig, Input, Resource, SinkConfig, SinkContext},
    event::{
        Event, EventStatus, Finalizable,
        metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue},
    },
    http::{Auth, build_http_trace_layer},
    internal_events::PrometheusNormalizationError,
    sinks::{
        Healthcheck, VectorSink,
        util::{StreamSink, statistic::validate_quantiles},
    },
    tls::{MaybeTlsSettings, TlsEnableableConfig},
};

const MIN_FLUSH_PERIOD_SECS: u64 = 1;

const LOCK_FAILED: &str = "Prometheus exporter data lock is poisoned";

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("Flush period for sets must be greater than or equal to {} secs", min))]
    FlushPeriodTooShort { min: u64 },
}

/// Configuration for the `prometheus_exporter` sink.
#[serde_as]
#[configurable_component(sink(
    "prometheus_exporter",
    "Expose metric events on a Prometheus compatible endpoint."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrometheusExporterConfig {
    /// The default namespace for any metrics sent.
    ///
    /// This namespace is only used if a metric has no existing namespace. When a namespace is
    /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`).
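    ///
    /// For example, a metric named `requests_total` with a default namespace of `service`
    /// is exposed as `service_requests_total`.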
    ///
    /// It should follow the Prometheus [naming conventions][prom_naming_docs].
    ///
    /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::advanced))]
    pub default_namespace: Option<String>,

    /// The address to expose for scraping.
    ///
    /// The metrics are exposed at the typical Prometheus exporter path, `/metrics`.
    #[serde(default = "default_address")]
    #[configurable(metadata(docs::examples = "192.168.0.10:9598"))]
    pub address: SocketAddr,

    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    pub tls: Option<TlsEnableableConfig>,

    /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    #[serde(default = "super::default_histogram_buckets")]
    #[configurable(metadata(docs::advanced))]
    pub buckets: Vec<f64>,

    /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    #[serde(default = "super::default_summary_quantiles")]
    #[configurable(metadata(docs::advanced))]
    pub quantiles: Vec<f64>,

    /// Whether or not to render [distributions][dist_metric_docs] as an [aggregated histogram][prom_agg_hist_docs] or [aggregated summary][prom_agg_summ_docs].
    ///
    /// While distributions are supported as a lossless way to represent a set of samples for a
    /// metric, Prometheus clients (the application being scraped, which in this case is this sink)
    /// must aggregate locally into either an aggregated histogram or an aggregated summary.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    /// [prom_agg_hist_docs]: https://prometheus.io/docs/concepts/metric_types/#histogram
    /// [prom_agg_summ_docs]: https://prometheus.io/docs/concepts/metric_types/#summary
    #[serde(default = "default_distributions_as_summaries")]
    #[configurable(metadata(docs::advanced))]
    pub distributions_as_summaries: bool,

    /// The interval, in seconds, on which metrics are flushed.
    ///
    /// On the flush interval, if a metric has not been seen since the last flush interval, it is
    /// considered expired and is removed.
    ///
    /// Be sure to configure this value higher than your client’s scrape interval.
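    ///
    /// For example, with the default of 60 seconds and a typical 15-second scrape interval,
    /// a metric survives several scrapes after its last update before it is dropped.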
    #[serde(default = "default_flush_period_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub flush_period_secs: Duration,

    /// Suppresses timestamps on the Prometheus output.
    ///
    /// This can sometimes be useful when the source of metrics leads to their timestamps being too
    /// far in the past for Prometheus to allow them, such as when aggregating metrics over long
    /// time periods, or when replaying old metrics from a disk buffer.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub suppress_timestamp: bool,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
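
// An illustrative topology snippet for this sink (values are examples only, and
// `internal_metrics` stands in for whatever source actually feeds it):
//
//     [sinks.prometheus_exporter]
//     type = "prometheus_exporter"
//     inputs = ["internal_metrics"]
//     address = "0.0.0.0:9598"
//     flush_period_secs = 60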

impl Default for PrometheusExporterConfig {
    fn default() -> Self {
        Self {
            default_namespace: None,
            address: default_address(),
            auth: None,
            tls: None,
            buckets: super::default_histogram_buckets(),
            quantiles: super::default_summary_quantiles(),
            distributions_as_summaries: default_distributions_as_summaries(),
            flush_period_secs: default_flush_period_secs(),
            suppress_timestamp: default_suppress_timestamp(),
            acknowledgements: Default::default(),
        }
    }
}

const fn default_address() -> SocketAddr {
    SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9598)
}

const fn default_distributions_as_summaries() -> bool {
    false
}

const fn default_flush_period_secs() -> Duration {
    Duration::from_secs(60)
}

const fn default_suppress_timestamp() -> bool {
    false
}

impl GenerateConfig for PrometheusExporterConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self::default()).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_exporter")]
impl SinkConfig for PrometheusExporterConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        if self.flush_period_secs.as_secs() < MIN_FLUSH_PERIOD_SECS {
            return Err(Box::new(BuildError::FlushPeriodTooShort {
                min: MIN_FLUSH_PERIOD_SECS,
            }));
        }

        validate_quantiles(&self.quantiles)?;

        let sink = PrometheusExporter::new(self.clone());
        let healthcheck = future::ok(()).boxed();

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    fn input(&self) -> Input {
        Input::metric()
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

struct PrometheusExporter {
    server_shutdown_trigger: Option<Trigger>,
    config: PrometheusExporterConfig,
    metrics: Arc<RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>>,
}

/// Expiration metadata for a metric.
#[derive(Clone, Copy, Debug)]
struct MetricMetadata {
    expiration_window: Duration,
    expires_at: Instant,
}

impl MetricMetadata {
    pub fn new(expiration_window: Duration) -> Self {
        Self {
            expiration_window,
            expires_at: Instant::now() + expiration_window,
        }
    }

    /// Resets the expiration deadline.
    pub fn refresh(&mut self) {
        self.expires_at = Instant::now() + self.expiration_window;
    }

    /// Whether or not the referenced metric has expired yet.
    pub fn has_expired(&self, now: Instant) -> bool {
        now >= self.expires_at
    }
}
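
// For example, with the default 60-second flush period, a metric refreshed at time T
// expires at T + 60s and is dropped by the first flush pass that runs after that point.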

// Composite identifier that uniquely represents a metric.
//
// Instead of simply working off of the name (series) alone, we include the metric kind as well as
// the type (counter, gauge, etc) and any subtype information like histogram buckets.
//
// Specifically, though, we do _not_ include the actual metric value.  This type is used
// specifically to look up the entry in a map for a metric in the sense of "get the metric whose
// name is X and type is Y and has these tags".
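//
// For example, a counter and a gauge sharing the same series are tracked as separate
// entries, as are two aggregated histograms whose bucket bounds differ.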
#[derive(Clone, Debug)]
struct MetricRef {
    series: MetricSeries,
    value: Discriminant<MetricValue>,
    bounds: Option<Vec<f64>>,
}

impl MetricRef {
    /// Creates a `MetricRef` based on the given `Metric`.
    pub fn from_metric(metric: &Metric) -> Self {
        // Either the buckets for an aggregated histogram, or the quantiles for an aggregated summary.
        let bounds = match metric.value() {
            MetricValue::AggregatedHistogram { buckets, .. } => {
                Some(buckets.iter().map(|b| b.upper_limit).collect())
            }
            MetricValue::AggregatedSummary { quantiles, .. } => {
                Some(quantiles.iter().map(|q| q.quantile).collect())
            }
            _ => None,
        };

        Self {
            series: metric.series().clone(),
            value: discriminant(metric.value()),
            bounds,
        }
    }
}

impl PartialEq for MetricRef {
    fn eq(&self, other: &Self) -> bool {
        self.series == other.series && self.value == other.value && self.bounds == other.bounds
    }
}

impl Eq for MetricRef {}

impl Hash for MetricRef {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.series.hash(state);
        self.value.hash(state);
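        // `f64` has no `Hash` impl, so bucket/quantile bounds are hashed via their raw
        // IEEE 754 bit patterns, which is stable for the well-behaved bound values used here.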
        if let Some(bounds) = &self.bounds {
            for bound in bounds {
                bound.to_bits().hash(state);
            }
        }
    }
}

fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
    if let Some(auth) = auth {
        let headers = req.headers();
        if let Some(auth_header) = headers.get(hyper::header::AUTHORIZATION) {
            let encoded_credentials = match auth {
                Auth::Basic { user, password } => Some(HeaderValue::from_str(
                    format!(
                        "Basic {}",
                        BASE64_STANDARD.encode(format!("{}:{}", user, password.inner()))
                    )
                    .as_str(),
                )),
                Auth::Bearer { token } => Some(HeaderValue::from_str(
                    format!("Bearer {}", token.inner()).as_str(),
                )),
                Auth::Custom { value } => Some(HeaderValue::from_str(value)),
                #[cfg(feature = "aws-core")]
                _ => None,
            };

            if let Some(Ok(encoded_credentials)) = encoded_credentials
                && auth_header == encoded_credentials
            {
                return true;
            }
        }
    } else {
        return true;
    }

    false
}
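
// For example (illustrative values only): with `Auth::Basic { user: "user", password:
// "password" }` configured, a request is authorized only if it carries the exact header
// `Authorization: Basic dXNlcjpwYXNzd29yZA==` (base64 of `user:password`); anything else
// falls through and the handler below responds with 401.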

#[derive(Clone)]
struct Handler {
    auth: Option<Auth>,
    default_namespace: Option<String>,
    buckets: Box<[f64]>,
    quantiles: Box<[f64]>,
    bytes_sent: Registered<BytesSent>,
    events_sent: Registered<EventsSent>,
}

impl Handler {
    fn handle<T: HttpBody>(
        &self,
        req: Request<T>,
        metrics: &RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>,
    ) -> Response<Body> {
        let mut response = Response::new(Body::empty());

        match (authorized(&req, &self.auth), req.method(), req.uri().path()) {
            (false, _, _) => {
                *response.status_mut() = StatusCode::UNAUTHORIZED;
                response.headers_mut().insert(
                    http::header::WWW_AUTHENTICATE,
                    HeaderValue::from_static("Basic, Bearer"),
                );
            }

            (true, &Method::GET, "/metrics") => {
                let metrics = metrics.read().expect(LOCK_FAILED);

                let count = metrics.len();
                let byte_size = metrics
                    .iter()
                    .map(|(_, (metric, _))| metric.estimated_json_encoded_size_of())
                    .sum();

                let mut collector = StringCollector::new();

                for (_, (metric, _)) in metrics.iter() {
                    collector.encode_metric(
                        self.default_namespace.as_deref(),
                        &self.buckets,
                        &self.quantiles,
                        metric,
                    );
                }

                drop(metrics);

                let body = collector.finish();
                let body_size = body.size_of();

                *response.body_mut() = body.into();

                response.headers_mut().insert(
                    "Content-Type",
                    HeaderValue::from_static("text/plain; version=0.0.4"),
                );

                self.events_sent.emit(CountByteSize(count, byte_size));
                self.bytes_sent.emit(ByteSize(body_size));
            }

            (true, _, _) => {
                *response.status_mut() = StatusCode::NOT_FOUND;
            }
        }

        response
    }
}

impl PrometheusExporter {
    fn new(config: PrometheusExporterConfig) -> Self {
        Self {
            server_shutdown_trigger: None,
            config,
            metrics: Arc::new(RwLock::new(IndexMap::new())),
        }
    }

    async fn start_server_if_needed(&mut self) -> crate::Result<()> {
        if self.server_shutdown_trigger.is_some() {
            return Ok(());
        }

        let handler = Handler {
            bytes_sent: register!(BytesSent::from(Protocol::HTTP)),
            events_sent: register!(EventsSent::from(Output(None))),
            default_namespace: self.config.default_namespace.clone(),
            buckets: self.config.buckets.clone().into(),
            quantiles: self.config.quantiles.clone().into(),
            auth: self.config.auth.clone(),
        };

        let span = Span::current();
        let metrics = Arc::clone(&self.metrics);

        let new_service = make_service_fn(move |_| {
            let span = Span::current();
            let metrics = Arc::clone(&metrics);
            let handler = handler.clone();

            let inner = service_fn(move |req| {
                let response = handler.handle(req, &metrics);

                future::ok::<_, Infallible>(response)
            });

            let service = ServiceBuilder::new()
                .layer(build_http_trace_layer(span.clone()))
                .layer(CompressionLayer::new())
                .service(inner);

            async move { Ok::<_, Infallible>(service) }
        });

        let (trigger, tripwire) = Tripwire::new();

        let tls = self.config.tls.clone();
        let address = self.config.address;

        let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?;
        let listener = tls.bind(&address).await?;

        tokio::spawn(async move {
            info!(message = "Building HTTP server.", address = %address);

            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(new_service)
                .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler))
                .instrument(span)
                .await
                .map_err(|error| error!("Server error: {}.", error))?;

            Ok::<(), ()>(())
        });

        self.server_shutdown_trigger = Some(trigger);
        Ok(())
    }

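    /// Normalizes a metric into the absolute form used for exposition.
    ///
    /// For example, if an incremental counter increment of 5.0 arrives and the stored
    /// absolute value for the same series is 10.0, the result is an absolute counter of 15.0.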
    fn normalize(&mut self, metric: Metric) -> Option<Metric> {
        let new_metric = match metric.value() {
            MetricValue::Distribution { .. } => {
                // Convert the distribution as-is, and then absolute-ify it.
                let (series, data, metadata) = metric.into_parts();
                let (time, kind, value) = data.into_parts();

                let new_value = if self.config.distributions_as_summaries {
                    // We use a sketch when in summary mode because they're actually able to be
                    // merged and provide correct output, unlike the aggregated summaries that
                    // we handle from _sources_ like Prometheus.  The collector code itself
                    // will render sketches as aggregated summaries, so we have continuity there.
                    value
                        .distribution_to_sketch()
                        .expect("value should be distribution already")
                } else {
                    value
                        .distribution_to_agg_histogram(&self.config.buckets)
                        .expect("value should be distribution already")
                };

                let data = MetricData::from_parts(time, kind, new_value);
                Metric::from_parts(series, data, metadata)
            }
            _ => metric,
        };

        match new_metric.kind() {
            MetricKind::Absolute => Some(new_metric),
            MetricKind::Incremental => {
                let metrics = self.metrics.read().expect(LOCK_FAILED);
                let metric_ref = MetricRef::from_metric(&new_metric);

                if let Some(existing) = metrics.get(&metric_ref) {
                    let mut current = existing.0.value().clone();
                    if current.add(new_metric.value()) {
                        // If we were able to add to the existing value (i.e. they were compatible),
                        // return the result as an absolute metric.
                        return Some(new_metric.with_value(current).into_absolute());
                    }
                }

                // Otherwise, if we didn't have an existing value or we did and it was not
                // compatible with the new value, simply return the new value as absolute.
                Some(new_metric.into_absolute())
            }
        }
    }
}

#[async_trait]
impl StreamSink<Event> for PrometheusExporter {
    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.start_server_if_needed()
            .await
            .map_err(|error| error!("Failed to start Prometheus exporter: {}.", error))?;

        let mut last_flush = Instant::now();
        let flush_period = self.config.flush_period_secs;

        while let Some(event) = input.next().await {
            // If we've exceeded our flush interval, go through all of the metrics we're
            // currently tracking and remove any that have not been updated within that
            // interval.
            //
            // TODO: Can we be smarter about this? As is, we might wait up to 2x the flush period to
            // remove an expired metric depending on how things line up.  It'd be cool to _check_
            // for expired metrics more often, but we also don't want to check _way_ too often, like
            // every second, since then we're constantly iterating through every metric, etc etc.
            if last_flush.elapsed() > self.config.flush_period_secs {
                last_flush = Instant::now();

                let mut metrics = self.metrics.write().expect(LOCK_FAILED);

                metrics.retain(|_metric_ref, (_, metadata)| !metadata.has_expired(last_flush));
            }

            // Now process the metric we got.
            let mut metric = event.into_metric();
            let finalizers = metric.take_finalizers();

            match self.normalize(metric) {
                Some(normalized) => {
                    let normalized = if self.config.suppress_timestamp {
                        normalized.with_timestamp(None)
                    } else {
                        normalized
                    };

                    // We have a normalized metric, in absolute form.  If we're already aware of this
                    // metric, update its expiration deadline, otherwise, start tracking it.
                    let mut metrics = self.metrics.write().expect(LOCK_FAILED);

                    match metrics.entry(MetricRef::from_metric(&normalized)) {
                        Entry::Occupied(mut entry) => {
                            let (data, metadata) = entry.get_mut();
                            *data = normalized;
                            metadata.refresh();
                        }
                        Entry::Vacant(entry) => {
                            entry.insert((normalized, MetricMetadata::new(flush_period)));
                        }
                    }
                    finalizers.update_status(EventStatus::Delivered);
                }
                _ => {
                    emit!(PrometheusNormalizationError {});
                    finalizers.update_status(EventStatus::Errored);
                }
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::io::Read;

    use chrono::{Duration, Utc};
    use flate2::read::GzDecoder;
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use tokio::{sync::oneshot::error::TryRecvError, time};
    use vector_lib::{
        event::{MetricTags, StatisticKind},
        finalization::{BatchNotifier, BatchStatus},
        metric_tags, samples,
        sensitive_string::SensitiveString,
    };

    use super::*;
    use crate::{
        config::ProxyConfig,
        event::metric::{Metric, MetricValue},
        http::HttpClient,
        sinks::prometheus::{distribution_to_agg_histogram, distribution_to_ddsketch},
        test_util::{
            addr::next_addr,
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            random_string, trace_init,
        },
        tls::MaybeTlsSettings,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusExporterConfig>();
    }
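
    /// A small added sketch (not part of the original suite) showing how the exporter
    /// config deserializes from TOML; the field values are illustrative only.
    #[test]
    fn example_config_from_toml() {
        let config: PrometheusExporterConfig = toml::from_str(indoc! {r#"
            address = "0.0.0.0:9598"
            default_namespace = "service"
            flush_period_secs = 30
        "#})
        .expect("config should deserialize");

        assert_eq!(config.default_namespace.as_deref(), Some("service"));
        assert_eq!(config.flush_period_secs, std::time::Duration::from_secs(30));
    }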

    #[tokio::test]
    async fn prometheus_notls() {
        export_and_fetch_simple(None).await;
    }

    #[tokio::test]
    async fn prometheus_tls() {
        let mut tls_config = TlsEnableableConfig::test_config();
        tls_config.options.verify_hostname = Some(false);
        export_and_fetch_simple(Some(tls_config)).await;
    }

    #[tokio::test]
    async fn prometheus_noauth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let response_result = export_and_fetch_with_auth(None, None, events, false).await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_successful_basic_auth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let auth_config = Auth::Basic {
            user: "user".to_string(),
            password: SensitiveString::from("password".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
                .await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_successful_token_auth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
                .await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_missing_auth() {
        let (_, event1) = create_metric_gauge(None, 123.4);
        let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let server_auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(server_auth_config), None, events, false).await;

        assert!(response_result.is_err());
        assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn prometheus_wrong_auth() {
        let (_, event1) = create_metric_gauge(None, 123.4);
        let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let server_auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let client_auth_config = Auth::Basic {
            user: "user".to_string(),
            password: SensitiveString::from("password".to_string()),
        };

        let response_result = export_and_fetch_with_auth(
            Some(server_auth_config),
            Some(client_auth_config),
            events,
            false,
        )
        .await;

        assert!(response_result.is_err());
        assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn encoding_gzip() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let events = vec![event1];

        let body_raw = export_and_fetch_raw(None, events, false, Some(String::from("gzip"))).await;
        let expected = format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1,
        );

        let mut gz = GzDecoder::new(&body_raw[..]);
        let mut body_decoded = String::new();
        let _ = gz.read_to_string(&mut body_decoded);

        assert!(body_raw.len() < expected.len());
        assert_eq!(body_decoded, expected);
    }

    #[tokio::test]
    async fn updates_timestamps() {
        let timestamp1 = Utc::now();
        let (name, event1) = create_metric_gauge(None, 123.4);
        let event1 = Event::from(event1.into_metric().with_timestamp(Some(timestamp1)));
        let (_, event2) = create_metric_gauge(Some(name.clone()), 12.0);
        let timestamp2 = timestamp1 + Duration::seconds(1);
        let event2 = Event::from(event2.into_metric().with_timestamp(Some(timestamp2)));
        let events = vec![event1, event2];

        let body = export_and_fetch(None, events, false).await;
        let timestamp = timestamp2.timestamp_millis();
        assert_eq!(
            body,
            format!(
                indoc! {r#"
                    # HELP {name} {name}
                    # TYPE {name} gauge
                    {name}{{some_tag="some_value"}} 135.4 {timestamp}
                "#},
                name = name,
                timestamp = timestamp
            )
        );
    }

    #[tokio::test]
    async fn suppress_timestamp() {
        let timestamp = Utc::now();
        let (name, event) = create_metric_gauge(None, 123.4);
        let event = Event::from(event.into_metric().with_timestamp(Some(timestamp)));
        let events = vec![event];

        let body = export_and_fetch(None, events, true).await;
        assert_eq!(
            body,
            format!(
                indoc! {r#"
                    # HELP {name} {name}
                    # TYPE {name} gauge
                    {name}{{some_tag="some_value"}} 123.4
                "#},
                name = name,
            )
        );
    }

    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
    /// > Label names MUST be unique within a LabelSet.
    /// Prometheus itself will reject the metric with an error. Largely to remain backward compatible with older versions of Vector,
    /// we only publish the last tag in the list.
    #[tokio::test]
    async fn prometheus_duplicate_labels() {
        let (name, event) = create_metric_with_tags(
            None,
            MetricValue::Gauge { value: 123.4 },
            Some(metric_tags!("code" => "200", "code" => "success")),
        );
        let events = vec![event];

        let response_result = export_and_fetch_with_auth(None, None, events, false).await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{code="success"}} 123.4
            "# },
            name = name
        )));
    }

    async fn export_and_fetch_raw(
        tls_config: Option<TlsEnableableConfig>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
        encoding: Option<String>,
    ) -> hyper::body::Bytes {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(tls_config.as_ref(), false).unwrap();
        let proto = client_settings.http_protocol_name();

        let (_guard, address) = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: tls_config,
            suppress_timestamp,
            ..Default::default()
        };

        // Set up acknowledgement notification
        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            stream::iter(events).chain(stream::once(async move {
                // Wait a bit to have time to scrape metrics
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        time::sleep(time::Duration::from_millis(100)).await;

        // Events are marked as delivered as soon as they are aggregated.
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(ref encoding) = encoding {
            request.headers_mut().insert(
                http::header::ACCEPT_ENCODING,
                HeaderValue::from_str(encoding.as_str()).unwrap(),
            );
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        assert!(result.status().is_success());

        if encoding.is_some() {
            assert!(
                result
                    .headers()
                    .contains_key(http::header::CONTENT_ENCODING)
            );
        }

        let body = result.into_body();
        let bytes = http_body::Body::collect(body)
            .await
            .expect("Reading body failed")
            .to_bytes();

        sink_handle.await.unwrap();

        bytes
    }

    async fn export_and_fetch(
        tls_config: Option<TlsEnableableConfig>,
        events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> String {
        let bytes = export_and_fetch_raw(tls_config, events, suppress_timestamp, None);
        String::from_utf8(bytes.await.to_vec()).unwrap()
    }

    async fn export_and_fetch_with_auth(
        server_auth_config: Option<Auth>,
        client_auth_config: Option<Auth>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> Result<String, http::status::StatusCode> {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(None, false).unwrap();
        let proto = client_settings.http_protocol_name();

        let (_guard, address) = next_addr();
        let config = PrometheusExporterConfig {
            address,
            auth: server_auth_config,
            tls: None,
            suppress_timestamp,
            ..Default::default()
        };

        // Set up acknowledgement notification
        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            stream::iter(events).chain(stream::once(async move {
                // Wait a bit to have time to scrape metrics
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        time::sleep(time::Duration::from_millis(100)).await;

        // Events are marked as delivered as soon as they are aggregated.
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(client_auth_config) = client_auth_config {
            client_auth_config.apply(&mut request);
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        if !result.status().is_success() {
            return Err(result.status());
        }

        let body = result.into_body();
        let bytes = http_body::Body::collect(body)
            .await
            .expect("Reading body failed")
            .to_bytes();
        let result = String::from_utf8(bytes.to_vec()).unwrap();

        sink_handle.await.unwrap();

        Ok(result)
    }

    async fn export_and_fetch_simple(tls_config: Option<TlsEnableableConfig>) {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let body = export_and_fetch(tls_config, events, false).await;

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    pub fn create_metric_gauge(name: Option<String>, value: f64) -> (String, Event) {
        create_metric(name, MetricValue::Gauge { value })
    }

    pub fn create_metric_set(name: Option<String>, values: Vec<&'static str>) -> (String, Event) {
        create_metric(
            name,
            MetricValue::Set {
                values: values.into_iter().map(Into::into).collect(),
            },
        )
    }

    fn create_metric(name: Option<String>, value: MetricValue) -> (String, Event) {
        create_metric_with_tags(name, value, Some(metric_tags!("some_tag" => "some_value")))
    }

    fn create_metric_with_tags(
        name: Option<String>,
        value: MetricValue,
        tags: Option<MetricTags>,
    ) -> (String, Event) {
        let name = name.unwrap_or_else(|| format!("vector_set_{}", random_string(16)));
        let event = Metric::new(name.clone(), MetricKind::Incremental, value)
            .with_tags(tags)
            .into();
        (name, event)
    }
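
    /// An added sketch (not in the original suite) of the identity semantics documented
    /// on `MetricRef`: the same series with different value types yields distinct keys.
    #[test]
    fn example_metric_ref_identity() {
        let counter = Metric::new(
            "requests",
            MetricKind::Absolute,
            MetricValue::Counter { value: 1.0 },
        );
        let gauge = counter.clone().with_value(MetricValue::Gauge { value: 1.0 });

        assert_eq!(MetricRef::from_metric(&counter), MetricRef::from_metric(&counter));
        assert_ne!(MetricRef::from_metric(&counter), MetricRef::from_metric(&gauge));
    }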

    #[tokio::test]
    async fn sink_absolute() {
        let (_guard, address) = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: None,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        let m1 = Metric::new(
            "absolute",
            MetricKind::Absolute,
            MetricValue::Counter { value: 32. },
        )
        .with_tags(Some(metric_tags!("tag1" => "value1")));

        let m2 = m1.clone().with_tags(Some(metric_tags!("tag1" => "value2")));

        let events = vec![
            Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 32. })),
            Event::Metric(m2.clone().with_value(MetricValue::Counter { value: 33. })),
            Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 40. })),
        ];

        let metrics_handle = Arc::clone(&sink.metrics);

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        let expected_m1 = metrics_after
            .get(&MetricRef::from_metric(&m1))
            .expect("m1 should exist");
        let expected_m1_value = MetricValue::Counter { value: 40. };
        assert_eq!(expected_m1.0.value(), &expected_m1_value);

        let expected_m2 = metrics_after
            .get(&MetricRef::from_metric(&m2))
            .expect("m2 should exist");
        let expected_m2_value = MetricValue::Counter { value: 33. };
        assert_eq!(expected_m2.0.value(), &expected_m2_value);
    }

    #[tokio::test]
    async fn sink_distributions_as_histograms() {
        // When we get summary distributions, unless we've been configured to actually emit
        // summaries for distributions, we just forcefully turn them into histograms.  This is
        // simpler and uses less memory, as aggregated histograms are better supported by Prometheus
        // since they can actually be aggregated anywhere in the pipeline -- so long as the buckets
        // are the same -- without loss of accuracy.

        // This expects that the default for the sink is to render distributions as aggregated histograms.
        let (_guard, address) = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: None,
            ..Default::default()
        };
        let buckets = config.buckets.clone();

        let sink = PrometheusExporter::new(config);

        // Define a series of incremental distribution updates.
        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = [
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

        // Figure out what the merged distributions should add up to.
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_agg_histogram(merged_summary, &buckets)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_agg_histogram(merged_histogram, &buckets)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

        // TODO: make a new metric based on merged_distrib_histogram, with expected_histogram_value,
        // so that the discriminant matches and our lookup in the indexmap can actually find it

        // Now run the events through the sink and see what ends up in the internal metric map.
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        // Both metrics should be present, and both should be aggregated histograms.
        assert_eq!(metrics_after.len(), 2);

        let actual_summary = metrics_after
            .get(&MetricRef::from_metric(&expected_summary))
            .expect("summary metric should exist");
        assert_eq!(actual_summary.0.value(), expected_summary.value());

        let actual_histogram = metrics_after
            .get(&MetricRef::from_metric(&expected_histogram))
            .expect("histogram metric should exist");
        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
    }

    #[tokio::test]
    async fn sink_distributions_as_summaries() {
        // This assumes that when we turn on `distributions_as_summaries`, we'll get aggregated
        // summaries from distributions.  This is technically true, but the way this test works is
        // that we check the internal metric data, which, when in this mode, will actually be a
        // sketch (so that we can merge without loss of accuracy).
        //
        // The render code is actually what will end up rendering those sketches as aggregated
        // summaries in the scrape output.
        let (_guard, address) = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: None,
            distributions_as_summaries: true,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        // Define a series of incremental distribution updates.
        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = [
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

        // Figure out what the merged distributions should add up to.
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_ddsketch(merged_summary)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_ddsketch(merged_histogram)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

        // Now run the events through the sink and see what ends up in the internal metric map.
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        // Both metrics should be present, and both should be distribution sketches.
1375        assert_eq!(metrics_after.len(), 2);
1376
1377        let actual_summary = metrics_after
1378            .get(&MetricRef::from_metric(&expected_summary))
1379            .expect("summary metric should exist");
1380        assert_eq!(actual_summary.0.value(), expected_summary.value());
1381
1382        let actual_histogram = metrics_after
1383            .get(&MetricRef::from_metric(&expected_histogram))
1384            .expect("histogram metric should exist");
1385        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
1386    }
1387
1388    #[tokio::test]
1389    async fn sink_gauge_incremental_absolute_mix() {
1390        // Because Prometheus does not, itself, have the concept of an Incremental metric, the
1391        // Exporter must apply a normalization function that converts all metrics to Absolute ones
1392        // before handling them.
1393
1394        // This test ensures that this normalization works correctly when applied to a mix of both
1395        // Incremental and Absolute inputs.
1396        let (_guard, address) = next_addr();
1397        let config = PrometheusExporterConfig {
1398            address,
1399            tls: None,
1400            ..Default::default()
1401        };
1402
1403        let sink = PrometheusExporter::new(config);
1404
1405        let base_absolute_gauge_metric = Metric::new(
1406            "gauge",
1407            MetricKind::Absolute,
1408            MetricValue::Gauge { value: 100.0 },
1409        );
1410
1411        let base_incremental_gauge_metric = Metric::new(
1412            "gauge",
1413            MetricKind::Incremental,
1414            MetricValue::Gauge { value: -10.0 },
1415        );
1416
1417        let metrics = [
1418            base_absolute_gauge_metric.clone(),
1419            base_absolute_gauge_metric
1420                .clone()
1421                .with_value(MetricValue::Gauge { value: 333.0 }),
1422            base_incremental_gauge_metric.clone(),
1423            base_incremental_gauge_metric
1424                .clone()
1425                .with_value(MetricValue::Gauge { value: 4.0 }),
1426        ];

        // Now run the events through the sink and see what ends up in the internal metric map.
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        // Only the single, normalized gauge metric should be present.
        assert_eq!(metrics_after.len(), 1);

        let expected_gauge = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 327.0 },
        );

        let actual_gauge = metrics_after
            .get(&MetricRef::from_metric(&expected_gauge))
            .expect("gauge metric should exist");
        assert_eq!(actual_gauge.0.value(), expected_gauge.value());
    }
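
    // Conceptually, the exporter's gauge normalization behaves like this sketch (a
    // hypothetical helper for illustration, not the actual implementation): an Absolute
    // value replaces the tracked state, while an Incremental value is applied as a delta.
    // How the real normalizer treats an Incremental gauge arriving before any Absolute
    // value is an assumption here; this sketch simply takes the delta as the initial value.
    #[allow(dead_code)]
    fn normalize_gauge_sketch(state: &mut Option<f64>, kind: MetricKind, value: f64) -> f64 {
        let new = match (kind, *state) {
            // Absolute values overwrite whatever was tracked before.
            (MetricKind::Absolute, _) => value,
            // Incremental values are added to the tracked value.
            (MetricKind::Incremental, Some(prev)) => prev + value,
            // Assumed behavior when a delta arrives with no prior state.
            (MetricKind::Incremental, None) => value,
        };
        *state = Some(new);
        new
    }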
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    #![allow(clippy::print_stdout)] // tests
    #![allow(clippy::print_stderr)] // tests
    #![allow(clippy::dbg_macro)] // tests

    use chrono::Utc;
    use futures::{future::ready, stream};
    use serde_json::Value;
    use tokio::{sync::mpsc, time};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use super::*;
    use crate::{
        config::ProxyConfig,
        http::HttpClient,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            trace_init,
        },
    };

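    // Both addresses can be overridden through environment variables, so the tests can
    // point at whatever exporter and Prometheus instances the environment provides.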
    fn sink_exporter_address() -> String {
        std::env::var("SINK_EXPORTER_ADDRESS").unwrap_or_else(|_| "127.0.0.1:9101".into())
    }

    fn prometheus_address() -> String {
        std::env::var("PROMETHEUS_ADDRESS").unwrap_or_else(|_| "localhost:9090".into())
    }

    async fn fetch_exporter_body() -> String {
        let url = format!("http://{}/metrics", sink_exporter_address());
        let request = Request::get(url)
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let result = HttpClient::new(None, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not send request");
        let result = http_body::Body::collect(result.into_body())
            .await
            .expect("Error fetching body")
            .to_bytes();
        String::from_utf8_lossy(&result).to_string()
    }

    async fn prometheus_query(query: &str) -> Value {
        let url = format!(
            "http://{}/api/v1/query?query={}",
            prometheus_address(),
            query
        );
        let request = Request::post(url)
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let result = HttpClient::new(None, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");
        let result = http_body::Body::collect(result.into_body())
            .await
            .expect("Error fetching body")
            .to_bytes();
        let result = String::from_utf8_lossy(&result);
        serde_json::from_str(result.as_ref()).expect("Invalid JSON from prometheus")
    }
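
    // The assertions in the tests below repeatedly drill into the same path of an
    // instant-query response:
    //
    //     { "data": { "result": [ { "value": [<timestamp>, "<value>"] } ] } }
    //
    // As a sketch (a hypothetical convenience, not used by the original tests), a helper
    // like this could extract the first sample's value; `serde_json`'s `Index` impls
    // return `Null` for missing paths, so a malformed response yields `None` here.
    #[allow(dead_code)]
    fn first_sample_value(result: &Value) -> Option<&str> {
        result["data"]["result"][0]["value"][1].as_str()
    }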

    #[tokio::test]
    async fn prometheus_metrics() {
        trace_init();

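        // These checks run sequentially within a single test because each one binds
        // the same exporter address; running them as separate tests could race on the port.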
        prometheus_scrapes_metrics().await;
        time::sleep(time::Duration::from_millis(500)).await;
        reset_on_flush_period().await;
        expire_on_flush_period().await;
    }

    async fn prometheus_scrapes_metrics() {
        let start = Utc::now().timestamp();

        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(2),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (name, event) = tests::create_metric_gauge(None, 123.4);
        let (_, delayed_event) = tests::create_metric_gauge(Some("delayed".to_string()), 123.4);

        run_and_assert_sink_compliance(
            sink,
            stream::once(ready(event)).chain(stream::once(async move {
                // Wait a bit to give the Prometheus server time to scrape the metrics
                time::sleep(time::Duration::from_secs(2)).await;
                delayed_event
            })),
            &SINK_TAGS,
        )
        .await;

        // Now query Prometheus itself to confirm the gauge was scraped
        let result = prometheus_query(&name).await;

        let data = &result["data"]["result"][0];
        assert_eq!(data["metric"]["__name__"], Value::String(name));
        assert_eq!(
            data["metric"]["instance"],
            Value::String(sink_exporter_address())
        );
        assert_eq!(
            data["metric"]["some_tag"],
            Value::String("some_value".into())
        );
        assert!(data["value"][0].as_f64().unwrap() >= start as f64);
        assert_eq!(data["value"][1], Value::String("123.4".into()));
    }

    async fn reset_on_flush_period() {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(3),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx);

        let input_events = input_events.map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        // Create two sets with different names but the same size (three elements each).
        let (name1, event) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        tx.send(event).expect("Failed to send.");
        let (name2, event) = tests::create_metric_set(None, vec!["3", "4", "5"]);
        tx.send(event).expect("Failed to send.");

        // Wait for the Prometheus server to scrape them.
        time::sleep(time::Duration::from_secs(2)).await;

        // Then query Prometheus to ensure both sets report their correct size of 3.
        let result = prometheus_query(&name1).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("3".into())
        );
        let result = prometheus_query(&name2).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("3".into())
        );

        // Wait a few more seconds to ensure that the two original sets have logically expired.
        // We'll update `name2` but not `name1`, which should lead to both being expired, but
        // `name2` being recreated with two values only, while `name1` is entirely gone.
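        //
        // Rough timeline: both sets were last updated before the first scrape, and the flush
        // period is 3 seconds, so by the time this sleep ends both are already stale.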
        time::sleep(time::Duration::from_secs(3)).await;

        let (name2, event) = tests::create_metric_set(Some(name2), vec!["8", "9"]);
        tx.send(event).expect("Failed to send.");

        // Again, wait for the Prometheus server to scrape the metrics, and then query it again.
        time::sleep(time::Duration::from_secs(2)).await;
        let result = prometheus_query(&name1).await;
        assert_eq!(result["data"]["result"][0]["value"][1], Value::Null);
        let result = prometheus_query(&name2).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("2".into())
        );

        drop(tx);
        sink_handle.await.unwrap();
    }

    async fn expire_on_flush_period() {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(3),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx);

        let input_events = input_events.map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        // Create a set and a gauge; any metric not updated within a full flush period
        // should expire.
        let (name1, event) = tests::create_metric_set(None, vec!["42"]);
        tx.send(event).expect("Failed to send.");
        let (name2, event) = tests::create_metric_gauge(None, 100.0);
        tx.send(event).expect("Failed to send.");

        // Wait a bit for the sink to process the events
        time::sleep(time::Duration::from_secs(1)).await;

        // Exporter should present both metrics at first
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(body.contains(&name2));

        // Wait long enough to put us past flush_period_secs for the metric that wasn't updated
        for _ in 0..7 {
            // Update the first metric, ensuring it doesn't expire
            let (_, event) = tests::create_metric_set(Some(name1.clone()), vec!["43"]);
            tx.send(event).expect("Failed to send.");

            // Let a second of wall-clock time pass between updates
            time::sleep(time::Duration::from_secs(1)).await;
        }
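
        // At this point `name2` has gone unrefreshed for roughly 8 seconds, well past the
        // 3-second flush period, while `name1` was refreshed every second and should survive.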

        // Exporter should present only the one that got updated
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(!body.contains(&name2));

        drop(tx);
        sink_handle.await.unwrap();
    }
}