vector/sinks/http/
service.rs1use std::{collections::BTreeMap, str::FromStr};
4
5use bytes::Bytes;
6use http::{
7 HeaderName, HeaderValue, Method, Request,
8 header::{CONTENT_ENCODING, CONTENT_TYPE},
9};
10use snafu::ResultExt;
11
12use super::{config::HttpMethod, sink::PartitionKey};
13use crate::{
14 http::{Auth, MaybeAuth},
15 sinks::{
16 HTTPRequestBuilderSnafu,
17 util::{
18 UriSerde,
19 http::{HttpRequest, HttpServiceRequestBuilder, OrderedHeaderName},
20 },
21 },
22};
23
24#[derive(Debug, Clone)]
25pub(super) struct HttpSinkRequestBuilder {
26 method: HttpMethod,
27 auth: Option<Auth>,
28 static_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
29 content_type: Option<String>,
30 content_encoding: Option<String>,
31}
32
33impl HttpSinkRequestBuilder {
34 pub(super) const fn new(
36 method: HttpMethod,
37 auth: Option<Auth>,
38 static_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
39 content_type: Option<String>,
40 content_encoding: Option<String>,
41 ) -> Self {
42 Self {
43 method,
44 auth,
45 static_headers,
46 content_type,
47 content_encoding,
48 }
49 }
50}
51
52impl HttpServiceRequestBuilder<PartitionKey> for HttpSinkRequestBuilder {
53 fn build(
54 &self,
55 mut request: HttpRequest<PartitionKey>,
56 ) -> Result<Request<Bytes>, crate::Error> {
57 let metadata = request.get_additional_metadata();
58 let uri_serde = UriSerde::from_str(&metadata.uri)?;
59 let uri_auth = uri_serde.auth;
60 let uri = uri_serde.uri;
61
62 let auth = self.auth.choose_one(&uri_auth)?;
63
64 let method: Method = self.method.into();
65 let mut builder = Request::builder().method(method).uri(uri);
66
67 if let Some(content_type) = &self.content_type {
68 builder = builder.header(CONTENT_TYPE, content_type);
69 }
70
71 if let Some(content_encoding) = &self.content_encoding {
72 builder = builder.header(CONTENT_ENCODING, content_encoding);
73 }
74
75 let headers = builder
76 .headers_mut()
77 .expect("Failed to access headers in http::Request builder- builder has errors.");
79
80 for (header_name, header_value) in self.static_headers.iter() {
82 headers.insert(header_name.inner(), header_value.clone());
83 }
84
85 for (name, value) in metadata.headers.iter() {
87 let header_name = HeaderName::from_bytes(name.as_bytes())
88 .map_err(|e| format!("Invalid header name '{name}': {e}"))?;
89 let header_value = HeaderValue::from_bytes(value.as_bytes())
90 .map_err(|e| format!("Invalid header value '{value}': {e}"))?;
91 headers.insert(header_name, header_value);
92 }
93
94 let mut request = builder
96 .body(request.take_payload())
97 .context(HTTPRequestBuilderSnafu)
98 .map_err(Into::<crate::Error>::into)?;
99
100 if let Some(auth) = auth {
101 auth.apply(&mut request);
102 }
103
104 Ok(request)
105 }
106}