1#[cfg(feature = "aws-core")]
2use aws_credential_types::provider::SharedCredentialsProvider;
3#[cfg(feature = "aws-core")]
4use aws_types::region::Region;
5use bytes::{Buf, Bytes};
6use futures::{future::BoxFuture, Sink};
7use headers::HeaderName;
8use http::{header, HeaderValue, Request, Response, StatusCode};
9
10#[derive(Debug, Clone, PartialEq, Eq, Hash)]
11pub struct OrderedHeaderName(HeaderName);
12
13impl OrderedHeaderName {
14 pub const fn new(header_name: HeaderName) -> Self {
15 Self(header_name)
16 }
17
18 pub const fn inner(&self) -> &HeaderName {
19 &self.0
20 }
21}
22
23impl From<HeaderName> for OrderedHeaderName {
24 fn from(header_name: HeaderName) -> Self {
25 Self(header_name)
26 }
27}
28
29impl Ord for OrderedHeaderName {
30 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
31 self.0.as_str().cmp(other.0.as_str())
32 }
33}
34
35impl PartialOrd for OrderedHeaderName {
36 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
37 Some(self.cmp(other))
38 }
39}
40use hyper::{body, Body};
41use pin_project::pin_project;
42use snafu::{ResultExt, Snafu};
43use std::{
44 collections::BTreeMap,
45 fmt,
46 future::Future,
47 hash::Hash,
48 marker::PhantomData,
49 pin::Pin,
50 sync::Arc,
51 task::{ready, Context, Poll},
52 time::Duration,
53};
54use tower::{Service, ServiceBuilder};
55use tower_http::decompression::DecompressionLayer;
56use vector_lib::{
57 configurable::configurable_component, stream::batcher::limiter::ItemBatchSize, ByteSizeOf,
58 EstimatedJsonEncodedSizeOf,
59};
60
61use super::{
62 retries::{RetryAction, RetryLogic},
63 sink::{self, Response as _},
64 uri, Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
65 TowerRequestSettings,
66};
67
68#[cfg(feature = "aws-core")]
69use crate::aws::sign_request;
70
71use crate::{
72 event::Event,
73 http::{HttpClient, HttpError},
74 internal_events::{EndpointBytesSent, SinkRequestBuildError},
75 sinks::prelude::*,
76 template::Template,
77};
78
79pub trait HttpEventEncoder<Output> {
80 fn encode_event(&mut self, event: Event) -> Option<Output>;
82}
83
84pub trait HttpSink: Send + Sync + 'static {
85 type Input;
86 type Output;
87 type Encoder: HttpEventEncoder<Self::Input>;
88
89 fn build_encoder(&self) -> Self::Encoder;
90 fn build_request(
91 &self,
92 events: Self::Output,
93 ) -> impl Future<Output = crate::Result<http::Request<Bytes>>> + Send;
94}
95
96#[pin_project]
112pub struct BatchedHttpSink<T, B, RL = HttpRetryLogic<<B as Batch>::Output>>
113where
114 B: Batch,
115 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
116 T: HttpSink<Input = B::Input, Output = B::Output>,
117 RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
118 + Send
119 + 'static,
120{
121 sink: Arc<T>,
122 #[pin]
123 inner: TowerBatchedSink<
124 HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
125 B,
126 RL,
127 >,
128 encoder: T::Encoder,
129 slot: Option<EncodedEvent<B::Input>>,
133}
134
135impl<T, B> BatchedHttpSink<T, B>
136where
137 B: Batch,
138 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
139 T: HttpSink<Input = B::Input, Output = B::Output>,
140{
141 pub fn new(
142 sink: T,
143 batch: B,
144 request_settings: TowerRequestSettings,
145 batch_timeout: Duration,
146 client: HttpClient,
147 ) -> Self {
148 Self::with_logic(
149 sink,
150 batch,
151 HttpRetryLogic::default(),
152 request_settings,
153 batch_timeout,
154 client,
155 )
156 }
157}
158
159impl<T, B, RL> BatchedHttpSink<T, B, RL>
160where
161 B: Batch,
162 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
163 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
164 + Send
165 + 'static,
166 T: HttpSink<Input = B::Input, Output = B::Output>,
167{
168 pub fn with_logic(
169 sink: T,
170 batch: B,
171 retry_logic: RL,
172 request_settings: TowerRequestSettings,
173 batch_timeout: Duration,
174 client: HttpClient,
175 ) -> Self {
176 let sink = Arc::new(sink);
177
178 let sink1 = Arc::clone(&sink);
179 let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
180 let sink = Arc::clone(&sink1);
181 Box::pin(async move { sink.build_request(b).await })
182 };
183
184 let svc = HttpBatchService::new(client, request_builder);
185 let inner = request_settings.batch_sink(retry_logic, svc, batch, batch_timeout);
186 let encoder = sink.build_encoder();
187
188 Self {
189 sink,
190 inner,
191 encoder,
192 slot: None,
193 }
194 }
195}
196
197impl<T, B, RL> Sink<Event> for BatchedHttpSink<T, B, RL>
198where
199 B: Batch,
200 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
201 T: HttpSink<Input = B::Input, Output = B::Output>,
202 RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
203 + Send
204 + 'static,
205{
206 type Error = crate::Error;
207
208 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
209 if self.slot.is_some() {
210 match self.as_mut().poll_flush(cx) {
211 Poll::Ready(Ok(())) => {}
212 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
213 Poll::Pending => {
214 if self.slot.is_some() {
215 return Poll::Pending;
216 }
217 }
218 }
219 }
220
221 Poll::Ready(Ok(()))
222 }
223
224 fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
225 let byte_size = event.size_of();
226 let json_byte_size = event.estimated_json_encoded_size_of();
227 let finalizers = event.metadata_mut().take_finalizers();
228 if let Some(item) = self.encoder.encode_event(event) {
229 *self.project().slot = Some(EncodedEvent {
230 item,
231 finalizers,
232 byte_size,
233 json_byte_size,
234 });
235 }
236
237 Ok(())
238 }
239
240 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
241 let mut this = self.project();
242 if this.slot.is_some() {
243 ready!(this.inner.as_mut().poll_ready(cx))?;
244 this.inner.as_mut().start_send(this.slot.take().unwrap())?;
245 }
246
247 this.inner.poll_flush(cx)
248 }
249
250 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
251 ready!(self.as_mut().poll_flush(cx))?;
252 self.project().inner.poll_close(cx)
253 }
254}
255
256#[pin_project]
258pub struct PartitionHttpSink<T, B, K, RL = HttpRetryLogic<<B as Batch>::Output>>
259where
260 B: Batch,
261 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
262 B::Input: Partition<K>,
263 K: Hash + Eq + Clone + Send + 'static,
264 T: HttpSink<Input = B::Input, Output = B::Output>,
265 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
266{
267 sink: Arc<T>,
268 #[pin]
269 inner: TowerPartitionSink<
270 HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
271 B,
272 RL,
273 K,
274 >,
275 encoder: T::Encoder,
276 slot: Option<EncodedEvent<B::Input>>,
277}
278
279impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic<<B as Batch>::Output>>
280where
281 B: Batch,
282 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
283 B::Input: Partition<K>,
284 K: Hash + Eq + Clone + Send + 'static,
285 T: HttpSink<Input = B::Input, Output = B::Output>,
286{
287 pub fn new(
288 sink: T,
289 batch: B,
290 request_settings: TowerRequestSettings,
291 batch_timeout: Duration,
292 client: HttpClient,
293 ) -> Self {
294 Self::with_retry_logic(
295 sink,
296 batch,
297 HttpRetryLogic::default(),
298 request_settings,
299 batch_timeout,
300 client,
301 )
302 }
303}
304
305impl<T, B, K, RL> PartitionHttpSink<T, B, K, RL>
306where
307 B: Batch,
308 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
309 B::Input: Partition<K>,
310 K: Hash + Eq + Clone + Send + 'static,
311 T: HttpSink<Input = B::Input, Output = B::Output>,
312 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
313 + Send
314 + 'static,
315{
316 pub fn with_retry_logic(
317 sink: T,
318 batch: B,
319 retry_logic: RL,
320 request_settings: TowerRequestSettings,
321 batch_timeout: Duration,
322 client: HttpClient,
323 ) -> Self {
324 let sink = Arc::new(sink);
325
326 let sink1 = Arc::clone(&sink);
327 let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
328 let sink = Arc::clone(&sink1);
329 Box::pin(async move { sink.build_request(b).await })
330 };
331
332 let svc = HttpBatchService::new(client, request_builder);
333 let inner = request_settings.partition_sink(retry_logic, svc, batch, batch_timeout);
334 let encoder = sink.build_encoder();
335
336 Self {
337 sink,
338 inner,
339 encoder,
340 slot: None,
341 }
342 }
343
344 pub fn ordered(mut self) -> Self {
346 self.inner.ordered();
347 self
348 }
349}
350
351impl<T, B, K, RL> Sink<Event> for PartitionHttpSink<T, B, K, RL>
352where
353 B: Batch,
354 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
355 B::Input: Partition<K>,
356 K: Hash + Eq + Clone + Send + 'static,
357 T: HttpSink<Input = B::Input, Output = B::Output>,
358 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
359{
360 type Error = crate::Error;
361
362 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
363 if self.slot.is_some() {
364 match self.as_mut().poll_flush(cx) {
365 Poll::Ready(Ok(())) => {}
366 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
367 Poll::Pending => {
368 if self.slot.is_some() {
369 return Poll::Pending;
370 }
371 }
372 }
373 }
374
375 Poll::Ready(Ok(()))
376 }
377
378 fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
379 let finalizers = event.metadata_mut().take_finalizers();
380 let byte_size = event.size_of();
381 let json_byte_size = event.estimated_json_encoded_size_of();
382
383 if let Some(item) = self.encoder.encode_event(event) {
384 *self.project().slot = Some(EncodedEvent {
385 item,
386 finalizers,
387 byte_size,
388 json_byte_size,
389 });
390 }
391
392 Ok(())
393 }
394
395 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
396 let mut this = self.project();
397 if this.slot.is_some() {
398 ready!(this.inner.as_mut().poll_ready(cx))?;
399 this.inner.as_mut().start_send(this.slot.take().unwrap())?;
400 }
401
402 this.inner.poll_flush(cx)
403 }
404
405 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
406 ready!(self.as_mut().poll_flush(cx))?;
407 self.project().inner.poll_close(cx)
408 }
409}
410
/// Parameters passed to `sign_request` when an outgoing request has to be
/// signed (AWS SigV4).
#[cfg(feature = "aws-core")]
#[derive(Clone)]
pub struct SigV4Config {
    /// Source of the credentials used for signing.
    pub(crate) shared_credentials_provider: SharedCredentialsProvider,
    /// Region the request is signed for.
    pub(crate) region: Region,
    /// Name of the service the request is signed for.
    pub(crate) service: String,
}
418
419pub struct HttpBatchService<F, B = Bytes> {
426 inner: HttpClient<Body>,
427 request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
428 #[cfg(feature = "aws-core")]
429 sig_v4_config: Option<SigV4Config>,
430}
431
432impl<F, B> HttpBatchService<F, B> {
433 pub fn new(
434 inner: HttpClient,
435 request_builder: impl Fn(B) -> F + Send + Sync + 'static,
436 ) -> Self {
437 HttpBatchService {
438 inner,
439 request_builder: Arc::new(Box::new(request_builder)),
440 #[cfg(feature = "aws-core")]
441 sig_v4_config: None,
442 }
443 }
444
445 #[cfg(feature = "aws-core")]
446 pub fn new_with_sig_v4(
447 inner: HttpClient,
448 request_builder: impl Fn(B) -> F + Send + Sync + 'static,
449 sig_v4_config: SigV4Config,
450 ) -> Self {
451 HttpBatchService {
452 inner,
453 request_builder: Arc::new(Box::new(request_builder)),
454 sig_v4_config: Some(sig_v4_config),
455 }
456 }
457}
458
459impl<F, B> Service<B> for HttpBatchService<F, B>
460where
461 F: Future<Output = crate::Result<hyper::Request<Bytes>>> + Send + 'static,
462 B: ByteSizeOf + Send + 'static,
463{
464 type Response = http::Response<Bytes>;
465 type Error = crate::Error;
466 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
467
468 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
469 Poll::Ready(Ok(()))
470 }
471
472 fn call(&mut self, body: B) -> Self::Future {
473 let request_builder = Arc::clone(&self.request_builder);
474 #[cfg(feature = "aws-core")]
475 let sig_v4_config = self.sig_v4_config.clone();
476 let http_client = self.inner.clone();
477
478 Box::pin(async move {
479 let request = request_builder(body).await.inspect_err(|error| {
480 emit!(SinkRequestBuildError { error });
481 })?;
482
483 #[cfg(feature = "aws-core")]
484 let request = match sig_v4_config {
485 None => request,
486 Some(sig_v4_config) => {
487 let mut signed_request = request;
488 sign_request(
489 sig_v4_config.service.as_str(),
490 &mut signed_request,
491 &sig_v4_config.shared_credentials_provider,
492 Some(&sig_v4_config.region),
493 false,
494 )
495 .await?;
496
497 signed_request
498 }
499 };
500 let byte_size = request.body().len();
501 let request = request.map(Body::from);
502 let (protocol, endpoint) = uri::protocol_endpoint(request.uri().clone());
503
504 let mut decompression_service = ServiceBuilder::new()
505 .layer(DecompressionLayer::new())
506 .service(http_client);
507
508 let response = decompression_service.call(request).await?;
512
513 if response.status().is_success() {
514 emit!(EndpointBytesSent {
515 byte_size,
516 protocol: &protocol,
517 endpoint: &endpoint
518 });
519 }
520
521 let (parts, body) = response.into_parts();
522 let mut body = body::aggregate(body).await?;
523 Ok(hyper::Response::from_parts(
524 parts,
525 body.copy_to_bytes(body.remaining()),
526 ))
527 })
528 }
529}
530
531impl<F, B> Clone for HttpBatchService<F, B> {
532 fn clone(&self) -> Self {
533 Self {
534 inner: self.inner.clone(),
535 request_builder: Arc::clone(&self.request_builder),
536 #[cfg(feature = "aws-core")]
537 sig_v4_config: self.sig_v4_config.clone(),
538 }
539 }
540}
541
542impl<T: fmt::Debug> sink::Response for http::Response<T> {
543 fn is_successful(&self) -> bool {
544 self.status().is_success()
545 }
546
547 fn is_transient(&self) -> bool {
548 self.status().is_server_error()
549 }
550}
551
/// Default retry logic for HTTP sinks, deciding on the response status code.
///
/// `Req` is only the request type reported through [`RetryLogic`]; no value of
/// that type is stored.
#[derive(Debug, Clone)]
pub struct HttpRetryLogic<Req> {
    request: PhantomData<Req>,
}

