vector/sinks/honeycomb/
sink.rs1use super::request_builder::HoneycombRequestBuilder;
4use crate::sinks::{
5 prelude::*,
6 util::http::{HttpJsonBatchSizer, HttpRequest},
7};
8
9pub(super) struct HoneycombSink<S> {
10 service: S,
11 batch_settings: BatcherSettings,
12 request_builder: HoneycombRequestBuilder,
13}
14
15impl<S> HoneycombSink<S>
16where
17 S: Service<HttpRequest<()>> + Send + 'static,
18 S::Future: Send + 'static,
19 S::Response: DriverResponse + Send + 'static,
20 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
21{
22 pub(super) const fn new(
24 service: S,
25 batch_settings: BatcherSettings,
26 request_builder: HoneycombRequestBuilder,
27 ) -> Self {
28 Self {
29 service,
30 batch_settings,
31 request_builder,
32 }
33 }
34
35 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
36 input
37 .batched(self.batch_settings.as_item_size_config(HttpJsonBatchSizer))
39 .request_builder(
41 default_request_builder_concurrency_limit(),
42 self.request_builder,
43 )
44 .filter_map(|request| async move {
46 match request {
47 Err(error) => {
48 emit!(SinkRequestBuildError { error });
49 None
50 }
51 Ok(req) => Some(req),
52 }
53 })
54 .into_driver(self.service)
57 .run()
58 .await
59 }
60}
61
62#[async_trait::async_trait]
63impl<S> StreamSink<Event> for HoneycombSink<S>
64where
65 S: Service<HttpRequest<()>> + Send + 'static,
66 S::Future: Send + 'static,
67 S::Response: DriverResponse + Send + 'static,
68 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
69{
70 async fn run(
71 self: Box<Self>,
72 input: futures_util::stream::BoxStream<'_, Event>,
73 ) -> Result<(), ()> {
74 self.run_inner(input).await
75 }
76}