vector/sinks/prometheus/exporter.rs

use std::{
    convert::Infallible,
    hash::Hash,
    mem::{discriminant, Discriminant},
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::{Arc, RwLock},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use base64::prelude::{Engine as _, BASE64_STANDARD};
use futures::{future, stream::BoxStream, FutureExt, StreamExt};
use hyper::{
    body::HttpBody,
    header::HeaderValue,
    service::{make_service_fn, service_fn},
    Body, Method, Request, Response, Server, StatusCode,
};
use indexmap::{map::Entry, IndexMap};
use serde_with::serde_as;
use snafu::Snafu;
use stream_cancel::{Trigger, Tripwire};
use tower::ServiceBuilder;
use tower_http::compression::CompressionLayer;
use tracing::{Instrument, Span};
use vector_lib::configurable::configurable_component;
use vector_lib::{
    internal_event::{
        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle as _, Output, Protocol,
        Registered,
    },
    ByteSizeOf, EstimatedJsonEncodedSizeOf,
};

use super::collector::{MetricCollector, StringCollector};
use crate::{
    config::{AcknowledgementsConfig, GenerateConfig, Input, Resource, SinkConfig, SinkContext},
    event::{
        metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue},
        Event, EventStatus, Finalizable,
    },
    http::{build_http_trace_layer, Auth},
    internal_events::PrometheusNormalizationError,
    sinks::{
        util::{statistic::validate_quantiles, StreamSink},
        Healthcheck, VectorSink,
    },
    tls::{MaybeTlsSettings, TlsEnableableConfig},
};

const MIN_FLUSH_PERIOD_SECS: u64 = 1;

const LOCK_FAILED: &str = "Prometheus exporter data lock is poisoned";

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("Flush period for sets must be greater than or equal to {} secs", min))]
    FlushPeriodTooShort { min: u64 },
}

/// Configuration for the `prometheus_exporter` sink.
#[serde_as]
#[configurable_component(sink(
    "prometheus_exporter",
    "Expose metric events on a Prometheus compatible endpoint."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrometheusExporterConfig {
    /// The default namespace for any metrics sent.
    ///
    /// This namespace is only used if a metric has no existing namespace. When a namespace is
    /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`).
    ///
    /// It should follow the Prometheus [naming conventions][prom_naming_docs].
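    ///
    /// For example, with `default_namespace = "vector"`, a hypothetical metric named
    /// `requests_total` would be exposed as `vector_requests_total`.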
    ///
    /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::advanced))]
    pub default_namespace: Option<String>,

    /// The address to expose for scraping.
    ///
    /// The metrics are exposed at the typical Prometheus exporter path, `/metrics`.
    #[serde(default = "default_address")]
    #[configurable(metadata(docs::examples = "192.160.0.10:9598"))]
    pub address: SocketAddr,

    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    pub tls: Option<TlsEnableableConfig>,

    /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    #[serde(default = "super::default_histogram_buckets")]
    #[configurable(metadata(docs::advanced))]
    pub buckets: Vec<f64>,

    /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    #[serde(default = "super::default_summary_quantiles")]
    #[configurable(metadata(docs::advanced))]
    pub quantiles: Vec<f64>,

    /// Whether or not to render [distributions][dist_metric_docs] as an [aggregated histogram][prom_agg_hist_docs] or an [aggregated summary][prom_agg_summ_docs].
    ///
    /// While distributions are supported as a lossless way to represent a set of samples for a
    /// metric, Prometheus clients (the application being scraped, which in this case is this sink)
    /// must aggregate them locally into either an aggregated histogram or an aggregated summary.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    /// [prom_agg_hist_docs]: https://prometheus.io/docs/concepts/metric_types/#histogram
    /// [prom_agg_summ_docs]: https://prometheus.io/docs/concepts/metric_types/#summary
    #[serde(default = "default_distributions_as_summaries")]
    #[configurable(metadata(docs::advanced))]
    pub distributions_as_summaries: bool,

    /// The interval, in seconds, on which metrics are flushed.
    ///
    /// On each flush interval, any metric that has not been seen since the previous flush is
    /// considered expired and is removed.
    ///
    /// Be sure to configure this value higher than your client’s scrape interval.
    #[serde(default = "default_flush_period_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub flush_period_secs: Duration,

    /// Suppresses timestamps on the Prometheus output.
    ///
    /// This can sometimes be useful when the source of metrics leads to their timestamps being too
    /// far in the past for Prometheus to allow them, such as when aggregating metrics over long
    /// time periods, or when replaying old metrics from a disk buffer.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub suppress_timestamp: bool,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
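
// A minimal sketch of how this sink might be configured in Vector's TOML config
// (values are illustrative and `my_metrics_source` is a hypothetical input name):
//
//   [sinks.prometheus]
//   type = "prometheus_exporter"
//   inputs = ["my_metrics_source"]
//   address = "0.0.0.0:9598"
//   default_namespace = "vector"
//   flush_period_secs = 60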

impl Default for PrometheusExporterConfig {
    fn default() -> Self {
        Self {
            default_namespace: None,
            address: default_address(),
            auth: None,
            tls: None,
            buckets: super::default_histogram_buckets(),
            quantiles: super::default_summary_quantiles(),
            distributions_as_summaries: default_distributions_as_summaries(),
            flush_period_secs: default_flush_period_secs(),
            suppress_timestamp: default_suppress_timestamp(),
            acknowledgements: Default::default(),
        }
    }
}

const fn default_address() -> SocketAddr {
    SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9598)
}

const fn default_distributions_as_summaries() -> bool {
    false
}

const fn default_flush_period_secs() -> Duration {
    Duration::from_secs(60)
}

const fn default_suppress_timestamp() -> bool {
    false
}

impl GenerateConfig for PrometheusExporterConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self::default()).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_exporter")]
impl SinkConfig for PrometheusExporterConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        if self.flush_period_secs.as_secs() < MIN_FLUSH_PERIOD_SECS {
            return Err(Box::new(BuildError::FlushPeriodTooShort {
                min: MIN_FLUSH_PERIOD_SECS,
            }));
        }

        validate_quantiles(&self.quantiles)?;

        let sink = PrometheusExporter::new(self.clone());
        let healthcheck = future::ok(()).boxed();

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    fn input(&self) -> Input {
        Input::metric()
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

struct PrometheusExporter {
    server_shutdown_trigger: Option<Trigger>,
    config: PrometheusExporterConfig,
    metrics: Arc<RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>>,
}

/// Expiration metadata for a metric.
#[derive(Clone, Copy, Debug)]
struct MetricMetadata {
    expiration_window: Duration,
    expires_at: Instant,
}

impl MetricMetadata {
    pub fn new(expiration_window: Duration) -> Self {
        Self {
            expiration_window,
            expires_at: Instant::now() + expiration_window,
        }
    }

    /// Resets the expiration deadline.
    pub fn refresh(&mut self) {
        self.expires_at = Instant::now() + self.expiration_window;
    }

    /// Whether or not the referenced metric has expired yet.
    pub fn has_expired(&self, now: Instant) -> bool {
        now >= self.expires_at
    }
}
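
// Example of the expiration semantics above: with a 60s expiration window, a
// metric tracked at time T expires at T+60s unless it is seen again; each call
// to `refresh` pushes the deadline back out to `Instant::now() + 60s`.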

// Composite identifier that uniquely represents a metric.
//
// Instead of working off of the series (name and tags) alone, we also include the metric's
// value type (counter, gauge, etc.) and any subtype information like histogram buckets.
//
// We do _not_, however, include the actual metric value.  This type exists specifically to
// look up the entry for a metric in a map, in the sense of "get the metric whose name is X,
// whose type is Y, and which has these tags".
#[derive(Clone, Debug)]
struct MetricRef {
    series: MetricSeries,
    value: Discriminant<MetricValue>,
    bounds: Option<Vec<f64>>,
}

impl MetricRef {
    /// Creates a `MetricRef` based on the given `Metric`.
    pub fn from_metric(metric: &Metric) -> Self {
        // Either the buckets for an aggregated histogram, or the quantiles for an aggregated summary.
        let bounds = match metric.value() {
            MetricValue::AggregatedHistogram { buckets, .. } => {
                Some(buckets.iter().map(|b| b.upper_limit).collect())
            }
            MetricValue::AggregatedSummary { quantiles, .. } => {
                Some(quantiles.iter().map(|q| q.quantile).collect())
            }
            _ => None,
        };

        Self {
            series: metric.series().clone(),
            value: discriminant(metric.value()),
            bounds,
        }
    }
}

impl PartialEq for MetricRef {
    fn eq(&self, other: &Self) -> bool {
        self.series == other.series && self.value == other.value && self.bounds == other.bounds
    }
}

impl Eq for MetricRef {}

impl Hash for MetricRef {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.series.hash(state);
        self.value.hash(state);
        if let Some(bounds) = &self.bounds {
            for bound in bounds {
                bound.to_bits().hash(state);
            }
        }
    }
}
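
// Note: `f64` does not implement `Hash`, so the bucket/quantile bounds above are
// hashed via their raw IEEE 754 bit patterns (`to_bits`).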

fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
    if let Some(auth) = auth {
        let headers = req.headers();
        if let Some(auth_header) = headers.get(hyper::header::AUTHORIZATION) {
            let encoded_credentials = match auth {
                Auth::Basic { user, password } => Some(HeaderValue::from_str(
                    format!(
                        "Basic {}",
                        BASE64_STANDARD.encode(format!("{}:{}", user, password.inner()))
                    )
                    .as_str(),
                )),
                Auth::Bearer { token } => Some(HeaderValue::from_str(
                    format!("Bearer {}", token.inner()).as_str(),
                )),
                #[cfg(feature = "aws-core")]
                _ => None,
            };

            if let Some(Ok(encoded_credentials)) = encoded_credentials {
                if auth_header == encoded_credentials {
                    return true;
                }
            }
        }
    } else {
        return true;
    }

    false
}
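
// Note: rather than parsing the client's `Authorization` header, `authorized`
// reconstructs the expected header value from the configured credentials and
// compares it byte-for-byte against what the client sent.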

#[derive(Clone)]
struct Handler {
    auth: Option<Auth>,
    default_namespace: Option<String>,
    buckets: Box<[f64]>,
    quantiles: Box<[f64]>,
    bytes_sent: Registered<BytesSent>,
    events_sent: Registered<EventsSent>,
}

impl Handler {
    fn handle<T: HttpBody>(
        &self,
        req: Request<T>,
        metrics: &RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>,
    ) -> Response<Body> {
        let mut response = Response::new(Body::empty());

        match (authorized(&req, &self.auth), req.method(), req.uri().path()) {
            (false, _, _) => {
                *response.status_mut() = StatusCode::UNAUTHORIZED;
                response.headers_mut().insert(
                    http::header::WWW_AUTHENTICATE,
                    HeaderValue::from_static("Basic, Bearer"),
                );
            }

            (true, &Method::GET, "/metrics") => {
                let metrics = metrics.read().expect(LOCK_FAILED);

                let count = metrics.len();
                let byte_size = metrics
                    .iter()
                    .map(|(_, (metric, _))| metric.estimated_json_encoded_size_of())
                    .sum();

                let mut collector = StringCollector::new();

                for (_, (metric, _)) in metrics.iter() {
                    collector.encode_metric(
                        self.default_namespace.as_deref(),
                        &self.buckets,
                        &self.quantiles,
                        metric,
                    );
                }

                drop(metrics);

                let body = collector.finish();
                let body_size = body.size_of();

                *response.body_mut() = body.into();

                response.headers_mut().insert(
                    "Content-Type",
                    HeaderValue::from_static("text/plain; version=0.0.4"),
                );

                self.events_sent.emit(CountByteSize(count, byte_size));
                self.bytes_sent.emit(ByteSize(body_size));
            }

            (true, _, _) => {
                *response.status_mut() = StatusCode::NOT_FOUND;
            }
        }

        response
    }
}
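
// Request handling summary: unauthorized requests get a 401 with a
// `WWW-Authenticate` header, `GET /metrics` returns the encoded metrics as
// `text/plain; version=0.0.4`, and any other authorized request gets a 404.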

impl PrometheusExporter {
    fn new(config: PrometheusExporterConfig) -> Self {
        Self {
            server_shutdown_trigger: None,
            config,
            metrics: Arc::new(RwLock::new(IndexMap::new())),
        }
    }

    async fn start_server_if_needed(&mut self) -> crate::Result<()> {
        if self.server_shutdown_trigger.is_some() {
            return Ok(());
        }

        let handler = Handler {
            bytes_sent: register!(BytesSent::from(Protocol::HTTP)),
            events_sent: register!(EventsSent::from(Output(None))),
            default_namespace: self.config.default_namespace.clone(),
            buckets: self.config.buckets.clone().into(),
            quantiles: self.config.quantiles.clone().into(),
            auth: self.config.auth.clone(),
        };

        let span = Span::current();
        let metrics = Arc::clone(&self.metrics);

        let new_service = make_service_fn(move |_| {
            let span = Span::current();
            let metrics = Arc::clone(&metrics);
            let handler = handler.clone();

            let inner = service_fn(move |req| {
                let response = handler.handle(req, &metrics);

                future::ok::<_, Infallible>(response)
            });

            let service = ServiceBuilder::new()
                .layer(build_http_trace_layer(span.clone()))
                .layer(CompressionLayer::new())
                .service(inner);

            async move { Ok::<_, Infallible>(service) }
        });

        let (trigger, tripwire) = Tripwire::new();

        let tls = self.config.tls.clone();
        let address = self.config.address;

        let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?;
        let listener = tls.bind(&address).await?;

        tokio::spawn(async move {
            info!(message = "Building HTTP server.", address = %address);

            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(new_service)
                .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler))
                .instrument(span)
                .await
                .map_err(|error| error!("Server error: {}.", error))?;

            Ok::<(), ()>(())
        });

        self.server_shutdown_trigger = Some(trigger);
        Ok(())
    }

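    // Normalizes a metric into the absolute form that Prometheus expects.
    //
    // For example, given incremental counter updates of 3 and then 4 for the same
    // series, this returns absolute values of 3 and then 7, since each incremental
    // update is added onto the value already tracked in the metric map.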
    fn normalize(&mut self, metric: Metric) -> Option<Metric> {
        let new_metric = match metric.value() {
            MetricValue::Distribution { .. } => {
                // Convert the distribution as-is, and then absolute-ify it.
                let (series, data, metadata) = metric.into_parts();
                let (time, kind, value) = data.into_parts();

                let new_value = if self.config.distributions_as_summaries {
                    // We use a sketch when in summary mode because they're actually able to be
                    // merged and provide correct output, unlike the aggregated summaries that
                    // we handle from _sources_ like Prometheus.  The collector code itself
                    // will render sketches as aggregated summaries, so we have continuity there.
                    value
                        .distribution_to_sketch()
                        .expect("value should be distribution already")
                } else {
                    value
                        .distribution_to_agg_histogram(&self.config.buckets)
                        .expect("value should be distribution already")
                };

                let data = MetricData::from_parts(time, kind, new_value);
                Metric::from_parts(series, data, metadata)
            }
            _ => metric,
        };

        match new_metric.kind() {
            MetricKind::Absolute => Some(new_metric),
            MetricKind::Incremental => {
                let metrics = self.metrics.read().expect(LOCK_FAILED);
                let metric_ref = MetricRef::from_metric(&new_metric);

                if let Some(existing) = metrics.get(&metric_ref) {
                    let mut current = existing.0.value().clone();
                    if current.add(new_metric.value()) {
                        // If we were able to add to the existing value (i.e. they were compatible),
                        // return the result as an absolute metric.
                        return Some(new_metric.with_value(current).into_absolute());
                    }
                }

                // Otherwise, if we didn't have an existing value or we did and it was not
                // compatible with the new value, simply return the new value as absolute.
                Some(new_metric.into_absolute())
            }
        }
    }
}

#[async_trait]
impl StreamSink<Event> for PrometheusExporter {
    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.start_server_if_needed()
            .await
            .map_err(|error| error!("Failed to start Prometheus exporter: {}.", error))?;

        let mut last_flush = Instant::now();
        let flush_period = self.config.flush_period_secs;

        while let Some(event) = input.next().await {
            // If we've exceeded our flush interval, go through all of the metrics we're
            // currently tracking and remove any that have not been updated within the last
            // flush period.
            //
            // TODO: Can we be smarter about this? As is, we might wait up to 2x the flush period to
            // remove an expired metric depending on how things line up.  It'd be cool to _check_
            // for expired metrics more often, but we also don't want to check _way_ too often, like
            // every second, since then we're constantly iterating through every metric, etc etc.
            if last_flush.elapsed() > self.config.flush_period_secs {
                last_flush = Instant::now();

                let mut metrics = self.metrics.write().expect(LOCK_FAILED);

                metrics.retain(|_metric_ref, (_, metadata)| !metadata.has_expired(last_flush));
            }

            // Now process the metric we got.
            let mut metric = event.into_metric();
            let finalizers = metric.take_finalizers();

            match self.normalize(metric) {
                Some(normalized) => {
                    let normalized = if self.config.suppress_timestamp {
                        normalized.with_timestamp(None)
                    } else {
                        normalized
                    };

                    // We have a normalized metric, in absolute form.  If we're already aware of this
                    // metric, update its expiration deadline, otherwise, start tracking it.
                    let mut metrics = self.metrics.write().expect(LOCK_FAILED);

                    match metrics.entry(MetricRef::from_metric(&normalized)) {
                        Entry::Occupied(mut entry) => {
                            let (data, metadata) = entry.get_mut();
                            *data = normalized;
                            metadata.refresh();
                        }
                        Entry::Vacant(entry) => {
                            entry.insert((normalized, MetricMetadata::new(flush_period)));
                        }
                    }
                    finalizers.update_status(EventStatus::Delivered);
                }
                _ => {
                    emit!(PrometheusNormalizationError {});
                    finalizers.update_status(EventStatus::Errored);
                }
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use chrono::{Duration, Utc};
    use flate2::read::GzDecoder;
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use std::io::Read;
    use tokio::{sync::oneshot::error::TryRecvError, time};
    use vector_lib::{
        event::{MetricTags, StatisticKind},
        finalization::{BatchNotifier, BatchStatus},
        metric_tags, samples,
        sensitive_string::SensitiveString,
    };

    use super::*;
    use crate::{
        config::ProxyConfig,
        event::metric::{Metric, MetricValue},
        http::HttpClient,
        sinks::prometheus::{distribution_to_agg_histogram, distribution_to_ddsketch},
        test_util::{
            components::{run_and_assert_sink_compliance, SINK_TAGS},
            next_addr, random_string, trace_init,
        },
        tls::MaybeTlsSettings,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusExporterConfig>();
    }

    #[tokio::test]
    async fn prometheus_notls() {
        export_and_fetch_simple(None).await;
    }

    #[tokio::test]
    async fn prometheus_tls() {
        let mut tls_config = TlsEnableableConfig::test_config();
        tls_config.options.verify_hostname = Some(false);
        export_and_fetch_simple(Some(tls_config)).await;
    }

    #[tokio::test]
    async fn prometheus_noauth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let response_result = export_and_fetch_with_auth(None, None, events, false).await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_successful_basic_auth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let auth_config = Auth::Basic {
            user: "user".to_string(),
            password: SensitiveString::from("password".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
                .await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_successful_token_auth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
                .await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_missing_auth() {
        let (_, event1) = create_metric_gauge(None, 123.4);
        let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let server_auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(server_auth_config), None, events, false).await;

        assert!(response_result.is_err());
        assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn prometheus_wrong_auth() {
        let (_, event1) = create_metric_gauge(None, 123.4);
        let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let server_auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let client_auth_config = Auth::Basic {
            user: "user".to_string(),
            password: SensitiveString::from("password".to_string()),
        };

        let response_result = export_and_fetch_with_auth(
            Some(server_auth_config),
            Some(client_auth_config),
            events,
            false,
        )
        .await;

        assert!(response_result.is_err());
        assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn encoding_gzip() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let events = vec![event1];

        let body_raw = export_and_fetch_raw(None, events, false, Some(String::from("gzip"))).await;
        let expected = format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1,
        );

        let mut gz = GzDecoder::new(&body_raw[..]);
        let mut body_decoded = String::new();
        let _ = gz.read_to_string(&mut body_decoded);

        assert!(body_raw.len() < expected.len());
        assert_eq!(body_decoded, expected);
    }

    #[tokio::test]
    async fn updates_timestamps() {
        let timestamp1 = Utc::now();
        let (name, event1) = create_metric_gauge(None, 123.4);
        let event1 = Event::from(event1.into_metric().with_timestamp(Some(timestamp1)));
        let (_, event2) = create_metric_gauge(Some(name.clone()), 12.0);
        let timestamp2 = timestamp1 + Duration::seconds(1);
        let event2 = Event::from(event2.into_metric().with_timestamp(Some(timestamp2)));
        let events = vec![event1, event2];

        let body = export_and_fetch(None, events, false).await;
        let timestamp = timestamp2.timestamp_millis();
        assert_eq!(
            body,
            format!(
                indoc! {r#"
                    # HELP {name} {name}
                    # TYPE {name} gauge
                    {name}{{some_tag="some_value"}} 135.4 {timestamp}
                "#},
                name = name,
                timestamp = timestamp
            )
        );
    }

    #[tokio::test]
    async fn suppress_timestamp() {
        let timestamp = Utc::now();
        let (name, event) = create_metric_gauge(None, 123.4);
        let event = Event::from(event.into_metric().with_timestamp(Some(timestamp)));
        let events = vec![event];

        let body = export_and_fetch(None, events, true).await;
        assert_eq!(
            body,
            format!(
                indoc! {r#"
                    # HELP {name} {name}
                    # TYPE {name} gauge
                    {name}{{some_tag="some_value"}} 123.4
                "#},
                name = name,
            )
        );
    }

    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
    /// > Label names MUST be unique within a LabelSet.
    /// Prometheus itself will reject the metric with an error. Largely to remain backward compatible with older versions of Vector,
    /// we only publish the last tag in the list.
    #[tokio::test]
    async fn prometheus_duplicate_labels() {
        let (name, event) = create_metric_with_tags(
            None,
            MetricValue::Gauge { value: 123.4 },
            Some(metric_tags!("code" => "200", "code" => "success")),
        );
        let events = vec![event];

        let response_result = export_and_fetch_with_auth(None, None, events, false).await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{code="success"}} 123.4
            "# },
            name = name
        )));
    }

    async fn export_and_fetch_raw(
        tls_config: Option<TlsEnableableConfig>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
        encoding: Option<String>,
    ) -> hyper::body::Bytes {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(tls_config.as_ref(), false).unwrap();
        let proto = client_settings.http_protocol_name();

        let address = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: tls_config,
            suppress_timestamp,
            ..Default::default()
        };

        // Set up acknowledgement notification
        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            stream::iter(events).chain(stream::once(async move {
                // Wait a bit to have time to scrape metrics
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        time::sleep(time::Duration::from_millis(100)).await;

        // Events are marked as delivered as soon as they are aggregated.
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(ref encoding) = encoding {
            request.headers_mut().insert(
                http::header::ACCEPT_ENCODING,
                HeaderValue::from_str(encoding.as_str()).unwrap(),
            );
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        assert!(result.status().is_success());

        if encoding.is_some() {
            assert!(result
                .headers()
                .contains_key(http::header::CONTENT_ENCODING));
        }

        let body = result.into_body();
        let bytes = hyper::body::to_bytes(body)
            .await
            .expect("Reading body failed");

        sink_handle.await.unwrap();

        bytes
    }

    async fn export_and_fetch(
        tls_config: Option<TlsEnableableConfig>,
        events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> String {
        let bytes = export_and_fetch_raw(tls_config, events, suppress_timestamp, None);
        String::from_utf8(bytes.await.to_vec()).unwrap()
    }

    async fn export_and_fetch_with_auth(
        server_auth_config: Option<Auth>,
        client_auth_config: Option<Auth>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> Result<String, http::status::StatusCode> {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(None, false).unwrap();
        let proto = client_settings.http_protocol_name();

        let address = next_addr();
        let config = PrometheusExporterConfig {
            address,
            auth: server_auth_config,
            tls: None,
            suppress_timestamp,
            ..Default::default()
        };

        // Set up acknowledgement notification
        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            stream::iter(events).chain(stream::once(async move {
                // Wait a bit to have time to scrape metrics
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        time::sleep(time::Duration::from_millis(100)).await;

        // Events are marked as delivered as soon as they are aggregated.
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(client_auth_config) = client_auth_config {
            client_auth_config.apply(&mut request);
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        if !result.status().is_success() {
            return Err(result.status());
        }

        let body = result.into_body();
        let bytes = hyper::body::to_bytes(body)
            .await
            .expect("Reading body failed");
        let result = String::from_utf8(bytes.to_vec()).unwrap();

        sink_handle.await.unwrap();

        Ok(result)
    }

    async fn export_and_fetch_simple(tls_config: Option<TlsEnableableConfig>) {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let body = export_and_fetch(tls_config, events, false).await;

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    pub fn create_metric_gauge(name: Option<String>, value: f64) -> (String, Event) {
        create_metric(name, MetricValue::Gauge { value })
    }

    pub fn create_metric_set(name: Option<String>, values: Vec<&'static str>) -> (String, Event) {
        create_metric(
            name,
            MetricValue::Set {
                values: values.into_iter().map(Into::into).collect(),
            },
        )
    }

    fn create_metric(name: Option<String>, value: MetricValue) -> (String, Event) {
        create_metric_with_tags(name, value, Some(metric_tags!("some_tag" => "some_value")))
    }

    fn create_metric_with_tags(
        name: Option<String>,
        value: MetricValue,
        tags: Option<MetricTags>,
    ) -> (String, Event) {
        let name = name.unwrap_or_else(|| format!("vector_set_{}", random_string(16)));
        let event = Metric::new(name.clone(), MetricKind::Incremental, value)
            .with_tags(tags)
            .into();
        (name, event)
    }

    #[tokio::test]
    async fn sink_absolute() {
        let config = PrometheusExporterConfig {
            address: next_addr(), // Not actually bound, just needed to fill config
            tls: None,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        let m1 = Metric::new(
            "absolute",
            MetricKind::Absolute,
            MetricValue::Counter { value: 32. },
        )
        .with_tags(Some(metric_tags!("tag1" => "value1")));

        let m2 = m1.clone().with_tags(Some(metric_tags!("tag1" => "value2")));

        let events = vec![
            Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 32. })),
            Event::Metric(m2.clone().with_value(MetricValue::Counter { value: 33. })),
            Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 40. })),
        ];
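        // For absolute metrics, normalization is last-write-wins: m1 should end at
        // 40 and m2 at 33, which is asserted below.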

        let metrics_handle = Arc::clone(&sink.metrics);

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        let expected_m1 = metrics_after
            .get(&MetricRef::from_metric(&m1))
            .expect("m1 should exist");
        let expected_m1_value = MetricValue::Counter { value: 40. };
        assert_eq!(expected_m1.0.value(), &expected_m1_value);

        let expected_m2 = metrics_after
            .get(&MetricRef::from_metric(&m2))
            .expect("m2 should exist");
        let expected_m2_value = MetricValue::Counter { value: 33. };
        assert_eq!(expected_m2.0.value(), &expected_m2_value);
    }

    #[tokio::test]
    async fn sink_distributions_as_histograms() {
        // When we get summary distributions, unless we've been configured to actually emit
        // summaries for distributions, we just forcefully turn them into histograms.  This is
        // simpler and uses less memory, as aggregated histograms are better supported by Prometheus
        // since they can actually be aggregated anywhere in the pipeline -- so long as the buckets
        // are the same -- without loss of accuracy.

        // This expects that the default for the sink is to render distributions as aggregated histograms.
        let config = PrometheusExporterConfig {
            address: next_addr(), // Not actually bound, just needed to fill config
            tls: None,
            ..Default::default()
        };
        let buckets = config.buckets.clone();

        let sink = PrometheusExporter::new(config);

        // Define a series of incremental distribution updates.
        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = vec![
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

        // Figure out what the merged distributions should add up to.
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_agg_histogram(merged_summary, &buckets)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_agg_histogram(merged_histogram, &buckets)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

        // TODO: make a new metric based on merged_distrib_histogram, with expected_histogram_value,
        // so that the discriminant matches and our lookup in the indexmap can actually find it

        // Now run the events through the sink and see what ends up in the internal metric map.
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        // Both metrics should be present, and both should be aggregated histograms.
        assert_eq!(metrics_after.len(), 2);

        let actual_summary = metrics_after
            .get(&MetricRef::from_metric(&expected_summary))
            .expect("summary metric should exist");
        assert_eq!(actual_summary.0.value(), expected_summary.value());

        let actual_histogram = metrics_after
            .get(&MetricRef::from_metric(&expected_histogram))
            .expect("histogram metric should exist");
        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
    }

    #[tokio::test]
    async fn sink_distributions_as_summaries() {
        // This assumes that when we turn on `distributions_as_summaries`, we'll get aggregated
        // summaries from distributions.  This is technically true, but the way this test works is
        // that we check the internal metric data, which, when in this mode, will actually be a
        // sketch (so that we can merge without loss of accuracy).
        //
        // The render code is actually what will end up rendering those sketches as aggregated
        // summaries in the scrape output.
        let config = PrometheusExporterConfig {
            address: next_addr(), // Not actually bound, just needed to fill config
            tls: None,
            distributions_as_summaries: true,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        // Define a series of incremental distribution updates.
        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = vec![
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

        // Figure out what the merged distributions should add up to.
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_ddsketch(merged_summary)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_ddsketch(merged_histogram)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

        // Now run the events through the sink and see what ends up in the internal metric map.
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        // Both metrics should be present, and both should be stored as sketches.
        assert_eq!(metrics_after.len(), 2);

        let actual_summary = metrics_after
            .get(&MetricRef::from_metric(&expected_summary))
            .expect("summary metric should exist");
        assert_eq!(actual_summary.0.value(), expected_summary.value());

        let actual_histogram = metrics_after
            .get(&MetricRef::from_metric(&expected_histogram))
            .expect("histogram metric should exist");
        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
    }

    #[tokio::test]
    async fn sink_gauge_incremental_absolute_mix() {
        // Because Prometheus does not, itself, have the concept of an Incremental metric, the
        // Exporter must apply a normalization function that converts all metrics to Absolute ones
        // before handling them.

        // This test ensures that this normalization works correctly when applied to a mix of both
        // Incremental and Absolute inputs.
        let config = PrometheusExporterConfig {
            address: next_addr(), // Not actually bound, just needed to fill config
            tls: None,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        let base_absolute_gauge_metric = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 100.0 },
        );

        let base_incremental_gauge_metric = Metric::new(
            "gauge",
            MetricKind::Incremental,
            MetricValue::Gauge { value: -10.0 },
        );

        let metrics = vec![
            base_absolute_gauge_metric.clone(),
            base_absolute_gauge_metric
                .clone()
                .with_value(MetricValue::Gauge { value: 333.0 }),
            base_incremental_gauge_metric.clone(),
            base_incremental_gauge_metric
                .clone()
                .with_value(MetricValue::Gauge { value: 4.0 }),
        ];
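        // Normalization folds the incremental gauge updates into the last absolute
        // value: 333.0 - 10.0 + 4.0 = 327.0, which is asserted below.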
1416
1417        // Now run the events through the sink and see what ends up in the internal metric map.
1418        let metrics_handle = Arc::clone(&sink.metrics);
1419
1420        let events = metrics
1421            .iter()
1422            .cloned()
1423            .map(Event::Metric)
1424            .collect::<Vec<_>>();
1425
1426        let sink = VectorSink::from_event_streamsink(sink);
1427        let input_events = stream::iter(events).map(Into::into);
1428        sink.run(input_events).await.unwrap();
1429
1430        let metrics_after = metrics_handle.read().unwrap();
1431
1432        // The gauge metric should be present.
1433        assert_eq!(metrics_after.len(), 1);
1434
        let expected_gauge = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 327.0 },
        );

        let actual_gauge = metrics_after
            .get(&MetricRef::from_metric(&expected_gauge))
            .expect("gauge metric should exist");
        assert_eq!(actual_gauge.0.value(), expected_gauge.value());
    }
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    #![allow(clippy::print_stdout)] // tests
    #![allow(clippy::print_stderr)] // tests
    #![allow(clippy::dbg_macro)] // tests

    use chrono::Utc;
    use futures::{future::ready, stream};
    use serde_json::Value;
    use tokio::{sync::mpsc, time};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use super::*;
    use crate::{
        config::ProxyConfig,
        http::HttpClient,
        test_util::{
            components::{run_and_assert_sink_compliance, SINK_TAGS},
            trace_init,
        },
    };

    fn sink_exporter_address() -> String {
        std::env::var("SINK_EXPORTER_ADDRESS").unwrap_or_else(|_| "127.0.0.1:9101".into())
    }

    fn prometheus_address() -> String {
        std::env::var("PROMETHEUS_ADDRESS").unwrap_or_else(|_| "localhost:9090".into())
    }
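
    // These tests assume a running Prometheus server that is actively scraping the exporter.
    // The defaults above match a local setup; both addresses can be overridden through the
    // `SINK_EXPORTER_ADDRESS` and `PROMETHEUS_ADDRESS` environment variables.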

    async fn fetch_exporter_body() -> String {
        let url = format!("http://{}/metrics", sink_exporter_address());
        let request = Request::get(url)
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let result = HttpClient::new(None, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not send request");
        let result = hyper::body::to_bytes(result.into_body())
            .await
            .expect("Error fetching body");
        String::from_utf8_lossy(&result).to_string()
    }
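
    // For reference, the body returned by `fetch_exporter_body` is in the Prometheus text
    // exposition format, one sample per line (the name, tag, and value here are illustrative):
    //
    //     # TYPE some_metric gauge
    //     some_metric{some_tag="some_value"} 123.4
    //
    // which is why the expiry tests below can check for a metric's presence with a simple
    // substring search on its name.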

    async fn prometheus_query(query: &str) -> Value {
        let url = format!(
            "http://{}/api/v1/query?query={}",
            prometheus_address(),
            query
        );
        let request = Request::post(url)
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let result = HttpClient::new(None, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");
        let result = hyper::body::to_bytes(result.into_body())
            .await
            .expect("Error fetching body");
        let result = String::from_utf8_lossy(&result);
        serde_json::from_str(result.as_ref()).expect("Invalid JSON from Prometheus")
    }
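
    // A successful instant-query response from Prometheus has roughly this shape, which is why
    // the assertions below index into `["data"]["result"][0]["value"][1]` to read a sample's
    // value (Prometheus renders sample values as strings):
    //
    //     {
    //         "status": "success",
    //         "data": {
    //             "resultType": "vector",
    //             "result": [
    //                 {
    //                     "metric": { "__name__": "...", "instance": "..." },
    //                     "value": [ <unix timestamp>, "<value>" ]
    //                 }
    //             ]
    //         }
    //     }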

    #[tokio::test]
    async fn prometheus_metrics() {
        trace_init();

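        // The three scenarios below all bind the same exporter address, so they run sequentially
        // within a single test (with a short pause in between) rather than as separate tests that
        // could race to bind the port.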
        prometheus_scrapes_metrics().await;
        time::sleep(time::Duration::from_millis(500)).await;
        reset_on_flush_period().await;
        expire_on_flush_period().await;
    }

    async fn prometheus_scrapes_metrics() {
        let start = Utc::now().timestamp();

        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(2),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (name, event) = tests::create_metric_gauge(None, 123.4);
        let (_, delayed_event) = tests::create_metric_gauge(Some("delayed".to_string()), 123.4);

        run_and_assert_sink_compliance(
            sink,
            stream::once(ready(event)).chain(stream::once(async move {
                // Wait a bit for the Prometheus server to scrape the metrics
                time::sleep(time::Duration::from_secs(2)).await;
                delayed_event
            })),
            &SINK_TAGS,
        )
        .await;

        // Now try to download them from Prometheus
        let result = prometheus_query(&name).await;

        let data = &result["data"]["result"][0];
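
        // `__name__` and `instance` are attached by Prometheus itself, from the exposed metric
        // name and the scrape target address respectively; `some_tag` comes from the event's own
        // tags (see `tests::create_metric_gauge`).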
        assert_eq!(data["metric"]["__name__"], Value::String(name));
        assert_eq!(
            data["metric"]["instance"],
            Value::String(sink_exporter_address())
        );
        assert_eq!(
            data["metric"]["some_tag"],
            Value::String("some_value".into())
        );
        assert!(data["value"][0].as_f64().unwrap() >= start as f64);
        assert_eq!(data["value"][1], Value::String("123.4".into()));
    }

    async fn reset_on_flush_period() {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(3),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx);

        let input_events = input_events.map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        // Create two sets with different names but the same cardinality. A set is exported as a
        // gauge of its cardinality, so both should read 3.
        let (name1, event) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        tx.send(event).expect("Failed to send.");
        let (name2, event) = tests::create_metric_set(None, vec!["3", "4", "5"]);
        tx.send(event).expect("Failed to send.");

        // Wait for the Prometheus server to scrape them.
        time::sleep(time::Duration::from_secs(2)).await;

        // Then query Prometheus to make sure both sets report their full cardinality.
        let result = prometheus_query(&name1).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("3".into())
        );
        let result = prometheus_query(&name2).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("3".into())
        );

        // Wait long enough for both of the original sets to logically expire. We then update
        // `name2` but not `name1`: both expire, but `name2` is recreated with only two values,
        // while `name1` disappears entirely.
        time::sleep(time::Duration::from_secs(3)).await;

        let (name2, event) = tests::create_metric_set(Some(name2), vec!["8", "9"]);
        tx.send(event).expect("Failed to send.");

        // Wait for another Prometheus scrape, then query again.
        time::sleep(time::Duration::from_secs(2)).await;
        let result = prometheus_query(&name1).await;
        assert_eq!(result["data"]["result"][0]["value"][1], Value::Null);
        let result = prometheus_query(&name2).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("2".into())
        );

        drop(tx);
        sink_handle.await.unwrap();
    }

    async fn expire_on_flush_period() {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(3),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx);

        let input_events = input_events.map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        // Create two metrics; any metric that goes a full flush period without an update should
        // expire.
        let (name1, event) = tests::create_metric_set(None, vec!["42"]);
        tx.send(event).expect("Failed to send.");
        let (name2, event) = tests::create_metric_gauge(None, 100.0);
        tx.send(event).expect("Failed to send.");

        // Wait a bit for the sink to process the events
        time::sleep(time::Duration::from_secs(1)).await;

        // Exporter should present both metrics at first
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(body.contains(&name2));

        // Loop for seven seconds, well past the three-second `flush_period_secs`, updating only
        // `name1` so it stays fresh while `name2` ages out.
        for _ in 0..7 {
            // Update the first metric, ensuring it doesn't expire
            let (_, event) = tests::create_metric_set(Some(name1.clone()), vec!["43"]);
            tx.send(event).expect("Failed to send.");

            // Let a second of wall-clock time elapse
            time::sleep(time::Duration::from_secs(1)).await;
        }

        // The exporter should now present only the metric that kept being updated
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(!body.contains(&name2));

        drop(tx);
        sink_handle.await.unwrap();
    }
}