vector/sinks/new_relic/
sink.rs1use std::{fmt::Debug, sync::Arc};
2
3use async_trait::async_trait;
4use bytes::Bytes;
5
6use super::{NewRelicApiRequest, NewRelicCredentials, NewRelicEncoder};
7use crate::{
8 http::get_http_scheme_from_uri, internal_events::SinkRequestBuildError, sinks::prelude::*,
9};
10
/// Error type used by the New Relic sink; it carries a single text message.
#[derive(Debug)]
pub struct NewRelicSinkError {
    /// Text produced by the `Display` implementation and by `description`.
    message: String,
}

impl NewRelicSinkError {
    /// Creates an error whose message is an owned copy of `msg`.
    pub fn new(msg: &str) -> Self {
        Self {
            message: msg.to_owned(),
        }
    }

    /// Creates the same error as `new`, already wrapped in a `Box`.
    pub fn boxed(msg: &str) -> Box<Self> {
        Box::new(Self::new(msg))
    }
}

impl std::fmt::Display for NewRelicSinkError {
    /// Writes the stored message verbatim; width and padding flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for NewRelicSinkError {
    /// Returns the stored message.
    fn description(&self) -> &str {
        self.message.as_str()
    }
}

impl From<std::io::Error> for NewRelicSinkError {
    /// Keeps only the I/O error's `Display` text as the message.
    fn from(error: std::io::Error) -> Self {
        Self {
            message: error.to_string(),
        }
    }
}

impl From<NewRelicSinkError> for std::io::Error {
    /// Wraps the sink error as the payload of an I/O error built with `Error::other`.
    fn from(error: NewRelicSinkError) -> Self {
        std::io::Error::other(error)
    }
}
53
54struct NewRelicRequestBuilder {
55 encoder: NewRelicEncoder,
56 compression: Compression,
57 credentials: Arc<NewRelicCredentials>,
58}
59
60impl RequestBuilder<Vec<Event>> for NewRelicRequestBuilder {
61 type Metadata = EventFinalizers;
62 type Events = Vec<Event>;
63 type Encoder = NewRelicEncoder;
64 type Payload = Bytes;
65 type Request = NewRelicApiRequest;
66 type Error = NewRelicSinkError;
67
68 fn compression(&self) -> Compression {
69 self.compression
70 }
71
72 fn encoder(&self) -> &Self::Encoder {
73 &self.encoder
74 }
75
76 fn split_input(
77 &self,
78 mut input: Vec<Event>,
79 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
80 let builder = RequestMetadataBuilder::from_events(&input);
81 let finalizers = input.take_finalizers();
82
83 (finalizers, builder, input)
84 }
85
86 fn build_request(
87 &self,
88 finalizers: Self::Metadata,
89 metadata: RequestMetadata,
90 payload: EncodeResult<Self::Payload>,
91 ) -> Self::Request {
92 NewRelicApiRequest {
93 metadata,
94 finalizers,
95 credentials: Arc::clone(&self.credentials),
96 payload: payload.into_payload(),
97 compression: self.compression,
98 }
99 }
100}
101
102pub struct NewRelicSink<S> {
103 pub service: S,
104 pub encoder: NewRelicEncoder,
105 pub credentials: Arc<NewRelicCredentials>,
106 pub compression: Compression,
107 pub batcher_settings: BatcherSettings,
108}
109
110impl<S> NewRelicSink<S>
111where
112 S: Service<NewRelicApiRequest> + Send + 'static,
113 S::Future: Send + 'static,
114 S::Response: DriverResponse + Send + 'static,
115 S::Error: Debug + Into<crate::Error> + Send,
116{
117 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
118 let request_builder = NewRelicRequestBuilder {
119 encoder: self.encoder,
120 compression: self.compression,
121 credentials: Arc::clone(&self.credentials),
122 };
123 let protocol = get_http_scheme_from_uri(&self.credentials.get_uri());
124
125 input
126 .batched(self.batcher_settings.as_byte_size_config())
127 .request_builder(default_request_builder_concurrency_limit(), request_builder)
128 .filter_map(
129 |request: Result<NewRelicApiRequest, NewRelicSinkError>| async move {
130 match request {
131 Err(error) => {
132 emit!(SinkRequestBuildError { error });
133 None
134 }
135 Ok(req) => Some(req),
136 }
137 },
138 )
139 .into_driver(self.service)
140 .protocol(protocol)
141 .run()
142 .await
143 }
144}
145
146#[async_trait]
147impl<S> StreamSink<Event> for NewRelicSink<S>
148where
149 S: Service<NewRelicApiRequest> + Send + 'static,
150 S::Future: Send + 'static,
151 S::Response: DriverResponse + Send + 'static,
152 S::Error: Debug + Into<crate::Error> + Send,
153{
154 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
155 self.run_inner(input).await
156 }
157}