vector/sinks/loki/
event.rs

1use std::{collections::HashMap, io};
2
3use bytes::Bytes;
4use serde::{Serialize, ser::SerializeSeq};
5use vector_lib::config::telemetry;
6
7use crate::sinks::{prelude::*, util::encoding::Encoder};
8
9pub type Labels = Vec<(String, String)>;
10pub type StructuredMetadata = Vec<(String, String)>;
11
/// Wire format used when a batch of Loki records is encoded into a request body.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum LokiBatchEncoding {
    /// Serialize the batch with `serde_json` as an object with a `streams` key.
    Json,
    /// Encode the batch through `loki_logproto::util::Batch`.
    Protobuf,
}
17
18#[derive(Clone)]
19pub struct LokiBatchEncoder(pub LokiBatchEncoding);
20
21impl Encoder<Vec<LokiRecord>> for LokiBatchEncoder {
22    fn encode_input(
23        &self,
24        input: Vec<LokiRecord>,
25        writer: &mut dyn io::Write,
26    ) -> io::Result<(usize, GroupedCountByteSize)> {
27        let count = input.len();
28        let mut byte_size = telemetry().create_request_count_byte_size();
29        for event in &input {
30            byte_size.add_event(event, event.estimated_json_encoded_size_of());
31        }
32
33        let batch = LokiBatch::from(input);
34        let body = match self.0 {
35            LokiBatchEncoding::Json => {
36                let streams: Vec<LokiStream> = batch.stream_by_labels.into_values().collect();
37                let body = serde_json::json!({ "streams": streams });
38                serde_json::to_vec(&body)?
39            }
40            LokiBatchEncoding::Protobuf => {
41                let streams = batch.stream_by_labels.into_values();
42                let batch = loki_logproto::util::Batch(
43                    streams
44                        .map(|stream| {
45                            let labels = stream.stream;
46                            let entries = stream
47                                .values
48                                .iter()
49                                .map(|event| {
50                                    loki_logproto::util::Entry(
51                                        event.timestamp,
52                                        String::from_utf8_lossy(&event.event).into_owned(),
53                                        event.structured_metadata.clone(),
54                                    )
55                                })
56                                .collect();
57                            loki_logproto::util::Stream(labels, entries)
58                        })
59                        .collect(),
60                );
61                batch.encode()
62            }
63        };
64        write_all(writer, count, &body).map(|()| (body.len(), byte_size))
65    }
66}
67
68#[derive(Debug, Default, Serialize)]
69pub struct LokiBatch {
70    stream_by_labels: HashMap<String, LokiStream>,
71    #[serde(skip)]
72    finalizers: EventFinalizers,
73}
74
75#[derive(Debug, Default, Serialize)]
76pub struct LokiStream {
77    stream: HashMap<String, String>,
78    values: Vec<LokiEvent>,
79}
80
81impl From<Vec<LokiRecord>> for LokiBatch {
82    fn from(events: Vec<LokiRecord>) -> Self {
83        let mut result = events
84            .into_iter()
85            .fold(Self::default(), |mut res, mut item| {
86                res.finalizers.merge(item.take_finalizers());
87                item.labels.sort();
88                // Convert a HashMap of keys and values into a string in the
89                // format "k1,v1,k2,v2,". If any of the keys or values contain
90                // a comma, it escapes the comma by adding a backslash before
91                // it (e.g. "val,ue" becomes "val\,ue").
92                let labels: String = item
93                    .labels
94                    .iter()
95                    .flat_map(|(a, b)| [a, b])
96                    .map(|s| {
97                        let mut escaped: String = s
98                            .chars()
99                            .map(|c| match c {
100                                '\\' => "\\\\".to_string(),
101                                ',' => "\\,".to_string(),
102                                c => c.to_string(),
103                            })
104                            .collect();
105                        escaped.push(',');
106                        escaped
107                    })
108                    .collect();
109                if !res.stream_by_labels.contains_key(&labels) {
110                    res.stream_by_labels.insert(
111                        labels.clone(),
112                        LokiStream {
113                            stream: item.labels.into_iter().collect(),
114                            values: Vec::new(),
115                        },
116                    );
117                }
118                let stream = res
119                    .stream_by_labels
120                    .get_mut(&labels)
121                    .expect("stream must exist");
122                stream.values.push(item.event);
123                res
124            });
125        for (_k, stream) in result.stream_by_labels.iter_mut() {
126            stream.values.sort_by_key(|e| e.timestamp);
127        }
128        result
129    }
130}
131
132#[derive(Clone, Debug)]
133pub struct LokiEvent {
134    pub timestamp: i64,
135    pub event: Bytes,
136    pub structured_metadata: StructuredMetadata,
137}
138
139impl ByteSizeOf for LokiEvent {
140    fn allocated_bytes(&self) -> usize {
141        self.timestamp.allocated_bytes()
142            + self.event.allocated_bytes()
143            + self.structured_metadata.iter().fold(0, |res, item| {
144                res + item.0.allocated_bytes() + item.1.allocated_bytes()
145            })
146    }
147}
148
149impl Serialize for LokiEvent {
150    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
151    where
152        S: serde::Serializer,
153    {
154        let mut seq = serializer.serialize_seq(Some(3))?;
155        seq.serialize_element(&self.timestamp.to_string())?;
156        let event = String::from_utf8_lossy(&self.event);
157        seq.serialize_element(&event)?;
158        // Convert structured_metadata into a map structure
159        seq.serialize_element(
160            &self
161                .structured_metadata
162                .iter()
163                .cloned()
164                .collect::<HashMap<String, String>>(),
165        )?;
166        seq.end()
167    }
168}
169
170#[derive(Clone, Debug)]
171pub struct LokiRecord {
172    pub partition: PartitionKey,
173    pub labels: Labels,
174    pub event: LokiEvent,
175    pub json_byte_size: JsonSize,
176    pub finalizers: EventFinalizers,
177    pub event_count_tags: TaggedEventsSent,
178}
179
180impl ByteSizeOf for LokiRecord {
181    fn allocated_bytes(&self) -> usize {
182        self.partition.allocated_bytes()
183            + self.labels.iter().fold(0, |res, item| {
184                res + item.0.allocated_bytes() + item.1.allocated_bytes()
185            })
186            + self.event.allocated_bytes()
187    }
188}
189
190impl EstimatedJsonEncodedSizeOf for LokiRecord {
191    fn estimated_json_encoded_size_of(&self) -> JsonSize {
192        self.json_byte_size
193    }
194}
195
196impl EventCount for LokiRecord {
197    fn event_count(&self) -> usize {
198        // A Loki record is mapped one-to-one with an event.
199        1
200    }
201}
202
203impl Finalizable for LokiRecord {
204    fn take_finalizers(&mut self) -> EventFinalizers {
205        std::mem::take(&mut self.finalizers)
206    }
207}
208
209impl GetEventCountTags for LokiRecord {
210    fn get_tags(&self) -> TaggedEventsSent {
211        self.event_count_tags.clone()
212    }
213}
214
/// Key used to partition records; records with equal keys compare and hash
/// the same.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct PartitionKey {
    /// Tenant the record is sent for, if one is set.
    pub tenant_id: Option<String>,
}
219
220impl ByteSizeOf for PartitionKey {
221    fn allocated_bytes(&self) -> usize {
222        self.tenant_id
223            .as_ref()
224            .map(|value| value.allocated_bytes())
225            .unwrap_or(0)
226    }
227}