// Implemented by hand so that `Req` does not have to implement `Default`.
impl<Req> Default for HttpRetryLogic<Req> {
    fn default() -> Self {
        HttpRetryLogic {
            request: PhantomData,
        }
    }
}
563
564impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> {
565 type Error = HttpError;
566 type Request = Req;
567 type Response = hyper::Response<Bytes>;
568
569 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
570 true
571 }
572
573 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
574 let status = response.status();
575
576 match status {
577 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
578 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
579 StatusCode::NOT_IMPLEMENTED => {
580 RetryAction::DontRetry("endpoint not implemented".into())
581 }
582 _ if status.is_server_error() => RetryAction::Retry(
583 format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(),
584 ),
585 _ if status.is_success() => RetryAction::Successful,
586 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
587 }
588 }
589}
590
591#[derive(Debug)]
594pub struct HttpStatusRetryLogic<F, Req, Res> {
595 func: F,
596 request: PhantomData<Req>,
597 response: PhantomData<Res>,
598}
599
600impl<F, Req, Res> HttpStatusRetryLogic<F, Req, Res>
601where
602 F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
603 Req: Send + Sync + 'static,
604 Res: Send + Sync + 'static,
605{
606 pub const fn new(func: F) -> HttpStatusRetryLogic<F, Req, Res> {
607 HttpStatusRetryLogic {
608 func,
609 request: PhantomData,
610 response: PhantomData,
611 }
612 }
613}
614
615impl<F, Req, Res> RetryLogic for HttpStatusRetryLogic<F, Req, Res>
616where
617 F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
618 Req: Send + Sync + 'static,
619 Res: Send + Sync + 'static,
620{
621 type Error = HttpError;
622 type Request = Req;
623 type Response = Res;
624
625 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
626 true
627 }
628
629 fn should_retry_response(&self, response: &Res) -> RetryAction<Req> {
630 let status = (self.func)(response);
631
632 match status {
633 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
634 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
635 StatusCode::NOT_IMPLEMENTED => {
636 RetryAction::DontRetry("endpoint not implemented".into())
637 }
638 _ if status.is_server_error() => {
639 RetryAction::Retry(format!("Http Status: {status}").into())
640 }
641 _ if status.is_success() => RetryAction::Successful,
642 _ => RetryAction::DontRetry(format!("Http status: {status}").into()),
643 }
644 }
645}
646
647impl<F, Req, Res> Clone for HttpStatusRetryLogic<F, Req, Res>
648where
649 F: Clone,
650{
651 fn clone(&self) -> Self {
652 Self {
653 func: self.func.clone(),
654 request: PhantomData,
655 response: PhantomData,
656 }
657 }
658}
659
660#[configurable_component]
662#[derive(Clone, Debug, Default)]
663pub struct RequestConfig {
664 #[serde(flatten)]
665 pub tower: TowerRequestConfig,
666
667 #[serde(default)]
669 #[configurable(metadata(
670 docs::additional_props_description = "An HTTP request header and its value. Both header names and values support templating with event data."
671 ))]
672 #[configurable(metadata(docs::examples = "headers_examples()"))]
673 pub headers: BTreeMap<String, String>,
674}
675
/// Example value for `RequestConfig::headers`, referenced from its
/// `docs::examples` metadata.
fn headers_examples() -> BTreeMap<String, String> {
    let examples = [
        ("Accept", "text/plain"),
        ("X-My-Custom-Header", "A-Value"),
        ("X-Event-Level", "{{level}}"),
        ("X-Event-Timestamp", "{{timestamp}}"),
    ];

    examples
        .into_iter()
        .map(|(name, value)| (name.to_owned(), value.to_owned()))
        .collect()
}
684
685impl RequestConfig {
686 pub fn add_old_option(&mut self, headers: Option<BTreeMap<String, String>>) {
687 if let Some(headers) = headers {
688 warn!("Option `headers` has been deprecated. Use `request.headers` instead.");
689 self.headers.extend(headers);
690 }
691 }
692
693 pub fn split_headers(&self) -> (BTreeMap<String, String>, BTreeMap<String, Template>) {
694 let mut static_headers = BTreeMap::new();
695 let mut template_headers = BTreeMap::new();
696
697 for (name, value) in &self.headers {
698 match Template::try_from(value.as_str()) {
699 Ok(template) if !template.is_dynamic() => {
700 static_headers.insert(name.clone(), value.clone());
701 }
702 Ok(template) => {
703 template_headers.insert(name.clone(), template);
704 }
705 Err(_) => {
706 static_headers.insert(name.clone(), value.clone());
707 }
708 }
709 }
710
711 (static_headers, template_headers)
712 }
713}
714
715#[derive(Debug, Snafu)]
716pub enum HeaderValidationError {
717 #[snafu(display("{}: {}", source, name))]
718 InvalidHeaderName {
719 name: String,
720 source: header::InvalidHeaderName,
721 },
722 #[snafu(display("{}: {}", source, value))]
723 InvalidHeaderValue {
724 value: String,
725 source: header::InvalidHeaderValue,
726 },
727}
728
729pub fn validate_headers(
730 headers: &BTreeMap<String, String>,
731) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
732 let mut validated_headers = BTreeMap::new();
733 for (name, value) in headers {
734 let name = HeaderName::from_bytes(name.as_bytes())
735 .with_context(|_| InvalidHeaderNameSnafu { name })?;
736 let value = HeaderValue::from_bytes(value.as_bytes())
737 .with_context(|_| InvalidHeaderValueSnafu { value })?;
738
739 validated_headers.insert(name.into(), value);
740 }
741
742 Ok(validated_headers)
743}
744
745#[derive(Debug, Clone)]
747pub struct HttpRequest<T: Send> {
748 payload: Bytes,
749 finalizers: EventFinalizers,
750 request_metadata: RequestMetadata,
751 additional_metadata: T,
752}
753
754impl<T: Send> HttpRequest<T> {
755 pub const fn new(
757 payload: Bytes,
758 finalizers: EventFinalizers,
759 request_metadata: RequestMetadata,
760 additional_metadata: T,
761 ) -> Self {
762 Self {
763 payload,
764 finalizers,
765 request_metadata,
766 additional_metadata,
767 }
768 }
769
770 pub const fn get_additional_metadata(&self) -> &T {
771 &self.additional_metadata
772 }
773
774 pub fn take_payload(&mut self) -> Bytes {
775 std::mem::take(&mut self.payload)
776 }
777}
778
779impl<T: Send> Finalizable for HttpRequest<T> {
780 fn take_finalizers(&mut self) -> EventFinalizers {
781 self.finalizers.take_finalizers()
782 }
783}
784
785impl<T: Send> MetaDescriptive for HttpRequest<T> {
786 fn get_metadata(&self) -> &RequestMetadata {
787 &self.request_metadata
788 }
789
790 fn metadata_mut(&mut self) -> &mut RequestMetadata {
791 &mut self.request_metadata
792 }
793}
794
795impl<T: Send> ByteSizeOf for HttpRequest<T> {
796 fn allocated_bytes(&self) -> usize {
797 self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
798 }
799}
800
801pub struct HttpResponse {
803 pub http_response: Response<Bytes>,
804 pub events_byte_size: GroupedCountByteSize,
805 pub raw_byte_size: usize,
806}
807
808impl DriverResponse for HttpResponse {
809 fn event_status(&self) -> EventStatus {
810 if self.http_response.is_successful() {
811 EventStatus::Delivered
812 } else if self.http_response.is_transient() {
813 EventStatus::Errored
814 } else {
815 EventStatus::Rejected
816 }
817 }
818
819 fn events_sent(&self) -> &GroupedCountByteSize {
820 &self.events_byte_size
821 }
822
823 fn bytes_sent(&self) -> Option<usize> {
824 Some(self.raw_byte_size)
825 }
826}
827
828pub fn http_response_retry_logic<Request: Clone + Send + Sync + 'static>() -> HttpStatusRetryLogic<
830 impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static,
831 Request,
832 HttpResponse,
833> {
834 HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status())
835}
836
837#[derive(Default)]
839pub struct HttpJsonBatchSizer;
840
841impl ItemBatchSize<Event> for HttpJsonBatchSizer {
842 fn size(&self, item: &Event) -> usize {
843 item.estimated_json_encoded_size_of().get()
844 }
845}
846
847pub trait HttpServiceRequestBuilder<T: Send> {
849 fn build(&self, request: HttpRequest<T>) -> Result<Request<Bytes>, crate::Error>;
850}
851
852#[derive(Clone)]
854pub struct HttpService<B, T: Send> {
855 batch_service:
856 HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HttpRequest<T>>,
857 _phantom: PhantomData<B>,
858}
859
860impl<B, T: Send + 'static> HttpService<B, T>
861where
862 B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
863{
864 pub fn new(http_client: HttpClient<Body>, http_request_builder: B) -> Self {
865 let http_request_builder = Arc::new(http_request_builder);
866
867 let batch_service = HttpBatchService::new(http_client, move |req: HttpRequest<T>| {
868 let request_builder = Arc::clone(&http_request_builder);
869
870 let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
871 Box::pin(async move { request_builder.build(req) });
872
873 fut
874 });
875 Self {
876 batch_service,
877 _phantom: PhantomData,
878 }
879 }
880
881 #[cfg(feature = "aws-core")]
882 pub fn new_with_sig_v4(
883 http_client: HttpClient<Body>,
884 http_request_builder: B,
885 sig_v4_config: SigV4Config,
886 ) -> Self {
887 let http_request_builder = Arc::new(http_request_builder);
888
889 let batch_service = HttpBatchService::new_with_sig_v4(
890 http_client,
891 move |req: HttpRequest<T>| {
892 let request_builder = Arc::clone(&http_request_builder);
893
894 let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
895 Box::pin(async move { request_builder.build(req) });
896
897 fut
898 },
899 sig_v4_config,
900 );
901 Self {
902 batch_service,
903 _phantom: PhantomData,
904 }
905 }
906}
907
908impl<B, T: Send + 'static> Service<HttpRequest<T>> for HttpService<B, T>
909where
910 B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
911{
912 type Response = HttpResponse;
913 type Error = crate::Error;
914 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
915
916 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
917 Poll::Ready(Ok(()))
918 }
919
920 fn call(&mut self, mut request: HttpRequest<T>) -> Self::Future {
921 let mut http_service = self.batch_service.clone();
922
923 let metadata = std::mem::take(request.metadata_mut());
926 let raw_byte_size = metadata.request_encoded_size();
927 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
928
929 Box::pin(async move {
930 let http_response = http_service.call(request).await?;
931
932 Ok(HttpResponse {
933 http_response,
934 events_byte_size,
935 raw_byte_size,
936 })
937 })
938 }
939}
940
#[cfg(test)]
mod test {
    // The test server reports failures on stderr.
    #![allow(clippy::print_stderr)]

