vector/sinks/http/
service.rs

1//! Service implementation for the `http` sink.
2
3use std::collections::BTreeMap;
4use std::str::FromStr;
5
6use bytes::Bytes;
7use http::{
8    header::{CONTENT_ENCODING, CONTENT_TYPE},
9    HeaderName, HeaderValue, Method, Request,
10};
11
12use crate::{
13    http::{Auth, MaybeAuth},
14    sinks::{
15        util::{
16            http::{HttpRequest, HttpServiceRequestBuilder, OrderedHeaderName},
17            UriSerde,
18        },
19        HTTPRequestBuilderSnafu,
20    },
21};
22use snafu::ResultExt;
23
24use super::config::HttpMethod;
25use super::sink::PartitionKey;
26
27#[derive(Debug, Clone)]
28pub(super) struct HttpSinkRequestBuilder {
29    method: HttpMethod,
30    auth: Option<Auth>,
31    static_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
32    content_type: Option<String>,
33    content_encoding: Option<String>,
34}
35
36impl HttpSinkRequestBuilder {
37    /// Creates a new `HttpSinkRequestBuilder`
38    pub(super) const fn new(
39        method: HttpMethod,
40        auth: Option<Auth>,
41        static_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
42        content_type: Option<String>,
43        content_encoding: Option<String>,
44    ) -> Self {
45        Self {
46            method,
47            auth,
48            static_headers,
49            content_type,
50            content_encoding,
51        }
52    }
53}
54
55impl HttpServiceRequestBuilder<PartitionKey> for HttpSinkRequestBuilder {
56    fn build(
57        &self,
58        mut request: HttpRequest<PartitionKey>,
59    ) -> Result<Request<Bytes>, crate::Error> {
60        let metadata = request.get_additional_metadata();
61        let uri_serde = UriSerde::from_str(&metadata.uri)?;
62        let uri_auth = uri_serde.auth;
63        let uri = uri_serde.uri;
64
65        let auth = self.auth.choose_one(&uri_auth)?;
66
67        let method: Method = self.method.into();
68        let mut builder = Request::builder().method(method).uri(uri);
69
70        if let Some(content_type) = &self.content_type {
71            builder = builder.header(CONTENT_TYPE, content_type);
72        }
73
74        if let Some(content_encoding) = &self.content_encoding {
75            builder = builder.header(CONTENT_ENCODING, content_encoding);
76        }
77
78        let headers = builder
79            .headers_mut()
80            // The request building should not have errors at this point, and if it did it would fail in the call to `body()` also.
81            .expect("Failed to access headers in http::Request builder- builder has errors.");
82
83        // Static headers from config
84        for (header_name, header_value) in self.static_headers.iter() {
85            headers.insert(header_name.inner(), header_value.clone());
86        }
87
88        // Template headers from the partition key
89        for (name, value) in metadata.headers.iter() {
90            let header_name = HeaderName::from_bytes(name.as_bytes())
91                .map_err(|e| format!("Invalid header name '{name}': {e}"))?;
92            let header_value = HeaderValue::from_bytes(value.as_bytes())
93                .map_err(|e| format!("Invalid header value '{value}': {e}"))?;
94            headers.insert(header_name, header_value);
95        }
96
97        // The request building should not have errors at this point
98        let mut request = builder
99            .body(request.take_payload())
100            .context(HTTPRequestBuilderSnafu)
101            .map_err(Into::<crate::Error>::into)?;
102
103        if let Some(auth) = auth {
104            auth.apply(&mut request);
105        }
106
107        Ok(request)
108    }
109}