vector/sinks/prometheus/remote_write/
service.rs1use std::task::{Context, Poll};
2
3#[cfg(feature = "aws-core")]
4use aws_credential_types::provider::SharedCredentialsProvider;
5#[cfg(feature = "aws-core")]
6use aws_types::region::Region;
7
8use bytes::Bytes;
9use http::Uri;
10
11use super::request_builder::RemoteWriteRequest;
12use crate::{
13 http::HttpClient,
14 internal_events::EndpointBytesSent,
15 sinks::{
16 prelude::*,
17 util::{auth::Auth, http::HttpResponse},
18 },
19};
20
21mod headers {
23 pub(super) const X_PROMETHEUS_REMOTE_WRITE_VERSION: &str = "X-Prometheus-Remote-Write-Version";
24 pub(super) const CONTENT_ENCODING: &str = "Content-Encoding";
25 pub(super) const CONTENT_TYPE: &str = "Content-Type";
26 pub(super) const X_SCOPE_ORGID: &str = "X-Scope-OrgID";
27
28 pub(super) const VERSION: &str = "0.1.0";
29 pub(super) const APPLICATION_X_PROTOBUF: &str = "application/x-protobuf";
30}
31
32#[derive(Clone)]
33pub(super) struct RemoteWriteService {
34 pub(super) endpoint: Uri,
35 pub(super) auth: Option<Auth>,
36 pub(super) client: HttpClient,
37 pub(super) compression: super::Compression,
38}
39
40impl Service<RemoteWriteRequest> for RemoteWriteService {
41 type Response = HttpResponse;
42 type Error = crate::Error;
43 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
44
45 fn poll_ready(&mut self, _task: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
46 Poll::Ready(Ok(()))
47 }
48
49 fn call(&mut self, mut request: RemoteWriteRequest) -> Self::Future {
51 let client = self.client.clone();
52 let endpoint = self.endpoint.clone();
53 let auth = self.auth.clone();
54 let compression = self.compression;
55
56 Box::pin(async move {
57 let metadata = std::mem::take(request.metadata_mut());
58 let json_size = metadata.into_events_estimated_json_encoded_byte_size();
59 let raw_byte_size = request.request.len();
60
61 let http_request = build_request(
62 http::Method::POST,
63 &endpoint,
64 compression,
65 request.request,
66 request.tenant_id.as_ref(),
67 auth,
68 )
69 .await?;
70
71 let response = client.send(http_request).await?;
72 let (parts, body) = response.into_parts();
73 let body = hyper::body::to_bytes(body).await?;
74 let http_response = hyper::Response::from_parts(parts, body);
75
76 if http_response.status().is_success() {
77 emit!(EndpointBytesSent {
79 byte_size: raw_byte_size,
80 protocol: "http",
81 endpoint: &endpoint.to_string(),
82 });
83 }
84
85 Ok(HttpResponse {
86 events_byte_size: json_size,
87 http_response,
88 raw_byte_size,
89 })
90 })
91 }
92}
93
94#[cfg(feature = "aws-core")]
95async fn sign_request(
96 request: &mut http::Request<Bytes>,
97 credentials_provider: &SharedCredentialsProvider,
98 region: Option<&Region>,
99) -> crate::Result<()> {
100 crate::aws::sign_request("aps", request, credentials_provider, region, false).await
101}
102
103pub(super) async fn build_request(
104 method: http::Method,
105 endpoint: &Uri,
106 compression: Compression,
107 body: Bytes,
108 tenant_id: Option<&String>,
109 auth: Option<Auth>,
110) -> crate::Result<http::Request<hyper::Body>> {
111 let mut builder = http::Request::builder()
112 .method(method)
113 .uri(endpoint)
114 .header(headers::X_PROMETHEUS_REMOTE_WRITE_VERSION, headers::VERSION)
115 .header(headers::CONTENT_TYPE, headers::APPLICATION_X_PROTOBUF);
116
117 if let Some(content_encoding) = compression.content_encoding() {
118 builder = builder.header(headers::CONTENT_ENCODING, content_encoding);
119 }
120
121 if let Some(tenant_id) = tenant_id {
122 builder = builder.header(headers::X_SCOPE_ORGID, tenant_id);
123 }
124
125 let mut request = builder.body(body)?;
126
127 if let Some(auth) = auth {
128 match auth {
129 Auth::Basic(http_auth) => http_auth.apply(&mut request),
130 #[cfg(feature = "aws-core")]
131 Auth::Aws {
132 credentials_provider: provider,
133 region,
134 } => sign_request(&mut request, &provider, Some(®ion)).await?,
135 }
136 }
137
138 Ok(request.map(hyper::Body::from))
139}