1use vector_lib::codecs::encoding::FramingConfig;
2use vector_lib::codecs::encoding::JsonSerializerConfig;
3use vector_lib::codecs::encoding::JsonSerializerOptions;
4use vector_lib::codecs::encoding::SerializerConfig;
5use vector_lib::codecs::MetricTagValues;
6use vector_lib::configurable::configurable_component;
7use vector_lib::sensitive_string::SensitiveString;
8
9use crate::{
10 codecs::{EncodingConfigWithFraming, Transformer},
11 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
12 http::Auth as HttpAuthConfig,
13 sinks::{
14 http::config::{HttpMethod, HttpSinkConfig},
15 util::{
16 http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
17 },
18 Healthcheck, VectorSink,
19 },
20 tls::TlsConfig,
21};
22
23static CLOUD_URL: &str = "https://api.axiom.co";
24
25#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
27#[derive(Clone, Debug, Default)]
28pub struct AxiomConfig {
29 #[configurable(validation(format = "uri"))]
33 #[configurable(metadata(docs::examples = "https://axiom.my-domain.com"))]
34 #[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
35 url: Option<String>,
36
37 #[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
41 #[configurable(metadata(docs::examples = "123abc"))]
42 org_id: Option<String>,
43
44 #[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
46 #[configurable(metadata(docs::examples = "123abc"))]
47 token: SensitiveString,
48
49 #[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
51 #[configurable(metadata(docs::examples = "vector_rocks"))]
52 dataset: String,
53
54 #[configurable(derived)]
55 #[serde(default)]
56 request: RequestConfig,
57
58 #[configurable(derived)]
60 #[serde(default = "Compression::zstd_default")]
61 compression: Compression,
62
63 #[configurable(derived)]
67 tls: Option<TlsConfig>,
68
69 #[configurable(derived)]
71 #[serde(default)]
72 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
73
74 #[configurable(derived)]
76 #[serde(
77 default,
78 deserialize_with = "crate::serde::bool_or_struct",
79 skip_serializing_if = "crate::serde::is_default"
80 )]
81 acknowledgements: AcknowledgementsConfig,
82}
83
84impl GenerateConfig for AxiomConfig {
85 fn generate_config() -> toml::Value {
86 toml::from_str(
87 r#"token = "${AXIOM_TOKEN}"
88 dataset = "${AXIOM_DATASET}"
89 url = "${AXIOM_URL}"
90 org_id = "${AXIOM_ORG_ID}""#,
91 )
92 .unwrap()
93 }
94}
95
96#[async_trait::async_trait]
97#[typetag::serde(name = "axiom")]
98impl SinkConfig for AxiomConfig {
99 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
100 let mut request = self.request.clone();
101 if let Some(org_id) = &self.org_id {
102 request
104 .headers
105 .insert("X-Axiom-Org-Id".to_string(), org_id.clone());
106 }
107
108 let http_sink_config = HttpSinkConfig {
115 uri: self.build_endpoint().try_into()?,
116 compression: self.compression,
117 auth: Some(HttpAuthConfig::Bearer {
118 token: self.token.clone(),
119 }),
120 method: HttpMethod::Post,
121 tls: self.tls.clone(),
122 request,
123 acknowledgements: self.acknowledgements,
124 batch: self.batch,
125 headers: None,
126 encoding: EncodingConfigWithFraming::new(
127 Some(FramingConfig::NewlineDelimited),
128 SerializerConfig::Json(JsonSerializerConfig {
129 metric_tag_values: MetricTagValues::Single,
130 options: JsonSerializerOptions { pretty: false }, }),
132 Transformer::default(),
133 ),
134 payload_prefix: "".into(), payload_suffix: "".into(), };
137
138 http_sink_config.build(cx).await
139 }
140
141 fn input(&self) -> Input {
142 Input::new(DataType::Metric | DataType::Log | DataType::Trace)
143 }
144
145 fn acknowledgements(&self) -> &AcknowledgementsConfig {
146 &self.acknowledgements
147 }
148}
149
150impl AxiomConfig {
151 fn build_endpoint(&self) -> String {
152 let url = if let Some(url) = self.url.as_ref() {
153 url.clone()
154 } else {
155 CLOUD_URL.to_string()
156 };
157
158 let url = url.trim_end_matches('/');
161
162 format!("{}/v1/datasets/{}/ingest", url, self.dataset)
163 }
164}
165
166#[cfg(test)]
167mod test {
168 #[test]
169 fn generate_config() {
170 crate::test_util::test_generate_config::<super::AxiomConfig>();
171
172 let config = super::AxiomConfig {
173 url: Some("https://axiom.my-domain.com///".to_string()),
174 org_id: None,
175 dataset: "vector_rocks".to_string(),
176 ..Default::default()
177 };
178 let endpoint = config.build_endpoint();
179 assert_eq!(
180 endpoint,
181 "https://axiom.my-domain.com/v1/datasets/vector_rocks/ingest"
182 );
183 }
184}
185
186#[cfg(feature = "axiom-integration-tests")]
187#[cfg(test)]
188mod integration_tests {
189 use chrono::{DateTime, Duration, Utc};
190 use futures::stream;
191 use serde::{Deserialize, Serialize};
192 use std::env;
193 use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
194
195 use super::*;
196 use crate::{
197 config::SinkContext,
198 sinks::axiom::AxiomConfig,
199 test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
200 };
201
202 #[tokio::test]
203 async fn axiom_logs_put_data() {
204 let client = reqwest::Client::new();
205 let url = env::var("AXIOM_URL").unwrap();
206 let token = env::var("AXIOM_TOKEN").expect("AXIOM_TOKEN environment variable to be set");
207 assert!(!token.is_empty(), "$AXIOM_TOKEN required");
208 let dataset = env::var("AXIOM_DATASET").unwrap();
209 let org_id = env::var("AXIOM_ORG_ID").unwrap();
210
211 let cx = SinkContext::default();
212
213 let config = AxiomConfig {
214 url: Some(url.clone()),
215 token: token.clone().into(),
216 dataset: dataset.clone(),
217 org_id: Some(org_id.clone()),
218 ..Default::default()
219 };
220
221 let test_id = uuid::Uuid::new_v4().to_string();
223
224 let (sink, _) = config.build(cx).await.unwrap();
225
226 let (batch, mut receiver) = BatchNotifier::new_with_receiver();
227
228 let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
229 event1.insert("host", "aws.cloud.eur");
230 event1.insert("source_type", "file");
231 event1.insert("test_id", test_id.clone());
232
233 let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
234 event2.insert("host", "aws.cloud.eur");
235 event2.insert("source_type", "file");
236 event2.insert("test_id", test_id.clone());
237
238 drop(batch);
239
240 let events = vec![Event::Log(event1), Event::Log(event2)];
241
242 run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
243
244 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
245
246 #[derive(Serialize)]
247 struct QueryRequest {
248 apl: String,
249 #[serde(rename = "endTime")]
250 end_time: DateTime<Utc>,
251 #[serde(rename = "startTime")]
252 start_time: DateTime<Utc>,
253 }
255
256 #[derive(Deserialize, Debug)]
257 struct QueryResponseMatch {
258 data: serde_json::Value,
259 }
261
262 #[derive(Deserialize, Debug)]
263 struct QueryResponse {
264 matches: Vec<QueryResponseMatch>,
265 }
267
268 let query_req = QueryRequest {
269 apl: format!(
270 "['{dataset}'] | where test_id == '{test_id}' | order by _time desc | limit 2"
271 ),
272 start_time: Utc::now() - Duration::minutes(10),
273 end_time: Utc::now() + Duration::minutes(10),
274 };
275 let query_res: QueryResponse = client
276 .post(format!("{url}/v1/datasets/_apl?format=legacy"))
277 .header("X-Axiom-Org-Id", org_id)
278 .header("Authorization", format!("Bearer {token}"))
279 .json(&query_req)
280 .send()
281 .await
282 .unwrap()
283 .error_for_status()
284 .unwrap()
285 .json()
286 .await
287 .unwrap();
288
289 assert_eq!(2, query_res.matches.len());
290
291 let fst = match query_res.matches[0].data {
292 serde_json::Value::Object(ref obj) => obj,
293 _ => panic!("Unexpected value, expected object"),
294 };
295 assert_eq!("message_2", fst.get("message").unwrap().as_str().unwrap());
297
298 let snd = match query_res.matches[1].data {
299 serde_json::Value::Object(ref obj) => obj,
300 _ => panic!("Unexpected value, expected object"),
301 };
302 assert_eq!("message_1", snd.get("message").unwrap().as_str().unwrap());
303 }
304}