vector/sinks/prometheus/exporter.rs

use std::{
    convert::Infallible,
    hash::Hash,
    mem::{Discriminant, discriminant},
    net::{IpAddr, Ipv4Addr, SocketAddr},
    sync::{Arc, RwLock},
    time::{Duration, Instant},
};

use async_trait::async_trait;
use base64::prelude::{BASE64_STANDARD, Engine as _};
use futures::{FutureExt, StreamExt, future, stream::BoxStream};
use hyper::{
    Body, Method, Request, Response, Server, StatusCode,
    body::HttpBody,
    header::HeaderValue,
    service::{make_service_fn, service_fn},
};
use indexmap::{IndexMap, map::Entry};
use serde_with::serde_as;
use snafu::Snafu;
use stream_cancel::{Trigger, Tripwire};
use tower::ServiceBuilder;
use tower_http::compression::CompressionLayer;
use tracing::{Instrument, Span};
use vector_lib::{
    ByteSizeOf, EstimatedJsonEncodedSizeOf,
    configurable::configurable_component,
    internal_event::{
        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle as _, Output, Protocol,
        Registered,
    },
};

use super::collector::{MetricCollector, StringCollector};
use crate::{
    config::{AcknowledgementsConfig, GenerateConfig, Input, Resource, SinkConfig, SinkContext},
    event::{
        Event, EventStatus, Finalizable,
        metric::{Metric, MetricData, MetricKind, MetricSeries, MetricValue},
    },
    http::{Auth, build_http_trace_layer},
    internal_events::PrometheusNormalizationError,
    sinks::{
        Healthcheck, VectorSink,
        util::{StreamSink, statistic::validate_quantiles},
    },
    tls::{MaybeTlsSettings, TlsEnableableConfig},
};

const MIN_FLUSH_PERIOD_SECS: u64 = 1;

const LOCK_FAILED: &str = "Prometheus exporter data lock is poisoned";

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("Flush period for sets must be greater than or equal to {} secs", min))]
    FlushPeriodTooShort { min: u64 },
}

/// Configuration for the `prometheus_exporter` sink.
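///
/// A minimal construction sketch (illustrative only; the address and the use of
/// defaults below are assumptions, not recommendations from this module):
///
/// ```ignore
/// let config = PrometheusExporterConfig {
///     address: "0.0.0.0:9598".parse().unwrap(),
///     ..Default::default()
/// };
/// ```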
#[serde_as]
#[configurable_component(sink(
    "prometheus_exporter",
    "Expose metric events on a Prometheus compatible endpoint."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct PrometheusExporterConfig {
    /// The default namespace for any metrics sent.
    ///
    /// This namespace is only used if a metric has no existing namespace. When a namespace is
    /// present, it is used as a prefix to the metric name, and separated with an underscore (`_`).
    ///
    /// It should follow the Prometheus [naming conventions][prom_naming_docs].
    ///
    /// [prom_naming_docs]: https://prometheus.io/docs/practices/naming/#metric-names
    #[serde(alias = "namespace")]
    #[configurable(metadata(docs::advanced))]
    pub default_namespace: Option<String>,

    /// The address to expose for scraping.
    ///
    /// The metrics are exposed at the typical Prometheus exporter path, `/metrics`.
    #[serde(default = "default_address")]
    #[configurable(metadata(docs::examples = "192.160.0.10:9598"))]
    pub address: SocketAddr,

    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    pub tls: Option<TlsEnableableConfig>,

    /// Default buckets to use for aggregating [distribution][dist_metric_docs] metrics into histograms.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    #[serde(default = "super::default_histogram_buckets")]
    #[configurable(metadata(docs::advanced))]
    pub buckets: Vec<f64>,

    /// Quantiles to use for aggregating [distribution][dist_metric_docs] metrics into a summary.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    #[serde(default = "super::default_summary_quantiles")]
    #[configurable(metadata(docs::advanced))]
    pub quantiles: Vec<f64>,

    /// Whether or not to render [distributions][dist_metric_docs] as an [aggregated histogram][prom_agg_hist_docs] or an [aggregated summary][prom_agg_summ_docs].
    ///
    /// While distributions can losslessly represent a set of samples for a metric, Prometheus
    /// clients (the application being scraped, which is this sink) must aggregate them locally
    /// into either an aggregated histogram or an aggregated summary.
    ///
    /// [dist_metric_docs]: https://vector.dev/docs/architecture/data-model/metric/#distribution
    /// [prom_agg_hist_docs]: https://prometheus.io/docs/concepts/metric_types/#histogram
    /// [prom_agg_summ_docs]: https://prometheus.io/docs/concepts/metric_types/#summary
    #[serde(default = "default_distributions_as_summaries")]
    #[configurable(metadata(docs::advanced))]
    pub distributions_as_summaries: bool,

    /// The interval, in seconds, on which metrics are flushed.
    ///
    /// Each flush interval, any metric that has not been updated since the previous flush is
    /// considered expired and is removed.
    ///
    /// Be sure to configure this value higher than your client's scrape interval.
    #[serde(default = "default_flush_period_secs")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Flush Interval"))]
    pub flush_period_secs: Duration,

    /// Suppresses timestamps on the Prometheus output.
    ///
    /// This can sometimes be useful when the source of metrics leads to their timestamps being too
    /// far in the past for Prometheus to allow them, such as when aggregating metrics over long
    /// time periods, or when replaying old metrics from a disk buffer.
    #[serde(default)]
    #[configurable(metadata(docs::advanced))]
    pub suppress_timestamp: bool,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}

impl Default for PrometheusExporterConfig {
    fn default() -> Self {
        Self {
            default_namespace: None,
            address: default_address(),
            auth: None,
            tls: None,
            buckets: super::default_histogram_buckets(),
            quantiles: super::default_summary_quantiles(),
            distributions_as_summaries: default_distributions_as_summaries(),
            flush_period_secs: default_flush_period_secs(),
            suppress_timestamp: default_suppress_timestamp(),
            acknowledgements: Default::default(),
        }
    }
}

const fn default_address() -> SocketAddr {
    SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 9598)
}

const fn default_distributions_as_summaries() -> bool {
    false
}

const fn default_flush_period_secs() -> Duration {
    Duration::from_secs(60)
}

const fn default_suppress_timestamp() -> bool {
    false
}

impl GenerateConfig for PrometheusExporterConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self::default()).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_exporter")]
impl SinkConfig for PrometheusExporterConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        if self.flush_period_secs.as_secs() < MIN_FLUSH_PERIOD_SECS {
            return Err(Box::new(BuildError::FlushPeriodTooShort {
                min: MIN_FLUSH_PERIOD_SECS,
            }));
        }

        validate_quantiles(&self.quantiles)?;

        let sink = PrometheusExporter::new(self.clone());
        let healthcheck = future::ok(()).boxed();

        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
    }

    fn input(&self) -> Input {
        Input::metric()
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

struct PrometheusExporter {
    server_shutdown_trigger: Option<Trigger>,
    config: PrometheusExporterConfig,
    metrics: Arc<RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>>,
}

/// Expiration metadata for a metric.
#[derive(Clone, Copy, Debug)]
struct MetricMetadata {
    expiration_window: Duration,
    expires_at: Instant,
}

impl MetricMetadata {
    pub fn new(expiration_window: Duration) -> Self {
        Self {
            expiration_window,
            expires_at: Instant::now() + expiration_window,
        }
    }

    /// Resets the expiration deadline.
    pub fn refresh(&mut self) {
        self.expires_at = Instant::now() + self.expiration_window;
    }

    /// Whether or not the referenced metric has expired yet.
    pub fn has_expired(&self, now: Instant) -> bool {
        now >= self.expires_at
    }
}
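
// Illustrative timeline for the expiration logic above (the 60-second window is
// an assumption matching `default_flush_period_secs`): a metric first seen at
// t=0 expires at t=60 unless `refresh` is called when it is updated again;
// `has_expired(now)` then gates its removal during the periodic flush in `run`.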

// Composite identifier that uniquely represents a metric.
//
// Instead of simply working off of the name (series) alone, we include the metric kind as well as
// the type (counter, gauge, etc) and any subtype information like histogram buckets.
//
// Specifically, though, we do _not_ include the actual metric value.  This type is used
// specifically to look up the entry in a map for a metric in the sense of "get the metric whose
// name is X and type is Y and has these tags".
#[derive(Clone, Debug)]
struct MetricRef {
    series: MetricSeries,
    value: Discriminant<MetricValue>,
    bounds: Option<Vec<f64>>,
}

impl MetricRef {
    /// Creates a `MetricRef` based on the given `Metric`.
    pub fn from_metric(metric: &Metric) -> Self {
        // Either the buckets for an aggregated histogram, or the quantiles for an aggregated summary.
        let bounds = match metric.value() {
            MetricValue::AggregatedHistogram { buckets, .. } => {
                Some(buckets.iter().map(|b| b.upper_limit).collect())
            }
            MetricValue::AggregatedSummary { quantiles, .. } => {
                Some(quantiles.iter().map(|q| q.quantile).collect())
            }
            _ => None,
        };

        Self {
            series: metric.series().clone(),
            value: discriminant(metric.value()),
            bounds,
        }
    }
}

impl PartialEq for MetricRef {
    fn eq(&self, other: &Self) -> bool {
        self.series == other.series && self.value == other.value && self.bounds == other.bounds
    }
}

impl Eq for MetricRef {}

impl Hash for MetricRef {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.series.hash(state);
        self.value.hash(state);
        if let Some(bounds) = &self.bounds {
            for bound in bounds {
                bound.to_bits().hash(state);
            }
        }
    }
}
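
// Sketch of the identity semantics above (hypothetical values, not part of the
// sink): two metrics sharing a series name but differing in value type yield
// distinct `MetricRef`s, so a counter and a gauge named "requests" are tracked
// independently.
//
//     let counter = Metric::new("requests", MetricKind::Absolute,
//         MetricValue::Counter { value: 1.0 });
//     let gauge = counter.clone().with_value(MetricValue::Gauge { value: 1.0 });
//     assert_ne!(MetricRef::from_metric(&counter), MetricRef::from_metric(&gauge));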

fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
    if let Some(auth) = auth {
        let headers = req.headers();
        if let Some(auth_header) = headers.get(hyper::header::AUTHORIZATION) {
            let encoded_credentials = match auth {
                Auth::Basic { user, password } => Some(HeaderValue::from_str(
                    format!(
                        "Basic {}",
                        BASE64_STANDARD.encode(format!("{}:{}", user, password.inner()))
                    )
                    .as_str(),
                )),
                Auth::Bearer { token } => Some(HeaderValue::from_str(
                    format!("Bearer {}", token.inner()).as_str(),
                )),
                #[cfg(feature = "aws-core")]
                _ => None,
            };

            if let Some(Ok(encoded_credentials)) = encoded_credentials
                && auth_header == encoded_credentials
            {
                return true;
            }
        }
    } else {
        return true;
    }

    false
}
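
// For reference, a request that passes the basic-auth check above carries a
// header like the following (base64 of "user:password"; the credentials are
// example values, not anything used by this sink):
//
//     Authorization: Basic dXNlcjpwYXNzd29yZA==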

#[derive(Clone)]
struct Handler {
    auth: Option<Auth>,
    default_namespace: Option<String>,
    buckets: Box<[f64]>,
    quantiles: Box<[f64]>,
    bytes_sent: Registered<BytesSent>,
    events_sent: Registered<EventsSent>,
}

impl Handler {
    fn handle<T: HttpBody>(
        &self,
        req: Request<T>,
        metrics: &RwLock<IndexMap<MetricRef, (Metric, MetricMetadata)>>,
    ) -> Response<Body> {
        let mut response = Response::new(Body::empty());

        match (authorized(&req, &self.auth), req.method(), req.uri().path()) {
            (false, _, _) => {
                *response.status_mut() = StatusCode::UNAUTHORIZED;
                response.headers_mut().insert(
                    http::header::WWW_AUTHENTICATE,
                    HeaderValue::from_static("Basic, Bearer"),
                );
            }

            (true, &Method::GET, "/metrics") => {
                let metrics = metrics.read().expect(LOCK_FAILED);

                let count = metrics.len();
                let byte_size = metrics
                    .iter()
                    .map(|(_, (metric, _))| metric.estimated_json_encoded_size_of())
                    .sum();

                let mut collector = StringCollector::new();

                for (_, (metric, _)) in metrics.iter() {
                    collector.encode_metric(
                        self.default_namespace.as_deref(),
                        &self.buckets,
                        &self.quantiles,
                        metric,
                    );
                }

                drop(metrics);

                let body = collector.finish();
                let body_size = body.size_of();

                *response.body_mut() = body.into();

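                // "version=0.0.4" identifies the Prometheus text exposition
                // format, so scrapers know which parser to use.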
                response.headers_mut().insert(
                    "Content-Type",
                    HeaderValue::from_static("text/plain; version=0.0.4"),
                );

                self.events_sent.emit(CountByteSize(count, byte_size));
                self.bytes_sent.emit(ByteSize(body_size));
            }

            (true, _, _) => {
                *response.status_mut() = StatusCode::NOT_FOUND;
            }
        }

        response
    }
}

impl PrometheusExporter {
    fn new(config: PrometheusExporterConfig) -> Self {
        Self {
            server_shutdown_trigger: None,
            config,
            metrics: Arc::new(RwLock::new(IndexMap::new())),
        }
    }

    async fn start_server_if_needed(&mut self) -> crate::Result<()> {
        if self.server_shutdown_trigger.is_some() {
            return Ok(());
        }

        let handler = Handler {
            bytes_sent: register!(BytesSent::from(Protocol::HTTP)),
            events_sent: register!(EventsSent::from(Output(None))),
            default_namespace: self.config.default_namespace.clone(),
            buckets: self.config.buckets.clone().into(),
            quantiles: self.config.quantiles.clone().into(),
            auth: self.config.auth.clone(),
        };

        let span = Span::current();
        let metrics = Arc::clone(&self.metrics);

        let new_service = make_service_fn(move |_| {
            let span = Span::current();
            let metrics = Arc::clone(&metrics);
            let handler = handler.clone();

            let inner = service_fn(move |req| {
                let response = handler.handle(req, &metrics);

                future::ok::<_, Infallible>(response)
            });

            let service = ServiceBuilder::new()
                .layer(build_http_trace_layer(span.clone()))
                .layer(CompressionLayer::new())
                .service(inner);

            async move { Ok::<_, Infallible>(service) }
        });

        let (trigger, tripwire) = Tripwire::new();

        let tls = self.config.tls.clone();
        let address = self.config.address;

        let tls = MaybeTlsSettings::from_config(tls.as_ref(), true)?;
        let listener = tls.bind(&address).await?;

        tokio::spawn(async move {
            info!(message = "Building HTTP server.", address = %address);

            Server::builder(hyper::server::accept::from_stream(listener.accept_stream()))
                .serve(new_service)
                .with_graceful_shutdown(tripwire.then(crate::shutdown::tripwire_handler))
                .instrument(span)
                .await
                .map_err(|error| error!("Server error: {}.", error))?;

            Ok::<(), ()>(())
        });

        self.server_shutdown_trigger = Some(trigger);
        Ok(())
    }

    fn normalize(&mut self, metric: Metric) -> Option<Metric> {
        let new_metric = match metric.value() {
            MetricValue::Distribution { .. } => {
                // Convert the distribution as-is, and then absolute-ify it.
                let (series, data, metadata) = metric.into_parts();
                let (time, kind, value) = data.into_parts();

                let new_value = if self.config.distributions_as_summaries {
                    // We use a sketch when in summary mode because they're actually able to be
                    // merged and provide correct output, unlike the aggregated summaries that
                    // we handle from _sources_ like Prometheus.  The collector code itself
                    // will render sketches as aggregated summaries, so we have continuity there.
                    value
                        .distribution_to_sketch()
                        .expect("value should be distribution already")
                } else {
                    value
                        .distribution_to_agg_histogram(&self.config.buckets)
                        .expect("value should be distribution already")
                };

                let data = MetricData::from_parts(time, kind, new_value);
                Metric::from_parts(series, data, metadata)
            }
            _ => metric,
        };

        match new_metric.kind() {
            MetricKind::Absolute => Some(new_metric),
            MetricKind::Incremental => {
                let metrics = self.metrics.read().expect(LOCK_FAILED);
                let metric_ref = MetricRef::from_metric(&new_metric);

                if let Some(existing) = metrics.get(&metric_ref) {
                    let mut current = existing.0.value().clone();
                    if current.add(new_metric.value()) {
                        // If we were able to add to the existing value (i.e. they were compatible),
                        // return the result as an absolute metric.
                        return Some(new_metric.with_value(current).into_absolute());
                    }
                }

                // Otherwise, if we didn't have an existing value or we did and it was not
                // compatible with the new value, simply return the new value as absolute.
                Some(new_metric.into_absolute())
            }
        }
    }
}
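
// Sketch of the incremental-to-absolute conversion performed by `normalize`
// above (the values are assumptions for illustration): two incremental counter
// updates of +3 and +4 for the same series are exported as absolute values 3
// and then 7, since each update is added onto the previously tracked absolute
// value before being stored.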

#[async_trait]
impl StreamSink<Event> for PrometheusExporter {
    async fn run(mut self: Box<Self>, mut input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.start_server_if_needed()
            .await
            .map_err(|error| error!("Failed to start Prometheus exporter: {}.", error))?;

        let mut last_flush = Instant::now();
        let flush_period = self.config.flush_period_secs;

        while let Some(event) = input.next().await {
            // If we've exceeded our flush interval, go through all of the metrics we're
            // currently tracking and remove any that have not been updated within the flush
            // interval.
            //
            // TODO: Can we be smarter about this? As is, we might wait up to 2x the flush period to
            // remove an expired metric depending on how things line up.  It'd be cool to _check_
            // for expired metrics more often, but we also don't want to check _way_ too often, like
            // every second, since then we're constantly iterating through every metric, etc etc.
            if last_flush.elapsed() > self.config.flush_period_secs {
                last_flush = Instant::now();

                let mut metrics = self.metrics.write().expect(LOCK_FAILED);

                metrics.retain(|_metric_ref, (_, metadata)| !metadata.has_expired(last_flush));
            }

            // Now process the metric we got.
            let mut metric = event.into_metric();
            let finalizers = metric.take_finalizers();

            match self.normalize(metric) {
                Some(normalized) => {
                    let normalized = if self.config.suppress_timestamp {
                        normalized.with_timestamp(None)
                    } else {
                        normalized
                    };

                    // We have a normalized metric, in absolute form.  If we're already aware of this
                    // metric, update its expiration deadline, otherwise, start tracking it.
                    let mut metrics = self.metrics.write().expect(LOCK_FAILED);

                    match metrics.entry(MetricRef::from_metric(&normalized)) {
                        Entry::Occupied(mut entry) => {
                            let (data, metadata) = entry.get_mut();
                            *data = normalized;
                            metadata.refresh();
                        }
                        Entry::Vacant(entry) => {
                            entry.insert((normalized, MetricMetadata::new(flush_period)));
                        }
                    }
                    finalizers.update_status(EventStatus::Delivered);
                }
                _ => {
                    emit!(PrometheusNormalizationError {});
                    finalizers.update_status(EventStatus::Errored);
                }
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::io::Read;

    use chrono::{Duration, Utc};
    use flate2::read::GzDecoder;
    use futures::stream;
    use indoc::indoc;
    use similar_asserts::assert_eq;
    use tokio::{sync::oneshot::error::TryRecvError, time};
    use vector_lib::{
        event::{MetricTags, StatisticKind},
        finalization::{BatchNotifier, BatchStatus},
        metric_tags, samples,
        sensitive_string::SensitiveString,
    };

    use super::*;
    use crate::{
        config::ProxyConfig,
        event::metric::{Metric, MetricValue},
        http::HttpClient,
        sinks::prometheus::{distribution_to_agg_histogram, distribution_to_ddsketch},
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            next_addr, random_string, trace_init,
        },
        tls::MaybeTlsSettings,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusExporterConfig>();
    }

    #[tokio::test]
    async fn prometheus_notls() {
        export_and_fetch_simple(None).await;
    }

    #[tokio::test]
    async fn prometheus_tls() {
        let mut tls_config = TlsEnableableConfig::test_config();
        tls_config.options.verify_hostname = Some(false);
        export_and_fetch_simple(Some(tls_config)).await;
    }

    #[tokio::test]
    async fn prometheus_noauth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let response_result = export_and_fetch_with_auth(None, None, events, false).await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_successful_basic_auth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let auth_config = Auth::Basic {
            user: "user".to_string(),
            password: SensitiveString::from("password".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
                .await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_successful_token_auth() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(auth_config.clone()), Some(auth_config), events, false)
                .await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    #[tokio::test]
    async fn prometheus_missing_auth() {
        let (_, event1) = create_metric_gauge(None, 123.4);
        let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let server_auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let response_result =
            export_and_fetch_with_auth(Some(server_auth_config), None, events, false).await;

        assert!(response_result.is_err());
        assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn prometheus_wrong_auth() {
        let (_, event1) = create_metric_gauge(None, 123.4);
        let (_, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let server_auth_config = Auth::Bearer {
            token: SensitiveString::from("token".to_string()),
        };

        let client_auth_config = Auth::Basic {
            user: "user".to_string(),
            password: SensitiveString::from("password".to_string()),
        };

        let response_result = export_and_fetch_with_auth(
            Some(server_auth_config),
            Some(client_auth_config),
            events,
            false,
        )
        .await;

        assert!(response_result.is_err());
        assert_eq!(response_result.unwrap_err(), StatusCode::UNAUTHORIZED);
    }

    #[tokio::test]
    async fn encoding_gzip() {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let events = vec![event1];

        let body_raw = export_and_fetch_raw(None, events, false, Some(String::from("gzip"))).await;
        let expected = format!(
            indoc! {r#"
                # HELP {name} {name}
                # TYPE {name} gauge
                {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1,
        );

        let mut gz = GzDecoder::new(&body_raw[..]);
        let mut body_decoded = String::new();
        let _ = gz.read_to_string(&mut body_decoded);

        assert!(body_raw.len() < expected.len());
        assert_eq!(body_decoded, expected);
    }

    #[tokio::test]
    async fn updates_timestamps() {
        let timestamp1 = Utc::now();
        let (name, event1) = create_metric_gauge(None, 123.4);
        let event1 = Event::from(event1.into_metric().with_timestamp(Some(timestamp1)));
        let (_, event2) = create_metric_gauge(Some(name.clone()), 12.0);
        let timestamp2 = timestamp1 + Duration::seconds(1);
        let event2 = Event::from(event2.into_metric().with_timestamp(Some(timestamp2)));
        let events = vec![event1, event2];

        let body = export_and_fetch(None, events, false).await;
        let timestamp = timestamp2.timestamp_millis();
        assert_eq!(
            body,
            format!(
                indoc! {r#"
                    # HELP {name} {name}
                    # TYPE {name} gauge
                    {name}{{some_tag="some_value"}} 135.4 {timestamp}
                "#},
                name = name,
                timestamp = timestamp
            )
        );
    }

    #[tokio::test]
    async fn suppress_timestamp() {
        let timestamp = Utc::now();
        let (name, event) = create_metric_gauge(None, 123.4);
        let event = Event::from(event.into_metric().with_timestamp(Some(timestamp)));
        let events = vec![event];

        let body = export_and_fetch(None, events, true).await;
        assert_eq!(
            body,
            format!(
                indoc! {r#"
                    # HELP {name} {name}
                    # TYPE {name} gauge
                    {name}{{some_tag="some_value"}} 123.4
                "#},
                name = name,
            )
        );
    }

    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
    /// > Label names MUST be unique within a LabelSet.
    /// Prometheus itself will reject the metric with an error. Largely to remain backward compatible with older versions of Vector,
    /// we only publish the last tag in the list.
    #[tokio::test]
    async fn prometheus_duplicate_labels() {
        let (name, event) = create_metric_with_tags(
            None,
            MetricValue::Gauge { value: 123.4 },
            Some(metric_tags!("code" => "200", "code" => "success")),
        );
        let events = vec![event];

        let response_result = export_and_fetch_with_auth(None, None, events, false).await;

        assert!(response_result.is_ok());

        let body = response_result.expect("Cannot extract body from the response");

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{code="success"}} 123.4
            "# },
            name = name
        )));
    }

    async fn export_and_fetch_raw(
        tls_config: Option<TlsEnableableConfig>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
        encoding: Option<String>,
    ) -> hyper::body::Bytes {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(tls_config.as_ref(), false).unwrap();
        let proto = client_settings.http_protocol_name();

        let address = next_addr();
        let config = PrometheusExporterConfig {
            address,
            tls: tls_config,
            suppress_timestamp,
            ..Default::default()
        };

        // Set up acknowledgement notification
        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            stream::iter(events).chain(stream::once(async move {
                // Wait a bit to have time to scrape metrics
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        time::sleep(time::Duration::from_millis(100)).await;

        // Events are marked as delivered as soon as they are aggregated.
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(ref encoding) = encoding {
            request.headers_mut().insert(
                http::header::ACCEPT_ENCODING,
                HeaderValue::from_str(encoding.as_str()).unwrap(),
            );
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        assert!(result.status().is_success());

        if encoding.is_some() {
            assert!(
                result
                    .headers()
                    .contains_key(http::header::CONTENT_ENCODING)
            );
        }

        let body = result.into_body();
        let bytes = hyper::body::to_bytes(body)
            .await
            .expect("Reading body failed");

        sink_handle.await.unwrap();

        bytes
    }

    async fn export_and_fetch(
        tls_config: Option<TlsEnableableConfig>,
        events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> String {
        let bytes = export_and_fetch_raw(tls_config, events, suppress_timestamp, None).await;
        String::from_utf8(bytes.to_vec()).unwrap()
    }

    async fn export_and_fetch_with_auth(
        server_auth_config: Option<Auth>,
        client_auth_config: Option<Auth>,
        mut events: Vec<Event>,
        suppress_timestamp: bool,
    ) -> Result<String, http::status::StatusCode> {
        trace_init();

        let client_settings = MaybeTlsSettings::from_config(None, false).unwrap();
        let proto = client_settings.http_protocol_name();

        let address = next_addr();
        let config = PrometheusExporterConfig {
            address,
            auth: server_auth_config,
            tls: None,
            suppress_timestamp,
            ..Default::default()
        };

        // Set up acknowledgement notification
        let mut receiver = BatchNotifier::apply_to(&mut events[..]);
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));

        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (_, delayed_event) = create_metric_gauge(Some("delayed".to_string()), 123.4);
        let sink_handle = tokio::spawn(run_and_assert_sink_compliance(
            sink,
            stream::iter(events).chain(stream::once(async move {
                // Wait a bit to have time to scrape metrics
                time::sleep(time::Duration::from_millis(500)).await;
                delayed_event
            })),
            &SINK_TAGS,
        ));

        time::sleep(time::Duration::from_millis(100)).await;

        // Events are marked as delivered as soon as they are aggregated.
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut request = Request::get(format!("{proto}://{address}/metrics"))
            .body(Body::empty())
            .expect("Error creating request.");

        if let Some(client_auth_config) = client_auth_config {
            client_auth_config.apply(&mut request);
        }

        let proxy = ProxyConfig::default();
        let result = HttpClient::new(client_settings, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");

        if !result.status().is_success() {
            return Err(result.status());
        }

        let body = result.into_body();
        let bytes = hyper::body::to_bytes(body)
            .await
            .expect("Reading body failed");
        let result = String::from_utf8(bytes.to_vec()).unwrap();

        sink_handle.await.unwrap();

        Ok(result)
    }

    async fn export_and_fetch_simple(tls_config: Option<TlsEnableableConfig>) {
        let (name1, event1) = create_metric_gauge(None, 123.4);
        let (name2, event2) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        let events = vec![event1, event2];

        let body = export_and_fetch(tls_config, events, false).await;

        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 123.4
            "#},
            name = name1
        )));
        assert!(body.contains(&format!(
            indoc! {r#"
               # HELP {name} {name}
               # TYPE {name} gauge
               {name}{{some_tag="some_value"}} 3
            "#},
            name = name2
        )));
    }

    pub fn create_metric_gauge(name: Option<String>, value: f64) -> (String, Event) {
        create_metric(name, MetricValue::Gauge { value })
    }

    pub fn create_metric_set(name: Option<String>, values: Vec<&'static str>) -> (String, Event) {
        create_metric(
            name,
            MetricValue::Set {
                values: values.into_iter().map(Into::into).collect(),
            },
        )
    }

    fn create_metric(name: Option<String>, value: MetricValue) -> (String, Event) {
        create_metric_with_tags(name, value, Some(metric_tags!("some_tag" => "some_value")))
    }

    fn create_metric_with_tags(
        name: Option<String>,
        value: MetricValue,
        tags: Option<MetricTags>,
    ) -> (String, Event) {
        let name = name.unwrap_or_else(|| format!("vector_set_{}", random_string(16)));
        let event = Metric::new(name.clone(), MetricKind::Incremental, value)
            .with_tags(tags)
            .into();
        (name, event)
    }

    #[tokio::test]
    async fn sink_absolute() {
        let config = PrometheusExporterConfig {
            address: next_addr(), // Not actually bound, just needed to fill config
            tls: None,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        let m1 = Metric::new(
            "absolute",
            MetricKind::Absolute,
            MetricValue::Counter { value: 32. },
        )
        .with_tags(Some(metric_tags!("tag1" => "value1")));

        let m2 = m1.clone().with_tags(Some(metric_tags!("tag1" => "value2")));

        let events = vec![
            Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 32. })),
            Event::Metric(m2.clone().with_value(MetricValue::Counter { value: 33. })),
            Event::Metric(m1.clone().with_value(MetricValue::Counter { value: 40. })),
        ];

        let metrics_handle = Arc::clone(&sink.metrics);

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        let expected_m1 = metrics_after
            .get(&MetricRef::from_metric(&m1))
            .expect("m1 should exist");
        let expected_m1_value = MetricValue::Counter { value: 40. };
        assert_eq!(expected_m1.0.value(), &expected_m1_value);

        let expected_m2 = metrics_after
            .get(&MetricRef::from_metric(&m2))
            .expect("m2 should exist");
        let expected_m2_value = MetricValue::Counter { value: 33. };
        assert_eq!(expected_m2.0.value(), &expected_m2_value);
    }

    #[tokio::test]
    async fn sink_distributions_as_histograms() {
        // When we get summary distributions, unless we've been configured to actually emit
        // summaries for distributions, we just forcefully turn them into histograms.  This is
        // simpler and uses less memory, as aggregated histograms are better supported by Prometheus
        // since they can actually be aggregated anywhere in the pipeline -- so long as the buckets
        // are the same -- without loss of accuracy.

        // This expects that the default for the sink is to render distributions as aggregated histograms.
        let config = PrometheusExporterConfig {
            address: next_addr(), // Not actually bound, just needed to fill config
            tls: None,
            ..Default::default()
        };
        let buckets = config.buckets.clone();

        let sink = PrometheusExporter::new(config);

        // Define a series of incremental distribution updates.
        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = vec![
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

        // Figure out what the merged distributions should add up to.
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_agg_histogram(merged_summary, &buckets)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_agg_histogram(merged_histogram, &buckets)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

        // TODO: make a new metric based on merged_distrib_histogram, with expected_histogram_value,
        // so that the discriminant matches and our lookup in the indexmap can actually find it

        // Now run the events through the sink and see what ends up in the internal metric map.
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        // Both metrics should be present, and both should be aggregated histograms.
        assert_eq!(metrics_after.len(), 2);

        let actual_summary = metrics_after
            .get(&MetricRef::from_metric(&expected_summary))
            .expect("summary metric should exist");
        assert_eq!(actual_summary.0.value(), expected_summary.value());

        let actual_histogram = metrics_after
            .get(&MetricRef::from_metric(&expected_histogram))
            .expect("histogram metric should exist");
        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
    }

    #[tokio::test]
    async fn sink_distributions_as_summaries() {
        // This assumes that when we turn on `distributions_as_summaries`, we'll get aggregated
        // summaries from distributions.  This is technically true, but the way this test works is
        // that we check the internal metric data, which, when in this mode, will actually be a
        // sketch (so that we can merge without loss of accuracy).
        //
        // The render code is actually what will end up rendering those sketches as aggregated
        // summaries in the scrape output.
        let config = PrometheusExporterConfig {
            address: next_addr(), // Not actually bound, just needed to fill config
            tls: None,
            distributions_as_summaries: true,
            ..Default::default()
        };

        let sink = PrometheusExporter::new(config);

        // Define a series of incremental distribution updates.
        let base_summary_metric = Metric::new(
            "distrib_summary",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Summary,
                samples: samples!(1.0 => 1, 3.0 => 2),
            },
        );

        let base_histogram_metric = Metric::new(
            "distrib_histo",
            MetricKind::Incremental,
            MetricValue::Distribution {
                statistic: StatisticKind::Histogram,
                samples: samples!(7.0 => 1, 9.0 => 2),
            },
        );

        let metrics = vec![
            base_summary_metric.clone(),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 2, 2.9 => 1),
                }),
            base_summary_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Summary,
                    samples: samples!(1.0 => 4, 3.2 => 1),
                }),
            base_histogram_metric.clone(),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 2, 9.9 => 1),
                }),
            base_histogram_metric
                .clone()
                .with_value(MetricValue::Distribution {
                    statistic: StatisticKind::Histogram,
                    samples: samples!(7.0 => 4, 10.2 => 1),
                }),
        ];

        // Figure out what the merged distributions should add up to.
        let mut merged_summary = base_summary_metric.clone();
        assert!(merged_summary.update(&metrics[1]));
        assert!(merged_summary.update(&metrics[2]));
        let expected_summary = distribution_to_ddsketch(merged_summary)
            .expect("input summary metric should have been distribution")
            .into_absolute();

        let mut merged_histogram = base_histogram_metric.clone();
        assert!(merged_histogram.update(&metrics[4]));
        assert!(merged_histogram.update(&metrics[5]));
        let expected_histogram = distribution_to_ddsketch(merged_histogram)
            .expect("input histogram metric should have been distribution")
            .into_absolute();

        // Now run the events through the sink and see what ends up in the internal metric map.
        let metrics_handle = Arc::clone(&sink.metrics);

        let events = metrics
            .iter()
            .cloned()
            .map(Event::Metric)
            .collect::<Vec<_>>();

        let sink = VectorSink::from_event_streamsink(sink);
        let input_events = stream::iter(events).map(Into::into);
        sink.run(input_events).await.unwrap();

        let metrics_after = metrics_handle.read().unwrap();

        // Both metrics should be present, and both should be distribution sketches.
1368        assert_eq!(metrics_after.len(), 2);
1369
1370        let actual_summary = metrics_after
1371            .get(&MetricRef::from_metric(&expected_summary))
1372            .expect("summary metric should exist");
1373        assert_eq!(actual_summary.0.value(), expected_summary.value());
1374
1375        let actual_histogram = metrics_after
1376            .get(&MetricRef::from_metric(&expected_histogram))
1377            .expect("histogram metric should exist");
1378        assert_eq!(actual_histogram.0.value(), expected_histogram.value());
1379    }
1380
1381    #[tokio::test]
1382    async fn sink_gauge_incremental_absolute_mix() {
1383        // Because Prometheus does not, itself, have the concept of an Incremental metric, the
1384        // Exporter must apply a normalization function that converts all metrics to Absolute ones
1385        // before handling them.
1386
1387        // This test ensures that this normalization works correctly when applied to a mix of both
1388        // Incremental and Absolute inputs.
1389        let config = PrometheusExporterConfig {
1390            address: next_addr(), // Not actually bound, just needed to fill config
1391            tls: None,
1392            ..Default::default()
1393        };
1394
1395        let sink = PrometheusExporter::new(config);
1396
1397        let base_absolute_gauge_metric = Metric::new(
1398            "gauge",
1399            MetricKind::Absolute,
1400            MetricValue::Gauge { value: 100.0 },
1401        );
1402
1403        let base_incremental_gauge_metric = Metric::new(
1404            "gauge",
1405            MetricKind::Incremental,
1406            MetricValue::Gauge { value: -10.0 },
1407        );
1408
1409        let metrics = vec![
1410            base_absolute_gauge_metric.clone(),
1411            base_absolute_gauge_metric
1412                .clone()
1413                .with_value(MetricValue::Gauge { value: 333.0 }),
1414            base_incremental_gauge_metric.clone(),
1415            base_incremental_gauge_metric
1416                .clone()
1417                .with_value(MetricValue::Gauge { value: 4.0 }),
1418        ];
1419
1420        // Now run the events through the sink and see what ends up in the internal metric map.
1421        let metrics_handle = Arc::clone(&sink.metrics);
1422
1423        let events = metrics
1424            .iter()
1425            .cloned()
1426            .map(Event::Metric)
1427            .collect::<Vec<_>>();
1428
1429        let sink = VectorSink::from_event_streamsink(sink);
1430        let input_events = stream::iter(events).map(Into::into);
1431        sink.run(input_events).await.unwrap();
1432
1433        let metrics_after = metrics_handle.read().unwrap();
1434
1435        // The gauge metric should be present.
1436        assert_eq!(metrics_after.len(), 1);
1437
        let expected_gauge = Metric::new(
            "gauge",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 327.0 },
        );

        let actual_gauge = metrics_after
            .get(&MetricRef::from_metric(&expected_gauge))
            .expect("gauge metric should exist");
        assert_eq!(actual_gauge.0.value(), expected_gauge.value());
    }
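
    /// A minimal, self-contained sketch of the fold semantics the test above exercises.
    /// This is illustrative only, not the exporter's actual normalizer: an Absolute
    /// sample replaces the running state, while an Incremental sample is added to it.
    #[test]
    fn gauge_normalization_fold_sketch() {
        enum Sample {
            Absolute(f64),
            Incremental(f64),
        }

        // Mirror the 100.0 -> 333.0 -> -10.0 -> +4.0 sequence from the test above.
        let samples = [
            Sample::Absolute(100.0),
            Sample::Absolute(333.0),
            Sample::Incremental(-10.0),
            Sample::Incremental(4.0),
        ];

        let value = samples.iter().fold(0.0, |state, sample| match sample {
            Sample::Absolute(value) => *value,
            Sample::Incremental(delta) => state + delta,
        });

        assert_eq!(value, 327.0);
    }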
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    #![allow(clippy::print_stdout)] // tests
    #![allow(clippy::print_stderr)] // tests
    #![allow(clippy::dbg_macro)] // tests

    use chrono::Utc;
    use futures::{future::ready, stream};
    use serde_json::Value;
    use tokio::{sync::mpsc, time};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    use super::*;
    use crate::{
        config::ProxyConfig,
        http::HttpClient,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            trace_init,
        },
    };

    fn sink_exporter_address() -> String {
        std::env::var("SINK_EXPORTER_ADDRESS").unwrap_or_else(|_| "127.0.0.1:9101".into())
    }

    fn prometheus_address() -> String {
        std::env::var("PROMETHEUS_ADDRESS").unwrap_or_else(|_| "localhost:9090".into())
    }

    async fn fetch_exporter_body() -> String {
        let url = format!("http://{}/metrics", sink_exporter_address());
        let request = Request::get(url)
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let result = HttpClient::new(None, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not send request");
        let result = hyper::body::to_bytes(result.into_body())
            .await
            .expect("Error fetching body");
        String::from_utf8_lossy(&result).to_string()
    }

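    // Runs an instant query against the Prometheus HTTP API and returns the parsed JSON.
    // For reference, a successful vector-typed response has roughly this shape (abridged):
    //
    //   {"status": "success", "data": {"resultType": "vector", "result": [
    //       {"metric": {"__name__": "...", ...}, "value": [<unix_ts>, "<value>"]}]}}
    //
    // which is why the tests below index `["data"]["result"][0]["value"][1]`.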
    async fn prometheus_query(query: &str) -> Value {
        let url = format!(
            "http://{}/api/v1/query?query={}",
            prometheus_address(),
            query
        );
        let request = Request::post(url)
            .body(Body::empty())
            .expect("Error creating request.");
        let proxy = ProxyConfig::default();
        let result = HttpClient::new(None, &proxy)
            .unwrap()
            .send(request)
            .await
            .expect("Could not fetch query");
        let result = hyper::body::to_bytes(result.into_body())
            .await
            .expect("Error fetching body");
        let result = String::from_utf8_lossy(&result);
        serde_json::from_str(result.as_ref()).expect("Invalid JSON from prometheus")
    }

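    // These scenarios run sequentially inside a single test because they all bind the same
    // exporter address; running them as independent tests could race on the port.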
    #[tokio::test]
    async fn prometheus_metrics() {
        trace_init();

        prometheus_scrapes_metrics().await;
        time::sleep(time::Duration::from_millis(500)).await;
        reset_on_flush_period().await;
        expire_on_flush_period().await;
    }

    async fn prometheus_scrapes_metrics() {
        let start = Utc::now().timestamp();

        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(2),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (name, event) = tests::create_metric_gauge(None, 123.4);
        let (_, delayed_event) = tests::create_metric_gauge(Some("delayed".to_string()), 123.4);

        run_and_assert_sink_compliance(
            sink,
            stream::once(ready(event)).chain(stream::once(async move {
                // Wait a bit for the Prometheus server to scrape the metrics.
                time::sleep(time::Duration::from_secs(2)).await;
                delayed_event
            })),
            &SINK_TAGS,
        )
        .await;

        // Now query the metrics back from Prometheus.
        let result = prometheus_query(&name).await;

        let data = &result["data"]["result"][0];
        assert_eq!(data["metric"]["__name__"], Value::String(name));
        assert_eq!(
            data["metric"]["instance"],
            Value::String(sink_exporter_address())
        );
        assert_eq!(
            data["metric"]["some_tag"],
            Value::String("some_value".into())
        );
        assert!(data["value"][0].as_f64().unwrap() >= start as f64);
        assert_eq!(data["value"][1], Value::String("123.4".into()));
    }

    async fn reset_on_flush_period() {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(3),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx);

        let input_events = input_events.map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        // Create two sets with different names but the same size.
        let (name1, event) = tests::create_metric_set(None, vec!["0", "1", "2"]);
        tx.send(event).expect("Failed to send.");
        let (name2, event) = tests::create_metric_set(None, vec!["3", "4", "5"]);
        tx.send(event).expect("Failed to send.");

        // Wait for the Prometheus server to scrape them, and then query it to ensure both metrics
        // have their correct set size value.
        time::sleep(time::Duration::from_secs(2)).await;

        // Now query Prometheus to make sure we see them there.
        let result = prometheus_query(&name1).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("3".into())
        );
        let result = prometheus_query(&name2).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("3".into())
        );

        // Wait a few more seconds to ensure that the two original sets have logically expired.
        // We'll update `name2` but not `name1`, which should lead to both being expired, but
        // `name2` being recreated with two values only, while `name1` is entirely gone.
        time::sleep(time::Duration::from_secs(3)).await;

        let (name2, event) = tests::create_metric_set(Some(name2), vec!["8", "9"]);
        tx.send(event).expect("Failed to send.");

        // Again, wait for the Prometheus server to scrape the metrics, and then query it again.
        time::sleep(time::Duration::from_secs(2)).await;
        let result = prometheus_query(&name1).await;
        assert_eq!(result["data"]["result"][0]["value"][1], Value::Null);
        let result = prometheus_query(&name2).await;
        assert_eq!(
            result["data"]["result"][0]["value"][1],
            Value::String("2".into())
        );

        drop(tx);
        sink_handle.await.unwrap();
    }

    async fn expire_on_flush_period() {
        let config = PrometheusExporterConfig {
            address: sink_exporter_address().parse().unwrap(),
            flush_period_secs: Duration::from_secs(3),
            ..Default::default()
        };
        let (sink, _) = config.build(SinkContext::default()).await.unwrap();
        let (tx, rx) = mpsc::unbounded_channel();
        let input_events = UnboundedReceiverStream::new(rx);

        let input_events = input_events.map(Into::into);
        let sink_handle = tokio::spawn(async move { sink.run(input_events).await.unwrap() });

        // Send two metrics. Any metric that is not updated within a full flush period should
        // expire: below, the first is kept fresh with updates while the second is left alone.
        // (A standalone sketch of this expiry rule follows this test.)
        let (name1, event) = tests::create_metric_set(None, vec!["42"]);
        tx.send(event).expect("Failed to send.");
        let (name2, event) = tests::create_metric_gauge(None, 100.0);
        tx.send(event).expect("Failed to send.");

        // Wait a bit for the sink to process the events
        time::sleep(time::Duration::from_secs(1)).await;

        // Exporter should present both metrics at first
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(body.contains(&name2));

        // Wait long enough to put us past flush_period_secs for the metric that wasn't updated:
        // seven one-second iterations comfortably exceed the three-second flush period, while
        // the repeated sends keep the first metric fresh.
        for _ in 0..7 {
            // Update the first metric, ensuring it doesn't expire
            let (_, event) = tests::create_metric_set(Some(name1.clone()), vec!["43"]);
            tx.send(event).expect("Failed to send.");

            // Wait a bit for time to pass
            time::sleep(time::Duration::from_secs(1)).await;
        }

        // Exporter should present only the one that got updated
        let body = fetch_exporter_body().await;
        assert!(body.contains(&name1));
        assert!(!body.contains(&name2));

        drop(tx);
        sink_handle.await.unwrap();
    }
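
    /// A minimal sketch (illustrative only, not the exporter's actual implementation) of
    /// the expiry rule the tests above exercise: entries that are not refreshed within the
    /// flush period are dropped, while refreshed entries survive.
    #[test]
    fn flush_period_expiry_sketch() {
        use std::collections::HashMap;
        use std::time::{Duration, Instant};

        let flush_period = Duration::from_secs(3);
        let now = Instant::now();

        // `refreshed` was just updated; `stale` was last seen longer ago than the flush period.
        let mut last_seen: HashMap<&str, Instant> = HashMap::new();
        last_seen.insert("refreshed", now);
        last_seen.insert("stale", now - Duration::from_secs(5));

        // Prune every entry whose last update is older than the flush period.
        last_seen.retain(|_, seen| now.duration_since(*seen) < flush_period);

        assert!(last_seen.contains_key("refreshed"));
        assert!(!last_seen.contains_key("stale"));
    }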
}