1use bytes::BytesMut;
2use chrono::SecondsFormat;
3use csv_core::{WriteResult, Writer, WriterBuilder};
4use lookup::lookup_v2::ConfigTargetPath;
5use tokio_util::codec::Encoder;
6use vector_config_macros::configurable_component;
7use vector_core::{
8 config::DataType,
9 event::{Event, Value},
10 schema,
11};
12
13use crate::encoding::BuildError;
14
/// The quoting style to use when writing CSV data.
#[configurable_component]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum QuoteStyle {
    /// Always puts quotes around every field.
    Always,

    /// Puts quotes around fields only when necessary.
    ///
    /// They are necessary when fields contain a quote, delimiter, or record terminator.
    /// This is the default style.
    #[default]
    Necessary,

    /// Puts quotes around all fields that are non-numeric.
    ///
    /// Fields that do not look like a number are quoted even if quoting is not
    /// strictly necessary.
    NonNumeric,

    /// Never writes quotes, even if it produces invalid CSV data.
    Never,
}
38
/// Config used to build a `CsvSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CsvSerializerConfig {
    /// The CSV serializer options.
    pub csv: CsvSerializerOptions,
}
46
47impl CsvSerializerConfig {
48 pub const fn new(csv: CsvSerializerOptions) -> Self {
50 Self { csv }
51 }
52
53 pub fn build(&self) -> Result<CsvSerializer, BuildError> {
55 if self.csv.fields.is_empty() {
56 Err("At least one CSV field must be specified".into())
57 } else {
58 Ok(CsvSerializer::new(self.clone()))
59 }
60 }
61
62 pub fn input_type(&self) -> DataType {
64 DataType::Log
65 }
66
67 pub fn schema_requirement(&self) -> schema::Requirement {
69 schema::Requirement::empty()
72 }
73}
74
75#[configurable_component]
77#[derive(Debug, Clone)]
78pub struct CsvSerializerOptions {
79 #[configurable(metadata(docs::type_override = "ascii_char"))]
81 #[serde(
82 default = "default_delimiter",
83 with = "vector_core::serde::ascii_char",
84 skip_serializing_if = "vector_core::serde::is_default"
85 )]
86 pub delimiter: u8,
87
88 #[serde(
93 default = "default_double_quote",
94 skip_serializing_if = "vector_core::serde::is_default"
95 )]
96 pub double_quote: bool,
97
98 #[configurable(metadata(docs::type_override = "ascii_char"))]
105 #[serde(
106 default = "default_escape",
107 with = "vector_core::serde::ascii_char",
108 skip_serializing_if = "vector_core::serde::is_default"
109 )]
110 pub escape: u8,
111
112 #[configurable(metadata(docs::type_override = "ascii_char"))]
114 #[serde(
115 default = "default_escape",
116 with = "vector_core::serde::ascii_char",
117 skip_serializing_if = "vector_core::serde::is_default"
118 )]
119 quote: u8,
120
121 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
123 pub quote_style: QuoteStyle,
124
125 #[serde(default = "default_capacity")]
128 pub capacity: usize,
129
130 pub fields: Vec<ConfigTargetPath>,
138}
139
/// Default field delimiter: a comma.
const fn default_delimiter() -> u8 {
    b','
}

/// Default escape character: a double quote. Also used as the default quote character.
const fn default_escape() -> u8 {
    b'"'
}

/// Quotes inside fields are doubled by default.
const fn default_double_quote() -> bool {
    true
}

