// vector/sinks/axiom.rs

1use vector_lib::{
2    codecs::{
3        MetricTagValues,
4        encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5    },
6    configurable::configurable_component,
7    sensitive_string::SensitiveString,
8};
9
10use crate::{
11    codecs::{EncodingConfigWithFraming, Transformer},
12    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13    http::Auth as HttpAuthConfig,
14    sinks::{
15        Healthcheck, VectorSink,
16        http::config::{HttpMethod, HttpSinkConfig},
17        util::{
18            BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
19        },
20    },
21    tls::TlsConfig,
22};
23
/// Base URL of the Axiom Cloud API, used as the ingest host when no `url` is configured.
const CLOUD_URL: &str = "https://api.axiom.co";
25
/// Configuration for the `axiom` sink.
// NOTE(review): the description below says "log events", but `SinkConfig::input` for this type
// accepts metric, log, and trace events. The description presumably feeds generated docs, so
// change it deliberately if at all.
#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
// NOTE(review): the derived `Default` builds `compression` from `Compression::default()`, which
// may differ from `Compression::zstd_default` (the serde default used when a config omits
// `compression`). Within this file, `Default` is only used by the tests.
#[derive(Clone, Debug, Default)]
pub struct AxiomConfig {
    /// URI of the Axiom endpoint to send data to.
    ///
    /// Only required if not using Axiom Cloud.
    // When unset, `build_endpoint` falls back to `CLOUD_URL`. Trailing slashes are trimmed
    // before `/v1/datasets/{dataset}/ingest` is appended.
    #[configurable(validation(format = "uri"))]
    #[configurable(metadata(docs::examples = "https://axiom.my-domain.com"))]
    #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
    url: Option<String>,

    /// The Axiom organization ID.
    ///
    /// Only required when using personal tokens.
    // When set, `build` sends this value as the `X-Axiom-Org-Id` request header.
    #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
    #[configurable(metadata(docs::examples = "123abc"))]
    org_id: Option<String>,

    /// The Axiom API token.
    // Passed to the wrapped HTTP sink as bearer-token auth (see `build`).
    #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
    #[configurable(metadata(docs::examples = "123abc"))]
    token: SensitiveString,

    /// The Axiom dataset to write to.
    // Interpolated verbatim into the ingest path `/v1/datasets/{dataset}/ingest`.
    #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
    #[configurable(metadata(docs::examples = "vector_rocks"))]
    dataset: String,

    // Request settings forwarded to the wrapped HTTP sink; `build` adds the org id header to a
    // clone of this value.
    #[configurable(derived)]
    #[serde(default)]
    request: RequestConfig,

    /// The compression algorithm to use.
    // When omitted from the config, the value comes from `Compression::zstd_default`.
    #[configurable(derived)]
    #[serde(default = "Compression::zstd_default")]
    compression: Compression,

    /// The TLS settings for the connection.
    ///
    /// Optional, constrains TLS settings for this sink.
    #[configurable(derived)]
    tls: Option<TlsConfig>,

    /// The batch settings for the sink.
    // Forwarded unchanged to the wrapped HTTP sink. Note that this field is `pub`, unlike the
    // other fields of this struct.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Controls how acknowledgements are handled for this sink.
    // Deserialized via `crate::serde::bool_or_struct`; skipped on serialization when
    // `crate::serde::is_default` holds for it.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
84
85impl GenerateConfig for AxiomConfig {
86    fn generate_config() -> toml::Value {
87        toml::from_str(
88            r#"token = "${AXIOM_TOKEN}"
89            dataset = "${AXIOM_DATASET}"
90            url = "${AXIOM_URL}"
91            org_id = "${AXIOM_ORG_ID}""#,
92        )
93        .unwrap()
94    }
95}
96
97#[async_trait::async_trait]
98#[typetag::serde(name = "axiom")]
99impl SinkConfig for AxiomConfig {
100    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
101        let mut request = self.request.clone();
102        if let Some(org_id) = &self.org_id {
103            // NOTE: Only add the org id header if an org id is provided
104            request
105                .headers
106                .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
107        }
108
109        // Axiom has a custom high-performance database that can be ingested
110        // into using the native HTTP ingest endpoint. This configuration wraps
111        // the vector HTTP sink with the necessary adjustments to send data
112        // to Axiom, whilst keeping the configuration simple and easy to use
113        // and maintenance of the vector axiom sink to a minimum.
114        //
115        let http_sink_config = HttpSinkConfig {
116            uri: self.build_endpoint().try_into()?,
117            compression: self.compression,
118            auth: Some(HttpAuthConfig::Bearer {
119                token: self.token.clone(),
120            }),
121            method: HttpMethod::Post,
122            tls: self.tls.clone(),
123            request,
124            acknowledgements: self.acknowledgements,
125            batch: self.batch,
126            headers: None,
127            encoding: EncodingConfigWithFraming::new(
128                Some(FramingConfig::NewlineDelimited),
129                SerializerConfig::Json(JsonSerializerConfig {
130                    metric_tag_values: MetricTagValues::Single,
131                    options: JsonSerializerOptions { pretty: false }, // Minified JSON
132                }),
133                Transformer::default(),
134            ),
135            payload_prefix: "".into(), // Always newline delimited JSON
136            payload_suffix: "".into(), // Always newline delimited JSON
137        };
138
139        http_sink_config.build(cx).await
140    }
141
142    fn input(&self) -> Input {
143        Input::new(DataType::Metric | DataType::Log | DataType::Trace)
144    }
145
146    fn acknowledgements(&self) -> &AcknowledgementsConfig {
147        &self.acknowledgements
148    }
149}
150
151impl AxiomConfig {
152    fn build_endpoint(&self) -> String {
153        let url = if let Some(url) = self.url.as_ref() {
154            url.clone()
155        } else {
156            CLOUD_URL.to_string()
157        };
158
159        // NOTE trim any trailing slashes to avoid redundant rewriting or 301 redirects from intermediate proxies
160        // NOTE Most axiom users will not need to configure a url, this is for the other 1%
161        let url = url.trim_end_matches('/');
162
163        format!("{}/v1/datasets/{}/ingest", url, self.dataset)
164    }
165}
166
#[cfg(test)]
mod test {
    use super::AxiomConfig;

