1use std::{collections::BTreeMap, path::PathBuf};
4
5#[cfg(feature = "aws-core")]
6use aws_config::meta::region::ProvideRegion;
7#[cfg(feature = "aws-core")]
8use aws_types::region::Region;
9use http::{HeaderName, HeaderValue, Method, Request, StatusCode, header::AUTHORIZATION};
10use hyper::Body;
11use vector_lib::codecs::{
12 CharacterDelimitedEncoder,
13 encoding::{Framer, Serializer},
14};
15#[cfg(feature = "aws-core")]
16use vector_lib::config::proxy::ProxyConfig;
17
18use super::{
19 encoder::HttpEncoder, request_builder::HttpRequestBuilder, service::HttpSinkRequestBuilder,
20 sink::HttpSink,
21};
22#[cfg(feature = "aws-core")]
23use crate::aws::AwsAuthentication;
24#[cfg(feature = "aws-core")]
25use crate::sinks::util::http::SigV4Config;
26use crate::{
27 codecs::{EncodingConfigWithFraming, SinkType},
28 http::{Auth, HttpClient, MaybeAuth},
29 sinks::{
30 prelude::*,
31 util::{
32 RealtimeSizeBasedDefaultBatchSettings, UriSerde,
33 http::{HttpService, OrderedHeaderName, RequestConfig, http_response_retry_logic},
34 },
35 },
36};
37
/// `Content-Type` value chosen in `build` for the raw-message and text serializers.
const CONTENT_TYPE_TEXT: &str = "text/plain";
/// `Content-Type` value chosen in `build` for JSON with newline-delimited framing.
const CONTENT_TYPE_NDJSON: &str = "application/x-ndjson";
/// `Content-Type` value chosen in `build` for JSON with comma-delimited framing.
const CONTENT_TYPE_JSON: &str = "application/json";
41
#[configurable_component(sink("http", "Deliver observability event data to an HTTP server."))]
/// Configuration for the `http` sink.
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct HttpSinkConfig {
    #[configurable(metadata(docs::examples = "https://10.22.212.22:9000/endpoint"))]
    /// The URI to make HTTP requests to.
    ///
    /// The value is held as a `Template` and handed to the sink as-is when it is built.
    pub uri: Template,

    #[serde(default)]
    /// The HTTP method to use when making the request.
    pub method: HttpMethod,

    // Optional authentication. When set, an `Authorization` request header is rejected at
    // build time (see `validate_headers`).
    #[configurable(derived)]
    pub auth: Option<Auth>,

    // Compression applied to the request body; also determines the content encoding passed to
    // the request builder in `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub compression: Compression,

    // Encoding and framing options, flattened into this struct's own keys.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    #[configurable(metadata(docs::examples = "{\"data\":"))]
    /// A string to prefix the payload with.
    ///
    /// When the encoding is JSON with comma-delimited framing, this prefix and
    /// `payload_suffix` wrapped around `{}` must form valid JSON, otherwise the sink
    /// fails to build.
    #[serde(default)]
    pub payload_prefix: String,

    #[configurable(metadata(docs::examples = "}"))]
    /// A string to suffix the payload with.
    ///
    /// When the encoding is JSON with comma-delimited framing, `payload_prefix` and this
    /// suffix wrapped around `{}` must form valid JSON, otherwise the sink
    /// fails to build.
    #[serde(default)]
    pub payload_suffix: String,

    // Batching options; validated and converted into batcher settings in `build`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Request options: supplies the configured headers and the `tower` request limits.
    #[configurable(derived)]
    #[serde(default)]
    pub request: RequestConfig,

    // Optional TLS options; the certificate and key files are reported by `files_to_watch`.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Acknowledgement options; deserializes from either a bare bool or a struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
104
#[configurable_component]
/// HTTP method.
///
/// The set of request methods this sink can be configured with. Variants are
/// (de)serialized in `snake_case`, and `Post` is the default.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum HttpMethod {
    /// GET.
    Get,

    /// HEAD.
    Head,

    /// POST.
    #[default]
    Post,

    /// PUT.
    Put,

    /// DELETE.
    Delete,

    /// OPTIONS.
    Options,

    /// TRACE.
    Trace,

    /// PATCH.
    Patch,
}
139
140impl From<HttpMethod> for Method {
141 fn from(http_method: HttpMethod) -> Self {
142 match http_method {
143 HttpMethod::Head => Self::HEAD,
144 HttpMethod::Get => Self::GET,
145 HttpMethod::Post => Self::POST,
146 HttpMethod::Put => Self::PUT,
147 HttpMethod::Patch => Self::PATCH,
148 HttpMethod::Delete => Self::DELETE,
149 HttpMethod::Options => Self::OPTIONS,
150 HttpMethod::Trace => Self::TRACE,
151 }
152 }
153}
154
155impl HttpSinkConfig {
156 fn build_http_client(&self, cx: &SinkContext) -> crate::Result<HttpClient> {
157 let tls = TlsSettings::from_options(self.tls.as_ref())?;
158 Ok(HttpClient::new(tls, cx.proxy())?)
159 }
160
161 pub(super) fn build_encoder(&self) -> crate::Result<Encoder<Framer>> {
162 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
163 Ok(Encoder::<Framer>::new(framer, serializer))
164 }
165}
166
167impl GenerateConfig for HttpSinkConfig {
168 fn generate_config() -> toml::Value {
169 toml::from_str(
170 r#"uri = "https://10.22.212.22:9000/endpoint"
171 encoding.codec = "json""#,
172 )
173 .unwrap()
174 }
175}
176
177async fn healthcheck(uri: UriSerde, auth: Option<Auth>, client: HttpClient) -> crate::Result<()> {
178 let auth = auth.choose_one(&uri.auth)?;
179 let uri = uri.with_default_parts();
180 let mut request = Request::head(&uri.uri).body(Body::empty()).unwrap();
181
182 if let Some(auth) = auth {
183 auth.apply(&mut request);
184 }
185
186 let response = client.send(request).await?;
187
188 match response.status() {
189 StatusCode::OK => Ok(()),
190 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
191 }
192}
193
194pub(super) fn validate_headers(
195 headers: &BTreeMap<String, String>,
196 configures_auth: bool,
197) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
198 let headers = crate::sinks::util::http::validate_headers(headers)?;
199
200 for name in headers.keys() {
201 if configures_auth && name.inner() == AUTHORIZATION {
202 return Err("Authorization header can not be used with defined auth options".into());
203 }
204 }
205
206 Ok(headers)
207}
208
209pub(super) fn validate_payload_wrapper(
210 payload_prefix: &str,
211 payload_suffix: &str,
212 encoder: &Encoder<Framer>,
213) -> crate::Result<(String, String)> {
214 let payload = [payload_prefix, "{}", payload_suffix].join("");
215 match (
216 encoder.serializer(),
217 encoder.framer(),
218 serde_json::from_str::<serde_json::Value>(&payload),
219 ) {
220 (
221 Serializer::Json(_),
222 Framer::CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' }),
223 Err(_),
224 ) => Err("Payload prefix and suffix wrapper must produce a valid JSON object.".into()),
225 _ => Ok((payload_prefix.to_owned(), payload_suffix.to_owned())),
226 }
227}
228
229#[async_trait]
230#[typetag::serde(name = "http")]
231impl SinkConfig for HttpSinkConfig {
232 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
233 let batch_settings = self.batch.validate()?.into_batcher_settings()?;
234
235 let encoder = self.build_encoder()?;
236 let transformer = self.encoding.transformer();
237
238 let request = self.request.clone();
239
240 validate_headers(&request.headers, self.auth.is_some())?;
241 let (static_headers, template_headers) = request.split_headers();
242
243 let (payload_prefix, payload_suffix) =
244 validate_payload_wrapper(&self.payload_prefix, &self.payload_suffix, &encoder)?;
245
246 let client = self.build_http_client(&cx)?;
247
248 let healthcheck = match cx.healthcheck.uri {
249 Some(healthcheck_uri) => {
250 healthcheck(healthcheck_uri, self.auth.clone(), client.clone()).boxed()
251 }
252 None => future::ok(()).boxed(),
253 };
254
255 let content_type = {
256 use Framer::*;
257 use Serializer::*;
258 match (encoder.serializer(), encoder.framer()) {
259 (RawMessage(_) | Text(_), _) => Some(CONTENT_TYPE_TEXT.to_owned()),
260 (Json(_), NewlineDelimited(_)) => Some(CONTENT_TYPE_NDJSON.to_owned()),
261 (Json(_), CharacterDelimited(CharacterDelimitedEncoder { delimiter: b',' })) => {
262 Some(CONTENT_TYPE_JSON.to_owned())
263 }
264 #[cfg(feature = "codecs-opentelemetry")]
265 (Otlp(_), _) => Some("application/x-protobuf".to_owned()),
266 _ => None,
267 }
268 };
269
270 let request_builder = HttpRequestBuilder {
271 encoder: HttpEncoder::new(encoder, transformer, payload_prefix, payload_suffix),
272 compression: self.compression,
273 };
274
275 let content_encoding = self.compression.is_compressed().then(|| {
276 self.compression
277 .content_encoding()
278 .expect("Encoding should be specified for compression.")
279 .to_string()
280 });
281
282 let converted_static_headers = static_headers
283 .into_iter()
284 .map(|(name, value)| -> crate::Result<_> {
285 let header_name =
286 HeaderName::from_bytes(name.as_bytes()).map(OrderedHeaderName::from)?;
287 let header_value = HeaderValue::try_from(value)?;
288 Ok((header_name, header_value))
289 })
290 .collect::<Result<BTreeMap<_, _>, _>>()?;
291
292 let http_sink_request_builder = HttpSinkRequestBuilder::new(
293 self.method,
294 self.auth.clone(),
295 converted_static_headers,
296 content_type,
297 content_encoding,
298 );
299
300 let service = match &self.auth {
301 #[cfg(feature = "aws-core")]
302 Some(Auth::Aws { auth, service }) => {
303 let default_region = crate::aws::region_provider(&ProxyConfig::default(), None)?
304 .region()
305 .await;
306 let region = (match &auth {
307 AwsAuthentication::AccessKey { region, .. } => region.clone(),
308 AwsAuthentication::File { .. } => None,
309 AwsAuthentication::Role { region, .. } => region.clone(),
310 AwsAuthentication::Default { region, .. } => region.clone(),
311 })
312 .map_or(default_region, |r| Some(Region::new(r.to_string())))
313 .expect("Region must be specified");
314
315 HttpService::new_with_sig_v4(
316 client,
317 http_sink_request_builder,
318 SigV4Config {
319 shared_credentials_provider: auth
320 .credentials_provider(region.clone(), &ProxyConfig::default(), None)
321 .await?,
322 region: region.clone(),
323 service: service.clone(),
324 },
325 )
326 }
327 _ => HttpService::new(client, http_sink_request_builder),
328 };
329
330 let request_limits = self.request.tower.into_settings();
331
332 let service = ServiceBuilder::new()
333 .settings(request_limits, http_response_retry_logic())
334 .service(service);
335
336 let sink = HttpSink::new(
337 service,
338 self.uri.clone(),
339 template_headers,
340 batch_settings,
341 request_builder,
342 );
343
344 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
345 }
346
347 fn input(&self) -> Input {
348 Input::new(self.encoding.config().1.input_type())
349 }
350
351 fn files_to_watch(&self) -> Vec<&PathBuf> {
352 let mut files = Vec::new();
353 if let Some(tls) = &self.tls {
354 if let Some(crt_file) = &tls.crt_file {
355 files.push(crt_file)
356 }
357 if let Some(key_file) = &tls.key_file {
358 files.push(key_file)
359 }
360 };
361 files
362 }
363
364 fn acknowledgements(&self) -> &AcknowledgementsConfig {
365 &self.acknowledgements
366 }
367}
368
#[cfg(test)]
mod tests {
    use vector_lib::codecs::encoding::format::JsonSerializerOptions;

