1#![allow(clippy::derive_partial_eq_without_eq)]
2
3pub mod stats {
4 include!(concat!(env!("OUT_DIR"), "/stats.rs"));
5}
6
7pub mod logproto {
8 include!(concat!(env!("OUT_DIR"), "/logproto.rs"));
9}
10
11pub mod util {
12 use super::logproto;
13 use prost::Message;
14 use std::collections::HashMap;
15
16 const NANOS_RANGE: i64 = 1_000_000_000;
17
18 impl From<(String, String)> for logproto::LabelPairAdapter {
20 fn from(pair: (String, String)) -> Self {
21 logproto::LabelPairAdapter {
22 name: pair.0,
23 value: pair.1,
24 }
25 }
26 }
27
28 pub struct Entry(pub i64, pub String, pub Vec<(String, String)>);
30
31 impl From<Entry> for logproto::EntryAdapter {
32 fn from(entry: Entry) -> Self {
33 let line = entry.1;
34 let structured_metadata: Vec<logproto::LabelPairAdapter> =
35 entry.2.into_iter().map(|entry| entry.into()).collect();
36
37 logproto::EntryAdapter {
38 timestamp: Some(prost_types::Timestamp {
39 seconds: entry.0 / NANOS_RANGE,
40 nanos: (entry.0 % NANOS_RANGE) as i32,
41 }),
42 line,
43 structured_metadata,
44 parsed: vec![], }
47 }
48 }
49
50 pub struct Stream(pub HashMap<String, String>, pub Vec<Entry>);
52
53 impl From<Stream> for logproto::StreamAdapter {
54 fn from(batch: Stream) -> Self {
55 let labels = encode_labels_map_to_string(&batch.0);
56 let entries: Vec<logproto::EntryAdapter> =
57 batch.1.into_iter().map(|entry| entry.into()).collect();
58
59 logproto::StreamAdapter {
60 labels,
61 entries,
62 hash: 0,
63 }
64 }
65 }
66
67 pub struct Batch(pub Vec<Stream>);
68
69 impl Batch {
70 pub fn encode(self) -> Vec<u8> {
71 let streams: Vec<logproto::StreamAdapter> =
72 self.0.into_iter().map(|stream| stream.into()).collect();
73 let push_request = logproto::PushRequest { streams };
74 push_request.encode_to_vec()
75 }
76 }
77
78 const RESERVED_LABEL_TENANT_ID: &str = "__tenant_id__";
79 const RESERVED_LABELS: [&str; 1] = [RESERVED_LABEL_TENANT_ID];
80
81 pub fn encode_labels_map_to_string(labels: &HashMap<String, String>) -> String {
83 let mut labels: Vec<String> = labels
84 .iter()
85 .filter(|(k, _)| !RESERVED_LABELS.contains(&k.as_str()))
86 .map(|(k, v)| format!("{k}=\"{v}\""))
87 .collect();
88 labels.sort();
89 format!("{{{}}}", labels.join(", "))
90 }
91}
92
93#[cfg(test)]
94mod tests {
95 use super::util;
96 use crate::util::{Batch, Entry, Stream};
97 use chrono::prelude::*;
98 use std::collections::HashMap;
99
100 #[test]
101 fn encode_labels() {
102 let mut labels: HashMap<String, String> = HashMap::new();
103 labels.insert("__tenant_id__".into(), "tenant_id".into());
104 labels.insert("agent".into(), "vector".into());
105 labels.insert("host".into(), "localhost".into());
106 labels.insert("file".into(), "/path/to/log".into());
107 labels.insert("job".into(), "file_logs".into());
108 let s = util::encode_labels_map_to_string(&labels);
109 assert_eq!(
110 s,
111 r#"{agent="vector", file="/path/to/log", host="localhost", job="file_logs"}"#
112 );
113 }
114
115 #[test]
116 fn encode_batch() {
117 let ts1 = Utc
118 .timestamp_opt(1640244790, 0)
119 .single()
120 .expect("invalid timestamp");
121 let entry1 = Entry(
122 ts1.timestamp_nanos_opt().expect("Timestamp out of range"),
123 "hello".into(),
124 vec![],
125 );
126 let ts2 = Utc
127 .timestamp_opt(1640244791, 0)
128 .single()
129 .expect("invalid timestamp");
130 let entry2 = Entry(
131 ts2.timestamp_nanos_opt().expect("Timestamp out of range"),
132 "world".into(),
133 vec![],
134 );
135 let labels = vec![("source".into(), "protobuf-test".into())]
136 .into_iter()
137 .collect();
138 let batch = Batch(vec![Stream(labels, vec![entry1, entry2])]);
139 let expect = vec![
141 10, 60, 10, 24, 123, 115, 111, 117, 114, 99, 101, 61, 34, 112, 114, 111, 116, 111, 98,
142 117, 102, 45, 116, 101, 115, 116, 34, 125, 18, 15, 10, 6, 8, 182, 204, 144, 142, 6, 18,
143 5, 104, 101, 108, 108, 111, 18, 15, 10, 6, 8, 183, 204, 144, 142, 6, 18, 5, 119, 111,
144 114, 108, 100,
145 ];
146 let buf = batch.encode();
147 assert_eq!(expect, buf);
148 }
149}