vector/sinks/honeycomb/
sink.rs

1//! Implementation of the `honeycomb` sink.
2
3use super::request_builder::HoneycombRequestBuilder;
4use crate::sinks::{
5    prelude::*,
6    util::http::{HttpJsonBatchSizer, HttpRequest},
7};
8
9pub(super) struct HoneycombSink<S> {
10    service: S,
11    batch_settings: BatcherSettings,
12    request_builder: HoneycombRequestBuilder,
13}
14
15impl<S> HoneycombSink<S>
16where
17    S: Service<HttpRequest<()>> + Send + 'static,
18    S::Future: Send + 'static,
19    S::Response: DriverResponse + Send + 'static,
20    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
21{
22    /// Creates a new `HoneycombSink`.
23    pub(super) const fn new(
24        service: S,
25        batch_settings: BatcherSettings,
26        request_builder: HoneycombRequestBuilder,
27    ) -> Self {
28        Self {
29            service,
30            batch_settings,
31            request_builder,
32        }
33    }
34
35    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
36        input
37            // Batch the input stream with size calculation based on the estimated encoded json size
38            .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer))
39            // Build requests with default concurrency limit.
40            .request_builder(
41                default_request_builder_concurrency_limit(),
42                self.request_builder,
43            )
44            // Filter out any errors that occurred in the request building.
45            .filter_map(|request| async move {
46                match request {
47                    Err(error) => {
48                        emit!(SinkRequestBuildError { error });
49                        None
50                    }
51                    Ok(req) => Some(req),
52                }
53            })
54            // Generate the driver that will send requests and handle retries,
55            // event finalization, and logging/internal metric reporting.
56            .into_driver(self.service)
57            .run()
58            .await
59    }
60}
61
62#[async_trait::async_trait]
63impl<S> StreamSink<Event> for HoneycombSink<S>
64where
65    S: Service<HttpRequest<()>> + Send + 'static,
66    S::Future: Send + 'static,
67    S::Response: DriverResponse + Send + 'static,
68    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
69{
70    async fn run(
71        self: Box<Self>,
72        input: futures_util::stream::BoxStream<'_, Event>,
73    ) -> Result<(), ()> {
74        self.run_inner(input).await
75    }
76}