vector/sinks/loki/
event.rs1use std::{collections::HashMap, io};
2
3use crate::sinks::{prelude::*, util::encoding::Encoder};
4use bytes::Bytes;
5use serde::{ser::SerializeSeq, Serialize};
6use vector_lib::config::telemetry;
7
/// Label name/value pairs attached to a record; records with the same label
/// set are grouped into the same stream when a batch is built.
pub type Labels = Vec<(String, String)>;
/// Name/value pairs of structured metadata carried by a single log entry.
pub type StructuredMetadata = Vec<(String, String)>;
10
/// Wire format used when encoding a batch of records into a request body.
#[derive(Clone)]
pub enum LokiBatchEncoding {
    /// JSON document with a top-level `streams` array.
    Json,
    /// Body produced by `loki_logproto::util::Batch::encode`.
    Protobuf,
}
16
/// Encoder for `Vec<LokiRecord>` batches; the wrapped value selects the wire format.
#[derive(Clone)]
pub struct LokiBatchEncoder(pub LokiBatchEncoding);
19
20impl Encoder<Vec<LokiRecord>> for LokiBatchEncoder {
21 fn encode_input(
22 &self,
23 input: Vec<LokiRecord>,
24 writer: &mut dyn io::Write,
25 ) -> io::Result<(usize, GroupedCountByteSize)> {
26 let count = input.len();
27 let mut byte_size = telemetry().create_request_count_byte_size();
28 for event in &input {
29 byte_size.add_event(event, event.estimated_json_encoded_size_of());
30 }
31
32 let batch = LokiBatch::from(input);
33 let body = match self.0 {
34 LokiBatchEncoding::Json => {
35 let streams: Vec<LokiStream> = batch.stream_by_labels.into_values().collect();
36 let body = serde_json::json!({ "streams": streams });
37 serde_json::to_vec(&body)?
38 }
39 LokiBatchEncoding::Protobuf => {
40 let streams = batch.stream_by_labels.into_values();
41 let batch = loki_logproto::util::Batch(
42 streams
43 .map(|stream| {
44 let labels = stream.stream;
45 let entries = stream
46 .values
47 .iter()
48 .map(|event| {
49 loki_logproto::util::Entry(
50 event.timestamp,
51 String::from_utf8_lossy(&event.event).into_owned(),
52 event.structured_metadata.clone(),
53 )
54 })
55 .collect();
56 loki_logproto::util::Stream(labels, entries)
57 })
58 .collect(),
59 );
60 batch.encode()
61 }
62 };
63 write_all(writer, count, &body).map(|()| (body.len(), byte_size))
64 }
65}
66
/// Records grouped into streams, ready to be encoded as one request body.
#[derive(Debug, Default, Serialize)]
pub struct LokiBatch {
    // Streams keyed by a string built from the record's sorted, escaped labels.
    stream_by_labels: HashMap<String, LokiStream>,
    // Finalizers merged from every record in the batch; excluded from serialization.
    #[serde(skip)]
    finalizers: EventFinalizers,
}
73
/// One stream of a batch: a label set plus the log entries that share it.
#[derive(Debug, Default, Serialize)]
pub struct LokiStream {
    // Labels of the stream, serialized under the `stream` key.
    stream: HashMap<String, String>,
    // Entries of the stream, serialized under the `values` key.
    values: Vec<LokiEvent>,
}
79
80impl From<Vec<LokiRecord>> for LokiBatch {
81 fn from(events: Vec<LokiRecord>) -> Self {
82 let mut result = events
83 .into_iter()
84 .fold(Self::default(), |mut res, mut item| {
85 res.finalizers.merge(item.take_finalizers());
86 item.labels.sort();
87 let labels: String = item
92 .labels
93 .iter()
94 .flat_map(|(a, b)| [a, b])
95 .map(|s| {
96 let mut escaped: String = s
97 .chars()
98 .map(|c| match c {
99 '\\' => "\\\\".to_string(),
100 ',' => "\\,".to_string(),
101 c => c.to_string(),
102 })
103 .collect();
104 escaped.push(',');
105 escaped
106 })
107 .collect();
108 if !res.stream_by_labels.contains_key(&labels) {
109 res.stream_by_labels.insert(
110 labels.clone(),
111 LokiStream {
112 stream: item.labels.into_iter().collect(),
113 values: Vec::new(),
114 },
115 );
116 }
117 let stream = res
118 .stream_by_labels
119 .get_mut(&labels)
120 .expect("stream must exist");
121 stream.values.push(item.event);
122 res
123 });
124 for (_k, stream) in result.stream_by_labels.iter_mut() {
125 stream.values.sort_by_key(|e| e.timestamp);
126 }
127 result
128 }
129}
130
/// A single log entry of a stream.
#[derive(Clone, Debug)]
pub struct LokiEvent {
    /// Entry timestamp; emitted as a decimal string in the JSON encoding.
    /// NOTE(review): unit is not visible here (presumably Unix nanoseconds) — confirm.
    pub timestamp: i64,
    /// Encoded log line; decoded as UTF-8 (lossily) when the entry is encoded.
    pub event: Bytes,
    /// Structured metadata pairs attached to this entry.
    pub structured_metadata: StructuredMetadata,
}
137
138impl ByteSizeOf for LokiEvent {
139 fn allocated_bytes(&self) -> usize {
140 self.timestamp.allocated_bytes()
141 + self.event.allocated_bytes()
142 + self.structured_metadata.iter().fold(0, |res, item| {
143 res + item.0.allocated_bytes() + item.1.allocated_bytes()
144 })
145 }
146}
147
148impl Serialize for LokiEvent {
149 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
150 where
151 S: serde::Serializer,
152 {
153 let mut seq = serializer.serialize_seq(Some(3))?;
154 seq.serialize_element(&self.timestamp.to_string())?;
155 let event = String::from_utf8_lossy(&self.event);
156 seq.serialize_element(&event)?;
157 seq.serialize_element(
159 &self
160 .structured_metadata
161 .iter()
162 .cloned()
163 .collect::<HashMap<String, String>>(),
164 )?;
165 seq.end()
166 }
167}
168
/// A single log entry together with everything needed to batch and account for it.
#[derive(Clone, Debug)]
pub struct LokiRecord {
    /// Partition key of the record (carries the optional tenant id).
    pub partition: PartitionKey,
    /// Labels that decide which stream the record is grouped into.
    pub labels: Labels,
    /// The log entry itself.
    pub event: LokiEvent,
    /// Pre-computed size reported as the record's estimated JSON-encoded size.
    pub json_byte_size: JsonSize,
    /// Finalizers of the record; handed out (and reset) by `take_finalizers`.
    pub finalizers: EventFinalizers,
    /// Tags returned by `get_tags`.
    pub event_count_tags: TaggedEventsSent,
}
178
179impl ByteSizeOf for LokiRecord {
180 fn allocated_bytes(&self) -> usize {
181 self.partition.allocated_bytes()
182 + self.labels.iter().fold(0, |res, item| {
183 res + item.0.allocated_bytes() + item.1.allocated_bytes()
184 })
185 + self.event.allocated_bytes()
186 }
187}
188
impl EstimatedJsonEncodedSizeOf for LokiRecord {
    /// Returns the size stored in `json_byte_size`; nothing is recomputed here.
    fn estimated_json_encoded_size_of(&self) -> JsonSize {
        self.json_byte_size
    }
}
194
impl EventCount for LokiRecord {
    /// Always 1: each record counts as exactly one event.
    fn event_count(&self) -> usize {
        1
    }
}
201
impl Finalizable for LokiRecord {
    /// Moves the finalizers out of the record, leaving the default value behind.
    fn take_finalizers(&mut self) -> EventFinalizers {
        std::mem::take(&mut self.finalizers)
    }
}
207
impl GetEventCountTags for LokiRecord {
    /// Returns a clone of the tags stored in `event_count_tags`.
    fn get_tags(&self) -> TaggedEventsSent {
        self.event_count_tags.clone()
    }
}
213
/// Partition key of a record, made up of an optional tenant id.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub struct PartitionKey {
    /// Tenant the record belongs to, if any.
    pub tenant_id: Option<String>,
}
218
219impl ByteSizeOf for PartitionKey {
220 fn allocated_bytes(&self) -> usize {
221 self.tenant_id
222 .as_ref()
223 .map(|value| value.allocated_bytes())
224 .unwrap_or(0)
225 }
226}