1#![allow(clippy::derive_partial_eq_without_eq)]
2
/// Items pulled in at compile time from the `stats.rs` file located in the
/// directory named by the `OUT_DIR` environment variable.
///
/// NOTE(review): presumably produced by the crate's build script from a
/// protobuf definition — confirm against `build.rs`.
pub mod stats {
    include!(concat!(env!("OUT_DIR"), "/stats.rs"));
}
6
/// Items pulled in at compile time from the `logproto.rs` file located in the
/// directory named by the `OUT_DIR` environment variable.
///
/// The `util` module below builds its `LabelPairAdapter`, `EntryAdapter`,
/// `StreamAdapter` and `PushRequest` values from the types defined here.
///
/// NOTE(review): presumably produced by the crate's build script from a
/// protobuf definition — confirm against `build.rs`.
pub mod logproto {
    include!(concat!(env!("OUT_DIR"), "/logproto.rs"));
}
10
11pub mod util {
12 use std::collections::HashMap;
13
14 use prost::Message;
15
16 use super::logproto;
17
/// Number of nanoseconds in one second; used to split an `Entry` timestamp
/// into whole seconds and the sub-second nanosecond remainder.
const NANOS_RANGE: i64 = 1_000_000_000;
19
20 impl From<(String, String)> for logproto::LabelPairAdapter {
22 fn from(pair: (String, String)) -> Self {
23 logproto::LabelPairAdapter {
24 name: pair.0,
25 value: pair.1,
26 }
27 }
28 }
29
30 pub struct Entry(pub i64, pub String, pub Vec<(String, String)>);
32
33 impl From<Entry> for logproto::EntryAdapter {
34 fn from(entry: Entry) -> Self {
35 let line = entry.1;
36 let structured_metadata: Vec<logproto::LabelPairAdapter> =
37 entry.2.into_iter().map(|entry| entry.into()).collect();
38
39 logproto::EntryAdapter {
40 timestamp: Some(prost_types::Timestamp {
41 seconds: entry.0 / NANOS_RANGE,
42 nanos: (entry.0 % NANOS_RANGE) as i32,
43 }),
44 line,
45 structured_metadata,
46 parsed: vec![], }
49 }
50 }
51
52 pub struct Stream(pub HashMap<String, String>, pub Vec<Entry>);
54
55 impl From<Stream> for logproto::StreamAdapter {
56 fn from(batch: Stream) -> Self {
57 let labels = encode_labels_map_to_string(&batch.0);
58 let entries: Vec<logproto::EntryAdapter> =
59 batch.1.into_iter().map(|entry| entry.into()).collect();
60
61 logproto::StreamAdapter {
62 labels,
63 entries,
64 hash: 0,
65 }
66 }
67 }
68
69 pub struct Batch(pub Vec<Stream>);
70
71 impl Batch {
72 pub fn encode(self) -> Vec<u8> {
73 let streams: Vec<logproto::StreamAdapter> =
74 self.0.into_iter().map(|stream| stream.into()).collect();
75 let push_request = logproto::PushRequest { streams };
76 push_request.encode_to_vec()
77 }
78 }
79
/// Label name that is never written into the encoded label string.
const RESERVED_LABEL_TENANT_ID: &str = "__tenant_id__";
/// All label names that are filtered out when encoding a label map.
const RESERVED_LABELS: [&str; 1] = [RESERVED_LABEL_TENANT_ID];

/// Escapes a label value so it can sit between double quotes.
///
/// Backslash, double quote and line feed are replaced by `\\`, `\"` and `\n`
/// respectively; every other character is copied through unchanged.
fn escape_label_value(value: &str) -> String {
    let mut escaped = String::with_capacity(value.len());
    for ch in value.chars() {
        match ch {
            '\\' => escaped.push_str("\\\\"),
            '"' => escaped.push_str("\\\""),
            '\n' => escaped.push_str("\\n"),
            other => escaped.push(other),
        }
    }
    escaped
}

/// Renders a label map as a single string of the form
/// `{name1="value1", name2="value2"}`.
///
/// * Labels whose name is listed in `RESERVED_LABELS` are left out.
/// * The rendered `name="value"` pairs are sorted lexicographically, so the
///   output does not depend on the map's iteration order.
/// * Values are escaped (`\`, `"` and line feed) so that a value containing
///   those characters still yields a well-formed quoted string. Values
///   without them are encoded exactly as before.
///
/// An empty map (or one holding only reserved labels) yields `{}`.
pub fn encode_labels_map_to_string(labels: &HashMap<String, String>) -> String {
    let mut pairs: Vec<String> = labels
        .iter()
        .filter(|(name, _)| !RESERVED_LABELS.contains(&name.as_str()))
        .map(|(name, value)| format!("{name}=\"{}\"", escape_label_value(value)))
        .collect();
    pairs.sort();
    format!("{{{}}}", pairs.join(", "))
}
93}
94
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use chrono::prelude::*;

    use super::util;
    use crate::util::{Batch, Entry, Stream};

    /// Builds an entry without structured metadata whose timestamp is the
    /// given number of whole seconds.
    fn entry_at(unix_seconds: i64, line: &str) -> Entry {
        let moment = Utc
            .timestamp_opt(unix_seconds, 0)
            .single()
            .expect("invalid timestamp");
        let nanos = moment
            .timestamp_nanos_opt()
            .expect("Timestamp out of range");
        Entry(nanos, line.into(), vec![])
    }

    /// The reserved tenant label is dropped and the rest are sorted.
    #[test]
    fn encode_labels() {
        let pairs = [
            ("__tenant_id__", "tenant_id"),
            ("agent", "vector"),
            ("host", "localhost"),
            ("file", "/path/to/log"),
            ("job", "file_logs"),
        ];
        let labels: HashMap<String, String> = pairs
            .iter()
            .map(|&(name, value)| (name.to_owned(), value.to_owned()))
            .collect();

        let encoded = util::encode_labels_map_to_string(&labels);

        assert_eq!(
            encoded,
            r#"{agent="vector", file="/path/to/log", host="localhost", job="file_logs"}"#
        );
    }

    /// A one-stream, two-entry batch encodes to a fixed byte sequence.
    #[test]
    fn encode_batch() {
        let first = entry_at(1640244790, "hello");
        let second = entry_at(1640244791, "world");

        let mut labels: HashMap<String, String> = HashMap::new();
        labels.insert("source".to_owned(), "protobuf-test".to_owned());

        let encoded = Batch(vec![Stream(labels, vec![first, second])]).encode();

        let expect = vec![
            10, 60, 10, 24, 123, 115, 111, 117, 114, 99, 101, 61, 34, 112, 114, 111, 116, 111, 98,
            117, 102, 45, 116, 101, 115, 116, 34, 125, 18, 15, 10, 6, 8, 182, 204, 144, 142, 6, 18,
            5, 104, 101, 108, 108, 111, 18, 15, 10, 6, 8, 183, 204, 144, 142, 6, 18, 5, 119, 111,
            114, 108, 100,
        ];
        assert_eq!(expect, encoded);
    }
}