1#[cfg(feature = "aws-core")]
2use aws_credential_types::provider::SharedCredentialsProvider;
3#[cfg(feature = "aws-core")]
4use aws_types::region::Region;
5use bytes::{Buf, Bytes};
6use futures::{Sink, future::BoxFuture};
7use headers::HeaderName;
8use http::{HeaderValue, Request, Response, StatusCode, header};
9use http_body::Body as _;
10
/// Newtype around [`HeaderName`] that can be ordered, which allows header
/// names to be used as keys of ordered collections such as `BTreeMap`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct OrderedHeaderName(HeaderName);
13
14impl OrderedHeaderName {
15 pub const fn new(header_name: HeaderName) -> Self {
16 Self(header_name)
17 }
18
19 pub const fn inner(&self) -> &HeaderName {
20 &self.0
21 }
22}
23
24impl From<HeaderName> for OrderedHeaderName {
25 fn from(header_name: HeaderName) -> Self {
26 Self(header_name)
27 }
28}
29
30impl Ord for OrderedHeaderName {
31 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
32 self.0.as_str().cmp(other.0.as_str())
33 }
34}
35
36impl PartialOrd for OrderedHeaderName {
37 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
38 Some(self.cmp(other))
39 }
40}
41use std::{
42 collections::BTreeMap,
43 fmt,
44 future::Future,
45 hash::Hash,
46 marker::PhantomData,
47 pin::Pin,
48 sync::Arc,
49 task::{Context, Poll, ready},
50 time::Duration,
51};
52
53use hyper::Body;
54use pin_project::pin_project;
55use snafu::{ResultExt, Snafu};
56use tower::{Service, ServiceBuilder};
57use tower_http::decompression::DecompressionLayer;
58use vector_lib::{
59 ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
60 stream::batcher::limiter::ItemBatchSize,
61};
62
63use super::{
64 Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
65 TowerRequestSettings,
66 retries::{RetryAction, RetryLogic},
67 sink::{self, Response as _},
68 uri,
69};
70#[cfg(feature = "aws-core")]
71use crate::aws::sign_request;
72use crate::{
73 event::Event,
74 http::{HttpClient, HttpError},
75 internal_events::{EndpointBytesSent, SinkRequestBuildError},
76 sinks::prelude::*,
77 template::Template,
78};
79
/// Encodes a single [`Event`] into a sink-specific `Output` value.
pub trait HttpEventEncoder<Output> {
    /// Encodes `event`; returns `None` when no output is produced for it.
    fn encode_event(&mut self, event: Event) -> Option<Output>;
}
84
/// Describes how an HTTP-based sink encodes events and turns a batch of
/// encoded items into an HTTP request.
pub trait HttpSink: Send + Sync + 'static {
    /// Item produced by the encoder for a single event.
    type Input;
    /// Payload handed to [`HttpSink::build_request`].
    type Output;
    /// Encoder that turns an [`Event`] into `Self::Input`.
    type Encoder: HttpEventEncoder<Self::Input>;

    /// Creates a new encoder instance.
    fn build_encoder(&self) -> Self::Encoder;
    /// Builds the HTTP request carrying `events`.
    fn build_request(
        &self,
        events: Self::Output,
    ) -> impl Future<Output = crate::Result<http::Request<Bytes>>> + Send;
}
96
/// `Sink<Event>` adapter that encodes events with the encoder of an
/// [`HttpSink`] and forwards the encoded items to an inner
/// [`TowerBatchedSink`] backed by an [`HttpBatchService`].
#[pin_project]
pub struct BatchedHttpSink<T, B, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
        + Send
        + 'static,
{
    // Shared handle to the user-provided sink; the request builder closure
    // holds another clone of it.
    sink: Arc<T>,
    // Inner batching sink that receives the encoded events.
    #[pin]
    inner: TowerBatchedSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
    >,
    // Encoder obtained from `sink.build_encoder()`.
    encoder: T::Encoder,
    // Holds at most one encoded event between `start_send` and the moment
    // `poll_flush` hands it over to `inner`.
    slot: Option<EncodedEvent<B::Input>>,
}
135
136impl<T, B> BatchedHttpSink<T, B>
137where
138 B: Batch,
139 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
140 T: HttpSink<Input = B::Input, Output = B::Output>,
141{
142 pub fn new(
143 sink: T,
144 batch: B,
145 request_settings: TowerRequestSettings,
146 batch_timeout: Duration,
147 client: HttpClient,
148 ) -> Self {
149 Self::with_logic(
150 sink,
151 batch,
152 HttpRetryLogic::default(),
153 request_settings,
154 batch_timeout,
155 client,
156 )
157 }
158}
159
160impl<T, B, RL> BatchedHttpSink<T, B, RL>
161where
162 B: Batch,
163 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
164 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
165 + Send
166 + 'static,
167 T: HttpSink<Input = B::Input, Output = B::Output>,
168{
169 pub fn with_logic(
170 sink: T,
171 batch: B,
172 retry_logic: RL,
173 request_settings: TowerRequestSettings,
174 batch_timeout: Duration,
175 client: HttpClient,
176 ) -> Self {
177 let sink = Arc::new(sink);
178
179 let sink1 = Arc::clone(&sink);
180 let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
181 let sink = Arc::clone(&sink1);
182 Box::pin(async move { sink.build_request(b).await })
183 };
184
185 let svc = HttpBatchService::new(client, request_builder);
186 let inner = request_settings.batch_sink(retry_logic, svc, batch, batch_timeout);
187 let encoder = sink.build_encoder();
188
189 Self {
190 sink,
191 inner,
192 encoder,
193 slot: None,
194 }
195 }
196}
197
198impl<T, B, RL> Sink<Event> for BatchedHttpSink<T, B, RL>
199where
200 B: Batch,
201 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
202 T: HttpSink<Input = B::Input, Output = B::Output>,
203 RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
204 + Send
205 + 'static,
206{
207 type Error = crate::Error;
208
209 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210 if self.slot.is_some() {
211 match self.as_mut().poll_flush(cx) {
212 Poll::Ready(Ok(())) => {}
213 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
214 Poll::Pending => {
215 if self.slot.is_some() {
216 return Poll::Pending;
217 }
218 }
219 }
220 }
221
222 Poll::Ready(Ok(()))
223 }
224
225 fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
226 let byte_size = event.size_of();
227 let json_byte_size = event.estimated_json_encoded_size_of();
228 let finalizers = event.metadata_mut().take_finalizers();
229 if let Some(item) = self.encoder.encode_event(event) {
230 *self.project().slot = Some(EncodedEvent {
231 item,
232 finalizers,
233 byte_size,
234 json_byte_size,
235 });
236 }
237
238 Ok(())
239 }
240
241 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
242 let mut this = self.project();
243 if this.slot.is_some() {
244 ready!(this.inner.as_mut().poll_ready(cx))?;
245 this.inner.as_mut().start_send(this.slot.take().unwrap())?;
246 }
247
248 this.inner.poll_flush(cx)
249 }
250
251 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
252 ready!(self.as_mut().poll_flush(cx))?;
253 self.project().inner.poll_close(cx)
254 }
255}
256
/// `Sink<Event>` adapter that encodes events with the encoder of an
/// [`HttpSink`] and forwards the encoded items to an inner
/// [`TowerPartitionSink`] keyed by `K`.
#[pin_project]
pub struct PartitionHttpSink<T, B, K, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    B::Input: Partition<K>,
    K: Hash + Eq + Clone + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
{
    // Shared handle to the user-provided sink; the request builder closure
    // holds another clone of it.
    sink: Arc<T>,
    // Inner partitioning sink that receives the encoded events.
    #[pin]
    inner: TowerPartitionSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
        K,
    >,
    // Encoder obtained from `sink.build_encoder()`.
    encoder: T::Encoder,
    // Holds at most one encoded event between `start_send` and the moment
    // `poll_flush` hands it over to `inner`.
    slot: Option<EncodedEvent<B::Input>>,
}
279
280impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic<<B as Batch>::Output>>
281where
282 B: Batch,
283 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
284 B::Input: Partition<K>,
285 K: Hash + Eq + Clone + Send + 'static,
286 T: HttpSink<Input = B::Input, Output = B::Output>,
287{
288 pub fn new(
289 sink: T,
290 batch: B,
291 request_settings: TowerRequestSettings,
292 batch_timeout: Duration,
293 client: HttpClient,
294 ) -> Self {
295 Self::with_retry_logic(
296 sink,
297 batch,
298 HttpRetryLogic::default(),
299 request_settings,
300 batch_timeout,
301 client,
302 )
303 }
304}
305
306impl<T, B, K, RL> PartitionHttpSink<T, B, K, RL>
307where
308 B: Batch,
309 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
310 B::Input: Partition<K>,
311 K: Hash + Eq + Clone + Send + 'static,
312 T: HttpSink<Input = B::Input, Output = B::Output>,
313 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
314 + Send
315 + 'static,
316{
317 pub fn with_retry_logic(
318 sink: T,
319 batch: B,
320 retry_logic: RL,
321 request_settings: TowerRequestSettings,
322 batch_timeout: Duration,
323 client: HttpClient,
324 ) -> Self {
325 let sink = Arc::new(sink);
326
327 let sink1 = Arc::clone(&sink);
328 let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
329 let sink = Arc::clone(&sink1);
330 Box::pin(async move { sink.build_request(b).await })
331 };
332
333 let svc = HttpBatchService::new(client, request_builder);
334 let inner = request_settings.partition_sink(retry_logic, svc, batch, batch_timeout);
335 let encoder = sink.build_encoder();
336
337 Self {
338 sink,
339 inner,
340 encoder,
341 slot: None,
342 }
343 }
344
345 pub fn ordered(mut self) -> Self {
347 self.inner.ordered();
348 self
349 }
350}
351
352impl<T, B, K, RL> Sink<Event> for PartitionHttpSink<T, B, K, RL>
353where
354 B: Batch,
355 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
356 B::Input: Partition<K>,
357 K: Hash + Eq + Clone + Send + 'static,
358 T: HttpSink<Input = B::Input, Output = B::Output>,
359 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
360{
361 type Error = crate::Error;
362
363 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
364 if self.slot.is_some() {
365 match self.as_mut().poll_flush(cx) {
366 Poll::Ready(Ok(())) => {}
367 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
368 Poll::Pending => {
369 if self.slot.is_some() {
370 return Poll::Pending;
371 }
372 }
373 }
374 }
375
376 Poll::Ready(Ok(()))
377 }
378
379 fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
380 let finalizers = event.metadata_mut().take_finalizers();
381 let byte_size = event.size_of();
382 let json_byte_size = event.estimated_json_encoded_size_of();
383
384 if let Some(item) = self.encoder.encode_event(event) {
385 *self.project().slot = Some(EncodedEvent {
386 item,
387 finalizers,
388 byte_size,
389 json_byte_size,
390 });
391 }
392
393 Ok(())
394 }
395
396 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
397 let mut this = self.project();
398 if this.slot.is_some() {
399 ready!(this.inner.as_mut().poll_ready(cx))?;
400 this.inner.as_mut().start_send(this.slot.take().unwrap())?;
401 }
402
403 this.inner.poll_flush(cx)
404 }
405
406 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
407 ready!(self.as_mut().poll_flush(cx))?;
408 self.project().inner.poll_close(cx)
409 }
410}
411
/// Parameters passed to `sign_request` when an [`HttpBatchService`] is built
/// with `new_with_sig_v4`.
#[cfg(feature = "aws-core")]
#[derive(Clone)]
pub struct SigV4Config {
    /// Credentials provider handed to `sign_request`.
    pub(crate) shared_credentials_provider: SharedCredentialsProvider,
    /// Region handed to `sign_request`.
    pub(crate) region: Region,
    /// Service name handed to `sign_request`.
    pub(crate) service: String,
}
419
/// Tower service that builds an HTTP request from a batch `B` via
/// `request_builder` (which returns a future `F`) and sends it with the
/// wrapped [`HttpClient`].
pub struct HttpBatchService<F, B = Bytes> {
    // Client used to send the built requests.
    inner: HttpClient<Body>,
    // Shared closure that turns a batch into a request-producing future.
    request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
    // When set, built requests are signed before being sent.
    #[cfg(feature = "aws-core")]
    sig_v4_config: Option<SigV4Config>,
}
432
433impl<F, B> HttpBatchService<F, B> {
434 pub fn new(
435 inner: HttpClient,
436 request_builder: impl Fn(B) -> F + Send + Sync + 'static,
437 ) -> Self {
438 HttpBatchService {
439 inner,
440 request_builder: Arc::new(Box::new(request_builder)),
441 #[cfg(feature = "aws-core")]
442 sig_v4_config: None,
443 }
444 }
445
446 #[cfg(feature = "aws-core")]
447 pub fn new_with_sig_v4(
448 inner: HttpClient,
449 request_builder: impl Fn(B) -> F + Send + Sync + 'static,
450 sig_v4_config: SigV4Config,
451 ) -> Self {
452 HttpBatchService {
453 inner,
454 request_builder: Arc::new(Box::new(request_builder)),
455 sig_v4_config: Some(sig_v4_config),
456 }
457 }
458}
459
460impl<F, B> Service<B> for HttpBatchService<F, B>
461where
462 F: Future<Output = crate::Result<hyper::Request<Bytes>>> + Send + 'static,
463 B: ByteSizeOf + Send + 'static,
464{
465 type Response = http::Response<Bytes>;
466 type Error = crate::Error;
467 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
468
469 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
470 Poll::Ready(Ok(()))
471 }
472
473 fn call(&mut self, body: B) -> Self::Future {
474 let request_builder = Arc::clone(&self.request_builder);
475 #[cfg(feature = "aws-core")]
476 let sig_v4_config = self.sig_v4_config.clone();
477 let http_client = self.inner.clone();
478
479 Box::pin(async move {
480 let request = request_builder(body).await.inspect_err(|error| {
481 emit!(SinkRequestBuildError { error });
482 })?;
483
484 #[cfg(feature = "aws-core")]
485 let request = match sig_v4_config {
486 None => request,
487 Some(sig_v4_config) => {
488 let mut signed_request = request;
489 sign_request(
490 sig_v4_config.service.as_str(),
491 &mut signed_request,
492 &sig_v4_config.shared_credentials_provider,
493 Some(&sig_v4_config.region),
494 false,
495 )
496 .await?;
497
498 signed_request
499 }
500 };
501 let byte_size = request.body().len();
502 let request = request.map(Body::from);
503 let (protocol, endpoint) = uri::protocol_endpoint(request.uri().clone());
504
505 let mut decompression_service = ServiceBuilder::new()
506 .layer(DecompressionLayer::new())
507 .service(http_client);
508
509 let response = decompression_service.call(request).await?;
513
514 if response.status().is_success() {
515 emit!(EndpointBytesSent {
516 byte_size,
517 protocol: &protocol,
518 endpoint: &endpoint
519 });
520 }
521
522 let (parts, body) = response.into_parts();
523 let mut body = body.collect().await?.aggregate();
524 Ok(hyper::Response::from_parts(
525 parts,
526 body.copy_to_bytes(body.remaining()),
527 ))
528 })
529 }
530}
531
532impl<F, B> Clone for HttpBatchService<F, B> {
533 fn clone(&self) -> Self {
534 Self {
535 inner: self.inner.clone(),
536 request_builder: Arc::clone(&self.request_builder),
537 #[cfg(feature = "aws-core")]
538 sig_v4_config: self.sig_v4_config.clone(),
539 }
540 }
541}
542
543impl<T: fmt::Debug> sink::Response for http::Response<T> {
544 fn is_successful(&self) -> bool {
545 self.status().is_success()
546 }
547
548 fn is_transient(&self) -> bool {
549 self.status().is_server_error()
550 }
551}
552
/// Retry logic for HTTP responses with a `Bytes` body.
///
/// `Req` is only a type-level marker; the value itself stores no data.
#[derive(Debug, Clone)]
pub struct HttpRetryLogic<Req> {
    request: PhantomData<Req>,
}

