vector/sinks/new_relic/
config.rs1use std::{fmt::Debug, sync::Arc};
2
3use http::Uri;
4use tower::ServiceBuilder;
5use vector_lib::sensitive_string::SensitiveString;
6
7use super::{
8 NewRelicApiResponse, NewRelicApiService, NewRelicEncoder, NewRelicSink, NewRelicSinkError,
9 healthcheck, service::NewRelicApiRequest,
10};
11use crate::{http::HttpClient, sinks::prelude::*};
12
13#[configurable_component]
15#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
16#[serde(rename_all = "snake_case")]
17pub enum NewRelicRegion {
18 #[default]
20 Us,
21
22 Eu,
24}
25
26#[configurable_component]
28#[derive(Clone, Copy, Default, Debug, Eq, PartialEq)]
29#[serde(rename_all = "snake_case")]
30pub enum NewRelicApi {
31 #[default]
33 Events,
34
35 Metrics,
37
38 Logs,
40}
41
42#[derive(Clone, Copy, Debug, Default)]
43pub struct NewRelicDefaultBatchSettings;
44
45impl SinkBatchSettings for NewRelicDefaultBatchSettings {
46 const MAX_EVENTS: Option<usize> = Some(100);
47 const MAX_BYTES: Option<usize> = Some(1_000_000);
48 const TIMEOUT_SECS: f64 = 1.0;
49}
50
51#[derive(Debug, Default, Clone)]
52pub struct NewRelicApiRetry;
53
54impl RetryLogic for NewRelicApiRetry {
55 type Error = NewRelicSinkError;
56 type Request = NewRelicApiRequest;
57 type Response = NewRelicApiResponse;
58
59 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
60 false
62 }
63}
64
65#[configurable_component(sink("new_relic", "Deliver events to New Relic."))]
67#[derive(Clone, Debug, Default)]
68#[serde(deny_unknown_fields)]
69pub struct NewRelicConfig {
70 #[configurable(metadata(docs::examples = "xxxx"))]
72 #[configurable(metadata(docs::examples = "${NEW_RELIC_LICENSE_KEY}"))]
73 pub license_key: SensitiveString,
74
75 #[configurable(metadata(docs::examples = "xxxx"))]
77 #[configurable(metadata(docs::examples = "${NEW_RELIC_ACCOUNT_KEY}"))]
78 pub account_id: SensitiveString,
79
80 #[configurable(derived)]
81 pub region: Option<NewRelicRegion>,
82
83 #[configurable(derived)]
84 pub api: NewRelicApi,
85
86 #[configurable(derived)]
87 #[serde(default = "Compression::gzip_default")]
88 pub compression: Compression,
89
90 #[configurable(derived)]
91 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
92 pub encoding: Transformer,
93
94 #[configurable(derived)]
95 #[serde(default)]
96 pub batch: BatchConfig<NewRelicDefaultBatchSettings>,
97
98 #[configurable(derived)]
99 #[serde(default)]
100 pub request: TowerRequestConfig,
101
102 #[configurable(derived)]
103 #[serde(
104 default,
105 deserialize_with = "crate::serde::bool_or_struct",
106 skip_serializing_if = "crate::serde::is_default"
107 )]
108 acknowledgements: AcknowledgementsConfig,
109
110 #[serde(skip)]
111 pub override_uri: Option<Uri>,
112}
113
114impl_generate_config_from_default!(NewRelicConfig);
115
116impl NewRelicConfig {
117 pub fn build_healthcheck(
118 &self,
119 client: HttpClient,
120 credentials: Arc<NewRelicCredentials>,
121 ) -> crate::Result<super::Healthcheck> {
122 Ok(healthcheck::healthcheck(client, credentials).boxed())
123 }
124}
125
126#[async_trait::async_trait]
127#[typetag::serde(name = "new_relic")]
128impl SinkConfig for NewRelicConfig {
129 async fn build(
130 &self,
131 cx: SinkContext,
132 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
133 let batcher_settings = self
134 .batch
135 .validate()?
136 .limit_max_events(self.batch.max_events.unwrap_or(100))?
137 .into_batcher_settings()?;
138
139 let request_limits = self.request.into_settings();
140 let tls_settings = TlsSettings::from_options(None)?;
141 let client = HttpClient::new(tls_settings, &cx.proxy)?;
142 let credentials = Arc::from(NewRelicCredentials::from(self));
143
144 let healthcheck = self.build_healthcheck(client.clone(), Arc::clone(&credentials))?;
145
146 let service = ServiceBuilder::new()
147 .settings(request_limits, NewRelicApiRetry)
148 .service(NewRelicApiService { client });
149
150 let sink = NewRelicSink {
151 service,
152 encoder: NewRelicEncoder {
153 transformer: self.encoding.clone(),
154 credentials: Arc::clone(&credentials),
155 },
156 credentials,
157 compression: self.compression,
158 batcher_settings,
159 };
160
161 Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
162 }
163
164 fn input(&self) -> Input {
165 Input::new(DataType::Log | DataType::Metric)
166 }
167
168 fn acknowledgements(&self) -> &AcknowledgementsConfig {
169 &self.acknowledgements
170 }
171}
172
173#[derive(Debug, Clone)]
174pub struct NewRelicCredentials {
175 pub license_key: String,
176 pub account_id: String,
177 pub api: NewRelicApi,
178 pub region: NewRelicRegion,
179 pub override_uri: Option<Uri>,
180}
181
182impl NewRelicCredentials {
183 pub fn get_uri(&self) -> Uri {
184 if let Some(override_uri) = self.override_uri.as_ref() {
185 return override_uri.clone();
186 }
187
188 match self.api {
189 NewRelicApi::Events => match self.region {
190 NewRelicRegion::Us => format!(
191 "https://insights-collector.newrelic.com/v1/accounts/{}/events",
192 self.account_id
193 )
194 .parse::<Uri>()
195 .unwrap(),
196 NewRelicRegion::Eu => format!(
197 "https://insights-collector.eu01.nr-data.net/v1/accounts/{}/events",
198 self.account_id
199 )
200 .parse::<Uri>()
201 .unwrap(),
202 },
203 NewRelicApi::Metrics => match self.region {
204 NewRelicRegion::Us => Uri::from_static("https://metric-api.newrelic.com/metric/v1"),
205 NewRelicRegion::Eu => {
206 Uri::from_static("https://metric-api.eu.newrelic.com/metric/v1")
207 }
208 },
209 NewRelicApi::Logs => match self.region {
210 NewRelicRegion::Us => Uri::from_static("https://log-api.newrelic.com/log/v1"),
211 NewRelicRegion::Eu => Uri::from_static("https://log-api.eu.newrelic.com/log/v1"),
212 },
213 }
214 }
215}
216
217impl From<&NewRelicConfig> for NewRelicCredentials {
218 fn from(config: &NewRelicConfig) -> Self {
219 Self {
220 license_key: config.license_key.inner().to_string(),
221 account_id: config.account_id.inner().to_string(),
222 api: config.api,
223 region: config.region.unwrap_or(NewRelicRegion::Us),
224 override_uri: config.override_uri.clone(),
225 }
226 }
227}