vector/sinks/http/
sink.rs

1//! Implementation of the `http` sink.
2
3use crate::sinks::{prelude::*, util::http::HttpRequest};
4use std::collections::BTreeMap;
5
6use super::{batch::HttpBatchSizer, request_builder::HttpRequestBuilder};
7
8pub(super) struct HttpSink<S> {
9    service: S,
10    uri: Template,
11    headers: BTreeMap<String, Template>,
12    batch_settings: BatcherSettings,
13    request_builder: HttpRequestBuilder,
14}
15
16impl<S> HttpSink<S>
17where
18    S: Service<HttpRequest<PartitionKey>> + Send + 'static,
19    S::Future: Send + 'static,
20    S::Response: DriverResponse + Send + 'static,
21    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
22{
23    /// Creates a new `HttpSink`.
24    pub(super) const fn new(
25        service: S,
26        uri: Template,
27        headers: BTreeMap<String, Template>,
28        batch_settings: BatcherSettings,
29        request_builder: HttpRequestBuilder,
30    ) -> Self {
31        Self {
32            service,
33            uri,
34            headers,
35            batch_settings,
36            request_builder,
37        }
38    }
39
40    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41        let batch_sizer = HttpBatchSizer {
42            encoder: self.request_builder.encoder.encoder.clone(),
43        };
44        input
45            // Batch the input stream with size calculation based on the configured codec
46            .batched_partitioned(KeyPartitioner::new(self.uri, self.headers), || {
47                self.batch_settings.as_item_size_config(batch_sizer.clone())
48            })
49            .filter_map(|(key, batch)| async move { key.map(move |k| (k, batch)) })
50            // Build requests with default concurrency limit.
51            .request_builder(
52                default_request_builder_concurrency_limit(),
53                self.request_builder,
54            )
55            // Filter out any errors that occurred in the request building.
56            .filter_map(|request| async move {
57                match request {
58                    Err(error) => {
59                        emit!(SinkRequestBuildError { error });
60                        None
61                    }
62                    Ok(req) => Some(req),
63                }
64            })
65            // Generate the driver that will send requests and handle retries,
66            // event finalization, and logging/internal metric reporting.
67            .into_driver(self.service)
68            .run()
69            .await
70    }
71}
72
73#[async_trait::async_trait]
74impl<S> StreamSink<Event> for HttpSink<S>
75where
76    S: Service<HttpRequest<PartitionKey>> + Send + 'static,
77    S::Future: Send + 'static,
78    S::Response: DriverResponse + Send + 'static,
79    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
80{
81    async fn run(
82        self: Box<Self>,
83        input: futures_util::stream::BoxStream<'_, Event>,
84    ) -> Result<(), ()> {
85        self.run_inner(input).await
86    }
87}
88
89#[derive(Eq, PartialEq, Clone, Debug, Hash)]
90pub struct PartitionKey {
91    pub uri: String,
92    pub headers: BTreeMap<String, String>,
93}
94
95struct KeyPartitioner {
96    uri: Template,
97    headers: BTreeMap<String, Template>,
98}
99
100impl KeyPartitioner {
101    const fn new(uri: Template, headers: BTreeMap<String, Template>) -> Self {
102        Self { uri, headers }
103    }
104}
105
106impl Partitioner for KeyPartitioner {
107    type Item = Event;
108    type Key = Option<PartitionKey>;
109
110    fn partition(&self, event: &Event) -> Self::Key {
111        let uri = self
112            .uri
113            .render_string(event)
114            .map_err(|error| {
115                emit!(TemplateRenderingError {
116                    error,
117                    field: Some("uri"),
118                    drop_event: true,
119                });
120            })
121            .ok()?;
122
123        let mut headers = BTreeMap::new();
124        for (name, template) in &self.headers {
125            let value = template
126                .render_string(event)
127                .map_err(|error| {
128                    emit!(TemplateRenderingError {
129                        error,
130                        field: Some("headers"),
131                        drop_event: true,
132                    });
133                })
134                .ok()?;
135            headers.insert(name.clone(), value);
136        }
137
138        Some(PartitionKey { uri, headers })
139    }
140}