1use crate::encoding::BuildError;
2use bytes::BytesMut;
3use chrono::SecondsFormat;
4use csv_core::{WriteResult, Writer, WriterBuilder};
5use lookup::lookup_v2::ConfigTargetPath;
6use tokio_util::codec::Encoder;
7use vector_config_macros::configurable_component;
8use vector_core::{
9 config::DataType,
10 event::{Event, Value},
11 schema,
12};
13
/// The quoting style to use when writing CSV data.
#[configurable_component]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum QuoteStyle {
    /// Always puts quotes around every field.
    Always,

    /// Puts quotes around fields only when necessary.
    ///
    /// Quotes are necessary when a field contains a quote, the delimiter, or a record
    /// terminator. A record consisting of a single empty field is also quoted, so that it
    /// is distinguishable from an empty record.
    ///
    /// This is the default.
    #[default]
    Necessary,

    /// Puts quotes around all fields that are non-numeric.
    ///
    /// A field that does not parse as a valid integer or float is quoted even when quoting
    /// is not strictly necessary.
    NonNumeric,

    /// Never writes quotes, even if doing so produces invalid CSV data.
    Never,
}
37
/// Config used to build a `CsvSerializer`.
#[configurable_component]
#[derive(Debug, Clone)]
pub struct CsvSerializerConfig {
    /// The CSV serializer options.
    pub csv: CsvSerializerOptions,
}
45
46impl CsvSerializerConfig {
47 pub const fn new(csv: CsvSerializerOptions) -> Self {
49 Self { csv }
50 }
51
52 pub fn build(&self) -> Result<CsvSerializer, BuildError> {
54 if self.csv.fields.is_empty() {
55 Err("At least one CSV field must be specified".into())
56 } else {
57 Ok(CsvSerializer::new(self.clone()))
58 }
59 }
60
61 pub fn input_type(&self) -> DataType {
63 DataType::Log
64 }
65
66 pub fn schema_requirement(&self) -> schema::Requirement {
68 schema::Requirement::empty()
71 }
72}
73
74#[configurable_component]
76#[derive(Debug, Clone)]
77pub struct CsvSerializerOptions {
78 #[configurable(metadata(docs::type_override = "ascii_char"))]
80 #[serde(
81 default = "default_delimiter",
82 with = "vector_core::serde::ascii_char",
83 skip_serializing_if = "vector_core::serde::is_default"
84 )]
85 pub delimiter: u8,
86
87 #[serde(
92 default = "default_double_quote",
93 skip_serializing_if = "vector_core::serde::is_default"
94 )]
95 pub double_quote: bool,
96
97 #[configurable(metadata(docs::type_override = "ascii_char"))]
104 #[serde(
105 default = "default_escape",
106 with = "vector_core::serde::ascii_char",
107 skip_serializing_if = "vector_core::serde::is_default"
108 )]
109 pub escape: u8,
110
111 #[configurable(metadata(docs::type_override = "ascii_char"))]
113 #[serde(
114 default = "default_escape",
115 with = "vector_core::serde::ascii_char",
116 skip_serializing_if = "vector_core::serde::is_default"
117 )]
118 quote: u8,
119
120 #[serde(default, skip_serializing_if = "vector_core::serde::is_default")]
122 pub quote_style: QuoteStyle,
123
124 #[serde(default = "default_capacity")]
127 pub capacity: usize,
128
129 pub fields: Vec<ConfigTargetPath>,
137}
138
/// Default field delimiter: a comma.
const fn default_delimiter() -> u8 {
    b','
}

/// Default escape character: a double quote (`"`).
///
/// Also used as the serde default for the `quote` option.
const fn default_escape() -> u8 {
    b'"'
}

/// Double-quote escaping is enabled unless configured otherwise.
const fn default_double_quote() -> bool {
    true
}

