vector/sinks/loki/
config.rs1use std::collections::HashMap;
2
3use vrl::value::Kind;
4
5use super::{healthcheck::healthcheck, sink::LokiSink};
6use crate::{
7 http::{Auth, HttpClient, MaybeAuth},
8 schema,
9 sinks::{prelude::*, util::UriSerde},
10};
11
12const fn default_compression() -> Compression {
13 Compression::Snappy
14}
15
/// Serde default for [`LokiConfig::path`]: the `/loki/api/v1/push` route.
fn default_loki_path() -> String {
    String::from("/loki/api/v1/push")
}
19
20#[configurable_component(sink("loki", "Deliver log event data to the Loki aggregation system."))]
22#[derive(Clone, Debug)]
23#[serde(deny_unknown_fields)]
24pub struct LokiConfig {
25 #[configurable(metadata(docs::examples = "http://localhost:3100"))]
29 pub endpoint: UriSerde,
30
31 #[serde(default = "default_loki_path")]
33 pub path: String,
34
35 #[configurable(derived)]
36 pub encoding: EncodingConfig,
37
38 #[configurable(metadata(
44 docs::examples = "some_tenant_id",
45 docs::examples = "{{ event_field }}",
46 ))]
47 pub tenant_id: Option<Template>,
48
49 #[configurable(metadata(docs::examples = "loki_labels_examples()"))]
62 #[configurable(metadata(docs::additional_props_description = "A Loki label."))]
63 pub labels: HashMap<Template, Template>,
64
65 #[serde(default = "crate::serde::default_false")]
67 pub remove_label_fields: bool,
68
69 #[configurable(metadata(docs::examples = "loki_structured_metadata_examples()"))]
78 #[configurable(metadata(docs::additional_props_description = "Loki structured metadata."))]
79 #[serde(default)]
80 pub structured_metadata: HashMap<Template, Template>,
81
82 #[serde(default = "crate::serde::default_false")]
84 pub remove_structured_metadata_fields: bool,
85
86 #[serde(default = "crate::serde::default_true")]
90 pub remove_timestamp: bool,
91
92 #[serde(default = "default_compression")]
95 pub compression: Compression,
96
97 #[configurable(derived)]
98 #[serde(default)]
99 pub out_of_order_action: OutOfOrderAction,
100
101 #[configurable(derived)]
102 pub auth: Option<Auth>,
103
104 #[configurable(derived)]
105 #[serde(default)]
106 pub request: TowerRequestConfig,
107
108 #[configurable(derived)]
109 #[serde(default)]
110 pub batch: BatchConfig<LokiDefaultBatchSettings>,
111
112 #[configurable(derived)]
113 pub tls: Option<TlsConfig>,
114
115 #[configurable(derived)]
116 #[serde(
117 default,
118 deserialize_with = "crate::serde::bool_or_struct",
119 skip_serializing_if = "crate::serde::is_default"
120 )]
121 acknowledgements: AcknowledgementsConfig,
122}
123
/// Builds the example map referenced by the `docs::examples` metadata on
/// [`LokiConfig::labels`].
///
/// The entries show a static label, a quoted `*`-suffixed prefix key, the quoted bare `*` key,
/// and a fully templated key/value pair.
fn loki_labels_examples() -> HashMap<String, String> {
    const EXAMPLES: [(&str, &str); 4] = [
        ("source", "vector"),
        ("\"pod_labels_*\"", "{{ kubernetes.pod_labels }}"),
        ("\"*\"", "{{ metadata }}"),
        ("{{ event_field }}", "{{ some_other_event_field }}"),
    ];

    EXAMPLES
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
138
/// Builds the example map referenced by the `docs::examples` metadata on
/// [`LokiConfig::structured_metadata`].
///
/// The entries show a static key, a quoted `*`-suffixed prefix key, the quoted bare `*` key,
/// and a fully templated key/value pair.
fn loki_structured_metadata_examples() -> HashMap<String, String> {
    const EXAMPLES: [(&str, &str); 4] = [
        ("source", "vector"),
        ("\"pod_labels_*\"", "{{ kubernetes.pod_labels }}"),
        ("\"*\"", "{{ metadata }}"),
        ("{{ event_field }}", "{{ some_other_event_field }}"),
    ];

    EXAMPLES
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
153
/// Zero-sized marker type that carries the default batch limits for the `loki` sink.
///
/// It is used as the type parameter of `BatchConfig` on [`LokiConfig::batch`].
#[derive(Clone, Copy, Debug, Default)]
pub struct LokiDefaultBatchSettings;
156
157impl SinkBatchSettings for LokiDefaultBatchSettings {
158 const MAX_EVENTS: Option<usize> = Some(100_000);
159 const MAX_BYTES: Option<usize> = Some(1_000_000);
160 const TIMEOUT_SECS: f64 = 1.0;
161}
162
163#[configurable_component]
174#[derive(Copy, Clone, Debug, Derivative)]
175#[derivative(Default)]
176#[serde(rename_all = "snake_case")]
177pub enum OutOfOrderAction {
178 #[derivative(Default)]
184 Accept,
185
186 RewriteTimestamp,
188
189 Drop,
191}
192
193impl GenerateConfig for LokiConfig {
194 fn generate_config() -> toml::Value {
195 toml::from_str(
196 r#"endpoint = "http://localhost:3100"
197 encoding.codec = "json"
198 labels = {}"#,
199 )
200 .unwrap()
201 }
202}
203
204impl LokiConfig {
205 pub(super) fn build_client(&self, cx: SinkContext) -> crate::Result<HttpClient> {
206 let tls = TlsSettings::from_options(self.tls.as_ref())?;
207 let client = HttpClient::new(tls, cx.proxy())?;
208 Ok(client)
209 }
210}
211
212#[async_trait::async_trait]
213#[typetag::serde(name = "loki")]
214impl SinkConfig for LokiConfig {
215 async fn build(
216 &self,
217 cx: SinkContext,
218 ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
219 if self.labels.is_empty() {
220 return Err("`labels` must include at least one label.".into());
221 }
222
223 for label in self.labels.keys() {
224 if !valid_label_name(label) {
225 return Err(format!("Invalid label name {:?}", label.get_ref()).into());
226 }
227 }
228
229 let client = self.build_client(cx)?;
230
231 let config = LokiConfig {
232 auth: self.auth.choose_one(&self.endpoint.auth)?,
233 ..self.clone()
234 };
235
236 let sink = LokiSink::new(config.clone(), client.clone())?;
237
238 let healthcheck = healthcheck(config, client).boxed();
239
240 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
241 }
242
243 fn input(&self) -> Input {
244 let requirement =
245 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
246
247 Input::new(self.encoding.config().input_type() & DataType::Log)
248 .with_schema_requirement(requirement)
249 }
250
251 fn acknowledgements(&self) -> &AcknowledgementsConfig {
252 &self.acknowledgements
253 }
254}
255
256pub fn valid_label_name(label: &Template) -> bool {
257 label.is_dynamic() || {
258 let mut label_trim = label.get_ref().trim();
267 if let Some(without_opening_end) = label_trim.strip_suffix('*') {
268 label_trim = without_opening_end
269 }
270
271 let mut label_chars = label_trim.chars();
272 if let Some(ch) = label_chars.next() {
273 (ch.is_ascii_alphabetic() || ch == '_')
274 && label_chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
275 } else {
276 label.get_ref().trim() == "*"
277 }
278 }
279}
280
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use super::valid_label_name;

    /// Exercises `valid_label_name` with static, wildcard, malformed, and templated names.
    #[test]
    fn valid_label_names() {
        // Plain names (whitespace-padded included), `*`-suffixed prefixes, the bare wildcard,
        // and a templated name must all be accepted.
        let accepted = ["name", " name ", "bee_bop", "a09b", "abc_*", "_*", "*", "{{field}}"];
        for raw in accepted.iter().copied() {
            assert!(valid_label_name(&raw.try_into().unwrap()));
        }

        // A leading digit, the empty string, and whitespace-only input must be rejected.
        let rejected = ["0ab", "", " "];
        for raw in rejected.iter().copied() {
            assert!(!valid_label_name(&raw.try_into().unwrap()));
        }
    }
}