loki_logproto/
lib.rs

1#![allow(clippy::derive_partial_eq_without_eq)]
2
3pub mod stats {
4    include!(concat!(env!("OUT_DIR"), "/stats.rs"));
5}
6
7pub mod logproto {
8    include!(concat!(env!("OUT_DIR"), "/logproto.rs"));
9}
10
11pub mod util {
12    use super::logproto;
13    use prost::Message;
14    use std::collections::HashMap;
15
16    const NANOS_RANGE: i64 = 1_000_000_000;
17
18    // (<name>, <value>)
19    impl From<(String, String)> for logproto::LabelPairAdapter {
20        fn from(pair: (String, String)) -> Self {
21            logproto::LabelPairAdapter {
22                name: pair.0,
23                value: pair.1,
24            }
25        }
26    }
27
28    // (<Timestamp in nanos>, <Line>, <Structured metadata>)
29    pub struct Entry(pub i64, pub String, pub Vec<(String, String)>);
30
31    impl From<Entry> for logproto::EntryAdapter {
32        fn from(entry: Entry) -> Self {
33            let line = entry.1;
34            let structured_metadata: Vec<logproto::LabelPairAdapter> =
35                entry.2.into_iter().map(|entry| entry.into()).collect();
36
37            logproto::EntryAdapter {
38                timestamp: Some(prost_types::Timestamp {
39                    seconds: entry.0 / NANOS_RANGE,
40                    nanos: (entry.0 % NANOS_RANGE) as i32,
41                }),
42                line,
43                structured_metadata,
44                parsed: vec![], // TODO: Remove when Loki's proto doesn't require this in the
45                                // write-path anymore.
46            }
47        }
48    }
49
50    // (<Labels>, <Lines>)
51    pub struct Stream(pub HashMap<String, String>, pub Vec<Entry>);
52
53    impl From<Stream> for logproto::StreamAdapter {
54        fn from(batch: Stream) -> Self {
55            let labels = encode_labels_map_to_string(&batch.0);
56            let entries: Vec<logproto::EntryAdapter> =
57                batch.1.into_iter().map(|entry| entry.into()).collect();
58
59            logproto::StreamAdapter {
60                labels,
61                entries,
62                hash: 0,
63            }
64        }
65    }
66
67    pub struct Batch(pub Vec<Stream>);
68
69    impl Batch {
70        pub fn encode(self) -> Vec<u8> {
71            let streams: Vec<logproto::StreamAdapter> =
72                self.0.into_iter().map(|stream| stream.into()).collect();
73            let push_request = logproto::PushRequest { streams };
74            push_request.encode_to_vec()
75        }
76    }
77
78    const RESERVED_LABEL_TENANT_ID: &str = "__tenant_id__";
79    const RESERVED_LABELS: [&str; 1] = [RESERVED_LABEL_TENANT_ID];
80
81    // ref: https://github.com/grafana/loki/blob/65c6e254bd22151ab7fc84ec46e13eee2e354aa0/clients/pkg/promtail/client/batch.go#L61-L75
82    pub fn encode_labels_map_to_string(labels: &HashMap<String, String>) -> String {
83        let mut labels: Vec<String> = labels
84            .iter()
85            .filter(|(k, _)| !RESERVED_LABELS.contains(&k.as_str()))
86            .map(|(k, v)| format!("{k}=\"{v}\""))
87            .collect();
88        labels.sort();
89        format!("{{{}}}", labels.join(", "))
90    }
91}
92
93#[cfg(test)]
94mod tests {
95    use super::util;
96    use crate::util::{Batch, Entry, Stream};
97    use chrono::prelude::*;
98    use std::collections::HashMap;
99
100    #[test]
101    fn encode_labels() {
102        let mut labels: HashMap<String, String> = HashMap::new();
103        labels.insert("__tenant_id__".into(), "tenant_id".into());
104        labels.insert("agent".into(), "vector".into());
105        labels.insert("host".into(), "localhost".into());
106        labels.insert("file".into(), "/path/to/log".into());
107        labels.insert("job".into(), "file_logs".into());
108        let s = util::encode_labels_map_to_string(&labels);
109        assert_eq!(
110            s,
111            r#"{agent="vector", file="/path/to/log", host="localhost", job="file_logs"}"#
112        );
113    }
114
115    #[test]
116    fn encode_batch() {
117        let ts1 = Utc
118            .timestamp_opt(1640244790, 0)
119            .single()
120            .expect("invalid timestamp");
121        let entry1 = Entry(
122            ts1.timestamp_nanos_opt().expect("Timestamp out of range"),
123            "hello".into(),
124            vec![],
125        );
126        let ts2 = Utc
127            .timestamp_opt(1640244791, 0)
128            .single()
129            .expect("invalid timestamp");
130        let entry2 = Entry(
131            ts2.timestamp_nanos_opt().expect("Timestamp out of range"),
132            "world".into(),
133            vec![],
134        );
135        let labels = vec![("source".into(), "protobuf-test".into())]
136            .into_iter()
137            .collect();
138        let batch = Batch(vec![Stream(labels, vec![entry1, entry2])]);
139        // generated by test codes from promtail
140        let expect = vec![
141            10, 60, 10, 24, 123, 115, 111, 117, 114, 99, 101, 61, 34, 112, 114, 111, 116, 111, 98,
142            117, 102, 45, 116, 101, 115, 116, 34, 125, 18, 15, 10, 6, 8, 182, 204, 144, 142, 6, 18,
143            5, 104, 101, 108, 108, 111, 18, 15, 10, 6, 8, 183, 204, 144, 142, 6, 18, 5, 119, 111,
144            114, 108, 100,
145        ];
146        let buf = batch.encode();
147        assert_eq!(expect, buf);
148    }
149}