vector/sinks/
axiom.rs

1use vector_lib::codecs::encoding::FramingConfig;
2use vector_lib::codecs::encoding::JsonSerializerConfig;
3use vector_lib::codecs::encoding::JsonSerializerOptions;
4use vector_lib::codecs::encoding::SerializerConfig;
5use vector_lib::codecs::MetricTagValues;
6use vector_lib::configurable::configurable_component;
7use vector_lib::sensitive_string::SensitiveString;
8
9use crate::{
10    codecs::{EncodingConfigWithFraming, Transformer},
11    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
12    http::Auth as HttpAuthConfig,
13    sinks::{
14        http::config::{HttpMethod, HttpSinkConfig},
15        util::{
16            http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
17        },
18        Healthcheck, VectorSink,
19    },
20    tls::TlsConfig,
21};
22
/// Base URL of the hosted Axiom API; `build_endpoint` falls back to it when no `url` is configured.
static CLOUD_URL: &str = "https://api.axiom.co";
24
/// Configuration for the `axiom` sink.
#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
#[derive(Clone, Debug, Default)]
pub struct AxiomConfig {
    /// URI of the Axiom endpoint to send data to.
    ///
    /// Only required if not using Axiom Cloud.
    #[configurable(validation(format = "uri"))]
    #[configurable(metadata(docs::examples = "https://axiom.my-domain.com"))]
    #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
    // When `None`, `build_endpoint` uses `CLOUD_URL`; trailing slashes are trimmed there.
    url: Option<String>,

    /// The Axiom organization ID.
    ///
    /// Only required when using personal tokens.
    #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
    #[configurable(metadata(docs::examples = "123abc"))]
    // When set, `build` forwards it as the `X-Axiom-Org-Id` request header.
    org_id: Option<String>,

    /// The Axiom API token.
    #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
    #[configurable(metadata(docs::examples = "123abc"))]
    // Passed to the wrapped HTTP sink as its bearer-auth token.
    token: SensitiveString,

    /// The Axiom dataset to write to.
    #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
    #[configurable(metadata(docs::examples = "vector_rocks"))]
    // Interpolated verbatim into the ingest path `/v1/datasets/{dataset}/ingest`.
    dataset: String,

    #[configurable(derived)]
    #[serde(default)]
    // Cloned in `build` and handed to the wrapped HTTP sink (plus the optional org id header).
    request: RequestConfig,

    /// The compression algorithm to use.
    #[configurable(derived)]
    #[serde(default = "Compression::zstd_default")]
    // NOTE(review): `#[derive(Default)]` on this struct uses `Compression::default()`, which may
    // differ from the serde default above — confirm this is intended for programmatic construction.
    compression: Compression,

    /// The TLS settings for the connection.
    ///
    /// Optional, constrains TLS settings for this sink.
    #[configurable(derived)]
    tls: Option<TlsConfig>,

    /// The batch settings for the sink.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Controls how acknowledgements are handled for this sink.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    // Accepts either a bare boolean or a full struct on input (see `bool_or_struct` above).
    acknowledgements: AcknowledgementsConfig,
}
83
84impl GenerateConfig for AxiomConfig {
85    fn generate_config() -> toml::Value {
86        toml::from_str(
87            r#"token = "${AXIOM_TOKEN}"
88            dataset = "${AXIOM_DATASET}"
89            url = "${AXIOM_URL}"
90            org_id = "${AXIOM_ORG_ID}""#,
91        )
92        .unwrap()
93    }
94}
95
96#[async_trait::async_trait]
97#[typetag::serde(name = "axiom")]
98impl SinkConfig for AxiomConfig {
99    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
100        let mut request = self.request.clone();
101        if let Some(org_id) = &self.org_id {
102            // NOTE: Only add the org id header if an org id is provided
103            request
104                .headers
105                .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
106        }
107
108        // Axiom has a custom high-performance database that can be ingested
109        // into using the native HTTP ingest endpoint. This configuration wraps
110        // the vector HTTP sink with the necessary adjustments to send data
111        // to Axiom, whilst keeping the configuration simple and easy to use
112        // and maintenance of the vector axiom sink to a minimum.
113        //
114        let http_sink_config = HttpSinkConfig {
115            uri: self.build_endpoint().try_into()?,
116            compression: self.compression,
117            auth: Some(HttpAuthConfig::Bearer {
118                token: self.token.clone(),
119            }),
120            method: HttpMethod::Post,
121            tls: self.tls.clone(),
122            request,
123            acknowledgements: self.acknowledgements,
124            batch: self.batch,
125            headers: None,
126            encoding: EncodingConfigWithFraming::new(
127                Some(FramingConfig::NewlineDelimited),
128                SerializerConfig::Json(JsonSerializerConfig {
129                    metric_tag_values: MetricTagValues::Single,
130                    options: JsonSerializerOptions { pretty: false }, // Minified JSON
131                }),
132                Transformer::default(),
133            ),
134            payload_prefix: "".into(), // Always newline delimited JSON
135            payload_suffix: "".into(), // Always newline delimited JSON
136        };
137
138        http_sink_config.build(cx).await
139    }
140
141    fn input(&self) -> Input {
142        Input::new(DataType::Metric | DataType::Log | DataType::Trace)
143    }
144
145    fn acknowledgements(&self) -> &AcknowledgementsConfig {
146        &self.acknowledgements
147    }
148}
149
150impl AxiomConfig {
151    fn build_endpoint(&self) -> String {
152        let url = if let Some(url) = self.url.as_ref() {
153            url.clone()
154        } else {
155            CLOUD_URL.to_string()
156        };
157
158        // NOTE trim any trailing slashes to avoid redundant rewriting or 301 redirects from intermediate proxies
159        // NOTE Most axiom users will not need to configure a url, this is for the other 1%
160        let url = url.trim_end_matches('/');
161
162        format!("{}/v1/datasets/{}/ingest", url, self.dataset)
163    }
164}
165
#[cfg(test)]
mod test {
    use super::AxiomConfig;

