vector/sinks/http/
service.rs

1//! Service implementation for the `http` sink.
2
3use std::{collections::BTreeMap, str::FromStr};
4
5use bytes::Bytes;
6use http::{
7    HeaderName, HeaderValue, Method, Request,
8    header::{CONTENT_ENCODING, CONTENT_TYPE},
9};
10use snafu::ResultExt;
11
12use super::{config::HttpMethod, sink::PartitionKey};
13use crate::{
14    http::{Auth, MaybeAuth},
15    sinks::{
16        HTTPRequestBuilderSnafu,
17        util::{
18            UriSerde,
19            http::{HttpRequest, HttpServiceRequestBuilder, OrderedHeaderName},
20        },
21    },
22};
23
24#[derive(Debug, Clone)]
25pub(super) struct HttpSinkRequestBuilder {
26    method: HttpMethod,
27    auth: Option<Auth>,
28    static_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
29    content_type: Option<String>,
30    content_encoding: Option<String>,
31}
32
33impl HttpSinkRequestBuilder {
34    /// Creates a new `HttpSinkRequestBuilder`
35    pub(super) const fn new(
36        method: HttpMethod,
37        auth: Option<Auth>,
38        static_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
39        content_type: Option<String>,
40        content_encoding: Option<String>,
41    ) -> Self {
42        Self {
43            method,
44            auth,
45            static_headers,
46            content_type,
47            content_encoding,
48        }
49    }
50}
51
52impl HttpServiceRequestBuilder<PartitionKey> for HttpSinkRequestBuilder {
53    fn build(
54        &self,
55        mut request: HttpRequest<PartitionKey>,
56    ) -> Result<Request<Bytes>, crate::Error> {
57        let metadata = request.get_additional_metadata();
58        let uri_serde = UriSerde::from_str(&metadata.uri)?;
59        let uri_auth = uri_serde.auth;
60        let uri = uri_serde.uri;
61
62        let auth = self.auth.choose_one(&uri_auth)?;
63
64        let method: Method = self.method.into();
65        let mut builder = Request::builder().method(method).uri(uri);
66
67        if let Some(content_type) = &self.content_type {
68            builder = builder.header(CONTENT_TYPE, content_type);
69        }
70
71        if let Some(content_encoding) = &self.content_encoding {
72            builder = builder.header(CONTENT_ENCODING, content_encoding);
73        }
74
75        let headers = builder
76            .headers_mut()
77            // The request building should not have errors at this point, and if it did it would fail in the call to `body()` also.
78            .expect("Failed to access headers in http::Request builder- builder has errors.");
79
80        // Static headers from config
81        for (header_name, header_value) in self.static_headers.iter() {
82            headers.insert(header_name.inner(), header_value.clone());
83        }
84
85        // Template headers from the partition key
86        for (name, value) in metadata.headers.iter() {
87            let header_name = HeaderName::from_bytes(name.as_bytes())
88                .map_err(|e| format!("Invalid header name '{name}': {e}"))?;
89            let header_value = HeaderValue::from_bytes(value.as_bytes())
90                .map_err(|e| format!("Invalid header value '{value}': {e}"))?;
91            headers.insert(header_name, header_value);
92        }
93
94        // The request building should not have errors at this point
95        let mut request = builder
96            .body(request.take_payload())
97            .context(HTTPRequestBuilderSnafu)
98            .map_err(Into::<crate::Error>::into)?;
99
100        if let Some(auth) = auth {
101            auth.apply(&mut request);
102        }
103
104        Ok(request)
105    }
106}