vector/sinks/util/
http.rs

1#[cfg(feature = "aws-core")]
2use aws_credential_types::provider::SharedCredentialsProvider;
3#[cfg(feature = "aws-core")]
4use aws_types::region::Region;
5use bytes::{Buf, Bytes};
6use futures::{future::BoxFuture, Sink};
7use headers::HeaderName;
8use http::{header, HeaderValue, Request, Response, StatusCode};
9
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11pub struct OrderedHeaderName(HeaderName);
12
13impl OrderedHeaderName {
14    pub const fn new(header_name: HeaderName) -> Self {
15        Self(header_name)
16    }
17
18    pub const fn inner(&self) -> &HeaderName {
19        &self.0
20    }
21}
22
23impl From<HeaderName> for OrderedHeaderName {
24    fn from(header_name: HeaderName) -> Self {
25        Self(header_name)
26    }
27}
28
29impl Ord for OrderedHeaderName {
30    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
31        self.0.as_str().cmp(other.0.as_str())
32    }
33}
34
35impl PartialOrd for OrderedHeaderName {
36    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
37        Some(self.cmp(other))
38    }
39}
40use hyper::{body, Body};
41use pin_project::pin_project;
42use snafu::{ResultExt, Snafu};
43use std::{
44    collections::BTreeMap,
45    fmt,
46    future::Future,
47    hash::Hash,
48    marker::PhantomData,
49    pin::Pin,
50    sync::Arc,
51    task::{ready, Context, Poll},
52    time::Duration,
53};
54use tower::{Service, ServiceBuilder};
55use tower_http::decompression::DecompressionLayer;
56use vector_lib::{
57    configurable::configurable_component, stream::batcher::limiter::ItemBatchSize, ByteSizeOf,
58    EstimatedJsonEncodedSizeOf,
59};
60
61use super::{
62    retries::{RetryAction, RetryLogic},
63    sink::{self, Response as _},
64    uri, Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
65    TowerRequestSettings,
66};
67
68#[cfg(feature = "aws-core")]
69use crate::aws::sign_request;
70
71use crate::{
72    event::Event,
73    http::{HttpClient, HttpError},
74    internal_events::{EndpointBytesSent, SinkRequestBuildError},
75    sinks::prelude::*,
76    template::Template,
77};
78
79pub trait HttpEventEncoder<Output> {
80    // The encoder handles internal event emission for Error and EventsDropped.
81    fn encode_event(&mut self, event: Event) -> Option<Output>;
82}
83
84pub trait HttpSink: Send + Sync + 'static {
85    type Input;
86    type Output;
87    type Encoder: HttpEventEncoder<Self::Input>;
88
89    fn build_encoder(&self) -> Self::Encoder;
90    fn build_request(
91        &self,
92        events: Self::Output,
93    ) -> impl Future<Output = crate::Result<http::Request<Bytes>>> + Send;
94}
95
96/// Provides a simple wrapper around internal tower and
97/// batching sinks for http.
98///
99/// This type wraps some `HttpSink` and some `Batch` type
100/// and will apply request, batch and tls settings. Internally,
101/// it holds an Arc reference to the `HttpSink`. It then exposes
102/// a `Sink` interface that can be returned from `SinkConfig`.
103///
104/// Implementation details we require to buffer a single item due
105/// to how `Sink` works. This is because we must "encode" the type
106/// to be able to send it to the inner batch type and sink. Because of
107/// this we must provide a single buffer slot. To ensure the buffer is
108/// fully flushed make sure `poll_flush` returns ready.
109///
110/// Note: This has been deprecated, please do not use when creating new Sinks.
111#[pin_project]
112pub struct BatchedHttpSink<T, B, RL = HttpRetryLogic<<B as Batch>::Output>>
113where
114    B: Batch,
115    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
116    T: HttpSink<Input = B::Input, Output = B::Output>,
117    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
118        + Send
119        + 'static,
120{
121    sink: Arc<T>,
122    #[pin]
123    inner: TowerBatchedSink<
124        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
125        B,
126        RL,
127    >,
128    encoder: T::Encoder,
129    // An empty slot is needed to buffer an item where we encoded it but
130    // the inner sink is applying back pressure. This trick is used in the `WithFlatMap`
131    // sink combinator. https://docs.rs/futures/0.1.29/src/futures/sink/with_flat_map.rs.html#20
132    slot: Option<EncodedEvent<B::Input>>,
133}
134
135impl<T, B> BatchedHttpSink<T, B>
136where
137    B: Batch,
138    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
139    T: HttpSink<Input = B::Input, Output = B::Output>,
140{
141    pub fn new(
142        sink: T,
143        batch: B,
144        request_settings: TowerRequestSettings,
145        batch_timeout: Duration,
146        client: HttpClient,
147    ) -> Self {
148        Self::with_logic(
149            sink,
150            batch,
151            HttpRetryLogic::default(),
152            request_settings,
153            batch_timeout,
154            client,
155        )
156    }
157}
158
159impl<T, B, RL> BatchedHttpSink<T, B, RL>
160where
161    B: Batch,
162    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
163    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
164        + Send
165        + 'static,
166    T: HttpSink<Input = B::Input, Output = B::Output>,
167{
168    pub fn with_logic(
169        sink: T,
170        batch: B,
171        retry_logic: RL,
172        request_settings: TowerRequestSettings,
173        batch_timeout: Duration,
174        client: HttpClient,
175    ) -> Self {
176        let sink = Arc::new(sink);
177
178        let sink1 = Arc::clone(&sink);
179        let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
180            let sink = Arc::clone(&sink1);
181            Box::pin(async move { sink.build_request(b).await })
182        };
183
184        let svc = HttpBatchService::new(client, request_builder);
185        let inner = request_settings.batch_sink(retry_logic, svc, batch, batch_timeout);
186        let encoder = sink.build_encoder();
187
188        Self {
189            sink,
190            inner,
191            encoder,
192            slot: None,
193        }
194    }
195}
196
197impl<T, B, RL> Sink<Event> for BatchedHttpSink<T, B, RL>
198where
199    B: Batch,
200    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
201    T: HttpSink<Input = B::Input, Output = B::Output>,
202    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
203        + Send
204        + 'static,
205{
206    type Error = crate::Error;
207
208    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
209        if self.slot.is_some() {
210            match self.as_mut().poll_flush(cx) {
211                Poll::Ready(Ok(())) => {}
212                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
213                Poll::Pending => {
214                    if self.slot.is_some() {
215                        return Poll::Pending;
216                    }
217                }
218            }
219        }
220
221        Poll::Ready(Ok(()))
222    }
223
224    fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
225        let byte_size = event.size_of();
226        let json_byte_size = event.estimated_json_encoded_size_of();
227        let finalizers = event.metadata_mut().take_finalizers();
228        if let Some(item) = self.encoder.encode_event(event) {
229            *self.project().slot = Some(EncodedEvent {
230                item,
231                finalizers,
232                byte_size,
233                json_byte_size,
234            });
235        }
236
237        Ok(())
238    }
239
240    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
241        let mut this = self.project();
242        if this.slot.is_some() {
243            ready!(this.inner.as_mut().poll_ready(cx))?;
244            this.inner.as_mut().start_send(this.slot.take().unwrap())?;
245        }
246
247        this.inner.poll_flush(cx)
248    }
249
250    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
251        ready!(self.as_mut().poll_flush(cx))?;
252        self.project().inner.poll_close(cx)
253    }
254}
255
256/// Note: This has been deprecated, please do not use when creating new Sinks.
257#[pin_project]
258pub struct PartitionHttpSink<T, B, K, RL = HttpRetryLogic<<B as Batch>::Output>>
259where
260    B: Batch,
261    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
262    B::Input: Partition<K>,
263    K: Hash + Eq + Clone + Send + 'static,
264    T: HttpSink<Input = B::Input, Output = B::Output>,
265    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
266{
267    sink: Arc<T>,
268    #[pin]
269    inner: TowerPartitionSink<
270        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
271        B,
272        RL,
273        K,
274    >,
275    encoder: T::Encoder,
276    slot: Option<EncodedEvent<B::Input>>,
277}
278
279impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic<<B as Batch>::Output>>
280where
281    B: Batch,
282    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
283    B::Input: Partition<K>,
284    K: Hash + Eq + Clone + Send + 'static,
285    T: HttpSink<Input = B::Input, Output = B::Output>,
286{
287    pub fn new(
288        sink: T,
289        batch: B,
290        request_settings: TowerRequestSettings,
291        batch_timeout: Duration,
292        client: HttpClient,
293    ) -> Self {
294        Self::with_retry_logic(
295            sink,
296            batch,
297            HttpRetryLogic::default(),
298            request_settings,
299            batch_timeout,
300            client,
301        )
302    }
303}
304
305impl<T, B, K, RL> PartitionHttpSink<T, B, K, RL>
306where
307    B: Batch,
308    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
309    B::Input: Partition<K>,
310    K: Hash + Eq + Clone + Send + 'static,
311    T: HttpSink<Input = B::Input, Output = B::Output>,
312    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
313        + Send
314        + 'static,
315{
316    pub fn with_retry_logic(
317        sink: T,
318        batch: B,
319        retry_logic: RL,
320        request_settings: TowerRequestSettings,
321        batch_timeout: Duration,
322        client: HttpClient,
323    ) -> Self {
324        let sink = Arc::new(sink);
325
326        let sink1 = Arc::clone(&sink);
327        let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
328            let sink = Arc::clone(&sink1);
329            Box::pin(async move { sink.build_request(b).await })
330        };
331
332        let svc = HttpBatchService::new(client, request_builder);
333        let inner = request_settings.partition_sink(retry_logic, svc, batch, batch_timeout);
334        let encoder = sink.build_encoder();
335
336        Self {
337            sink,
338            inner,
339            encoder,
340            slot: None,
341        }
342    }
343
344    /// Enforces per partition ordering of request.
345    pub fn ordered(mut self) -> Self {
346        self.inner.ordered();
347        self
348    }
349}
350
351impl<T, B, K, RL> Sink<Event> for PartitionHttpSink<T, B, K, RL>
352where
353    B: Batch,
354    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
355    B::Input: Partition<K>,
356    K: Hash + Eq + Clone + Send + 'static,
357    T: HttpSink<Input = B::Input, Output = B::Output>,
358    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
359{
360    type Error = crate::Error;
361
362    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
363        if self.slot.is_some() {
364            match self.as_mut().poll_flush(cx) {
365                Poll::Ready(Ok(())) => {}
366                Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
367                Poll::Pending => {
368                    if self.slot.is_some() {
369                        return Poll::Pending;
370                    }
371                }
372            }
373        }
374
375        Poll::Ready(Ok(()))
376    }
377
378    fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
379        let finalizers = event.metadata_mut().take_finalizers();
380        let byte_size = event.size_of();
381        let json_byte_size = event.estimated_json_encoded_size_of();
382
383        if let Some(item) = self.encoder.encode_event(event) {
384            *self.project().slot = Some(EncodedEvent {
385                item,
386                finalizers,
387                byte_size,
388                json_byte_size,
389            });
390        }
391
392        Ok(())
393    }
394
395    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
396        let mut this = self.project();
397        if this.slot.is_some() {
398            ready!(this.inner.as_mut().poll_ready(cx))?;
399            this.inner.as_mut().start_send(this.slot.take().unwrap())?;
400        }
401
402        this.inner.poll_flush(cx)
403    }
404
405    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
406        ready!(self.as_mut().poll_flush(cx))?;
407        self.project().inner.poll_close(cx)
408    }
409}
410
411#[cfg(feature = "aws-core")]
412#[derive(Clone)]
413pub struct SigV4Config {
414    pub(crate) shared_credentials_provider: SharedCredentialsProvider,
415    pub(crate) region: Region,
416    pub(crate) service: String,
417}
418
419/// @struct HttpBatchService
420///
421/// NOTE: This has been deprecated, please do not use directly when creating new sinks.
422///       The `HttpService` currently wraps this structure. Eventually all sinks currently using the
423///       HttpBatchService directly should be updated to use `HttpService`. At which time we can
424///       remove this struct and inline the functionality into the `HttpService` directly.
425pub struct HttpBatchService<F, B = Bytes> {
426    inner: HttpClient<Body>,
427    request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
428    #[cfg(feature = "aws-core")]
429    sig_v4_config: Option<SigV4Config>,
430}
431
432impl<F, B> HttpBatchService<F, B> {
433    pub fn new(
434        inner: HttpClient,
435        request_builder: impl Fn(B) -> F + Send + Sync + 'static,
436    ) -> Self {
437        HttpBatchService {
438            inner,
439            request_builder: Arc::new(Box::new(request_builder)),
440            #[cfg(feature = "aws-core")]
441            sig_v4_config: None,
442        }
443    }
444
445    #[cfg(feature = "aws-core")]
446    pub fn new_with_sig_v4(
447        inner: HttpClient,
448        request_builder: impl Fn(B) -> F + Send + Sync + 'static,
449        sig_v4_config: SigV4Config,
450    ) -> Self {
451        HttpBatchService {
452            inner,
453            request_builder: Arc::new(Box::new(request_builder)),
454            sig_v4_config: Some(sig_v4_config),
455        }
456    }
457}
458
459impl<F, B> Service<B> for HttpBatchService<F, B>
460where
461    F: Future<Output = crate::Result<hyper::Request<Bytes>>> + Send + 'static,
462    B: ByteSizeOf + Send + 'static,
463{
464    type Response = http::Response<Bytes>;
465    type Error = crate::Error;
466    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
467
468    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
469        Poll::Ready(Ok(()))
470    }
471
472    fn call(&mut self, body: B) -> Self::Future {
473        let request_builder = Arc::clone(&self.request_builder);
474        #[cfg(feature = "aws-core")]
475        let sig_v4_config = self.sig_v4_config.clone();
476        let http_client = self.inner.clone();
477
478        Box::pin(async move {
479            let request = request_builder(body).await.inspect_err(|error| {
480                emit!(SinkRequestBuildError { error });
481            })?;
482
483            #[cfg(feature = "aws-core")]
484            let request = match sig_v4_config {
485                None => request,
486                Some(sig_v4_config) => {
487                    let mut signed_request = request;
488                    sign_request(
489                        sig_v4_config.service.as_str(),
490                        &mut signed_request,
491                        &sig_v4_config.shared_credentials_provider,
492                        Some(&sig_v4_config.region),
493                        false,
494                    )
495                    .await?;
496
497                    signed_request
498                }
499            };
500            let byte_size = request.body().len();
501            let request = request.map(Body::from);
502            let (protocol, endpoint) = uri::protocol_endpoint(request.uri().clone());
503
504            let mut decompression_service = ServiceBuilder::new()
505                .layer(DecompressionLayer::new())
506                .service(http_client);
507
508            // Any errors raised in `http_client.call` results in a `GotHttpWarning` event being emitted
509            // in `HttpClient::send`. This does not result in incrementing `component_errors_total` however,
510            // because that is incremented by the driver when retries have been exhausted.
511            let response = decompression_service.call(request).await?;
512
513            if response.status().is_success() {
514                emit!(EndpointBytesSent {
515                    byte_size,
516                    protocol: &protocol,
517                    endpoint: &endpoint
518                });
519            }
520
521            let (parts, body) = response.into_parts();
522            let mut body = body::aggregate(body).await?;
523            Ok(hyper::Response::from_parts(
524                parts,
525                body.copy_to_bytes(body.remaining()),
526            ))
527        })
528    }
529}
530
531impl<F, B> Clone for HttpBatchService<F, B> {
532    fn clone(&self) -> Self {
533        Self {
534            inner: self.inner.clone(),
535            request_builder: Arc::clone(&self.request_builder),
536            #[cfg(feature = "aws-core")]
537            sig_v4_config: self.sig_v4_config.clone(),
538        }
539    }
540}
541
542impl<T: fmt::Debug> sink::Response for http::Response<T> {
543    fn is_successful(&self) -> bool {
544        self.status().is_success()
545    }
546
547    fn is_transient(&self) -> bool {
548        self.status().is_server_error()
549    }
550}
551
552#[derive(Debug, Clone)]
553pub struct HttpRetryLogic<Req> {
554    request: PhantomData<Req>,
555}
556impl<Req> Default for HttpRetryLogic<Req> {
557    fn default() -> Self {
558        Self {
559            request: PhantomData,
560        }
561    }
562}
563
564impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> {
565    type Error = HttpError;
566    type Request = Req;
567    type Response = hyper::Response<Bytes>;
568
569    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
570        true
571    }
572
573    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
574        let status = response.status();
575
576        match status {
577            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
578            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
579            StatusCode::NOT_IMPLEMENTED => {
580                RetryAction::DontRetry("endpoint not implemented".into())
581            }
582            _ if status.is_server_error() => RetryAction::Retry(
583                format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(),
584            ),
585            _ if status.is_success() => RetryAction::Successful,
586            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
587        }
588    }
589}
590
591/// A more generic version of `HttpRetryLogic` that accepts anything that can be converted
592/// to a status code
593#[derive(Debug)]
594pub struct HttpStatusRetryLogic<F, Req, Res> {
595    func: F,
596    request: PhantomData<Req>,
597    response: PhantomData<Res>,
598}
599
600impl<F, Req, Res> HttpStatusRetryLogic<F, Req, Res>
601where
602    F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
603    Req: Send + Sync + 'static,
604    Res: Send + Sync + 'static,
605{
606    pub const fn new(func: F) -> HttpStatusRetryLogic<F, Req, Res> {
607        HttpStatusRetryLogic {
608            func,
609            request: PhantomData,
610            response: PhantomData,
611        }
612    }
613}
614
615impl<F, Req, Res> RetryLogic for HttpStatusRetryLogic<F, Req, Res>
616where
617    F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
618    Req: Send + Sync + 'static,
619    Res: Send + Sync + 'static,
620{
621    type Error = HttpError;
622    type Request = Req;
623    type Response = Res;
624
625    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
626        true
627    }
628
629    fn should_retry_response(&self, response: &Res) -> RetryAction<Req> {
630        let status = (self.func)(response);
631
632        match status {
633            StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
634            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
635            StatusCode::NOT_IMPLEMENTED => {
636                RetryAction::DontRetry("endpoint not implemented".into())
637            }
638            _ if status.is_server_error() => {
639                RetryAction::Retry(format!("Http Status: {status}").into())
640            }
641            _ if status.is_success() => RetryAction::Successful,
642            _ => RetryAction::DontRetry(format!("Http status: {status}").into()),
643        }
644    }
645}
646
647impl<F, Req, Res> Clone for HttpStatusRetryLogic<F, Req, Res>
648where
649    F: Clone,
650{
651    fn clone(&self) -> Self {
652        Self {
653            func: self.func.clone(),
654            request: PhantomData,
655            response: PhantomData,
656        }
657    }
658}
659
660/// Outbound HTTP request settings.
661#[configurable_component]
662#[derive(Clone, Debug, Default)]
663pub struct RequestConfig {
664    #[serde(flatten)]
665    pub tower: TowerRequestConfig,
666
667    /// Additional HTTP headers to add to every HTTP request.
668    #[serde(default)]
669    #[configurable(metadata(
670        docs::additional_props_description = "An HTTP request header and its value. Both header names and values support templating with event data."
671    ))]
672    #[configurable(metadata(docs::examples = "headers_examples()"))]
673    pub headers: BTreeMap<String, String>,
674}
675
676fn headers_examples() -> BTreeMap<String, String> {
677    btreemap! {
678        "Accept" => "text/plain",
679        "X-My-Custom-Header" => "A-Value",
680        "X-Event-Level" => "{{level}}",
681        "X-Event-Timestamp" => "{{timestamp}}",
682    }
683}
684
685impl RequestConfig {
686    pub fn add_old_option(&mut self, headers: Option<BTreeMap<String, String>>) {
687        if let Some(headers) = headers {
688            warn!("Option `headers` has been deprecated. Use `request.headers` instead.");
689            self.headers.extend(headers);
690        }
691    }
692
693    pub fn split_headers(&self) -> (BTreeMap<String, String>, BTreeMap<String, Template>) {
694        let mut static_headers = BTreeMap::new();
695        let mut template_headers = BTreeMap::new();
696
697        for (name, value) in &self.headers {
698            match Template::try_from(value.as_str()) {
699                Ok(template) if !template.is_dynamic() => {
700                    static_headers.insert(name.clone(), value.clone());
701                }
702                Ok(template) => {
703                    template_headers.insert(name.clone(), template);
704                }
705                Err(_) => {
706                    static_headers.insert(name.clone(), value.clone());
707                }
708            }
709        }
710
711        (static_headers, template_headers)
712    }
713}
714
715#[derive(Debug, Snafu)]
716pub enum HeaderValidationError {
717    #[snafu(display("{}: {}", source, name))]
718    InvalidHeaderName {
719        name: String,
720        source: header::InvalidHeaderName,
721    },
722    #[snafu(display("{}: {}", source, value))]
723    InvalidHeaderValue {
724        value: String,
725        source: header::InvalidHeaderValue,
726    },
727}
728
729pub fn validate_headers(
730    headers: &BTreeMap<String, String>,
731) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
732    let mut validated_headers = BTreeMap::new();
733    for (name, value) in headers {
734        let name = HeaderName::from_bytes(name.as_bytes())
735            .with_context(|_| InvalidHeaderNameSnafu { name })?;
736        let value = HeaderValue::from_bytes(value.as_bytes())
737            .with_context(|_| InvalidHeaderValueSnafu { value })?;
738
739        validated_headers.insert(name.into(), value);
740    }
741
742    Ok(validated_headers)
743}
744
745/// Request type for use in the `Service` implementation of HTTP stream sinks.
746#[derive(Debug, Clone)]
747pub struct HttpRequest<T: Send> {
748    payload: Bytes,
749    finalizers: EventFinalizers,
750    request_metadata: RequestMetadata,
751    additional_metadata: T,
752}
753
754impl<T: Send> HttpRequest<T> {
755    /// Creates a new `HttpRequest`.
756    pub const fn new(
757        payload: Bytes,
758        finalizers: EventFinalizers,
759        request_metadata: RequestMetadata,
760        additional_metadata: T,
761    ) -> Self {
762        Self {
763            payload,
764            finalizers,
765            request_metadata,
766            additional_metadata,
767        }
768    }
769
770    pub const fn get_additional_metadata(&self) -> &T {
771        &self.additional_metadata
772    }
773
774    pub fn take_payload(&mut self) -> Bytes {
775        std::mem::take(&mut self.payload)
776    }
777}
778
779impl<T: Send> Finalizable for HttpRequest<T> {
780    fn take_finalizers(&mut self) -> EventFinalizers {
781        self.finalizers.take_finalizers()
782    }
783}
784
785impl<T: Send> MetaDescriptive for HttpRequest<T> {
786    fn get_metadata(&self) -> &RequestMetadata {
787        &self.request_metadata
788    }
789
790    fn metadata_mut(&mut self) -> &mut RequestMetadata {
791        &mut self.request_metadata
792    }
793}
794
795impl<T: Send> ByteSizeOf for HttpRequest<T> {
796    fn allocated_bytes(&self) -> usize {
797        self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
798    }
799}
800
801/// Response type for use in the `Service` implementation of HTTP stream sinks.
802pub struct HttpResponse {
803    pub http_response: Response<Bytes>,
804    pub events_byte_size: GroupedCountByteSize,
805    pub raw_byte_size: usize,
806}
807
808impl DriverResponse for HttpResponse {
809    fn event_status(&self) -> EventStatus {
810        if self.http_response.is_successful() {
811            EventStatus::Delivered
812        } else if self.http_response.is_transient() {
813            EventStatus::Errored
814        } else {
815            EventStatus::Rejected
816        }
817    }
818
819    fn events_sent(&self) -> &GroupedCountByteSize {
820        &self.events_byte_size
821    }
822
823    fn bytes_sent(&self) -> Option<usize> {
824        Some(self.raw_byte_size)
825    }
826}
827
828/// Creates a `RetryLogic` for use with `HttpResponse`.
829pub fn http_response_retry_logic<Request: Clone + Send + Sync + 'static>() -> HttpStatusRetryLogic<
830    impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static,
831    Request,
832    HttpResponse,
833> {
834    HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status())
835}
836
837/// Uses the estimated json encoded size to determine batch sizing.
838#[derive(Default)]
839pub struct HttpJsonBatchSizer;
840
841impl ItemBatchSize<Event> for HttpJsonBatchSizer {
842    fn size(&self, item: &Event) -> usize {
843        item.estimated_json_encoded_size_of().get()
844    }
845}
846
847/// HTTP request builder for HTTP stream sinks using the generic `HttpService`
848pub trait HttpServiceRequestBuilder<T: Send> {
849    fn build(&self, request: HttpRequest<T>) -> Result<Request<Bytes>, crate::Error>;
850}
851
852/// Generic 'Service' implementation for HTTP stream sinks.
853#[derive(Clone)]
854pub struct HttpService<B, T: Send> {
855    batch_service:
856        HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HttpRequest<T>>,
857    _phantom: PhantomData<B>,
858}
859
860impl<B, T: Send + 'static> HttpService<B, T>
861where
862    B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
863{
864    pub fn new(http_client: HttpClient<Body>, http_request_builder: B) -> Self {
865        let http_request_builder = Arc::new(http_request_builder);
866
867        let batch_service = HttpBatchService::new(http_client, move |req: HttpRequest<T>| {
868            let request_builder = Arc::clone(&http_request_builder);
869
870            let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
871                Box::pin(async move { request_builder.build(req) });
872
873            fut
874        });
875        Self {
876            batch_service,
877            _phantom: PhantomData,
878        }
879    }
880
881    #[cfg(feature = "aws-core")]
882    pub fn new_with_sig_v4(
883        http_client: HttpClient<Body>,
884        http_request_builder: B,
885        sig_v4_config: SigV4Config,
886    ) -> Self {
887        let http_request_builder = Arc::new(http_request_builder);
888
889        let batch_service = HttpBatchService::new_with_sig_v4(
890            http_client,
891            move |req: HttpRequest<T>| {
892                let request_builder = Arc::clone(&http_request_builder);
893
894                let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
895                    Box::pin(async move { request_builder.build(req) });
896
897                fut
898            },
899            sig_v4_config,
900        );
901        Self {
902            batch_service,
903            _phantom: PhantomData,
904        }
905    }
906}
907
908impl<B, T: Send + 'static> Service<HttpRequest<T>> for HttpService<B, T>
909where
910    B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
911{
912    type Response = HttpResponse;
913    type Error = crate::Error;
914    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
915
916    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
917        Poll::Ready(Ok(()))
918    }
919
920    fn call(&mut self, mut request: HttpRequest<T>) -> Self::Future {
921        let mut http_service = self.batch_service.clone();
922
923        // NOTE: By taking the metadata here, when passing the request to `call()` below,
924        //       that function does not have access to the metadata anymore.
925        let metadata = std::mem::take(request.metadata_mut());
926        let raw_byte_size = metadata.request_encoded_size();
927        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
928
929        Box::pin(async move {
930            let http_response = http_service.call(request).await?;
931
932            Ok(HttpResponse {
933                http_response,
934                events_byte_size,
935                raw_byte_size,
936            })
937        })
938    }
939}
940
941#[cfg(test)]
942mod test {
943    #![allow(clippy::print_stderr)] //tests
944
945    use futures::{future::ready, StreamExt};
946    use hyper::{
947        service::{make_service_fn, service_fn},
948        Response, Server, Uri,
949    };
950
951    use super::*;
952    use crate::{config::ProxyConfig, test_util::next_addr};
953
954    #[test]
955    fn util_http_retry_logic() {
956        let logic = HttpRetryLogic::<()>::default();
957
958        let response_408 = Response::builder().status(408).body(Bytes::new()).unwrap();
959        let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap();
960        let response_500 = Response::builder().status(500).body(Bytes::new()).unwrap();
961        let response_400 = Response::builder().status(400).body(Bytes::new()).unwrap();
962        let response_501 = Response::builder().status(501).body(Bytes::new()).unwrap();
963        assert!(logic.should_retry_response(&response_429).is_retryable());
964        assert!(logic.should_retry_response(&response_500).is_retryable());
965        assert!(logic.should_retry_response(&response_408).is_retryable());
966        assert!(logic
967            .should_retry_response(&response_400)
968            .is_not_retryable());
969        assert!(logic
970            .should_retry_response(&response_501)
971            .is_not_retryable());
972    }
973
974    #[tokio::test]
975    async fn util_http_it_makes_http_requests() {
976        let addr = next_addr();
977
978        let uri = format!("http://{}:{}/", addr.ip(), addr.port())
979            .parse::<Uri>()
980            .unwrap();
981
982        let request = Bytes::from("hello");
983        let proxy = ProxyConfig::default();
984        let client = HttpClient::new(None, &proxy).unwrap();
985        let mut service = HttpBatchService::new(client, move |body: Bytes| {
986            Box::pin(ready(
987                http::Request::post(&uri).body(body).map_err(Into::into),
988            ))
989        });
990
991        let (tx, rx) = futures::channel::mpsc::channel(10);
992
993        let new_service = make_service_fn(move |_| {
994            let tx = tx.clone();
995
996            let svc = service_fn(move |req| {
997                let mut tx = tx.clone();
998
999                async move {
1000                    let mut body = hyper::body::aggregate(req.into_body())
1001                        .await
1002                        .map_err(|error| format!("error: {error}"))?;
1003                    let string = String::from_utf8(body.copy_to_bytes(body.remaining()).to_vec())
1004                        .map_err(|_| "Wasn't UTF-8".to_string())?;
1005                    tx.try_send(string).map_err(|_| "Send error".to_string())?;
1006
1007                    Ok::<_, crate::Error>(Response::new(Body::from("")))
1008                }
1009            });
1010
1011            async move { Ok::<_, std::convert::Infallible>(svc) }
1012        });
1013
1014        tokio::spawn(async move {
1015            if let Err(error) = Server::bind(&addr).serve(new_service).await {
1016                eprintln!("Server error: {error}");
1017            }
1018        });
1019
1020        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1021        service.call(request).await.unwrap();
1022
1023        let (body, _rest) = StreamExt::into_future(rx).await;
1024        assert_eq!(body.unwrap(), "hello");
1025    }
1026}