vector/sinks/honeycomb/
sink.rs1use crate::sinks::{
4 prelude::*,
5 util::http::{HttpJsonBatchSizer, HttpRequest},
6};
7
8use super::request_builder::HoneycombRequestBuilder;
9
/// Stream sink that batches incoming events, turns each batch into an HTTP
/// request and hands the requests to the wrapped service `S`.
///
/// The bounds required of `S` live on the `impl` blocks rather than on the
/// struct itself.
pub(super) struct HoneycombSink<S> {
    /// Service that receives every successfully built request (it is passed
    /// to `into_driver` when the sink runs).
    service: S,
    /// Settings used to configure how the input stream is batched.
    batch_settings: BatcherSettings,
    /// Builder that converts each batch of events into a request.
    request_builder: HoneycombRequestBuilder,
}
15
16impl<S> HoneycombSink<S>
17where
18 S: Service<HttpRequest<()>> + Send + 'static,
19 S::Future: Send + 'static,
20 S::Response: DriverResponse + Send + 'static,
21 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
22{
23 pub(super) const fn new(
25 service: S,
26 batch_settings: BatcherSettings,
27 request_builder: HoneycombRequestBuilder,
28 ) -> Self {
29 Self {
30 service,
31 batch_settings,
32 request_builder,
33 }
34 }
35
36 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
37 input
38 .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer))
40 .request_builder(
42 default_request_builder_concurrency_limit(),
43 self.request_builder,
44 )
45 .filter_map(|request| async move {
47 match request {
48 Err(error) => {
49 emit!(SinkRequestBuildError { error });
50 None
51 }
52 Ok(req) => Some(req),
53 }
54 })
55 .into_driver(self.service)
58 .run()
59 .await
60 }
61}
62
63#[async_trait::async_trait]
64impl<S> StreamSink<Event> for HoneycombSink<S>
65where
66 S: Service<HttpRequest<()>> + Send + 'static,
67 S::Future: Send + 'static,
68 S::Response: DriverResponse + Send + 'static,
69 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
70{
71 async fn run(
72 self: Box<Self>,
73 input: futures_util::stream::BoxStream<'_, Event>,
74 ) -> Result<(), ()> {
75 self.run_inner(input).await
76 }
77}