/// Default size of the writer's internal buffer: 8 KiB.
const fn default_capacity() -> usize {
    8 * 1024
}
155
156impl Default for CsvSerializerOptions {
157 fn default() -> Self {
158 Self {
159 delimiter: default_delimiter(),
160 double_quote: default_double_quote(),
161 escape: default_escape(),
162 quote: default_escape(),
163 quote_style: QuoteStyle::default(),
164 capacity: default_capacity(),
165 fields: Vec::new(),
166 }
167 }
168}
169
170impl CsvSerializerOptions {
171 fn csv_quote_style(&self) -> csv_core::QuoteStyle {
172 match self.quote_style {
173 QuoteStyle::Always => csv_core::QuoteStyle::Always,
174 QuoteStyle::Necessary => csv_core::QuoteStyle::Necessary,
175 QuoteStyle::NonNumeric => csv_core::QuoteStyle::NonNumeric,
176 QuoteStyle::Never => csv_core::QuoteStyle::Never,
177 }
178 }
179}
180
/// Serializer that converts an `Event` to bytes using the CSV format.
#[derive(Debug, Clone)]
pub struct CsvSerializer {
    // The configured `csv_core` writer.
    // NOTE(review): boxed presumably to keep this struct small when it is embedded in a
    // larger enum — confirm before unboxing.
    writer: Box<Writer>,
    // Paths of the fields to encode, in output column order.
    fields: Vec<ConfigTargetPath>,
    // Scratch space the writer emits into before the bytes are copied to the output buffer.
    internal_buffer: Vec<u8>,
}
190
191impl CsvSerializer {
192 pub fn new(config: CsvSerializerConfig) -> Self {
194 let writer = Box::new(
196 WriterBuilder::new()
197 .delimiter(config.csv.delimiter)
198 .double_quote(config.csv.double_quote)
199 .escape(config.csv.escape)
200 .quote_style(config.csv.csv_quote_style())
201 .quote(config.csv.quote)
202 .build(),
203 );
204
205 let internal_buffer = if config.csv.capacity < 1 {
206 vec![0; 1]
207 } else {
208 vec![0; config.csv.capacity]
209 };
210
211 Self {
212 writer,
213 internal_buffer,
214 fields: config.csv.fields,
215 }
216 }
217}
218
impl Encoder<Event> for CsvSerializer {
    type Error = vector_common::Error;

    /// Encodes the configured fields of `event` as one CSV record and appends it to `buffer`.
    ///
    /// Output is staged in the serializer's internal scratch buffer and copied to
    /// `buffer` whenever the scratch buffer fills up, and once more at the end.
    /// Missing fields and unsupported value types are written as empty fields.
    /// This implementation always returns `Ok(())`.
    fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
        let log = event.into_log();

        // Number of bytes currently staged at the front of `internal_buffer`.
        let mut used_buffer_bytes = 0;
        for (fields_written, field) in self.fields.iter().enumerate() {
            let field_value = log.get(field);

            // Write the field delimiter before every field except the first.
            if fields_written > 0 {
                loop {
                    let (res, bytes_written) = self
                        .writer
                        .delimiter(&mut self.internal_buffer[used_buffer_bytes..]);
                    used_buffer_bytes += bytes_written;
                    match res {
                        WriteResult::InputEmpty => {
                            break;
                        }
                        // Scratch buffer is full: drain it into `buffer` and retry.
                        WriteResult::OutputFull => {
                            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
                            used_buffer_bytes = 0;
                        }
                    }
                }
            }

            // Get the string representation of the current field.
            let field_value = match field_value {
                // Invalid UTF-8 sequences are replaced, not passed through verbatim.
                Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
                Some(Value::Integer(int)) => int.to_string(),
                Some(Value::Float(float)) => float.to_string(),
                Some(Value::Boolean(bool)) => bool.to_string(),
                Some(Value::Timestamp(timestamp)) => {
                    timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)
                }
                Some(Value::Null) => String::new(),
                // Every other value type is written as an empty field.
                Some(_) => String::new(),
                // A field that is absent from the event is written as an empty field.
                None => String::new(),
            };

