vector/sinks/prometheus/remote_write/
service.rs1use std::task::{Context, Poll};
2
3#[cfg(feature = "aws-core")]
4use aws_credential_types::provider::SharedCredentialsProvider;
5#[cfg(feature = "aws-core")]
6use aws_types::region::Region;
7use bytes::Bytes;
8use http::Uri;
9
10use super::request_builder::RemoteWriteRequest;
11use crate::{
12 http::HttpClient,
13 internal_events::EndpointBytesSent,
14 sinks::{
15 prelude::*,
16 util::{auth::Auth, http::HttpResponse},
17 },
18};
19
20mod headers {
22 pub(super) const X_PROMETHEUS_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version";
23 pub(super) const CONTENT_ENCODING: &str = "Content-Encoding";
24 pub(super) const CONTENT_TYPE: &str = "Content-Type";
25 pub(super) const X_SCOPE_ORGID: &str = "X-Scope-OrgID";
26
27 pub(super) const VERSION: &str = "0.1.0";
28 pub(super) const APPLICATION_X_PROTOBUF: &str = "application/x-protobuf";
29}
30
31#[derive(Clone)]
32pub(super) struct RemoteWriteService {
33 pub(super) endpoint: Uri,
34 pub(super) auth: Option<Auth>,
35 pub(super) client: HttpClient,
36 pub(super) compression: super::Compression,
37}
38
39impl Service<RemoteWriteRequest> for RemoteWriteService {
40 type Response = HttpResponse;
41 type Error = crate::Error;
42 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
43
44 fn poll_ready(&mut self, _task: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
45 Poll::Ready(Ok(()))
46 }
47
48 fn call(&mut self, mut request: RemoteWriteRequest) -> Self::Future {
50 let client = self.client.clone();
51 let endpoint = self.endpoint.clone();
52 let auth = self.auth.clone();
53 let compression = self.compression;
54
55 Box::pin(async move {
56 let metadata = std::mem::take(request.metadata_mut());
57 let json_size = metadata.into_events_estimated_json_encoded_byte_size();
58 let raw_byte_size = request.request.len();
59
60 let http_request = build_request(
61 http::Method::POST,
62 &endpoint,
63 compression,
64 request.request,
65 request.tenant_id.as_ref(),
66 auth,
67 )
68 .await?;
69
70 let response = client.send(http_request).await?;
71 let (parts, body) = response.into_parts();
72 let body = hyper::body::to_bytes(body).await?;
73 let http_response = hyper::Response::from_parts(parts, body);
74
75 if http_response.status().is_success() {
76 emit!(EndpointBytesSent {
78 byte_size: raw_byte_size,
79 protocol: "http",
80 endpoint: &endpoint.to_string(),
81 });
82 }
83
84 Ok(HttpResponse {
85 events_byte_size: json_size,
86 http_response,
87 raw_byte_size,
88 })
89 })
90 }
91}
92
93#[cfg(feature = "aws-core")]
94async fn sign_request(
95 request: &mut http::Request<Bytes>,
96 credentials_provider: &SharedCredentialsProvider,
97 region: Option<&Region>,
98) -> crate::Result<()> {
99 crate::aws::sign_request("aps", request, credentials_provider, region, false).await
100}
101
102pub(super) async fn build_request(
103 method: http::Method,
104 endpoint: &Uri,
105 compression: Compression,
106 body: Bytes,
107 tenant_id: Option<&String>,
108 auth: Option<Auth>,
109) -> crate::Result<http::Request<hyper::Body>> {
110 let mut builder = http::Request::builder()
111 .method(method)
112 .uri(endpoint)
113 .header(headers::X_PROMETHEUS_REMOTE_WRITE_VERSION, headers::VERSION)
114 .header(headers::CONTENT_TYPE, headers::APPLICATION_X_PROTOBUF);
115
116 if let Some(content_encoding) = compression.content_encoding() {
117 builder = builder.header(headers::CONTENT_ENCODING, content_encoding);
118 }
119
120 if let Some(tenant_id) = tenant_id {
121 builder = builder.header(headers::X_SCOPE_ORGID, tenant_id);
122 }
123
124 let mut request = builder.body(body)?;
125
126 if let Some(auth) = auth {
127 match auth {
128 Auth::Basic(http_auth) => http_auth.apply(&mut request),
129 #[cfg(feature = "aws-core")]
130 Auth::Aws {
131 credentials_provider: provider,
132 region,
133 } => sign_request(&mut request, &provider, Some(®ion)).await?,
134 }
135 }
136
137 Ok(request.map(hyper::Body::from))
138}