1use std::{collections::HashMap, net::SocketAddr};
2
3use bytes::Bytes;
4use prost::Message;
5use vector_lib::{
6 config::LogNamespace, configurable::configurable_component, prometheus::parser::proto,
7};
8use warp::http::{HeaderMap, StatusCode};
9
10use super::parser;
11
12use crate::{
13 common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
14 config::{
15 GenerateConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
16 },
17 event::Event,
18 http::KeepaliveConfig,
19 internal_events::PrometheusRemoteWriteParseError,
20 serde::bool_or_struct,
21 sources::{
22 self,
23 util::{HttpSource, decompress_body, http::HttpMethod},
24 },
25 tls::TlsEnableableConfig,
26};
27
28#[configurable_component]
30#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
31#[serde(rename_all = "snake_case")]
32pub enum MetadataConflictStrategy {
33 Ignore,
35 #[default]
37 Reject,
38}
39
40#[configurable_component(source(
42 "prometheus_remote_write",
43 "Receive metric via the Prometheus Remote Write protocol."
44))]
45#[derive(Clone, Debug)]
46pub struct PrometheusRemoteWriteConfig {
47 #[configurable(metadata(docs::examples = "0.0.0.0:9090"))]
51 address: SocketAddr,
52
53 #[serde(default = "default_path")]
55 #[configurable(metadata(docs::examples = "/api/v1/write"))]
56 #[configurable(metadata(docs::examples = "/remote-write"))]
57 path: String,
58
59 #[configurable(derived)]
60 tls: Option<TlsEnableableConfig>,
61
62 #[configurable(derived)]
63 #[configurable(metadata(docs::advanced))]
64 auth: Option<HttpServerAuthConfig>,
65
66 #[configurable(metadata(docs::advanced))]
68 #[serde(default)]
69 metadata_conflict_strategy: MetadataConflictStrategy,
70
71 #[configurable(derived)]
72 #[serde(default, deserialize_with = "bool_or_struct")]
73 acknowledgements: SourceAcknowledgementsConfig,
74
75 #[configurable(derived)]
76 #[serde(default)]
77 keepalive: KeepaliveConfig,
78
79 #[configurable(metadata(docs::advanced))]
84 #[serde(default)]
85 skip_nan_values: bool,
86}
87
88impl PrometheusRemoteWriteConfig {
89 #[cfg(test)]
90 pub fn from_address(address: SocketAddr) -> Self {
91 Self {
92 address,
93 path: default_path(),
94 tls: None,
95 auth: None,
96 metadata_conflict_strategy: MetadataConflictStrategy::default(),
97 acknowledgements: false.into(),
98 keepalive: KeepaliveConfig::default(),
99 skip_nan_values: false,
100 }
101 }
102}
103
104fn default_path() -> String {
105 "/".to_string()
106}
107
108impl GenerateConfig for PrometheusRemoteWriteConfig {
109 fn generate_config() -> toml::Value {
110 toml::Value::try_from(Self {
111 address: "127.0.0.1:9090".parse().unwrap(),
112 path: default_path(),
113 tls: None,
114 auth: None,
115 metadata_conflict_strategy: MetadataConflictStrategy::default(),
116 acknowledgements: SourceAcknowledgementsConfig::default(),
117 keepalive: KeepaliveConfig::default(),
118 skip_nan_values: false,
119 })
120 .unwrap()
121 }
122}
123
124#[async_trait::async_trait]
125#[typetag::serde(name = "prometheus_remote_write")]
126impl SourceConfig for PrometheusRemoteWriteConfig {
127 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
128 let source = RemoteWriteSource {
129 metadata_conflict_strategy: self.metadata_conflict_strategy,
130 skip_nan_values: self.skip_nan_values,
131 };
132 source.run(
133 self.address,
134 self.path.as_str(),
135 HttpMethod::Post,
136 StatusCode::OK,
137 true,
138 self.tls.as_ref(),
139 self.auth.as_ref(),
140 cx,
141 self.acknowledgements,
142 self.keepalive.clone(),
143 )
144 }
145
146 fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
147 vec![SourceOutput::new_metrics()]
148 }
149
150 fn can_acknowledge(&self) -> bool {
151 true
152 }
153}
154
155#[derive(Clone)]
156struct RemoteWriteSource {
157 metadata_conflict_strategy: MetadataConflictStrategy,
158 skip_nan_values: bool,
159}
160
161impl RemoteWriteSource {
162 fn decode_body(&self, body: Bytes) -> Result<Vec<Event>, ErrorMessage> {
163 let request = proto::WriteRequest::decode(body).map_err(|error| {
164 emit!(PrometheusRemoteWriteParseError {
165 error: error.clone()
166 });
167 ErrorMessage::new(
168 StatusCode::BAD_REQUEST,
169 format!("Could not decode write request: {error}"),
170 )
171 })?;
172 parser::parse_request(
173 request,
174 self.metadata_conflict_strategy,
175 self.skip_nan_values,
176 )
177 .map_err(|error| {
178 ErrorMessage::new(
179 StatusCode::BAD_REQUEST,
180 format!("Could not decode write request: {error}"),
181 )
182 })
183 }
184}
185
186impl HttpSource for RemoteWriteSource {
187 fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
188 decompress_body(encoding_header.or(Some("snappy")), body)
190 }
191
192 fn build_events(
193 &self,
194 body: Bytes,
195 _header_map: &HeaderMap,
196 _query_parameters: &HashMap<String, String>,
197 _full_path: &str,
198 ) -> Result<Vec<Event>, ErrorMessage> {
199 let events = self.decode_body(body)?;
200 Ok(events)
201 }
202}
203
204#[cfg(test)]
205mod test {
206 use chrono::{SubsecRound as _, Utc};
207 use vector_lib::{
208 event::{EventStatus, Metric, MetricKind, MetricValue},
209 metric_tags,
210 };
211
212 use super::*;
213 use crate::{
214 SourceSender,
215 config::{SinkConfig, SinkContext},
216 sinks::prometheus::remote_write::RemoteWriteConfig,
217 test_util::{self, wait_for_tcp},
218 tls::MaybeTlsSettings,
219 };
220
221 #[test]
222 fn generate_config() {
223 crate::test_util::test_generate_config::<PrometheusRemoteWriteConfig>();
224 }
225
226 #[tokio::test]
227 async fn receives_metrics_over_http() {
228 receives_metrics(None).await;
229 }
230
231 #[tokio::test]
232 async fn receives_metrics_over_https() {
233 receives_metrics(Some(TlsEnableableConfig::test_config())).await;
234 }
235
236 async fn receives_metrics(tls: Option<TlsEnableableConfig>) {
237 let address = test_util::next_addr();
238 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
239
240 let proto = MaybeTlsSettings::from_config(tls.as_ref(), true)
241 .unwrap()
242 .http_protocol_name();
243 let source = PrometheusRemoteWriteConfig {
244 address,
245 path: default_path(),
246 auth: None,
247 tls: tls.clone(),
248 metadata_conflict_strategy: Default::default(),
249 acknowledgements: SourceAcknowledgementsConfig::default(),
250 keepalive: KeepaliveConfig::default(),
251 skip_nan_values: false,
252 };
253 let source = source
254 .build(SourceContext::new_test(tx, None))
255 .await
256 .unwrap();
257 tokio::spawn(source);
258 wait_for_tcp(address).await;
259
260 let sink = RemoteWriteConfig {
261 endpoint: format!("{}://localhost:{}/", proto, address.port()),
262 tls: tls.map(|tls| tls.options),
263 ..Default::default()
264 };
265 let (sink, _) = sink
266 .build(SinkContext::default())
267 .await
268 .expect("Error building config.");
269
270 let events = make_events();
271 let events_copy = events.clone();
272 let mut output = test_util::spawn_collect_ready(
273 async move {
274 sink.run_events(events_copy).await.unwrap();
275 },
276 rx,
277 1,
278 )
279 .await;
280
281 output.sort_unstable_by_key(|event| event.as_metric().name().to_owned());
284
285 vector_lib::assert_event_data_eq!(events, output);
286 }
287
288 fn make_events() -> Vec<Event> {
289 let timestamp = || Utc::now().trunc_subsecs(3);
290 vec![
291 Metric::new(
292 "counter_1",
293 MetricKind::Absolute,
294 MetricValue::Counter { value: 42.0 },
295 )
296 .with_timestamp(Some(timestamp()))
297 .into(),
298 Metric::new(
299 "gauge_2",
300 MetricKind::Absolute,
301 MetricValue::Gauge { value: 41.0 },
302 )
303 .with_timestamp(Some(timestamp()))
304 .into(),
305 Metric::new(
306 "histogram_3",
307 MetricKind::Absolute,
308 MetricValue::AggregatedHistogram {
309 buckets: vector_lib::buckets![ 2.3 => 11, 4.2 => 85 ],
310 count: 96,
311 sum: 156.2,
312 },
313 )
314 .with_timestamp(Some(timestamp()))
315 .into(),
316 Metric::new(
317 "summary_4",
318 MetricKind::Absolute,
319 MetricValue::AggregatedSummary {
320 quantiles: vector_lib::quantiles![ 0.1 => 1.2, 0.5 => 3.6, 0.9 => 5.2 ],
321 count: 23,
322 sum: 8.6,
323 },
324 )
325 .with_timestamp(Some(timestamp()))
326 .into(),
327 ]
328 }
329
330 async fn send_request_and_assert(port: u16, request_body: Vec<u8>) {
331 let client = reqwest::Client::new();
333 let response = client
334 .post(format!("http://localhost:{}{}", port, default_path()))
335 .header("Content-Type", "application/x-protobuf")
336 .header("Content-Encoding", "snappy")
337 .body(request_body)
338 .send()
339 .await
340 .unwrap();
341
342 assert!(
344 response.status().is_success(),
345 "Expected success but got: {}",
346 response.status()
347 );
348 }
349
350 fn create_default_request_body() -> Vec<u8> {
351 use prost::Message;
352 use vector_lib::prometheus::parser::proto;
353
354 let request = proto::WriteRequest {
355 metadata: vec![proto::MetricMetadata {
356 r#type: proto::MetricType::Gauge as i32,
357 metric_family_name: "test_metric".into(),
358 help: "Gauge definition".into(),
359 unit: String::default(),
360 }],
361 timeseries: vec![proto::TimeSeries {
362 labels: vec![proto::Label {
363 name: "__name__".into(),
364 value: "test_metric".into(),
365 }],
366 samples: vec![proto::Sample {
367 value: 42.0,
368 timestamp: chrono::Utc::now().timestamp_millis(),
369 }],
370 }],
371 };
372
373 let mut buf = Vec::new();
374 request.encode(&mut buf).unwrap();
375
376 snap::raw::Encoder::new().compress_vec(&buf).unwrap()
378 }
379
380 fn create_conflicting_metadata_request_body() -> Vec<u8> {
381 use prost::Message;
382 use vector_lib::prometheus::parser::proto;
383
384 let request = proto::WriteRequest {
385 metadata: vec![
386 proto::MetricMetadata {
387 r#type: proto::MetricType::Gauge as i32,
388 metric_family_name: "test_metric".into(),
389 help: "First definition as gauge".into(),
390 unit: String::default(),
391 },
392 proto::MetricMetadata {
393 r#type: proto::MetricType::Counter as i32,
394 metric_family_name: "test_metric".into(),
395 help: "Conflicting definition as counter".into(),
396 unit: String::default(),
397 },
398 ],
399 timeseries: vec![proto::TimeSeries {
400 labels: vec![proto::Label {
401 name: "__name__".into(),
402 value: "test_metric".into(),
403 }],
404 samples: vec![proto::Sample {
405 value: 42.0,
406 timestamp: chrono::Utc::now().timestamp_millis(),
407 }],
408 }],
409 };
410
411 let mut buf = Vec::new();
412 request.encode(&mut buf).unwrap();
413
414 snap::raw::Encoder::new().compress_vec(&buf).unwrap()
416 }
417
418 async fn send_request(port: u16, request_body: Vec<u8>) -> reqwest::Response {
419 let client = reqwest::Client::new();
420 client
421 .post(format!("http://localhost:{}{}", port, default_path()))
422 .header("Content-Type", "application/x-protobuf")
423 .header("Content-Encoding", "snappy")
424 .body(request_body)
425 .send()
426 .await
427 .unwrap()
428 }
429
430 #[tokio::test]
435 async fn receives_metrics_duplicate_labels() {
436 let address = test_util::next_addr();
437 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
438
439 let source = PrometheusRemoteWriteConfig {
440 address,
441 path: default_path(),
442 auth: None,
443 tls: None,
444 metadata_conflict_strategy: Default::default(),
445 acknowledgements: SourceAcknowledgementsConfig::default(),
446 keepalive: KeepaliveConfig::default(),
447 skip_nan_values: false,
448 };
449 let source = source
450 .build(SourceContext::new_test(tx, None))
451 .await
452 .unwrap();
453 tokio::spawn(source);
454 wait_for_tcp(address).await;
455
456 let sink = RemoteWriteConfig {
457 endpoint: format!("http://localhost:{}/", address.port()),
458 ..Default::default()
459 };
460 let (sink, _) = sink
461 .build(SinkContext::default())
462 .await
463 .expect("Error building config.");
464
465 let timestamp = Utc::now().trunc_subsecs(3);
466
467 let events = vec![
468 Metric::new(
469 "gauge_2",
470 MetricKind::Absolute,
471 MetricValue::Gauge { value: 41.0 },
472 )
473 .with_timestamp(Some(timestamp))
474 .with_tags(Some(metric_tags! {
475 "code" => "200".to_string(),
476 "code" => "success".to_string(),
477 }))
478 .into(),
479 ];
480
481 let expected = vec![
482 Metric::new(
483 "gauge_2",
484 MetricKind::Absolute,
485 MetricValue::Gauge { value: 41.0 },
486 )
487 .with_timestamp(Some(timestamp))
488 .with_tags(Some(metric_tags! {
489 "code" => "success".to_string(),
490 }))
491 .into(),
492 ];
493
494 let output = test_util::spawn_collect_ready(
495 async move {
496 sink.run_events(events).await.unwrap();
497 },
498 rx,
499 1,
500 )
501 .await;
502
503 vector_lib::assert_event_data_eq!(expected, output);
504 }
505
506 #[tokio::test]
507 async fn test_skip_nan_values_enabled() {
508 let address = test_util::next_addr();
509 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
510
511 let source = PrometheusRemoteWriteConfig {
512 address,
513 path: default_path(),
514 auth: None,
515 tls: None,
516 metadata_conflict_strategy: Default::default(),
517 acknowledgements: SourceAcknowledgementsConfig::default(),
518 keepalive: KeepaliveConfig::default(),
519 skip_nan_values: true,
520 };
521 let source = source
522 .build(SourceContext::new_test(tx, None))
523 .await
524 .unwrap();
525 tokio::spawn(source);
526 wait_for_tcp(address).await;
527
528 let request_body = {
530 use prost::Message;
531 use vector_lib::prometheus::parser::proto;
532
533 let request = proto::WriteRequest {
534 metadata: vec![],
535 timeseries: vec![
536 proto::TimeSeries {
537 labels: vec![proto::Label {
538 name: "__name__".into(),
539 value: "test_metric_valid".into(),
540 }],
541 samples: vec![proto::Sample {
542 value: 42.0,
543 timestamp: chrono::Utc::now().timestamp_millis(),
544 }],
545 },
546 proto::TimeSeries {
547 labels: vec![proto::Label {
548 name: "__name__".into(),
549 value: "test_metric_nan".into(),
550 }],
551 samples: vec![proto::Sample {
552 value: f64::NAN,
553 timestamp: chrono::Utc::now().timestamp_millis(),
554 }],
555 },
556 ],
557 };
558
559 let mut buf = Vec::new();
560 request.encode(&mut buf).unwrap();
561
562 snap::raw::Encoder::new().compress_vec(&buf).unwrap()
564 };
565
566 send_request_and_assert(address.port(), request_body).await;
567
568 let output = test_util::collect_ready(rx).await;
570 assert_eq!(output.len(), 1);
571
572 let metric = output[0].as_metric();
573 assert_eq!(metric.name(), "test_metric_valid");
574 assert_eq!(metric.value(), &MetricValue::Gauge { value: 42.0 });
575 }
576
577 #[tokio::test]
578 async fn test_skip_nan_values_disabled() {
579 let address = test_util::next_addr();
580 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
581
582 let source = PrometheusRemoteWriteConfig {
583 address,
584 path: default_path(),
585 auth: None,
586 tls: None,
587 metadata_conflict_strategy: Default::default(),
588 acknowledgements: SourceAcknowledgementsConfig::default(),
589 keepalive: KeepaliveConfig::default(),
590 skip_nan_values: false,
591 };
592 let source = source
593 .build(SourceContext::new_test(tx, None))
594 .await
595 .unwrap();
596 tokio::spawn(source);
597 wait_for_tcp(address).await;
598
599 let request_body = {
601 use prost::Message;
602 use vector_lib::prometheus::parser::proto;
603
604 let request = proto::WriteRequest {
605 metadata: vec![],
606 timeseries: vec![
607 proto::TimeSeries {
608 labels: vec![proto::Label {
609 name: "__name__".into(),
610 value: "test_metric_valid".into(),
611 }],
612 samples: vec![proto::Sample {
613 value: 42.0,
614 timestamp: chrono::Utc::now().timestamp_millis(),
615 }],
616 },
617 proto::TimeSeries {
618 labels: vec![proto::Label {
619 name: "__name__".into(),
620 value: "test_metric_nan".into(),
621 }],
622 samples: vec![proto::Sample {
623 value: f64::NAN,
624 timestamp: chrono::Utc::now().timestamp_millis(),
625 }],
626 },
627 ],
628 };
629
630 let mut buf = Vec::new();
631 request.encode(&mut buf).unwrap();
632
633 snap::raw::Encoder::new().compress_vec(&buf).unwrap()
635 };
636
637 send_request_and_assert(address.port(), request_body).await;
638
639 let mut output = test_util::collect_ready(rx).await;
641 assert_eq!(output.len(), 2);
642
643 output.sort_by(|a, b| a.as_metric().name().cmp(b.as_metric().name()));
645
646 let nan_metric = output[0].as_metric();
648 assert_eq!(nan_metric.name(), "test_metric_nan");
649 match nan_metric.value() {
650 MetricValue::Gauge { value } => {
651 assert!(value.is_nan());
652 }
653 _ => panic!("Expected gauge metric"),
654 }
655
656 let valid_metric = output[1].as_metric();
658 assert_eq!(valid_metric.name(), "test_metric_valid");
659 assert_eq!(valid_metric.value(), &MetricValue::Gauge { value: 42.0 });
660 }
661
662 #[tokio::test]
663 async fn receives_metrics_on_custom_path() {
664 let address = test_util::next_addr();
665 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
666
667 let source = PrometheusRemoteWriteConfig {
668 address,
669 path: "/api/v1/write".to_string(),
670 auth: None,
671 tls: None,
672 metadata_conflict_strategy: Default::default(),
673 acknowledgements: SourceAcknowledgementsConfig::default(),
674 keepalive: KeepaliveConfig::default(),
675 skip_nan_values: false,
676 };
677 let source = source
678 .build(SourceContext::new_test(tx, None))
679 .await
680 .unwrap();
681 tokio::spawn(source);
682 wait_for_tcp(address).await;
683
684 let sink = RemoteWriteConfig {
685 endpoint: format!("http://localhost:{}/api/v1/write", address.port()),
686 ..Default::default()
687 };
688 let (sink, _) = sink
689 .build(SinkContext::default())
690 .await
691 .expect("Error building config.");
692
693 let events = make_events();
694 let events_copy = events.clone();
695 let mut output = test_util::spawn_collect_ready(
696 async move {
697 sink.run_events(events_copy).await.unwrap();
698 },
699 rx,
700 1,
701 )
702 .await;
703
704 output.sort_unstable_by_key(|event| event.as_metric().name().to_owned());
707
708 vector_lib::assert_event_data_eq!(events, output);
709 }
710
711 #[tokio::test]
712 async fn rejects_metrics_on_wrong_path() {
713 let address = test_util::next_addr();
714 let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
715
716 let source = PrometheusRemoteWriteConfig {
717 address,
718 path: "/api/v1/write".to_string(),
719 auth: None,
720 tls: None,
721 metadata_conflict_strategy: Default::default(),
722 acknowledgements: SourceAcknowledgementsConfig::default(),
723 keepalive: KeepaliveConfig::default(),
724 skip_nan_values: false,
725 };
726 let source = source
727 .build(SourceContext::new_test(tx, None))
728 .await
729 .unwrap();
730 tokio::spawn(source);
731 wait_for_tcp(address).await;
732
733 let client = reqwest::Client::new();
735 let response = client
736 .post(format!("http://localhost:{}/wrong/path", address.port()))
737 .header("Content-Type", "application/x-protobuf")
738 .body(vec![])
739 .send()
740 .await
741 .unwrap();
742
743 assert!(
745 response.status().is_client_error(),
746 "Expected 4xx error, got {}",
747 response.status()
748 );
749 }
750
751 #[tokio::test]
752 async fn receives_metrics_on_default_path() {
753 let address = test_util::next_addr();
754 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
755
756 let source = PrometheusRemoteWriteConfig {
757 address,
758 path: default_path(),
759 auth: None,
760 tls: None,
761 metadata_conflict_strategy: Default::default(),
762 acknowledgements: SourceAcknowledgementsConfig::default(),
763 keepalive: KeepaliveConfig::default(),
764 skip_nan_values: false,
765 };
766 let source = source
767 .build(SourceContext::new_test(tx, None))
768 .await
769 .unwrap();
770 tokio::spawn(source);
771 wait_for_tcp(address).await;
772
773 let request_body = create_default_request_body();
774 send_request_and_assert(address.port(), request_body).await;
775
776 let output = test_util::collect_ready(rx).await;
778 assert_eq!(output.len(), 1);
779
780 let metric = output[0].as_metric();
781 assert_eq!(metric.name(), "test_metric");
782 assert_eq!(metric.value(), &MetricValue::Gauge { value: 42.0 });
783 }
784
785 #[tokio::test]
786 async fn rejects_metrics_on_wrong_path_with_skip_nan_enabled() {
787 let address = test_util::next_addr();
788 let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
789
790 let source = PrometheusRemoteWriteConfig {
791 address,
792 path: "/api/v1/write".to_string(),
793 auth: None,
794 tls: None,
795 metadata_conflict_strategy: Default::default(),
796 acknowledgements: SourceAcknowledgementsConfig::default(),
797 keepalive: KeepaliveConfig::default(),
798 skip_nan_values: true,
799 };
800 let source = source
801 .build(SourceContext::new_test(tx, None))
802 .await
803 .unwrap();
804 tokio::spawn(source);
805 wait_for_tcp(address).await;
806
807 let client = reqwest::Client::new();
809 let response = client
810 .post(format!("http://localhost:{}/wrong/path", address.port()))
811 .header("Content-Type", "application/x-protobuf")
812 .body(vec![])
813 .send()
814 .await
815 .unwrap();
816
817 assert!(
819 response.status().is_client_error(),
820 "Expected 4xx error, got {}",
821 response.status()
822 );
823 }
824
825 #[tokio::test]
826 async fn accepts_conflicting_metadata() {
827 let address = test_util::next_addr();
828 let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
829
830 let source = PrometheusRemoteWriteConfig {
831 address,
832 path: default_path(),
833 auth: None,
834 tls: None,
835 metadata_conflict_strategy: MetadataConflictStrategy::Ignore,
836 acknowledgements: SourceAcknowledgementsConfig::default(),
837 keepalive: KeepaliveConfig::default(),
838 skip_nan_values: false,
839 };
840 let source = source
841 .build(SourceContext::new_test(tx, None))
842 .await
843 .unwrap();
844 tokio::spawn(source);
845 wait_for_tcp(address).await;
846
847 let request_body = create_conflicting_metadata_request_body();
848 let response = send_request(address.port(), request_body).await;
849
850 assert!(
852 response.status().is_success(),
853 "Expected success but got: {}",
854 response.status()
855 );
856
857 let output = test_util::collect_ready(rx).await;
859 assert_eq!(output.len(), 1);
860
861 let metric = output[0].as_metric();
862 assert_eq!(metric.name(), "test_metric");
863 assert_eq!(metric.value(), &MetricValue::Gauge { value: 42.0 });
864 }
865
866 #[tokio::test]
867 async fn rejects_conflicting_metadata() {
868 let address = test_util::next_addr();
869 let (tx, _rx) = SourceSender::new_test_finalize(EventStatus::Delivered);
870
871 let source = PrometheusRemoteWriteConfig {
872 address,
873 path: default_path(),
874 auth: None,
875 tls: None,
876 metadata_conflict_strategy: MetadataConflictStrategy::Reject,
877 acknowledgements: SourceAcknowledgementsConfig::default(),
878 keepalive: KeepaliveConfig::default(),
879 skip_nan_values: false,
880 };
881 let source = source
882 .build(SourceContext::new_test(tx, None))
883 .await
884 .unwrap();
885 tokio::spawn(source);
886 wait_for_tcp(address).await;
887
888 let request_body = create_conflicting_metadata_request_body();
889 let response = send_request(address.port(), request_body).await;
890
891 assert_eq!(response.status(), StatusCode::BAD_REQUEST);
893 }
894}
895
896#[cfg(all(test, feature = "prometheus-integration-tests"))]
897mod integration_tests {
898 use std::net::{SocketAddr, ToSocketAddrs as _};
899
900 use tokio::time::Duration;
901
902 use super::*;
903 use crate::test_util::components::{HTTP_PUSH_SOURCE_TAGS, run_and_assert_source_compliance};
904
905 fn source_receive_address() -> SocketAddr {
906 let address = std::env::var("REMOTE_WRITE_SOURCE_RECEIVE_ADDRESS")
907 .unwrap_or_else(|_| "127.0.0.1:9102".into());
908 address
911 .to_socket_addrs()
912 .unwrap()
913 .next()
914 .unwrap_or_else(|| panic!("Socket address {address:?} did not resolve"))
915 }
916
917 #[tokio::test]
918 async fn receive_something() {
919 let config = PrometheusRemoteWriteConfig {
927 address: source_receive_address(),
928 path: default_path(),
929 auth: None,
930 tls: None,
931 metadata_conflict_strategy: Default::default(),
932 acknowledgements: SourceAcknowledgementsConfig::default(),
933 keepalive: KeepaliveConfig::default(),
934 skip_nan_values: false,
935 };
936
937 let events = run_and_assert_source_compliance(
938 config,
939 Duration::from_secs(5),
940 &HTTP_PUSH_SOURCE_TAGS,
941 )
942 .await;
943 assert!(!events.is_empty());
944 }
945}