vector/sinks/util/
http.rs

1#[cfg(feature = "aws-core")]
2use aws_credential_types::provider::SharedCredentialsProvider;
3#[cfg(feature = "aws-core")]
4use aws_types::region::Region;
5use bytes::{Buf, Bytes};
6use futures::{Sink, future::BoxFuture};
7use headers::HeaderName;
8use http::{HeaderValue, Request, Response, StatusCode, header};
9use http_body::Body as _;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
12pub struct OrderedHeaderName(HeaderName);
13
14impl OrderedHeaderName {
15    pub const fn new(header_name: HeaderName) -> Self {
16        Self(header_name)
17    }
18
19    pub const fn inner(&self) -> &HeaderName {
20        &self.0
21    }
22}
23
24impl From<HeaderName> for OrderedHeaderName {
25    fn from(header_name: HeaderName) -> Self {
26        Self(header_name)
27    }
28}
29
30impl Ord for OrderedHeaderName {
31    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
32        self.0.as_str().cmp(other.0.as_str())
33    }
34}
35
36impl PartialOrd for OrderedHeaderName {
37    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
38        Some(self.cmp(other))
39    }
40}
41use std::{
42    collections::BTreeMap,
43    fmt,
44    future::Future,
45    hash::Hash,
46    marker::PhantomData,
47    pin::Pin,
48    sync::Arc,
49    task::{Context, Poll, ready},
50    time::Duration,
51};
52
53use hyper::Body;
54use pin_project::pin_project;
55use snafu::{ResultExt, Snafu};
56use tower::{Service, ServiceBuilder};
57use tower_http::decompression::DecompressionLayer;
58use vector_lib::{
59    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
60    stream::batcher::limiter::ItemBatchSize,
61};
62
63use super::{
64    Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
65    TowerRequestSettings,
66    retries::{RetryAction, RetryLogic},
67    sink::{self, Response as _},
68    uri,
69};
70#[cfg(feature = "aws-core")]
71use crate::aws::sign_request;
72use crate::{
73    event::Event,
74    http::{HttpClient, HttpError},
75    internal_events::{EndpointBytesSent, SinkRequestBuildError},
76    sinks::prelude::*,
77    template::Template,
78};
79
/// Encodes an [`Event`] into the `Output` value that the HTTP sinks in this module buffer
/// and batch.
pub trait HttpEventEncoder<Output> {
    /// Encodes one event; a `None` result means nothing is forwarded for this event.
    // The encoder handles internal event emission for Error and EventsDropped.
    fn encode_event(&mut self, event: Event) -> Option<Output>;
}
84
/// Describes how a sink encodes events and turns batched output into an HTTP request.
///
/// Used by `BatchedHttpSink` and `PartitionHttpSink`, which require `Input` and `Output`
/// to match the `Batch` type they are configured with.
pub trait HttpSink: Send + Sync + 'static {
    /// Per-event encoded item produced by `Encoder`.
    type Input;
    /// Value handed to `build_request`.
    type Output;
    /// Encoder type returned by `build_encoder`.
    type Encoder: HttpEventEncoder<Self::Input>;

    /// Creates the encoder used to convert events into `Input` items.
    fn build_encoder(&self) -> Self::Encoder;

    /// Builds the HTTP request for `events`; the returned future must be `Send`.
    fn build_request(
        &self,
        events: Self::Output,
    ) -> impl Future<Output = crate::Result<http::Request<Bytes>>> + Send;
}
96
/// Provides a simple wrapper around internal tower and
/// batching sinks for http.
///
/// This type wraps some `HttpSink` and some `Batch` type
/// and will apply request, batch and tls settings. Internally,
/// it holds an Arc reference to the `HttpSink`. It then exposes
/// a `Sink` interface that can be returned from `SinkConfig`.
///
/// Implementation detail: we are required to buffer a single item due
/// to how `Sink` works. This is because we must "encode" the type
/// to be able to send it to the inner batch type and sink. Because of
/// this we must provide a single buffer slot. To ensure the buffer is
/// fully flushed, make sure `poll_flush` returns ready.
///
/// Note: This has been deprecated, please do not use when creating new Sinks.
#[pin_project]
pub struct BatchedHttpSink<T, B, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
        + Send
        + 'static,
{
    // Shared handle to the user's `HttpSink`; a clone is also captured by the request builder.
    sink: Arc<T>,
    // Inner batching sink that receives encoded events from `slot`.
    #[pin]
    inner: TowerBatchedSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
    >,
    // Encoder obtained from `sink.build_encoder()`.
    encoder: T::Encoder,
    // An empty slot is needed to buffer an item where we encoded it but
    // the inner sink is applying back pressure. This trick is used in the `WithFlatMap`
    // sink combinator. https://docs.rs/futures/0.1.29/src/futures/sink/with_flat_map.rs.html#20
    slot: Option<EncodedEvent<B::Input>>,
}
135
136impl<T, B> BatchedHttpSink<T, B>
137where
138    B: Batch,
139    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
140    T: HttpSink<Input = B::Input, Output = B::Output>,
141{
142    pub fn new(
143        sink: T,
144        batch: B,
145        request_settings: TowerRequestSettings,
146        batch_timeout: Duration,
147        client: HttpClient,
148    ) -> Self {
149        Self::with_logic(
150            sink,
151            batch,
152            HttpRetryLogic::default(),
153            request_settings,
154            batch_timeout,
155            client,
156        )
157    }
158}
159
160impl<T, B, RL> BatchedHttpSink<T, B, RL>
161where
162    B: Batch,
163    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
164    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
165        + Send
166        + 'static,
167    T: HttpSink<Input = B::Input, Output = B::Output>,
168{
169    pub fn with_logic(
170        sink: T,
171        batch: B,
172        retry_logic: RL,
173        request_settings: TowerRequestSettings,
174        batch_timeout: Duration,
175        client: HttpClient,
176    ) -> Self {
177        let sink = Arc::new(sink);
178
179        let sink1 = Arc::clone(&sink);
180        let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
181            let sink = Arc::clone(&sink1);
182            Box::pin(async move { sink.build_request(b).await })
183        };
184
185        let svc = HttpBatchService::new(client, request_builder);
186        let inner = request_settings.batch_sink(retry_logic, svc, batch, batch_timeout);
187        let encoder = sink.build_encoder();
188
189        Self {
190            sink,
191            inner,
192            encoder,
193            slot: None,
194        }
195    }
196}
197
198impl<T, B, RL> Sink<Event> for BatchedHttpSink<T, B, RL>
199where
200    B: Batch,
201    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
202    T: HttpSink<Input = B::Input, Output = B::Output>,
203    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
204        + Send
205        + 'static,
206{
207    type Error = crate::Error;
208
209    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210        if self.slot.is_some() {
211            match self.as_mut().poll_flush(cx) {
212                Poll::Ready(Ok(())) => {}
213                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
214                Poll::Pending => {
215                    if self.slot.is_some() {
216                        return Poll::Pending;
217                    }
218                }
219            }
220        }
221
222        Poll::Ready(Ok(()))
223    }
224
225    fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
226        let byte_size = event.size_of();
227        let json_byte_size = event.estimated_json_encoded_size_of();
228        let finalizers = event.metadata_mut().take_finalizers();
229        if let Some(item) = self.encoder.encode_event(event) {
230            *self.project().slot = Some(EncodedEvent {
231                item,
232                finalizers,
233                byte_size,
234                json_byte_size,
235            });
236        }
237
238        Ok(())
239    }
240
241    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
242        let mut this = self.project();
243        if this.slot.is_some() {
244            ready!(this.inner.as_mut().poll_ready(cx))?;
245            this.inner.as_mut().start_send(this.slot.take().unwrap())?;
246        }
247
248        this.inner.poll_flush(cx)
249    }
250
251    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
252        ready!(self.as_mut().poll_flush(cx))?;
253        self.project().inner.poll_close(cx)
254    }
255}
256
/// HTTP sink wrapper backed by a `TowerPartitionSink`, with items partitioned by key `K`
/// (`B::Input: Partition<K>`).
///
/// Like `BatchedHttpSink`, it encodes each event and buffers at most one encoded item in
/// `slot` before handing it to the inner sink.
///
/// Note: This has been deprecated, please do not use when creating new Sinks.
#[pin_project]
pub struct PartitionHttpSink<T, B, K, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    B::Input: Partition<K>,
    K: Hash + Eq + Clone + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
{
    // Shared handle to the user's `HttpSink`; a clone is also captured by the request builder.
    sink: Arc<T>,
    // Inner partitioning sink that receives encoded events from `slot`.
    #[pin]
    inner: TowerPartitionSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
        K,
    >,
    // Encoder obtained from `sink.build_encoder()`.
    encoder: T::Encoder,
    // Single-item buffer for an encoded event the inner sink has not yet accepted.
    slot: Option<EncodedEvent<B::Input>>,
}
279
280impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic<<B as Batch>::Output>>
281where
282    B: Batch,
283    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
284    B::Input: Partition<K>,
285    K: Hash + Eq + Clone + Send + 'static,
286    T: HttpSink<Input = B::Input, Output = B::Output>,
287{
288    pub fn new(
289        sink: T,
290        batch: B,
291        request_settings: TowerRequestSettings,
292        batch_timeout: Duration,
293        client: HttpClient,
294    ) -> Self {
295        Self::with_retry_logic(
296            sink,
297            batch,
298            HttpRetryLogic::default(),
299            request_settings,
300            batch_timeout,
301            client,
302        )
303    }
304}
305
306impl<T, B, K, RL> PartitionHttpSink<T, B, K, RL>
307where
308    B: Batch,
309    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
310    B::Input: Partition<K>,
311    K: Hash + Eq + Clone + Send + 'static,
312    T: HttpSink<Input = B::Input, Output = B::Output>,
313    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
314        + Send
315        + 'static,
316{
317    pub fn with_retry_logic(
318        sink: T,
319        batch: B,
320        retry_logic: RL,
321        request_settings: TowerRequestSettings,
322        batch_timeout: Duration,
323        client: HttpClient,
324    ) -> Self {
325        let sink = Arc::new(sink);
326
327        let sink1 = Arc::clone(&sink);
328        let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
329            let sink = Arc::clone(&sink1);
330            Box::pin(async move { sink.build_request(b).await })
331        };
332
333        let svc = HttpBatchService::new(client, request_builder);
334        let inner = request_settings.partition_sink(retry_logic, svc, batch, batch_timeout);
335        let encoder = sink.build_encoder();
336
337        Self {
338            sink,
339            inner,
340            encoder,
341            slot: None,
342        }
343    }
344
345    /// Enforces per partition ordering of request.
346    pub fn ordered(mut self) -> Self {
347        self.inner.ordered();
348        self
349    }
350}
351
352impl<T, B, K, RL> Sink<Event> for PartitionHttpSink<T, B, K, RL>
353where
354    B: Batch,
355    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
356    B::Input: Partition<K>,
357    K: Hash + Eq + Clone + Send + 'static,
358    T: HttpSink<Input = B::Input, Output = B::Output>,
359    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
360{
361    type Error = crate::Error;
362
363    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
364        if self.slot.is_some() {
365            match self.as_mut().poll_flush(cx) {
366                Poll::Ready(Ok(())) => {}
367                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
368                Poll::Pending => {
369                    if self.slot.is_some() {
370                        return Poll::Pending;
371                    }
372                }
373            }
374        }
375
376        Poll::Ready(Ok(()))
377    }
378
379    fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
380        let finalizers = event.metadata_mut().take_finalizers();
381        let byte_size = event.size_of();
382        let json_byte_size = event.estimated_json_encoded_size_of();
383
384        if let Some(item) = self.encoder.encode_event(event) {
385            *self.project().slot = Some(EncodedEvent {
386                item,
387                finalizers,
388                byte_size,
389                json_byte_size,
390            });
391        }
392
393        Ok(())
394    }
395
396    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
397        let mut this = self.project();
398        if this.slot.is_some() {
399            ready!(this.inner.as_mut().poll_ready(cx))?;
400            this.inner.as_mut().start_send(this.slot.take().unwrap())?;
401        }
402
403        this.inner.poll_flush(cx)
404    }
405
406    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
407        ready!(self.as_mut().poll_flush(cx))?;
408        self.project().inner.poll_close(cx)
409    }
410}
411
/// Inputs passed to `sign_request` when an `HttpBatchService` is built with
/// `new_with_sig_v4`. Only available with the `aws-core` feature.
#[cfg(feature = "aws-core")]
#[derive(Clone)]
pub struct SigV4Config {
    // Credentials provider handed to `sign_request`.
    pub(crate) shared_credentials_provider: SharedCredentialsProvider,
    // Region handed to `sign_request`.
    pub(crate) region: Region,
    // Service name handed to `sign_request`.
    pub(crate) service: String,
}
419
/// Tower `Service` that builds an HTTP request from a batch `B` using `request_builder` and
/// sends it with the wrapped [`HttpClient`].
///
/// NOTE: This has been deprecated, please do not use directly when creating new sinks.
///       The `HttpService` currently wraps this structure. Eventually all sinks currently using the
///       HttpBatchService directly should be updated to use `HttpService`. At which time we can
///       remove this struct and inline the functionality into the `HttpService` directly.
pub struct HttpBatchService<F, B = Bytes> {
    // Client used to send the built request.
    inner: HttpClient<Body>,
    // Shared closure producing the request future `F` for a batch; shared across clones.
    request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
    // When set, requests are signed before being sent.
    #[cfg(feature = "aws-core")]
    sig_v4_config: Option<SigV4Config>,
}
432
433impl<F, B> HttpBatchService<F, B> {
434    pub fn new(
435        inner: HttpClient,
436        request_builder: impl Fn(B) -> F + Send + Sync + 'static,
437    ) -> Self {
438        HttpBatchService {
439            inner,
440            request_builder: Arc::new(Box::new(request_builder)),
441            #[cfg(feature = "aws-core")]
442            sig_v4_config: None,
443        }
444    }
445
446    #[cfg(feature = "aws-core")]
447    pub fn new_with_sig_v4(
448        inner: HttpClient,
449        request_builder: impl Fn(B) -> F + Send + Sync + 'static,
450        sig_v4_config: SigV4Config,
451    ) -> Self {
452        HttpBatchService {
453            inner,
454            request_builder: Arc::new(Box::new(request_builder)),
455            sig_v4_config: Some(sig_v4_config),
456        }
457    }
458}
459
460impl<F, B> Service<B> for HttpBatchService<F, B>
461where
462    F: Future<Output = crate::Result<hyper::Request<Bytes>>> + Send + 'static,
463    B: ByteSizeOf + Send + 'static,
464{
465    type Response = http::Response<Bytes>;
466    type Error = crate::Error;
467    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
468
469    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
470        Poll::Ready(Ok(()))
471    }
472
473    fn call(&mut self, body: B) -> Self::Future {
474        let request_builder = Arc::clone(&self.request_builder);
475        #[cfg(feature = "aws-core")]
476        let sig_v4_config = self.sig_v4_config.clone();
477        let http_client = self.inner.clone();
478
479        Box::pin(async move {
480            let request = request_builder(body).await.inspect_err(|error| {
481                emit!(SinkRequestBuildError { error });
482            })?;
483
484            #[cfg(feature = "aws-core")]
485            let request = match sig_v4_config {
486                None => request,
487                Some(sig_v4_config) => {
488                    let mut signed_request = request;
489                    sign_request(
490                        sig_v4_config.service.as_str(),
491                        &mut signed_request,
492                        &sig_v4_config.shared_credentials_provider,
493                        Some(&sig_v4_config.region),
494                        false,
495                    )
496                    .await?;
497
498                    signed_request
499                }
500            };
501            let byte_size = request.body().len();
502            let request = request.map(Body::from);
503            let (protocol, endpoint) = uri::protocol_endpoint(request.uri().clone());
504
505            let mut decompression_service = ServiceBuilder::new()
506                .layer(DecompressionLayer::new())
507                .service(http_client);
508
509            // Any errors raised in `http_client.call` results in a `GotHttpWarning` event being emitted
510            // in `HttpClient::send`. This does not result in incrementing `component_errors_total` however,
511            // because that is incremented by the driver when retries have been exhausted.
512            let response = decompression_service.call(request).await?;
513
514            if response.status().is_success() {
515                emit!(EndpointBytesSent {
516                    byte_size,
517                    protocol: &protocol,
518                    endpoint: &endpoint
519                });
520            }
521
522            let (parts, body) = response.into_parts();
523            let mut body = body.collect().await?.aggregate();
524            Ok(hyper::Response::from_parts(
525                parts,
526                body.copy_to_bytes(body.remaining()),
527            ))
528        })
529    }
530}
531
532impl<F, B> Clone for HttpBatchService<F, B> {
533    fn clone(&self) -> Self {
534        Self {
535            inner: self.inner.clone(),
536            request_builder: Arc::clone(&self.request_builder),
537            #[cfg(feature = "aws-core")]
538            sig_v4_config: self.sig_v4_config.clone(),
539        }
540    }
541}
542
543impl<T: fmt::Debug> sink::Response for http::Response<T> {
544    fn is_successful(&self) -> bool {
545        self.status().is_success()
546    }
547
548    fn is_transient(&self) -> bool {
549        self.status().is_server_error()
550    }
551}
552
/// Status-code based retry logic for services that return `hyper::Response<Bytes>`.
///
/// `Req` is only a type-level marker for the request type; no request value is stored.
#[derive(Debug, Clone)]
pub struct HttpRetryLogic<Req> {
    request: PhantomData<Req>,
}

