vector/sinks/axiom/
config.rs

1use vector_lib::{
2    codecs::{
3        MetricTagValues,
4        encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5    },
6    configurable::configurable_component,
7    sensitive_string::SensitiveString,
8};
9
10use crate::{
11    codecs::{EncodingConfigWithFraming, Transformer},
12    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13    http::Auth as HttpAuthConfig,
14    sinks::{
15        Healthcheck, VectorSink,
16        http::config::{HttpMethod, HttpSinkConfig},
17        util::{
18            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
19        },
20    },
21    tls::TlsConfig,
22};
23
24static CLOUD_URL: &str = "https://api.axiom.co";
25
26/// Configuration of the URL/region to use when interacting with Axiom.
27#[configurable_component]
28#[derive(Clone, Debug, Default)]
29#[serde(default)]
30pub struct UrlOrRegion {
31    /// URI of the Axiom endpoint to send data to.
32    ///
33    /// If a path is provided, the URL is used as-is.
34    /// If no path (or only `/`) is provided, `/v1/datasets/{dataset}/ingest` is appended for backwards compatibility.
35    /// This takes precedence over `region` if both are set (but both should not be set).
36    #[configurable(validation(format = "uri"))]
37    #[configurable(metadata(docs::examples = "https://api.eu.axiom.co"))]
38    #[configurable(metadata(docs::examples = "http://localhost:3400/ingest"))]
39    #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
40    pub url: Option<String>,
41
42    /// The Axiom regional edge domain to use for ingestion.
43    ///
44    /// Specify the domain name only (no scheme, no path).
45    /// When set, data is sent to `https://{region}/v1/ingest/{dataset}`.
46    /// Cannot be used together with `url`.
47    #[configurable(metadata(docs::examples = "${AXIOM_REGION}"))]
48    #[configurable(metadata(docs::examples = "mumbai.axiom.co"))]
49    #[configurable(metadata(docs::examples = "eu-central-1.aws.edge.axiom.co"))]
50    pub region: Option<String>,
51}
52
53impl UrlOrRegion {
54    /// Validates that url and region are not both set.
55    fn validate(&self) -> crate::Result<()> {
56        if self.url.is_some() && self.region.is_some() {
57            return Err("Cannot set both `url` and `region`. Please use only one.".into());
58        }
59        Ok(())
60    }
61
62    /// Returns the url if set.
63    pub fn url(&self) -> Option<&str> {
64        self.url.as_deref()
65    }
66
67    /// Returns the region if set.
68    pub fn region(&self) -> Option<&str> {
69        self.region.as_deref()
70    }
71}
72
73/// Configuration for the `axiom` sink.
74#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
75#[derive(Clone, Debug, Default)]
76pub struct AxiomConfig {
77    /// The Axiom organization ID.
78    ///
79    /// Only required when using personal tokens.
80    #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
81    #[configurable(metadata(docs::examples = "123abc"))]
82    pub org_id: Option<String>,
83
84    /// The Axiom API token.
85    #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
86    #[configurable(metadata(docs::examples = "123abc"))]
87    pub token: SensitiveString,
88
89    /// The Axiom dataset to write to.
90    #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
91    #[configurable(metadata(docs::examples = "vector_rocks"))]
92    pub dataset: String,
93
94    /// Configuration for the URL or regional edge endpoint.
95    #[serde(flatten)]
96    #[configurable(derived)]
97    pub endpoint: UrlOrRegion,
98
99    #[configurable(derived)]
100    #[serde(default)]
101    pub request: RequestConfig,
102
103    /// The compression algorithm to use.
104    #[configurable(derived)]
105    #[serde(default = "Compression::zstd_default")]
106    pub compression: Compression,
107
108    /// The TLS settings for the connection.
109    ///
110    /// Optional, constrains TLS settings for this sink.
111    #[configurable(derived)]
112    pub tls: Option<TlsConfig>,
113
114    /// The batch settings for the sink.
115    #[configurable(derived)]
116    #[serde(default)]
117    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
118
119    /// Controls how acknowledgements are handled for this sink.
120    #[configurable(derived)]
121    #[serde(
122        default,
123        deserialize_with = "crate::serde::bool_or_struct",
124        skip_serializing_if = "crate::serde::is_default"
125    )]
126    pub acknowledgements: AcknowledgementsConfig,
127}
128
129impl GenerateConfig for AxiomConfig {
130    fn generate_config() -> toml::Value {
131        toml::from_str(
132            r#"token = "${AXIOM_TOKEN}"
133            dataset = "${AXIOM_DATASET}"
134            url = "${AXIOM_URL}"
135            org_id = "${AXIOM_ORG_ID}""#,
136        )
137        .unwrap()
138    }
139}
140
141#[async_trait::async_trait]
142#[typetag::serde(name = "axiom")]
143impl SinkConfig for AxiomConfig {
144    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
145        // Validate that url and region are not both set
146        self.endpoint.validate()?;
147
148        let mut request = self.request.clone();
149        if let Some(org_id) = &self.org_id {
150            // NOTE: Only add the org id header if an org id is provided
151            request
152                .headers
153                .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
154        }
155
156        // Axiom has a custom high-performance database that can be ingested
157        // into using the native HTTP ingest endpoint. This configuration wraps
158        // the vector HTTP sink with the necessary adjustments to send data
159        // to Axiom, whilst keeping the configuration simple and easy to use
160        // and maintenance of the vector axiom sink to a minimum.
161        //
162        let http_sink_config = HttpSinkConfig {
163            uri: self.build_endpoint().try_into()?,
164            compression: self.compression,
165            auth: Some(HttpAuthConfig::Bearer {
166                token: self.token.clone(),
167            }),
168            method: HttpMethod::Post,
169            tls: self.tls.clone(),
170            request,
171            acknowledgements: self.acknowledgements,
172            batch: self.batch,
173            headers: None,
174            encoding: EncodingConfigWithFraming::new(
175                Some(FramingConfig::NewlineDelimited),
176                SerializerConfig::Json(JsonSerializerConfig {
177                    metric_tag_values: MetricTagValues::Single,
178                    options: JsonSerializerOptions { pretty: false }, // Minified JSON
179                }),
180                Transformer::default(),
181            ),
182            payload_prefix: "".into(), // Always newline delimited JSON
183            payload_suffix: "".into(), // Always newline delimited JSON
184        };
185
186        http_sink_config.build(cx).await
187    }
188
189    fn input(&self) -> Input {
190        Input::new(DataType::Metric | DataType::Log | DataType::Trace)
191    }
192
193    fn acknowledgements(&self) -> &AcknowledgementsConfig {
194        &self.acknowledgements
195    }
196}
197
198impl AxiomConfig {
199    fn build_endpoint(&self) -> String {
200        // Priority: url > region > default cloud endpoint
201
202        // If url is set, check if it has a path
203        if let Some(url) = self.endpoint.url() {
204            let url = url.trim_end_matches('/');
205
206            // Parse URL to check if path is provided
207            // If path is empty or just "/", append the legacy format for backwards compatibility
208            // Otherwise, use the URL as-is
209            if let Ok(parsed) = url::Url::parse(url) {
210                let path = parsed.path();
211                if path.is_empty() || path == "/" {
212                    // Backwards compatibility: append legacy path format
213                    return format!("{url}/v1/datasets/{}/ingest", self.dataset);
214                }
215            }
216
217            // URL has a custom path, use as-is
218            return url.to_string();
219        }
220
221        // If region is set, build the regional edge endpoint
222        if let Some(region) = self.endpoint.region() {
223            let region = region.trim_end_matches('/');
224            return format!("https://{region}/v1/ingest/{}", self.dataset);
225        }
226
227        // Default: use cloud endpoint with legacy path format
228        format!("{CLOUD_URL}/v1/datasets/{}/ingest", self.dataset)
229    }
230}
231
232#[cfg(test)]
233mod test {
234    #[test]
235    fn generate_config() {
236        crate::test_util::test_generate_config::<super::AxiomConfig>();
237    }
238
239    #[test]
240    fn test_region_domain_only() {
241        // region: mumbai.axiomdomain.co → https://mumbai.axiomdomain.co/v1/ingest/test-3
242        let config = super::AxiomConfig {
243            endpoint: super::UrlOrRegion {
244                region: Some("mumbai.axiomdomain.co".to_string()),
245                url: None,
246            },
247            dataset: "test-3".to_string(),
248            ..Default::default()
249        };
250        let endpoint = config.build_endpoint();
251        assert_eq!(endpoint, "https://mumbai.axiomdomain.co/v1/ingest/test-3");
252    }
253
254    #[test]
255    fn test_default_no_config() {
256        // No url, no region → https://api.axiom.co/v1/datasets/foo/ingest
257        let config = super::AxiomConfig {
258            dataset: "foo".to_string(),
259            ..Default::default()
260        };
261        let endpoint = config.build_endpoint();
262        assert_eq!(endpoint, "https://api.axiom.co/v1/datasets/foo/ingest");
263    }
264
265    #[test]
266    fn test_url_with_custom_path() {
267        // url: http://localhost:3400/ingest → http://localhost:3400/ingest (as-is)
268        let config = super::AxiomConfig {
269            endpoint: super::UrlOrRegion {
270                url: Some("http://localhost:3400/ingest".to_string()),
271                region: None,
272            },
273            dataset: "meh".to_string(),
274            ..Default::default()
275        };
276        let endpoint = config.build_endpoint();
277        assert_eq!(endpoint, "http://localhost:3400/ingest");
278    }
279
280    #[test]
281    fn test_url_without_path_backwards_compat() {
282        // url: https://api.eu.axiom.co/ → https://api.eu.axiom.co/v1/datasets/qoo/ingest
283        let config = super::AxiomConfig {
284            endpoint: super::UrlOrRegion {
285                url: Some("https://api.eu.axiom.co".to_string()),
286                region: None,
287            },
288            dataset: "qoo".to_string(),
289            ..Default::default()
290        };
291        let endpoint = config.build_endpoint();
292        assert_eq!(endpoint, "https://api.eu.axiom.co/v1/datasets/qoo/ingest");
293
294        // Also test with trailing slash
295        let config = super::AxiomConfig {
296            endpoint: super::UrlOrRegion {
297                url: Some("https://api.eu.axiom.co/".to_string()),
298                region: None,
299            },
300            dataset: "qoo".to_string(),
301            ..Default::default()
302        };
303        let endpoint = config.build_endpoint();
304        assert_eq!(endpoint, "https://api.eu.axiom.co/v1/datasets/qoo/ingest");
305    }
306
307    #[test]
308    fn test_both_url_and_region_fails_validation() {
309        // When both url and region are set, validation should fail
310        let endpoint = super::UrlOrRegion {
311            url: Some("http://localhost:3400/ingest".to_string()),
312            region: Some("mumbai.axiomdomain.co".to_string()),
313        };
314
315        let result = endpoint.validate();
316        assert!(result.is_err());
317        assert_eq!(
318            result.unwrap_err().to_string(),
319            "Cannot set both `url` and `region`. Please use only one."
320        );
321    }
322
323    #[test]
324    fn test_url_or_region_deserialization_with_url() {
325        // Test that url can be deserialized at the top level (flattened)
326        let config: super::AxiomConfig = toml::from_str(
327            r#"
328            token = "test-token"
329            dataset = "test-dataset"
330            url = "https://api.eu.axiom.co"
331            "#,
332        )
333        .unwrap();
334
335        assert_eq!(config.endpoint.url(), Some("https://api.eu.axiom.co"));
336        assert_eq!(config.endpoint.region(), None);
337    }
338
339    #[test]
340    fn test_url_or_region_deserialization_with_region() {
341        // Test that region can be deserialized at the top level (flattened)
342        let config: super::AxiomConfig = toml::from_str(
343            r#"
344            token = "test-token"
345            dataset = "test-dataset"
346            region = "mumbai.axiom.co"
347            "#,
348        )
349        .unwrap();
350
351        assert_eq!(config.endpoint.url(), None);
352        assert_eq!(config.endpoint.region(), Some("mumbai.axiom.co"));
353    }
354
355    #[test]
356    fn test_production_regional_edges() {
357        // Production AWS edge
358        let config = super::AxiomConfig {
359            endpoint: super::UrlOrRegion {
360                region: Some("eu-central-1.aws.edge.axiom.co".to_string()),
361                url: None,
362            },
363            dataset: "my-dataset".to_string(),
364            ..Default::default()
365        };
366        let endpoint = config.build_endpoint();
367        assert_eq!(
368            endpoint,
369            "https://eu-central-1.aws.edge.axiom.co/v1/ingest/my-dataset"
370        );
371    }
372
373    #[test]
374    fn test_staging_environment_edges() {
375        // Staging environment edge
376        let config = super::AxiomConfig {
377            endpoint: super::UrlOrRegion {
378                region: Some("us-east-1.edge.staging.axiomdomain.co".to_string()),
379                url: None,
380            },
381            dataset: "test-dataset".to_string(),
382            ..Default::default()
383        };
384        let endpoint = config.build_endpoint();
385        assert_eq!(
386            endpoint,
387            "https://us-east-1.edge.staging.axiomdomain.co/v1/ingest/test-dataset"
388        );
389    }
390
391    #[test]
392    fn test_dev_environment_edges() {
393        // Dev environment edge
394        let config = super::AxiomConfig {
395            endpoint: super::UrlOrRegion {
396                region: Some("eu-west-1.edge.dev.axiomdomain.co".to_string()),
397                url: None,
398            },
399            dataset: "dev-dataset".to_string(),
400            ..Default::default()
401        };
402        let endpoint = config.build_endpoint();
403        assert_eq!(
404            endpoint,
405            "https://eu-west-1.edge.dev.axiomdomain.co/v1/ingest/dev-dataset"
406        );
407    }
408}