1#[cfg(feature = "aws-core")]
2use aws_credential_types::provider::SharedCredentialsProvider;
3#[cfg(feature = "aws-core")]
4use aws_types::region::Region;
5use bytes::{Buf, Bytes};
6use futures::{Sink, future::BoxFuture};
7use headers::HeaderName;
8use http::{HeaderValue, Request, Response, StatusCode, header};
9use http_body::Body as _;
10
11#[derive(Debug, Clone, PartialEq, Eq, Hash)]
12pub struct OrderedHeaderName(HeaderName);
13
14impl OrderedHeaderName {
15 pub const fn new(header_name: HeaderName) -> Self {
16 Self(header_name)
17 }
18
19 pub const fn inner(&self) -> &HeaderName {
20 &self.0
21 }
22}
23
24impl From<HeaderName> for OrderedHeaderName {
25 fn from(header_name: HeaderName) -> Self {
26 Self(header_name)
27 }
28}
29
30impl Ord for OrderedHeaderName {
31 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
32 self.0.as_str().cmp(other.0.as_str())
33 }
34}
35
36impl PartialOrd for OrderedHeaderName {
37 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
38 Some(self.cmp(other))
39 }
40}
41use std::{
42 collections::BTreeMap,
43 fmt,
44 future::Future,
45 hash::Hash,
46 marker::PhantomData,
47 pin::Pin,
48 sync::Arc,
49 task::{Context, Poll, ready},
50 time::Duration,
51};
52
53use hyper::Body;
54use pin_project::pin_project;
55use snafu::{ResultExt, Snafu};
56use tower::{Service, ServiceBuilder};
57use tower_http::decompression::DecompressionLayer;
58use vector_lib::{
59 ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
60 stream::batcher::limiter::ItemBatchSize,
61};
62
63use super::{
64 Batch, EncodedEvent, Partition, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
65 TowerRequestSettings,
66 retries::{RetryAction, RetryLogic},
67 sink::{self, Response as _},
68 uri,
69};
70#[cfg(feature = "aws-core")]
71use crate::aws::sign_request;
72use crate::{
73 event::Event,
74 http::{HttpClient, HttpError},
75 internal_events::{EndpointBytesSent, SinkRequestBuildError},
76 sinks::prelude::*,
77 template::Template,
78};
79
/// Encodes a single [`Event`] into an item of type `Output`.
pub trait HttpEventEncoder<Output> {
    /// Encodes `event`, returning `None` when no item is produced for it.
    fn encode_event(&mut self, event: Event) -> Option<Output>;
}
84
/// Describes an HTTP sink: how events are encoded and how a batch is turned into a request.
pub trait HttpSink: Send + Sync + 'static {
    /// Item type produced by the encoder for each event.
    type Input;
    /// Type handed to [`HttpSink::build_request`].
    type Output;
    /// Encoder that turns an [`Event`] into `Self::Input`.
    type Encoder: HttpEventEncoder<Self::Input>;

    /// Creates the encoder used to encode incoming events.
    fn build_encoder(&self) -> Self::Encoder;
    /// Builds the HTTP request carrying `events`.
    fn build_request(
        &self,
        events: Self::Output,
    ) -> impl Future<Output = crate::Result<http::Request<Bytes>>> + Send;
}
96
/// HTTP sink that encodes each incoming [`Event`] and forwards the result to a
/// [`TowerBatchedSink`].
///
/// `T` supplies the encoder and request construction, `B` is the batch type and `RL` is the
/// retry logic (defaulting to [`HttpRetryLogic`]).
#[pin_project]
pub struct BatchedHttpSink<T, B, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
        + Send
        + 'static,
{
    /// Shared handle to the sink definition.
    sink: Arc<T>,
    /// Inner batching sink that receives the encoded events.
    #[pin]
    inner: TowerBatchedSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
    >,
    /// Encoder obtained from `sink.build_encoder()`.
    encoder: T::Encoder,
    /// Most recently encoded event, held until `poll_flush` hands it to `inner`.
    slot: Option<EncodedEvent<B::Input>>,
}
135
136impl<T, B> BatchedHttpSink<T, B>
137where
138 B: Batch,
139 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
140 T: HttpSink<Input = B::Input, Output = B::Output>,
141{
142 pub fn new(
143 sink: T,
144 batch: B,
145 request_settings: TowerRequestSettings,
146 batch_timeout: Duration,
147 client: HttpClient,
148 ) -> Self {
149 Self::with_logic(
150 sink,
151 batch,
152 HttpRetryLogic::default(),
153 request_settings,
154 batch_timeout,
155 client,
156 )
157 }
158}
159
160impl<T, B, RL> BatchedHttpSink<T, B, RL>
161where
162 B: Batch,
163 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
164 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
165 + Send
166 + 'static,
167 T: HttpSink<Input = B::Input, Output = B::Output>,
168{
169 pub fn with_logic(
170 sink: T,
171 batch: B,
172 retry_logic: RL,
173 request_settings: TowerRequestSettings,
174 batch_timeout: Duration,
175 client: HttpClient,
176 ) -> Self {
177 let sink = Arc::new(sink);
178
179 let sink1 = Arc::clone(&sink);
180 let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
181 let sink = Arc::clone(&sink1);
182 Box::pin(async move { sink.build_request(b).await })
183 };
184
185 let svc = HttpBatchService::new(client, request_builder);
186 let inner = request_settings.batch_sink(retry_logic, svc, batch, batch_timeout);
187 let encoder = sink.build_encoder();
188
189 Self {
190 sink,
191 inner,
192 encoder,
193 slot: None,
194 }
195 }
196}
197
198impl<T, B, RL> Sink<Event> for BatchedHttpSink<T, B, RL>
199where
200 B: Batch,
201 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
202 T: HttpSink<Input = B::Input, Output = B::Output>,
203 RL: RetryLogic<Request = <B as Batch>::Output, Response = http::Response<Bytes>>
204 + Send
205 + 'static,
206{
207 type Error = crate::Error;
208
209 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
210 if self.slot.is_some() {
211 match self.as_mut().poll_flush(cx) {
212 Poll::Ready(Ok(())) => {}
213 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
214 Poll::Pending => {
215 if self.slot.is_some() {
216 return Poll::Pending;
217 }
218 }
219 }
220 }
221
222 Poll::Ready(Ok(()))
223 }
224
225 fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
226 let byte_size = event.size_of();
227 let json_byte_size = event.estimated_json_encoded_size_of();
228 let finalizers = event.metadata_mut().take_finalizers();
229 if let Some(item) = self.encoder.encode_event(event) {
230 *self.project().slot = Some(EncodedEvent {
231 item,
232 finalizers,
233 byte_size,
234 json_byte_size,
235 });
236 }
237
238 Ok(())
239 }
240
241 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
242 let mut this = self.project();
243 if this.slot.is_some() {
244 ready!(this.inner.as_mut().poll_ready(cx))?;
245 this.inner.as_mut().start_send(this.slot.take().unwrap())?;
246 }
247
248 this.inner.poll_flush(cx)
249 }
250
251 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
252 ready!(self.as_mut().poll_flush(cx))?;
253 self.project().inner.poll_close(cx)
254 }
255}
256
/// HTTP sink that encodes each incoming [`Event`] and forwards the result to a
/// [`TowerPartitionSink`].
///
/// `T` supplies the encoder and request construction, `B` is the batch type, `K` is the key
/// type of the [`Partition`] implemented by the batch input, and `RL` is the retry logic
/// (defaulting to [`HttpRetryLogic`]).
#[pin_project]
pub struct PartitionHttpSink<T, B, K, RL = HttpRetryLogic<<B as Batch>::Output>>
where
    B: Batch,
    B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
    B::Input: Partition<K>,
    K: Hash + Eq + Clone + Send + 'static,
    T: HttpSink<Input = B::Input, Output = B::Output>,
    RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
{
    /// Shared handle to the sink definition.
    sink: Arc<T>,
    /// Inner partitioning sink that receives the encoded events.
    #[pin]
    inner: TowerPartitionSink<
        HttpBatchService<BoxFuture<'static, crate::Result<hyper::Request<Bytes>>>, B::Output>,
        B,
        RL,
        K,
    >,
    /// Encoder obtained from `sink.build_encoder()`.
    encoder: T::Encoder,
    /// Most recently encoded event, held until `poll_flush` hands it to `inner`.
    slot: Option<EncodedEvent<B::Input>>,
}
279
280impl<T, B, K> PartitionHttpSink<T, B, K, HttpRetryLogic<<B as Batch>::Output>>
281where
282 B: Batch,
283 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
284 B::Input: Partition<K>,
285 K: Hash + Eq + Clone + Send + 'static,
286 T: HttpSink<Input = B::Input, Output = B::Output>,
287{
288 pub fn new(
289 sink: T,
290 batch: B,
291 request_settings: TowerRequestSettings,
292 batch_timeout: Duration,
293 client: HttpClient,
294 ) -> Self {
295 Self::with_retry_logic(
296 sink,
297 batch,
298 HttpRetryLogic::default(),
299 request_settings,
300 batch_timeout,
301 client,
302 )
303 }
304}
305
306impl<T, B, K, RL> PartitionHttpSink<T, B, K, RL>
307where
308 B: Batch,
309 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
310 B::Input: Partition<K>,
311 K: Hash + Eq + Clone + Send + 'static,
312 T: HttpSink<Input = B::Input, Output = B::Output>,
313 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>, Error = HttpError>
314 + Send
315 + 'static,
316{
317 pub fn with_retry_logic(
318 sink: T,
319 batch: B,
320 retry_logic: RL,
321 request_settings: TowerRequestSettings,
322 batch_timeout: Duration,
323 client: HttpClient,
324 ) -> Self {
325 let sink = Arc::new(sink);
326
327 let sink1 = Arc::clone(&sink);
328 let request_builder = move |b| -> BoxFuture<'static, crate::Result<http::Request<Bytes>>> {
329 let sink = Arc::clone(&sink1);
330 Box::pin(async move { sink.build_request(b).await })
331 };
332
333 let svc = HttpBatchService::new(client, request_builder);
334 let inner = request_settings.partition_sink(retry_logic, svc, batch, batch_timeout);
335 let encoder = sink.build_encoder();
336
337 Self {
338 sink,
339 inner,
340 encoder,
341 slot: None,
342 }
343 }
344
345 pub fn ordered(mut self) -> Self {
347 self.inner.ordered();
348 self
349 }
350}
351
352impl<T, B, K, RL> Sink<Event> for PartitionHttpSink<T, B, K, RL>
353where
354 B: Batch,
355 B::Output: ByteSizeOf + Clone + Sync + Send + 'static,
356 B::Input: Partition<K>,
357 K: Hash + Eq + Clone + Send + 'static,
358 T: HttpSink<Input = B::Input, Output = B::Output>,
359 RL: RetryLogic<Request = B::Output, Response = http::Response<Bytes>> + Send + 'static,
360{
361 type Error = crate::Error;
362
363 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
364 if self.slot.is_some() {
365 match self.as_mut().poll_flush(cx) {
366 Poll::Ready(Ok(())) => {}
367 Poll::Ready(Err(error)) => return Poll::Ready(Err(error)),
368 Poll::Pending => {
369 if self.slot.is_some() {
370 return Poll::Pending;
371 }
372 }
373 }
374 }
375
376 Poll::Ready(Ok(()))
377 }
378
379 fn start_send(mut self: Pin<&mut Self>, mut event: Event) -> Result<(), Self::Error> {
380 let finalizers = event.metadata_mut().take_finalizers();
381 let byte_size = event.size_of();
382 let json_byte_size = event.estimated_json_encoded_size_of();
383
384 if let Some(item) = self.encoder.encode_event(event) {
385 *self.project().slot = Some(EncodedEvent {
386 item,
387 finalizers,
388 byte_size,
389 json_byte_size,
390 });
391 }
392
393 Ok(())
394 }
395
396 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
397 let mut this = self.project();
398 if this.slot.is_some() {
399 ready!(this.inner.as_mut().poll_ready(cx))?;
400 this.inner.as_mut().start_send(this.slot.take().unwrap())?;
401 }
402
403 this.inner.poll_flush(cx)
404 }
405
406 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
407 ready!(self.as_mut().poll_flush(cx))?;
408 self.project().inner.poll_close(cx)
409 }
410}
411
/// Settings used to sign outgoing requests; the fields are passed to `sign_request` by
/// [`HttpBatchService`].
#[cfg(feature = "aws-core")]
#[derive(Clone)]
pub struct SigV4Config {
    /// Credentials provider handed to `sign_request`.
    pub(crate) shared_credentials_provider: SharedCredentialsProvider,
    /// Region handed to `sign_request`.
    pub(crate) region: Region,
    /// Service name handed to `sign_request`.
    pub(crate) service: String,
}
419
/// [`Service`] that turns a batch `B` into an HTTP request via `request_builder` and sends it
/// with the wrapped [`HttpClient`].
pub struct HttpBatchService<F, B = Bytes> {
    /// Client used to send the built requests.
    inner: HttpClient<Body>,
    /// Produces the future (`F`) that builds the request for a batch.
    request_builder: Arc<dyn Fn(B) -> F + Send + Sync>,
    /// When set, requests are signed with these settings before being sent.
    #[cfg(feature = "aws-core")]
    sig_v4_config: Option<SigV4Config>,
}
432
433impl<F, B> HttpBatchService<F, B> {
434 pub fn new(
435 inner: HttpClient,
436 request_builder: impl Fn(B) -> F + Send + Sync + 'static,
437 ) -> Self {
438 HttpBatchService {
439 inner,
440 request_builder: Arc::new(Box::new(request_builder)),
441 #[cfg(feature = "aws-core")]
442 sig_v4_config: None,
443 }
444 }
445
446 #[cfg(feature = "aws-core")]
447 pub fn new_with_sig_v4(
448 inner: HttpClient,
449 request_builder: impl Fn(B) -> F + Send + Sync + 'static,
450 sig_v4_config: SigV4Config,
451 ) -> Self {
452 HttpBatchService {
453 inner,
454 request_builder: Arc::new(Box::new(request_builder)),
455 sig_v4_config: Some(sig_v4_config),
456 }
457 }
458}
459
460impl<F, B> Service<B> for HttpBatchService<F, B>
461where
462 F: Future<Output = crate::Result<hyper::Request<Bytes>>> + Send + 'static,
463 B: ByteSizeOf + Send + 'static,
464{
465 type Response = http::Response<Bytes>;
466 type Error = crate::Error;
467 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
468
469 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
470 Poll::Ready(Ok(()))
471 }
472
473 fn call(&mut self, body: B) -> Self::Future {
474 let request_builder = Arc::clone(&self.request_builder);
475 #[cfg(feature = "aws-core")]
476 let sig_v4_config = self.sig_v4_config.clone();
477 let http_client = self.inner.clone();
478
479 Box::pin(async move {
480 let request = request_builder(body).await.inspect_err(|error| {
481 emit!(SinkRequestBuildError { error });
482 })?;
483
484 #[cfg(feature = "aws-core")]
485 let request = match sig_v4_config {
486 None => request,
487 Some(sig_v4_config) => {
488 let mut signed_request = request;
489 sign_request(
490 sig_v4_config.service.as_str(),
491 &mut signed_request,
492 &sig_v4_config.shared_credentials_provider,
493 Some(&sig_v4_config.region),
494 false,
495 )
496 .await?;
497
498 signed_request
499 }
500 };
501 let byte_size = request.body().len();
502 let request = request.map(Body::from);
503 let (protocol, endpoint) = uri::protocol_endpoint(request.uri().clone());
504
505 let mut decompression_service = ServiceBuilder::new()
506 .layer(DecompressionLayer::new())
507 .service(http_client);
508
509 let response = decompression_service.call(request).await?;
513
514 if response.status().is_success() {
515 emit!(EndpointBytesSent {
516 byte_size,
517 protocol: &protocol,
518 endpoint: &endpoint
519 });
520 }
521
522 let (parts, body) = response.into_parts();
523 let mut body = body.collect().await?.aggregate();
524 Ok(hyper::Response::from_parts(
525 parts,
526 body.copy_to_bytes(body.remaining()),
527 ))
528 })
529 }
530}
531
532impl<F, B> Clone for HttpBatchService<F, B> {
533 fn clone(&self) -> Self {
534 Self {
535 inner: self.inner.clone(),
536 request_builder: Arc::clone(&self.request_builder),
537 #[cfg(feature = "aws-core")]
538 sig_v4_config: self.sig_v4_config.clone(),
539 }
540 }
541}
542
543impl<T: fmt::Debug> sink::Response for http::Response<T> {
544 fn is_successful(&self) -> bool {
545 self.status().is_success()
546 }
547
548 fn is_transient(&self) -> bool {
549 self.status().is_server_error()
550 }
551}
552
/// Retry logic for `hyper::Response<Bytes>` responses.
///
/// `Req` only records the request type; no request value is stored.
#[derive(Debug, Clone)]
pub struct HttpRetryLogic<Req> {
    request: PhantomData<Req>,
}