impl<Req> Default for HttpRetryLogic<Req> {
    /// Available for every `Req`, including types that are not `Default`.
    fn default() -> Self {
        HttpRetryLogic {
            request: PhantomData::<Req>,
        }
    }
}
564
565impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> {
566 type Error = HttpError;
567 type Request = Req;
568 type Response = hyper::Response<Bytes>;
569
570 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
571 true
572 }
573
574 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
575 let status = response.status();
576
577 match status {
578 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
579 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
580 StatusCode::NOT_IMPLEMENTED => {
581 RetryAction::DontRetry("endpoint not implemented".into())
582 }
583 _ if status.is_server_error() => RetryAction::Retry(
584 format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(),
585 ),
586 _ if status.is_success() => RetryAction::Successful,
587 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
588 }
589 }
590}
591
/// Retry logic driven by a [`StatusCode`] that `func` extracts from a
/// response of type `Res`.
#[derive(Debug)]
pub struct HttpStatusRetryLogic<F, Req, Res> {
    // Extracts the status code from a response.
    func: F,
    // Type-level markers only; no request or response is stored.
    request: PhantomData<Req>,
    response: PhantomData<Res>,
}
600
601impl<F, Req, Res> HttpStatusRetryLogic<F, Req, Res>
602where
603 F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
604 Req: Send + Sync + 'static,
605 Res: Send + Sync + 'static,
606{
607 pub const fn new(func: F) -> HttpStatusRetryLogic<F, Req, Res> {
608 HttpStatusRetryLogic {
609 func,
610 request: PhantomData,
611 response: PhantomData,
612 }
613 }
614}
615
616impl<F, Req, Res> RetryLogic for HttpStatusRetryLogic<F, Req, Res>
617where
618 F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
619 Req: Send + Sync + 'static,
620 Res: Send + Sync + 'static,
621{
622 type Error = HttpError;
623 type Request = Req;
624 type Response = Res;
625
626 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
627 true
628 }
629
630 fn should_retry_response(&self, response: &Res) -> RetryAction<Req> {
631 let status = (self.func)(response);
632
633 match status {
634 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
635 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
636 StatusCode::NOT_IMPLEMENTED => {
637 RetryAction::DontRetry("endpoint not implemented".into())
638 }
639 _ if status.is_server_error() => {
640 RetryAction::Retry(format!("Http Status: {status}").into())
641 }
642 _ if status.is_success() => RetryAction::Successful,
643 _ => RetryAction::DontRetry(format!("Http status: {status}").into()),
644 }
645 }
646}
647
648impl<F, Req, Res> Clone for HttpStatusRetryLogic<F, Req, Res>
649where
650 F: Clone,
651{
652 fn clone(&self) -> Self {
653 Self {
654 func: self.func.clone(),
655 request: PhantomData,
656 response: PhantomData,
657 }
658 }
659}
660
#[configurable_component]
/// Outbound HTTP request settings.
#[derive(Clone, Debug, Default)]
pub struct RequestConfig {
    #[serde(flatten)]
    pub tower: TowerRequestConfig,

