1use vector_lib::{
2 codecs::{
3 MetricTagValues,
4 encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5 },
6 configurable::configurable_component,
7 sensitive_string::SensitiveString,
8};
9
10use crate::{
11 codecs::{EncodingConfigWithFraming, Transformer},
12 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13 http::Auth as HttpAuthConfig,
14 sinks::{
15 Healthcheck, VectorSink,
16 http::config::{HttpMethod, HttpSinkConfig},
17 util::{
18 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
19 },
20 },
21 tls::TlsConfig,
22};
23
24static CLOUD_URL: &str = "https://api.axiom.co";
25
26#[configurable_component]
28#[derive(Clone, Debug, Default)]
29#[serde(default)]
30pub struct UrlOrRegion {
31 #[configurable(validation(format = "uri"))]
37 #[configurable(metadata(docs::examples = "https://api.eu.axiom.co"))]
38 #[configurable(metadata(docs::examples = "http://localhost:3400/ingest"))]
39 #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
40 pub url: Option<String>,
41
42 #[configurable(metadata(docs::examples = "${AXIOM_REGION}"))]
48 #[configurable(metadata(docs::examples = "mumbai.axiom.co"))]
49 #[configurable(metadata(docs::examples = "eu-central-1.aws.edge.axiom.co"))]
50 pub region: Option<String>,
51}
52
53impl UrlOrRegion {
54 fn validate(&self) -> crate::Result<()> {
56 if self.url.is_some() && self.region.is_some() {
57 return Err("Cannot set both `url` and `region`. Please use only one.".into());
58 }
59 Ok(())
60 }
61
62 pub fn url(&self) -> Option<&str> {
64 self.url.as_deref()
65 }
66
67 pub fn region(&self) -> Option<&str> {
69 self.region.as_deref()
70 }
71}
72
73#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
75#[derive(Clone, Debug, Default)]
76pub struct AxiomConfig {
77 #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
81 #[configurable(metadata(docs::examples = "123abc"))]
82 pub org_id: Option<String>,
83
84 #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
86 #[configurable(metadata(docs::examples = "123abc"))]
87 pub token: SensitiveString,
88
89 #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
91 #[configurable(metadata(docs::examples = "vector_rocks"))]
92 pub dataset: String,
93
94 #[serde(flatten)]
96 #[configurable(derived)]
97 pub endpoint: UrlOrRegion,
98
99 #[configurable(derived)]
100 #[serde(default)]
101 pub request: RequestConfig,
102
103 #[configurable(derived)]
105 #[serde(default = "Compression::zstd_default")]
106 pub compression: Compression,
107
108 #[configurable(derived)]
112 pub tls: Option<TlsConfig>,
113
114 #[configurable(derived)]
116 #[serde(default)]
117 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
118
119 #[configurable(derived)]
121 #[serde(
122 default,
123 deserialize_with = "crate::serde::bool_or_struct",
124 skip_serializing_if = "crate::serde::is_default"
125 )]
126 pub acknowledgements: AcknowledgementsConfig,
127}
128
129impl GenerateConfig for AxiomConfig {
130 fn generate_config() -> toml::Value {
131 toml::from_str(
132 r#"token = "${AXIOM_TOKEN}"
133 dataset = "${AXIOM_DATASET}"
134 url = "${AXIOM_URL}"
135 org_id = "${AXIOM_ORG_ID}""#,
136 )
137 .unwrap()
138 }
139}
140
141#[async_trait::async_trait]
142#[typetag::serde(name = "axiom")]
143impl SinkConfig for AxiomConfig {
144 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
145 self.endpoint.validate()?;
147
148 let mut request = self.request.clone();
149 if let Some(org_id) = &self.org_id {
150 request
152 .headers
153 .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
154 }
155
156 let http_sink_config = HttpSinkConfig {
163 uri: self.build_endpoint().try_into()?,
164 compression: self.compression,
165 auth: Some(HttpAuthConfig::Bearer {
166 token: self.token.clone(),
167 }),
168 method: HttpMethod::Post,
169 tls: self.tls.clone(),
170 request,
171 acknowledgements: self.acknowledgements,
172 batch: self.batch,
173 headers: None,
174 encoding: EncodingConfigWithFraming::new(
175 Some(FramingConfig::NewlineDelimited),
176 SerializerConfig::Json(JsonSerializerConfig {
177 metric_tag_values: MetricTagValues::Single,
178 options: JsonSerializerOptions { pretty: false }, }),
180 Transformer::default(),
181 ),
182 payload_prefix: "".into(), payload_suffix: "".into(), };
185
186 http_sink_config.build(cx).await
187 }
188
189 fn input(&self) -> Input {
190 Input::new(DataType::Metric | DataType::Log | DataType::Trace)
191 }
192
193 fn acknowledgements(&self) -> &AcknowledgementsConfig {
194 &self.acknowledgements
195 }
196}
197
198impl AxiomConfig {
199 fn build_endpoint(&self) -> String {
200 if let Some(url) = self.endpoint.url() {
204 let url = url.trim_end_matches('/');
205
206 if let Ok(parsed) = url::Url::parse(url) {
210 let path = parsed.path();
211 if path.is_empty() || path == "/" {
212 return format!("{url}/v1/datasets/{}/ingest", self.dataset);
214 }
215 }
216
217 return url.to_string();
219 }
220
221 if let Some(region) = self.endpoint.region() {
223 let region = region.trim_end_matches('/');
224 return format!("https://{region}/v1/ingest/{}", self.dataset);
225 }
226
227 format!("{CLOUD_URL}/v1/datasets/{}/ingest", self.dataset)
229 }
230}
231
232#[cfg(test)]
233mod test {
234 #[test]
235 fn generate_config() {
236 crate::test_util::test_generate_config::<super::AxiomConfig>();
237 }
238
239 #[test]
240 fn test_region_domain_only() {
241 let config = super::AxiomConfig {
243 endpoint: super::UrlOrRegion {
244 region: Some("mumbai.axiomdomain.co".to_string()),
245 url: None,
246 },
247 dataset: "test-3".to_string(),
248 ..Default::default()
249 };
250 let endpoint = config.build_endpoint();
251 assert_eq!(endpoint, "https://mumbai.axiomdomain.co/v1/ingest/test-3");
252 }
253
254 #[test]
255 fn test_default_no_config() {
256 let config = super::AxiomConfig {
258 dataset: "foo".to_string(),
259 ..Default::default()
260 };
261 let endpoint = config.build_endpoint();
262 assert_eq!(endpoint, "https://api.axiom.co/v1/datasets/foo/ingest");
263 }
264
265 #[test]
266 fn test_url_with_custom_path() {
267 let config = super::AxiomConfig {
269 endpoint: super::UrlOrRegion {
270 url: Some("http://localhost:3400/ingest".to_string()),
271 region: None,
272 },
273 dataset: "meh".to_string(),
274 ..Default::default()
275 };
276 let endpoint = config.build_endpoint();
277 assert_eq!(endpoint, "http://localhost:3400/ingest");
278 }
279
280 #[test]
281 fn test_url_without_path_backwards_compat() {
282 let config = super::AxiomConfig {
284 endpoint: super::UrlOrRegion {
285 url: Some("https://api.eu.axiom.co".to_string()),
286 region: None,
287 },
288 dataset: "qoo".to_string(),
289 ..Default::default()
290 };
291 let endpoint = config.build_endpoint();
292 assert_eq!(endpoint, "https://api.eu.axiom.co/v1/datasets/qoo/ingest");
293
294 let config = super::AxiomConfig {
296 endpoint: super::UrlOrRegion {
297 url: Some("https://api.eu.axiom.co/".to_string()),
298 region: None,
299 },
300 dataset: "qoo".to_string(),
301 ..Default::default()
302 };
303 let endpoint = config.build_endpoint();
304 assert_eq!(endpoint, "https://api.eu.axiom.co/v1/datasets/qoo/ingest");
305 }
306
307 #[test]
308 fn test_both_url_and_region_fails_validation() {
309 let endpoint = super::UrlOrRegion {
311 url: Some("http://localhost:3400/ingest".to_string()),
312 region: Some("mumbai.axiomdomain.co".to_string()),
313 };
314
315 let result = endpoint.validate();
316 assert!(result.is_err());
317 assert_eq!(
318 result.unwrap_err().to_string(),
319 "Cannot set both `url` and `region`. Please use only one."
320 );
321 }
322
323 #[test]
324 fn test_url_or_region_deserialization_with_url() {
325 let config: super::AxiomConfig = toml::from_str(
327 r#"
328 token = "test-token"
329 dataset = "test-dataset"
330 url = "https://api.eu.axiom.co"
331 "#,
332 )
333 .unwrap();
334
335 assert_eq!(config.endpoint.url(), Some("https://api.eu.axiom.co"));
336 assert_eq!(config.endpoint.region(), None);
337 }
338
339 #[test]
340 fn test_url_or_region_deserialization_with_region() {
341 let config: super::AxiomConfig = toml::from_str(
343 r#"
344 token = "test-token"
345 dataset = "test-dataset"
346 region = "mumbai.axiom.co"
347 "#,
348 )
349 .unwrap();
350
351 assert_eq!(config.endpoint.url(), None);
352 assert_eq!(config.endpoint.region(), Some("mumbai.axiom.co"));
353 }
354
355 #[test]
356 fn test_production_regional_edges() {
357 let config = super::AxiomConfig {
359 endpoint: super::UrlOrRegion {
360 region: Some("eu-central-1.aws.edge.axiom.co".to_string()),
361 url: None,
362 },
363 dataset: "my-dataset".to_string(),
364 ..Default::default()
365 };
366 let endpoint = config.build_endpoint();
367 assert_eq!(
368 endpoint,
369 "https://eu-central-1.aws.edge.axiom.co/v1/ingest/my-dataset"
370 );
371 }
372
373 #[test]
374 fn test_staging_environment_edges() {
375 let config = super::AxiomConfig {
377 endpoint: super::UrlOrRegion {
378 region: Some("us-east-1.edge.staging.axiomdomain.co".to_string()),
379 url: None,
380 },
381 dataset: "test-dataset".to_string(),
382 ..Default::default()
383 };
384 let endpoint = config.build_endpoint();
385 assert_eq!(
386 endpoint,
387 "https://us-east-1.edge.staging.axiomdomain.co/v1/ingest/test-dataset"
388 );
389 }
390
391 #[test]
392 fn test_dev_environment_edges() {
393 let config = super::AxiomConfig {
395 endpoint: super::UrlOrRegion {
396 region: Some("eu-west-1.edge.dev.axiomdomain.co".to_string()),
397 url: None,
398 },
399 dataset: "dev-dataset".to_string(),
400 ..Default::default()
401 };
402 let endpoint = config.build_endpoint();
403 assert_eq!(
404 endpoint,
405 "https://eu-west-1.edge.dev.axiomdomain.co/v1/ingest/dev-dataset"
406 );
407 }
408}