vector/sinks/axiom/
config.rs

1use vector_lib::{
2    codecs::{
3        MetricTagValues,
4        encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5    },
6    configurable::configurable_component,
7    sensitive_string::SensitiveString,
8};
9
10use crate::{
11    codecs::{EncodingConfigWithFraming, Transformer},
12    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13    http::Auth as HttpAuthConfig,
14    sinks::{
15        Healthcheck, VectorSink,
16        http::config::{HttpMethod, HttpSinkConfig},
17        util::{
18            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
19        },
20    },
21    tls::TlsConfig,
22};
23
24static CLOUD_URL: &str = "https://api.axiom.co";
25
26/// Configuration of the URL/region to use when interacting with Axiom.
27#[configurable_component]
28#[derive(Clone, Debug, Default)]
29#[serde(default)]
30pub struct UrlOrRegion {
31    /// URI of the Axiom endpoint to send data to.
32    ///
33    /// If a path is provided, the URL is used as-is.
34    /// If no path (or only `/`) is provided, `/v1/datasets/{dataset}/ingest` is appended for backwards compatibility.
35    /// This takes precedence over `region` if both are set (but both should not be set).
36    #[configurable(validation(format = "uri"))]
37    #[configurable(metadata(docs::examples = "https://api.eu.axiom.co"))]
38    #[configurable(metadata(docs::examples = "http://localhost:3400/ingest"))]
39    #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
40    pub url: Option<String>,
41
42    /// The Axiom regional edge domain to use for ingestion.
43    ///
44    /// Specify the domain name only (no scheme, no path).
45    /// When set, data is sent to `https://{region}/v1/ingest/{dataset}`.
46    /// Cannot be used together with `url`.
47    #[configurable(metadata(docs::examples = "${AXIOM_REGION}"))]
48    #[configurable(metadata(docs::examples = "mumbai.axiom.co"))]
49    #[configurable(metadata(docs::examples = "eu-central-1.aws.edge.axiom.co"))]
50    pub region: Option<String>,
51}
52
53impl UrlOrRegion {
54    /// Validates that url and region are not both set.
55    fn validate(&self) -> crate::Result<()> {
56        if self.url.is_some() && self.region.is_some() {
57            return Err("Cannot set both `url` and `region`. Please use only one.".into());
58        }
59        Ok(())
60    }
61
62    /// Returns the url if set.
63    pub fn url(&self) -> Option<&str> {
64        self.url.as_deref()
65    }
66
67    /// Returns the region if set.
68    pub fn region(&self) -> Option<&str> {
69        self.region.as_deref()
70    }
71}
72
73/// Configuration for the `axiom` sink.
74#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
75#[derive(Clone, Debug, Default)]
76pub struct AxiomConfig {
77    /// The Axiom organization ID.
78    ///
79    /// Only required when using personal tokens.
80    #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
81    #[configurable(metadata(docs::examples = "123abc"))]
82    pub org_id: Option<String>,
83
84    /// The Axiom API token.
85    #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
86    #[configurable(metadata(docs::examples = "123abc"))]
87    pub token: SensitiveString,
88
89    /// The Axiom dataset to write to.
90    #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
91    #[configurable(metadata(docs::examples = "vector_rocks"))]
92    pub dataset: String,
93
94    /// Configuration for the URL or regional edge endpoint.
95    #[serde(flatten)]
96    #[configurable(derived)]
97    pub endpoint: UrlOrRegion,
98
99    #[configurable(derived)]
100    #[serde(default)]
101    pub request: RequestConfig,
102
103    /// The compression algorithm to use.
104    #[configurable(derived)]
105    #[serde(default = "Compression::zstd_default")]
106    pub compression: Compression,
107
108    /// The TLS settings for the connection.
109    ///
110    /// Optional, constrains TLS settings for this sink.
111    #[configurable(derived)]
112    pub tls: Option<TlsConfig>,
113
114    /// The batch settings for the sink.
115    #[configurable(derived)]
116    #[serde(default)]
117    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
118
119    /// Controls how acknowledgements are handled for this sink.
120    #[configurable(derived)]
121    #[serde(
122        default,
123        deserialize_with = "crate::serde::bool_or_struct",
124        skip_serializing_if = "crate::serde::is_default"
125    )]
126    pub acknowledgements: AcknowledgementsConfig,
127}
128
129impl GenerateConfig for AxiomConfig {
130    fn generate_config() -> toml::Value {
131        toml::from_str(
132            r#"token = "${AXIOM_TOKEN}"
133            dataset = "${AXIOM_DATASET}"
134            url = "${AXIOM_URL}"
135            org_id = "${AXIOM_ORG_ID}""#,
136        )
137        .unwrap()
138    }
139}
140
141#[async_trait::async_trait]
142#[typetag::serde(name = "axiom")]
143impl SinkConfig for AxiomConfig {
144    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
145        // Validate that url and region are not both set
146        self.endpoint.validate()?;
147
148        let mut request = self.request.clone();
149        if let Some(org_id) = &self.org_id {
150            // NOTE: Only add the org id header if an org id is provided
151            request
152                .headers
153                .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
154        }
155
156        // Axiom has a custom high-performance database that can be ingested
157        // into using the native HTTP ingest endpoint. This configuration wraps
158        // the vector HTTP sink with the necessary adjustments to send data
159        // to Axiom, whilst keeping the configuration simple and easy to use
160        // and maintenance of the vector axiom sink to a minimum.
161        //
162        let http_sink_config = HttpSinkConfig {
163            uri: self.build_endpoint().try_into()?,
164            compression: self.compression,
165            auth: Some(HttpAuthConfig::Bearer {
166                token: self.token.clone(),
167            }),
168            method: HttpMethod::Post,
169            tls: self.tls.clone(),
170            request,
171            acknowledgements: self.acknowledgements,
172            batch: self.batch,
173            encoding: EncodingConfigWithFraming::new(
174                Some(FramingConfig::NewlineDelimited),
175                SerializerConfig::Json(JsonSerializerConfig {
176                    metric_tag_values: MetricTagValues::Single,
177                    options: JsonSerializerOptions { pretty: false }, // Minified JSON
178                }),
179                Transformer::default(),
180            ),
181            payload_prefix: "".into(), // Always newline delimited JSON
182            payload_suffix: "".into(), // Always newline delimited JSON
183        };
184
185        http_sink_config.build(cx).await
186    }
187
188    fn input(&self) -> Input {
189        Input::new(DataType::Metric | DataType::Log | DataType::Trace)
190    }
191
192    fn acknowledgements(&self) -> &AcknowledgementsConfig {
193        &self.acknowledgements
194    }
195}
196
197impl AxiomConfig {
198    fn build_endpoint(&self) -> String {
199        // Priority: url > region > default cloud endpoint
200
201        // If url is set, check if it has a path
202        if let Some(url) = self.endpoint.url() {
203            let url = url.trim_end_matches('/');
204
205            // Parse URL to check if path is provided
206            // If path is empty or just "/", append the legacy format for backwards compatibility
207            // Otherwise, use the URL as-is
208            if let Ok(parsed) = url::Url::parse(url) {
209                let path = parsed.path();
210                if path.is_empty() || path == "/" {
211                    // Backwards compatibility: append legacy path format
212                    return format!("{url}/v1/datasets/{}/ingest", self.dataset);
213                }
214            }
215
216            // URL has a custom path, use as-is
217            return url.to_string();
218        }
219
220        // If region is set, build the regional edge endpoint
221        if let Some(region) = self.endpoint.region() {
222            let region = region.trim_end_matches('/');
223            return format!("https://{region}/v1/ingest/{}", self.dataset);
224        }
225
226        // Default: use cloud endpoint with legacy path format
227        format!("{CLOUD_URL}/v1/datasets/{}/ingest", self.dataset)
228    }
229}
230
231#[cfg(test)]
232mod test {
233    #[test]
234    fn generate_config() {
235        crate::test_util::test_generate_config::<super::AxiomConfig>();
236    }
237
238    #[test]
239    fn test_region_domain_only() {
240        // region: mumbai.axiomdomain.co → https://mumbai.axiomdomain.co/v1/ingest/test-3
241        let config = super::AxiomConfig {
242            endpoint: super::UrlOrRegion {
243                region: Some("mumbai.axiomdomain.co".to_string()),
244                url: None,
245            },
246            dataset: "test-3".to_string(),
247            ..Default::default()
248        };
249        let endpoint = config.build_endpoint();
250        assert_eq!(endpoint, "https://mumbai.axiomdomain.co/v1/ingest/test-3");
251    }
252
253    #[test]
254    fn test_default_no_config() {
255        // No url, no region → https://api.axiom.co/v1/datasets/foo/ingest
256        let config = super::AxiomConfig {
257            dataset: "foo".to_string(),
258            ..Default::default()
259        };
260        let endpoint = config.build_endpoint();
261        assert_eq!(endpoint, "https://api.axiom.co/v1/datasets/foo/ingest");
262    }
263
264    #[test]
265    fn test_url_with_custom_path() {
266        // url: http://localhost:3400/ingest → http://localhost:3400/ingest (as-is)
267        let config = super::AxiomConfig {
268            endpoint: super::UrlOrRegion {
269                url: Some("http://localhost:3400/ingest".to_string()),
270                region: None,
271            },
272            dataset: "meh".to_string(),
273            ..Default::default()
274        };
275        let endpoint = config.build_endpoint();
276        assert_eq!(endpoint, "http://localhost:3400/ingest");
277    }
278
279    #[test]
280    fn test_url_without_path_backwards_compat() {
281        // url: https://api.eu.axiom.co/ → https://api.eu.axiom.co/v1/datasets/qoo/ingest
282        let config = super::AxiomConfig {
283            endpoint: super::UrlOrRegion {
284                url: Some("https://api.eu.axiom.co".to_string()),
285                region: None,
286            },
287            dataset: "qoo".to_string(),
288            ..Default::default()
289        };
290        let endpoint = config.build_endpoint();
291        assert_eq!(endpoint, "https://api.eu.axiom.co/v1/datasets/qoo/ingest");
292
293        // Also test with trailing slash
294        let config = super::AxiomConfig {
295            endpoint: super::UrlOrRegion {
296                url: Some("https://api.eu.axiom.co/".to_string()),
297                region: None,
298            },
299            dataset: "qoo".to_string(),
300            ..Default::default()
301        };
302        let endpoint = config.build_endpoint();
303        assert_eq!(endpoint, "https://api.eu.axiom.co/v1/datasets/qoo/ingest");
304    }
305
306    #[test]
307    fn test_both_url_and_region_fails_validation() {
308        // When both url and region are set, validation should fail
309        let endpoint = super::UrlOrRegion {
310            url: Some("http://localhost:3400/ingest".to_string()),
311            region: Some("mumbai.axiomdomain.co".to_string()),
312        };
313
314        let result = endpoint.validate();
315        assert!(result.is_err());
316        assert_eq!(
317            result.unwrap_err().to_string(),
318            "Cannot set both `url` and `region`. Please use only one."
319        );
320    }
321
322    #[test]
323    fn test_url_or_region_deserialization_with_url() {
324        // Test that url can be deserialized at the top level (flattened)
325        let config: super::AxiomConfig = toml::from_str(
326            r#"
327            token = "test-token"
328            dataset = "test-dataset"
329            url = "https://api.eu.axiom.co"
330            "#,
331        )
332        .unwrap();
333
334        assert_eq!(config.endpoint.url(), Some("https://api.eu.axiom.co"));
335        assert_eq!(config.endpoint.region(), None);
336    }
337
338    #[test]
339    fn test_url_or_region_deserialization_with_region() {
340        // Test that region can be deserialized at the top level (flattened)
341        let config: super::AxiomConfig = toml::from_str(
342            r#"
343            token = "test-token"
344            dataset = "test-dataset"
345            region = "mumbai.axiom.co"
346            "#,
347        )
348        .unwrap();
349
350        assert_eq!(config.endpoint.url(), None);
351        assert_eq!(config.endpoint.region(), Some("mumbai.axiom.co"));
352    }
353
354    #[test]
355    fn test_production_regional_edges() {
356        // Production AWS edge
357        let config = super::AxiomConfig {
358            endpoint: super::UrlOrRegion {
359                region: Some("eu-central-1.aws.edge.axiom.co".to_string()),
360                url: None,
361            },
362            dataset: "my-dataset".to_string(),
363            ..Default::default()
364        };
365        let endpoint = config.build_endpoint();
366        assert_eq!(
367            endpoint,
368            "https://eu-central-1.aws.edge.axiom.co/v1/ingest/my-dataset"
369        );
370    }
371
372    #[test]
373    fn test_staging_environment_edges() {
374        // Staging environment edge
375        let config = super::AxiomConfig {
376            endpoint: super::UrlOrRegion {
377                region: Some("us-east-1.edge.staging.axiomdomain.co".to_string()),
378                url: None,
379            },
380            dataset: "test-dataset".to_string(),
381            ..Default::default()
382        };
383        let endpoint = config.build_endpoint();
384        assert_eq!(
385            endpoint,
386            "https://us-east-1.edge.staging.axiomdomain.co/v1/ingest/test-dataset"
387        );
388    }
389
390    #[test]
391    fn test_dev_environment_edges() {
392        // Dev environment edge
393        let config = super::AxiomConfig {
394            endpoint: super::UrlOrRegion {
395                region: Some("eu-west-1.edge.dev.axiomdomain.co".to_string()),
396                url: None,
397            },
398            dataset: "dev-dataset".to_string(),
399            ..Default::default()
400        };
401        let endpoint = config.build_endpoint();
402        assert_eq!(
403            endpoint,
404            "https://eu-west-1.edge.dev.axiomdomain.co/v1/ingest/dev-dataset"
405        );
406    }
407}