// Written by hand so that `Default` does not require `Req: Default`.
impl<Req> Default for HttpRetryLogic<Req> {
    fn default() -> Self {
        HttpRetryLogic {
            request: PhantomData,
        }
    }
}
564
565impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> {
566    type Error = HttpError;
567    type Request = Req;
568    type Response = hyper::Response<Bytes>;
569
570    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
571        true
572    }
573
574    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
575        let status = response.status();
576
577        match status {
578            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
579            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
580            StatusCode::NOT_IMPLEMENTED => {
581                RetryAction::DontRetry("endpoint not implemented".into())
582            }
583            _ if status.is_server_error() => RetryAction::Retry(
584                format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(),
585            ),
586            _ if status.is_success() => RetryAction::Successful,
587            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
588        }
589    }
590}
591
/// A more generic version of `HttpRetryLogic` that accepts anything that can be converted
/// to a status code
///
/// `func` extracts the `StatusCode` from a response of type `Res`; `Req` and `Res` are
/// type-level markers only.
#[derive(Debug)]
pub struct HttpStatusRetryLogic<F, Req, Res> {
    // Maps a response to the status code used for the retry decision.
    func: F,
    request: PhantomData<Req>,
    response: PhantomData<Res>,
}
600
601impl<F, Req, Res> HttpStatusRetryLogic<F, Req, Res>
602where
603    F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
604    Req: Send + Sync + 'static,
605    Res: Send + Sync + 'static,
606{
607    pub const fn new(func: F) -> HttpStatusRetryLogic<F, Req, Res> {
608        HttpStatusRetryLogic {
609            func,
610            request: PhantomData,
611            response: PhantomData,
612        }
613    }
614}
615
616impl<F, Req, Res> RetryLogic for HttpStatusRetryLogic<F, Req, Res>
617where
618    F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
619    Req: Send + Sync + 'static,
620    Res: Send + Sync + 'static,
621{
622    type Error = HttpError;
623    type Request = Req;
624    type Response = Res;
625
626    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
627        true
628    }
629
630    fn should_retry_response(&self, response: &Res) -> RetryAction<Req> {
631        let status = (self.func)(response);
632
633        match status {
634            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
635            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
636            StatusCode::NOT_IMPLEMENTED => {
637                RetryAction::DontRetry("endpoint not implemented".into())
638            }
639            _ if status.is_server_error() => {
640                RetryAction::Retry(format!("Http Status: {status}").into())
641            }
642            _ if status.is_success() => RetryAction::Successful,
643            _ => RetryAction::DontRetry(format!("Http status: {status}").into()),
644        }
645    }
646}
647
648impl<F, Req, Res> Clone for HttpStatusRetryLogic<F, Req, Res>
649where
650    F: Clone,
651{
652    fn clone(&self) -> Self {
653        Self {
654            func: self.func.clone(),
655            request: PhantomData,
656            response: PhantomData,
657        }
658    }
659}
660
/// Outbound HTTP request settings.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct RequestConfig {
    // Flattened by serde, so the tower request options sit at the same level as `headers`.
    #[serde(flatten)]
    pub tower: TowerRequestConfig,

    /// Additional HTTP headers to add to every HTTP request.
    // `split_headers` separates these entries into static values and dynamic templates.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An HTTP request header and its value. Both header names and values support templating with event data."
    ))]
    #[configurable(metadata(docs::examples = "headers_examples()"))]
    pub headers: BTreeMap<String, String>,
}
676
/// Example header map referenced by the `docs::examples` metadata on
/// `RequestConfig::headers`.
fn headers_examples() -> BTreeMap<String, String> {
    [
        ("Accept", "text/plain"),
        ("X-My-Custom-Header", "A-Value"),
        ("X-Event-Level", "{{level}}"),
        ("X-Event-Timestamp", "{{timestamp}}"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_owned(), value.to_owned()))
    .collect()
}
685
686impl RequestConfig {
687    pub fn add_old_option(&mut self, headers: Option<BTreeMap<String, String>>) {
688        if let Some(headers) = headers {
689            warn!("Option `headers` has been deprecated. Use `request.headers` instead.");
690            self.headers.extend(headers);
691        }
692    }
693
694    pub fn split_headers(&self) -> (BTreeMap<String, String>, BTreeMap<String, Template>) {
695        let mut static_headers = BTreeMap::new();
696        let mut template_headers = BTreeMap::new();
697
698        for (name, value) in &self.headers {
699            match Template::try_from(value.as_str()) {
700                Ok(template) if !template.is_dynamic() => {
701                    static_headers.insert(name.clone(), value.clone());
702                }
703                Ok(template) => {
704                    template_headers.insert(name.clone(), template);
705                }
706                Err(_) => {
707                    static_headers.insert(name.clone(), value.clone());
708                }
709            }
710        }
711
712        (static_headers, template_headers)
713    }
714}
715
/// Errors produced by `validate_headers` when a configured header cannot be converted
/// into its typed form.
#[derive(Debug, Snafu)]
pub enum HeaderValidationError {
    /// The header name was rejected by `HeaderName::from_bytes`.
    #[snafu(display("{}: {}", source, name))]
    InvalidHeaderName {
        // The offending header name as configured.
        name: String,
        source: header::InvalidHeaderName,
    },
    /// The header value was rejected by `HeaderValue::from_bytes`.
    #[snafu(display("{}: {}", source, value))]
    InvalidHeaderValue {
        // The offending header value as configured.
        value: String,
        source: header::InvalidHeaderValue,
    },
}
729
730pub fn validate_headers(
731    headers: &BTreeMap<String, String>,
732) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
733    let mut validated_headers = BTreeMap::new();
734    for (name, value) in headers {
735        let name = HeaderName::from_bytes(name.as_bytes())
736            .with_context(|_| InvalidHeaderNameSnafu { name })?;
737        let value = HeaderValue::from_bytes(value.as_bytes())
738            .with_context(|_| InvalidHeaderValueSnafu { value })?;
739
740        validated_headers.insert(name.into(), value);
741    }
742
743    Ok(validated_headers)
744}
745
/// Request type for use in the `Service` implementation of HTTP stream sinks.
///
/// `T` carries sink-specific data alongside the payload.
#[derive(Debug, Clone)]
pub struct HttpRequest<T: Send> {
    // Request body bytes; can be moved out with `take_payload`.
    payload: Bytes,
    // Finalizers of the events contained in this request.
    finalizers: EventFinalizers,
    // Metadata exposed through `MetaDescriptive`.
    request_metadata: RequestMetadata,
    // Sink-specific data exposed through `get_additional_metadata`.
    additional_metadata: T,
}
754
755impl<T: Send> HttpRequest<T> {
756    /// Creates a new `HttpRequest`.
757    pub const fn new(
758        payload: Bytes,
759        finalizers: EventFinalizers,
760        request_metadata: RequestMetadata,
761        additional_metadata: T,
762    ) -> Self {
763        Self {
764            payload,
765            finalizers,
766            request_metadata,
767            additional_metadata,
768        }
769    }
770
771    pub const fn get_additional_metadata(&self) -> &T {
772        &self.additional_metadata
773    }
774
775    pub fn take_payload(&mut self) -> Bytes {
776        std::mem::take(&mut self.payload)
777    }
778}
779
impl<T: Send> Finalizable for HttpRequest<T> {
    /// Hands out the request's finalizers by delegating to the `finalizers` field.
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.finalizers.take_finalizers()
    }
}
785
impl<T: Send> MetaDescriptive for HttpRequest<T> {
    /// Borrows the request metadata.
    fn get_metadata(&self) -> &RequestMetadata {
        &self.request_metadata
    }