    #[serde(default)]
    /// HTTP request headers, keyed by header name.
    #[configurable(metadata(
        docs::additional_props_description = "An HTTP request header and its value. Both header names and values support templating with event data."
    ))]
    #[configurable(metadata(docs::examples = "headers_examples()"))]
    pub headers: BTreeMap<String, String>,
}
676
/// Example header map referenced by the `docs::examples` metadata of
/// `RequestConfig::headers`.
fn headers_examples() -> BTreeMap<String, String> {
    [
        ("Accept", "text/plain"),
        ("X-My-Custom-Header", "A-Value"),
        ("X-Event-Level", "{{level}}"),
        ("X-Event-Timestamp", "{{timestamp}}"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_owned(), value.to_owned()))
    .collect()
}
685
686impl RequestConfig {
687 pub fn split_headers(&self) -> (BTreeMap<String, String>, BTreeMap<String, Template>) {
688 let mut static_headers = BTreeMap::new();
689 let mut template_headers = BTreeMap::new();
690
691 for (name, value) in &self.headers {
692 match Template::try_from(value.as_str()) {
693 Ok(template) if !template.is_dynamic() => {
694 static_headers.insert(name.clone(), value.clone());
695 }
696 Ok(template) => {
697 template_headers.insert(name.clone(), template);
698 }
699 Err(_) => {
700 static_headers.insert(name.clone(), value.clone());
701 }
702 }
703 }
704
705 (static_headers, template_headers)
706 }
707}
708
/// Errors produced by [`validate_headers`].
#[derive(Debug, Snafu)]
pub enum HeaderValidationError {
    /// The configured header name is not a valid HTTP header name.
    #[snafu(display("{}: {}", source, name))]
    InvalidHeaderName {
        name: String,
        source: header::InvalidHeaderName,
    },
    /// The configured header value is not a valid HTTP header value.
    #[snafu(display("{}: {}", source, value))]
    InvalidHeaderValue {
        value: String,
        source: header::InvalidHeaderValue,
    },
}
722
723pub fn validate_headers(
724 headers: &BTreeMap<String, String>,
725) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
726 let mut validated_headers = BTreeMap::new();
727 for (name, value) in headers {
728 let name = HeaderName::from_bytes(name.as_bytes())
729 .with_context(|_| InvalidHeaderNameSnafu { name })?;
730 let value = HeaderValue::from_bytes(value.as_bytes())
731 .with_context(|_| InvalidHeaderValueSnafu { value })?;
732
733 validated_headers.insert(name.into(), value);
734 }
735
736 Ok(validated_headers)
737}
738
/// Request passed to [`HttpService`]: an encoded payload together with its
/// finalizers, request metadata and sink-specific `additional_metadata`.
#[derive(Debug, Clone)]
pub struct HttpRequest<T: Send> {
    // Encoded request body.
    payload: Bytes,
    // Finalizers of the events contained in the payload.
    finalizers: EventFinalizers,
    // Metadata exposed through `MetaDescriptive`.
    request_metadata: RequestMetadata,
    // Extra, sink-defined data carried alongside the payload.
    additional_metadata: T,
}
747
748impl<T: Send> HttpRequest<T> {
749 pub const fn new(
751 payload: Bytes,
752 finalizers: EventFinalizers,
753 request_metadata: RequestMetadata,
754 additional_metadata: T,
755 ) -> Self {
756 Self {
757 payload,
758 finalizers,
759 request_metadata,
760 additional_metadata,
761 }
762 }
763
764 pub const fn get_additional_metadata(&self) -> &T {
765 &self.additional_metadata
766 }
767
768 pub fn take_payload(&mut self) -> Bytes {
769 std::mem::take(&mut self.payload)
770 }
771}
772
773impl<T: Send> Finalizable for HttpRequest<T> {
774 fn take_finalizers(&mut self) -> EventFinalizers {
775 self.finalizers.take_finalizers()
776 }
777}
778
779impl<T: Send> MetaDescriptive for HttpRequest<T> {
780 fn get_metadata(&self) -> &RequestMetadata {
781 &self.request_metadata
782 }
783
784 fn metadata_mut(&mut self) -> &mut RequestMetadata {
785 &mut self.request_metadata
786 }
787}
788
789impl<T: Send> ByteSizeOf for HttpRequest<T> {
790 fn allocated_bytes(&self) -> usize {
791 self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
792 }
793}
794
/// Response produced by [`HttpService`]: the HTTP response plus the size
/// accounting taken from the request metadata.
pub struct HttpResponse {
    /// The HTTP response with its body buffered into `Bytes`.
    pub http_response: Response<Bytes>,
    /// Value returned by `DriverResponse::events_sent`.
    pub events_byte_size: GroupedCountByteSize,
    /// Value returned (wrapped in `Some`) by `DriverResponse::bytes_sent`.
    pub raw_byte_size: usize,
}
801
802impl DriverResponse for HttpResponse {
803 fn event_status(&self) -> EventStatus {
804 if self.http_response.is_successful() {
805 EventStatus::Delivered
806 } else if self.http_response.is_transient() {
807 EventStatus::Errored
808 } else {
809 EventStatus::Rejected
810 }
811 }
812
813 fn events_sent(&self) -> &GroupedCountByteSize {
814 &self.events_byte_size
815 }
816
817 fn bytes_sent(&self) -> Option<usize> {
818 Some(self.raw_byte_size)
819 }
820}
821
822pub fn http_response_retry_logic<Request: Clone + Send + Sync + 'static>() -> HttpStatusRetryLogic<
824 impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static,
825 Request,
826 HttpResponse,
827> {
828 HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status())
829}
830
/// Batch sizer that measures events by their estimated JSON-encoded size.
#[derive(Default)]
pub struct HttpJsonBatchSizer;
834
835impl ItemBatchSize<Event> for HttpJsonBatchSizer {
836 fn size(&self, item: &Event) -> usize {
837 item.estimated_json_encoded_size_of().get()
838 }
839}
840
/// Builds the concrete HTTP request for an [`HttpRequest`].
pub trait HttpServiceRequestBuilder<T: Send> {
    /// Converts `request` into an `http::Request`, or returns an error when
    /// the request cannot be built.
    fn build(&self, request: HttpRequest<T>) -> Result<Request<Bytes>, crate::Error>;
}
845
846#[derive(Clone)]
848pub struct HttpService<B, T: Send> {
849 batch_service:
850 HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HttpRequest<T>>,
851 _phantom: PhantomData<B>,
852}
853
854impl<B, T: Send + 'static> HttpService<B, T>
855where
856 B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
857{
858 pub fn new(http_client: HttpClient<Body>, http_request_builder: B) -> Self {
859 let http_request_builder = Arc::new(http_request_builder);
860
861 let batch_service = HttpBatchService::new(http_client, move |req: HttpRequest<T>| {
862 let request_builder = Arc::clone(&http_request_builder);
863
864 let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
865 Box::pin(async move { request_builder.build(req) });
866
867 fut
868 });
869 Self {
870 batch_service,
871 _phantom: PhantomData,
872 }
873 }
874
875 #[cfg(feature = "aws-core")]
876 pub fn new_with_sig_v4(
877 http_client: HttpClient<Body>,
878 http_request_builder: B,
879 sig_v4_config: SigV4Config,
880 ) -> Self {
881 let http_request_builder = Arc::new(http_request_builder);
882
883 let batch_service = HttpBatchService::new_with_sig_v4(
884 http_client,
885 move |req: HttpRequest<T>| {
886 let request_builder = Arc::clone(&http_request_builder);
887
888 let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
889 Box::pin(async move { request_builder.build(req) });
890
891 fut
892 },
893 sig_v4_config,
894 );
895 Self {
896 batch_service,
897 _phantom: PhantomData,
898 }
899 }
900}
901
902impl<B, T: Send + 'static> Service<HttpRequest<T>> for HttpService<B, T>
903where
904 B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
905{
906 type Response = HttpResponse;
907 type Error = crate::Error;
908 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
909
910 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
911 Poll::Ready(Ok(()))
912 }
913
914 fn call(&mut self, mut request: HttpRequest<T>) -> Self::Future {
915 let mut http_service = self.batch_service.clone();
916
917 let metadata = std::mem::take(request.metadata_mut());
920 let raw_byte_size = metadata.request_encoded_size();
921 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
922
923 Box::pin(async move {
924 let http_response = http_service.call(request).await?;
925
926 Ok(HttpResponse {
927 http_response,
928 events_byte_size,
929 raw_byte_size,
930 })
931 })
932 }
933}
934
#[cfg(test)]
mod test {
    // `eprintln!` is used below to report errors of the test HTTP server.
    #![allow(clippy::print_stderr)]
    use futures::{StreamExt, future::ready};
    use hyper::{
        Response, Server, Uri,
        service::{make_service_fn, service_fn},
    };