    use super::*;
    use crate::components::validation::prelude::*;

    impl ValidatableComponent for HttpSinkConfig {
        /// Describes a JSON-encoding `POST` sink pushing to a local HTTP resource, for the
        /// component validation runner.
        fn validation_configuration() -> ValidationConfiguration {
            use std::str::FromStr;

            use vector_lib::{
                codecs::{JsonSerializerConfig, MetricTagValues},
                config::LogNamespace,
            };

            const ENDPOINT: &str = "http://127.0.0.1:9000/endpoint";

            let uri = UriSerde::from_str(ENDPOINT).expect("should never fail to parse");

            // JSON serializer with full metric tag values and no explicit framing.
            let json_serializer = JsonSerializerConfig::new(
                MetricTagValues::Full,
                JsonSerializerOptions::default(),
            );
            let encoding = EncodingConfigWithFraming::new(
                None,
                json_serializer.into(),
                Transformer::default(),
            );

            let config = HttpSinkConfig {
                uri: Template::try_from(ENDPOINT).expect("should never fail to parse"),
                method: HttpMethod::Post,
                auth: None,
                compression: Compression::default(),
                encoding,
                payload_prefix: String::new(),
                payload_suffix: String::new(),
                batch: BatchConfig::default(),
                request: RequestConfig::default(),
                tls: None,
                acknowledgements: AcknowledgementsConfig::default(),
            };

            // The external resource mirrors the sink's own endpoint, method and encoding.
            let resource_config =
                HttpResourceConfig::from_parts(uri.uri, Some(config.method.into()));
            let external_resource = ExternalResource::new(
                ResourceDirection::Push,
                resource_config,
                config.encoding.clone(),
            );

            let test_case =
                ComponentTestCaseConfig::from_sink(config, None, Some(external_resource));

            ValidationConfiguration::from_sink(Self::NAME, LogNamespace::Legacy, vec![test_case])
        }
    }

    register_validatable_component!(HttpSinkConfig);
}