vector/sinks/http/
config.rs

1//! Configuration for the `http` sink.
2
3use std::{collections::BTreeMap, path::PathBuf};
4
5#[cfg(feature = "aws-core")]
6use aws_config::meta::region::ProvideRegion;
7#[cfg(feature = "aws-core")]
8use aws_types::region::Region;
9use http::{HeaderName, HeaderValue, Method, Request, StatusCode, header::AUTHORIZATION};
10use hyper::Body;
11use vector_lib::codecs::{
12    CharacterDelimitedEncoder,
13    encoding::{Framer, Serializer},
14};
15#[cfg(feature = "aws-core")]
16use vector_lib::config::proxy::ProxyConfig;
17
18use super::{
19    encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder,
20    sink::HttpSink,
21};
22#[cfg(feature = "aws-core")]
23use crate::aws::AwsAuthentication;
24#[cfg(feature = "aws-core")]
25use crate::sinks::util::http::SigV4Config;
26use crate::{
27    codecs::{EncodingConfigWithFraming, SinkType},
28    http::{Auth, HttpClient, MaybeAuth},
29    sinks::{
30        prelude::*,
31        util::{
32            RealtimeSizeBasedDefaultBatchSettings, UriSerde,
33            http::{HttpService, OrderedHeaderName, RequestConfig, http_response_retry_logic},
34        },
35    },
36};
37
38const CONTENT_TYPE_TEXT: &str = "text/plain";
39const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson";
40const CONTENT_TYPE_JSON: &str = "application/json";
41
42/// Configuration for the `http` sink.
43#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))]
44#[derive(Clone, Debug)]
45#[serde(deny_unknown_fields)]
46pub struct HttpSinkConfig {
47    /// The full URI to make HTTP requests to.
48    ///
49    /// This should include the protocol and host, but can also include the port, path, and any other valid part of a URI.
50    #[configurable(metadata(docs::examples = "https://10.22.212.22:9000/endpoint"))]
51    pub uri: Template,
52
53    /// The HTTP method to use when making the request.
54    #[serde(default)]
55    pub method: HttpMethod,
56
57    #[configurable(derived)]
58    pub auth: Option<Auth>,
59
60    #[configurable(derived)]
61    #[serde(default)]
62    pub compression: Compression,
63
64    #[serde(flatten)]
65    pub encoding: EncodingConfigWithFraming,
66
67    /// A string to prefix the payload with.
68    ///
69    /// This option is ignored if the encoding is not character delimited JSON.
70    ///
71    /// If specified, the `payload_suffix` must also be specified and together they must produce a valid JSON object.
72    #[configurable(metadata(docs::examples = "{\"data\":"))]
73    #[serde(default)]
74    pub payload_prefix: String,
75
76    /// A string to suffix the payload with.
77    ///
78    /// This option is ignored if the encoding is not character delimited JSON.
79    ///
80    /// If specified, the `payload_prefix` must also be specified and together they must produce a valid JSON object.
81    #[configurable(metadata(docs::examples = "}"))]
82    #[serde(default)]
83    pub payload_suffix: String,
84
85    #[configurable(derived)]
86    #[serde(default)]
87    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
88
89    #[configurable(derived)]
90    #[serde(default)]
91    pub request: RequestConfig,
92
93    #[configurable(derived)]
94    pub tls: Option<TlsConfig>,
95
96    #[configurable(derived)]
97    #[serde(
98        default,
99        deserialize_with = "crate::serde::bool_or_struct",
100        skip_serializing_if = "crate::serde::is_default"
101    )]
102    pub acknowledgements: AcknowledgementsConfig,
103}
104
105/// HTTP method.
106///
107/// A subset of the HTTP methods described in [RFC 9110, section 9.1][rfc9110] are supported.
108///
109/// [rfc9110]: https://datatracker.ietf.org/doc/html/rfc9110#section-9.1
110#[configurable_component]
111#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
112#[serde(rename_all = "snake_case")]
113pub enum HttpMethod {
114    /// GET.
115    Get,
116
117    /// HEAD.
118    Head,
119
120    /// POST.
121    #[default]
122    Post,
123
124    /// PUT.
125    Put,
126
127    /// DELETE.
128    Delete,
129
130    /// OPTIONS.
131    Options,
132
133    /// TRACE.
134    Trace,
135
136    /// PATCH.
137    Patch,
138}
139
140impl From<HttpMethod> for Method {
141    fn from(http_method: HttpMethod) -> Self {
142        match http_method {
143            HttpMethod::Head => Self::HEAD,
144            HttpMethod::Get => Self::GET,
145            HttpMethod::Post => Self::POST,
146            HttpMethod::Put => Self::PUT,
147            HttpMethod::Patch => Self::PATCH,
148            HttpMethod::Delete => Self::DELETE,
149            HttpMethod::Options => Self::OPTIONS,
150            HttpMethod::Trace => Self::TRACE,
151        }
152    }
153}
154
155impl HttpSinkConfig {
156    fn build_http_client(&self, cx: &SinkContext) -> crate::Result<HttpClient> {
157        let tls = TlsSettings::from_options(self.tls.as_ref())?;
158        Ok(HttpClient::new(tls, cx.proxy())?)
159    }
160
161    pub(super) fn build_encoder(&self) -> crate::Result<Encoder<Framer>> {
162        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
163        Ok(Encoder::<Framer>::new(framer, serializer))
164    }
165}
166
167impl GenerateConfig for HttpSinkConfig {
168    fn generate_config() -> toml::Value {
169        toml::from_str(
170            r#"uri = "https://10.22.212.22:9000/endpoint"
171            encoding.codec = "json""#,
172        )
173        .unwrap()
174    }
175}
176
177async fn healthcheck(uri: UriSerde, auth: Option<Auth>, client: HttpClient) -> crate::Result<()> {
178    let auth = auth.choose_one(&uri.auth)?;
179    let uri = uri.with_default_parts();
180    let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap();
181
182    if let Some(auth) = auth {
183        auth.apply(&mut request);
184    }
185
186    let response = client.send(request).await?;
187
188    match response.status() {
189        StatusCode::OK => Ok(()),
190        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
191    }
192}
193
194pub(super) fn validate_headers(
195    headers: &BTreeMap<String, String>,
196    configures_auth: bool,
197) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
198    let headers = crate::sinks::util::http::validate_headers(headers)?;
199
200    for name in headers.keys() {
201        if configures_auth && name.inner() == AUTHORIZATION {
202            return Err("Authorization header can not be used with defined auth options".into());
203        }
204    }
205
206    Ok(headers)
207}
208
209pub(super) fn validate_payload_wrapper(
210    payload_prefix: &str,
211    payload_suffix: &str,
212    encoder: &Encoder<Framer>,
213) -> crate::Result<(String, String)> {
214    let payload = [payload_prefix, "{}", payload_suffix].join("");
215    match (
216        encoder.serializer(),
217        encoder.framer(),
218        serde_json::from_str::<serde_json::Value>(&payload),
219    ) {
220        (
221            Serializer::Json(_),
222            Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
223            Err(_),
224        ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()),
225        _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())),
226    }
227}
228
229#[async_trait]
230#[typetag::serde(name = "http")]
231impl SinkConfig for HttpSinkConfig {
232    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
233        let batch_settings = self.batch.validate()?.into_batcher_settings()?;
234
235        let encoder = self.build_encoder()?;
236        let transformer = self.encoding.transformer();
237
238        let request = self.request.clone();
239
240        validate_headers(&request.headers, self.auth.is_some())?;
241        let (static_headers, template_headers) = request.split_headers();
242
243        let (payload_prefix, payload_suffix) =
244            validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?;
245
246        let client = self.build_http_client(&cx)?;
247
248        let healthcheck = match cx.healthcheck.uri {
249            Some(healthcheck_uri) => {
250                healthcheck(healthcheck_uri, self.auth.clone(), client.clone()).boxed()
251            }
252            None => future::ok(()).boxed(),
253        };
254
255        let content_type = {
256            use Framer::*;
257            use Serializer::*;
258            match (encoder.serializer(), encoder.framer()) {
259                (RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()),
260                (Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()),
261                (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
262                    Some(CONTENT_TYPE_JSON.to_owned())
263                }
264                #[cfg(feature = "codecs-opentelemetry")]
265                (Otlp(_), _) => Some("application/x-protobuf".to_owned()),
266                _ => None,
267            }
268        };
269
270        let request_builder = HttpRequestBuilder {
271            encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix),
272            compression: self.compression,
273        };
274
275        let content_encoding = self.compression.is_compressed().then(|| {
276            self.compression
277                .content_encoding()
278                .expect("Encoding should be specified for compression.")
279                .to_string()
280        });
281
282        let converted_static_headers = static_headers
283            .into_iter()
284            .map(|(name, value)| -> crate::Result<_> {
285                let header_name =
286                    HeaderName::from_bytes(name.as_bytes()).map(OrderedHeaderName::from)?;
287                let header_value = HeaderValue::try_from(value)?;
288                Ok((header_name, header_value))
289            })
290            .collect::<Result<BTreeMap<_, _>, _>>()?;
291
292        let http_sink_request_builder = HttpSinkRequestBuilder::new(
293            self.method,
294            self.auth.clone(),
295            converted_static_headers,
296            content_type,
297            content_encoding,
298        );
299
300        let service = match &self.auth {
301            #[cfg(feature = "aws-core")]
302            Some(Auth::Aws { auth, service }) => {
303                let default_region = crate::aws::region_provider(&ProxyConfig::default(), None)?
304                    .region()
305                    .await;
306                let region = (match &auth {
307                    AwsAuthentication::AccessKey { region, .. } => region.clone(),
308                    AwsAuthentication::File { .. } => None,
309                    AwsAuthentication::Role { region, .. } => region.clone(),
310                    AwsAuthentication::Default { region, .. } => region.clone(),
311                })
312                .map_or(default_region, |r| Some(Region::new(r.to_string())))
313                .expect("Region must be specified");
314
315                HttpService::new_with_sig_v4(
316                    client,
317                    http_sink_request_builder,
318                    SigV4Config {
319                        shared_credentials_provider: auth
320                            .credentials_provider(region.clone(), &ProxyConfig::default(), None)
321                            .await?,
322                        region: region.clone(),
323                        service: service.clone(),
324                    },
325                )
326            }
327            _ => HttpService::new(client, http_sink_request_builder),
328        };
329
330        let request_limits = self.request.tower.into_settings();
331
332        let service = ServiceBuilder::new()
333            .settings(request_limits, http_response_retry_logic())
334            .service(service);
335
336        let sink = HttpSink::new(
337            service,
338            self.uri.clone(),
339            template_headers,
340            batch_settings,
341            request_builder,
342        );
343
344        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
345    }
346
347    fn input(&self) -> Input {
348        Input::new(self.encoding.config().1.input_type())
349    }
350
351    fn files_to_watch(&self) -> Vec<&PathBuf> {
352        let mut files = Vec::new();
353        if let Some(tls) = &self.tls {
354            if let Some(crt_file) = &tls.crt_file {
355                files.push(crt_file)
356            }
357            if let Some(key_file) = &tls.key_file {
358                files.push(key_file)
359            }
360        };
361        files
362    }
363
364    fn acknowledgements(&self) -> &AcknowledgementsConfig {
365        &self.acknowledgements
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use vector_lib::codecs::encoding::format::JsonSerializerOptions;
372
373    use super::*;
374    use crate::components::validation::prelude::*;
375
376    impl ValidatableComponent for HttpSinkConfig {
377        fn validation_configuration() -> ValidationConfiguration {
378            use std::str::FromStr;
379
380            use vector_lib::{
381                codecs::{JsonSerializerConfig, MetricTagValues},
382                config::LogNamespace,
383            };
384
385            let endpoint = "http://127.0.0.1:9000/endpoint";
386            let uri = UriSerde::from_str(endpoint).expect("should never fail to parse");
387
388            let config = HttpSinkConfig {
389                uri: Template::try_from(endpoint).expect("should never fail to parse"),
390                method: HttpMethod::Post,
391                encoding: EncodingConfigWithFraming::new(
392                    None,
393                    JsonSerializerConfig::new(
394                        MetricTagValues::Full,
395                        JsonSerializerOptions::default(),
396                    )
397                    .into(),
398                    Transformer::default(),
399                ),
400                auth: None,
401                compression: Compression::default(),
402                batch: BatchConfig::default(),
403                request: RequestConfig::default(),
404                tls: None,
405                acknowledgements: AcknowledgementsConfig::default(),
406                payload_prefix: String::new(),
407                payload_suffix: String::new(),
408            };
409
410            let external_resource = ExternalResource::new(
411                ResourceDirection::Push,
412                HttpResourceConfig::from_parts(uri.uri, Some(config.method.into())),
413                config.encoding.clone(),
414            );
415
416            ValidationConfiguration::from_sink(
417                Self::NAME,
418                LogNamespace::Legacy,
419                vec![ComponentTestCaseConfig::from_sink(
420                    config,
421                    None,
422                    Some(external_resource),
423                )],
424            )
425        }
426    }
427
428    register_validatable_component!(HttpSinkConfig);
429}