            // Mutable byte slice so the value can be written in chunks when the scratch
            // buffer fills up part-way through the field.
            let mut field_value = field_value.as_bytes();
            loop {
                let (res, bytes_read, bytes_written) = self
                    .writer
                    .field(field_value, &mut self.internal_buffer[used_buffer_bytes..]);

                // Skip the input that was consumed and account for the output produced.
                field_value = &field_value[bytes_read..];
                used_buffer_bytes += bytes_written;

                match res {
                    WriteResult::InputEmpty => break,
                    // Scratch buffer is full: drain it and continue with the remaining input.
                    WriteResult::OutputFull => {
                        buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
                        used_buffer_bytes = 0;
                    }
                }
            }
        }

        // Finish the current record (for example, emit a pending closing quote).
        loop {
            let (res, bytes_written) = self
                .writer
                .finish(&mut self.internal_buffer[used_buffer_bytes..]);
            used_buffer_bytes += bytes_written;
            match res {
                WriteResult::InputEmpty => break,
                WriteResult::OutputFull => {
                    buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
                    used_buffer_bytes = 0;
                }
            }
        }

        // Final flush of whatever is still staged in the scratch buffer.
        if used_buffer_bytes > 0 {
            buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
        }

        Ok(())
    }
}
307
308#[cfg(test)]
309mod tests {
310 use bytes::BytesMut;
311 use chrono::DateTime;
312 use ordered_float::NotNan;
313 use vector_common::btreemap;
314 use vector_core::event::{LogEvent, ObjectMap, Value};
315
316 use super::*;
317
318 fn make_event_with_fields(field_data: Vec<(&str, &str)>) -> (Vec<ConfigTargetPath>, Event) {
319 let mut fields: Vec<ConfigTargetPath> = std::vec::Vec::new();
320 let mut tree = ObjectMap::new();
321
322 for (field_name, field_value) in field_data.into_iter() {
323 let field = field_name.into();
324 fields.push(field);
325
326 let field_value = Value::from(field_value.to_string());
327 tree.insert(field_name.into(), field_value);
328 }
329
330 let event = Event::Log(LogEvent::from(tree));
331 (fields, event)
332 }
333
    // Building a serializer without any configured field must fail with a clear message.
    #[test]
    fn build_error_on_empty_fields() {
        let opts = CsvSerializerOptions::default();
        let config = CsvSerializerConfig::new(opts);
        let err = config.build().unwrap_err();
        assert_eq!(err.to_string(), "At least one CSV field must be specified");
    }
341
    // Covers every supported value type, a missing field (empty output), default
    // quoting of delimiters and quotes, and that unlisted fields ("other") are skipped.
    #[test]
    fn serialize_fields() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            "other" => Value::from("data"),
        }));
        let fields = vec![
            "foo".into(),
            "int".into(),
            "comma".into(),
            "float".into(),
            "missing".into(),
            "space".into(),
            "time".into(),
            "quote".into(),
            "bool".into(),
        ];

        let opts = CsvSerializerOptions {
            fields,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        // The timestamp is rendered in UTC with a `Z` suffix.
        assert_eq!(
            bytes.freeze(),
            b"bar,123,\"abc,bcd\",3.1415925,,sp ace,2023-02-27T07:04:49.363Z,\"the \"\"quote\"\" should be escaped\",true".as_slice()
        );
    }
382
    // Output columns follow the configured field order, and a field may be listed
    // more than once.
    #[test]
    fn serialize_order() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("value1"),
            "field2" => Value::from("value2"),
            "field3" => Value::from("value3"),
            "field4" => Value::from("value4"),
            "field5" => Value::from("value5"),
        }));
        let fields = vec![
            "field1".into(),
            "field5".into(),
            "field5".into(),
            "field3".into(),
            "field2".into(),
        ];
        let opts = CsvSerializerOptions {
            fields,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(
            bytes.freeze(),
            b"value1,value5,value5,value3,value2".as_slice()
        );
    }
