vector/sinks/new_relic/
config.rs

1use std::{fmt::Debug, sync::Arc};
2
3use http::Uri;
4use tower::ServiceBuilder;
5use vector_lib::sensitive_string::SensitiveString;
6
7use super::{
8    NewRelicApiResponse, NewRelicApiService, NewRelicEncoder, NewRelicSink, NewRelicSinkError,
9    healthcheck, service::NewRelicApiRequest,
10};
11use crate::{http::HttpClient, sinks::prelude::*};
12
/// New Relic region.
// `Us` is the default variant (it carries `#[derivative(Default)]`). Because of
// `rename_all = "snake_case"`, the variants are spelled `us` / `eu` when
// (de)serialized.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
pub enum NewRelicRegion {
    /// US region.
    #[derivative(Default)]
    Us,

    /// EU region.
    Eu,
}
26
/// New Relic API endpoint.
// `Events` is the default variant (it carries `#[derivative(Default)]`). Because
// of `rename_all = "snake_case"`, the variants are spelled `events` / `metrics` /
// `logs` when (de)serialized.
#[configurable_component]
#[derive(Clone, Copy, Derivative, Debug, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
#[derivative(Default)]
pub enum NewRelicApi {
    /// Events API.
    #[derivative(Default)]
    Events,

    /// Metrics API.
    Metrics,

    /// Logs API.
    Logs,
}
43
/// Default batching parameters for the `new_relic` sink.
///
/// Used as the type parameter of the `BatchConfig` stored in
/// `NewRelicConfig::batch`.
#[derive(Clone, Copy, Debug, Default)]
pub struct NewRelicDefaultBatchSettings;

impl SinkBatchSettings for NewRelicDefaultBatchSettings {
    /// Default maximum number of events per batch.
    const MAX_EVENTS: Option<usize> = Some(100);
    /// Default maximum batch size (in bytes, going by the constant's name).
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    /// Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 1.0;
}
52
53#[derive(Debug, Default, Clone)]
54pub struct NewRelicApiRetry;
55
56impl RetryLogic for NewRelicApiRetry {
57    type Error = NewRelicSinkError;
58    type Request = NewRelicApiRequest;
59    type Response = NewRelicApiResponse;
60
61    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
62        // Never retry.
63        false
64    }
65}
66
/// Configuration for the `new_relic` sink.
// `deny_unknown_fields` makes deserialization fail on any key that is not one of
// the fields declared below.
#[configurable_component(sink("new_relic", "Deliver events to New Relic."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct NewRelicConfig {
    /// A valid New Relic license key.
    #[configurable(metadata(docs::examples = "xxxx"))]
    #[configurable(metadata(docs::examples = "${NEW_RELIC_LICENSE_KEY}"))]
    pub license_key: SensitiveString,

    /// The New Relic account ID.
    // Interpolated into the Events API URL by `NewRelicCredentials::get_uri`.
    #[configurable(metadata(docs::examples = "xxxx"))]
    #[configurable(metadata(docs::examples = "${NEW_RELIC_ACCOUNT_KEY}"))]
    pub account_id: SensitiveString,

    // Optional; when absent, the conversion into `NewRelicCredentials` falls back
    // to `NewRelicRegion::Us`.
    #[configurable(derived)]
    pub region: Option<NewRelicRegion>,

    // Which New Relic API the sink sends to. No serde default is declared on this
    // field.
    #[configurable(derived)]
    pub api: NewRelicApi,

    // Defaults to whatever `Compression::gzip_default` returns when omitted.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    // Cloned into the `NewRelicEncoder` when the sink is built.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    // Batch settings, with defaults taken from `NewRelicDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<NewRelicDefaultBatchSettings>,

    // Converted to request settings via `into_settings()` when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // NOTE(review): deserialized through `crate::serde::bool_or_struct`, which
    // presumably accepts either a bare boolean or the full struct form; confirm.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    // Never (de)serialized (`serde(skip)`), so it can only be set from code. When
    // set, `NewRelicCredentials::get_uri` returns it instead of a built-in
    // endpoint.
    #[serde(skip)]
    pub override_uri: Option<Uri>,
}

