1use vector_lib::{
2 codecs::{
3 MetricTagValues,
4 encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5 },
6 configurable::configurable_component,
7 sensitive_string::SensitiveString,
8};
9
10use crate::{
11 codecs::{EncodingConfigWithFraming, Transformer},
12 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13 http::Auth as HttpAuthConfig,
14 sinks::{
15 Healthcheck, VectorSink,
16 http::config::{HttpMethod, HttpSinkConfig},
17 util::{
18 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
19 },
20 },
21 tls::TlsConfig,
22};
23
/// Base URL of the Axiom cloud API.
///
/// Used to build the ingest endpoint when neither `url` nor `region` is configured.
static CLOUD_URL: &str = "https://api.axiom.co";
25
/// Selects where the Axiom sink sends data: either an explicit URL or a regional edge domain.
///
/// Both fields are optional; setting both at once is rejected by `validate`.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(default)]
pub struct UrlOrRegion {
    /// URI of the Axiom endpoint to send data to.
    ///
    /// Trailing slashes are trimmed. If the URI has no path (or only `/`),
    /// `/v1/datasets/{dataset}/ingest` is appended; otherwise the URI is used as given.
    /// Cannot be set together with `region`.
    #[configurable(validation(format = "uri"))]
    #[configurable(metadata(docs::examples = "https://api.eu.axiom.co"))]
    #[configurable(metadata(docs::examples = "http://localhost:3400/ingest"))]
    #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
    pub url: Option<String>,

    /// The Axiom regional edge domain to use for ingestion.
    ///
    /// The value is placed directly after `https://`, so it should be a bare domain name.
    /// When set, data is sent to `https://{region}/v1/ingest/{dataset}`.
    /// Cannot be set together with `url`.
    #[configurable(metadata(docs::examples = "${AXIOM_REGION}"))]
    #[configurable(metadata(docs::examples = "mumbai.axiom.co"))]
    #[configurable(metadata(docs::examples = "eu-central-1.aws.edge.axiom.co"))]
    pub region: Option<String>,
}
52
53impl UrlOrRegion {
54 fn validate(&self) -> crate::Result<()> {
56 if self.url.is_some() && self.region.is_some() {
57 return Err("Cannot set both `url` and `region`. Please use only one.".into());
58 }
59 Ok(())
60 }
61
62 pub fn url(&self) -> Option<&str> {
64 self.url.as_deref()
65 }
66
67 pub fn region(&self) -> Option<&str> {
69 self.region.as_deref()
70 }
71}
72
/// Configuration for the `axiom` sink.
#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
#[derive(Clone, Debug, Default)]
pub struct AxiomConfig {
    /// The Axiom organization ID.
    ///
    /// When set, it is added to the request headers as `X-Axiom-Org-Id`.
    #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
    #[configurable(metadata(docs::examples = "123abc"))]
    pub org_id: Option<String>,

    /// The Axiom API token, used as the bearer token for HTTP authentication.
    #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
    #[configurable(metadata(docs::examples = "123abc"))]
    pub token: SensitiveString,

    /// The Axiom dataset to write to; it is interpolated into the ingest endpoint path.
    #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
    #[configurable(metadata(docs::examples = "vector_rocks"))]
    pub dataset: String,

    /// URL or regional edge selection.
    ///
    /// Flattened, so `url` and `region` are written as top-level options of the sink.
    #[serde(flatten)]
    #[configurable(derived)]
    pub endpoint: UrlOrRegion,

    /// Request settings handed to the underlying HTTP sink (after the optional org header is added).
    #[configurable(derived)]
    #[serde(default)]
    pub request: RequestConfig,

    /// The compression algorithm to use.
    ///
    /// Falls back to `Compression::zstd_default` when omitted from the configuration.
    #[configurable(derived)]
    #[serde(default = "Compression::zstd_default")]
    pub compression: Compression,

    /// Optional TLS settings, passed through to the underlying HTTP sink.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The batch settings for the sink, passed through to the underlying HTTP sink.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Controls how acknowledgements are handled for this sink.
    ///
    /// Deserialized through `crate::serde::bool_or_struct` and omitted from serialized
    /// output when it equals the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
128
129impl GenerateConfig for AxiomConfig {
130 fn generate_config() -> toml::Value {
131 toml::from_str(
132 r#"token = "${AXIOM_TOKEN}"
133 dataset = "${AXIOM_DATASET}"
134 url = "${AXIOM_URL}"
135 org_id = "${AXIOM_ORG_ID}""#,
136 )
137 .unwrap()
138 }
139}
140
141#[async_trait::async_trait]
142#[typetag::serde(name = "axiom")]
143impl SinkConfig for AxiomConfig {
144 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
145 self.endpoint.validate()?;
147
148 let mut request = self.request.clone();
149 if let Some(org_id) = &self.org_id {
150 request
152 .headers
153 .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
154 }
155
156 let http_sink_config = HttpSinkConfig {
163 uri: self.build_endpoint().try_into()?,
164 compression: self.compression,
165 auth: Some(HttpAuthConfig::Bearer {
166 token: self.token.clone(),
167 }),
168 method: HttpMethod::Post,
169 tls: self.tls.clone(),
170 request,
171 acknowledgements: self.acknowledgements,
172 batch: self.batch,
173 encoding: EncodingConfigWithFraming::new(
174 Some(FramingConfig::NewlineDelimited),
175 SerializerConfig::Json(JsonSerializerConfig {
176 metric_tag_values: MetricTagValues::Single,
177 options: JsonSerializerOptions { pretty: false }, }),
179 Transformer::default(),
180 ),
181 payload_prefix: "".into(), payload_suffix: "".into(), };
184
185 http_sink_config.build(cx).await
186 }
187
188 fn input(&self) -> Input {
189 Input::new(DataType::Metric | DataType::Log | DataType::Trace)
190 }
191
192 fn acknowledgements(&self) -> &AcknowledgementsConfig {
193 &self.acknowledgements
194 }
195}
196
197impl AxiomConfig {
198 fn build_endpoint(&self) -> String {
199 if let Some(url) = self.endpoint.url() {
203 let url = url.trim_end_matches('/');
204
205 if let Ok(parsed) = url::Url::parse(url) {
209 let path = parsed.path();
210 if path.is_empty() || path == "/" {
211 return format!("{url}/v1/datasets/{}/ingest", self.dataset);
213 }
214 }
215
216 return url.to_string();
218 }
219
220 if let Some(region) = self.endpoint.region() {
222 let region = region.trim_end_matches('/');
223 return format!("https://{region}/v1/ingest/{}", self.dataset);
224 }
225
226 format!("{CLOUD_URL}/v1/datasets/{}/ingest", self.dataset)
228 }
229}
230
#[cfg(test)]
mod test {
    use super::{AxiomConfig, UrlOrRegion};