// Implemented by hand so that `Req` does not need to implement `Default`.
impl<Req> Default for HttpRetryLogic<Req> {
    fn default() -> Self {
        HttpRetryLogic {
            request: PhantomData,
        }
    }
}
564
565impl<Req: Clone + Send + Sync + 'static> RetryLogic for HttpRetryLogic<Req> {
566 type Error = HttpError;
567 type Request = Req;
568 type Response = hyper::Response<Bytes>;
569
570 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
571 true
572 }
573
574 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
575 let status = response.status();
576
577 match status {
578 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
579 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
580 StatusCode::NOT_IMPLEMENTED => {
581 RetryAction::DontRetry("endpoint not implemented".into())
582 }
583 _ if status.is_server_error() => RetryAction::Retry(
584 format!("{}: {}", status, String::from_utf8_lossy(response.body())).into(),
585 ),
586 _ if status.is_success() => RetryAction::Successful,
587 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
588 }
589 }
590}
591
/// Retry logic that decides based on a [`StatusCode`] extracted from the response by `func`.
#[derive(Debug)]
pub struct HttpStatusRetryLogic<F, Req, Res> {
    /// Extracts the status code from a response.
    func: F,
    /// Marker for the request type; no request value is stored.
    request: PhantomData<Req>,
    /// Marker for the response type; no response value is stored.
    response: PhantomData<Res>,
}
600
601impl<F, Req, Res> HttpStatusRetryLogic<F, Req, Res>
602where
603 F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
604 Req: Send + Sync + 'static,
605 Res: Send + Sync + 'static,
606{
607 pub const fn new(func: F) -> HttpStatusRetryLogic<F, Req, Res> {
608 HttpStatusRetryLogic {
609 func,
610 request: PhantomData,
611 response: PhantomData,
612 }
613 }
614}
615
616impl<F, Req, Res> RetryLogic for HttpStatusRetryLogic<F, Req, Res>
617where
618 F: Fn(&Res) -> StatusCode + Clone + Send + Sync + 'static,
619 Req: Send + Sync + 'static,
620 Res: Send + Sync + 'static,
621{
622 type Error = HttpError;
623 type Request = Req;
624 type Response = Res;
625
626 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
627 true
628 }
629
630 fn should_retry_response(&self, response: &Res) -> RetryAction<Req> {
631 let status = (self.func)(response);
632
633 match status {
634 StatusCode::TOO_MANY_REQUESTS => RetryAction::Retry("too many requests".into()),
635 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
636 StatusCode::NOT_IMPLEMENTED => {
637 RetryAction::DontRetry("endpoint not implemented".into())
638 }
639 _ if status.is_server_error() => {
640 RetryAction::Retry(format!("Http Status: {status}").into())
641 }
642 _ if status.is_success() => RetryAction::Successful,
643 _ => RetryAction::DontRetry(format!("Http status: {status}").into()),
644 }
645 }
646}
647
648impl<F, Req, Res> Clone for HttpStatusRetryLogic<F, Req, Res>
649where
650 F: Clone,
651{
652 fn clone(&self) -> Self {
653 Self {
654 func: self.func.clone(),
655 request: PhantomData,
656 response: PhantomData,
657 }
658 }
659}
660
/// Outbound HTTP request settings.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct RequestConfig {
    #[serde(flatten)]
    pub tower: TowerRequestConfig,

    /// Additional HTTP request headers, keyed by header name.
    #[serde(default)]
    #[configurable(metadata(
        docs::additional_props_description = "An HTTP request header and its value. Both header names and values support templating with event data."
    ))]
    #[configurable(metadata(docs::examples = "headers_examples()"))]
    pub headers: BTreeMap<String, String>,
}
676
/// Example header map referenced by the `docs::examples` metadata of `RequestConfig::headers`.
fn headers_examples() -> BTreeMap<String, String> {
    [
        ("Accept", "text/plain"),
        ("X-My-Custom-Header", "A-Value"),
        ("X-Event-Level", "{{level}}"),
        ("X-Event-Timestamp", "{{timestamp}}"),
    ]
    .into_iter()
    .map(|(name, value)| (name.to_owned(), value.to_owned()))
    .collect()
}
685
686impl RequestConfig {
687 pub fn add_old_option(&mut self, headers: Option<BTreeMap<String, String>>) {
688 if let Some(headers) = headers {
689 warn!("Option `headers` has been deprecated. Use `request.headers` instead.");
690 self.headers.extend(headers);
691 }
692 }
693
694 pub fn split_headers(&self) -> (BTreeMap<String, String>, BTreeMap<String, Template>) {
695 let mut static_headers = BTreeMap::new();
696 let mut template_headers = BTreeMap::new();
697
698 for (name, value) in &self.headers {
699 match Template::try_from(value.as_str()) {
700 Ok(template) if !template.is_dynamic() => {
701 static_headers.insert(name.clone(), value.clone());
702 }
703 Ok(template) => {
704 template_headers.insert(name.clone(), template);
705 }
706 Err(_) => {
707 static_headers.insert(name.clone(), value.clone());
708 }
709 }
710 }
711
712 (static_headers, template_headers)
713 }
714}
715
/// Errors produced by [`validate_headers`].
#[derive(Debug, Snafu)]
pub enum HeaderValidationError {
    /// The configured name is not a valid HTTP header name.
    #[snafu(display("{}: {}", source, name))]
    InvalidHeaderName {
        /// The offending header name as configured.
        name: String,
        /// The underlying parse error.
        source: header::InvalidHeaderName,
    },
    /// The configured value is not a valid HTTP header value.
    #[snafu(display("{}: {}", source, value))]
    InvalidHeaderValue {
        /// The offending header value as configured.
        value: String,
        /// The underlying parse error.
        source: header::InvalidHeaderValue,
    },
}
729
730pub fn validate_headers(
731 headers: &BTreeMap<String, String>,
732) -> crate::Result<BTreeMap<OrderedHeaderName, HeaderValue>> {
733 let mut validated_headers = BTreeMap::new();
734 for (name, value) in headers {
735 let name = HeaderName::from_bytes(name.as_bytes())
736 .with_context(|_| InvalidHeaderNameSnafu { name })?;
737 let value = HeaderValue::from_bytes(value.as_bytes())
738 .with_context(|_| InvalidHeaderValueSnafu { value })?;
739
740 validated_headers.insert(name.into(), value);
741 }
742
743 Ok(validated_headers)
744}
745
/// Request type consumed by [`HttpService`]: an encoded payload together with its finalizers
/// and metadata.
#[derive(Debug, Clone)]
pub struct HttpRequest<T: Send> {
    /// Encoded request body.
    payload: Bytes,
    /// Finalizers of the events carried by this request.
    finalizers: EventFinalizers,
    /// Metadata exposed through the `MetaDescriptive` implementation.
    request_metadata: RequestMetadata,
    /// Caller-defined extra data attached to the request.
    additional_metadata: T,
}
754
755impl<T: Send> HttpRequest<T> {
756 pub const fn new(
758 payload: Bytes,
759 finalizers: EventFinalizers,
760 request_metadata: RequestMetadata,
761 additional_metadata: T,
762 ) -> Self {
763 Self {
764 payload,
765 finalizers,
766 request_metadata,
767 additional_metadata,
768 }
769 }
770
771 pub const fn get_additional_metadata(&self) -> &T {
772 &self.additional_metadata
773 }
774
775 pub fn take_payload(&mut self) -> Bytes {
776 std::mem::take(&mut self.payload)
777 }
778}
779
impl<T: Send> Finalizable for HttpRequest<T> {
    /// Takes the finalizers out of this request by delegating to the stored
    /// `EventFinalizers`.
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.finalizers.take_finalizers()
    }
}
785
impl<T: Send> MetaDescriptive for HttpRequest<T> {
    /// Borrows the request metadata.
    fn get_metadata(&self) -> &RequestMetadata {
        &self.request_metadata
    }