/// Default internal buffer capacity, in bytes: 8 KiB.
const fn default_capacity() -> usize {
    8 * 1024
}
154
155impl Default for CsvSerializerOptions {
156 fn default() -> Self {
157 Self {
158 delimiter: default_delimiter(),
159 double_quote: default_double_quote(),
160 escape: default_escape(),
161 quote: default_escape(),
162 quote_style: QuoteStyle::default(),
163 capacity: default_capacity(),
164 fields: Vec::new(),
165 }
166 }
167}
168
169impl CsvSerializerOptions {
170 fn csv_quote_style(&self) -> csv_core::QuoteStyle {
171 match self.quote_style {
172 QuoteStyle::Always => csv_core::QuoteStyle::Always,
173 QuoteStyle::Necessary => csv_core::QuoteStyle::Necessary,
174 QuoteStyle::NonNumeric => csv_core::QuoteStyle::NonNumeric,
175 QuoteStyle::Never => csv_core::QuoteStyle::Never,
176 }
177 }
178}
179
/// Serializer that converts an `Event` to bytes using the CSV format.
#[derive(Debug, Clone)]
pub struct CsvSerializer {
    // NOTE(review): presumably boxed to keep `CsvSerializer` (and any enum embedding it)
    // small, since `csv_core::Writer` stores its configuration inline — confirm.
    /// Stateful CSV writer configured from the serializer options.
    writer: Box<Writer>,
    /// Paths of the event fields to write, in output order.
    fields: Vec<ConfigTargetPath>,
    /// Scratch space the CSV writer writes into before it is copied to the output buffer.
    internal_buffer: Vec<u8>,
}
189
190impl CsvSerializer {
191 pub fn new(config: CsvSerializerConfig) -> Self {
193 let writer = Box::new(
195 WriterBuilder::new()
196 .delimiter(config.csv.delimiter)
197 .double_quote(config.csv.double_quote)
198 .escape(config.csv.escape)
199 .quote_style(config.csv.csv_quote_style())
200 .quote(config.csv.quote)
201 .build(),
202 );
203
204 let internal_buffer = if config.csv.capacity < 1 {
205 vec![0; 1]
206 } else {
207 vec![0; config.csv.capacity]
208 };
209
210 Self {
211 writer,
212 internal_buffer,
213 fields: config.csv.fields,
214 }
215 }
216}
217
218impl Encoder<Event> for CsvSerializer {
219 type Error = vector_common::Error;
220
221 fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
222 let log = event.into_log();
223
224 let mut used_buffer_bytes = 0;
225 for (fields_written, field) in self.fields.iter().enumerate() {
226 let field_value = log.get(field);
227
228 if fields_written > 0 {
230 loop {
231 let (res, bytes_written) = self
232 .writer
233 .delimiter(&mut self.internal_buffer[used_buffer_bytes..]);
234 used_buffer_bytes += bytes_written;
235 match res {
236 WriteResult::InputEmpty => {
237 break;
238 }
239 WriteResult::OutputFull => {
240 buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
241 used_buffer_bytes = 0;
242 }
243 }
244 }
245 }
246
247 let field_value = match field_value {
249 Some(Value::Bytes(bytes)) => String::from_utf8_lossy(bytes).into_owned(),
250 Some(Value::Integer(int)) => int.to_string(),
251 Some(Value::Float(float)) => float.to_string(),
252 Some(Value::Boolean(bool)) => bool.to_string(),
253 Some(Value::Timestamp(timestamp)) => {
254 timestamp.to_rfc3339_opts(SecondsFormat::AutoSi, true)
255 }
256 Some(Value::Null) => String::new(),
257 Some(_) => String::new(),
259 None => String::new(),
260 };
261
262 let mut field_value = field_value.as_bytes();
264 loop {
266 let (res, bytes_read, bytes_written) = self
267 .writer
268 .field(field_value, &mut self.internal_buffer[used_buffer_bytes..]);
269
270 field_value = &field_value[bytes_read..];
271 used_buffer_bytes += bytes_written;
272
273 match res {
274 WriteResult::InputEmpty => break,
275 WriteResult::OutputFull => {
276 buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
277 used_buffer_bytes = 0;
278 }
279 }
280 }
281 }
282
283 loop {
285 let (res, bytes_written) = self
286 .writer
287 .finish(&mut self.internal_buffer[used_buffer_bytes..]);
288 used_buffer_bytes += bytes_written;
289 match res {
290 WriteResult::InputEmpty => break,
291 WriteResult::OutputFull => {
292 buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
293 used_buffer_bytes = 0;
294 }
295 }
296 }
297
298 if used_buffer_bytes > 0 {
300 buffer.extend_from_slice(&self.internal_buffer[..used_buffer_bytes]);
301 }
302
303 Ok(())
304 }
305}
306
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::DateTime;
    use ordered_float::NotNan;
    use vector_common::btreemap;
    use vector_core::event::{LogEvent, ObjectMap, Value};

    use super::*;

    /// Builds a log event whose top-level fields are the given `(name, value)` string pairs,
    /// together with the matching list of field paths (in the same order) for `fields`.
    fn make_event_with_fields(field_data: Vec<(&str, &str)>) -> (Vec<ConfigTargetPath>, Event) {
        let mut fields: Vec<ConfigTargetPath> = std::vec::Vec::new();
        let mut tree = ObjectMap::new();

        for (field_name, field_value) in field_data.into_iter() {
            let field = field_name.into();
            fields.push(field);

            let field_value = Value::from(field_value.to_string());
            tree.insert(field_name.into(), field_value);
        }

        let event = Event::Log(LogEvent::from(tree));
        (fields, event)
    }

    // `build` rejects a configuration that selects no fields.
    #[test]
    fn build_error_on_empty_fields() {
        let opts = CsvSerializerOptions::default();
        let config = CsvSerializerConfig::new(opts);
        let err = config.build().unwrap_err();
        assert_eq!(err.to_string(), "At least one CSV field must be specified");
    }

    // Every supported value type is rendered. A field containing the delimiter is quoted,
    // embedded quotes are doubled, a missing field becomes an empty column, the timestamp
    // is converted to UTC, and `other` (not listed in `fields`) is not written.
    #[test]
    fn serialize_fields() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "foo" => Value::from("bar"),
            "int" => Value::from(123),
            "comma" => Value::from("abc,bcd"),
            "float" => Value::Float(NotNan::new(3.1415925).unwrap()),
            "space" => Value::from("sp ace"),
            "time" => Value::Timestamp(DateTime::parse_from_rfc3339("2023-02-27T15:04:49.363+08:00").unwrap().into()),
            "quote" => Value::from("the \"quote\" should be escaped"),
            "bool" => Value::from(true),
            "other" => Value::from("data"),
        }));
        let fields = vec![
            "foo".into(),
            "int".into(),
            "comma".into(),
            "float".into(),
            "missing".into(),
            "space".into(),
            "time".into(),
            "quote".into(),
            "bool".into(),
        ];

        let opts = CsvSerializerOptions {
            fields,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(
            bytes.freeze(),
            b"bar,123,\"abc,bcd\",3.1415925,,sp ace,2023-02-27T07:04:49.363Z,\"the \"\"quote\"\" should be escaped\",true".as_slice()
        );
    }

    // Output follows the configured field order, including repeated fields, rather than
    // the event's key order.
    #[test]
    fn serialize_order() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("value1"),
            "field2" => Value::from("value2"),
            "field3" => Value::from("value3"),
            "field4" => Value::from("value4"),
            "field5" => Value::from("value5"),
        }));
        let fields = vec![
            "field1".into(),
            "field5".into(),
            "field5".into(),
            "field3".into(),
            "field2".into(),
        ];
        let opts = CsvSerializerOptions {
            fields,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(
            bytes.freeze(),
            b"value1,value5,value5,value3,value2".as_slice()
        );
    }

    // The same event encoded under each `QuoteStyle`: default (`Necessary`), `Never`,
    // `Always` and `NonNumeric`.
    #[test]
    fn correct_quoting() {
        let event = Event::Log(LogEvent::from(btreemap! {
            "field1" => Value::from("hello world"),
            "field2" => Value::from(1),
            "field3" => Value::from("foo\"bar"),
            "field4" => Value::from("baz,bas"),
        }));
        let fields = vec![
            "field1".into(),
            "field2".into(),
            "field3".into(),
            "field4".into(),
        ];

        let mut default_bytes = BytesMut::new();
        let mut never_bytes = BytesMut::new();
        let mut always_bytes = BytesMut::new();
        let mut non_numeric_bytes = BytesMut::new();

        CsvSerializerConfig::new(CsvSerializerOptions {
            fields: fields.clone(),
            ..Default::default()
        })
        .build()
        .unwrap()
        .encode(event.clone(), &mut default_bytes)
        .unwrap();

        CsvSerializerConfig::new(CsvSerializerOptions {
            fields: fields.clone(),
            quote_style: QuoteStyle::Never,
            ..Default::default()
        })
        .build()
        .unwrap()
        .encode(event.clone(), &mut never_bytes)
        .unwrap();

        CsvSerializerConfig::new(CsvSerializerOptions {
            fields: fields.clone(),
            quote_style: QuoteStyle::Always,
            ..Default::default()
        })
        .build()
        .unwrap()
        .encode(event.clone(), &mut always_bytes)
        .unwrap();

        CsvSerializerConfig::new(CsvSerializerOptions {
            fields: fields.clone(),
            quote_style: QuoteStyle::NonNumeric,
            ..Default::default()
        })
        .build()
        .unwrap()
        .encode(event.clone(), &mut non_numeric_bytes)
        .unwrap();

        assert_eq!(
            default_bytes.freeze(),
            b"hello world,1,\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
        assert_eq!(
            never_bytes.freeze(),
            b"hello world,1,foo\"bar,baz,bas".as_slice()
        );
        assert_eq!(
            always_bytes.freeze(),
            b"\"hello world\",\"1\",\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
        assert_eq!(
            non_numeric_bytes.freeze(),
            b"\"hello world\",1,\"foo\"\"bar\",\"baz,bas\"".as_slice()
        );
    }

    // A non-default delimiter (tab) separates the fields.
    #[test]
    fn custom_delimiter() {
        let (fields, event) =
            make_event_with_fields(vec![("field1", "value1"), ("field2", "value2")]);
        let opts = CsvSerializerOptions {
            fields,
            delimiter: b'\t',
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"value1\tvalue2".as_slice());
    }

    // With `double_quote` disabled, an embedded quote is escaped with the escape character
    // instead of being doubled.
    #[test]
    fn custom_escape_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo\"bar")]);
        let opts = CsvSerializerOptions {
            fields,
            double_quote: false,
            escape: b'\\',
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"\"foo\\\"bar\"".as_slice());
    }

    // A custom quote character wraps the field and is doubled inside it; the default `"`
    // is then written as an ordinary character.
    #[test]
    fn custom_quote_char() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo \" $ bar")]);
        let opts = CsvSerializerOptions {
            fields,
            quote: b'$',
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"$foo \" $$ bar$".as_slice());
    }

    // A field longer than the internal buffer is still written in full, across several
    // flushes of the buffer. (The name means "more input than capacity".)
    #[test]
    fn more_input_then_capacity() {
        let (fields, event) = make_event_with_fields(vec![("field1", "foo bar")]);
        let opts = CsvSerializerOptions {
            fields,
            capacity: 3,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"foo bar".as_slice());
    }

    // Encoding two events into the same buffer appends them back to back; `encode` writes
    // no record terminator between them.
    #[test]
    fn multiple_events() {
        let (fields, event1) = make_event_with_fields(vec![("field1", "foo,")]);
        let (_, event2) = make_event_with_fields(vec![("field1", "\nbar")]);
        let opts = CsvSerializerOptions {
            fields,
            ..Default::default()
        };
        let config = CsvSerializerConfig::new(opts);
        let mut serializer = config.build().unwrap();
        let mut bytes = BytesMut::new();

        serializer.encode(event1, &mut bytes).unwrap();
        serializer.encode(event2, &mut bytes).unwrap();

        assert_eq!(bytes.freeze(), b"\"foo,\"\"\nbar\"".as_slice());
    }
}
578}