vector/sinks/new_relic/
config.rs1use std::{fmt::Debug, sync::Arc};
2
3use http::Uri;
4use tower::ServiceBuilder;
5use vector_lib::sensitive_string::SensitiveString;
6
7use super::{
8 NewRelicApiResponse, NewRelicApiService, NewRelicEncoder, NewRelicSink, NewRelicSinkError,
9 healthcheck, service::NewRelicApiRequest,
10};
11use crate::{http::HttpClient, sinks::prelude::*};
12
13#[configurable_component]
15#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
16#[serde(rename_all = "snake_case")]
17#[derivative(Default)]
18pub enum NewRelicRegion {
19 #[derivative(Default)]
21 Us,
22
23 Eu,
25}
26
27#[configurable_component]
29#[derive(Clone, Copy, Derivative, Debug, Eq, PartialEq)]
30#[serde(rename_all = "snake_case")]
31#[derivative(Default)]
32pub enum NewRelicApi {
33 #[derivative(Default)]
35 Events,
36
37 Metrics,
39
40 Logs,
42}
43
44#[derive(Clone, Copy, Debug, Default)]
45pub struct NewRelicDefaultBatchSettings;
46
47impl SinkBatchSettings for NewRelicDefaultBatchSettings {
48 const MAX_EVENTS: Option<usize> = Some(100);
49 const MAX_BYTES: Option<usize> = Some(1_000_000);
50 const TIMEOUT_SECS: f64 = 1.0;
51}
52
53#[derive(Debug, Default, Clone)]
54pub struct NewRelicApiRetry;
55
56impl RetryLogic for NewRelicApiRetry {
57 type Error = NewRelicSinkError;
58 type Request = NewRelicApiRequest;
59 type Response = NewRelicApiResponse;
60
61 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
62 false
64 }
65}
66
67#[configurable_component(sink("new_relic", "Deliver events to New Relic."))]
69#[derive(Clone, Debug, Default)]
70#[serde(deny_unknown_fields)]
71pub struct NewRelicConfig {
72 #[configurable(metadata(docs::examples = "xxxx"))]
74 #[configurable(metadata(docs::examples = "${NEW_RELIC_LICENSE_KEY}"))]
75 pub license_key: SensitiveString,
76
77 #[configurable(metadata(docs::examples = "xxxx"))]
79 #[configurable(metadata(docs::examples = "${NEW_RELIC_ACCOUNT_KEY}"))]
80 pub account_id: SensitiveString,
81
82 #[configurable(derived)]
83 pub region: Option<NewRelicRegion>,
84
85 #[configurable(derived)]
86 pub api: NewRelicApi,
87
88 #[configurable(derived)]
89 #[serde(default = "Compression::gzip_default")]
90 pub compression: Compression,
91
92 #[configurable(derived)]
93 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
94 pub encoding: Transformer,
95
96 #[configurable(derived)]
97 #[serde(default)]
98 pub batch: BatchConfig<NewRelicDefaultBatchSettings>,
99
100 #[configurable(derived)]
101 #[serde(default)]
102 pub request: TowerRequestConfig,
103
104 #[configurable(derived)]
105 #[serde(
106 default,
107 deserialize_with = "crate::serde::bool_or_struct",
108 skip_serializing_if = "crate::serde::is_default"
109 )]
110 acknowledgements: AcknowledgementsConfig,
111
112 #[serde(skip)]
113 pub override_uri: Option<Uri>,
114}
115
116impl_generate_config_from_default!(NewRelicConfig);
117
118impl NewRelicConfig {
119 pub fn build_healthcheck(
120 &self,
121 client: HttpClient,
122 credentials: Arc<NewRelicCredentials>,
123 ) -> crate::Result<super::Healthcheck> {
124 Ok(healthcheck::healthcheck(client, credentials).boxed())
125 }
126}
127
128#[async_trait::async_trait]
129#[typetag::serde(name = "new_relic")]
130impl SinkConfig for NewRelicConfig {
131 async fn build(
132 &self,
133 cx: SinkContext,
134 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
135 let batcher_settings = self
136 .batch
137 .validate()?
138 .limit_max_events(self.batch.max_events.unwrap_or(100))?
139 .into_batcher_settings()?;
140
141 let request_limits = self.request.into_settings();
142 let tls_settings = TlsSettings::from_options(None)?;
143 let client = HttpClient::new(tls_settings, &cx.proxy)?;
144 let credentials = Arc::from(NewRelicCredentials::from(self));
145
146 let healthcheck = self.build_healthcheck(client.clone(), Arc::clone(&credentials))?;
147
148 let service = ServiceBuilder::new()
149 .settings(request_limits, NewRelicApiRetry)
150 .service(NewRelicApiService { client });
151
152 let sink = NewRelicSink {
153 service,
154 encoder: NewRelicEncoder {
155 transformer: self.encoding.clone(),
156 credentials: Arc::clone(&credentials),
157 },
158 credentials,
159 compression: self.compression,
160 batcher_settings,
161 };
162
163 Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
164 }
165
166 fn input(&self) -> Input {
167 Input::new(DataType::Log | DataType::Metric)
168 }
169
170 fn acknowledgements(&self) -> &AcknowledgementsConfig {
171 &self.acknowledgements
172 }
173}
174
175#[derive(Debug, Clone)]
176pub struct NewRelicCredentials {
177 pub license_key: String,
178 pub account_id: String,
179 pub api: NewRelicApi,
180 pub region: NewRelicRegion,
181 pub override_uri: Option<Uri>,
182}
183
184impl NewRelicCredentials {
185 pub fn get_uri(&self) -> Uri {
186 if let Some(override_uri) = self.override_uri.as_ref() {
187 return override_uri.clone();
188 }
189
190 match self.api {
191 NewRelicApi::Events => match self.region {
192 NewRelicRegion::Us => format!(
193 "https://insights-collector.newrelic.com/v1/accounts/{}/events",
194 self.account_id
195 )
196 .parse::<Uri>()
197 .unwrap(),
198 NewRelicRegion::Eu => format!(
199 "https://insights-collector.eu01.nr-data.net/v1/accounts/{}/events",
200 self.account_id
201 )
202 .parse::<Uri>()
203 .unwrap(),
204 },
205 NewRelicApi::Metrics => match self.region {
206 NewRelicRegion::Us => Uri::from_static("https://metric-api.newrelic.com/metric/v1"),
207 NewRelicRegion::Eu => {
208 Uri::from_static("https://metric-api.eu.newrelic.com/metric/v1")
209 }
210 },
211 NewRelicApi::Logs => match self.region {
212 NewRelicRegion::Us => Uri::from_static("https://log-api.newrelic.com/log/v1"),
213 NewRelicRegion::Eu => Uri::from_static("https://log-api.eu.newrelic.com/log/v1"),
214 },
215 }
216 }
217}
218
219impl From<&NewRelicConfig> for NewRelicCredentials {
220 fn from(config: &NewRelicConfig) -> Self {
221 Self {
222 license_key: config.license_key.inner().to_string(),
223 account_id: config.account_id.inner().to_string(),
224 api: config.api,
225 region: config.region.unwrap_or(NewRelicRegion::Us),
226 override_uri: config.override_uri.clone(),
227 }
228 }
229}