vector/sources/prometheus/remote_write.rs

use std::{collections::HashMap, net::SocketAddr};

use bytes::Bytes;
use prost::Message;
use vector_lib::config::LogNamespace;
use vector_lib::configurable::configurable_component;
use vector_lib::prometheus::parser::proto;
use warp::http::{HeaderMap, StatusCode};

use super::parser;
use crate::{
    common::http::{server_auth::HttpServerAuthConfig, ErrorMessage},
    config::{
        GenerateConfig, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
    },
    event::Event,
    http::KeepaliveConfig,
    internal_events::PrometheusRemoteWriteParseError,
    serde::bool_or_struct,
    sources::{
        self,
        util::{decode, http::HttpMethod, HttpSource},
    },
    tls::TlsEnableableConfig,
};

/// Configuration for the `prometheus_remote_write` source.
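///
/// This source listens for HTTP POST requests carrying snappy-compressed, protobuf-encoded
/// Prometheus `WriteRequest` payloads and converts the contained time series into Vector
/// metric events.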
#[configurable_component(source(
    "prometheus_remote_write",
    "Receive metrics via the Prometheus Remote Write protocol."
))]
#[derive(Clone, Debug)]
pub struct PrometheusRemoteWriteConfig {
    /// The socket address to accept connections on.
    ///
    /// The address _must_ include a port.
    #[configurable(metadata(docs::examples = "0.0.0.0:9090"))]
    address: SocketAddr,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    auth: Option<HttpServerAuthConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}

impl PrometheusRemoteWriteConfig {
    #[cfg(test)]
    pub fn from_address(address: SocketAddr) -> Self {
        Self {
            address,
            tls: None,
            auth: None,
            acknowledgements: false.into(),
            keepalive: KeepaliveConfig::default(),
        }
    }
}

impl GenerateConfig for PrometheusRemoteWriteConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            address: "127.0.0.1:9090".parse().unwrap(),
            tls: None,
            auth: None,
            acknowledgements: SourceAcknowledgementsConfig::default(),
            keepalive: KeepaliveConfig::default(),
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_remote_write")]
impl SourceConfig for PrometheusRemoteWriteConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
        let source = RemoteWriteSource;
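        // Serve the remote write endpoint at the root path over HTTP POST, responding with
        // `200 OK` once a request has been accepted.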
        source.run(
            self.address,
            "",
            HttpMethod::Post,
            StatusCode::OK,
            true,
            self.tls.as_ref(),
            self.auth.as_ref(),
            cx,
            self.acknowledgements,
            self.keepalive.clone(),
        )
    }

    fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        vec![SourceOutput::new_metrics()]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

#[derive(Clone)]
struct RemoteWriteSource;

impl RemoteWriteSource {
    fn decode_body(&self, body: Bytes) -> Result<Vec<Event>, ErrorMessage> {
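        // The body is a protobuf-encoded Prometheus `WriteRequest` (already decompressed by
        // `decode` in the `HttpSource` impl below). Decode it, convert the time series into
        // metric events, and map any failure to a `400 Bad Request`.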
        let request = proto::WriteRequest::decode(body).map_err(|error| {
            emit!(PrometheusRemoteWriteParseError {
                error: error.clone()
            });
            ErrorMessage::new(
                StatusCode::BAD_REQUEST,
                format!("Could not decode write request: {error}"),
            )
        })?;
        parser::parse_request(request).map_err(|error| {
            ErrorMessage::new(
                StatusCode::BAD_REQUEST,
                format!("Could not decode write request: {error}"),
            )
        })
    }
}

impl HttpSource for RemoteWriteSource {
    fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
        // Default to snappy decoding the request body.
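        // The remote write protocol snappy-compresses request bodies, and clients may omit the
        // `Content-Encoding` header, so fall back to snappy when no encoding is supplied.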
        decode(encoding_header.or(Some("snappy")), body)
    }

    fn build_events(
        &self,
        body: Bytes,
        _header_map: &HeaderMap,
        _query_parameters: &HashMap<String, String>,
        _full_path: &str,
    ) -> Result<Vec<Event>, ErrorMessage> {
        let events = self.decode_body(body)?;
        Ok(events)
    }
}

#[cfg(test)]
mod test {
    use chrono::{SubsecRound as _, Utc};
    use vector_lib::{
        event::{EventStatus, Metric, MetricKind, MetricValue},
        metric_tags,
    };

    use super::*;
    use crate::{
        config::{SinkConfig, SinkContext},
        sinks::prometheus::remote_write::RemoteWriteConfig,
        test_util::{self, wait_for_tcp},
        tls::MaybeTlsSettings,
        SourceSender,
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PrometheusRemoteWriteConfig>();
    }

    #[tokio::test]
    async fn receives_metrics_over_http() {
        receives_metrics(None).await;
    }

    #[tokio::test]
    async fn receives_metrics_over_https() {
        receives_metrics(Some(TlsEnableableConfig::test_config())).await;
    }

    async fn receives_metrics(tls: Option<TlsEnableableConfig>) {
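        // Round-trip test: run the source, push metrics into it with Vector's own
        // `prometheus_remote_write` sink, and assert that the source emits the same metrics.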
        let address = test_util::next_addr();
        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

        let proto = MaybeTlsSettings::from_config(tls.as_ref(), true)
            .unwrap()
            .http_protocol_name();
        let source = PrometheusRemoteWriteConfig {
            address,
            auth: None,
            tls: tls.clone(),
            acknowledgements: SourceAcknowledgementsConfig::default(),
            keepalive: KeepaliveConfig::default(),
        };
        let source = source
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;

        let sink = RemoteWriteConfig {
            endpoint: format!("{}://localhost:{}/", proto, address.port()),
            tls: tls.map(|tls| tls.options),
            ..Default::default()
        };
        let (sink, _) = sink
            .build(SinkContext::default())
            .await
            .expect("Error building config.");

        let events = make_events();
        let events_copy = events.clone();
        let mut output = test_util::spawn_collect_ready(
            async move {
                sink.run_events(events_copy).await.unwrap();
            },
            rx,
            1,
        )
        .await;

        // The MetricBuffer used by the sink may reorder the metrics, so
        // put them back into order before comparing.
        output.sort_unstable_by_key(|event| event.as_metric().name().to_owned());

        vector_lib::assert_event_data_eq!(events, output);
    }

    fn make_events() -> Vec<Event> {
        let timestamp = || Utc::now().trunc_subsecs(3);
        vec![
            Metric::new(
                "counter_1",
                MetricKind::Absolute,
                MetricValue::Counter { value: 42.0 },
            )
            .with_timestamp(Some(timestamp()))
            .into(),
            Metric::new(
                "gauge_2",
                MetricKind::Absolute,
                MetricValue::Gauge { value: 41.0 },
            )
            .with_timestamp(Some(timestamp()))
            .into(),
            Metric::new(
                "histogram_3",
                MetricKind::Absolute,
                MetricValue::AggregatedHistogram {
                    buckets: vector_lib::buckets![ 2.3 => 11, 4.2 => 85 ],
                    count: 96,
                    sum: 156.2,
                },
            )
            .with_timestamp(Some(timestamp()))
            .into(),
            Metric::new(
                "summary_4",
                MetricKind::Absolute,
                MetricValue::AggregatedSummary {
                    quantiles: vector_lib::quantiles![ 0.1 => 1.2, 0.5 => 3.6, 0.9 => 5.2 ],
                    count: 23,
                    sum: 8.6,
                },
            )
            .with_timestamp(Some(timestamp()))
            .into(),
        ]
    }

    /// According to the [spec](https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md?plain=1#L115):
    ///
    /// > Label names MUST be unique within a LabelSet.
    ///
    /// Prometheus itself rejects such a metric with an error. Largely to remain backward
    /// compatible with older versions of Vector, we accept the metric but keep only the last
    /// value given for the duplicated label.
    #[tokio::test]
    async fn receives_metrics_duplicate_labels() {
        let address = test_util::next_addr();
        let (tx, rx) = SourceSender::new_test_finalize(EventStatus::Delivered);

        let source = PrometheusRemoteWriteConfig {
            address,
            auth: None,
            tls: None,
            acknowledgements: SourceAcknowledgementsConfig::default(),
            keepalive: KeepaliveConfig::default(),
        };
        let source = source
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;

        let sink = RemoteWriteConfig {
            endpoint: format!("http://localhost:{}/", address.port()),
            ..Default::default()
        };
        let (sink, _) = sink
            .build(SinkContext::default())
            .await
            .expect("Error building config.");

        let timestamp = Utc::now().trunc_subsecs(3);

        let events = vec![Metric::new(
            "gauge_2",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 41.0 },
        )
        .with_timestamp(Some(timestamp))
        .with_tags(Some(metric_tags! {
            "code" => "200".to_string(),
            "code" => "success".to_string(),
        }))
        .into()];

        let expected = vec![Metric::new(
            "gauge_2",
            MetricKind::Absolute,
            MetricValue::Gauge { value: 41.0 },
        )
        .with_timestamp(Some(timestamp))
        .with_tags(Some(metric_tags! {
            "code" => "success".to_string(),
        }))
        .into()];

        let output = test_util::spawn_collect_ready(
            async move {
                sink.run_events(events).await.unwrap();
            },
            rx,
            1,
        )
        .await;

        vector_lib::assert_event_data_eq!(expected, output);
    }
}

#[cfg(all(test, feature = "prometheus-integration-tests"))]
mod integration_tests {
    use std::net::{SocketAddr, ToSocketAddrs as _};
    use tokio::time::Duration;

    use super::*;
    use crate::test_util::components::{run_and_assert_source_compliance, HTTP_PUSH_SOURCE_TAGS};

    fn source_receive_address() -> SocketAddr {
        let address = std::env::var("REMOTE_WRITE_SOURCE_RECEIVE_ADDRESS")
            .unwrap_or_else(|_| "127.0.0.1:9102".into());
        // TODO: This logic should maybe be moved up into the source, and possibly into other
        // sources, wrapped in a new socket address type that does the lookup during config parsing.
        address
            .to_socket_addrs()
            .unwrap()
            .next()
            .unwrap_or_else(|| panic!("Socket address {address:?} did not resolve"))
    }

    #[tokio::test]
    async fn receive_something() {
        // TODO: This test depends on the single instance of Prometheus that we spin up for
        // integration tests both scraping an endpoint and then also remote writing that stuff to
        // this remote write source.  This makes sense from a "test the actual behavior" standpoint
        // but it feels a little fragile.
        //
        // It could be nice to split up the Prometheus integration tests in the future, or
        // maybe there's a way to do a one-shot remote write from Prometheus? Not sure.
        let config = PrometheusRemoteWriteConfig {
            address: source_receive_address(),
            auth: None,
            tls: None,
            acknowledgements: SourceAcknowledgementsConfig::default(),
            keepalive: KeepaliveConfig::default(),
        };

        let events = run_and_assert_source_compliance(
            config,
            Duration::from_secs(5),
            &HTTP_PUSH_SOURCE_TAGS,
        )
        .await;
        assert!(!events.is_empty());
    }
}