vector/sinks/appsignal/
sink.rs1use futures_util::future::ready;
2
3use crate::sinks::{prelude::*, util::buffer::metrics::MetricNormalizer};
4
5use super::{
6 encoder::AppsignalEncoder,
7 normalizer::AppsignalMetricsNormalizer,
8 request_builder::{AppsignalRequest, AppsignalRequestBuilder},
9};
10
11pub(super) struct AppsignalSink<S> {
12 pub(super) service: S,
13 pub(super) compression: Compression,
14 pub(super) transformer: Transformer,
15 pub(super) batch_settings: BatcherSettings,
16}
17
18impl<S> AppsignalSink<S>
19where
20 S: Service<AppsignalRequest> + Send + 'static,
21 S::Future: Send + 'static,
22 S::Response: DriverResponse + Send + 'static,
23 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
24{
25 pub(super) async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
26 let service = ServiceBuilder::new().service(self.service);
27 let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();
28
29 input
30 .filter_map(move |event| {
31 ready(if let Event::Metric(metric) = event {
32 normalizer.normalize(metric).map(Event::Metric)
33 } else {
34 Some(event)
35 })
36 })
37 .batched(self.batch_settings.as_byte_size_config())
38 .request_builder(
39 default_request_builder_concurrency_limit(),
40 AppsignalRequestBuilder {
41 compression: self.compression,
42 encoder: AppsignalEncoder {
43 transformer: self.transformer.clone(),
44 },
45 },
46 )
47 .filter_map(|request| async move {
48 match request {
49 Err(error) => {
50 emit!(SinkRequestBuildError { error });
51 None
52 }
53 Ok(req) => Some(req),
54 }
55 })
56 .into_driver(service)
57 .run()
58 .await
59 }
60}
61
62#[async_trait::async_trait]
63impl<S> StreamSink<Event> for AppsignalSink<S>
64where
65 S: Service<AppsignalRequest> + Send + 'static,
66 S::Future: Send + 'static,
67 S::Response: DriverResponse + Send + 'static,
68 S::Error: std::fmt::Debug + Into<crate::Error> + Send,
69{
70 async fn run(
71 self: Box<Self>,
72 input: futures_util::stream::BoxStream<'_, Event>,
73 ) -> Result<(), ()> {
74 self.run_inner(input).await
75 }
76}