    use futures::{future::ready, StreamExt};
    use hyper::{
        service::{make_service_fn, service_fn},
        Response, Server, Uri,
    };

    use super::*;
    use crate::{config::ProxyConfig, test_util::next_addr};

    /// Checks the retry decision of `HttpRetryLogic` for a handful of statuses.
    #[test]
    fn util_http_retry_logic() {
        let logic = HttpRetryLogic::<()>::default();
        let response_with =
            |status: u16| Response::builder().status(status).body(Bytes::new()).unwrap();

        let response_408 = response_with(408);
        let response_429 = response_with(429);
        let response_500 = response_with(500);
        let response_400 = response_with(400);
        let response_501 = response_with(501);

        for retried in [&response_429, &response_500, &response_408] {
            assert!(logic.should_retry_response(retried).is_retryable());
        }
        for rejected in [&response_400, &response_501] {
            assert!(logic.should_retry_response(rejected).is_not_retryable());
        }
    }

    /// Sends one request through `HttpBatchService` to a local server and
    /// checks that the server received the body.
    #[tokio::test]
    async fn util_http_it_makes_http_requests() {
        let addr = next_addr();

        let uri: Uri = format!("http://{}:{}/", addr.ip(), addr.port())
            .parse()
            .unwrap();

        let request = Bytes::from("hello");
        let proxy = ProxyConfig::default();
        let client = HttpClient::new(None, &proxy).unwrap();
        let mut service = HttpBatchService::new(client, move |body: Bytes| {
            Box::pin(ready(
                http::Request::post(&uri).body(body).map_err(Into::into),
            ))
        });

        // Bodies received by the server are forwarded over this channel.
        let (tx, rx) = futures::channel::mpsc::channel(10);

        let new_service = make_service_fn(move |_| {
            let tx = tx.clone();

            let svc = service_fn(move |req| {
                let mut tx = tx.clone();

                async move {
                    let mut body = hyper::body::aggregate(req.into_body())
                        .await
                        .map_err(|error| format!("error: {error}"))?;
                    let raw = body.copy_to_bytes(body.remaining()).to_vec();
                    let string =
                        String::from_utf8(raw).map_err(|_| "Wasn't UTF-8".to_string())?;
                    tx.try_send(string).map_err(|_| "Send error".to_string())?;

                    Ok::<_, crate::Error>(Response::new(Body::from("")))
                }
            });

            async move { Ok::<_, std::convert::Infallible>(svc) }
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&addr).serve(new_service).await {
                eprintln!("Server error: {error}");
            }
        });

        // Give the spawned server a moment to start listening.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        service.call(request).await.unwrap();

        let (body, _rest) = StreamExt::into_future(rx).await;
        assert_eq!(body.unwrap(), "hello");
    }
}