vector/sinks/loki/
config.rs

1use std::collections::HashMap;
2
3use vrl::value::Kind;
4
5use super::{healthcheck::healthcheck, sink::LokiSink};
6use crate::{
7    http::{Auth, HttpClient, MaybeAuth},
8    schema,
9    sinks::{prelude::*, util::UriSerde},
10};
11
12const fn default_compression() -> Compression {
13    Compression::Snappy
14}
15
/// Push API path appended to `endpoint` when `path` is not configured.
fn default_loki_path() -> String {
    String::from("/loki/api/v1/push")
}
19
// NOTE(review): the `///` comments in this struct appear to be consumed by
// `configurable_component` for the generated configuration schema/docs (see the `docs::`
// metadata attributes), so internal notes below use plain `//` comments instead. Confirm
// before editing any `///` text, since it is likely user-facing.
/// Configuration for the `loki` sink.
#[configurable_component(sink("loki", "Deliver log event data to the Loki aggregation system."))]
#[derive(Clone, Debug)]
// Unknown keys in the sink's config table are rejected at deserialization time.
#[serde(deny_unknown_fields)]
pub struct LokiConfig {
    // Required. `build` reconciles any credentials carried by this URI (`endpoint.auth`)
    // with the `auth` field via `choose_one`.
    /// The base URL of the Loki instance.
    ///
    /// The `path` value is appended to this.
    #[configurable(metadata(docs::examples = "http://localhost:3100"))]
    pub endpoint: UriSerde,

    // Defaults to `/loki/api/v1/push` (see `default_loki_path`).
    /// The path to use in the URL of the Loki instance.
    #[serde(default = "default_loki_path")]
    pub path: String,