    /// Mutably borrows the request metadata (`HttpService::call` uses this to take it).
    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.request_metadata
    }
}
795
impl<T: Send> ByteSizeOf for HttpRequest<T> {
    /// Sums the allocated bytes of the payload and the finalizers.
    // NOTE(review): `request_metadata` and `additional_metadata` are not counted here —
    // confirm this is intentional.
    fn allocated_bytes(&self) -> usize {
        self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
    }
}
801
/// Response type for use in the `Service` implementation of HTTP stream sinks.
pub struct HttpResponse {
    /// The HTTP response with its body collected into `Bytes`.
    pub http_response: Response<Bytes>,
    /// Per-group event counts and sizes, reported by `events_sent`.
    pub events_byte_size: GroupedCountByteSize,
    /// Size of the encoded request, reported by `bytes_sent`.
    pub raw_byte_size: usize,
}
808
809impl DriverResponse for HttpResponse {
810    fn event_status(&self) -> EventStatus {
811        if self.http_response.is_successful() {
812            EventStatus::Delivered
813        } else if self.http_response.is_transient() {
814            EventStatus::Errored
815        } else {
816            EventStatus::Rejected
817        }
818    }
819
820    fn events_sent(&self) -> &GroupedCountByteSize {
821        &self.events_byte_size
822    }
823
824    fn bytes_sent(&self) -> Option<usize> {
825        Some(self.raw_byte_size)
826    }
827}
828
829/// Creates a `RetryLogic` for use with `HttpResponse`.
830pub fn http_response_retry_logic<Request: Clone + Send + Sync + 'static>() -> HttpStatusRetryLogic<
831    impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static,
832    Request,
833    HttpResponse,
834> {
835    HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status())
836}
837
838/// Uses the estimated json encoded size to determine batch sizing.
839#[derive(Default)]
840pub struct HttpJsonBatchSizer;
841
842impl ItemBatchSize<Event> for HttpJsonBatchSizer {
843    fn size(&self, item: &Event) -> usize {
844        item.estimated_json_encoded_size_of().get()
845    }
846}
847
/// HTTP request builder for HTTP stream sinks using the generic `HttpService`
pub trait HttpServiceRequestBuilder<T: Send> {
    /// Converts an `HttpRequest` into the `http::Request` to send, or returns an error if
    /// the request cannot be built.
    fn build(&self, request: HttpRequest<T>) -> Result<Request<Bytes>, crate::Error>;
}
852
/// Generic 'Service' implementation for HTTP stream sinks.
///
/// Wraps an `HttpBatchService` whose request builder delegates to a
/// `HttpServiceRequestBuilder` of type `B`.
#[derive(Clone)]
pub struct HttpService<B, T: Send> {
    // Underlying service that builds, optionally signs, and sends each request.
    batch_service:
        HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HttpRequest<T>>,
    // Marker tying the service to its builder type `B`; the builder itself lives inside the
    // closure owned by `batch_service`.
    _phantom: PhantomData<B>,
}
860
861impl<B, T: Send + 'static> HttpService<B, T>
862where
863    B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
864{
865    pub fn new(http_client: HttpClient<Body>, http_request_builder: B) -> Self {
866        let http_request_builder = Arc::new(http_request_builder);
867
868        let batch_service = HttpBatchService::new(http_client, move |req: HttpRequest<T>| {
869            let request_builder = Arc::clone(&http_request_builder);
870
871            let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
872                Box::pin(async move { request_builder.build(req) });
873
874            fut
875        });
876        Self {
877            batch_service,
878            _phantom: PhantomData,
879        }
880    }
881
882    #[cfg(feature = "aws-core")]
883    pub fn new_with_sig_v4(
884        http_client: HttpClient<Body>,
885        http_request_builder: B,
886        sig_v4_config: SigV4Config,
887    ) -> Self {
888        let http_request_builder = Arc::new(http_request_builder);
889
890        let batch_service = HttpBatchService::new_with_sig_v4(
891            http_client,
892            move |req: HttpRequest<T>| {
893                let request_builder = Arc::clone(&http_request_builder);
894
895                let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
896                    Box::pin(async move { request_builder.build(req) });
897
898                fut
899            },
900            sig_v4_config,
901        );
902        Self {
903            batch_service,
904            _phantom: PhantomData,
905        }
906    }
907}
908
909impl<B, T: Send + 'static> Service<HttpRequest<T>> for HttpService<B, T>
910where
911    B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
912{
913    type Response = HttpResponse;
914    type Error = crate::Error;
915    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
916
917    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
918        Poll::Ready(Ok(()))
919    }
920
921    fn call(&mut self, mut request: HttpRequest<T>) -> Self::Future {
922        let mut http_service = self.batch_service.clone();
923
924        // NOTE: By taking the metadata here, when passing the request to `call()` below,
925        //       that function does not have access to the metadata anymore.
926        let metadata = std::mem::take(request.metadata_mut());
927        let raw_byte_size = metadata.request_encoded_size();
928        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
929
930        Box::pin(async move {
931            let http_response = http_service.call(request).await?;
932
933            Ok(HttpResponse {
934                http_response,
935                events_byte_size,
936                raw_byte_size,
937            })
938        })
939    }
940}
941
#[cfg(test)]
mod test {
    #![allow(clippy::print_stderr)] //tests

