vrl/stdlib/
decode_lz4.rs

1use crate::compiler::prelude::*;
2use lz4_flex::block::{decompress, decompress_size_prepended};
3use lz4_flex::frame::FrameDecoder;
4use std::io;
5use std::sync::LazyLock;
6
7static DEFAULT_BUF_SIZE: LazyLock<Value> = LazyLock::new(|| Value::Integer(1_000_000));
8static DEFAULT_PREPENDED_SIZE: LazyLock<Value> = LazyLock::new(|| Value::Boolean(false));
9
10const LZ4_FRAME_MAGIC: [u8; 4] = [0x04, 0x22, 0x4D, 0x18];
11
12static PARAMETERS: LazyLock<Vec<Parameter>> = LazyLock::new(|| {
13    vec![
14        Parameter::required("value", kind::BYTES, "The lz4 block data to decode."),
15        Parameter::optional("buf_size", kind::INTEGER, "The size of the buffer to decode into, this must be equal to or larger than the uncompressed size.")
16            .default(&DEFAULT_BUF_SIZE),
17        Parameter::optional("prepended_size", kind::BOOLEAN, "Some implementations of lz4 require the original uncompressed size to be prepended to the compressed data.")
18            .default(&DEFAULT_PREPENDED_SIZE),
19    ]
20});
21
/// Marker type for the `decode_lz4` VRL function; it carries no state of its own.
#[derive(Debug, Clone, Copy)]
pub struct DecodeLz4;
24
25impl Function for DecodeLz4 {
26    fn identifier(&self) -> &'static str {
27        "decode_lz4"
28    }
29
30    fn usage(&self) -> &'static str {
31        "Decodes the `value` (an lz4 string) into its original string. `buf_size` is the size of the buffer to decode into, this must be equal to or larger than the uncompressed size.
32        If `prepended_size` is set to `true`, it expects the original uncompressed size to be prepended to the compressed data.
33        `prepended_size` is useful for some implementations of lz4 that require the original size to be known before decoding."
34    }
35
36    fn category(&self) -> &'static str {
37        Category::Codec.as_ref()
38    }
39
40    fn internal_failure_reasons(&self) -> &'static [&'static str] {
41        &[
42            "`value` unable to decode value with lz4 frame decoder.",
43            "`value` unable to decode value with lz4 block decoder.",
44            "`value` unable to decode because the output is too large for the buffer.",
45            "`value` unable to decode because the prepended size is not a valid integer.",
46        ]
47    }
48
49    fn return_kind(&self) -> u16 {
50        kind::BYTES
51    }
52
53    fn examples(&self) -> &'static [Example] {
54        &[
55            example! {
56                title: "LZ4 block with prepended size",
57                source: r#"decode_lz4!(decode_base64!("LAAAAPAdVGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIDEzIGxhenkgZG9ncy4="), prepended_size: true)"#,
58                result: Ok("The quick brown fox jumps over 13 lazy dogs."),
59            },
60            example! {
61                title: "Decode Lz4 data without prepended size.",
62                source: r#"decode_lz4!(decode_base64!("BCJNGGBAgiwAAIBUaGUgcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgMTMgbGF6eSBkb2dzLgAAAAA="))"#,
63                result: Ok("The quick brown fox jumps over 13 lazy dogs."),
64            },
65        ]
66    }
67
68    fn compile(
69        &self,
70        _state: &state::TypeState,
71        _ctx: &mut FunctionCompileContext,
72        arguments: ArgumentList,
73    ) -> Compiled {
74        let value = arguments.required("value");
75        let buf_size = arguments.optional("buf_size");
76        let prepended_size = arguments.optional("prepended_size");
77
78        Ok(DecodeLz4Fn {
79            value,
80            buf_size,
81            prepended_size,
82        }
83        .as_expr())
84    }
85
86    fn parameters(&self) -> &'static [Parameter] {
87        PARAMETERS.as_slice()
88    }
89}
90
91#[derive(Clone, Debug)]
92struct DecodeLz4Fn {
93    value: Box<dyn Expression>,
94    buf_size: Option<Box<dyn Expression>>,
95    prepended_size: Option<Box<dyn Expression>>,
96}
97
98impl FunctionExpression for DecodeLz4Fn {
99    fn resolve(&self, ctx: &mut Context) -> Resolved {
100        let value = self.value.resolve(ctx)?;
101        let buf_size = self
102            .buf_size
103            .map_resolve_with_default(ctx, || DEFAULT_BUF_SIZE.clone())?
104            .try_integer()?;
105        let prepended_size = self
106            .prepended_size
107            .map_resolve_with_default(ctx, || DEFAULT_PREPENDED_SIZE.clone())?
108            .try_boolean()?;
109
110        let buffer_size: usize;
111        if let Ok(sz) = u32::try_from(buf_size) {
112            buffer_size = sz as usize;
113        } else {
114            // If the buffer size is too large, we default to a maximum size
115            buffer_size = usize::MAX;
116        }
117        decode_lz4(value, buffer_size, prepended_size)
118    }
119
120    fn type_def(&self, _: &state::TypeState) -> TypeDef {
121        // Always fallible due to the possibility of decoding errors that VRL can't detect
122        TypeDef::bytes().fallible()
123    }
124}
125
126fn decode_lz4(value: Value, buf_size: usize, prepended_size: bool) -> Resolved {
127    let compressed_data = value.try_bytes()?;
128
129    if is_lz4_frame(&compressed_data) {
130        decode_lz4_frame(&compressed_data, buf_size)
131    } else {
132        decode_lz4_block(&compressed_data, buf_size, prepended_size)
133    }
134}
135
136fn is_lz4_frame(data: &[u8]) -> bool {
137    data.starts_with(&LZ4_FRAME_MAGIC)
138}
139
140fn decode_lz4_frame(compressed_data: &[u8], initial_capacity: usize) -> Resolved {
141    let mut output_buffer = Vec::with_capacity(initial_capacity);
142    let mut decoder = FrameDecoder::new(std::io::Cursor::new(compressed_data));
143
144    match io::copy(&mut decoder, &mut output_buffer) {
145        Ok(_) => Ok(Value::Bytes(output_buffer.into())),
146        Err(e) => Err(format!("unable to decode value with lz4 frame decoder: {e}").into()),
147    }
148}
149
150fn decode_lz4_block(compressed_data: &[u8], buf_size: usize, prepended_size: bool) -> Resolved {
151    let decompression_result = if prepended_size {
152        // The compressed data includes the original size as a prefix
153        decompress_size_prepended(compressed_data)
154    } else {
155        // We need to provide the buffer size for decompression
156        decompress(compressed_data, buf_size)
157    };
158
159    match decompression_result {
160        Ok(decompressed_data) => Ok(Value::Bytes(decompressed_data.into())),
161        Err(e) => Err(format!("unable to decode value with lz4 block decoder: {e}").into()),
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value;

    use nom::AsBytes;

    /// 256 KiB, used as an explicit `buf_size` in the raw-block test.
    const KB_256: usize = 256 * 1024;

    /// Turns a standard-alphabet Base64 fixture into raw bytes; panics on malformed input.
    fn decode_base64(text: &str) -> Vec<u8> {
        let decoded = base64_simd::STANDARD.decode_to_vec(text);
        decoded.expect("Cannot decode from Base64")
    }

    test_function![
    decode_lz4 => DecodeLz4;

    // Block with a size prefix, decoded with `prepended_size: true`.
    right_lz4_block {
        args: func_args![value: value!(decode_base64("LAAAAPAdVGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIDk5IGxhenkgZG9ncy4=").as_bytes()), prepended_size: value!(true)],
        want: Ok(value!(b"The quick brown fox jumps over 99 lazy dogs.")),
        tdef: TypeDef::bytes().fallible(),
    }

    // Frame input is detected from its magic bytes; no options needed.
    right_lz4_frame {
        args: func_args![value: value!(decode_base64("BCJNGGBAgiwAAIBUaGUgcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgMTMgbGF6eSBkb2dzLgAAAAA=").as_bytes())],
        want: Ok(value!(b"The quick brown fox jumps over 13 lazy dogs.")),
        tdef: TypeDef::bytes().fallible(),
    }

    // Raw block without a size prefix, decoded into an explicitly sized buffer.
    right_lz4_block_no_prepend_size_with_buffer_size {
        args: func_args![value: value!(decode_base64("8B1UaGUgcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgMTMgbGF6eSBkb2dzLg==").as_bytes()), buf_size: value!(KB_256), prepended_size: value!(false)],
        want: Ok(value!(b"The quick brown fox jumps over 13 lazy dogs.")),
        tdef: TypeDef::bytes().fallible(),
    }

    // For frames `buf_size` is only an initial capacity, so zero still decodes.
    right_lz4_frame_grow_buffer_size_from_zero {
        args: func_args![value: value!(decode_base64("BCJNGGBAgiwAAIBUaGUgcXVpY2sgYnJvd24gZm94IGp1bXBzIG92ZXIgMTMgbGF6eSBkb2dzLgAAAAA=").as_bytes()), buf_size: value!(0), prepended_size: value!(false)],
        want: Ok(value!(b"The quick brown fox jumps over 13 lazy dogs.")),
        tdef: TypeDef::bytes().fallible(),
    }

    // For blocks a zero-sized buffer cannot hold any output.
    wrong_lz4_block_grow_buffer_size_from_zero_no_prepended_size {
        args: func_args![value: value!(decode_base64("LAAAAPAdVGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIDEzIGxhenkgZG9ncy4=").as_bytes()), buf_size: value!(0), prepended_size: value!(false)],
        want: Err("unable to decode value with lz4 block decoder: provided output is too small for the decompressed data, actual 0, expected 2"),
        tdef: TypeDef::bytes().fallible(),
    }

    // Input that is not LZ4 at all.
    wrong_lz4 {
        args: func_args![value: value!("xxxxxxxxx"), buf_size: value!(10), prepended_size: value!(false)],
        want: Err("unable to decode value with lz4 block decoder: expected another byte, found none"),
        tdef: TypeDef::bytes().fallible(),
    }

    // Size-prefixed block decoded as if it had no prefix.
    wrong_lz4_block_false_prepended_size {
        args: func_args![value: value!(decode_base64("LAAAAPAdVGhlIHF1aWNrIGJyb3duIGZveCBqdW1wcyBvdmVyIDEzIGxhenkgZG9ncy4=").as_bytes()), prepended_size: value!(false)],
        want: Err("unable to decode value with lz4 block decoder: the offset to copy is not contained in the decompressed buffer"),
        tdef: TypeDef::bytes().fallible(),
    }];
}