vector/sinks/new_relic/
config.rs

1use std::{fmt::Debug, sync::Arc};
2
3use http::Uri;
4use tower::ServiceBuilder;
5use vector_lib::sensitive_string::SensitiveString;
6
7use super::{
8    healthcheck, service::NewRelicApiRequest, NewRelicApiResponse, NewRelicApiService,
9    NewRelicEncoder, NewRelicSink, NewRelicSinkError,
10};
11
12use crate::{http::HttpClient, sinks::prelude::*};
13
14/// New Relic region.
15#[configurable_component]
16#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
17#[serde(rename_all = "snake_case")]
18#[derivative(Default)]
19pub enum NewRelicRegion {
20    /// US region.
21    #[derivative(Default)]
22    Us,
23
24    /// EU region.
25    Eu,
26}
27
28/// New Relic API endpoint.
29#[configurable_component]
30#[derive(Clone, Copy, Derivative, Debug, Eq, PartialEq)]
31#[serde(rename_all = "snake_case")]
32#[derivative(Default)]
33pub enum NewRelicApi {
34    /// Events API.
35    #[derivative(Default)]
36    Events,
37
38    /// Metrics API.
39    Metrics,
40
41    /// Logs API.
42    Logs,
43}
44
45#[derive(Clone, Copy, Debug, Default)]
46pub struct NewRelicDefaultBatchSettings;
47
48impl SinkBatchSettings for NewRelicDefaultBatchSettings {
49    const MAX_EVENTS: Option<usize> = Some(100);
50    const MAX_BYTES: Option<usize> = Some(1_000_000);
51    const TIMEOUT_SECS: f64 = 1.0;
52}
53
54#[derive(Debug, Default, Clone)]
55pub struct NewRelicApiRetry;
56
57impl RetryLogic for NewRelicApiRetry {
58    type Error = NewRelicSinkError;
59    type Request = NewRelicApiRequest;
60    type Response = NewRelicApiResponse;
61
62    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
63        // Never retry.
64        false
65    }
66}
67
68/// Configuration for the `new_relic` sink.
69#[configurable_component(sink("new_relic", "Deliver events to New Relic."))]
70#[derive(Clone, Debug, Default)]
71#[serde(deny_unknown_fields)]
72pub struct NewRelicConfig {
73    /// A valid New Relic license key.
74    #[configurable(metadata(docs::examples = "xxxx"))]
75    #[configurable(metadata(docs::examples = "${NEW_RELIC_LICENSE_KEY}"))]
76    pub license_key: SensitiveString,
77
78    /// The New Relic account ID.
79    #[configurable(metadata(docs::examples = "xxxx"))]
80    #[configurable(metadata(docs::examples = "${NEW_RELIC_ACCOUNT_KEY}"))]
81    pub account_id: SensitiveString,
82
83    #[configurable(derived)]
84    pub region: Option<NewRelicRegion>,
85
86    #[configurable(derived)]
87    pub api: NewRelicApi,
88
89    #[configurable(derived)]
90    #[serde(default = "Compression::gzip_default")]
91    pub compression: Compression,
92
93    #[configurable(derived)]
94    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
95    pub encoding: Transformer,
96
97    #[configurable(derived)]
98    #[serde(default)]
99    pub batch: BatchConfig<NewRelicDefaultBatchSettings>,
100
101    #[configurable(derived)]
102    #[serde(default)]
103    pub request: TowerRequestConfig,
104
105    #[configurable(derived)]
106    #[serde(
107        default,
108        deserialize_with = "crate::serde::bool_or_struct",
109        skip_serializing_if = "crate::serde::is_default"
110    )]
111    acknowledgements: AcknowledgementsConfig,
112
113    #[serde(skip)]
114    pub override_uri: Option<Uri>,
115}
116
117impl_generate_config_from_default!(NewRelicConfig);
118
119impl NewRelicConfig {
120    pub fn build_healthcheck(
121        &self,
122        client: HttpClient,
123        credentials: Arc<NewRelicCredentials>,
124    ) -> crate::Result<super::Healthcheck> {
125        Ok(healthcheck::healthcheck(client, credentials).boxed())
126    }
127}
128
129#[async_trait::async_trait]
130#[typetag::serde(name = "new_relic")]
131impl SinkConfig for NewRelicConfig {
132    async fn build(
133        &self,
134        cx: SinkContext,
135    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
136        let batcher_settings = self
137            .batch
138            .validate()?
139            .limit_max_events(self.batch.max_events.unwrap_or(100))?
140            .into_batcher_settings()?;
141
142        let request_limits = self.request.into_settings();
143        let tls_settings = TlsSettings::from_options(None)?;
144        let client = HttpClient::new(tls_settings, &cx.proxy)?;
145        let credentials = Arc::from(NewRelicCredentials::from(self));
146
147        let healthcheck = self.build_healthcheck(client.clone(), Arc::clone(&credentials))?;
148
149        let service = ServiceBuilder::new()
150            .settings(request_limits, NewRelicApiRetry)
151            .service(NewRelicApiService { client });
152
153        let sink = NewRelicSink {
154            service,
155            encoder: NewRelicEncoder {
156                transformer: self.encoding.clone(),
157                credentials: Arc::clone(&credentials),
158            },
159            credentials,
160            compression: self.compression,
161            batcher_settings,
162        };
163
164        Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
165    }
166
167    fn input(&self) -> Input {
168        Input::new(DataType::Log | DataType::Metric)
169    }
170
171    fn acknowledgements(&self) -> &AcknowledgementsConfig {
172        &self.acknowledgements
173    }
174}
175
176#[derive(Debug, Clone)]
177pub struct NewRelicCredentials {
178    pub license_key: String,
179    pub account_id: String,
180    pub api: NewRelicApi,
181    pub region: NewRelicRegion,
182    pub override_uri: Option<Uri>,
183}
184
185impl NewRelicCredentials {
186    pub fn get_uri(&self) -> Uri {
187        if let Some(override_uri) = self.override_uri.as_ref() {
188            return override_uri.clone();
189        }
190
191        match self.api {
192            NewRelicApi::Events => match self.region {
193                NewRelicRegion::Us => format!(
194                    "https://insights-collector.newrelic.com/v1/accounts/{}/events",
195                    self.account_id
196                )
197                .parse::<Uri>()
198                .unwrap(),
199                NewRelicRegion::Eu => format!(
200                    "https://insights-collector.eu01.nr-data.net/v1/accounts/{}/events",
201                    self.account_id
202                )
203                .parse::<Uri>()
204                .unwrap(),
205            },
206            NewRelicApi::Metrics => match self.region {
207                NewRelicRegion::Us => Uri::from_static("https://metric-api.newrelic.com/metric/v1"),
208                NewRelicRegion::Eu => {
209                    Uri::from_static("https://metric-api.eu.newrelic.com/metric/v1")
210                }
211            },
212            NewRelicApi::Logs => match self.region {
213                NewRelicRegion::Us => Uri::from_static("https://log-api.newrelic.com/log/v1"),
214                NewRelicRegion::Eu => Uri::from_static("https://log-api.eu.newrelic.com/log/v1"),
215            },
216        }
217    }
218}
219
220impl From<&NewRelicConfig> for NewRelicCredentials {
221    fn from(config: &NewRelicConfig) -> Self {
222        Self {
223            license_key: config.license_key.inner().to_string(),
224            account_id: config.account_id.inner().to_string(),
225            api: config.api,
226            region: config.region.unwrap_or(NewRelicRegion::Us),
227            override_uri: config.override_uri.clone(),
228        }
229    }
230}