1use vector_lib::{
2 codecs::{
3 MetricTagValues,
4 encoding::{FramingConfig, JsonSerializerConfig, JsonSerializerOptions, SerializerConfig},
5 },
6 configurable::configurable_component,
7 sensitive_string::SensitiveString,
8};
9
10use crate::{
11 codecs::{EncodingConfigWithFraming, Transformer},
12 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
13 http::Auth as HttpAuthConfig,
14 sinks::{
15 Healthcheck, VectorSink,
16 http::config::{HttpMethod, HttpSinkConfig},
17 util::{
18 BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings, http::RequestConfig,
19 },
20 },
21 tls::TlsConfig,
22};
23
/// Base URL of the Axiom Cloud API; `AxiomConfig::build_endpoint` falls back to it when no
/// `url` is configured.
static CLOUD_URL: &str = "https://api.axiom.co";
25
/// Configuration for the `axiom` sink.
#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
#[derive(Clone, Debug, Default)]
pub struct AxiomConfig {
    /// URI of the Axiom endpoint to send data to.
    ///
    /// When unset, the Axiom Cloud endpoint (`https://api.axiom.co`) is used.
    #[configurable(validation(format = "uri"))]
    #[configurable(metadata(docs::examples = "https://axiom.my-domain.com"))]
    #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
    url: Option<String>,

    /// The Axiom organization ID.
    ///
    /// When set, it is sent with every request in the `X-Axiom-Org-Id` header.
    #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
    #[configurable(metadata(docs::examples = "123abc"))]
    org_id: Option<String>,

    /// The Axiom API token, sent as a bearer token with every request.
    #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
    #[configurable(metadata(docs::examples = "123abc"))]
    token: SensitiveString,

    /// The Axiom dataset to write to; it becomes part of the ingest endpoint path.
    #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
    #[configurable(metadata(docs::examples = "vector_rocks"))]
    dataset: String,

    // Request settings handed to the underlying HTTP sink; `build` works on a clone and adds
    // the organization header to it.
    #[configurable(derived)]
    #[serde(default)]
    request: RequestConfig,

    /// The compression algorithm to use; defaults to `Compression::zstd_default`.
    #[configurable(derived)]
    #[serde(default = "Compression::zstd_default")]
    compression: Compression,

    /// The TLS settings for the connection, passed through to the underlying HTTP sink.
    #[configurable(derived)]
    tls: Option<TlsConfig>,

    /// The batch settings for the sink.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Controls how acknowledgements are handled for this sink.
    // NOTE(review): `bool_or_struct` presumably accepts either a plain boolean or a full
    // table for this option — confirm in `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
84
85impl GenerateConfig for AxiomConfig {
86 fn generate_config() -> toml::Value {
87 toml::from_str(
88 r#"token = "${AXIOM_TOKEN}"
89 dataset = "${AXIOM_DATASET}"
90 url = "${AXIOM_URL}"
91 org_id = "${AXIOM_ORG_ID}""#,
92 )
93 .unwrap()
94 }
95}
96
97#[async_trait::async_trait]
98#[typetag::serde(name = "axiom")]
99impl SinkConfig for AxiomConfig {
100 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
101 let mut request = self.request.clone();
102 if let Some(org_id) = &self.org_id {
103 request
105 .headers
106 .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
107 }
108
109 let http_sink_config = HttpSinkConfig {
116 uri: self.build_endpoint().try_into()?,
117 compression: self.compression,
118 auth: Some(HttpAuthConfig::Bearer {
119 token: self.token.clone(),
120 }),
121 method: HttpMethod::Post,
122 tls: self.tls.clone(),
123 request,
124 acknowledgements: self.acknowledgements,
125 batch: self.batch,
126 headers: None,
127 encoding: EncodingConfigWithFraming::new(
128 Some(FramingConfig::NewlineDelimited),
129 SerializerConfig::Json(JsonSerializerConfig {
130 metric_tag_values: MetricTagValues::Single,
131 options: JsonSerializerOptions { pretty: false }, }),
133 Transformer::default(),
134 ),
135 payload_prefix: "".into(), payload_suffix: "".into(), };
138
139 http_sink_config.build(cx).await
140 }
141
142 fn input(&self) -> Input {
143 Input::new(DataType::Metric | DataType::Log | DataType::Trace)
144 }
145
146 fn acknowledgements(&self) -> &AcknowledgementsConfig {
147 &self.acknowledgements
148 }
149}
150
151impl AxiomConfig {
152 fn build_endpoint(&self) -> String {
153 let url = if let Some(url) = self.url.as_ref() {
154 url.clone()
155 } else {
156 CLOUD_URL.to_string()
157 };
158
159 let url = url.trim_end_matches('/');
162
163 format!("{}/v1/datasets/{}/ingest", url, self.dataset)
164 }
165}
166
#[cfg(test)]
mod test {
    use super::AxiomConfig;

    /// Builds a config for the `vector_rocks` dataset with the given base URL; every other
    /// field keeps its default value.
    fn config_with_url(url: Option<&str>) -> AxiomConfig {
        AxiomConfig {
            url: url.map(str::to_string),
            org_id: None,
            dataset: "vector_rocks".to_string(),
            ..Default::default()
        }
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AxiomConfig>();
    }

