vector/sinks/new_relic/
sink.rs

1use std::{fmt::Debug, sync::Arc};
2
3use async_trait::async_trait;
4use bytes::Bytes;
5
6use super::{NewRelicApiRequest, NewRelicCredentials, NewRelicEncoder};
7use crate::{
8    http::get_http_scheme_from_uri, internal_events::SinkRequestBuildError, sinks::prelude::*,
9};
10
/// Error type of the New Relic sink.
///
/// It carries nothing but a human-readable message.
#[derive(Debug)]
pub struct NewRelicSinkError {
    /// Text describing what went wrong.
    message: String,
}

impl NewRelicSinkError {
    /// Creates an error whose message is an owned copy of `msg`.
    pub fn new(msg: &str) -> Self {
        Self {
            message: msg.to_owned(),
        }
    }

    /// Same as [`NewRelicSinkError::new`], but hands the error back inside a `Box`.
    pub fn boxed(msg: &str) -> Box<Self> {
        Box::new(Self::new(msg))
    }
}
29
30impl std::fmt::Display for NewRelicSinkError {
31    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
32        write!(f, "{}", self.message)
33    }
34}
35
36impl std::error::Error for NewRelicSinkError {
37    fn description(&self) -> &str {
38        &self.message
39    }
40}
41
42impl From<std::io::Error> for NewRelicSinkError {
43    fn from(error: std::io::Error) -> Self {
44        Self::new(&error.to_string())
45    }
46}
47
48impl From<NewRelicSinkError> for std::io::Error {
49    fn from(error: NewRelicSinkError) -> Self {
50        Self::other(error)
51    }
52}
53
54struct NewRelicRequestBuilder {
55    encoder: NewRelicEncoder,
56    compression: Compression,
57    credentials: Arc<NewRelicCredentials>,
58}
59
60impl RequestBuilder<Vec<Event>> for NewRelicRequestBuilder {
61    type Metadata = EventFinalizers;
62    type Events = Vec<Event>;
63    type Encoder = NewRelicEncoder;
64    type Payload = Bytes;
65    type Request = NewRelicApiRequest;
66    type Error = NewRelicSinkError;
67
68    fn compression(&self) -> Compression {
69        self.compression
70    }
71
72    fn encoder(&self) -> &Self::Encoder {
73        &self.encoder
74    }
75
76    fn split_input(
77        &self,
78        mut input: Vec<Event>,
79    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
80        let builder = RequestMetadataBuilder::from_events(&input);
81        let finalizers = input.take_finalizers();
82
83        (finalizers, builder, input)
84    }
85
86    fn build_request(
87        &self,
88        finalizers: Self::Metadata,
89        metadata: RequestMetadata,
90        payload: EncodeResult<Self::Payload>,
91    ) -> Self::Request {
92        NewRelicApiRequest {
93            metadata,
94            finalizers,
95            credentials: Arc::clone(&self.credentials),
96            payload: payload.into_payload(),
97            compression: self.compression,
98        }
99    }
100}
101
102pub struct NewRelicSink<S> {
103    pub service: S,
104    pub encoder: NewRelicEncoder,
105    pub credentials: Arc<NewRelicCredentials>,
106    pub compression: Compression,
107    pub batcher_settings: BatcherSettings,
108}
109
110impl<S> NewRelicSink<S>
111where
112    S: Service<NewRelicApiRequest> + Send + 'static,
113    S::Future: Send + 'static,
114    S::Response: DriverResponse + Send + 'static,
115    S::Error: Debug + Into<crate::Error> + Send,
116{
117    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
118        let request_builder = NewRelicRequestBuilder {
119            encoder: self.encoder,
120            compression: self.compression,
121            credentials: Arc::clone(&self.credentials),
122        };
123        let protocol = get_http_scheme_from_uri(&self.credentials.get_uri());
124
125        input
126            .batched(self.batcher_settings.as_byte_size_config())
127            .request_builder(default_request_builder_concurrency_limit(), request_builder)
128            .filter_map(
129                |request: Result<NewRelicApiRequest, NewRelicSinkError>| async move {
130                    match request {
131                        Err(error) => {
132                            emit!(SinkRequestBuildError { error });
133                            None
134                        }
135                        Ok(req) => Some(req),
136                    }
137                },
138            )
139            .into_driver(self.service)
140            .protocol(protocol)
141            .run()
142            .await
143    }
144}
145
146#[async_trait]
147impl<S> StreamSink<Event> for NewRelicSink<S>
148where
149    S: Service<NewRelicApiRequest> + Send + 'static,
150    S::Future: Send + 'static,
151    S::Response: DriverResponse + Send + 'static,
152    S::Error: Debug + Into<crate::Error> + Send,
153{
154    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
155        self.run_inner(input).await
156    }
157}