vector/sinks/loki/event.rs

1use std::{collections::HashMap, io};
2
3use crate::sinks::{prelude::*, util::encoding::Encoder};
4use bytes::Bytes;
5use serde::{ser::SerializeSeq, Serialize};
6use vector_lib::config::telemetry;
7
/// Loki stream labels, stored as `(name, value)` pairs.
pub type Labels = Vec<(String, String)>;

/// Structured metadata attached to a single Loki entry, stored as
/// `(name, value)` pairs.
pub type StructuredMetadata = Vec<(String, String)>;
10
/// Wire format used when encoding a batch of Loki records into a request body.
#[derive(Clone)]
pub enum LokiBatchEncoding {
    /// JSON body of the form `{"streams": [...]}`.
    Json,
    /// Protobuf body built with `loki_logproto`.
    Protobuf,
}
16
17#[derive(Clone)]
18pub struct LokiBatchEncoder(pub LokiBatchEncoding);
19
20impl Encoder<Vec<LokiRecord>> for LokiBatchEncoder {
21    fn encode_input(
22        &self,
23        input: Vec<LokiRecord>,
24        writer: &mut dyn io::Write,
25    ) -> io::Result<(usize, GroupedCountByteSize)> {
26        let count = input.len();
27        let mut byte_size = telemetry().create_request_count_byte_size();
28        for event in &input {
29            byte_size.add_event(event, event.estimated_json_encoded_size_of());
30        }
31
32        let batch = LokiBatch::from(input);
33        let body = match self.0 {
34            LokiBatchEncoding::Json => {
35                let streams: Vec<LokiStream> = batch.stream_by_labels.into_values().collect();
36                let body = serde_json::json!({ "streams": streams });
37                serde_json::to_vec(&body)?
38            }
39            LokiBatchEncoding::Protobuf => {
40                let streams = batch.stream_by_labels.into_values();
41                let batch = loki_logproto::util::Batch(
42                    streams
43                        .map(|stream| {
44                            let labels = stream.stream;
45                            let entries = stream
46                                .values
47                                .iter()
48                                .map(|event| {
49                                    loki_logproto::util::Entry(
50                                        event.timestamp,
51                                        String::from_utf8_lossy(&event.event).into_owned(),
52                                        event.structured_metadata.clone(),
53                                    )
54                                })
55                                .collect();
56                            loki_logproto::util::Stream(labels, entries)
57                        })
58                        .collect(),
59                );
60                batch.encode()
61            }
62        };
63        write_all(writer, count, &body).map(|()| (body.len(), byte_size))
64    }
65}
66
67#[derive(Debug, Default, Serialize)]
68pub struct LokiBatch {
69    stream_by_labels: HashMap<String, LokiStream>,
70    #[serde(skip)]
71    finalizers: EventFinalizers,
72}
73
74#[derive(Debug, Default, Serialize)]
75pub struct LokiStream {
76    stream: HashMap<String, String>,
77    values: Vec<LokiEvent>,
78}
79
80impl From<Vec<LokiRecord>> for LokiBatch {
81    fn from(events: Vec<LokiRecord>) -> Self {
82        let mut result = events
83            .into_iter()
84            .fold(Self::default(), |mut res, mut item| {
85                res.finalizers.merge(item.take_finalizers());
86                item.labels.sort();
87                // Convert a HashMap of keys and values into a string in the
88                // format "k1,v1,k2,v2,". If any of the keys or values contain
89                // a comma, it escapes the comma by adding a backslash before
90                // it (e.g. "val,ue" becomes "val\,ue").
91                let labels: String = item
92                    .labels
93                    .iter()
94                    .flat_map(|(a, b)| [a, b])
95                    .map(|s| {
96                        let mut escaped: String = s
97                            .chars()
98                            .map(|c| match c {
99                                '\\' => "\\\\".to_string(),
100                                ',' => "\\,".to_string(),
101                                c => c.to_string(),
102                            })
103                            .collect();
104                        escaped.push(',');
105                        escaped
106                    })
107                    .collect();
108                if !res.stream_by_labels.contains_key(&labels) {
109                    res.stream_by_labels.insert(
110                        labels.clone(),
111                        LokiStream {
112                            stream: item.labels.into_iter().collect(),
113                            values: Vec::new(),
114                        },
115                    );
116                }
117                let stream = res
118                    .stream_by_labels
119                    .get_mut(&labels)
120                    .expect("stream must exist");
121                stream.values.push(item.event);
122                res
123            });
124        for (_k, stream) in result.stream_by_labels.iter_mut() {
125            stream.values.sort_by_key(|e| e.timestamp);
126        }
127        result
128    }
129}
130
131#[derive(Clone, Debug)]
132pub struct LokiEvent {
133    pub timestamp: i64,
134    pub event: Bytes,
135    pub structured_metadata: StructuredMetadata,
136}
137
138impl ByteSizeOf for LokiEvent {
139    fn allocated_bytes(&self) -> usize {
140        self.timestamp.allocated_bytes()
141            + self.event.allocated_bytes()
142            + self.structured_metadata.iter().fold(0, |res, item| {
143                res + item.0.allocated_bytes() + item.1.allocated_bytes()
144            })
145    }
146}
147
148impl Serialize for LokiEvent {
149    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
150    where
151        S: serde::Serializer,
152    {
153        let mut seq = serializer.serialize_seq(Some(3))?;
154        seq.serialize_element(&self.timestamp.to_string())?;
155        let event = String::from_utf8_lossy(&self.event);
156        seq.serialize_element(&event)?;
157        // Convert structured_metadata into a map structure
158        seq.serialize_element(
159            &self
160                .structured_metadata
161                .iter()
162                .cloned()
163                .collect::<HashMap<String, String>>(),
164        )?;
165        seq.end()
166    }
167}
168
169#[derive(Clone, Debug)]
170pub struct LokiRecord {
171    pub partition: PartitionKey,
172    pub labels: Labels,
173    pub event: LokiEvent,
174    pub json_byte_size: JsonSize,
175    pub finalizers: EventFinalizers,
176    pub event_count_tags: TaggedEventsSent,
177}
178
179impl ByteSizeOf for LokiRecord {
180    fn allocated_bytes(&self) -> usize {
181        self.partition.allocated_bytes()
182            + self.labels.iter().fold(0, |res, item| {
183                res + item.0.allocated_bytes() + item.1.allocated_bytes()
184            })
185            + self.event.allocated_bytes()
186    }
187}
188
189impl EstimatedJsonEncodedSizeOf for LokiRecord {
190    fn estimated_json_encoded_size_of(&self) -> JsonSize {
191        self.json_byte_size
192    }
193}
194
195impl EventCount for LokiRecord {
196    fn event_count(&self) -> usize {
197        // A Loki record is mapped one-to-one with an event.
198        1
199    }
200}
201
202impl Finalizable for LokiRecord {
203    fn take_finalizers(&mut self) -> EventFinalizers {
204        std::mem::take(&mut self.finalizers)
205    }
206}
207
208impl GetEventCountTags for LokiRecord {
209    fn get_tags(&self) -> TaggedEventsSent {
210        self.event_count_tags.clone()
211    }
212}
213
/// Key by which `LokiRecord`s are partitioned; currently only the tenant.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PartitionKey {
    /// Tenant ID, or `None` when no tenant is set.
    pub tenant_id: Option<String>,
}
218
219impl ByteSizeOf for PartitionKey {
220    fn allocated_bytes(&self) -> usize {
221        self.tenant_id
222            .as_ref()
223            .map(|value| value.allocated_bytes())
224            .unwrap_or(0)
225    }
226}