codecs/encoding/format/
csv.rs

1use bytes::BytesMut;
2use chrono::SecondsFormat;
3use csv_core::{WriteResult, Writer, WriterBuilder};
4use lookup::lookup_v2::ConfigTargetPath;
5use tokio_util::codec::Encoder;
6use vector_config_macros::configurable_component;
7use vector_core::{
8    config::DataType,
9    event::{Event, Value},
10    schema,
11};
12
13use crate::encoding::BuildError;
14
/// The user configuration to choose the quoting style used when writing CSV fields.
#[configurable_component]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum QuoteStyle {
    /// Always puts quotes around every field.
    Always,

    /// Puts quotes around fields only when necessary.
    /// They are necessary when fields contain a quote, delimiter, or record terminator.
    /// Quotes are also necessary when writing an empty record
    /// (which is indistinguishable from a record with one empty field).
    #[default]
    Necessary,

    /// Puts quotes around all fields that are non-numeric.
    /// This means that when writing a field that does not parse as a valid float or integer,
    /// quotes are used even if they aren't strictly necessary.
    NonNumeric,

    /// Never writes quotes, even if it produces invalid CSV data.
    Never,
}
38
/// Config used to build a `CsvSerializer`.
// All CSV-specific settings are grouped under the single `csv` field; `build` rejects a
// configuration whose `csv.fields` list is empty.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CsvSerializerConfig {
    /// The CSV Serializer Options.
    pub csv: CsvSerializerOptions,
}
46
47impl CsvSerializerConfig {
48    /// Creates a new `CsvSerializerConfig`.
49    pub const fn new(csv: CsvSerializerOptions) -> Self {
50        Self { csv }
51    }
52
53    /// Build the `CsvSerializer` from this configuration.
54    pub fn build(&self) -> Result<CsvSerializer, BuildError> {
55        if self.csv.fields.is_empty() {
56            Err("At least one CSV field must be specified".into())
57        } else {
58            Ok(CsvSerializer::new(self.clone()))
59        }
60    }
61
62    /// The data type of events that are accepted by `CsvSerializer`.
63    pub fn input_type(&self) -> DataType {
64        DataType::Log
65    }
66
67    /// The schema required by the serializer.
68    pub fn schema_requirement(&self) -> schema::Requirement {
69        // While technically we support `Value` variants that can't be losslessly serialized to
70        // CSV, we don't want to enforce that limitation to users yet.
71        schema::Requirement::empty()
72    }
73}
74
75/// Config used to build a `CsvSerializer`.
76#[configurable_component]
77#[derive(Debug, Clone)]
78pub struct CsvSerializerOptions {
79    /// The field delimiter to use when writing CSV.
80    #[configurable(metadata(docs::type_override = "ascii_char"))]
81    #[serde(
82        default = "default_delimiter",
83        with = "vector_core::serde::ascii_char",
84        skip_serializing_if = "vector_core::serde::is_default"
85    )]
86    pub delimiter: u8,
87
88    /// Enables double quote escapes.
89    ///
90    /// This is enabled by default, but you can disable it. When disabled, quotes in
91    /// field data are escaped instead of doubled.
92    #[serde(
93        default = "default_double_quote",
94        skip_serializing_if = "vector_core::serde::is_default"
95    )]
96    pub double_quote: bool,
97
98    /// The escape character to use when writing CSV.
99    ///
100    /// In some variants of CSV, quotes are escaped using a special escape character
101    /// like \ (instead of escaping quotes by doubling them).
102    ///
103    /// To use this, `double_quotes` needs to be disabled as well; otherwise, this setting is ignored.
104    #[configurable(metadata(docs::type_override = "ascii_char"))]
105    #[serde(
106        default = "default_escape",
107        with = "vector_core::serde::ascii_char",
108        skip_serializing_if = "vector_core::serde::is_default"
109    )]
110    pub escape: u8,
111
112    /// The quote character to use when writing CSV.
113    #[configurable(metadata(docs::type_override = "ascii_char"))]
114    #[serde(
115        default = "default_escape",
116        with = "vector_core::serde::ascii_char",
117        skip_serializing_if = "vector_core::serde::is_default"
118    )]
119    quote: u8,
120
121    /// The quoting style to use when writing CSV data.
122    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
123    pub quote_style: QuoteStyle,
124
125    /// Sets the capacity (in bytes) of the internal buffer used in the CSV writer.
126    /// This defaults to 8192 bytes (8KB).
127    #[serde(default = "default_capacity")]
128    pub capacity: usize,
129
130    /// Configures the fields that are encoded, as well as the order in which they
131    /// appear in the output.
132    ///
133    /// If a field is not present in the event, the output for that field is an empty string.
134    ///
135    /// Values of type `Array`, `Object`, and `Regex` are not supported, and the
136    /// output for any of these types is an empty string.
137    pub fields: Vec<ConfigTargetPath>,
138}
139
/// Default field delimiter: a comma.
const fn default_delimiter() -> u8 {
    b','
}