    /// Runs the shared `test_generate_config` helper against this sink's
    /// generated example configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AxiomConfig>();
    }

    /// A custom URL with redundant trailing slashes must not leak extra
    /// slashes into the ingest endpoint.
    #[test]
    fn build_endpoint_trims_trailing_slashes() {
        let config = AxiomConfig {
            url: Some("https://axiom.my-domain.com///".to_string()),
            org_id: None,
            dataset: "vector_rocks".to_string(),
            ..Default::default()
        };

        assert_eq!(
            config.build_endpoint(),
            "https://axiom.my-domain.com/v1/datasets/vector_rocks/ingest"
        );
    }

    /// Without a configured URL the endpoint is built on the Axiom Cloud base URL.
    #[test]
    fn build_endpoint_defaults_to_cloud_url() {
        let config = AxiomConfig {
            dataset: "vector_rocks".to_string(),
            ..Default::default()
        };

        assert_eq!(
            config.build_endpoint(),
            "https://api.axiom.co/v1/datasets/vector_rocks/ingest"
        );
    }
}
185
#[cfg(feature = "axiom-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use chrono::{DateTime, Duration, Utc};
    use futures::stream;
    use serde::{Deserialize, Serialize};
    use std::env;
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};

    use super::*;
    use crate::{
        config::SinkContext,
        sinks::axiom::AxiomConfig,
        test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
    };

    /// Reads an environment variable the test cannot run without, panicking
    /// with a message that names the missing variable.
    fn required_env(name: &str) -> String {
        env::var(name)
            .unwrap_or_else(|err| panic!("{name} environment variable to be set: {err:?}"))
    }

    /// Sends two log events through the sink and then queries Axiom to
    /// verify that both were ingested.
    ///
    /// Requires `AXIOM_URL`, `AXIOM_TOKEN`, `AXIOM_DATASET` and
    /// `AXIOM_ORG_ID` to be set in the environment.
    #[tokio::test]
    async fn axiom_logs_put_data() {
        let client = reqwest::Client::new();
        let url = required_env("AXIOM_URL");
        let token = required_env("AXIOM_TOKEN");
        assert!(!token.is_empty(), "$AXIOM_TOKEN required");
        let dataset = required_env("AXIOM_DATASET");
        let org_id = required_env("AXIOM_ORG_ID");

        let cx = SinkContext::default();

        let config = AxiomConfig {
            url: Some(url.clone()),
            token: token.clone().into(),
            dataset: dataset.clone(),
            org_id: Some(org_id.clone()),
            ..Default::default()
        };

        // create unique test id so tests can run in parallel
        let test_id = uuid::Uuid::new_v4().to_string();

        let (sink, _) = config.build(cx).await.unwrap();

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        // Both events share every field except the message, and are tagged
        // with the test id so the query below only sees this run's data.
        let make_event = |message: &str| {
            let mut event = LogEvent::from(message).with_batch_notifier(&batch);
            event.insert("host", "aws.cloud.eur");
            event.insert("source_type", "file");
            event.insert("test_id", test_id.clone());
            event
        };
        let event1 = make_event("message_1");
        let event2 = make_event("message_2");

        // Release our handle so the batch status resolves once the events are done.
        drop(batch);

        let events = vec![Event::Log(event1), Event::Log(event2)];

        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;

        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        // Only the fields this test sends or reads are modelled below.
        #[derive(Serialize)]
        struct QueryRequest {
            apl: String,
            #[serde(rename = "endTime")]
            end_time: DateTime<Utc>,
            #[serde(rename = "startTime")]
            start_time: DateTime<Utc>,
        }

        #[derive(Deserialize, Debug)]
        struct QueryResponseMatch {
            data: serde_json::Value,
        }

        #[derive(Deserialize, Debug)]
        struct QueryResponse {
            matches: Vec<QueryResponseMatch>,
        }

        // Sample the clock once so the window is symmetric around a single instant.
        let now = Utc::now();
        let query_req = QueryRequest {
            apl: format!(
                "['{dataset}'] | where test_id == '{test_id}' | order by _time desc | limit 2"
            ),
            start_time: now - Duration::minutes(10),
            end_time: now + Duration::minutes(10),
        };

        // Mirror the sink's own handling of trailing slashes so a URL that
        // works for ingestion also works for the verification query.
        let query_base = url.trim_end_matches('/');
        let query_res: QueryResponse = client
            .post(format!("{query_base}/v1/datasets/_apl?format=legacy"))
            .header("X-Axiom-Org-Id", org_id)
            .header("Authorization", format!("Bearer {token}"))
            .json(&query_req)
            .send()
            .await
            .unwrap()
            .error_for_status()
            .unwrap()
            .json()
            .await
            .unwrap();

        assert_eq!(2, query_res.matches.len());

        let fst = query_res.matches[0]
            .data
            .as_object()
            .expect("Unexpected value, expected object");
        // Note that we order descending, so message_2 comes first
        assert_eq!("message_2", fst.get("message").unwrap().as_str().unwrap());

        let snd = query_res.matches[1]
            .data
            .as_object()
            .expect("Unexpected value, expected object");
        assert_eq!("message_1", snd.get("message").unwrap().as_str().unwrap());
    }
}