vector/sinks/appsignal/
sink.rs

1use futures_util::future::ready;
2
3use super::{
4    encoder::AppsignalEncoder,
5    normalizer::AppsignalMetricsNormalizer,
6    request_builder::{AppsignalRequest, AppsignalRequestBuilder},
7};
8use crate::sinks::{prelude::*, util::buffer::metrics::MetricNormalizer};
9
10pub(super) struct AppsignalSink<S> {
11    pub(super) service: S,
12    pub(super) compression: Compression,
13    pub(super) transformer: Transformer,
14    pub(super) batch_settings: BatcherSettings,
15}
16
17impl<S> AppsignalSink<S>
18where
19    S: Service<AppsignalRequest> + Send + 'static,
20    S::Future: Send + 'static,
21    S::Response: DriverResponse + Send + 'static,
22    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
23{
24    pub(super) async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
25        let service = ServiceBuilder::new().service(self.service);
26        let mut normalizer = MetricNormalizer::<AppsignalMetricsNormalizer>::default();
27
28        input
29            .filter_map(move |event| {
30                ready(if let Event::Metric(metric) = event {
31                    normalizer.normalize(metric).map(Event::Metric)
32                } else {
33                    Some(event)
34                })
35            })
36            .batched(self.batch_settings.as_byte_size_config())
37            .request_builder(
38                default_request_builder_concurrency_limit(),
39                AppsignalRequestBuilder {
40                    compression: self.compression,
41                    encoder: AppsignalEncoder {
42                        transformer: self.transformer.clone(),
43                    },
44                },
45            )
46            .filter_map(|request| async move {
47                match request {
48                    Err(error) => {
49                        emit!(SinkRequestBuildError { error });
50                        None
51                    }
52                    Ok(req) => Some(req),
53                }
54            })
55            .into_driver(service)
56            .run()
57            .await
58    }
59}
60
61#[async_trait::async_trait]
62impl<S> StreamSink<Event> for AppsignalSink<S>
63where
64    S: Service<AppsignalRequest> + Send + 'static,
65    S::Future: Send + 'static,
66    S::Response: DriverResponse + Send + 'static,
67    S::Error: std::fmt::Debug + Into<crate::Error> + Send,
68{
69    async fn run(
70        self: Box<Self>,
71        input: futures_util::stream::BoxStream<'_, Event>,
72    ) -> Result<(), ()> {
73        self.run_inner(input).await
74    }
75}