use std::collections::HashMap;
use vrl::value::Kind;
use super::{healthcheck::healthcheck, sink::LokiSink};
use crate::{
http::{Auth, HttpClient, MaybeAuth},
schema,
sinks::{prelude::*, util::UriSerde},
};
const fn default_compression() -> Compression {
Compression::Snappy
}
fn default_loki_path() -> String {
"/loki/api/v1/push".to_string()
}
#[configurable_component(sink("loki", "Deliver log event data to the Loki aggregation system."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LokiConfig {
#[configurable(metadata(docs::examples = "http://localhost:3100"))]
pub endpoint: UriSerde,
#[serde(default = "default_loki_path")]
pub path: String,
#[configurable(derived)]
pub encoding: EncodingConfig,
#[configurable(metadata(
docs::examples = "some_tenant_id",
docs::examples = "{{ event_field }}",
))]
pub tenant_id: Option<Template>,
#[configurable(metadata(docs::examples = "loki_labels_examples()"))]
#[configurable(metadata(docs::additional_props_description = "A Loki label."))]
pub labels: HashMap<Template, Template>,
#[serde(default = "crate::serde::default_false")]
pub remove_label_fields: bool,
#[configurable(metadata(docs::examples = "loki_structured_metadata_examples()"))]
#[configurable(metadata(docs::additional_props_description = "Loki structured metadata."))]
#[serde(default)]
pub structured_metadata: HashMap<Template, Template>,
#[serde(default = "crate::serde::default_false")]
pub remove_structured_metadata_fields: bool,
#[serde(default = "crate::serde::default_true")]
pub remove_timestamp: bool,
#[serde(default = "default_compression")]
pub compression: Compression,
#[configurable(derived)]
#[serde(default)]
pub out_of_order_action: OutOfOrderAction,
#[configurable(derived)]
pub auth: Option<Auth>,
#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig,
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<LokiDefaultBatchSettings>,
#[configurable(derived)]
pub tls: Option<TlsConfig>,
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
}
fn loki_labels_examples() -> HashMap<String, String> {
let mut examples = HashMap::new();
examples.insert("source".to_string(), "vector".to_string());
examples.insert(
"\"pod_labels_*\"".to_string(),
"{{ kubernetes.pod_labels }}".to_string(),
);
examples.insert("\"*\"".to_string(), "{{ metadata }}".to_string());
examples.insert(
"{{ event_field }}".to_string(),
"{{ some_other_event_field }}".to_string(),
);
examples
}
fn loki_structured_metadata_examples() -> HashMap<String, String> {
let mut examples = HashMap::new();
examples.insert("source".to_string(), "vector".to_string());
examples.insert(
"\"pod_labels_*\"".to_string(),
"{{ kubernetes.pod_labels }}".to_string(),
);
examples.insert("\"*\"".to_string(), "{{ metadata }}".to_string());
examples.insert(
"{{ event_field }}".to_string(),
"{{ some_other_event_field }}".to_string(),
);
examples
}
#[derive(Clone, Copy, Debug, Default)]
pub struct LokiDefaultBatchSettings;
impl SinkBatchSettings for LokiDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = Some(100_000);
const MAX_BYTES: Option<usize> = Some(1_000_000);
const TIMEOUT_SECS: f64 = 1.0;
}
#[configurable_component]
#[derive(Copy, Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "snake_case")]
pub enum OutOfOrderAction {
#[derivative(Default)]
Accept,
RewriteTimestamp,
Drop,
}
impl GenerateConfig for LokiConfig {
fn generate_config() -> toml::Value {
toml::from_str(
r#"endpoint = "http://localhost:3100"
encoding.codec = "json"
labels = {}"#,
)
.unwrap()
}
}
impl LokiConfig {
pub(super) fn build_client(&self, cx: SinkContext) -> crate::Result<HttpClient> {
let tls = TlsSettings::from_options(&self.tls)?;
let client = HttpClient::new(tls, cx.proxy())?;
Ok(client)
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "loki")]
impl SinkConfig for LokiConfig {
async fn build(
&self,
cx: SinkContext,
) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
if self.labels.is_empty() {
return Err("`labels` must include at least one label.".into());
}
for label in self.labels.keys() {
if !valid_label_name(label) {
return Err(format!("Invalid label name {:?}", label.get_ref()).into());
}
}
let client = self.build_client(cx)?;
let config = LokiConfig {
auth: self.auth.choose_one(&self.endpoint.auth)?,
..self.clone()
};
let sink = LokiSink::new(config.clone(), client.clone())?;
let healthcheck = healthcheck(config, client).boxed();
Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}
fn input(&self) -> Input {
let requirement =
schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
Input::new(self.encoding.config().input_type() & DataType::Log)
.with_schema_requirement(requirement)
}
fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}
pub fn valid_label_name(label: &Template) -> bool {
label.is_dynamic() || {
let mut label_trim = label.get_ref().trim();
if let Some(without_opening_end) = label_trim.strip_suffix('*') {
label_trim = without_opening_end
}
let mut label_chars = label_trim.chars();
if let Some(ch) = label_chars.next() {
(ch.is_ascii_alphabetic() || ch == '_')
&& label_chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
} else {
label.get_ref().trim() == "*"
}
}
}
#[cfg(test)]
mod tests {
use std::convert::TryInto;
use super::valid_label_name;
#[test]
fn valid_label_names() {
assert!(valid_label_name(&"name".try_into().unwrap()));
assert!(valid_label_name(&" name ".try_into().unwrap()));
assert!(valid_label_name(&"bee_bop".try_into().unwrap()));
assert!(valid_label_name(&"a09b".try_into().unwrap()));
assert!(valid_label_name(&"abc_*".try_into().unwrap()));
assert!(valid_label_name(&"_*".try_into().unwrap()));
assert!(valid_label_name(&"*".try_into().unwrap()));
assert!(!valid_label_name(&"0ab".try_into().unwrap()));
assert!(!valid_label_name(&"".try_into().unwrap()));
assert!(!valid_label_name(&" ".try_into().unwrap()));
assert!(valid_label_name(&"{{field}}".try_into().unwrap()));
}
}