vector/sinks/http/
config.rs

1//! Configuration for the `http` sink.
2
3use std::{collections::BTreeMap, path::PathBuf};
4
5#[cfg(feature = "aws-core")]
6use aws_config::meta::region::ProvideRegion;
7#[cfg(feature = "aws-core")]
8use aws_types::region::Region;
9use http::{HeaderName, HeaderValue, Method, Request, StatusCode, header::AUTHORIZATION};
10use hyper::Body;
11use vector_lib::codecs::{
12    CharacterDelimitedEncoder,
13    encoding::{Framer, Serializer},
14};
15#[cfg(feature = "aws-core")]
16use vector_lib::config::proxy::ProxyConfig;
17
18use super::{
19    encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder,
20    sink::HttpSink,
21};
22#[cfg(feature = "aws-core")]
23use crate::aws::AwsAuthentication;
24#[cfg(feature = "aws-core")]
25use crate::sinks::util::http::SigV4Config;
26use crate::{
27    codecs::{EncodingConfigWithFraming, SinkType},
28    http::{Auth, HttpClient, MaybeAuth},
29    sinks::{
30        prelude::*,
31        util::{
32            RealtimeSizeBasedDefaultBatchSettings, UriSerde,
33            http::{HttpService, OrderedHeaderName, RequestConfig, http_response_retry_logic},
34        },
35    },
36};
37
/// `Content-Type` value chosen in `build` for the raw-message and text serializers.
const CONTENT_TYPE_TEXT: &str = "text/plain";
/// `Content-Type` value chosen in `build` for the JSON serializer with newline-delimited framing.
const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson";
/// `Content-Type` value chosen in `build` for the JSON serializer with `,`-delimited framing.
const CONTENT_TYPE_JSON: &str = "application/json";
41
42/// Configuration for the `http` sink.
43#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))]
44#[derive(Clone, Debug)]
45#[serde(deny_unknown_fields)]
46pub struct HttpSinkConfig {
47    /// The full URI to make HTTP requests to.
48    ///
49    /// This should include the protocol and host, but can also include the port, path, and any other valid part of a URI.
50    #[configurable(metadata(docs::examples = "https://10.22.212.22:9000/endpoint"))]
51    pub uri: Template,
52
53    /// The HTTP method to use when making the request.
54    #[serde(default)]
55    pub method: HttpMethod,
56
57    #[configurable(derived)]
58    pub auth: Option<Auth>,
59
60    /// A list of custom headers to add to each request.
61    #[configurable(deprecated = "This option has been deprecated, use `request.headers` instead.")]
62    #[configurable(metadata(
63        docs::additional_props_description = "An HTTP request header and it's value."
64    ))]
65    pub headers: Option<BTreeMap<String, String>>,
66
67    #[configurable(derived)]
68    #[serde(default)]
69    pub compression: Compression,
70
71    #[serde(flatten)]
72    pub encoding: EncodingConfigWithFraming,
73
74    /// A string to prefix the payload with.
75    ///
76    /// This option is ignored if the encoding is not character delimited JSON.
77    ///
78    /// If specified, the `payload_suffix` must also be specified and together they must produce a valid JSON object.
79    #[configurable(metadata(docs::examples = "{\"data\":"))]
80    #[serde(default)]
81    pub payload_prefix: String,
82
83    /// A string to suffix the payload with.
84    ///
85    /// This option is ignored if the encoding is not character delimited JSON.
86    ///
87    /// If specified, the `payload_prefix` must also be specified and together they must produce a valid JSON object.
88    #[configurable(metadata(docs::examples = "}"))]
89    #[serde(default)]
90    pub payload_suffix: String,
91
92    #[configurable(derived)]
93    #[serde(default)]
94    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
95
96    #[configurable(derived)]
97    #[serde(default)]
98    pub request: RequestConfig,
99
100    #[configurable(derived)]
101    pub tls: Option<TlsConfig>,
102
103    #[configurable(derived)]
104    #[serde(
105        default,
106        deserialize_with = "crate::serde::bool_or_struct",
107        skip_serializing_if = "crate::serde::is_default"
108    )]
109    pub acknowledgements: AcknowledgementsConfig,
110}
111
/// HTTP method.
///
/// A subset of the HTTP methods described in [RFC 9110, section 9.1][rfc9110] are supported.
///
/// [rfc9110]: https://datatracker.ietf.org/doc/html/rfc9110#section-9.1
// Variant names are (de)serialized in `snake_case` (e.g. `post`), per the serde attribute below.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
pub enum HttpMethod {
    /// GET.
    Get,

    /// HEAD.
    Head,

