loki_logproto/
lib.rs

1#![allow(clippy::derive_partial_eq_without_eq)]
2
/// Code textually included from the `stats.rs` file found in the build's `OUT_DIR`.
///
/// NOTE(review): presumably this is prost output generated from Loki's `stats` protobuf
/// definitions by a build script; confirm against `build.rs`.
pub mod stats {
    include!(concat!(env!("OUT_DIR"), "/stats.rs"));
}
6
/// Code textually included from the `logproto.rs` file found in the build's `OUT_DIR`.
///
/// The `util` module relies on this module for `LabelPairAdapter`, `EntryAdapter`,
/// `StreamAdapter` and `PushRequest`.
///
/// NOTE(review): presumably this is prost output generated from Loki's `logproto` protobuf
/// definitions by a build script; confirm against `build.rs`.
pub mod logproto {
    include!(concat!(env!("OUT_DIR"), "/logproto.rs"));
}
10
11pub mod util {
12    use std::collections::HashMap;
13
14    use prost::Message;
15
16    use super::logproto;
17
    /// Number of nanoseconds in one second; used to split a nanosecond timestamp into whole
    /// seconds and the sub-second remainder.
    const NANOS_RANGE: i64 = 1_000_000_000;
19
20    // (<name>, <value>)
21    impl From<(String, String)> for logproto::LabelPairAdapter {
22        fn from(pair: (String, String)) -> Self {
23            logproto::LabelPairAdapter {
24                name: pair.0,
25                value: pair.1,
26            }
27        }
28    }
29
30    // (<Timestamp in nanos>, <Line>, <Structured metadata>)
31    pub struct Entry(pub i64, pub String, pub Vec<(String, String)>);
32
33    impl From<Entry> for logproto::EntryAdapter {
34        fn from(entry: Entry) -> Self {
35            let line = entry.1;
36            let structured_metadata: Vec<logproto::LabelPairAdapter> =
37                entry.2.into_iter().map(|entry| entry.into()).collect();
38
39            logproto::EntryAdapter {
40                timestamp: Some(prost_types::Timestamp {
41                    seconds: entry.0 / NANOS_RANGE,
42                    nanos: (entry.0 % NANOS_RANGE) as i32,
43                }),
44                line,
45                structured_metadata,
46                parsed: vec![], // TODO: Remove when Loki's proto doesn't require this in the
47                                // write-path anymore.
48            }
49        }
50    }
51
52    // (<Labels>, <Lines>)
53    pub struct Stream(pub HashMap<String, String>, pub Vec<Entry>);
54
55    impl From<Stream> for logproto::StreamAdapter {
56        fn from(batch: Stream) -> Self {
57            let labels = encode_labels_map_to_string(&batch.0);
58            let entries: Vec<logproto::EntryAdapter> =
59                batch.1.into_iter().map(|entry| entry.into()).collect();
60
61            logproto::StreamAdapter {
62                labels,
63                entries,
64                hash: 0,
65            }
66        }
67    }
68
69    pub struct Batch(pub Vec<Stream>);
70
71    impl Batch {
72        pub fn encode(self) -> Vec<u8> {
73            let streams: Vec<logproto::StreamAdapter> =
74                self.0.into_iter().map(|stream| stream.into()).collect();
75            let push_request = logproto::PushRequest { streams };
76            push_request.encode_to_vec()
77        }
78    }
79
80    const RESERVED_LABEL_TENANT_ID: &str = "__tenant_id__";
81    const RESERVED_LABELS: [&str; 1] = [RESERVED_LABEL_TENANT_ID];
82
83    // ref: https://github.com/grafana/loki/blob/65c6e254bd22151ab7fc84ec46e13eee2e354aa0/clients/pkg/promtail/client/batch.go#L61-L75
84    pub fn encode_labels_map_to_string(labels: &HashMap<String, String>) -> String {
85        let mut labels: Vec<String> = labels
86            .iter()
87            .filter(|(k, _)| !RESERVED_LABELS.contains(&k.as_str()))
88            .map(|(k, v)| format!("{k}=\"{v}\""))
89            .collect();
90        labels.sort();
91        format!("{{{}}}", labels.join(", "))
92    }
93}
94
95#[cfg(test)]
96mod tests {
97    use std::collections::HashMap;
98
99    use chrono::prelude::*;
100
101    use super::util;
102    use crate::util::{Batch, Entry, Stream};
103
104    #[test]
105    fn encode_labels() {
106        let mut labels: HashMap<String, String> = HashMap::new();
107        labels.insert("__tenant_id__".into(), "tenant_id".into());
108        labels.insert("agent".into(), "vector".into());
109        labels.insert("host".into(), "localhost".into());
110        labels.insert("file".into(), "/path/to/log".into());
111        labels.insert("job".into(), "file_logs".into());
112        let s = util::encode_labels_map_to_string(&labels);
113        assert_eq!(
114            s,
115            r#"{agent="vector", file="/path/to/log", host="localhost", job="file_logs"}"#
116        );
117    }
118
119    #[test]
120    fn encode_batch() {
121        let ts1 = Utc
122            .timestamp_opt(1640244790, 0)
123            .single()
124            .expect("invalid timestamp");
125        let entry1 = Entry(
126            ts1.timestamp_nanos_opt().expect("Timestamp out of range"),
127            "hello".into(),
128            vec![],
129        );
130        let ts2 = Utc
131            .timestamp_opt(1640244791, 0)
132            .single()
133            .expect("invalid timestamp");
134        let entry2 = Entry(
135            ts2.timestamp_nanos_opt().expect("Timestamp out of range"),
136            "world".into(),
137            vec![],
138        );
139        let labels = vec![("source".into(), "protobuf-test".into())]
140            .into_iter()
141            .collect();
142        let batch = Batch(vec![Stream(labels, vec![entry1, entry2])]);
143        // generated by test codes from promtail
144        let expect = vec![
145            10, 60, 10, 24, 123, 115, 111, 117, 114, 99, 101, 61, 34, 112, 114, 111, 116, 111, 98,
146            117, 102, 45, 116, 101, 115, 116, 34, 125, 18, 15, 10, 6, 8, 182, 204, 144, 142, 6, 18,
147            5, 104, 101, 108, 108, 111, 18, 15, 10, 6, 8, 183, 204, 144, 142, 6, 18, 5, 119, 111,
148            114, 108, 100,
149        ];
150        let buf = batch.encode();
151        assert_eq!(expect, buf);
152    }
153}