// vector/sinks/util/buffer/json.rs

1use serde_json::value::{to_raw_value, RawValue, Value};
2
3use super::super::batch::{err_event_too_large, Batch, BatchSize, PushResult};
4
5pub type BoxedRawValue = Box<RawValue>;
6
7/// A `batch` implementation for storing an array of json
8/// values.
9///
10/// Note: This has been deprecated, please do not use when creating new Sinks.
11#[derive(Debug)]
12pub struct JsonArrayBuffer {
13    buffer: Vec<BoxedRawValue>,
14    total_bytes: usize,
15    settings: BatchSize<Self>,
16}
17
18impl JsonArrayBuffer {
19    pub const fn new(settings: BatchSize<Self>) -> Self {
20        Self {
21            buffer: Vec::new(),
22            total_bytes: 0,
23            settings,
24        }
25    }
26}
27
28impl Batch for JsonArrayBuffer {
29    type Input = Value;
30    type Output = Vec<BoxedRawValue>;
31
32    fn push(&mut self, item: Self::Input) -> PushResult<Self::Input> {
33        let raw_item = to_raw_value(&item).expect("Value should be valid json");
34        let new_len = self.total_bytes + raw_item.get().len() + 1;
35        if self.is_empty() && new_len >= self.settings.bytes {
36            err_event_too_large(raw_item.get().len(), self.settings.bytes)
37        } else if self.buffer.len() >= self.settings.events || new_len > self.settings.bytes {
38            PushResult::Overflow(item)
39        } else {
40            self.total_bytes = new_len;
41            self.buffer.push(raw_item);
42            PushResult::Ok(
43                self.buffer.len() >= self.settings.events || new_len >= self.settings.bytes,
44            )
45        }
46    }
47
48    fn is_empty(&self) -> bool {
49        self.buffer.is_empty()
50    }
51
52    fn fresh(&self) -> Self {
53        Self::new(self.settings)
54    }
55
56    fn finish(self) -> Self::Output {
57        self.buffer
58    }
59
60    fn num_items(&self) -> usize {
61        self.buffer.len()
62    }
63}
64
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::{super::PushResult, *};
    use crate::sinks::util::BatchSettings;

    /// Pushes objects into a buffer limited to two events and checks the push
    /// results, the byte accounting, and the serialized output.
    #[test]
    fn multi_object_array() {
        let mut settings = BatchSettings::default();
        settings.size.events = 2;
        settings.size.bytes = 9999;

        let mut batch = JsonArrayBuffer::new(settings.size);

        // First item fits and leaves room for more.
        let first = batch.push(json!({ "key1": "value1" }));
        assert_eq!(first, PushResult::Ok(false));

        // Second item reaches the event limit, so the batch reports full.
        let second = batch.push(json!({ "key2": "value2" }));
        assert_eq!(second, PushResult::Ok(true));

        // Anything further is handed back as overflow.
        let third = batch.push(json!({}));
        assert!(matches!(third, PushResult::Overflow(_)));

        assert_eq!(batch.num_items(), 2);
        // Each object serializes to 17 bytes and is charged one extra byte.
        assert_eq!(batch.total_bytes, 36);

        let items = batch.finish();

        let actual = serde_json::to_string(&json!({ "arr": items })).unwrap();
        let expected = serde_json::to_string(&json!({
            "arr": [
                { "key1": "value1" },
                { "key2": "value2" },
            ]
        }))
        .unwrap();

        assert_eq!(actual, expected);
    }
}
120}