    /// Trailing slashes on a custom URL must not leak into the endpoint path.
    #[test]
    fn build_endpoint_trims_trailing_slashes() {
        let config = config_with_url(Some("https://axiom.my-domain.com///"));
        assert_eq!(
            config.build_endpoint(),
            "https://axiom.my-domain.com/v1/datasets/vector_rocks/ingest"
        );
    }

    /// Without a configured URL the endpoint is built on the Axiom Cloud base URL.
    #[test]
    fn build_endpoint_defaults_to_cloud_url() {
        let config = config_with_url(None);
        assert_eq!(
            config.build_endpoint(),
            "https://api.axiom.co/v1/datasets/vector_rocks/ingest"
        );
    }
}
186
#[cfg(feature = "axiom-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::env;

    use chrono::{DateTime, Duration, Utc};
    use futures::stream;
    use serde::{Deserialize, Serialize};
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};

    use super::*;
    use crate::{
        config::SinkContext,
        sinks::axiom::AxiomConfig,
        test_util::components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
    };

    /// Reads an environment variable the test cannot run without.
    ///
    /// # Panics
    ///
    /// Panics with a message naming the variable when it is missing or not valid Unicode, so
    /// a misconfigured run points directly at the setting to fix.
    fn required_env(name: &str) -> String {
        env::var(name)
            .unwrap_or_else(|err| panic!("{name} environment variable to be set: {err:?}"))
    }

    /// Sends two log events through the sink, then queries them back and checks both arrived.
    #[tokio::test]
    async fn axiom_logs_put_data() {
        let client = reqwest::Client::new();
        let url = required_env("AXIOM_URL");
        let token = required_env("AXIOM_TOKEN");
        assert!(!token.is_empty(), "$AXIOM_TOKEN required");
        let dataset = required_env("AXIOM_DATASET");
        let org_id = required_env("AXIOM_ORG_ID");

        let cx = SinkContext::default();

        let config = AxiomConfig {
            url: Some(url.clone()),
            token: token.clone().into(),
            dataset: dataset.clone(),
            org_id: Some(org_id.clone()),
            ..Default::default()
        };

        // A fresh id per run lets the query below pick out only this run's events.
        let test_id = uuid::Uuid::new_v4().to_string();

        let (sink, _) = config.build(cx).await.unwrap();

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
        event1.insert("host", "aws.cloud.eur");
        event1.insert("source_type", "file");
        event1.insert("test_id", test_id.clone());

        let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
        event2.insert("host", "aws.cloud.eur");
        event2.insert("source_type", "file");
        event2.insert("test_id", test_id.clone());

        // NOTE(review): presumably the receiver only resolves once every notifier handle is
        // gone, hence releasing ours before running the sink — confirm against `BatchNotifier`.
        drop(batch);

        let events = vec![Event::Log(event1), Event::Log(event2)];

        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;

        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        /// Body of the query request sent to the `_apl` endpoint.
        #[derive(Serialize)]
        struct QueryRequest {
            apl: String,
            #[serde(rename = "endTime")]
            end_time: DateTime<Utc>,
            #[serde(rename = "startTime")]
            start_time: DateTime<Utc>,
        }

        /// A single entry of the `matches` array in the query response.
        #[derive(Deserialize, Debug)]
        struct QueryResponseMatch {
            data: serde_json::Value,
        }

        /// The part of the query response this test inspects.
        #[derive(Deserialize, Debug)]
        struct QueryResponse {
            matches: Vec<QueryResponseMatch>,
        }

        /// Extracts the `message` string from a match, panicking on an unexpected shape.
        fn message_of(entry: &QueryResponseMatch) -> &str {
            let serde_json::Value::Object(obj) = &entry.data else {
                panic!("Unexpected value, expected object");
            };
            obj.get("message").unwrap().as_str().unwrap()
        }

        let query_req = QueryRequest {
            apl: format!(
                "['{dataset}'] | where test_id == '{test_id}' | order by _time desc | limit 2"
            ),
            start_time: Utc::now() - Duration::minutes(10),
            end_time: Utc::now() + Duration::minutes(10),
        };

        // The sink strips trailing slashes from the base URL before appending its path; do
        // the same here so a trailing slash in AXIOM_URL does not produce `//v1/...`.
        let query_base = url.trim_end_matches('/');
        let query_res: QueryResponse = client
            .post(format!("{query_base}/v1/datasets/_apl?format=legacy"))
            .header("X-Axiom-Org-Id", org_id)
            .header("Authorization", format!("Bearer {token}"))
            .json(&query_req)
            .send()
            .await
            .unwrap()
            .error_for_status()
            .unwrap()
            .json()
            .await
            .unwrap();

        assert_eq!(2, query_res.matches.len());

        // The query sorts by `_time` descending, so the later event is expected first.
        assert_eq!("message_2", message_of(&query_res.matches[0]));
        assert_eq!("message_1", message_of(&query_res.matches[1]));
    }
}