use vector_lib::codecs::encoding::FramingConfig;
use vector_lib::codecs::encoding::JsonSerializerConfig;
use vector_lib::codecs::encoding::JsonSerializerOptions;
use vector_lib::codecs::encoding::SerializerConfig;
use vector_lib::codecs::MetricTagValues;
use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;
use crate::{
codecs::{EncodingConfigWithFraming, Transformer},
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
http::Auth as HttpAuthConfig,
sinks::{
http::config::{HttpMethod, HttpSinkConfig},
util::{
http::RequestConfig, BatchConfig, Compression, RealtimeSizeBasedDefaultBatchSettings,
},
Healthcheck, VectorSink,
},
tls::TlsConfig,
};
static CLOUD_URL: &str = "https://api.axiom.co";
#[configurable_component(sink("axiom", "Deliver log events to Axiom."))]
#[derive(Clone, Debug, Default)]
pub struct AxiomConfig {
#[configurable(validation(format = "uri"))]
#[configurable(metadata(docs::examples = "https://axiom.my-domain.com"))]
#[configurable(metadata(docs::examples = "${AXIOM_URL}"))]
url: Option<String>,
#[configurable(metadata(docs::examples = "${AXIOM_ORG_ID}"))]
#[configurable(metadata(docs::examples = "123abc"))]
org_id: Option<String>,
#[configurable(metadata(docs::examples = "${AXIOM_TOKEN}"))]
#[configurable(metadata(docs::examples = "123abc"))]
token: SensitiveString,
#[configurable(metadata(docs::examples = "${AXIOM_DATASET}"))]
#[configurable(metadata(docs::examples = "vector_rocks"))]
dataset: String,
#[configurable(derived)]
#[serde(default)]
request: RequestConfig,
#[configurable(derived)]
#[serde(default = "Compression::zstd_default")]
compression: Compression,
#[configurable(derived)]
tls: Option<TlsConfig>,
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
}
impl GenerateConfig for AxiomConfig {
fn generate_config() -> toml::Value {
toml::from_str(
r#"token = "${AXIOM_TOKEN}"
dataset = "${AXIOM_DATASET}"
url = "${AXIOM_URL}"
org_id = "${AXIOM_ORG_ID}""#,
)
.unwrap()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "axiom")]
impl SinkConfig for AxiomConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let mut request = self.request.clone();
if let Some(org_id) = &self.org_id {
request
.headers
.insert("X-Axiom-Org-Id".to_string(), org_id.clone());
}
let http_sink_config = HttpSinkConfig {
uri: self.build_endpoint().try_into()?,
compression: self.compression,
auth: Some(HttpAuthConfig::Bearer {
token: self.token.clone(),
}),
method: HttpMethod::Post,
tls: self.tls.clone(),
authorization_config: None,
request,
acknowledgements: self.acknowledgements,
batch: self.batch,
headers: None,
encoding: EncodingConfigWithFraming::new(
Some(FramingConfig::NewlineDelimited),
SerializerConfig::Json(JsonSerializerConfig {
metric_tag_values: MetricTagValues::Single,
options: JsonSerializerOptions { pretty: false }, }),
Transformer::default(),
),
payload_prefix: "".into(), payload_suffix: "".into(), };
http_sink_config.build(cx).await
}
fn input(&self) -> Input {
Input::new(DataType::Metric | DataType::Log)
}
fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}
impl AxiomConfig {
fn build_endpoint(&self) -> String {
let url = if let Some(url) = self.url.as_ref() {
url.clone()
} else {
CLOUD_URL.to_string()
};
let url = url.trim_end_matches('/');
format!("{}/v1/datasets/{}/ingest", url, self.dataset)
}
}
#[cfg(test)]
mod test {
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<super::AxiomConfig>();
let config = super::AxiomConfig {
url: Some("https://axiom.my-domain.com///".to_string()),
org_id: None,
dataset: "vector_rocks".to_string(),
..Default::default()
};
let endpoint = config.build_endpoint();
assert_eq!(
endpoint,
"https://axiom.my-domain.com/v1/datasets/vector_rocks/ingest"
);
}
}
#[cfg(feature = "axiom-integration-tests")]
#[cfg(test)]
mod integration_tests {
use chrono::{DateTime, Duration, Utc};
use futures::stream;
use serde::{Deserialize, Serialize};
use std::env;
use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
use super::*;
use crate::{
config::SinkContext,
sinks::axiom::AxiomConfig,
test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
};
#[tokio::test]
async fn axiom_logs_put_data() {
let client = reqwest::Client::new();
let url = env::var("AXIOM_URL").unwrap();
let token = env::var("AXIOM_TOKEN").expect("AXIOM_TOKEN environment variable to be set");
assert!(!token.is_empty(), "$AXIOM_TOKEN required");
let dataset = env::var("AXIOM_DATASET").unwrap();
let org_id = env::var("AXIOM_ORG_ID").unwrap();
let cx = SinkContext::default();
let config = AxiomConfig {
url: Some(url.clone()),
token: token.clone().into(),
dataset: dataset.clone(),
org_id: Some(org_id.clone()),
..Default::default()
};
let test_id = uuid::Uuid::new_v4().to_string();
let (sink, _) = config.build(cx).await.unwrap();
let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
event1.insert("host", "aws.cloud.eur");
event1.insert("source_type", "file");
event1.insert("test_id", test_id.clone());
let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
event2.insert("host", "aws.cloud.eur");
event2.insert("source_type", "file");
event2.insert("test_id", test_id.clone());
drop(batch);
let events = vec![Event::Log(event1), Event::Log(event2)];
run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
#[derive(Serialize)]
struct QueryRequest {
apl: String,
#[serde(rename = "endTime")]
end_time: DateTime<Utc>,
#[serde(rename = "startTime")]
start_time: DateTime<Utc>,
}
#[derive(Deserialize, Debug)]
struct QueryResponseMatch {
data: serde_json::Value,
}
#[derive(Deserialize, Debug)]
struct QueryResponse {
matches: Vec<QueryResponseMatch>,
}
let query_req = QueryRequest {
apl: format!(
"['{}'] | where test_id == '{}' | order by _time desc | limit 2",
dataset, test_id
),
start_time: Utc::now() - Duration::minutes(10),
end_time: Utc::now() + Duration::minutes(10),
};
let query_res: QueryResponse = client
.post(format!("{}/v1/datasets/_apl?format=legacy", url))
.header("X-Axiom-Org-Id", org_id)
.header("Authorization", format!("Bearer {}", token))
.json(&query_req)
.send()
.await
.unwrap()
.error_for_status()
.unwrap()
.json()
.await
.unwrap();
assert_eq!(2, query_res.matches.len());
let fst = match query_res.matches[0].data {
serde_json::Value::Object(ref obj) => obj,
_ => panic!("Unexpected value, expected object"),
};
assert_eq!("message_2", fst.get("message").unwrap().as_str().unwrap());
let snd = match query_res.matches[1].data {
serde_json::Value::Object(ref obj) => obj,
_ => panic!("Unexpected value, expected object"),
};
assert_eq!("message_1", snd.get("message").unwrap().as_str().unwrap());
}
}