vector/sources/prometheus/
remote_write.rs

1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::Bytes;
4use prost::Message;
5use vector_lib::{
6    config::LogNamespace, configurable::configurable_component, prometheus::parser::proto,
7};
8use warp::http::{HeaderMap, StatusCode};
9
10use super::parser;
11
12use crate::{
13    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
14    config::{
15        GenerateConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
16    },
17    event::Event,
18    http::KeepaliveConfig,
19    internal_events::PrometheusRemoteWriteParseError,
20    serde::bool_or_struct,
21    sources::{
22        self,
23        util::{HttpSource, decompress_body, http::HttpMethod},
24    },
25    tls::TlsEnableableConfig,
26};
27
// NOTE(review): `#[configurable_component]` presumably derives the user-facing
// configuration documentation from the `///` comments below — confirm before
// rewording any of them.
/// Defines the behavior for handling conflicting metric metadata.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
// Variants are (de)serialized in snake_case, i.e. `ignore` and `reject`.
#[serde(rename_all = "snake_case")]
pub enum MetadataConflictStrategy {
    /// Silently ignore metadata conflicts, keeping the first metadata entry. This aligns with Prometheus/Thanos behavior.
    Ignore,
    /// Reject requests with conflicting metadata by returning an HTTP 400 error. This is the default to preserve backwards compatibility.
    // `#[default]` makes this the value produced by `MetadataConflictStrategy::default()`.
    #[default]
    Reject,
}
39
// NOTE(review): the `///` comments on this struct and its fields are presumably
// consumed by `#[configurable_component]` to generate configuration docs —
// confirm before rewording them. Field order is also the order used by the
// struct literals elsewhere in this file.
/// Configuration for the `prometheus_remote_write` source.
#[configurable_component(source(
    "prometheus_remote_write",
    "Receive metric via the Prometheus Remote Write protocol."
))]
#[derive(Clone, Debug)]
pub struct PrometheusRemoteWriteConfig {
    /// The socket address to accept connections on.
    ///
    /// The address _must_ include a port.
    #[configurable(metadata(docs::examples = "0.0.0.0:9090"))]
    address: SocketAddr,

    /// The URL path on which metric POST requests are accepted.
    // Falls back to `default_path()` when the key is omitted.
    #[serde(default = "default_path")]
    #[configurable(metadata(docs::examples = "/api/v1/write"))]
    #[configurable(metadata(docs::examples = "/remote-write"))]
    path: String,

    // Optional TLS settings; `None` is what the plain-HTTP tests in this file use.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Optional server-side authentication settings, passed through to `HttpSource::run`.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<HttpServerAuthConfig>,

    /// Defines the behavior for handling conflicting metric metadata.
    // Uses `MetadataConflictStrategy::default()` (i.e. `Reject`) when omitted.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    metadata_conflict_strategy: MetadataConflictStrategy,

    // NOTE(review): `bool_or_struct` presumably lets this key be written either as
    // a bare boolean or as a full table — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    // HTTP keepalive settings, passed through to `HttpSource::run`.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,

