vector/sinks/new_relic/
config.rs

1use std::{fmt::Debug, sync::Arc};
2
3use http::Uri;
4use tower::ServiceBuilder;
5use vector_lib::sensitive_string::SensitiveString;
6
7use super::{
8    NewRelicApiResponse, NewRelicApiService, NewRelicEncoder, NewRelicSink, NewRelicSinkError,
9    healthcheck, service::NewRelicApiRequest,
10};
11use crate::{http::HttpClient, sinks::prelude::*};
12
13/// New Relic region.
14#[configurable_component]
15#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
16#[serde(rename_all = "snake_case")]
17pub enum NewRelicRegion {
18    /// US region.
19    #[default]
20    Us,
21
22    /// EU region.
23    Eu,
24}
25
26/// New Relic API endpoint.
27#[configurable_component]
28#[derive(Clone, Copy, Default, Debug, Eq, PartialEq)]
29#[serde(rename_all = "snake_case")]
30pub enum NewRelicApi {
31    /// Events API.
32    #[default]
33    Events,
34
35    /// Metrics API.
36    Metrics,
37
38    /// Logs API.
39    Logs,
40}
41
42#[derive(Clone, Copy, Debug, Default)]
43pub struct NewRelicDefaultBatchSettings;
44
45impl SinkBatchSettings for NewRelicDefaultBatchSettings {
46    const MAX_EVENTS: Option<usize> = Some(100);
47    const MAX_BYTES: Option<usize> = Some(1_000_000);
48    const TIMEOUT_SECS: f64 = 1.0;
49}
50
51#[derive(Debug, Default, Clone)]
52pub struct NewRelicApiRetry;
53
54impl RetryLogic for NewRelicApiRetry {
55    type Error = NewRelicSinkError;
56    type Request = NewRelicApiRequest;
57    type Response = NewRelicApiResponse;
58
59    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
60        // Never retry.
61        false
62    }
63}
64
65/// Configuration for the `new_relic` sink.
66#[configurable_component(sink("new_relic", "Deliver events to New Relic."))]
67#[derive(Clone, Debug, Default)]
68#[serde(deny_unknown_fields)]
69pub struct NewRelicConfig {
70    /// A valid New Relic license key.
71    #[configurable(metadata(docs::examples = "xxxx"))]
72    #[configurable(metadata(docs::examples = "${NEW_RELIC_LICENSE_KEY}"))]
73    pub license_key: SensitiveString,
74
75    /// The New Relic account ID.
76    #[configurable(metadata(docs::examples = "xxxx"))]
77    #[configurable(metadata(docs::examples = "${NEW_RELIC_ACCOUNT_KEY}"))]
78    pub account_id: SensitiveString,
79
80    #[configurable(derived)]
81    pub region: Option<NewRelicRegion>,
82
83    #[configurable(derived)]
84    pub api: NewRelicApi,
85
86    #[configurable(derived)]
87    #[serde(default = "Compression::gzip_default")]
88    pub compression: Compression,
89
90    #[configurable(derived)]
91    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
92    pub encoding: Transformer,
93
94    #[configurable(derived)]
95    #[serde(default)]
96    pub batch: BatchConfig<NewRelicDefaultBatchSettings>,
97
98    #[configurable(derived)]
99    #[serde(default)]
100    pub request: TowerRequestConfig,
101
102    #[configurable(derived)]
103    #[serde(
104        default,
105        deserialize_with = "crate::serde::bool_or_struct",
106        skip_serializing_if = "crate::serde::is_default"
107    )]
108    acknowledgements: AcknowledgementsConfig,
109
110    #[serde(skip)]
111    pub override_uri: Option<Uri>,
112}
113
114impl_generate_config_from_default!(NewRelicConfig);
115
116impl NewRelicConfig {
117    pub fn build_healthcheck(
118        &self,
119        client: HttpClient,
120        credentials: Arc<NewRelicCredentials>,
121    ) -> crate::Result<super::Healthcheck> {
122        Ok(healthcheck::healthcheck(client, credentials).boxed())
123    }
124}
125
126#[async_trait::async_trait]
127#[typetag::serde(name = "new_relic")]
128impl SinkConfig for NewRelicConfig {
129    async fn build(
130        &self,
131        cx: SinkContext,
132    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
133        let batcher_settings = self
134            .batch
135            .validate()?
136            .limit_max_events(self.batch.max_events.unwrap_or(100))?
137            .into_batcher_settings()?;
138
139        let request_limits = self.request.into_settings();
140        let tls_settings = TlsSettings::from_options(None)?;
141        let client = HttpClient::new(tls_settings, &cx.proxy)?;
142        let credentials = Arc::from(NewRelicCredentials::from(self));
143
144        let healthcheck = self.build_healthcheck(client.clone(), Arc::clone(&credentials))?;
145
146        let service = ServiceBuilder::new()
147            .settings(request_limits, NewRelicApiRetry)
148            .service(NewRelicApiService { client });
149
150        let sink = NewRelicSink {
151            service,
152            encoder: NewRelicEncoder {
153                transformer: self.encoding.clone(),
154                credentials: Arc::clone(&credentials),
155            },
156            credentials,
157            compression: self.compression,
158            batcher_settings,
159        };
160
161        Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
162    }
163
164    fn input(&self) -> Input {
165        Input::new(DataType::Log | DataType::Metric)
166    }
167
168    fn acknowledgements(&self) -> &AcknowledgementsConfig {
169        &self.acknowledgements
170    }
171}
172
173#[derive(Debug, Clone)]
174pub struct NewRelicCredentials {
175    pub license_key: String,
176    pub account_id: String,
177    pub api: NewRelicApi,
178    pub region: NewRelicRegion,
179    pub override_uri: Option<Uri>,
180}
181
182impl NewRelicCredentials {
183    pub fn get_uri(&self) -> Uri {
184        if let Some(override_uri) = self.override_uri.as_ref() {
185            return override_uri.clone();
186        }
187
188        match self.api {
189            NewRelicApi::Events => match self.region {
190                NewRelicRegion::Us => format!(
191                    "https://insights-collector.newrelic.com/v1/accounts/{}/events",
192                    self.account_id
193                )
194                .parse::<Uri>()
195                .unwrap(),
196                NewRelicRegion::Eu => format!(
197                    "https://insights-collector.eu01.nr-data.net/v1/accounts/{}/events",
198                    self.account_id
199                )
200                .parse::<Uri>()
201                .unwrap(),
202            },
203            NewRelicApi::Metrics => match self.region {
204                NewRelicRegion::Us => Uri::from_static("https://metric-api.newrelic.com/metric/v1"),
205                NewRelicRegion::Eu => {
206                    Uri::from_static("https://metric-api.eu.newrelic.com/metric/v1")
207                }
208            },
209            NewRelicApi::Logs => match self.region {
210                NewRelicRegion::Us => Uri::from_static("https://log-api.newrelic.com/log/v1"),
211                NewRelicRegion::Eu => Uri::from_static("https://log-api.eu.newrelic.com/log/v1"),
212            },
213        }
214    }
215}
216
217impl From<&NewRelicConfig> for NewRelicCredentials {
218    fn from(config: &NewRelicConfig) -> Self {
219        Self {
220            license_key: config.license_key.inner().to_string(),
221            account_id: config.account_id.inner().to_string(),
222            api: config.api,
223            region: config.region.unwrap_or(NewRelicRegion::Us),
224            override_uri: config.override_uri.clone(),
225        }
226    }
227}