    /// Mutably borrows the request metadata.
    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.request_metadata
    }
}
795
796impl<T: Send> ByteSizeOf for HttpRequest<T> {
797 fn allocated_bytes(&self) -> usize {
798 self.payload.allocated_bytes() + self.finalizers.allocated_bytes()
799 }
800}
801
/// Response type returned by [`HttpService`].
pub struct HttpResponse {
    /// The HTTP response with its body collected into [`Bytes`].
    pub http_response: Response<Bytes>,
    /// Value reported by `DriverResponse::events_sent`.
    pub events_byte_size: GroupedCountByteSize,
    /// Value reported by `DriverResponse::bytes_sent`.
    pub raw_byte_size: usize,
}
808
809impl DriverResponse for HttpResponse {
810 fn event_status(&self) -> EventStatus {
811 if self.http_response.is_successful() {
812 EventStatus::Delivered
813 } else if self.http_response.is_transient() {
814 EventStatus::Errored
815 } else {
816 EventStatus::Rejected
817 }
818 }
819
820 fn events_sent(&self) -> &GroupedCountByteSize {
821 &self.events_byte_size
822 }
823
824 fn bytes_sent(&self) -> Option<usize> {
825 Some(self.raw_byte_size)
826 }
827}
828
829pub fn http_response_retry_logic<Request: Clone + Send + Sync + 'static>() -> HttpStatusRetryLogic<
831 impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static,
832 Request,
833 HttpResponse,
834> {
835 HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status())
836}
837
838#[derive(Default)]
840pub struct HttpJsonBatchSizer;
841
842impl ItemBatchSize<Event> for HttpJsonBatchSizer {
843 fn size(&self, item: &Event) -> usize {
844 item.estimated_json_encoded_size_of().get()
845 }
846}
847
/// Builds the HTTP request that [`HttpService`] sends for an [`HttpRequest`].
pub trait HttpServiceRequestBuilder<T: Send> {
    /// Converts `request` into an HTTP request, or returns an error if it cannot be built.
    fn build(&self, request: HttpRequest<T>) -> Result<Request<Bytes>, crate::Error>;
}
852
853#[derive(Clone)]
855pub struct HttpService<B, T: Send> {
856 batch_service:
857 HttpBatchService<BoxFuture<'static, Result<Request<Bytes>, crate::Error>>, HttpRequest<T>>,
858 _phantom: PhantomData<B>,
859}
860
861impl<B, T: Send + 'static> HttpService<B, T>
862where
863 B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
864{
865 pub fn new(http_client: HttpClient<Body>, http_request_builder: B) -> Self {
866 let http_request_builder = Arc::new(http_request_builder);
867
868 let batch_service = HttpBatchService::new(http_client, move |req: HttpRequest<T>| {
869 let request_builder = Arc::clone(&http_request_builder);
870
871 let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
872 Box::pin(async move { request_builder.build(req) });
873
874 fut
875 });
876 Self {
877 batch_service,
878 _phantom: PhantomData,
879 }
880 }
881
882 #[cfg(feature = "aws-core")]
883 pub fn new_with_sig_v4(
884 http_client: HttpClient<Body>,
885 http_request_builder: B,
886 sig_v4_config: SigV4Config,
887 ) -> Self {
888 let http_request_builder = Arc::new(http_request_builder);
889
890 let batch_service = HttpBatchService::new_with_sig_v4(
891 http_client,
892 move |req: HttpRequest<T>| {
893 let request_builder = Arc::clone(&http_request_builder);
894
895 let fut: BoxFuture<'static, Result<http::Request<Bytes>, crate::Error>> =
896 Box::pin(async move { request_builder.build(req) });
897
898 fut
899 },
900 sig_v4_config,
901 );
902 Self {
903 batch_service,
904 _phantom: PhantomData,
905 }
906 }
907}
908
909impl<B, T: Send + 'static> Service<HttpRequest<T>> for HttpService<B, T>
910where
911 B: HttpServiceRequestBuilder<T> + std::marker::Sync + std::marker::Send + 'static,
912{
913 type Response = HttpResponse;
914 type Error = crate::Error;
915 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
916
917 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
918 Poll::Ready(Ok(()))
919 }
920
921 fn call(&mut self, mut request: HttpRequest<T>) -> Self::Future {
922 let mut http_service = self.batch_service.clone();
923
924 let metadata = std::mem::take(request.metadata_mut());
927 let raw_byte_size = metadata.request_encoded_size();
928 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
929
930 Box::pin(async move {
931 let http_response = http_service.call(request).await?;
932
933 Ok(HttpResponse {
934 http_response,
935 events_byte_size,
936 raw_byte_size,
937 })
938 })
939 }
940}
941
#[cfg(test)]
mod test {
    // The test server below reports failures with `eprintln!`.
    #![allow(clippy::print_stderr)]
    use futures::{StreamExt, future::ready};
    use hyper::{
        Response, Server, Uri,
        service::{make_service_fn, service_fn},
    };