    // Required (no serde default). `input()` intersects the encoder's input type with logs.
    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // Optional; templated, so the tenant may be taken from an event field.
    /// The [tenant ID][tenant_id] to specify in requests to Loki.
    ///
    /// When running Loki locally, a tenant ID is not required.
    ///
    /// [tenant_id]: https://grafana.com/docs/loki/latest/operations/multi-tenancy/
    #[configurable(metadata(
        docs::examples = "some_tenant_id",
        docs::examples = "{{ event_field }}",
    ))]
    pub tenant_id: Option<Template>,

    // Required (no serde default). `build` additionally rejects an empty map and any static
    // key that fails `valid_label_name`.
    /// A set of labels that are attached to each batch of events.
    ///
    /// Both keys and values are templateable, which enables you to attach dynamic labels to events.
    ///
    /// Valid label keys include `*`, and prefixes ending with `*`, to allow for the expansion of
    /// objects into multiple labels. See [Label expansion][label_expansion] for more information.
    ///
    /// Note: If the set of labels has high cardinality, this can cause drastic performance issues
    /// with Loki. To prevent this from happening, reduce the number of unique label keys and
    /// values.
    ///
    /// [label_expansion]: https://vector.dev/docs/reference/configuration/sinks/loki/#label-expansion
    #[configurable(metadata(docs::examples = "loki_labels_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "A Loki label."))]
    pub labels: HashMap<Template, Template>,

    // Defaults to `false`.
    /// Whether or not to delete fields from the event when they are used as labels.
    #[serde(default = "crate::serde::default_false")]
    pub remove_label_fields: bool,

    // Optional; defaults to an empty map. Unlike `labels`, `build` does not validate these keys.
    /// Structured metadata that is attached to each batch of events.
    ///
    /// Both keys and values are templateable, which enables you to attach dynamic structured metadata to events.
    ///
    /// Valid metadata keys include `*`, and prefixes ending with `*`, to allow for the expansion of
    /// objects into multiple metadata entries. This follows the same logic as [Label expansion][label_expansion].
    ///
    /// [label_expansion]: https://vector.dev/docs/reference/configuration/sinks/loki/#label-expansion
    #[configurable(metadata(docs::examples = "loki_structured_metadata_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "Loki structured metadata."))]
    #[serde(default)]
    pub structured_metadata: HashMap<Template, Template>,

    // Defaults to `false`.
    /// Whether or not to delete fields from the event when they are used in structured metadata.
    #[serde(default = "crate::serde::default_false")]
    pub remove_structured_metadata_fields: bool,

    // Defaults to `true`.
    /// Whether or not to remove the timestamp from the event payload.
    ///
    /// The timestamp is still sent as event metadata for Loki to use for indexing.
    #[serde(default = "crate::serde::default_true")]
    pub remove_timestamp: bool,

    // Defaults to Snappy (see `default_compression`).
    /// Compression configuration.
    /// Snappy compression implies sending push requests as Protocol Buffers.
    #[serde(default = "default_compression")]
    pub compression: Compression,

    // Defaults to `OutOfOrderAction::Accept`, the variant marked `#[derivative(Default)]`.
    #[configurable(derived)]
    #[serde(default)]
    pub out_of_order_action: OutOfOrderAction,

    // Optional. `build` merges this with credentials from `endpoint` via `choose_one`, which
    // can fail. NOTE(review): presumably it errors when both are set; confirm in `MaybeAuth`.
    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Batch limits default to `LokiDefaultBatchSettings` (presumably applied by `BatchConfig`'s
    // `Default` impl — confirm).
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<LokiDefaultBatchSettings>,

    // Optional; passed to `TlsSettings::from_options` in `build_client`.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Accepts either a bool or a struct (`bool_or_struct`), and is skipped on serialization when
    // `crate::serde::is_default` reports it unchanged. Exposed via `SinkConfig::acknowledgements`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
123
/// Example `labels` entries shown in the generated documentation.
///
/// Covers a static label, a prefixed wildcard expansion, a full `*` expansion, and a
/// fully templated key/value pair.
fn loki_labels_examples() -> HashMap<String, String> {
    const PAIRS: [(&str, &str); 4] = [
        ("source", "vector"),
        ("\"pod_labels_*\"", "{{ kubernetes.pod_labels }}"),
        ("\"*\"", "{{ metadata }}"),
        ("{{ event_field }}", "{{ some_other_event_field }}"),
    ];

    PAIRS
        .iter()
        .map(|&(key, value)| (key.to_string(), value.to_string()))
        .collect()
}
138
/// Example `structured_metadata` entries shown in the generated documentation.
///
/// Mirrors the label examples: a static entry, a prefixed wildcard expansion, a full `*`
/// expansion, and a fully templated key/value pair.
fn loki_structured_metadata_examples() -> HashMap<String, String> {
    let entries = [
        ("source", "vector"),
        ("\"pod_labels_*\"", "{{ kubernetes.pod_labels }}"),
        ("\"*\"", "{{ metadata }}"),
        ("{{ event_field }}", "{{ some_other_event_field }}"),
    ];

    let mut metadata = HashMap::with_capacity(entries.len());
    for (key, value) in entries {
        metadata.insert(String::from(key), String::from(value));
    }
    metadata
}
153
154#[derive(Clone, Copy, Debug, Default)]
155pub struct LokiDefaultBatchSettings;
156
157impl SinkBatchSettings for LokiDefaultBatchSettings {
158    const MAX_EVENTS: Option<usize> = Some(100_000);
159    const MAX_BYTES: Option<usize> = Some(1_000_000);
160    const TIMEOUT_SECS: f64 = 1.0;
161}
162
// NOTE(review): the `///` text on this enum and its variants appears to feed the generated
// configuration docs via `configurable_component`, so internal notes here use `//` comments.
// Variants (de)serialize in snake_case (`rename_all`): `accept`, `rewrite_timestamp`, `drop`.
// `Accept` is the `Default` variant, selected by the `#[derivative(Default)]` marker on it.
/// Out-of-order event behavior.
///
/// Some sources may generate events with timestamps that aren't in chronological order. Even though the
/// sink sorts the events before sending them to Loki, there is a chance that another event could come in
/// that is out of order with the latest events sent to Loki. Prior to Loki 2.4.0, this
/// was not supported and would result in an error during the push request.
///
/// If you're using Loki 2.4.0 or newer, `Accept` is the preferred action, which lets Loki handle
/// any necessary sorting/reordering. If you're using an earlier version, then you must use `Drop`
/// or `RewriteTimestamp` depending on which option makes the most sense for your use case.
#[configurable_component]
#[derive(Copy, Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "snake_case")]
pub enum OutOfOrderAction {
    // Default variant (see `#[derivative(Default)]` below).
    /// Accept the event.
    ///
    /// The event is not dropped and is sent without modification.
    ///
    /// Requires Loki 2.4.0 or newer.
    #[derivative(Default)]
    Accept,

    /// Rewrite the timestamp of the event to the timestamp of the latest event seen by the sink.
    RewriteTimestamp,

    /// Drop the event.
    Drop,
}
192
193impl GenerateConfig for LokiConfig {
194    fn generate_config() -> toml::Value {
195        toml::from_str(
196            r#"endpoint = "http://localhost:3100"
197            encoding.codec = "json"
198            labels = {}"#,
199        )
200        .unwrap()
201    }
202}
203
204impl LokiConfig {
205    pub(super) fn build_client(&self, cx: SinkContext) -> crate::Result<HttpClient> {
206        let tls = TlsSettings::from_options(self.tls.as_ref())?;
207        let client = HttpClient::new(tls, cx.proxy())?;
208        Ok(client)
209    }
210}
211
212#[async_trait::async_trait]
213#[typetag::serde(name = "loki")]
214impl SinkConfig for LokiConfig {
215    async fn build(
216        &self,
217        cx: SinkContext,
218    ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
219        if self.labels.is_empty() {
220            return Err("`labels` must include at least one label.".into());
221        }
222
223        for label in self.labels.keys() {
224            if !valid_label_name(label) {
225                return Err(format!("Invalid label name {:?}", label.get_ref()).into());
226            }
227        }
228
229        let client = self.build_client(cx)?;
230
231        let config = LokiConfig {
232            auth: self.auth.choose_one(&self.endpoint.auth)?,
233            ..self.clone()
234        };
235
236        let sink = LokiSink::new(config.clone(), client.clone())?;
237
238        let healthcheck = healthcheck(config, client).boxed();
239
240        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
241    }
242
243    fn input(&self) -> Input {
244        let requirement =
245            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
246
247        Input::new(self.encoding.config().input_type() & DataType::Log)
248            .with_schema_requirement(requirement)
249    }
250
251    fn acknowledgements(&self) -> &AcknowledgementsConfig {
252        &self.acknowledgements
253    }
254}
255
256pub fn valid_label_name(label: &Template) -> bool {
257    label.is_dynamic() || {
258        // Loki follows prometheus on this https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
259        // Although that isn't explicitly said anywhere besides what's in the code.
260        // The closest mention is in section about Parser Expression https://grafana.com/docs/loki/latest/logql/
261        //
262        // [a-zA-Z_][a-zA-Z0-9_]*
263        //
264        // '*' symbol at the end of the label name will be treated as a prefix for
265        // underlying object keys.
266        let mut label_trim = label.get_ref().trim();
267        if let Some(without_opening_end) = label_trim.strip_suffix('*') {
268            label_trim = without_opening_end
269        }
270
271        let mut label_chars = label_trim.chars();
272        if let Some(ch) = label_chars.next() {
273            (ch.is_ascii_alphabetic() || ch == '_')
274                && label_chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_')
275        } else {
276            label.get_ref().trim() == "*"
277        }
278    }
279}
280
#[cfg(test)]
mod tests {
    use std::convert::TryInto;

    use super::valid_label_name;

    /// Parses `raw` as a `Template` and reports whether it is an acceptable label key.
    fn is_valid(raw: &str) -> bool {
        valid_label_name(&raw.try_into().unwrap())
    }

    #[test]
    fn valid_label_names() {
        for name in ["name", " name ", "bee_bop", "a09b", "_", "abc_*", "_*", "*", " * "] {
            assert!(is_valid(name), "expected {name:?} to be accepted");
        }

        // Dynamic (templated) keys are accepted without checking their text.
        assert!(is_valid("{{field}}"));
        assert!(is_valid("{{ field }}-suffix"));
    }

    #[test]
    fn invalid_label_names() {
        // Covers a leading digit, empty/blank keys, disallowed characters, a `*` anywhere other
        // than a single trailing position, whitespace before the `*`, and non-ASCII letters.
        for name in ["0ab", "", " ", "a-b", "a b", "a*b", "*abc", "**", "abc *", "é"] {
            assert!(!is_valid(name), "expected {name:?} to be rejected");
        }
    }
}