vector/sinks/loki/
config.rs1use std::collections::HashMap;
2
3use vrl::value::Kind;
4
5use super::{healthcheck::healthcheck, sink::LokiSink};
6use crate::{
7 http::{Auth, HttpClient, MaybeAuth},
8 schema,
9 sinks::{prelude::*, util::UriSerde},
10};
11
/// Serde default for `LokiConfig::compression`: Snappy.
///
/// Declared `const` so the value can also be used in constant contexts.
const fn default_compression() -> Compression {
    Compression::Snappy
}
15
/// Serde default for `LokiConfig::path`.
///
/// Returns the owned string `/loki/api/v1/push`.
fn default_loki_path() -> String {
    String::from("/loki/api/v1/push")
}
19
/// Configuration for the `loki` sink.
#[configurable_component(sink("loki", "Deliver log event data to the Loki aggregation system."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct LokiConfig {
    /// The URL of the Loki instance to send events to.
    ///
    /// Any authentication carried by this URL is combined with `auth` (via
    /// `choose_one`) when the sink is built.
    #[configurable(metadata(docs::examples = "http://localhost:3100"))]
    pub endpoint: UriSerde,

    /// The request path used for pushing to Loki.
    ///
    /// Defaults to `/loki/api/v1/push`.
    // NOTE(review): presumably joined onto `endpoint` by the sink/healthcheck
    // code, which is not visible in this file — confirm.
    #[serde(default = "default_loki_path")]
    pub path: String,

    /// Encoding configuration.
    ///
    /// Its input type is intersected with `DataType::Log` to decide which
    /// events this sink accepts.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    /// Optional tenant ID.
    ///
    /// This is a template, so it may be a fixed string or reference an event
    /// field.
    #[configurable(metadata(
        docs::examples = "some_tenant_id",
        docs::examples = "{{ event_field }}",
    ))]
    pub tenant_id: Option<Template>,

    /// Loki labels; both keys and values are templates.
    ///
    /// At least one label is required. Every non-dynamic key must be a valid
    /// label name (see `valid_label_name`), optionally ending in a single `*`.
    /// Both rules are enforced when the sink is built.
    #[configurable(metadata(docs::examples = "loki_labels_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "A Loki label."))]
    pub labels: HashMap<Template, Template>,

    /// Defaults to `false`.
    // NOTE(review): judging by the name, this removes event fields that were
    // used as labels; the logic lives in the sink implementation — confirm.
    #[serde(default = "crate::serde::default_false")]
    pub remove_label_fields: bool,

    /// Loki structured metadata; both keys and values are templates.
    ///
    /// Defaults to an empty map.
    #[configurable(metadata(docs::examples = "loki_structured_metadata_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "Loki structured metadata."))]
    #[serde(default)]
    pub structured_metadata: HashMap<Template, Template>,

    /// Defaults to `false`.
    // NOTE(review): judging by the name, this removes event fields that were
    // used as structured metadata — confirm against the sink implementation.
    #[serde(default = "crate::serde::default_false")]
    pub remove_structured_metadata_fields: bool,

    /// Defaults to `true`.
    // NOTE(review): judging by the name, this removes the timestamp from the
    // encoded event payload — confirm against the sink implementation.
    #[serde(default = "crate::serde::default_true")]
    pub remove_timestamp: bool,

    /// Compression setting.
    ///
    /// Defaults to Snappy.
    #[serde(default = "default_compression")]
    pub compression: Compression,

    /// What to do with out-of-order events.
    ///
    /// Defaults to `OutOfOrderAction::Accept`.
    #[configurable(derived)]
    #[serde(default)]
    pub out_of_order_action: OutOfOrderAction,

    /// Optional HTTP authentication.
    ///
    /// Combined with any authentication carried by `endpoint` when the sink
    /// is built.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    /// Request settings; falls back to the type's default when omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    /// Batching settings; defaults come from `LokiDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<LokiDefaultBatchSettings>,

    /// Optional TLS settings used when constructing the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// Acknowledgement settings.
    ///
    /// Deserialized from either a boolean or a struct, and omitted from
    /// serialized output while it holds its default value.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
