codecs/encoding/format/
csv.rs

1use crate::encoding::BuildError;
2use bytes::BytesMut;
3use chrono::SecondsFormat;
4use csv_core::{WriteResult, Writer, WriterBuilder};
5use lookup::lookup_v2::ConfigTargetPath;
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9    config::DataType,
10    event::{Event, Value},
11    schema,
12};
13
14/// The user configuration to choose the metric tag strategy.
15#[configurable_component]
16#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
17#[serde(rename_all = "snake_case")]
18pub enum QuoteStyle {
19    /// Always puts quotes around every field.
20    Always,
21
22    /// Puts quotes around fields only when necessary.
23    /// They are necessary when fields contain a quote, delimiter, or record terminator.
24    /// Quotes are also necessary when writing an empty record
25    /// (which is indistinguishable from a record with one empty field).
26    #[default]
27    Necessary,
28
29    /// Puts quotes around all fields that are non-numeric.
30    /// This means that when writing a field that does not parse as a valid float or integer,
31    /// quotes are used even if they aren't strictly necessary.
32    NonNumeric,
33
34    /// Never writes quotes, even if it produces invalid CSV data.
35    Never,
36}
37
38/// Config used to build a `CsvSerializer`.
39#[configurable_component]
40#[derive(Debug, Clone)]
41pub struct CsvSerializerConfig {
42    /// The CSV Serializer Options.
43    pub csv: CsvSerializerOptions,
44}
45
46impl CsvSerializerConfig {
47    /// Creates a new `CsvSerializerConfig`.
48    pub const fn new(csv: CsvSerializerOptions) -> Self {
49        Self { csv }
50    }
51
52    /// Build the `CsvSerializer` from this configuration.
53    pub fn build(&self) -> Result<CsvSerializer, BuildError> {
54        if self.csv.fields.is_empty() {
55            Err("At least one CSV field must be specified".into())
56        } else {
57            Ok(CsvSerializer::new(self.clone()))
58        }
59    }
60
61    /// The data type of events that are accepted by `CsvSerializer`.
62    pub fn input_type(&self) -> DataType {
63        DataType::Log
64    }
65
66    /// The schema required by the serializer.
67    pub fn schema_requirement(&self) -> schema::Requirement {
68        // While technically we support `Value` variants that can't be losslessly serialized to
69        // CSV, we don't want to enforce that limitation to users yet.
70        schema::Requirement::empty()
71    }
72}
73
74/// Config used to build a `CsvSerializer`.
75#[configurable_component]
76#[derive(Debug, Clone)]
77pub struct CsvSerializerOptions {
78    /// The field delimiter to use when writing CSV.
79    #[configurable(metadata(docs::type_override = "ascii_char"))]
80    #[serde(
81        default = "default_delimiter",
82        with = "vector_core::serde::ascii_char",
83        skip_serializing_if = "vector_core::serde::is_default"
84    )]
85    pub delimiter: u8,
86
87    /// Enables double quote escapes.
88    ///
89    /// This is enabled by default, but you can disable it. When disabled, quotes in
90    /// field data are escaped instead of doubled.
91    #[serde(
92        default = "default_double_quote",
93        skip_serializing_if = "vector_core::serde::is_default"
94    )]
95    pub double_quote: bool,
96
97    /// The escape character to use when writing CSV.
98    ///
99    /// In some variants of CSV, quotes are escaped using a special escape character
100    /// like \ (instead of escaping quotes by doubling them).
101    ///
102    /// To use this, `double_quotes` needs to be disabled as well; otherwise, this setting is ignored.
103    #[configurable(metadata(docs::type_override = "ascii_char"))]
104    #[serde(
105        default = "default_escape",
106        with = "vector_core::serde::ascii_char",
107        skip_serializing_if = "vector_core::serde::is_default"
108    )]
109    pub escape: u8,
110
111    /// The quote character to use when writing CSV.
112    #[configurable(metadata(docs::type_override = "ascii_char"))]
113    #[serde(
114        default = "default_escape",
115        with = "vector_core::serde::ascii_char",
116        skip_serializing_if = "vector_core::serde::is_default"
117    )]
118    quote: u8,
119
120    /// The quoting style to use when writing CSV data.
121    #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
122    pub quote_style: QuoteStyle,
123
124    /// Sets the capacity (in bytes) of the internal buffer used in the CSV writer.
125    /// This defaults to 8KB.
126    #[serde(default = "default_capacity")]
127    pub capacity: usize,
128
129    /// Configures the fields that are encoded, as well as the order in which they
130    /// appear in the output.
131    ///
132    /// If a field is not present in the event, the output for that field is an empty string.
133    ///
134    /// Values of type `Array`, `Object`, and `Regex` are not supported, and the
135    /// output for any of these types is an empty string.
136    pub fields: Vec<ConfigTargetPath>,
137}
138
/// Default field delimiter: an ASCII comma.
const fn default_delimiter() -> u8 {
    const COMMA: u8 = b',';
    COMMA
}
142
/// Default escape character: an ASCII double quote.
const fn default_escape() -> u8 {
    const DOUBLE_QUOTE: u8 = b'\"';
    DOUBLE_QUOTE
}
146
/// Default for the `double_quote` option: quote doubling is enabled.
const fn default_double_quote() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
150
/// Default size, in bytes, of the writer's internal buffer (8 KiB).
const fn default_capacity() -> usize {
    const KIBIBYTE: usize = 1024;
    8 * KIBIBYTE
}
154
155impl Default for CsvSerializerOptions {
156    fn default() -> Self {
157        Self {
158            delimiter: default_delimiter(),
159            double_quote: default_double_quote(),
160            escape: default_escape(),
161            quote: default_escape(),
162            quote_style: QuoteStyle::default(),
163            capacity: default_capacity(),
164            fields: Vec::new(),
165        }
166    }
167}
168
169impl CsvSerializerOptions {
170    fn csv_quote_style(&self) -> csv_core::QuoteStyle {
171        match self.quote_style {
172            QuoteStyle::Always => csv_core::QuoteStyle::Always,
173            QuoteStyle::Necessary => csv_core::QuoteStyle::Necessary,
174            QuoteStyle::NonNumeric => csv_core::QuoteStyle::NonNumeric,
175            QuoteStyle::Never => csv_core::QuoteStyle::Never,
176        }
177    }
178}
179
180/// Serializer that converts an `Event` to bytes using the CSV format.
181#[derive(Debug, Clone)]
182pub struct CsvSerializer {
183    // Box because of clippy error: 'large size difference between variants'
184    // in SerializerConfig enum
185    writer: Box<Writer>,
186    fields: Vec<ConfigTargetPath>,
187    internal_buffer: Vec<u8>,
188}
189
190impl CsvSerializer {
191    /// Creates a new `CsvSerializer`.
192    pub fn new(config: CsvSerializerConfig) -> Self {
193        // 'flexible' is not needed since every event is a single context free csv line
194        let writer = Box::new(
195            WriterBuilder::new()
196                .delimiter(config.csv.delimiter)
197                .double_quote(config.csv.double_quote)
198                .escape(config.csv.escape)
199                .quote_style(config.csv.csv_quote_style())
200                .quote(config.csv.quote)
201                .build(),
202        );
203
204        let internal_buffer = if config.csv.capacity < 1 {
205            vec![0; 1]
206        } else {
207            vec![0; config.csv.capacity]
208        };
209
210        Self {
211            writer,
212            internal_buffer,
213            fields: config.csv.fields,
214        }
215    }
216}
217
218impl Encoder<Event> for CsvSerializer {
219    type Error = vector_common::Error;
220
221    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
222        let log = event.into_log();
223
224        let mut used_buffer_bytes = 0;
225        for (fields_written, field) in self.fields.iter().enumerate() {
226            let field_value = log.get(field);
227
228            // write field delimiter
229            if fields_written > 0 {
230                loop {
231                    let (res, bytes_written) = self
232                        .writer
233                        .delimiter(&mut self.internal_buffer[used_buffer_bytes..]);
234                    used_buffer_bytes += bytes_written;
235                    match res {
236                        WriteResult::InputEmpty => {
237                            break;
238                        }
239                        WriteResult::OutputFull => {
240                            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
241                            used_buffer_bytes = 0;
242                        }
243                    }
244                }
245            }
246
247            // get string value of current field
248            let field_value = match field_value {
249                Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
250                Some(Value::Integer(int)) => int.to_string(),
251                Some(Value::Float(float)) => float.to_string(),
252                Some(Value::Boolean(bool)) => bool.to_string(),
253                Some(Value::Timestamp(timestamp)) => {
254                    timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)
255                }
256                Some(Value::Null) => String::new(),
257                // Other value types: Array, Regex, Object are not supported by the CSV format.
258                Some(_) => String::new(),
259                None => String::new(),
260            };
261
262            // mutable byte_slice so it can be written in chunks if internal_buffer fills up
263            let mut field_value = field_value.as_bytes();
264            // write field_value to internal buffer
265            loop {
266                let (res, bytes_read, bytes_written) = self
267                    .writer
268                    .field(field_value, &mut self.internal_buffer[used_buffer_bytes..]);
269
270                field_value = &field_value[bytes_read..];
271                used_buffer_bytes += bytes_written;
272
273                match res {
274                    WriteResult::InputEmpty => break,
275                    WriteResult::OutputFull => {
276                        buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
277                        used_buffer_bytes = 0;
278                    }
279                }
280            }
281        }
282
283        // finish current event (potentially add closing quotes)
284        loop {
285            let (res, bytes_written) = self
286                .writer
287                .finish(&mut self.internal_buffer[used_buffer_bytes..]);
288            used_buffer_bytes += bytes_written;
289            match res {
290                WriteResult::InputEmpty => break,
291                WriteResult::OutputFull => {
292                    buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
293                    used_buffer_bytes = 0;
294                }
295            }
296        }
297
298        // final flush of internal_buffer
299        if used_buffer_bytes > 0 {
300            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
301        }
302
303        Ok(())
304    }
305}
306
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vector_common::btreemap;
    use vector_core::event::{LogEvent, ObjectMap, Value};

    use super::*;

    /// Builds a log event from `(name, value)` string pairs and returns it together with
    /// the field paths, in the order the pairs were given.
    fn make_event_with_fields(field_data: Vec<(&str, &str)>) -> (Vec<ConfigTargetPath>, Event) {
        let mut paths: Vec<ConfigTargetPath> = Vec::new();
        let mut object = ObjectMap::new();

        for (name, value) in field_data {
            paths.push(name.into());
            object.insert(name.into(), Value::from(value.to_string()));
        }

        (paths, Event::Log(LogEvent::from(object)))
    }

    /// Builds a serializer from `opts` and encodes `events`, in order, into one buffer.
    fn encode_events(opts: CsvSerializerOptions, events: Vec<Event>) -> BytesMut {
        let mut serializer = CsvSerializerConfig::new(opts).build().unwrap();
        let mut output = BytesMut::new();

        for event in events {
            serializer.encode(event, &mut output).unwrap();
        }

        output
    }

    #[test]
    fn build_error_on_empty_fields() {
        let config = CsvSerializerConfig::new(CsvSerializerOptions::default());

        let message = config.build().unwrap_err().to_string();

        assert_eq!(message, "At least one CSV field must be specified");
    }

    #[test]
    fn serialize_fields() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            "other" => Value::from("data"),
        }));
        // "missing" is not part of the event and "other" is not part of the output.
        let fields = vec![
            "foo".into(),
            "int".into(),
            "comma".into(),
            "float".into(),
            "missing".into(),
            "space".into(),
            "time".into(),
            "quote".into(),
            "bool".into(),
        ];

        let encoded = encode_events(
            CsvSerializerOptions {
                fields,
                ..Default::default()
            },
            vec![event],
        );

        assert_eq!(
            encoded.freeze(),
            b"bar,123,\"abc,bcd\",3.1415925,,sp ace,2023-02-27T07:04:49.363Z,\"the \"\"quote\"\" should be escaped\",true".as_slice()
        );
    }

    #[test]
    fn serialize_order() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("value1"),
            "field2" => Value::from("value2"),
            "field3" => Value::from("value3"),
            "field4" => Value::from("value4"),
            "field5" => Value::from("value5"),
        }));
        // Output follows the configured order, including the repeated field.
        let fields = vec![
            "field1".into(),
            "field5".into(),
            "field5".into(),
            "field3".into(),
            "field2".into(),
        ];

        let encoded = encode_events(
            CsvSerializerOptions {
                fields,
                ..Default::default()
            },
            vec![event],
        );

        assert_eq!(
            encoded.freeze(),
            b"value1,value5,value5,value3,value2".as_slice()
        );
    }

    #[test]
    fn correct_quoting() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("hello world"),
            "field2" => Value::from(1),
            "field3" => Value::from("foo\"bar"),
            "field4" => Value::from("baz,bas"),
        }));
        let fields: Vec<ConfigTargetPath> = vec![
            "field1".into(),
            "field2".into(),
            "field3".into(),
            "field4".into(),
        ];

        // One entry per quote style; the first entry exercises the default style.
        let cases: [(QuoteStyle, &[u8]); 4] = [
            (
                QuoteStyle::default(),
                b"hello world,1,\"foo\"\"bar\",\"baz,bas\"".as_slice(),
            ),
            (
                QuoteStyle::Never,
                b"hello world,1,foo\"bar,baz,bas".as_slice(),
            ),
            (
                QuoteStyle::Always,
                b"\"hello world\",\"1\",\"foo\"\"bar\",\"baz,bas\"".as_slice(),
            ),
            (
                QuoteStyle::NonNumeric,
                b"\"hello world\",1,\"foo\"\"bar\",\"baz,bas\"".as_slice(),
            ),
        ];

        for (quote_style, expected) in cases {
            let encoded = encode_events(
                CsvSerializerOptions {
                    fields: fields.clone(),
                    quote_style,
                    ..Default::default()
                },
                vec![event.clone()],
            );

            assert_eq!(encoded.freeze(), expected, "quote style {:?}", quote_style);
        }
    }

    #[test]
    fn custom_delimiter() {
        let (fields, event) =
            make_event_with_fields(vec![("field1", "value1"), ("field2", "value2")]);

        let encoded = encode_events(
            CsvSerializerOptions {
                fields,
                delimiter: b'\t',
                ..Default::default()
            },
            vec![event],
        );

        assert_eq!(encoded.freeze(), b"value1\tvalue2".as_slice());
    }

    #[test]
    fn custom_escape_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo\"bar")]);

        let encoded = encode_events(
            CsvSerializerOptions {
                fields,
                double_quote: false,
                escape: b'\\',
                ..Default::default()
            },
            vec![event],
        );

        assert_eq!(encoded.freeze(), b"\"foo\\\"bar\"".as_slice());
    }

    #[test]
    fn custom_quote_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo \" $ bar")]);

        let encoded = encode_events(
            CsvSerializerOptions {
                fields,
                quote: b'$',
                ..Default::default()
            },
            vec![event],
        );

        assert_eq!(encoded.freeze(), b"$foo \" $$ bar$".as_slice());
    }

    #[test]
    fn more_input_then_capacity() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo bar")]);

        // The field value is longer than the three-byte internal buffer.
        let encoded = encode_events(
            CsvSerializerOptions {
                fields,
                capacity: 3,
                ..Default::default()
            },
            vec![event],
        );

        assert_eq!(encoded.freeze(), b"foo bar".as_slice());
    }

    #[test]
    fn multiple_events() {
        let (fields, event1) = make_event_with_fields(vec![("field1", "foo,")]);
        let (_, event2) = make_event_with_fields(vec![("field1", "\nbar")]);

        // Both events go through the same serializer and into the same buffer.
        let encoded = encode_events(
            CsvSerializerOptions {
                fields,
                ..Default::default()
            },
            vec![event1, event2],
        );

        assert_eq!(encoded.freeze(), b"\"foo,\"\"\nbar\"".as_slice());
    }
}
578}