/// Default escape character: a double quote.
const fn default_escape() -> u8 {
    b'"'
}

/// By default, quotes inside field data are escaped by doubling them.
const fn default_double_quote() -> bool {
    true
}

/// Default size, in bytes, of the serializer's internal buffer (8 KiB).
const fn default_capacity() -> usize {
    const KIB: usize = 1024;
    8 * KIB
}
155
156impl Default for CsvSerializerOptions {
157    fn default() -> Self {
158        Self {
159            delimiter: default_delimiter(),
160            double_quote: default_double_quote(),
161            escape: default_escape(),
162            quote: default_escape(),
163            quote_style: QuoteStyle::default(),
164            capacity: default_capacity(),
165            fields: Vec::new(),
166        }
167    }
168}
169
170impl CsvSerializerOptions {
171    fn csv_quote_style(&self) -> csv_core::QuoteStyle {
172        match self.quote_style {
173            QuoteStyle::Always => csv_core::QuoteStyle::Always,
174            QuoteStyle::Necessary => csv_core::QuoteStyle::Necessary,
175            QuoteStyle::NonNumeric => csv_core::QuoteStyle::NonNumeric,
176            QuoteStyle::Never => csv_core::QuoteStyle::Never,
177        }
178    }
179}
180
/// Serializer that converts an `Event` to bytes using the CSV format.
#[derive(Debug, Clone)]
pub struct CsvSerializer {
    // Box because of clippy error: 'large size difference between variants'
    // in SerializerConfig enum
    writer: Box<Writer>,
    // Paths of the event fields to encode, in the order they are written to the output.
    fields: Vec<ConfigTargetPath>,
    // Scratch space the CSV writer fills before its contents are copied to the output buffer.
    internal_buffer: Vec<u8>,
}
190
191impl CsvSerializer {
192    /// Creates a new `CsvSerializer`.
193    pub fn new(config: CsvSerializerConfig) -> Self {
194        // 'flexible' is not needed since every event is a single context free csv line
195        let writer = Box::new(
196            WriterBuilder::new()
197                .delimiter(config.csv.delimiter)
198                .double_quote(config.csv.double_quote)
199                .escape(config.csv.escape)
200                .quote_style(config.csv.csv_quote_style())
201                .quote(config.csv.quote)
202                .build(),
203        );
204
205        let internal_buffer = if config.csv.capacity < 1 {
206            vec![0; 1]
207        } else {
208            vec![0; config.csv.capacity]
209        };
210
211        Self {
212            writer,
213            internal_buffer,
214            fields: config.csv.fields,
215        }
216    }
217}
218
219impl Encoder<Event> for CsvSerializer {
220    type Error = vector_common::Error;
221
222    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
223        let log = event.into_log();
224
225        let mut used_buffer_bytes = 0;
226        for (fields_written, field) in self.fields.iter().enumerate() {
227            let field_value = log.get(field);
228
229            // write field delimiter
230            if fields_written > 0 {
231                loop {
232                    let (res, bytes_written) = self
233                        .writer
234                        .delimiter(&mut self.internal_buffer[used_buffer_bytes..]);
235                    used_buffer_bytes += bytes_written;
236                    match res {
237                        WriteResult::InputEmpty => {
238                            break;
239                        }
240                        WriteResult::OutputFull => {
241                            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
242                            used_buffer_bytes = 0;
243                        }
244                    }
245                }
246            }
247
248            // get string value of current field
249            let field_value = match field_value {
250                Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
251                Some(Value::Integer(int)) => int.to_string(),
252                Some(Value::Float(float)) => float.to_string(),
253                Some(Value::Boolean(bool)) => bool.to_string(),
254                Some(Value::Timestamp(timestamp)) => {
255                    timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)
256                }
257                Some(Value::Null) => String::new(),
258                // Other value types: Array, Regex, Object are not supported by the CSV format.
259                Some(_) => String::new(),
260                None => String::new(),
261            };
262
263            // mutable byte_slice so it can be written in chunks if internal_buffer fills up
264            let mut field_value = field_value.as_bytes();
265            // write field_value to internal buffer
266            loop {
267                let (res, bytes_read, bytes_written) = self
268                    .writer
269                    .field(field_value, &mut self.internal_buffer[used_buffer_bytes..]);
270
271                field_value = &field_value[bytes_read..];
272                used_buffer_bytes += bytes_written;
273
274                match res {
275                    WriteResult::InputEmpty => break,
276                    WriteResult::OutputFull => {
277                        buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
278                        used_buffer_bytes = 0;
279                    }
280                }
281            }
282        }
283
284        // finish current event (potentially add closing quotes)
285        loop {
286            let (res, bytes_written) = self
287                .writer
288                .finish(&mut self.internal_buffer[used_buffer_bytes..]);
289            used_buffer_bytes += bytes_written;
290            match res {
291                WriteResult::InputEmpty => break,
292                WriteResult::OutputFull => {
293                    buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
294                    used_buffer_bytes = 0;
295                }
296            }
297        }
298
299        // final flush of internal_buffer
300        if used_buffer_bytes > 0 {
301            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
302        }
303
304        Ok(())
305    }
306}
307
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vector_common::btreemap;
    use vector_core::event::{LogEvent, ObjectMap, Value};

    use super::*;

    /// Converts plain field names into the paths stored in `CsvSerializerOptions::fields`.
    fn paths(names: &[&str]) -> Vec<ConfigTargetPath> {
        names.iter().map(|&name| name.into()).collect()
    }

    /// Builds a log event from `(name, value)` string pairs and returns it together with the
    /// matching field paths, in the order the pairs were given.
    fn make_event_with_fields(field_data: Vec<(&str, &str)>) -> (Vec<ConfigTargetPath>, Event) {
        let mut fields: Vec<ConfigTargetPath> = Vec::with_capacity(field_data.len());
        let mut tree = ObjectMap::new();

        for (name, value) in field_data {
            fields.push(name.into());
            tree.insert(name.into(), Value::from(value.to_string()));
        }

        (fields, Event::Log(LogEvent::from(tree)))
    }

    /// Builds a serializer from `opts` and encodes `event` into a fresh buffer.
    fn encode_one(opts: CsvSerializerOptions, event: Event) -> BytesMut {
        let mut serializer = CsvSerializerConfig::new(opts).build().unwrap();
        let mut bytes = BytesMut::new();
        serializer.encode(event, &mut bytes).unwrap();
        bytes
    }

    #[test]
    fn build_error_on_empty_fields() {
        let err = CsvSerializerConfig::new(CsvSerializerOptions::default())
            .build()
            .unwrap_err();

        assert_eq!(err.to_string(), "At least one CSV field must be specified");
    }

    #[test]
    fn serialize_fields() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            "other" => Value::from("data"),
        }));

        // "missing" is absent from the event and "other" is present but not selected.
        let opts = CsvSerializerOptions {
            fields: paths(&[
                "foo", "int", "comma", "float", "missing", "space", "time", "quote", "bool",
            ]),
            ..Default::default()
        };

        let encoded = encode_one(opts, event);

        assert_eq!(
            encoded.freeze(),
            b"bar,123,\"abc,bcd\",3.1415925,,sp ace,2023-02-27T07:04:49.363Z,\"the \"\"quote\"\" should be escaped\",true".as_slice()
        );
    }

    #[test]
    fn serialize_order() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("value1"),
            "field2" => Value::from("value2"),
            "field3" => Value::from("value3"),
            "field4" => Value::from("value4"),
            "field5" => Value::from("value5"),
        }));

        // Output follows the configured order; a field may be listed more than once.
        let opts = CsvSerializerOptions {
            fields: paths(&["field1", "field5", "field5", "field3", "field2"]),
            ..Default::default()
        };

        let encoded = encode_one(opts, event);

        assert_eq!(
            encoded.freeze(),
            b"value1,value5,value5,value3,value2".as_slice()
        );
    }

    #[test]
    fn correct_quoting() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("hello world"),
            "field2" => Value::from(1),
            "field3" => Value::from("foo\"bar"),
            "field4" => Value::from("baz,bas"),
        }));
        let fields = paths(&["field1", "field2", "field3", "field4"]);

        // Encodes the same event with the given quote style, leaving every other option at
        // its default.
        let encode_styled = |quote_style: QuoteStyle| {
            let opts = CsvSerializerOptions {
                fields: fields.clone(),
                quote_style,
                ..Default::default()
            };
            encode_one(opts, event.clone()).freeze()
        };

        assert_eq!(
            encode_styled(QuoteStyle::default()),
            b"hello world,1,\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
        assert_eq!(
            encode_styled(QuoteStyle::Never),
            b"hello world,1,foo\"bar,baz,bas".as_slice()
        );
        assert_eq!(
            encode_styled(QuoteStyle::Always),
            b"\"hello world\",\"1\",\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
        assert_eq!(
            encode_styled(QuoteStyle::NonNumeric),
            b"\"hello world\",1,\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
    }

    #[test]
    fn custom_delimiter() {
        let (fields, event) =
            make_event_with_fields(vec![("field1", "value1"), ("field2", "value2")]);

        let encoded = encode_one(
            CsvSerializerOptions {
                fields,
                delimiter: b'\t',
                ..Default::default()
            },
            event,
        );

        assert_eq!(encoded.freeze(), b"value1\tvalue2".as_slice());
    }

    #[test]
    fn custom_escape_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo\"bar")]);

        let encoded = encode_one(
            CsvSerializerOptions {
                fields,
                double_quote: false,
                escape: b'\\',
                ..Default::default()
            },
            event,
        );

        assert_eq!(encoded.freeze(), b"\"foo\\\"bar\"".as_slice());
    }

    #[test]
    fn custom_quote_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo \" $ bar")]);

        let encoded = encode_one(
            CsvSerializerOptions {
                fields,
                quote: b'$',
                ..Default::default()
            },
            event,
        );

        assert_eq!(encoded.freeze(), b"$foo \" $$ bar$".as_slice());
    }

    #[test]
    fn more_input_then_capacity() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo bar")]);

        // The internal buffer is smaller than the value, so it must be flushed mid-field.
        let encoded = encode_one(
            CsvSerializerOptions {
                fields,
                capacity: 3,
                ..Default::default()
            },
            event,
        );

        assert_eq!(encoded.freeze(), b"foo bar".as_slice());
    }

    #[test]
    fn multiple_events() {
        let (fields, event1) = make_event_with_fields(vec![("field1", "foo,")]);
        let (_, event2) = make_event_with_fields(vec![("field1", "\nbar")]);

        // One serializer instance is reused for both events, appending to the same buffer.
        let mut serializer = CsvSerializerConfig::new(CsvSerializerOptions {
            fields,
            ..Default::default()
        })
        .build()
        .unwrap();
        let mut bytes = BytesMut::new();

        for event in [event1, event2] {
            serializer.encode(event, &mut bytes).unwrap();
        }

        assert_eq!(bytes.freeze(), b"\"foo,\"\"\nbar\"".as_slice());
    }
}