123
/// Example label map referenced by the `docs::examples` metadata on
/// `LokiConfig::labels`.
///
/// Contains four entries: a fixed label, two wildcard-style keys (quoted, as
/// they would be written in a config file), and a fully templated pair.
fn loki_labels_examples() -> HashMap<String, String> {
    // Static table of (key, value) pairs; converted to owned strings below.
    const ENTRIES: [(&str, &str); 4] = [
        ("source", "vector"),
        ("\"pod_labels_*\"", "{{ kubernetes.pod_labels }}"),
        ("\"*\"", "{{ metadata }}"),
        ("{{ event_field }}", "{{ some_other_event_field }}"),
    ];

    ENTRIES
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
138
/// Example structured-metadata map referenced by the `docs::examples`
/// metadata on `LokiConfig::structured_metadata`.
///
/// Contains four entries: a fixed key, two wildcard-style keys (quoted, as
/// they would be written in a config file), and a fully templated pair.
fn loki_structured_metadata_examples() -> HashMap<String, String> {
    HashMap::from([
        (String::from("source"), String::from("vector")),
        (
            String::from("\"pod_labels_*\""),
            String::from("{{ kubernetes.pod_labels }}"),
        ),
        (String::from("\"*\""), String::from("{{ metadata }}")),
        (
            String::from("{{ event_field }}"),
            String::from("{{ some_other_event_field }}"),
        ),
    ])
}
153
/// Zero-sized marker type that supplies the default batch limits for the Loki
/// sink; it is used as the type parameter of `LokiConfig::batch`.
#[derive(Clone, Copy, Debug, Default)]
pub struct LokiDefaultBatchSettings;
156
/// Default batch limits for the Loki sink.
impl SinkBatchSettings for LokiDefaultBatchSettings {
    // At most 100,000 events per batch.
    const MAX_EVENTS: Option<usize> = Some(100_000);
    // At most 1,000,000 bytes per batch.
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    // Batch timeout of one second.
    const TIMEOUT_SECS: f64 = 1.0;
}
162
/// Action to take for out-of-order events.
///
/// Variants are (de)serialized in `snake_case`: `accept`,
/// `rewrite_timestamp`, and `drop`.
// NOTE(review): the handling of each variant lives in the sink implementation,
// which is not visible in this file; the per-variant descriptions below are
// inferred from the variant names — confirm before relying on them.
#[configurable_component]
#[derive(Copy, Clone, Debug, Default)]
#[serde(rename_all = "snake_case")]
pub enum OutOfOrderAction {
    /// Accept the event.
    ///
    /// This is the default action.
    #[default]
    Accept,

    /// Rewrite the timestamp of the event.
    // NOTE(review): what the timestamp is rewritten to is decided elsewhere.
    RewriteTimestamp,

    /// Drop the event.
    Drop,
}
191
impl GenerateConfig for LokiConfig {
    /// Returns a minimal example configuration for this sink as a TOML value.
    ///
    /// # Panics
    ///
    /// Panics if the embedded TOML literal fails to parse, which can only
    /// happen if the literal itself is edited into something invalid.
    fn generate_config() -> toml::Value {
        // NOTE(review): the example sets `labels = {}`, but `build` rejects an
        // empty `labels` map, so this config parses yet would not build
        // without adding at least one label.
        toml::from_str(
            r#"endpoint = "http://localhost:3100"
            encoding.codec = "json"
            labels = {}"#,
        )
        .unwrap()
    }
}
202
203impl LokiConfig {
204 pub(super) fn build_client(&self, cx: SinkContext) -> crate::Result<HttpClient> {
205 let tls = TlsSettings::from_options(self.tls.as_ref())?;
206 let client = HttpClient::new(tls, cx.proxy())?;
207 Ok(client)
208 }
209}
210
211#[async_trait::async_trait]
212#[typetag::serde(name = "loki")]
213impl SinkConfig for LokiConfig {
214 async fn build(
215 &self,
216 cx: SinkContext,
217 ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
218 if self.labels.is_empty() {
219 return Err("`labels` must include at least one label.".into());
220 }
221
222 for label in self.labels.keys() {
223 if !valid_label_name(label) {
224 return Err(format!("Invalid label name {:?}", label.get_ref()).into());
225 }
226 }
227
228 let client = self.build_client(cx)?;
229
230 let config = LokiConfig {
231 auth: self.auth.choose_one(&self.endpoint.auth)?,
232 ..self.clone()
233 };
234
235 let sink = LokiSink::new(config.clone(), client.clone())?;
236
237 let healthcheck = healthcheck(config, client).boxed();
238
239 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
240 }
241
242 fn input(&self) -> Input {
243 let requirement =
244 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
245
246 Input::new(self.encoding.config().input_type() & DataType::Log)
247 .with_schema_requirement(requirement)
248 }
249
250 fn acknowledgements(&self) -> &AcknowledgementsConfig {
251 &self.acknowledgements
252 }
253}
254
255pub fn valid_label_name(label: &Template) -> bool {
256 label.is_dynamic() || {
257 let mut label_trim = label.get_ref().trim();
266 if let Some(without_opening_end) = label_trim.strip_suffix('*') {
267 label_trim = without_opening_end
268 }
269
270 let mut label_chars = label_trim.chars();
271 if let Some(ch) = label_chars.next() {
272 (ch.is_ascii_alphabetic() || ch == '_')
273 && label_chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
274 } else {
275 label.get_ref().trim() == "*"
276 }
277 }
278}
279
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use super::valid_label_name;

    /// Exercises `valid_label_name` with plain names, wildcard suffixes,
    /// rejected inputs, and a dynamic template.
    #[test]
    fn valid_label_names() {
        // Converts the raw text into a template and runs the validator on it.
        let accepts = |raw: &str| valid_label_name(&raw.try_into().unwrap());

        // Plain names, with and without surrounding whitespace.
        assert!(accepts("name"));
        assert!(accepts(" name "));
        assert!(accepts("bee_bop"));
        assert!(accepts("a09b"));

        // Wildcard forms.
        assert!(accepts("abc_*"));
        assert!(accepts("_*"));
        assert!(accepts("*"));

        // Leading digit, empty, and whitespace-only names are rejected.
        assert!(!accepts("0ab"));
        assert!(!accepts(""));
        assert!(!accepts(" "));

        // Dynamic templates are always accepted.
        assert!(accepts("{{field}}"));
    }
}