vector/sinks/prometheus/remote_write/
service.rs

1use std::{
2    collections::BTreeMap,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7#[cfg(feature = "aws-core")]
8use aws_credential_types::provider::SharedCredentialsProvider;
9#[cfg(feature = "aws-core")]
10use aws_types::region::Region;
11use bytes::Bytes;
12use http::{HeaderValue, Uri};
13
14use super::request_builder::RemoteWriteRequest;
15use crate::{
16    http::HttpClient,
17    internal_events::EndpointBytesSent,
18    sinks::{
19        prelude::*,
20        util::{
21            auth::Auth,
22            http::{HttpResponse, OrderedHeaderName},
23        },
24    },
25};
26
/// Header names and fixed header values attached to remote-write requests.
mod headers {
    // Fixed header values.

    /// Value sent in the `X-Prometheus-Remote-Write-Version` header.
    pub(super) const VERSION: &str = "0.1.0";
    /// Value sent in the `Content-Type` header.
    pub(super) const APPLICATION_X_PROTOBUF: &str = "application/x-protobuf";

    // Header names.

    /// Name of the header carrying the remote-write protocol version.
    pub(super) const X_PROMETHEUS_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version";
    /// Name of the header describing the body compression, when one is applied.
    pub(super) const CONTENT_ENCODING: &str = "Content-Encoding";
    /// Name of the header describing the body media type.
    pub(super) const CONTENT_TYPE: &str = "Content-Type";
    /// Name of the header carrying the tenant ID, when one is set on the request.
    pub(super) const X_SCOPE_ORGID: &str = "X-Scope-OrgID";
}
37
38#[derive(Clone)]
39pub(super) struct RemoteWriteService {
40    pub(super) endpoint: Uri,
41    pub(super) auth: Option<Auth>,
42    pub(super) client: HttpClient,
43    pub(super) compression: super::Compression,
44    pub(super) headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
45}
46
47impl Service<RemoteWriteRequest> for RemoteWriteService {
48    type Response = HttpResponse;
49    type Error = crate::Error;
50    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
51
52    fn poll_ready(&mut self, _task: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53        Poll::Ready(Ok(()))
54    }
55
56    // Emission of internal events for errors and dropped events is handled upstream by the caller.
57    fn call(&mut self, mut request: RemoteWriteRequest) -> Self::Future {
58        let client = self.client.clone();
59        let endpoint = self.endpoint.clone();
60        let auth = self.auth.clone();
61        let compression = self.compression;
62        let headers = Arc::clone(&self.headers);
63
64        Box::pin(async move {
65            let metadata = std::mem::take(request.metadata_mut());
66            let json_size = metadata.into_events_estimated_json_encoded_byte_size();
67            let raw_byte_size = request.request.len();
68
69            let http_request = build_request(
70                http::Method::POST,
71                &endpoint,
72                compression,
73                request.request,
74                request.tenant_id.as_ref(),
75                auth,
76                headers,
77            )
78            .await?;
79
80            let response = client.send(http_request).await?;
81            let (parts, body) = response.into_parts();
82            let body = http_body::Body::collect(body).await?.to_bytes();
83            let http_response = hyper::Response::from_parts(parts, body);
84
85            if http_response.status().is_success() {
86                // We can't rely on the framework to emit this because we need to specify the additional `endpoint` tag.
87                emit!(EndpointBytesSent {
88                    byte_size: raw_byte_size,
89                    protocol: "http",
90                    endpoint: &endpoint.to_string(),
91                });
92            }
93
94            Ok(HttpResponse {
95                events_byte_size: json_size,
96                http_response,
97                raw_byte_size,
98            })
99        })
100    }
101}
102
/// Signs `request` by delegating to [`crate::aws::sign_request`] with the
/// service name `aps`.
///
/// # Errors
///
/// Returns whatever error the underlying signing helper reports.
#[cfg(feature = "aws-core")]
async fn sign_request(
    request: &mut http::Request<Bytes>,
    credentials_provider: &SharedCredentialsProvider,
    region: Option<&Region>,
) -> crate::Result<()> {
    // Service name handed to the shared signing helper.
    const SERVICE_NAME: &str = "aps";

    crate::aws::sign_request(SERVICE_NAME, request, credentials_provider, region, false).await
}
111
112pub(super) async fn build_request(
113    method: http::Method,
114    endpoint: &Uri,
115    compression: Compression,
116    body: Bytes,
117    tenant_id: Option<&String>,
118    auth: Option<Auth>,
119    custom_headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
120) -> crate::Result<http::Request<hyper::Body>> {
121    let mut builder = http::Request::builder()
122        .method(method)
123        .uri(endpoint)
124        .header(headers::X_PROMETHEUS_REMOTE_WRITE_VERSION, headers::VERSION)
125        .header(headers::CONTENT_TYPE, headers::APPLICATION_X_PROTOBUF);
126
127    if let Some(content_encoding) = compression.content_encoding() {
128        builder = builder.header(headers::CONTENT_ENCODING, content_encoding);
129    }
130
131    if let Some(tenant_id) = tenant_id {
132        builder = builder.header(headers::X_SCOPE_ORGID, tenant_id);
133    }
134
135    // Apply custom headers
136    for (name, value) in custom_headers.iter() {
137        builder = builder.header(name.inner().clone(), value.clone());
138    }
139
140    let mut request = builder.body(body)?;
141
142    if let Some(auth) = auth {
143        match auth {
144            Auth::Basic(http_auth) => http_auth.apply(&mut request),
145            #[cfg(feature = "aws-core")]
146            Auth::Aws {
147                credentials_provider: provider,
148                region,
149            } => sign_request(&mut request, &provider, Some(&region)).await?,
150        }
151    }
152
153    Ok(request.map(hyper::Body::from))
154}