vector/sinks/honeycomb/
sink.rs

1//! Implementation of the `honeycomb` sink.
2
3use crate::sinks::{
4    prelude::*,
5    util::http::{HttpJsonBatchSizer, HttpRequest},
6};
7
8use super::request_builder::HoneycombRequestBuilder;
9
10pub(super) struct HoneycombSink<S> {
11    service: S,
12    batch_settings: BatcherSettings,
13    request_builder: HoneycombRequestBuilder,
14}
15
16impl<S> HoneycombSink<S>
17where
18    S: Service<HttpRequest<()>> + Send + 'static,
19    S::Future: Send + 'static,
20    S::Response: DriverResponse + Send + 'static,
21    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
22{
23    /// Creates a new `HoneycombSink`.
24    pub(super) const fn new(
25        service: S,
26        batch_settings: BatcherSettings,
27        request_builder: HoneycombRequestBuilder,
28    ) -> Self {
29        Self {
30            service,
31            batch_settings,
32            request_builder,
33        }
34    }
35
36    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
37        input
38            // Batch the input stream with size calculation based on the estimated encoded json size
39            .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer))
40            // Build requests with default concurrency limit.
41            .request_builder(
42                default_request_builder_concurrency_limit(),
43                self.request_builder,
44            )
45            // Filter out any errors that occurred in the request building.
46            .filter_map(|request| async move {
47                match request {
48                    Err(error) => {
49                        emit!(SinkRequestBuildError { error });
50                        None
51                    }
52                    Ok(req) => Some(req),
53                }
54            })
55            // Generate the driver that will send requests and handle retries,
56            // event finalization, and logging/internal metric reporting.
57            .into_driver(self.service)
58            .run()
59            .await
60    }
61}
62
63#[async_trait::async_trait]
64impl<S> StreamSink<Event> for HoneycombSink<S>
65where
66    S: Service<HttpRequest<()>> + Send + 'static,
67    S::Future: Send + 'static,
68    S::Response: DriverResponse + Send + 'static,
69    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
70{
71    async fn run(
72        self: Box<Self>,
73        input: futures_util::stream::BoxStream<'_, Event>,
74    ) -> Result<(), ()> {
75        self.run_inner(input).await
76    }
77}