    /// Whether to skip/discard received samples with NaN values.
    ///
    /// When enabled, any metric sample with a NaN value will be filtered out
    /// during parsing, preventing downstream processing of invalid metrics.
    // Defaults to `false` (the `bool` default) when omitted.
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    skip_nan_values: bool,
}
87
88impl PrometheusRemoteWriteConfig {
89    #[cfg(test)]
90    pub fn from_address(address: SocketAddr) -> Self {
91        Self {
92            address,
93            path: default_path(),
94            tls: None,
95            auth: None,
96            metadata_conflict_strategy: MetadataConflictStrategy::default(),
97            acknowledgements: false.into(),
98            keepalive: KeepaliveConfig::default(),
99            skip_nan_values: false,
100        }
101    }
102}
103
/// Returns the URL path used when the configuration does not set `path`: the root path `/`.
fn default_path() -> String {
    String::from("/")
}
107
108impl GenerateConfig for PrometheusRemoteWriteConfig {
109    fn generate_config() -> toml::Value {
110        toml::Value::try_from(Self {
111            address: "127.0.0.1:9090".parse().unwrap(),
112            path: default_path(),
113            tls: None,
114            auth: None,
115            metadata_conflict_strategy: MetadataConflictStrategy::default(),
116            acknowledgements: SourceAcknowledgementsConfig::default(),
117            keepalive: KeepaliveConfig::default(),
118            skip_nan_values: false,
119        })
120        .unwrap()
121    }
122}
123
124#[async_trait::async_trait]
125#[typetag::serde(name = "prometheus_remote_write")]
126impl SourceConfig for PrometheusRemoteWriteConfig {
127    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
128        let source = RemoteWriteSource {
129            metadata_conflict_strategy: self.metadata_conflict_strategy,
130            skip_nan_values: self.skip_nan_values,
131        };
132        source.run(
133            self.address,
134            self.path.as_str(),
135            HttpMethod::Post,
136            StatusCode::OK,
137            true,
138            self.tls.as_ref(),
139            self.auth.as_ref(),
140            cx,
141            self.acknowledgements,
142            self.keepalive.clone(),
143        )
144    }
145
146    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
147        vec![SourceOutput::new_metrics()]
148    }
149
150    fn can_acknowledge(&self) -> bool {
151        true
152    }
153}
154
155#[derive(Clone)]
156struct RemoteWriteSource {
157    metadata_conflict_strategy: MetadataConflictStrategy,
158    skip_nan_values: bool,
159}
160
161impl RemoteWriteSource {
162    fn decode_body(&self, body: Bytes) -> Result<Vec<Event>, ErrorMessage> {
163        let request = proto::WriteRequest::decode(body).map_err(|error| {
164            emit!(PrometheusRemoteWriteParseError {
165                error: error.clone()
166            });
167            ErrorMessage::new(
168                StatusCode::BAD_REQUEST,
169                format!("Could not decode write request: {error}"),
170            )
171        })?;
172        parser::parse_request(
173            request,
174            self.metadata_conflict_strategy,
175            self.skip_nan_values,
176        )
177        .map_err(|error| {
178            ErrorMessage::new(
179                StatusCode::BAD_REQUEST,
180                format!("Could not decode write request: {error}"),
181            )
182        })
183    }
184}
185
186impl HttpSource for RemoteWriteSource {
187    fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
188        // Default to snappy decoding the request body.
189        decompress_body(encoding_header.or(Some("snappy")), body)
190    }
191
192    fn build_events(
193        &self,
194        body: Bytes,
195        _header_map: &HeaderMap,
196        _query_parameters: &HashMap<String, String>,
197        _full_path: &str,
198    ) -> Result<Vec<Event>, ErrorMessage> {
199        let events = self.decode_body(body)?;
200        Ok(events)
201    }
202}
203
204#[cfg(test)]
205mod test {
206    use chrono::{SubsecRound as _, Utc};
207    use vector_lib::{
208        event::{EventStatus, Metric, MetricKind, MetricValue},
209        metric_tags,
210    };
211
212    use super::*;
213    use crate::{
214        SourceSender,
215        config::{SinkConfig, SinkContext},
216        sinks::prometheus::remote_write::RemoteWriteConfig,
217        test_util::{self, wait_for_tcp},
218        tls::MaybeTlsSettings,
219    };
220
221    #[test]
222    fn generate_config() {
223        crate::test_util::test_generate_config::<PrometheusRemoteWriteConfig>();
224    }
225
226    #[tokio::test]
227    async fn receives_metrics_over_http() {
228        receives_metrics(None).await;
229    }
230
231    #[tokio::test]
232    async fn receives_metrics_over_https() {
233        receives_metrics(Some(TlsEnableableConfig::test_config())).await;
234    }
235
236    async fn receives_metrics(tls: Option<TlsEnableableConfig>) {
237        let address = test_util::next_addr();
238        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
239
240        let proto = MaybeTlsSettings::from_config(tls.as_ref(), true)
241            .unwrap()
242            .http_protocol_name();
243        let source = PrometheusRemoteWriteConfig {
244            address,
245            path: default_path(),
246            auth: None,
247            tls: tls.clone(),
248            metadata_conflict_strategy: Default::default(),
249            acknowledgements: SourceAcknowledgementsConfig::default(),
250            keepalive: KeepaliveConfig::default(),
251            skip_nan_values: false,
252        };
253        let source = source
254            .build(SourceContext::new_test(tx, None))
255            .await
256            .unwrap();
257        tokio::spawn(source);
258        wait_for_tcp(address).await;
259
260        let sink = RemoteWriteConfig {
261            endpoint: format!("{}://localhost:{}/", proto, address.port()),
262            tls: tls.map(|tls| tls.options),
263            ..Default::default()
264        };
265        let (sink, _) = sink
266            .build(SinkContext::default())
267            .await
268            .expect("Error building config.");
269
270        let events = make_events();
271        let events_copy = events.clone();
272        let mut output = test_util::spawn_collect_ready(
273            async move {
274                sink.run_events(events_copy).await.unwrap();
275            },
276            rx,
277            1,
278        )
279        .await;
280
281        // The MetricBuffer used by the sink may reorder the metrics, so
282        // put them back into order before comparing.
283        output.sort_unstable_by_key(|event| event.as_metric().name().to_owned());
284
285        vector_lib::assert_event_data_eq!(events, output);
286    }
287
288    fn make_events() -> Vec<Event> {
289        let timestamp = || Utc::now().trunc_subsecs(3);
290        vec![
291            Metric::new(
292                "counter_1",
293                MetricKind::Absolute,
294                MetricValue::Counter { value: 42.0 },
295            )
296            .with_timestamp(Some(timestamp()))
297            .into(),
298            Metric::new(
299                "gauge_2",
300                MetricKind::Absolute,
301                MetricValue::Gauge { value: 41.0 },
302            )
303            .with_timestamp(Some(timestamp()))
304            .into(),
305            Metric::new(
306                "histogram_3",
307                MetricKind::Absolute,
308                MetricValue::AggregatedHistogram {
309                    buckets: vector_lib::buckets![ 2.3 => 11, 4.2 => 85 ],
310                    count: 96,
311                    sum: 156.2,
312                },
313            )
314            .with_timestamp(Some(timestamp()))
315            .into(),
316            Metric::new(
317                "summary_4",
318                MetricKind::Absolute,
319                MetricValue::AggregatedSummary {
320                    quantiles: vector_lib::quantiles![ 0.1 => 1.2, 0.5 => 3.6, 0.9 => 5.2 ],
321                    count: 23,
322                    sum: 8.6,
323                },
324            )
325            .with_timestamp(Some(timestamp()))
326            .into(),
327        ]
328    }
329
330    async fn send_request_and_assert(port: u16, request_body: Vec<u8>) {
331        // Send the request via HTTP POST
332        let client = reqwest::Client::new();
333        let response = client
334            .post(format!("http://localhost:{}{}", port, default_path()))
335            .header("Content-Type", "application/x-protobuf")
336            .header("Content-Encoding", "snappy")
337            .body(request_body)
338            .send()
339            .await
340            .unwrap();
341
342        // Should succeed (not return 400) despite conflicting metadata
343        assert!(
344            response.status().is_success(),
345            "Expected success but got: {}",
346            response.status()
347        );
348    }
349
350    fn create_default_request_body() -> Vec<u8> {
351        use prost::Message;
352        use vector_lib::prometheus::parser::proto;
353
354        let request = proto::WriteRequest {
355            metadata: vec![proto::MetricMetadata {
356                r#type: proto::MetricType::Gauge as i32,
357                metric_family_name: "test_metric".into(),
358                help: "Gauge definition".into(),
359                unit: String::default(),
360            }],
361            timeseries: vec![proto::TimeSeries {
362                labels: vec![proto::Label {
363                    name: "__name__".into(),
364                    value: "test_metric".into(),
365                }],
366                samples: vec![proto::Sample {
367                    value: 42.0,
368                    timestamp: chrono::Utc::now().timestamp_millis(),
369                }],
370            }],
371        };
372
373        let mut buf = Vec::new();
374        request.encode(&mut buf).unwrap();
375
376        // Compress with snappy as expected by the remote_write endpoint
377        snap::raw::Encoder::new().compress_vec(&buf).unwrap()
378    }
379
380    fn create_conflicting_metadata_request_body() -> Vec<u8> {
381        use prost::Message;
382        use vector_lib::prometheus::parser::proto;
383
384        let request = proto::WriteRequest {
385            metadata: vec![
386                proto::MetricMetadata {
387                    r#type: proto::MetricType::Gauge as i32,
388                    metric_family_name: "test_metric".into(),
389                    help: "First definition as gauge".into(),
390                    unit: String::default(),
391                },
392                proto::MetricMetadata {
393                    r#type: proto::MetricType::Counter as i32,
394                    metric_family_name: "test_metric".into(),
395                    help: "Conflicting definition as counter".into(),
396                    unit: String::default(),
397                },
398            ],
399            timeseries: vec![proto::TimeSeries {
400                labels: vec![proto::Label {
401                    name: "__name__".into(),
402                    value: "test_metric".into(),
403                }],
404                samples: vec![proto::Sample {
405                    value: 42.0,
406                    timestamp: chrono::Utc::now().timestamp_millis(),
407                }],
408            }],
409        };
410
411        let mut buf = Vec::new();
412        request.encode(&mut buf).unwrap();
413
414        // Compress with snappy as expected by the remote_write endpoint
415        snap::raw::Encoder::new().compress_vec(&buf).unwrap()
416    }
417
418    async fn send_request(port: u16, request_body: Vec<u8>) -> reqwest::Response {
419        let client = reqwest::Client::new();
420        client
421            .post(format!("http://localhost:{}{}", port, default_path()))
422            .header("Content-Type", "application/x-protobuf")
423            .header("Content-Encoding", "snappy")
424            .body(request_body)
425            .send()
426            .await
427            .unwrap()
428    }
429
430    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115)
431    /// > Label names MUST be unique within a LabelSet.
432    /// Prometheus itself will reject the metric with an error. Largely to remain backward compatible with older versions of Vector,
433    /// we accept the metric, but take the last label in the list.
434    #[tokio::test]
435    async fn receives_metrics_duplicate_labels() {
436        let address = test_util::next_addr();
437        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
438
439        let source = PrometheusRemoteWriteConfig {
440            address,
441            path: default_path(),
442            auth: None,
443            tls: None,
444            metadata_conflict_strategy: Default::default(),
445            acknowledgements: SourceAcknowledgementsConfig::default(),
446            keepalive: KeepaliveConfig::default(),
447            skip_nan_values: false,
448        };
449        let source = source
450            .build(SourceContext::new_test(tx, None))
451            .await
452            .unwrap();
453        tokio::spawn(source);
454        wait_for_tcp(address).await;
455
456        let sink = RemoteWriteConfig {
457            endpoint: format!("http://localhost:{}/", address.port()),
458            ..Default::default()
459        };
460        let (sink, _) = sink
461            .build(SinkContext::default())
462            .await
463            .expect("Error building config.");
464
465        let timestamp = Utc::now().trunc_subsecs(3);
466
467        let events = vec![
468            Metric::new(
469                "gauge_2",
470                MetricKind::Absolute,
471                MetricValue::Gauge { value: 41.0 },
472            )
473            .with_timestamp(Some(timestamp))
474            .with_tags(Some(metric_tags! {
475                "code" => "200".to_string(),
476                "code" => "success".to_string(),
477            }))
478            .into(),
479        ];
480
481        let expected = vec![
482            Metric::new(
483                "gauge_2",
484                MetricKind::Absolute,
485                MetricValue::Gauge { value: 41.0 },
486            )
487            .with_timestamp(Some(timestamp))
488            .with_tags(Some(metric_tags! {
489                "code" => "success".to_string(),
490            }))
491            .into(),
492        ];
493
494        let output = test_util::spawn_collect_ready(
495            async move {
496                sink.run_events(events).await.unwrap();
497            },
498            rx,
499            1,
500        )
501        .await;
502
503        vector_lib::assert_event_data_eq!(expected, output);
504    }
505
506    #[tokio::test]
507    async fn test_skip_nan_values_enabled() {
508        let address = test_util::next_addr();
509        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
510
511        let source = PrometheusRemoteWriteConfig {
512            address,
513            path: default_path(),
514            auth: None,
515            tls: None,
516            metadata_conflict_strategy: Default::default(),
517            acknowledgements: SourceAcknowledgementsConfig::default(),
518            keepalive: KeepaliveConfig::default(),
519            skip_nan_values: true,
520        };
521        let source = source
522            .build(SourceContext::new_test(tx, None))
523            .await
524            .unwrap();
525        tokio::spawn(source);
526        wait_for_tcp(address).await;
527
528        // Create a request with NaN values
529        let request_body = {
530            use prost::Message;
531            use vector_lib::prometheus::parser::proto;
532
533            let request = proto::WriteRequest {
534                metadata: vec![],
535                timeseries: vec![
536                    proto::TimeSeries {
537                        labels: vec![proto::Label {
538                            name: "__name__".into(),
539                            value: "test_metric_valid".into(),
540                        }],
541                        samples: vec![proto::Sample {
542                            value: 42.0,
543                            timestamp: chrono::Utc::now().timestamp_millis(),
544                        }],
545                    },
546                    proto::TimeSeries {
547                        labels: vec![proto::Label {
548                            name: "__name__".into(),
549                            value: "test_metric_nan".into(),
550                        }],
551                        samples: vec![proto::Sample {
552                            value: f64::NAN,
553                            timestamp: chrono::Utc::now().timestamp_millis(),
554                        }],
555                    },
556                ],
557            };
558
559            let mut buf = Vec::new();
560            request.encode(&mut buf).unwrap();
561
562            // Compress with snappy as expected by the remote_write endpoint
563            snap::raw::Encoder::new().compress_vec(&buf).unwrap()
564        };
565
566        send_request_and_assert(address.port(), request_body).await;
567
568        // Verify we only received the valid metric (NaN metric should be filtered)
569        let output = test_util::collect_ready(rx).await;
570        assert_eq!(output.len(), 1);
571
572        let metric = output[0].as_metric();
573        assert_eq!(metric.name(), "test_metric_valid");
574        assert_eq!(metric.value(), &MetricValue::Gauge { value: 42.0 });
575    }
576
577    #[tokio::test]
578    async fn test_skip_nan_values_disabled() {
579        let address = test_util::next_addr();
580        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
581
582        let source = PrometheusRemoteWriteConfig {
583            address,
584            path: default_path(),
585            auth: None,
586            tls: None,
587            metadata_conflict_strategy: Default::default(),
588            acknowledgements: SourceAcknowledgementsConfig::default(),
589            keepalive: KeepaliveConfig::default(),
590            skip_nan_values: false,
591        };
592        let source = source
593            .build(SourceContext::new_test(tx, None))
594            .await
595            .unwrap();
596        tokio::spawn(source);
597        wait_for_tcp(address).await;
598
599        // Create a request with NaN values
600        let request_body = {
601            use prost::Message;
602            use vector_lib::prometheus::parser::proto;
603
604            let request = proto::WriteRequest {
605                metadata: vec![],
606                timeseries: vec![
607                    proto::TimeSeries {
608                        labels: vec![proto::Label {
609                            name: "__name__".into(),
610                            value: "test_metric_valid".into(),
611                        }],
612                        samples: vec![proto::Sample {
613                            value: 42.0,
614                            timestamp: chrono::Utc::now().timestamp_millis(),
615                        }],
616                    },
617                    proto::TimeSeries {
618                        labels: vec![proto::Label {
619                            name: "__name__".into(),
620                            value: "test_metric_nan".into(),
621                        }],
622                        samples: vec![proto::Sample {
623                            value: f64::NAN,
624                            timestamp: chrono::Utc::now().timestamp_millis(),
625                        }],
626                    },
627                ],
628            };
629
630            let mut buf = Vec::new();
631            request.encode(&mut buf).unwrap();
632
633            // Compress with snappy as expected by the remote_write endpoint
634            snap::raw::Encoder::new().compress_vec(&buf).unwrap()
635        };
636
637        send_request_and_assert(address.port(), request_body).await;
638
639        // Verify we received both metrics (including NaN metric)
640        let mut output = test_util::collect_ready(rx).await;
641        assert_eq!(output.len(), 2);
642
643        // Sort by name for predictable testing
644        output.sort_by(|a, b| a.as_metric().name().cmp(b.as_metric().name()));
645
646        // Check the NaN metric
647        let nan_metric = output[0].as_metric();
648        assert_eq!(nan_metric.name(), "test_metric_nan");
649        match nan_metric.value() {
650            MetricValue::Gauge { value } => {
651                assert!(value.is_nan());
652            }
653            _ => panic!("Expected gauge metric"),
654        }
655
656        // Check the valid metric
657        let valid_metric = output[1].as_metric();
658        assert_eq!(valid_metric.name(), "test_metric_valid");
659        assert_eq!(valid_metric.value(), &MetricValue::Gauge { value: 42.0 });
660    }
661
662    #[tokio::test]
663    async fn receives_metrics_on_custom_path() {
664        let address = test_util::next_addr();
665        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
666
667        let source = PrometheusRemoteWriteConfig {
668            address,
669            path: "/api/v1/write".to_string(),
670            auth: None,
671            tls: None,
672            metadata_conflict_strategy: Default::default(),
673            acknowledgements: SourceAcknowledgementsConfig::default(),
674            keepalive: KeepaliveConfig::default(),
675            skip_nan_values: false,
676        };
677        let source = source
678            .build(SourceContext::new_test(tx, None))
679            .await
680            .unwrap();
681        tokio::spawn(source);
682        wait_for_tcp(address).await;
683
684        let sink = RemoteWriteConfig {
685            endpoint: format!("http://localhost:{}/api/v1/write", address.port()),
686            ..Default::default()
687        };
688        let (sink, _) = sink
689            .build(SinkContext::default())
690            .await
691            .expect("Error building config.");
692
693        let events = make_events();
694        let events_copy = events.clone();
695        let mut output = test_util::spawn_collect_ready(
696            async move {
697                sink.run_events(events_copy).await.unwrap();
698            },
699            rx,
700            1,
701        )
702        .await;
703
704        // The MetricBuffer used by the sink may reorder the metrics, so
705        // put them back into order before comparing.
706        output.sort_unstable_by_key(|event| event.as_metric().name().to_owned());
707
708        vector_lib::assert_event_data_eq!(events, output);
709    }
710
711    #[tokio::test]
712    async fn rejects_metrics_on_wrong_path() {
713        let address = test_util::next_addr();
714        let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
715
716        let source = PrometheusRemoteWriteConfig {
717            address,
718            path: "/api/v1/write".to_string(),
719            auth: None,
720            tls: None,
721            metadata_conflict_strategy: Default::default(),
722            acknowledgements: SourceAcknowledgementsConfig::default(),
723            keepalive: KeepaliveConfig::default(),
724            skip_nan_values: false,
725        };
726        let source = source
727            .build(SourceContext::new_test(tx, None))
728            .await
729            .unwrap();
730        tokio::spawn(source);
731        wait_for_tcp(address).await;
732
733        // Try to send to the root path, which should be rejected
734        let client = reqwest::Client::new();
735        let response = client
736            .post(format!("http://localhost:{}/wrong/path", address.port()))
737            .header("Content-Type", "application/x-protobuf")
738            .body(vec![])
739            .send()
740            .await
741            .unwrap();
742
743        // Should return an error status code since we're sending to the wrong path
744        assert!(
745            response.status().is_client_error(),
746            "Expected 4xx error, got {}",
747            response.status()
748        );
749    }
750
751    #[tokio::test]
752    async fn receives_metrics_on_default_path() {
753        let address = test_util::next_addr();
754        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
755
756        let source = PrometheusRemoteWriteConfig {
757            address,
758            path: default_path(),
759            auth: None,
760            tls: None,
761            metadata_conflict_strategy: Default::default(),
762            acknowledgements: SourceAcknowledgementsConfig::default(),
763            keepalive: KeepaliveConfig::default(),
764            skip_nan_values: false,
765        };
766        let source = source
767            .build(SourceContext::new_test(tx, None))
768            .await
769            .unwrap();
770        tokio::spawn(source);
771        wait_for_tcp(address).await;
772
773        let request_body = create_default_request_body();
774        send_request_and_assert(address.port(), request_body).await;
775
776        // Verify we received the metric data
777        let output = test_util::collect_ready(rx).await;
778        assert_eq!(output.len(), 1);
779
780        let metric = output[0].as_metric();
781        assert_eq!(metric.name(), "test_metric");
782        assert_eq!(metric.value(), &MetricValue::Gauge { value: 42.0 });
783    }
784
785    #[tokio::test]
786    async fn rejects_metrics_on_wrong_path_with_skip_nan_enabled() {
787        let address = test_util::next_addr();
788        let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
789
790        let source = PrometheusRemoteWriteConfig {
791            address,
792            path: "/api/v1/write".to_string(),
793            auth: None,
794            tls: None,
795            metadata_conflict_strategy: Default::default(),
796            acknowledgements: SourceAcknowledgementsConfig::default(),
797            keepalive: KeepaliveConfig::default(),
798            skip_nan_values: true,
799        };
800        let source = source
801            .build(SourceContext::new_test(tx, None))
802            .await
803            .unwrap();
804        tokio::spawn(source);
805        wait_for_tcp(address).await;
806
807        // Try to send to the root path, which should be rejected
808        let client = reqwest::Client::new();
809        let response = client
810            .post(format!("http://localhost:{}/wrong/path", address.port()))
811            .header("Content-Type", "application/x-protobuf")
812            .body(vec![])
813            .send()
814            .await
815            .unwrap();
816
817        // Should return an error status code since we're sending to the wrong path
818        assert!(
819            response.status().is_client_error(),
820            "Expected 4xx error, got {}",
821            response.status()
822        );
823    }
824
825    #[tokio::test]
826    async fn accepts_conflicting_metadata() {
827        let address = test_util::next_addr();
828        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
829
830        let source = PrometheusRemoteWriteConfig {
831            address,
832            path: default_path(),
833            auth: None,
834            tls: None,
835            metadata_conflict_strategy: MetadataConflictStrategy::Ignore,
836            acknowledgements: SourceAcknowledgementsConfig::default(),
837            keepalive: KeepaliveConfig::default(),
838            skip_nan_values: false,
839        };
840        let source = source
841            .build(SourceContext::new_test(tx, None))
842            .await
843            .unwrap();
844        tokio::spawn(source);
845        wait_for_tcp(address).await;
846
847        let request_body = create_conflicting_metadata_request_body();
848        let response = send_request(address.port(), request_body).await;
849
850        // Should succeed (not return 400) despite conflicting metadata
851        assert!(
852            response.status().is_success(),
853            "Expected success but got: {}",
854            response.status()
855        );
856
857        // Verify we received the metric data
858        let output = test_util::collect_ready(rx).await;
859        assert_eq!(output.len(), 1);
860
861        let metric = output[0].as_metric();
862        assert_eq!(metric.name(), "test_metric");
863        assert_eq!(metric.value(), &MetricValue::Gauge { value: 42.0 });
864    }
865
866    #[tokio::test]
867    async fn rejects_conflicting_metadata() {
868        let address = test_util::next_addr();
869        let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
870
871        let source = PrometheusRemoteWriteConfig {
872            address,
873            path: default_path(),
874            auth: None,
875            tls: None,
876            metadata_conflict_strategy: MetadataConflictStrategy::Reject,
877            acknowledgements: SourceAcknowledgementsConfig::default(),
878            keepalive: KeepaliveConfig::default(),
879            skip_nan_values: false,
880        };
881        let source = source
882            .build(SourceContext::new_test(tx, None))
883            .await
884            .unwrap();
885        tokio::spawn(source);
886        wait_for_tcp(address).await;
887
888        let request_body = create_conflicting_metadata_request_body();
889        let response = send_request(address.port(), request_body).await;
890
891        // Should be rejected
892        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
893    }
894}
895
896#[cfg(all(test, feature = "prometheus-integration-tests"))]
897mod integration_tests {
898    use std::net::{SocketAddr, ToSocketAddrs as _};
899
900    use tokio::time::Duration;
901
902    use super::*;
903    use crate::test_util::components::{HTTP_PUSH_SOURCE_TAGS, run_and_assert_source_compliance};
904
905    fn source_receive_address() -> SocketAddr {
906        let address = std::env::var("REMOTE_WRITE_SOURCE_RECEIVE_ADDRESS")
907            .unwrap_or_else(|_| "127.0.0.1:9102".into());
908        // TODO: This logic should maybe be moved up into the source, and possibly into other
909        // sources, wrapped in a new socket address type that does the lookup during config parsing.
910        address
911            .to_socket_addrs()
912            .unwrap()
913            .next()
914            .unwrap_or_else(|| panic!("Socket address {address:?} did not resolve"))
915    }
916
917    #[tokio::test]
918    async fn receive_something() {
919        // TODO: This test depends on the single instance of Prometheus that we spin up for
920        // integration tests both scraping an endpoint and then also remote writing that stuff to
921        // this remote write source.  This makes sense from a "test the actual behavior" standpoint
922        // but it feels a little fragile.
923        //
924        // It could be nice to split up the Prometheus integration tests in the future, or
925        // maybe there's a way to do a one-shot remote write from Prometheus? Not sure.
926        let config = PrometheusRemoteWriteConfig {
927            address: source_receive_address(),
928            path: default_path(),
929            auth: None,
930            tls: None,
931            metadata_conflict_strategy: Default::default(),
932            acknowledgements: SourceAcknowledgementsConfig::default(),
933            keepalive: KeepaliveConfig::default(),
934            skip_nan_values: false,
935        };
936
937        let events = run_and_assert_source_compliance(
938            config,
939            Duration::from_secs(5),
940            &HTTP_PUSH_SOURCE_TAGS,
941        )
942        .await;
943        assert!(!events.is_empty());
944    }
945}