vector/sinks/http/
service.rs1use std::collections::BTreeMap;
4use std::str::FromStr;
5
6use bytes::Bytes;
7use http::{
8 header::{CONTENT_ENCODING, CONTENT_TYPE},
9 HeaderName, HeaderValue, Method, Request,
10};
11
12use crate::{
13 http::{Auth, MaybeAuth},
14 sinks::{
15 util::{
16 http::{HttpRequest, HttpServiceRequestBuilder, OrderedHeaderName},
17 UriSerde,
18 },
19 HTTPRequestBuilderSnafu,
20 },
21};
22use snafu::ResultExt;
23
24use super::config::HttpMethod;
25use super::sink::PartitionKey;
26
27#[derive(Debug, Clone)]
28pub(super) struct HttpSinkRequestBuilder {
29 method: HttpMethod,
30 auth: Option<Auth>,
31 static_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
32 content_type: Option<String>,
33 content_encoding: Option<String>,
34}
35
36impl HttpSinkRequestBuilder {
37 pub(super) const fn new(
39 method: HttpMethod,
40 auth: Option<Auth>,
41 static_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
42 content_type: Option<String>,
43 content_encoding: Option<String>,
44 ) -> Self {
45 Self {
46 method,
47 auth,
48 static_headers,
49 content_type,
50 content_encoding,
51 }
52 }
53}
54
55impl HttpServiceRequestBuilder<PartitionKey> for HttpSinkRequestBuilder {
56 fn build(
57 &self,
58 mut request: HttpRequest<PartitionKey>,
59 ) -> Result<Request<Bytes>, crate::Error> {
60 let metadata = request.get_additional_metadata();
61 let uri_serde = UriSerde::from_str(&metadata.uri)?;
62 let uri_auth = uri_serde.auth;
63 let uri = uri_serde.uri;
64
65 let auth = self.auth.choose_one(&uri_auth)?;
66
67 let method: Method = self.method.into();
68 let mut builder = Request::builder().method(method).uri(uri);
69
70 if let Some(content_type) = &self.content_type {
71 builder = builder.header(CONTENT_TYPE, content_type);
72 }
73
74 if let Some(content_encoding) = &self.content_encoding {
75 builder = builder.header(CONTENT_ENCODING, content_encoding);
76 }
77
78 let headers = builder
79 .headers_mut()
80 .expect("Failed to access headers in http::Request builder- builder has errors.");
82
83 for (header_name, header_value) in self.static_headers.iter() {
85 headers.insert(header_name.inner(), header_value.clone());
86 }
87
88 for (name, value) in metadata.headers.iter() {
90 let header_name = HeaderName::from_bytes(name.as_bytes())
91 .map_err(|e| format!("Invalid header name '{name}': {e}"))?;
92 let header_value = HeaderValue::from_bytes(value.as_bytes())
93 .map_err(|e| format!("Invalid header value '{value}': {e}"))?;
94 headers.insert(header_name, header_value);
95 }
96
97 let mut request = builder
99 .body(request.take_payload())
100 .context(HTTPRequestBuilderSnafu)
101 .map_err(Into::<crate::Error>::into)?;
102
103 if let Some(auth) = auth {
104 auth.apply(&mut request);
105 }
106
107 Ok(request)
108 }
109}