vector/sinks/prometheus/remote_write/
service.rs

1use std::task::{Context, Poll};
2
3#[cfg(feature = "aws-core")]
4use aws_credential_types::provider::SharedCredentialsProvider;
5#[cfg(feature = "aws-core")]
6use aws_types::region::Region;
7use bytes::Bytes;
8use http::Uri;
9
10use super::request_builder::RemoteWriteRequest;
11use crate::{
12    http::HttpClient,
13    internal_events::EndpointBytesSent,
14    sinks::{
15        prelude::*,
16        util::{auth::Auth, http::HttpResponse},
17    },
18};
19
/// Header names and fixed header values used when building remote-write requests.
mod headers {
    // --- Header names ---

    /// Name of the header that carries the remote-write protocol version.
    pub(super) const X_PROMETHEUS_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version";
    /// Name of the header that describes the payload media type.
    pub(super) const CONTENT_TYPE: &str = "Content-Type";
    /// Name of the header that describes how the payload is encoded/compressed.
    pub(super) const CONTENT_ENCODING: &str = "Content-Encoding";
    /// Name of the header that carries the tenant identifier, when one is set.
    pub(super) const X_SCOPE_ORGID: &str = "X-Scope-OrgID";

    // --- Fixed header values ---

    /// Value sent in the `X-Prometheus-Remote-Write-Version` header.
    pub(super) const VERSION: &str = "0.1.0";
    /// Value sent in the `Content-Type` header.
    pub(super) const APPLICATION_X_PROTOBUF: &str = "application/x-protobuf";
}
30
31#[derive(Clone)]
32pub(super) struct RemoteWriteService {
33    pub(super) endpoint: Uri,
34    pub(super) auth: Option<Auth>,
35    pub(super) client: HttpClient,
36    pub(super) compression: super::Compression,
37}
38
39impl Service<RemoteWriteRequest> for RemoteWriteService {
40    type Response = HttpResponse;
41    type Error = crate::Error;
42    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
43
44    fn poll_ready(&mut self, _task: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
45        Poll::Ready(Ok(()))
46    }
47
48    // Emission of internal events for errors and dropped events is handled upstream by the caller.
49    fn call(&mut self, mut request: RemoteWriteRequest) -> Self::Future {
50        let client = self.client.clone();
51        let endpoint = self.endpoint.clone();
52        let auth = self.auth.clone();
53        let compression = self.compression;
54
55        Box::pin(async move {
56            let metadata = std::mem::take(request.metadata_mut());
57            let json_size = metadata.into_events_estimated_json_encoded_byte_size();
58            let raw_byte_size = request.request.len();
59
60            let http_request = build_request(
61                http::Method::POST,
62                &endpoint,
63                compression,
64                request.request,
65                request.tenant_id.as_ref(),
66                auth,
67            )
68            .await?;
69
70            let response = client.send(http_request).await?;
71            let (parts, body) = response.into_parts();
72            let body = hyper::body::to_bytes(body).await?;
73            let http_response = hyper::Response::from_parts(parts, body);
74
75            if http_response.status().is_success() {
76                // We can't rely on the framework to emit this because we need to specify the additional `endpoint` tag.
77                emit!(EndpointBytesSent {
78                    byte_size: raw_byte_size,
79                    protocol: "http",
80                    endpoint: &endpoint.to_string(),
81                });
82            }
83
84            Ok(HttpResponse {
85                events_byte_size: json_size,
86                http_response,
87                raw_byte_size,
88            })
89        })
90    }
91}
92
/// Signs `request` in place by delegating to `crate::aws::sign_request`.
///
/// # Errors
///
/// Propagates whatever error the underlying signer returns.
#[cfg(feature = "aws-core")]
async fn sign_request(
    request: &mut http::Request<Bytes>,
    credentials_provider: &SharedCredentialsProvider,
    region: Option<&Region>,
) -> crate::Result<()> {
    // Service name handed to the signer.
    // NOTE(review): `aps` looks like the Amazon Managed Service for Prometheus
    // signing name -- confirm against AWS documentation.
    const SERVICE_NAME: &str = "aps";

    // NOTE(review): the trailing `false` is a flag defined by
    // `crate::aws::sign_request`; its meaning is not visible from this file.
    crate::aws::sign_request(SERVICE_NAME, request, credentials_provider, region, false).await
}
101
102pub(super) async fn build_request(
103    method: http::Method,
104    endpoint: &Uri,
105    compression: Compression,
106    body: Bytes,
107    tenant_id: Option<&String>,
108    auth: Option<Auth>,
109) -> crate::Result<http::Request<hyper::Body>> {
110    let mut builder = http::Request::builder()
111        .method(method)
112        .uri(endpoint)
113        .header(headers::X_PROMETHEUS_REMOTE_WRITE_VERSION, headers::VERSION)
114        .header(headers::CONTENT_TYPE, headers::APPLICATION_X_PROTOBUF);
115
116    if let Some(content_encoding) = compression.content_encoding() {
117        builder = builder.header(headers::CONTENT_ENCODING, content_encoding);
118    }
119
120    if let Some(tenant_id) = tenant_id {
121        builder = builder.header(headers::X_SCOPE_ORGID, tenant_id);
122    }
123
124    let mut request = builder.body(body)?;
125
126    if let Some(auth) = auth {
127        match auth {
128            Auth::Basic(http_auth) => http_auth.apply(&mut request),
129            #[cfg(feature = "aws-core")]
130            Auth::Aws {
131                credentials_provider: provider,
132                region,
133            } => sign_request(&mut request, &provider, Some(&region)).await?,
134        }
135    }
136
137    Ok(request.map(hyper::Body::from))
138}