vector/sinks/new_relic/
config.rs1use std::{fmt::Debug, sync::Arc};
2
3use http::Uri;
4use tower::ServiceBuilder;
5use vector_lib::sensitive_string::SensitiveString;
6
7use super::{
8 healthcheck, service::NewRelicApiRequest, NewRelicApiResponse, NewRelicApiService,
9 NewRelicEncoder, NewRelicSink, NewRelicSinkError,
10};
11
12use crate::{http::HttpClient, sinks::prelude::*};
13
14#[configurable_component]
16#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
17#[serde(rename_all = "snake_case")]
18#[derivative(Default)]
19pub enum NewRelicRegion {
20 #[derivative(Default)]
22 Us,
23
24 Eu,
26}
27
28#[configurable_component]
30#[derive(Clone, Copy, Derivative, Debug, Eq, PartialEq)]
31#[serde(rename_all = "snake_case")]
32#[derivative(Default)]
33pub enum NewRelicApi {
34 #[derivative(Default)]
36 Events,
37
38 Metrics,
40
41 Logs,
43}
44
45#[derive(Clone, Copy, Debug, Default)]
46pub struct NewRelicDefaultBatchSettings;
47
48impl SinkBatchSettings for NewRelicDefaultBatchSettings {
49 const MAX_EVENTS: Option<usize> = Some(100);
50 const MAX_BYTES: Option<usize> = Some(1_000_000);
51 const TIMEOUT_SECS: f64 = 1.0;
52}
53
54#[derive(Debug, Default, Clone)]
55pub struct NewRelicApiRetry;
56
57impl RetryLogic for NewRelicApiRetry {
58 type Error = NewRelicSinkError;
59 type Request = NewRelicApiRequest;
60 type Response = NewRelicApiResponse;
61
62 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
63 false
65 }
66}
67
68#[configurable_component(sink("new_relic", "Deliver events to New Relic."))]
70#[derive(Clone, Debug, Default)]
71#[serde(deny_unknown_fields)]
72pub struct NewRelicConfig {
73 #[configurable(metadata(docs::examples = "xxxx"))]
75 #[configurable(metadata(docs::examples = "${NEW_RELIC_LICENSE_KEY}"))]
76 pub license_key: SensitiveString,
77
78 #[configurable(metadata(docs::examples = "xxxx"))]
80 #[configurable(metadata(docs::examples = "${NEW_RELIC_ACCOUNT_KEY}"))]
81 pub account_id: SensitiveString,
82
83 #[configurable(derived)]
84 pub region: Option<NewRelicRegion>,
85
86 #[configurable(derived)]
87 pub api: NewRelicApi,
88
89 #[configurable(derived)]
90 #[serde(default = "Compression::gzip_default")]
91 pub compression: Compression,
92
93 #[configurable(derived)]
94 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
95 pub encoding: Transformer,
96
97 #[configurable(derived)]
98 #[serde(default)]
99 pub batch: BatchConfig<NewRelicDefaultBatchSettings>,
100
101 #[configurable(derived)]
102 #[serde(default)]
103 pub request: TowerRequestConfig,
104
105 #[configurable(derived)]
106 #[serde(
107 default,
108 deserialize_with = "crate::serde::bool_or_struct",
109 skip_serializing_if = "crate::serde::is_default"
110 )]
111 acknowledgements: AcknowledgementsConfig,
112
113 #[serde(skip)]
114 pub override_uri: Option<Uri>,
115}
116
117impl_generate_config_from_default!(NewRelicConfig);
118
119impl NewRelicConfig {
120 pub fn build_healthcheck(
121 &self,
122 client: HttpClient,
123 credentials: Arc<NewRelicCredentials>,
124 ) -> crate::Result<super::Healthcheck> {
125 Ok(healthcheck::healthcheck(client, credentials).boxed())
126 }
127}
128
129#[async_trait::async_trait]
130#[typetag::serde(name = "new_relic")]
131impl SinkConfig for NewRelicConfig {
132 async fn build(
133 &self,
134 cx: SinkContext,
135 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
136 let batcher_settings = self
137 .batch
138 .validate()?
139 .limit_max_events(self.batch.max_events.unwrap_or(100))?
140 .into_batcher_settings()?;
141
142 let request_limits = self.request.into_settings();
143 let tls_settings = TlsSettings::from_options(None)?;
144 let client = HttpClient::new(tls_settings, &cx.proxy)?;
145 let credentials = Arc::from(NewRelicCredentials::from(self));
146
147 let healthcheck = self.build_healthcheck(client.clone(), Arc::clone(&credentials))?;
148
149 let service = ServiceBuilder::new()
150 .settings(request_limits, NewRelicApiRetry)
151 .service(NewRelicApiService { client });
152
153 let sink = NewRelicSink {
154 service,
155 encoder: NewRelicEncoder {
156 transformer: self.encoding.clone(),
157 credentials: Arc::clone(&credentials),
158 },
159 credentials,
160 compression: self.compression,
161 batcher_settings,
162 };
163
164 Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
165 }
166
167 fn input(&self) -> Input {
168 Input::new(DataType::Log | DataType::Metric)
169 }
170
171 fn acknowledgements(&self) -> &AcknowledgementsConfig {
172 &self.acknowledgements
173 }
174}
175
176#[derive(Debug, Clone)]
177pub struct NewRelicCredentials {
178 pub license_key: String,
179 pub account_id: String,
180 pub api: NewRelicApi,
181 pub region: NewRelicRegion,
182 pub override_uri: Option<Uri>,
183}
184
185impl NewRelicCredentials {
186 pub fn get_uri(&self) -> Uri {
187 if let Some(override_uri) = self.override_uri.as_ref() {
188 return override_uri.clone();
189 }
190
191 match self.api {
192 NewRelicApi::Events => match self.region {
193 NewRelicRegion::Us => format!(
194 "https://insights-collector.newrelic.com/v1/accounts/{}/events",
195 self.account_id
196 )
197 .parse::<Uri>()
198 .unwrap(),
199 NewRelicRegion::Eu => format!(
200 "https://insights-collector.eu01.nr-data.net/v1/accounts/{}/events",
201 self.account_id
202 )
203 .parse::<Uri>()
204 .unwrap(),
205 },
206 NewRelicApi::Metrics => match self.region {
207 NewRelicRegion::Us => Uri::from_static("https://metric-api.newrelic.com/metric/v1"),
208 NewRelicRegion::Eu => {
209 Uri::from_static("https://metric-api.eu.newrelic.com/metric/v1")
210 }
211 },
212 NewRelicApi::Logs => match self.region {
213 NewRelicRegion::Us => Uri::from_static("https://log-api.newrelic.com/log/v1"),
214 NewRelicRegion::Eu => Uri::from_static("https://log-api.eu.newrelic.com/log/v1"),
215 },
216 }
217 }
218}
219
220impl From<&NewRelicConfig> for NewRelicCredentials {
221 fn from(config: &NewRelicConfig) -> Self {
222 Self {
223 license_key: config.license_key.inner().to_string(),
224 account_id: config.account_id.inner().to_string(),
225 api: config.api,
226 region: config.region.unwrap_or(NewRelicRegion::Us),
227 override_uri: config.override_uri.clone(),
228 }
229 }
230}