// NOTE(review): presumably derives the generated example config from
// `NewRelicConfig::default()`; confirm against the macro definition.
impl_generate_config_from_default!(NewRelicConfig);
117
118impl NewRelicConfig {
119    pub fn build_healthcheck(
120        &self,
121        client: HttpClient,
122        credentials: Arc<NewRelicCredentials>,
123    ) -> crate::Result<super::Healthcheck> {
124        Ok(healthcheck::healthcheck(client, credentials).boxed())
125    }
126}
127
128#[async_trait::async_trait]
129#[typetag::serde(name = "new_relic")]
130impl SinkConfig for NewRelicConfig {
131    async fn build(
132        &self,
133        cx: SinkContext,
134    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
135        let batcher_settings = self
136            .batch
137            .validate()?
138            .limit_max_events(self.batch.max_events.unwrap_or(100))?
139            .into_batcher_settings()?;
140
141        let request_limits = self.request.into_settings();
142        let tls_settings = TlsSettings::from_options(None)?;
143        let client = HttpClient::new(tls_settings, &cx.proxy)?;
144        let credentials = Arc::from(NewRelicCredentials::from(self));
145
146        let healthcheck = self.build_healthcheck(client.clone(), Arc::clone(&credentials))?;
147
148        let service = ServiceBuilder::new()
149            .settings(request_limits, NewRelicApiRetry)
150            .service(NewRelicApiService { client });
151
152        let sink = NewRelicSink {
153            service,
154            encoder: NewRelicEncoder {
155                transformer: self.encoding.clone(),
156                credentials: Arc::clone(&credentials),
157            },
158            credentials,
159            compression: self.compression,
160            batcher_settings,
161        };
162
163        Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
164    }
165
166    fn input(&self) -> Input {
167        Input::new(DataType::Log | DataType::Metric)
168    }
169
170    fn acknowledgements(&self) -> &AcknowledgementsConfig {
171        &self.acknowledgements
172    }
173}
174
/// Connection details for the New Relic API, built from a `NewRelicConfig`.
///
/// The sink wraps one instance in an `Arc` and shares it between the
/// healthcheck, the encoder and the sink itself.
// NOTE(review): the derived `Debug` prints `license_key` in clear text, because
// it is stored here as a plain `String` rather than a `SensitiveString`.
#[derive(Debug, Clone)]
pub struct NewRelicCredentials {
    /// License key, copied out of the config's `SensitiveString`.
    pub license_key: String,
    /// Account ID; interpolated into the Events API URL by `get_uri`.
    pub account_id: String,
    /// Which New Relic API to target.
    pub api: NewRelicApi,
    /// Which regional endpoint to use.
    pub region: NewRelicRegion,
    /// When `Some`, `get_uri` returns this URI instead of a built-in endpoint.
    pub override_uri: Option<Uri>,
}
183
184impl NewRelicCredentials {
185    pub fn get_uri(&self) -> Uri {
186        if let Some(override_uri) = self.override_uri.as_ref() {
187            return override_uri.clone();
188        }
189
190        match self.api {
191            NewRelicApi::Events => match self.region {
192                NewRelicRegion::Us => format!(
193                    "https://insights-collector.newrelic.com/v1/accounts/{}/events",
194                    self.account_id
195                )
196                .parse::<Uri>()
197                .unwrap(),
198                NewRelicRegion::Eu => format!(
199                    "https://insights-collector.eu01.nr-data.net/v1/accounts/{}/events",
200                    self.account_id
201                )
202                .parse::<Uri>()
203                .unwrap(),
204            },
205            NewRelicApi::Metrics => match self.region {
206                NewRelicRegion::Us => Uri::from_static("https://metric-api.newrelic.com/metric/v1"),
207                NewRelicRegion::Eu => {
208                    Uri::from_static("https://metric-api.eu.newrelic.com/metric/v1")
209                }
210            },
211            NewRelicApi::Logs => match self.region {
212                NewRelicRegion::Us => Uri::from_static("https://log-api.newrelic.com/log/v1"),
213                NewRelicRegion::Eu => Uri::from_static("https://log-api.eu.newrelic.com/log/v1"),
214            },
215        }
216    }
217}
218
219impl From<&NewRelicConfig> for NewRelicCredentials {
220    fn from(config: &NewRelicConfig) -> Self {
221        Self {
222            license_key: config.license_key.inner().to_string(),
223            account_id: config.account_id.inner().to_string(),
224            api: config.api,
225            region: config.region.unwrap_or(NewRelicRegion::Us),
226            override_uri: config.override_uri.clone(),
227        }
228    }
229}