    /// The example config produced by `GenerateConfig` must round-trip.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AxiomConfig>();
    }

    /// Trailing slashes on a custom URL must not leak into the ingest path.
    #[test]
    fn build_endpoint_trims_trailing_slashes() {
        let config = AxiomConfig {
            url: Some("https://axiom.my-domain.com///".to_string()),
            org_id: None,
            dataset: "vector_rocks".to_string(),
            ..Default::default()
        };
        assert_eq!(
            config.build_endpoint(),
            "https://axiom.my-domain.com/v1/datasets/vector_rocks/ingest"
        );
    }

    /// Without a configured URL, the sink targets Axiom Cloud.
    #[test]
    fn build_endpoint_defaults_to_cloud() {
        let config = AxiomConfig {
            dataset: "vector_rocks".to_string(),
            ..Default::default()
        };
        assert_eq!(
            config.build_endpoint(),
            "https://api.axiom.co/v1/datasets/vector_rocks/ingest"
        );
    }
}
186
#[cfg(feature = "axiom-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::env;

    use chrono::{DateTime, Duration, Utc};
    use futures::stream;
    use serde::{Deserialize, Serialize};
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};

    use super::*;
    use crate::{
        config::SinkContext,
        sinks::axiom::AxiomConfig,
        test_util::components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
    };

    /// Sends two log events through the sink, then queries Axiom's APL endpoint to confirm
    /// both arrived.
    ///
    /// Requires `AXIOM_URL`, `AXIOM_TOKEN`, `AXIOM_DATASET`, and `AXIOM_ORG_ID` to be set.
    #[tokio::test]
    async fn axiom_logs_put_data() {
        let client = reqwest::Client::new();
        let url = env::var("AXIOM_URL").expect("AXIOM_URL environment variable to be set");
        let token = env::var("AXIOM_TOKEN").expect("AXIOM_TOKEN environment variable to be set");
        assert!(!token.is_empty(), "$AXIOM_TOKEN required");
        let dataset =
            env::var("AXIOM_DATASET").expect("AXIOM_DATASET environment variable to be set");
        let org_id =
            env::var("AXIOM_ORG_ID").expect("AXIOM_ORG_ID environment variable to be set");

        let cx = SinkContext::default();

        let config = AxiomConfig {
            url: Some(url.clone()),
            token: token.clone().into(),
            dataset: dataset.clone(),
            org_id: Some(org_id.clone()),
            ..Default::default()
        };

        // create unique test id so tests can run in parallel
        let test_id = uuid::Uuid::new_v4().to_string();

        let (sink, _) = config.build(cx).await.unwrap();

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
        event1.insert("host", "aws.cloud.eur");
        event1.insert("source_type", "file");
        event1.insert("test_id", test_id.clone());

        let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
        event2.insert("host", "aws.cloud.eur");
        event2.insert("source_type", "file");
        event2.insert("test_id", test_id.clone());

        drop(batch);

        let events = vec![Event::Log(event1), Event::Log(event2)];

        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;

        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        #[derive(Serialize)]
        struct QueryRequest {
            apl: String,
            #[serde(rename = "endTime")]
            end_time: DateTime<Utc>,
            #[serde(rename = "startTime")]
            start_time: DateTime<Utc>,
            // ...
        }

        #[derive(Deserialize, Debug)]
        struct QueryResponseMatch {
            data: serde_json::Value,
            // ...
        }

        #[derive(Deserialize, Debug)]
        struct QueryResponse {
            matches: Vec<QueryResponseMatch>,
            // ...
        }

        let query_req = QueryRequest {
            apl: format!(
                "['{dataset}'] | where test_id == '{test_id}' | order by _time desc | limit 2"
            ),
            start_time: Utc::now() - Duration::minutes(10),
            end_time: Utc::now() + Duration::minutes(10),
        };

        // The sink trims trailing slashes from `url`; do the same here so both hit the same host
        // path layout.
        let query_url = format!(
            "{}/v1/datasets/_apl?format=legacy",
            url.trim_end_matches('/')
        );
        let query_res: QueryResponse = client
            .post(query_url)
            .header("X-Axiom-Org-Id", org_id)
            .header("Authorization", format!("Bearer {token}"))
            .json(&query_req)
            .send()
            .await
            .unwrap()
            .error_for_status()
            .unwrap()
            .json()
            .await
            .unwrap();

        assert_eq!(2, query_res.matches.len());

        let fst = query_res.matches[0]
            .data
            .as_object()
            .expect("Unexpected value, expected object");
        // Note that we order descending, so message_2 comes first
        assert_eq!("message_2", fst.get("message").unwrap().as_str().unwrap());

        let snd = query_res.matches[1]
            .data
            .as_object()
            .expect("Unexpected value, expected object");
        assert_eq!("message_1", snd.get("message").unwrap().as_str().unwrap());
    }
}