vector/sinks/loki/
event.rs1use std::{collections::HashMap, io};
2
3use bytes::Bytes;
4use serde::{Serialize, ser::SerializeSeq};
5use vector_lib::config::telemetry;
6
7use crate::sinks::{prelude::*, util::encoding::Encoder};
8
9pub type Labels = Vec<(String, String)>;
10pub type StructuredMetadata = Vec<(String, String)>;
11
12#[derive(Clone)]
13pub enum LokiBatchEncoding {
14 Json,
15 Protobuf,
16}
17
18#[derive(Clone)]
19pub struct LokiBatchEncoder(pub LokiBatchEncoding);
20
21impl Encoder<Vec<LokiRecord>> for LokiBatchEncoder {
22 fn encode_input(
23 &self,
24 input: Vec<LokiRecord>,
25 writer: &mut dyn io::Write,
26 ) -> io::Result<(usize, GroupedCountByteSize)> {
27 let count = input.len();
28 let mut byte_size = telemetry().create_request_count_byte_size();
29 for event in &input {
30 byte_size.add_event(event, event.estimated_json_encoded_size_of());
31 }
32
33 let batch = LokiBatch::from(input);
34 let body = match self.0 {
35 LokiBatchEncoding::Json => {
36 let streams: Vec<LokiStream> = batch.stream_by_labels.into_values().collect();
37 let body = serde_json::json!({ "streams": streams });
38 serde_json::to_vec(&body)?
39 }
40 LokiBatchEncoding::Protobuf => {
41 let streams = batch.stream_by_labels.into_values();
42 let batch = loki_logproto::util::Batch(
43 streams
44 .map(|stream| {
45 let labels = stream.stream;
46 let entries = stream
47 .values
48 .iter()
49 .map(|event| {
50 loki_logproto::util::Entry(
51 event.timestamp,
52 String::from_utf8_lossy(&event.event).into_owned(),
53 event.structured_metadata.clone(),
54 )
55 })
56 .collect();
57 loki_logproto::util::Stream(labels, entries)
58 })
59 .collect(),
60 );
61 batch.encode()
62 }
63 };
64 write_all(writer, count, &body).map(|()| (body.len(), byte_size))
65 }
66}
67
68#[derive(Debug, Default, Serialize)]
69pub struct LokiBatch {
70 stream_by_labels: HashMap<String, LokiStream>,
71 #[serde(skip)]
72 finalizers: EventFinalizers,
73}
74
75#[derive(Debug, Default, Serialize)]
76pub struct LokiStream {
77 stream: HashMap<String, String>,
78 values: Vec<LokiEvent>,
79}
80
81impl From<Vec<LokiRecord>> for LokiBatch {
82 fn from(events: Vec<LokiRecord>) -> Self {
83 let mut result = events
84 .into_iter()
85 .fold(Self::default(), |mut res, mut item| {
86 res.finalizers.merge(item.take_finalizers());
87 item.labels.sort();
88 let labels: String = item
93 .labels
94 .iter()
95 .flat_map(|(a, b)| [a, b])
96 .map(|s| {
97 let mut escaped: String = s
98 .chars()
99 .map(|c| match c {
100 '\\' => "\\\\".to_string(),
101 ',' => "\\,".to_string(),
102 c => c.to_string(),
103 })
104 .collect();
105 escaped.push(',');
106 escaped
107 })
108 .collect();
109 if !res.stream_by_labels.contains_key(&labels) {
110 res.stream_by_labels.insert(
111 labels.clone(),
112 LokiStream {
113 stream: item.labels.into_iter().collect(),
114 values: Vec::new(),
115 },
116 );
117 }
118 let stream = res
119 .stream_by_labels
120 .get_mut(&labels)
121 .expect("stream must exist");
122 stream.values.push(item.event);
123 res
124 });
125 for (_k, stream) in result.stream_by_labels.iter_mut() {
126 stream.values.sort_by_key(|e| e.timestamp);
127 }
128 result
129 }
130}
131
132#[derive(Clone, Debug)]
133pub struct LokiEvent {
134 pub timestamp: i64,
135 pub event: Bytes,
136 pub structured_metadata: StructuredMetadata,
137}
138
139impl ByteSizeOf for LokiEvent {
140 fn allocated_bytes(&self) -> usize {
141 self.timestamp.allocated_bytes()
142 + self.event.allocated_bytes()
143 + self.structured_metadata.iter().fold(0, |res, item| {
144 res + item.0.allocated_bytes() + item.1.allocated_bytes()
145 })
146 }
147}
148
149impl Serialize for LokiEvent {
150 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
151 where
152 S: serde::Serializer,
153 {
154 let mut seq = serializer.serialize_seq(Some(3))?;
155 seq.serialize_element(&self.timestamp.to_string())?;
156 let event = String::from_utf8_lossy(&self.event);
157 seq.serialize_element(&event)?;
158 seq.serialize_element(
160 &self
161 .structured_metadata
162 .iter()
163 .cloned()
164 .collect::<HashMap<String, String>>(),
165 )?;
166 seq.end()
167 }
168}
169
170#[derive(Clone, Debug)]
171pub struct LokiRecord {
172 pub partition: PartitionKey,
173 pub labels: Labels,
174 pub event: LokiEvent,
175 pub json_byte_size: JsonSize,
176 pub finalizers: EventFinalizers,
177 pub event_count_tags: TaggedEventsSent,
178}
179
180impl ByteSizeOf for LokiRecord {
181 fn allocated_bytes(&self) -> usize {
182 self.partition.allocated_bytes()
183 + self.labels.iter().fold(0, |res, item| {
184 res + item.0.allocated_bytes() + item.1.allocated_bytes()
185 })
186 + self.event.allocated_bytes()
187 }
188}
189
190impl EstimatedJsonEncodedSizeOf for LokiRecord {
191 fn estimated_json_encoded_size_of(&self) -> JsonSize {
192 self.json_byte_size
193 }
194}
195
196impl EventCount for LokiRecord {
197 fn event_count(&self) -> usize {
198 1
200 }
201}
202
203impl Finalizable for LokiRecord {
204 fn take_finalizers(&mut self) -> EventFinalizers {
205 std::mem::take(&mut self.finalizers)
206 }
207}
208
209impl GetEventCountTags for LokiRecord {
210 fn get_tags(&self) -> TaggedEventsSent {
211 self.event_count_tags.clone()
212 }
213}
214
215#[derive(Hash, Eq, PartialEq, Clone, Debug)]
216pub struct PartitionKey {
217 pub tenant_id: Option<String>,
218}
219
220impl ByteSizeOf for PartitionKey {
221 fn allocated_bytes(&self) -> usize {
222 self.tenant_id
223 .as_ref()
224 .map(|value| value.allocated_bytes())
225 .unwrap_or(0)
226 }
227}