    /// Builds a config that targets an explicit `url` for the given dataset.
    fn config_with_url(url: &str, dataset: &str) -> AxiomConfig {
        AxiomConfig {
            endpoint: UrlOrRegion {
                url: Some(url.to_owned()),
                region: None,
            },
            dataset: dataset.to_owned(),
            ..Default::default()
        }
    }

    /// Builds a config that targets a regional edge domain for the given dataset.
    fn config_with_region(region: &str, dataset: &str) -> AxiomConfig {
        AxiomConfig {
            endpoint: UrlOrRegion {
                region: Some(region.to_owned()),
                url: None,
            },
            dataset: dataset.to_owned(),
            ..Default::default()
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AxiomConfig>();
    }

    #[test]
    fn test_region_domain_only() {
        let config = config_with_region("mumbai.axiomdomain.co", "test-3");
        assert_eq!(
            config.build_endpoint(),
            "https://mumbai.axiomdomain.co/v1/ingest/test-3"
        );
    }

    #[test]
    fn test_default_no_config() {
        // Neither `url` nor `region` is set, so the cloud default applies.
        let config = AxiomConfig {
            dataset: "foo".to_owned(),
            ..Default::default()
        };
        assert_eq!(
            config.build_endpoint(),
            "https://api.axiom.co/v1/datasets/foo/ingest"
        );
    }

    #[test]
    fn test_url_with_custom_path() {
        // A URL that already has a path is used as-is.
        let config = config_with_url("http://localhost:3400/ingest", "meh");
        assert_eq!(config.build_endpoint(), "http://localhost:3400/ingest");
    }

    #[test]
    fn test_url_without_path_backwards_compat() {
        // With and without a trailing slash, a path-less URL gets the dataset path appended.
        for url in ["https://api.eu.axiom.co", "https://api.eu.axiom.co/"] {
            let config = config_with_url(url, "qoo");
            assert_eq!(
                config.build_endpoint(),
                "https://api.eu.axiom.co/v1/datasets/qoo/ingest"
            );
        }
    }

    #[test]
    fn test_both_url_and_region_fails_validation() {
        let endpoint = UrlOrRegion {
            url: Some("http://localhost:3400/ingest".to_owned()),
            region: Some("mumbai.axiomdomain.co".to_owned()),
        };

        let outcome = endpoint.validate();
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert_eq!(
            message,
            "Cannot set both `url` and `region`. Please use only one."
        );
    }

    #[test]
    fn test_url_or_region_deserialization_with_url() {
        let source = r#"
            token = "test-token"
            dataset = "test-dataset"
            url = "https://api.eu.axiom.co"
        "#;
        let config: AxiomConfig = toml::from_str(source).unwrap();

        assert_eq!(config.endpoint.url(), Some("https://api.eu.axiom.co"));
        assert_eq!(config.endpoint.region(), None);
    }

    #[test]
    fn test_url_or_region_deserialization_with_region() {
        let source = r#"
            token = "test-token"
            dataset = "test-dataset"
            region = "mumbai.axiom.co"
        "#;
        let config: AxiomConfig = toml::from_str(source).unwrap();

        assert_eq!(config.endpoint.url(), None);
        assert_eq!(config.endpoint.region(), Some("mumbai.axiom.co"));
    }

    #[test]
    fn test_production_regional_edges() {
        let config = config_with_region("eu-central-1.aws.edge.axiom.co", "my-dataset");
        assert_eq!(
            config.build_endpoint(),
            "https://eu-central-1.aws.edge.axiom.co/v1/ingest/my-dataset"
        );
    }

    #[test]
    fn test_staging_environment_edges() {
        let config = config_with_region("us-east-1.edge.staging.axiomdomain.co", "test-dataset");
        assert_eq!(
            config.build_endpoint(),
            "https://us-east-1.edge.staging.axiomdomain.co/v1/ingest/test-dataset"
        );
    }

    #[test]
    fn test_dev_environment_edges() {
        let config = config_with_region("eu-west-1.edge.dev.axiomdomain.co", "dev-dataset");
        assert_eq!(
            config.build_endpoint(),
            "https://eu-west-1.edge.dev.axiomdomain.co/v1/ingest/dev-dataset"
        );
    }
}