    use futures::{StreamExt, future::ready};
    use hyper::{
        Response, Server, Uri,
        service::{make_service_fn, service_fn},
    };

    use super::*;
    use crate::{config::ProxyConfig, test_util::addr::next_addr};

    /// Checks the retry decision of `HttpRetryLogic` for 408, 429, 500 (retry) and
    /// 400, 501 (no retry).
    #[test]
    fn util_http_retry_logic() {
        let logic = HttpRetryLogic::<()>::default();

        let response_408 = Response::builder().status(408).body(Bytes::new()).unwrap();
        let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap();
        let response_500 = Response::builder().status(500).body(Bytes::new()).unwrap();
        let response_400 = Response::builder().status(400).body(Bytes::new()).unwrap();
        let response_501 = Response::builder().status(501).body(Bytes::new()).unwrap();
        assert!(logic.should_retry_response(&response_429).is_retryable());
        assert!(logic.should_retry_response(&response_500).is_retryable());
        assert!(logic.should_retry_response(&response_408).is_retryable());
        assert!(
            logic
                .should_retry_response(&response_400)
                .is_not_retryable()
        );
        assert!(
            logic
                .should_retry_response(&response_501)
                .is_not_retryable()
        );
    }

    /// Sends one request through `HttpBatchService` to a local hyper server and checks
    /// that the server received the expected body.
    #[tokio::test]
    async fn util_http_it_makes_http_requests() {
        let (_guard, addr) = next_addr();

        let uri = format!("http://{}:{}/", addr.ip(), addr.port())
            .parse::<Uri>()
            .unwrap();

        let request = Bytes::from("hello");
        let proxy = ProxyConfig::default();
        let client = HttpClient::new(None, &proxy).unwrap();
        // The request builder POSTs the batch bytes unchanged to the test server.
        let mut service = HttpBatchService::new(client, move |body: Bytes| {
            Box::pin(ready(
                http::Request::post(&uri).body(body).map_err(Into::into),
            ))
        });

        // Channel through which the server reports each request body it receives.
        let (tx, rx) = futures::channel::mpsc::channel(10);

        let new_service = make_service_fn(move |_| {
            let tx = tx.clone();

            let svc = service_fn(move |req: http::Request<Body>| {
                let mut tx = tx.clone();

                async move {
                    let mut body = http_body::Body::collect(req.into_body())
                        .await
                        .map_err(|error| format!("error: {error}"))?
                        .aggregate();
                    let string = String::from_utf8(body.copy_to_bytes(body.remaining()).to_vec())
                        .map_err(|_| "Wasn't UTF-8".to_string())?;
                    tx.try_send(string).map_err(|_| "Send error".to_string())?;

                    Ok::<_, crate::Error>(Response::new(Body::from("")))
                }
            });

            async move { Ok::<_, std::convert::Infallible>(svc) }
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&addr).serve(new_service).await {
                eprintln!("Server error: {error}");
            }
        });

        // Short pause before sending, presumably to let the spawned server start listening.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        service.call(request).await.unwrap();

        // The first message on the channel is the body of the request sent above.
        let (body, _rest) = StreamExt::into_future(rx).await;
        assert_eq!(body.unwrap(), "hello");
    }
}