vector/sinks/util/buffer/
json.rs1use serde_json::value::{to_raw_value, RawValue, Value};
2
3use super::super::batch::{err_event_too_large, Batch, BatchSize, PushResult};
4
/// Boxed `serde_json` [`RawValue`]; the element type stored by [`JsonArrayBuffer`].
pub type BoxedRawValue = Box<RawValue>;
6
7#[derive(Debug)]
12pub struct JsonArrayBuffer {
13 buffer: Vec<BoxedRawValue>,
14 total_bytes: usize,
15 settings: BatchSize<Self>,
16}
17
18impl JsonArrayBuffer {
19 pub const fn new(settings: BatchSize<Self>) -> Self {
20 Self {
21 buffer: Vec::new(),
22 total_bytes: 0,
23 settings,
24 }
25 }
26}
27
28impl Batch for JsonArrayBuffer {
29 type Input = Value;
30 type Output = Vec<BoxedRawValue>;
31
32 fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
33 let raw_item = to_raw_value(&item).expect("Value should be valid json");
34 let new_len = self.total_bytes + raw_item.get().len() + 1;
35 if self.is_empty() && new_len >= self.settings.bytes {
36 err_event_too_large(raw_item.get().len(), self.settings.bytes)
37 } else if self.buffer.len() >= self.settings.events || new_len > self.settings.bytes {
38 PushResult::Overflow(item)
39 } else {
40 self.total_bytes = new_len;
41 self.buffer.push(raw_item);
42 PushResult::Ok(
43 self.buffer.len() >= self.settings.events || new_len >= self.settings.bytes,
44 )
45 }
46 }
47
48 fn is_empty(&self) -> bool {
49 self.buffer.is_empty()
50 }
51
52 fn fresh(&self) -> Self {
53 Self::new(self.settings)
54 }
55
56 fn finish(self) -> Self::Output {
57 self.buffer
58 }
59
60 fn num_items(&self) -> usize {
61 self.buffer.len()
62 }
63}
64
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{super::PushResult, *};
    use crate::sinks::util::BatchSettings;

    /// With an event limit of two, the second push reports the batch as
    /// full, a third push overflows, and the finished items serialize as a
    /// regular JSON array when embedded in another value.
    #[test]
    fn multi_object_array() {
        let mut batch_settings = BatchSettings::default();
        batch_settings.size.bytes = 9999;
        batch_settings.size.events = 2;

        let mut batch = JsonArrayBuffer::new(batch_settings.size);

        // First item fits and leaves room for more.
        let first = batch.push(json!({ "key1": "value1" }));
        assert_eq!(first, PushResult::Ok(false));

        // Second item fits and hits the event limit.
        let second = batch.push(json!({ "key2": "value2" }));
        assert_eq!(second, PushResult::Ok(true));

        // Anything further is handed back as overflow.
        let third = batch.push(json!({}));
        assert!(matches!(third, PushResult::Overflow(_)));

        assert_eq!(batch.num_items(), 2);
        assert_eq!(batch.total_bytes, 36);

        let finished = batch.finish();

        let wrapped = serde_json::to_string(&json!({ "arr": finished })).unwrap();
        let expected = serde_json::to_string(&json!({
            "arr": [
                { "key1": "value1" },
                { "key2": "value2" },
            ]
        }))
        .unwrap();

        assert_eq!(wrapped, expected);
    }
}