vector/sinks/http/
sink.rs1use crate::sinks::{prelude::*, util::http::HttpRequest};
4use std::collections::BTreeMap;
5
6use super::{batch::HttpBatchSizer, request_builder::HttpRequestBuilder};
7
8pub(super) struct HttpSink<S> {
9 service: S,
10 uri: Template,
11 headers: BTreeMap<String, Template>,
12 batch_settings: BatcherSettings,
13 request_builder: HttpRequestBuilder,
14}
15
16impl<S> HttpSink<S>
17where
18 S: Service<HttpRequest<PartitionKey>> + Send + 'static,
19 S::Future: Send + 'static,
20 S::Response: DriverResponse + Send + 'static,
21 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
22{
23 pub(super) const fn new(
25 service: S,
26 uri: Template,
27 headers: BTreeMap<String, Template>,
28 batch_settings: BatcherSettings,
29 request_builder: HttpRequestBuilder,
30 ) -> Self {
31 Self {
32 service,
33 uri,
34 headers,
35 batch_settings,
36 request_builder,
37 }
38 }
39
40 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41 let batch_sizer = HttpBatchSizer {
42 encoder: self.request_builder.encoder.encoder.clone(),
43 };
44 input
45 .batched_partitioned(KeyPartitioner::new(self.uri, self.headers), || {
47 self.batch_settings.as_item_size_config(batch_sizer.clone())
48 })
49 .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
50 .request_builder(
52 default_request_builder_concurrency_limit(),
53 self.request_builder,
54 )
55 .filter_map(|request| async move {
57 match request {
58 Err(error) => {
59 emit!(SinkRequestBuildError { error });
60 None
61 }
62 Ok(req) => Some(req),
63 }
64 })
65 .into_driver(self.service)
68 .run()
69 .await
70 }
71}
72
73#[async_trait::async_trait]
74impl<S> StreamSink<Event> for HttpSink<S>
75where
76 S: Service<HttpRequest<PartitionKey>> + Send + 'static,
77 S::Future: Send + 'static,
78 S::Response: DriverResponse + Send + 'static,
79 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
80{
81 async fn run(
82 self: Box<Self>,
83 input: futures_util::stream::BoxStream<'_, Event>,
84 ) -> Result<(), ()> {
85 self.run_inner(input).await
86 }
87}
88
/// Key under which events are batched: the rendered URI together with the
/// rendered header values. Two events share a batch only if both parts match.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct PartitionKey {
    /// URI obtained by rendering the sink's URI template against an event.
    pub uri: String,

    /// Header name -> rendered header value for that event.
    pub headers: BTreeMap<String, String>,
}
94
95struct KeyPartitioner {
96 uri: Template,
97 headers: BTreeMap<String, Template>,
98}
99
100impl KeyPartitioner {
101 const fn new(uri: Template, headers: BTreeMap<String, Template>) -> Self {
102 Self { uri, headers }
103 }
104}
105
106impl Partitioner for KeyPartitioner {
107 type Item = Event;
108 type Key = Option<PartitionKey>;
109
110 fn partition(&self, event: &Event) -> Self::Key {
111 let uri = self
112 .uri
113 .render_string(event)
114 .map_err(|error| {
115 emit!(TemplateRenderingError {
116 error,
117 field: Some("uri"),
118 drop_event: true,
119 });
120 })
121 .ok()?;
122
123 let mut headers = BTreeMap::new();
124 for (name, template) in &self.headers {
125 let value = template
126 .render_string(event)
127 .map_err(|error| {
128 emit!(TemplateRenderingError {
129 error,
130 field: Some("headers"),
131 drop_event: true,
132 });
133 })
134 .ok()?;
135 headers.insert(name.clone(), value);
136 }
137
138 Some(PartitionKey { uri, headers })
139 }
140}