    /// POST.
    // Marked as the `Default` variant, so this is the method used when `method` is not configured.
    #[derivative(Default)]
    Post,

    /// PUT.
    Put,

    /// DELETE.
    Delete,

    /// OPTIONS.
    Options,

    /// TRACE.
    Trace,

    /// PATCH.
    Patch,
}
147
148impl From<HttpMethod> for Method {
149    fn from(http_method: HttpMethod) -> Self {
150        match http_method {
151            HttpMethod::Head => Self::HEAD,
152            HttpMethod::Get => Self::GET,
153            HttpMethod::Post => Self::POST,
154            HttpMethod::Put => Self::PUT,
155            HttpMethod::Patch => Self::PATCH,
156            HttpMethod::Delete => Self::DELETE,
157            HttpMethod::Options => Self::OPTIONS,
158            HttpMethod::Trace => Self::TRACE,
159        }
160    }
161}
162
163impl HttpSinkConfig {
164    fn build_http_client(&self, cx: &SinkContext) -> crate::Result<HttpClient> {
165        let tls = TlsSettings::from_options(self.tls.as_ref())?;
166        Ok(HttpClient::new(tls, cx.proxy())?)
167    }
168
169    pub(super) fn build_encoder(&self) -> crate::Result<Encoder<Framer>> {
170        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
171        Ok(Encoder::<Framer>::new(framer, serializer))
172    }
173}
174
175impl GenerateConfig for HttpSinkConfig {
176    fn generate_config() -> toml::Value {
177        toml::from_str(
178            r#"uri = "https://10.22.212.22:9000/endpoint"
179            encoding.codec = "json""#,
180        )
181        .unwrap()
182    }
183}
184
185async fn healthcheck(uri: UriSerde, auth: Option<Auth>, client: HttpClient) -> crate::Result<()> {
186    let auth = auth.choose_one(&uri.auth)?;
187    let uri = uri.with_default_parts();
188    let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap();
189
190    if let Some(auth) = auth {
191        auth.apply(&mut request);
192    }
193
194    let response = client.send(request).await?;
195
196    match response.status() {
197        StatusCode::OK => Ok(()),
198        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
199    }
200}
201
202pub(super) fn validate_headers(
203    headers: &BTreeMap<String, String>,
204    configures_auth: bool,
205) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
206    let headers = crate::sinks::util::http::validate_headers(headers)?;
207
208    for name in headers.keys() {
209        if configures_auth && name.inner() == AUTHORIZATION {
210            return Err("Authorization header can not be used with defined auth options".into());
211        }
212    }
213
214    Ok(headers)
215}
216
217pub(super) fn validate_payload_wrapper(
218    payload_prefix: &str,
219    payload_suffix: &str,
220    encoder: &Encoder<Framer>,
221) -> crate::Result<(String, String)> {
222    let payload = [payload_prefix, "{}", payload_suffix].join("");
223    match (
224        encoder.serializer(),
225        encoder.framer(),
226        serde_json::from_str::<serde_json::Value>(&payload),
227    ) {
228        (
229            Serializer::Json(_),
230            Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
231            Err(_),
232        ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()),
233        _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())),
234    }
235}
236
237#[async_trait]
238#[typetag::serde(name = "http")]
239impl SinkConfig for HttpSinkConfig {
240    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
241        let batch_settings = self.batch.validate()?.into_batcher_settings()?;
242
243        let encoder = self.build_encoder()?;
244        let transformer = self.encoding.transformer();
245
246        let mut request = self.request.clone();
247        request.add_old_option(self.headers.clone());
248
249        validate_headers(&request.headers, self.auth.is_some())?;
250        let (static_headers, template_headers) = request.split_headers();
251
252        let (payload_prefix, payload_suffix) =
253            validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?;
254
255        let client = self.build_http_client(&cx)?;
256
257        let healthcheck = match cx.healthcheck.uri {
258            Some(healthcheck_uri) => {
259                healthcheck(healthcheck_uri, self.auth.clone(), client.clone()).boxed()
260            }
261            None => future::ok(()).boxed(),
262        };
263
264        let content_type = {
265            use Framer::*;
266            use Serializer::*;
267            match (encoder.serializer(), encoder.framer()) {
268                (RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()),
269                (Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()),
270                (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
271                    Some(CONTENT_TYPE_JSON.to_owned())
272                }
273                #[cfg(feature = "codecs-opentelemetry")]
274                (Otlp(_), _) => Some("application/x-protobuf".to_owned()),
275                _ => None,
276            }
277        };
278
279        let request_builder = HttpRequestBuilder {
280            encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix),
281            compression: self.compression,
282        };
283
284        let content_encoding = self.compression.is_compressed().then(|| {
285            self.compression
286                .content_encoding()
287                .expect("Encoding should be specified for compression.")
288                .to_string()
289        });
290
291        let converted_static_headers = static_headers
292            .into_iter()
293            .map(|(name, value)| -> crate::Result<_> {
294                let header_name =
295                    HeaderName::from_bytes(name.as_bytes()).map(OrderedHeaderName::from)?;
296                let header_value = HeaderValue::try_from(value)?;
297                Ok((header_name, header_value))
298            })
299            .collect::<Result<BTreeMap<_, _>, _>>()?;
300
301        let http_sink_request_builder = HttpSinkRequestBuilder::new(
302            self.method,
303            self.auth.clone(),
304            converted_static_headers,
305            content_type,
306            content_encoding,
307        );
308
309        let service = match &self.auth {
310            #[cfg(feature = "aws-core")]
311            Some(Auth::Aws { auth, service }) => {
312                let default_region = crate::aws::region_provider(&ProxyConfig::default(), None)?
313                    .region()
314                    .await;
315                let region = (match &auth {
316                    AwsAuthentication::AccessKey { region, .. } => region.clone(),
317                    AwsAuthentication::File { .. } => None,
318                    AwsAuthentication::Role { region, .. } => region.clone(),
319                    AwsAuthentication::Default { region, .. } => region.clone(),
320                })
321                .map_or(default_region, |r| Some(Region::new(r.to_string())))
322                .expect("Region must be specified");
323
324                HttpService::new_with_sig_v4(
325                    client,
326                    http_sink_request_builder,
327                    SigV4Config {
328                        shared_credentials_provider: auth
329                            .credentials_provider(region.clone(), &ProxyConfig::default(), None)
330                            .await?,
331                        region: region.clone(),
332                        service: service.clone(),
333                    },
334                )
335            }
336            _ => HttpService::new(client, http_sink_request_builder),
337        };
338
339        let request_limits = self.request.tower.into_settings();
340
341        let service = ServiceBuilder::new()
342            .settings(request_limits, http_response_retry_logic())
343            .service(service);
344
345        let sink = HttpSink::new(
346            service,
347            self.uri.clone(),
348            template_headers,
349            batch_settings,
350            request_builder,
351        );
352
353        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
354    }
355
356    fn input(&self) -> Input {
357        Input::new(self.encoding.config().1.input_type())
358    }
359
360    fn files_to_watch(&self) -> Vec<&PathBuf> {
361        let mut files = Vec::new();
362        if let Some(tls) = &self.tls {
363            if let Some(crt_file) = &tls.crt_file {
364                files.push(crt_file)
365            }
366            if let Some(key_file) = &tls.key_file {
367                files.push(key_file)
368            }
369        };
370        files
371    }
372
373    fn acknowledgements(&self) -> &AcknowledgementsConfig {
374        &self.acknowledgements
375    }
376}
377
#[cfg(test)]
mod tests {
    use vector_lib::codecs::encoding::format::JsonSerializerOptions;