414
415 #[test]
416 fn correct_quoting() {
417 let event = Event::Log(LogEvent::from(btreemap! {
418 "field1" => Value::from("hello world"),
419 "field2" => Value::from(1),
420 "field3" => Value::from("foo\"bar"),
421 "field4" => Value::from("baz,bas"),
422 }));
423 let fields = vec![
424 "field1".into(),
425 "field2".into(),
426 "field3".into(),
427 "field4".into(),
428 ];
429
430 let mut default_bytes = BytesMut::new();
431 let mut never_bytes = BytesMut::new();
432 let mut always_bytes = BytesMut::new();
433 let mut non_numeric_bytes = BytesMut::new();
434
435 CsvSerializerConfig::new(CsvSerializerOptions {
436 fields: fields.clone(),
437 ..Default::default()
438 })
439 .build()
440 .unwrap()
441 .encode(event.clone(), &mut default_bytes)
442 .unwrap();
443
444 CsvSerializerConfig::new(CsvSerializerOptions {
445 fields: fields.clone(),
446 quote_style: QuoteStyle::Never,
447 ..Default::default()
448 })
449 .build()
450 .unwrap()
451 .encode(event.clone(), &mut never_bytes)
452 .unwrap();
453
454 CsvSerializerConfig::new(CsvSerializerOptions {
455 fields: fields.clone(),
456 quote_style: QuoteStyle::Always,
457 ..Default::default()
458 })
459 .build()
460 .unwrap()
461 .encode(event.clone(), &mut always_bytes)
462 .unwrap();
463
464 CsvSerializerConfig::new(CsvSerializerOptions {
465 fields: fields.clone(),
466 quote_style: QuoteStyle::NonNumeric,
467 ..Default::default()
468 })
469 .build()
470 .unwrap()
471 .encode(event.clone(), &mut non_numeric_bytes)
472 .unwrap();
473
474 assert_eq!(
475 default_bytes.freeze(),
476 b"hello world,1,\"foo\"\"bar\",\"baz,bas\"".as_slice()
477 );
478 assert_eq!(
479 never_bytes.freeze(),
480 b"hello world,1,foo\"bar,baz,bas".as_slice()
481 );
482 assert_eq!(
483 always_bytes.freeze(),
484 b"\"hello world\",\"1\",\"foo\"\"bar\",\"baz,bas\"".as_slice()
485 );
486 assert_eq!(
487 non_numeric_bytes.freeze(),
488 b"\"hello world\",1,\"foo\"\"bar\",\"baz,bas\"".as_slice()
489 );
490 }
491
    // A custom delimiter (tab) replaces the default comma between fields.
    #[test]
    fn custom_delimiter() {
        let (fields, event) =
            make_event_with_fields(vec![("field1", "value1"), ("field2", "value2")]);
        let opts = CsvSerializerOptions {
            fields,
            delimiter: b'\t',
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"value1\tvalue2".as_slice());
    }
509
    // With quote doubling disabled, embedded quotes are prefixed with the configured
    // escape character instead.
    #[test]
    fn custom_escape_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo\"bar")]);
        let opts = CsvSerializerOptions {
            fields,
            double_quote: false,
            escape: b'\\',
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"\"foo\\\"bar\"".as_slice());
    }
527
    // With `$` as the quote character, a field containing `$` is wrapped in `$` and
    // the embedded `$` is doubled; the plain `"` is left untouched.
    #[test]
    fn custom_quote_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo \" $ bar")]);
        let opts = CsvSerializerOptions {
            fields,
            quote: b'$',
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"$foo \" $$ bar$".as_slice());
    }
544
    // The field (7 bytes) is longer than the internal buffer (3 bytes), so the
    // serializer must flush the buffer several times without losing data.
    #[test]
    fn more_input_then_capacity() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo bar")]);
        let opts = CsvSerializerOptions {
            fields,
            capacity: 3,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"foo bar".as_slice());
    }
561
    // Reusing one serializer for two events: each record is quoted independently and
    // appended back-to-back; no record terminator is written between them.
    #[test]
    fn multiple_events() {
        let (fields, event1) = make_event_with_fields(vec![("field1", "foo,")]);
        let (_, event2) = make_event_with_fields(vec![("field1", "\nbar")]);
        let opts = CsvSerializerOptions {
            fields,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event1, &mut bytes).unwrap();
        serializer.encode(event2, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"\"foo,\"\"\nbar\"".as_slice());
    }
579}