vector/sinks/appsignal/
sink.rs

1use futures_util::future::ready;
2
3use crate::sinks::{prelude::*, util::buffer::metrics::MetricNormalizer};
4
5use super::{
6    encoder::AppsignalEncoder,
7    normalizer::AppsignalMetricsNormalizer,
8    request_builder::{AppsignalRequest, AppsignalRequestBuilder},
9};
10
/// State for the AppSignal stream sink, generic over the service `S` that
/// receives the requests this sink builds.
pub(super) struct AppsignalSink<S> {
    /// Service each successfully built `AppsignalRequest` is driven into.
    pub(super) service: S,
    /// Compression setting handed to the `AppsignalRequestBuilder`.
    pub(super) compression: Compression,
    /// Transformer handed to the `AppsignalEncoder`.
    pub(super) transformer: Transformer,
    /// Batch settings used to group events before requests are built.
    pub(super) batch_settings: BatcherSettings,
}
17
18impl<S> AppsignalSink<S>
19where
20    S: Service<AppsignalRequest> + Send + 'static,
21    S::Future: Send + 'static,
22    S::Response: DriverResponse + Send + 'static,
23    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
24{
25    pub(super) async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
26        let service = ServiceBuilder::new().service(self.service);
27        let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();
28
29        input
30            .filter_map(move |event| {
31                ready(if let Event::Metric(metric) = event {
32                    normalizer.normalize(metric).map(Event::Metric)
33                } else {
34                    Some(event)
35                })
36            })
37            .batched(self.batch_settings.as_byte_size_config())
38            .request_builder(
39                default_request_builder_concurrency_limit(),
40                AppsignalRequestBuilder {
41                    compression: self.compression,
42                    encoder: AppsignalEncoder {
43                        transformer: self.transformer.clone(),
44                    },
45                },
46            )
47            .filter_map(|request| async move {
48                match request {
49                    Err(error) => {
50                        emit!(SinkRequestBuildError { error });
51                        None
52                    }
53                    Ok(req) => Some(req),
54                }
55            })
56            .into_driver(service)
57            .run()
58            .await
59    }
60}
61
62#[async_trait::async_trait]
63impl<S> StreamSink<Event> for AppsignalSink<S>
64where
65    S: Service<AppsignalRequest> + Send + 'static,
66    S::Future: Send + 'static,
67    S::Response: DriverResponse + Send + 'static,
68    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
69{
70    async fn run(
71        self: Box<Self>,
72        input: futures_util::stream::BoxStream<'_, Event>,
73    ) -> Result<(), ()> {
74        self.run_inner(input).await
75    }
76}