    use super::*;
    use crate::components::validation::prelude::*;

    /// Describes how the component validation framework should exercise the `http` sink:
    /// a JSON-encoding sink that pushes to a local HTTP endpoint.
    impl ValidatableComponent for HttpSinkConfig {
        fn validation_configuration() -> ValidationConfiguration {
            use std::str::FromStr;

            use vector_lib::{
                codecs::{JsonSerializerConfig, MetricTagValues},
                config::LogNamespace,
            };

            // The sink and the external resource share this endpoint.
            let endpoint = "http://127.0.0.1:9000/endpoint";
            let resource_uri = UriSerde::from_str(endpoint).expect("should never fail to parse");

            let serializer =
                JsonSerializerConfig::new(MetricTagValues::Full, JsonSerializerOptions::default());

            // Fields are listed in the struct's declaration order.
            let sink_config = HttpSinkConfig {
                uri: Template::try_from(endpoint).expect("should never fail to parse"),
                method: HttpMethod::Post,
                auth: None,
                headers: None,
                compression: Compression::default(),
                encoding: EncodingConfigWithFraming::new(
                    None,
                    serializer.into(),
                    Transformer::default(),
                ),
                payload_prefix: String::new(),
                payload_suffix: String::new(),
                batch: BatchConfig::default(),
                request: RequestConfig::default(),
                tls: None,
                acknowledgements: AcknowledgementsConfig::default(),
            };

            let resource_config = HttpResourceConfig::from_parts(
                resource_uri.uri,
                Some(sink_config.method.into()),
            );
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                resource_config,
                sink_config.encoding.clone(),
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(sink_config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HttpSinkConfig);
}