    use super::*;
    use crate::{config::ProxyConfig, test_util::addr::next_addr};

    /// Checks which status codes `HttpRetryLogic` retries: 429, 500 and 408
    /// are retryable, 400 and 501 are not.
    #[test]
    fn util_http_retry_logic() {
        let logic = HttpRetryLogic::<()>::default();

        let response_408 = Response::builder().status(408).body(Bytes::new()).unwrap();
        let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap();
        let response_500 = Response::builder().status(500).body(Bytes::new()).unwrap();
        let response_400 = Response::builder().status(400).body(Bytes::new()).unwrap();
        let response_501 = Response::builder().status(501).body(Bytes::new()).unwrap();
        assert!(logic.should_retry_response(&response_429).is_retryable());
        assert!(logic.should_retry_response(&response_500).is_retryable());
        assert!(logic.should_retry_response(&response_408).is_retryable());
        assert!(
            logic
                .should_retry_response(&response_400)
                .is_not_retryable()
        );
        assert!(
            logic
                .should_retry_response(&response_501)
                .is_not_retryable()
        );
    }

    /// Sends a request through `HttpBatchService` to a local hyper server and
    /// checks that the server received the expected body.
    #[tokio::test]
    async fn util_http_it_makes_http_requests() {
        let (_guard, addr) = next_addr();

        let uri = format!("http://{}:{}/", addr.ip(), addr.port())
            .parse::<Uri>()
            .unwrap();

        let request = Bytes::from("hello");
        let proxy = ProxyConfig::default();
        let client = HttpClient::new(None, &proxy).unwrap();
        let mut service = HttpBatchService::new(client, move |body: Bytes| {
            Box::pin(ready(
                http::Request::post(&uri).body(body).map_err(Into::into),
            ))
        });

        // Channel through which the server reports each received body.
        let (tx, rx) = futures::channel::mpsc::channel(10);

        let new_service = make_service_fn(move |_| {
            let tx = tx.clone();

            let svc = service_fn(move |req: http::Request<Body>| {
                let mut tx = tx.clone();

                async move {
                    let mut body = http_body::Body::collect(req.into_body())
                        .await
                        .map_err(|error| format!("error: {error}"))?
                        .aggregate();
                    let string = String::from_utf8(body.copy_to_bytes(body.remaining()).to_vec())
                        .map_err(|_| "Wasn't UTF-8".to_string())?;
                    tx.try_send(string).map_err(|_| "Send error".to_string())?;

                    Ok::<_, crate::Error>(Response::new(Body::from("")))
                }
            });

            async move { Ok::<_, std::convert::Infallible>(svc) }
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&addr).serve(new_service).await {
                eprintln!("Server error: {error}");
            }
        });

        // Give the spawned server a moment to start before sending the request.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        service.call(request).await.unwrap();

        let (body, _rest) = StreamExt::into_future(rx).await;
        assert_eq!(body.unwrap(), "hello");
    }
}