    use super::*;
    use crate::{config::ProxyConfig, test_util::addr::next_addr};

    /// Checks which status codes the default retry logic retries.
    #[test]
    fn util_http_retry_logic() {
        let logic = HttpRetryLogic::<()>::default();

        let response_408 = Response::builder().status(408).body(Bytes::new()).unwrap();
        let response_429 = Response::builder().status(429).body(Bytes::new()).unwrap();
        let response_500 = Response::builder().status(500).body(Bytes::new()).unwrap();
        let response_400 = Response::builder().status(400).body(Bytes::new()).unwrap();
        let response_501 = Response::builder().status(501).body(Bytes::new()).unwrap();
        assert!(logic.should_retry_response(&response_429).is_retryable());
        assert!(logic.should_retry_response(&response_500).is_retryable());
        assert!(logic.should_retry_response(&response_408).is_retryable());
        assert!(
            logic
                .should_retry_response(&response_400)
                .is_not_retryable()
        );
        assert!(
            logic
                .should_retry_response(&response_501)
                .is_not_retryable()
        );
    }

    /// Sends one request through `HttpBatchService` to a local server and checks that the
    /// server received the body unchanged.
    #[tokio::test]
    async fn util_http_it_makes_http_requests() {
        let (_guard, addr) = next_addr();

        let uri = format!("http://{}:{}/", addr.ip(), addr.port())
            .parse::<Uri>()
            .unwrap();

        let request = Bytes::from("hello");
        let proxy = ProxyConfig::default();
        let client = HttpClient::new(None, &proxy).unwrap();
        let mut service = HttpBatchService::new(client, move |body: Bytes| {
            Box::pin(ready(
                http::Request::post(&uri).body(body).map_err(Into::into),
            ))
        });

        // Channel through which the server hands received bodies back to the test.
        let (tx, rx) = futures::channel::mpsc::channel(10);

        let new_service = make_service_fn(move |_| {
            let tx = tx.clone();

            let svc = service_fn(move |req: http::Request<Body>| {
                let mut tx = tx.clone();

                async move {
                    let mut body = http_body::Body::collect(req.into_body())
                        .await
                        .map_err(|error| format!("error: {error}"))?
                        .aggregate();
                    let string = String::from_utf8(body.copy_to_bytes(body.remaining()).to_vec())
                        .map_err(|_| "Wasn't UTF-8".to_string())?;
                    tx.try_send(string).map_err(|_| "Send error".to_string())?;

                    Ok::<_, crate::Error>(Response::new(Body::from("")))
                }
            });

            async move { Ok::<_, std::convert::Infallible>(svc) }
        });

        tokio::spawn(async move {
            if let Err(error) = Server::bind(&addr).serve(new_service).await {
                eprintln!("Server error: {error}");
            }
        });

        // Give the spawned server a moment to start listening before sending the request.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        service.call(request).await.unwrap();

        let (body, _rest) = StreamExt::into_future(rx).await;
        assert_eq!(body.unwrap(), "hello");
    }
}