1use std::{
2 collections::HashMap,
3 io::{self, Read},
4 net::SocketAddr,
5 time::Duration,
6};
7
8use base64::prelude::{BASE64_STANDARD, Engine as _};
9use bytes::{Buf, Bytes, BytesMut};
10use chrono::Utc;
11use flate2::read::MultiGzDecoder;
12use rmp_serde::{Deserializer, Serializer, decode};
13use serde::{Deserialize, Serialize};
14use smallvec::{SmallVec, smallvec};
15use tokio_util::codec::Decoder;
16use vector_lib::{
17 codecs::{BytesDeserializerConfig, StreamDecodingError},
18 config::{LegacyKey, LogNamespace},
19 configurable::configurable_component,
20 ipallowlist::IpAllowlistConfig,
21 lookup::{OwnedValuePath, lookup_v2::parse_value_path, metadata_path, owned_value_path, path},
22 schema::Definition,
23};
24use vrl::value::{Kind, Value, kind::Collection};
25
26use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
27use crate::{
28 config::{
29 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
30 SourceContext, SourceOutput, log_schema,
31 },
32 event::{Event, LogEvent},
33 internal_events::{FluentMessageDecodeError, FluentMessageReceived},
34 serde::bool_or_struct,
35 tcp::TcpKeepaliveConfig,
36 tls::{MaybeTlsSettings, TlsSourceConfig},
37};
38
39mod message;
40use self::message::{FluentEntry, FluentMessage, FluentRecord, FluentTag, FluentTimestamp};
41
/// Configuration for the `fluent` source.
///
/// The socket settings are flattened from [`FluentMode`], so a config reads as
/// `mode = "tcp" | "unix"` plus that mode's fields.
#[configurable_component(source("fluent", "Collect logs from a Fluentd or Fluent Bit agent."))]
#[derive(Clone, Debug)]
pub struct FluentConfig {
    #[serde(flatten)]
    mode: FluentMode,

    /// The namespace to use for logs. This is merged with, and takes precedence over, the
    /// global log namespace setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
54
/// The kind of socket the `fluent` source listens on.
///
/// `Deserialize` is implemented by hand (hence `no_deser`) so that a config without a
/// `mode` key is still accepted as TCP.
#[configurable_component(no_deser)]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
// Configuration-only type, held once per configured source, so variant size is not a concern.
#[allow(clippy::large_enum_variant)]
pub enum FluentMode {
    /// Listen on TCP.
    Tcp(FluentTcpConfig),

    /// Listen on a Unix domain socket.
    #[cfg(unix)]
    Unix(FluentUnixConfig),
}
69
70mod deser {
76 use super::*;
77
78 #[allow(clippy::large_enum_variant)]
79 #[derive(Deserialize)]
80 #[serde(tag = "mode")]
81 enum FluentModeTagged {
82 #[serde(rename = "tcp")]
83 Tcp(FluentTcpConfig),
84
85 #[cfg(unix)]
86 #[serde(rename = "unix")]
87 Unix(FluentUnixConfig),
88 }
89
90 #[derive(Deserialize)]
91 #[serde(untagged)]
92 enum FluentModeDe {
93 Tagged(FluentModeTagged),
94
95 Untagged(FluentTcpConfig),
97 }
98
99 impl<'de> Deserialize<'de> for FluentMode {
100 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
101 where
102 D: serde::Deserializer<'de>,
103 {
104 Ok(match FluentModeDe::deserialize(deserializer)? {
105 FluentModeDe::Tagged(FluentModeTagged::Tcp(config)) => FluentMode::Tcp(config),
106 #[cfg(unix)]
107 FluentModeDe::Tagged(FluentModeTagged::Unix(config)) => FluentMode::Unix(config),
108 FluentModeDe::Untagged(config) => FluentMode::Tcp(config),
109 })
110 }
111 }
112
113 #[cfg(test)]
114 mod tests {
115 use super::*;
116
117 #[test]
118 fn test_tcp_default_mode() {
119 let json_data = serde_json::json!({
120 "address": "0.0.0.0:2020",
121 "connection_limit": 2
122 });
123
124 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
125 assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
126 }
127
128 #[test]
129 fn test_tcp_explicit_mode() {
130 let json_data = serde_json::json!({
131 "mode": "tcp",
132 "address": "0.0.0.0:2020",
133 "connection_limit": 2
134 });
135
136 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
137 assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
138 }
139
140 #[test]
141 fn test_invalid_unix_mode() {
142 let json_data = serde_json::json!({
143 "mode": "unix",
144 "address": "0.0.0.0:2020",
145 "connection_limit": 2
146 });
147
148 assert!(serde_json::from_value::<FluentConfig>(json_data).is_err());
149 }
150
151 #[cfg(unix)]
152 #[test]
153 fn test_valid_unix_mode() {
154 let json_data = serde_json::json!({
155 "mode": "unix",
156 "path": "/foo"
157 });
158
159 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
160 assert!(
161 matches!(parsed.mode, FluentMode::Unix(c) if c.path.to_string_lossy() == "/foo")
162 );
163 }
164 }
165}
166
/// Configuration for receiving Fluent forward-protocol traffic over TCP.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FluentTcpConfig {
    #[configurable(derived)]
    address: SocketListenAddr,

    /// The maximum number of TCP connections, passed to the underlying TCP listener as its
    /// connection limit.
    #[configurable(metadata(docs::type_unit = "connections"))]
    connection_limit: Option<u32>,

    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,

    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,

    /// The receive buffer size, in bytes, passed to the underlying TCP listener.
    ///
    /// This generally should not need to be changed.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    receive_buffer_bytes: Option<usize>,

    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
}
199
200impl FluentTcpConfig {
201 fn build(
202 &self,
203 cx: SourceContext,
204 log_namespace: LogNamespace,
205 ) -> crate::Result<super::Source> {
206 let source = FluentSource::new(log_namespace);
207 let shutdown_secs = Duration::from_secs(30);
208 let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
209 let tls_client_metadata_key = self
210 .tls
211 .as_ref()
212 .and_then(|tls| tls.client_metadata_key.clone())
213 .and_then(|k| k.path);
214 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
215 source.run(
216 self.address,
217 self.keepalive,
218 shutdown_secs,
219 tls,
220 tls_client_metadata_key,
221 self.receive_buffer_bytes,
222 None,
223 cx,
224 self.acknowledgements,
225 self.connection_limit,
226 self.permit_origin.clone().map(Into::into),
227 FluentConfig::NAME,
228 log_namespace,
229 )
230 }
231}
232
/// Configuration for receiving Fluent forward-protocol traffic over a Unix domain socket.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
#[cfg(unix)]
pub struct FluentUnixConfig {
    /// The Unix socket path to listen on.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: std::path::PathBuf,

    /// Unix file mode bits for the socket file.
    ///
    /// Any numeric notation is accepted, but octal (for example `0o600`) is usually the most
    /// readable.
    #[configurable(metadata(docs::examples = 0o777))]
    #[configurable(metadata(docs::examples = 0o600))]
    #[configurable(metadata(docs::examples = 508))]
    pub socket_file_mode: Option<u32>,
}
254
255#[cfg(unix)]
256impl FluentUnixConfig {
257 fn build(
258 &self,
259 cx: SourceContext,
260 log_namespace: LogNamespace,
261 ) -> crate::Result<super::Source> {
262 let source = FluentSource::new(log_namespace);
263
264 crate::sources::util::build_unix_stream_source(
265 self.path.clone(),
266 self.socket_file_mode,
267 source.decoder(),
268 move |events, host| source.handle_events_impl(events, host.into()),
269 cx.shutdown,
270 cx.out,
271 )
272 }
273}
274
275impl GenerateConfig for FluentConfig {
276 fn generate_config() -> toml::Value {
277 toml::Value::try_from(Self {
278 mode: FluentMode::Tcp(FluentTcpConfig {
279 address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
280 keepalive: None,
281 permit_origin: None,
282 tls: None,
283 receive_buffer_bytes: None,
284 acknowledgements: Default::default(),
285 connection_limit: Some(2),
286 }),
287 log_namespace: None,
288 })
289 .unwrap()
290 }
291}
292
293#[async_trait::async_trait]
294#[typetag::serde(name = "fluent")]
295impl SourceConfig for FluentConfig {
296 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
297 let log_namespace = cx.log_namespace(self.log_namespace);
298 match &self.mode {
299 FluentMode::Tcp(t) => t.build(cx, log_namespace),
300 #[cfg(unix)]
301 FluentMode::Unix(u) => u.build(cx, log_namespace),
302 }
303 }
304
305 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
306 let log_namespace = global_log_namespace.merge(self.log_namespace);
307 let schema_definition = self.schema_definition(log_namespace);
308
309 vec![SourceOutput::new_maybe_logs(
310 DataType::Log,
311 schema_definition,
312 )]
313 }
314
315 fn resources(&self) -> Vec<Resource> {
316 match &self.mode {
317 FluentMode::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
318 #[cfg(unix)]
319 FluentMode::Unix(_) => vec![],
320 }
321 }
322
323 fn can_acknowledge(&self) -> bool {
324 matches!(self.mode, FluentMode::Tcp(_))
325 }
326}
327
328impl FluentConfig {
329 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
331 let host_key = log_schema()
333 .host_key()
334 .cloned()
335 .map(LegacyKey::InsertIfEmpty);
336
337 let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite);
338
339 let tls_client_metadata_path = match &self.mode {
340 FluentMode::Tcp(tcp) => tcp
341 .tls
342 .as_ref()
343 .and_then(|tls| tls.client_metadata_key.as_ref())
344 .and_then(|k| k.path.clone())
345 .map(LegacyKey::Overwrite),
346 #[cfg(unix)]
347 FluentMode::Unix(_) => None,
348 };
349
350 let mut schema_definition = BytesDeserializerConfig
353 .schema_definition(log_namespace)
354 .with_standard_vector_source_metadata()
355 .with_source_metadata(
356 FluentConfig::NAME,
357 host_key,
358 &owned_value_path!("host"),
359 Kind::bytes(),
360 Some("host"),
361 )
362 .with_source_metadata(
363 FluentConfig::NAME,
364 tag_key,
365 &owned_value_path!("tag"),
366 Kind::bytes(),
367 None,
368 )
369 .with_source_metadata(
370 FluentConfig::NAME,
371 None,
372 &owned_value_path!("timestamp"),
373 Kind::timestamp(),
374 Some("timestamp"),
375 )
376 .with_source_metadata(
378 FluentConfig::NAME,
379 None,
380 &owned_value_path!("record"),
381 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
382 None,
383 )
384 .with_source_metadata(
385 Self::NAME,
386 tls_client_metadata_path,
387 &owned_value_path!("tls_client_metadata"),
388 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
389 None,
390 );
391
392 if log_namespace == LogNamespace::Legacy {
394 schema_definition = schema_definition.unknown_fields(Kind::bytes());
395 }
396
397 schema_definition
398 }
399}
400
/// Per-source state shared by the TCP and Unix listening modes.
#[derive(Debug, Clone)]
struct FluentSource {
    // Namespace used when inserting source metadata into decoded events.
    log_namespace: LogNamespace,
    // Host key from the global log schema, captured once in `new`; used as the legacy
    // target for the `host` metadata field.
    legacy_host_key_path: Option<OwnedValuePath>,
}
406
407impl FluentSource {
408 fn new(log_namespace: LogNamespace) -> Self {
409 Self {
410 log_namespace,
411 legacy_host_key_path: log_schema().host_key().cloned(),
412 }
413 }
414
415 fn handle_events_impl(&self, events: &mut [Event], host: Value) {
416 for event in events {
417 let log = event.as_mut_log();
418
419 let legacy_host_key = self
420 .legacy_host_key_path
421 .as_ref()
422 .map(LegacyKey::InsertIfEmpty);
423
424 self.log_namespace.insert_source_metadata(
425 FluentConfig::NAME,
426 log,
427 legacy_host_key,
428 path!("host"),
429 host.clone(),
430 );
431 }
432 }
433}
434
435impl TcpSource for FluentSource {
436 type Error = DecodeError;
437 type Item = FluentFrame;
438 type Decoder = FluentDecoder;
439 type Acker = FluentAcker;
440
441 fn decoder(&self) -> Self::Decoder {
442 FluentDecoder::new(self.log_namespace)
443 }
444
445 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
446 self.handle_events_impl(events, host.ip().to_string().into())
447 }
448
449 fn build_acker(&self, frame: &[Self::Item]) -> Self::Acker {
450 FluentAcker::new(frame)
451 }
452}
453
/// Errors produced while decoding Fluent forward-protocol input.
#[derive(Debug)]
pub enum DecodeError {
    /// An I/O error. Produced by gzip decompression of PackedForward payloads, and by anything
    /// converted through `From<io::Error>`.
    IO(io::Error),
    /// The msgpack input could not be deserialized into the expected message shape.
    Decode(decode::Error),
    /// A PackedForward message declared a `compressed` option other than `gzip` or `text`.
    UnknownCompression(String),
    /// A heartbeat message carried something other than msgpack `nil`.
    UnexpectedValue(rmpv::Value),
}
461
462impl std::fmt::Display for DecodeError {
463 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464 match self {
465 DecodeError::IO(err) => write!(f, "{err}"),
466 DecodeError::Decode(err) => write!(f, "{err}"),
467 DecodeError::UnknownCompression(compression) => {
468 write!(f, "unknown compression: {compression}")
469 }
470 DecodeError::UnexpectedValue(value) => {
471 write!(f, "unexpected msgpack value, ignoring: {value}")
472 }
473 }
474 }
475}
476
477impl StreamDecodingError for DecodeError {
478 fn can_continue(&self) -> bool {
479 match self {
480 DecodeError::IO(_) => false,
481 DecodeError::Decode(_) => true,
482 DecodeError::UnknownCompression(_) => true,
483 DecodeError::UnexpectedValue(_) => true,
484 }
485 }
486}
487
488impl From<io::Error> for DecodeError {
489 fn from(e: io::Error) -> Self {
490 DecodeError::IO(e)
491 }
492}
493
494impl From<decode::Error> for DecodeError {
495 fn from(e: decode::Error) -> Self {
496 DecodeError::Decode(e)
497 }
498}
499
/// Stream decoder that turns a msgpack byte stream into `FluentFrame`s.
#[derive(Debug, Clone)]
struct FluentDecoder {
    // Namespace used when building events from decoded records.
    log_namespace: LogNamespace,
}
504
505impl FluentDecoder {
506 const fn new(log_namespace: LogNamespace) -> Self {
507 Self { log_namespace }
508 }
509
510 fn handle_message(
511 &mut self,
512 message: Result<FluentMessage, DecodeError>,
513 byte_size: usize,
514 ) -> Result<Option<(FluentFrame, usize)>, DecodeError> {
515 let log_namespace = &self.log_namespace;
516
517 match message? {
518 FluentMessage::Message(tag, timestamp, record) => {
519 let event = Event::from(FluentEvent {
520 tag,
521 timestamp,
522 record,
523 log_namespace,
524 });
525 let frame = FluentFrame {
526 events: smallvec![event],
527 chunk: None,
528 };
529 Ok(Some((frame, byte_size)))
530 }
531 FluentMessage::MessageWithOptions(tag, timestamp, record, options) => {
532 let event = Event::from(FluentEvent {
533 tag,
534 timestamp,
535 record,
536 log_namespace,
537 });
538 let frame = FluentFrame {
539 events: smallvec![event],
540 chunk: options.chunk,
541 };
542 Ok(Some((frame, byte_size)))
543 }
544 FluentMessage::Forward(tag, entries) => {
545 let events = entries
546 .into_iter()
547 .map(|FluentEntry(timestamp, record)| {
548 Event::from(FluentEvent {
549 tag: tag.clone(),
550 timestamp,
551 record,
552 log_namespace,
553 })
554 })
555 .collect();
556 let frame = FluentFrame {
557 events,
558 chunk: None,
559 };
560 Ok(Some((frame, byte_size)))
561 }
562 FluentMessage::ForwardWithOptions(tag, entries, options) => {
563 let events = entries
564 .into_iter()
565 .map(|FluentEntry(timestamp, record)| {
566 Event::from(FluentEvent {
567 tag: tag.clone(),
568 timestamp,
569 record,
570 log_namespace,
571 })
572 })
573 .collect();
574 let frame = FluentFrame {
575 events,
576 chunk: options.chunk,
577 };
578 Ok(Some((frame, byte_size)))
579 }
580 FluentMessage::PackedForward(tag, bin) => {
581 let mut buf = BytesMut::from(&bin[..]);
582
583 let mut events = smallvec![];
584 while let Some(FluentEntry(timestamp, record)) =
585 FluentEntryStreamDecoder.decode(&mut buf)?
586 {
587 events.push(Event::from(FluentEvent {
588 tag: tag.clone(),
589 timestamp,
590 record,
591 log_namespace,
592 }));
593 }
594 let frame = FluentFrame {
595 events,
596 chunk: None,
597 };
598 Ok(Some((frame, byte_size)))
599 }
600 FluentMessage::PackedForwardWithOptions(tag, bin, options) => {
601 let buf = match options.compressed.as_deref() {
602 Some("gzip") => {
603 let mut buf = Vec::new();
604 MultiGzDecoder::new(io::Cursor::new(bin.into_vec()))
605 .read_to_end(&mut buf)
606 .map(|_| buf)
607 .map_err(Into::into)
608 }
609 Some("text") | None => Ok(bin.into_vec()),
610 Some(s) => Err(DecodeError::UnknownCompression(s.to_owned())),
611 }?;
612
613 let mut buf = BytesMut::from(&buf[..]);
614
615 let mut events = smallvec![];
616 while let Some(FluentEntry(timestamp, record)) =
617 FluentEntryStreamDecoder.decode(&mut buf)?
618 {
619 events.push(Event::from(FluentEvent {
620 tag: tag.clone(),
621 timestamp,
622 record,
623 log_namespace,
624 }));
625 }
626 let frame = FluentFrame {
627 events,
628 chunk: options.chunk,
629 };
630 Ok(Some((frame, byte_size)))
631 }
632 FluentMessage::Heartbeat(rmpv::Value::Nil) => Ok(None),
633 FluentMessage::Heartbeat(value) => Err(DecodeError::UnexpectedValue(value)),
634 }
635 }
636}
637
638impl Decoder for FluentDecoder {
639 type Item = (FluentFrame, usize);
640 type Error = DecodeError;
641
642 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
643 loop {
644 if src.is_empty() {
645 return Ok(None);
646 }
647
648 let (byte_size, res) = {
649 let mut des = Deserializer::new(io::Cursor::new(&src[..]));
650
651 let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
652
653 if let Err(DecodeError::Decode(
655 decode::Error::InvalidDataRead(ref custom)
656 | decode::Error::InvalidMarkerRead(ref custom),
657 )) = res
658 && custom.kind() == io::ErrorKind::UnexpectedEof
659 {
660 return Ok(None);
661 }
662
663 (des.position() as usize, res)
664 };
665
666 src.advance(byte_size);
667
668 let maybe_item = self.handle_message(res, byte_size).inspect_err(|error| {
669 let base64_encoded_message = BASE64_STANDARD.encode(&src[..]);
670 emit!(FluentMessageDecodeError {
671 error,
672 base64_encoded_message
673 });
674 })?;
675 if let Some(item) = maybe_item {
676 return Ok(Some(item));
677 }
678 }
679 }
680}
681
/// Stateless decoder for the back-to-back msgpack `FluentEntry` values carried in the
/// binary payload of PackedForward messages.
#[derive(Clone, Debug)]
struct FluentEntryStreamDecoder;
685
686impl Decoder for FluentEntryStreamDecoder {
687 type Item = FluentEntry;
688 type Error = DecodeError;
689
690 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
691 if src.is_empty() {
692 return Ok(None);
693 }
694 let (byte_size, res) = {
695 let mut des = Deserializer::new(io::Cursor::new(&src[..]));
696
697 let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
699
700 if let Err(DecodeError::Decode(decode::Error::InvalidDataRead(ref custom))) = res
701 && custom.kind() == io::ErrorKind::UnexpectedEof
702 {
703 return Ok(None);
704 }
705
706 let byte_size = des.position();
707
708 emit!(FluentMessageReceived { byte_size });
709
710 (byte_size as usize, res)
711 };
712
713 src.advance(byte_size);
714
715 res
716 }
717}
718
/// Builds the acknowledgement reply for a batch of frames.
struct FluentAcker {
    // `chunk` ids of the frames that requested an acknowledgement, in frame order.
    chunks: Vec<String>,
}
722
723impl FluentAcker {
724 fn new(frames: &[FluentFrame]) -> Self {
725 Self {
726 chunks: frames.iter().filter_map(|f| f.chunk.clone()).collect(),
727 }
728 }
729}
730
731impl TcpSourceAcker for FluentAcker {
732 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
733 if self.chunks.is_empty() {
734 return None;
735 }
736
737 let mut buf = Vec::new();
738 let mut ser = Serializer::new(&mut buf);
739 let mut ack_map = HashMap::new();
740
741 for chunk in self.chunks {
742 ack_map.clear();
743 if let TcpSourceAck::Ack = ack {
744 ack_map.insert("ack", chunk);
745 };
746 ack_map.serialize(&mut ser).unwrap();
747 }
748 Some(buf.into())
749 }
750}
751
/// One decoded Fluent record together with its tag and timestamp, plus the namespace
/// that determines where those values are placed when converted into a log event.
#[derive(Debug, PartialEq)]
struct FluentEvent<'a> {
    tag: FluentTag,
    timestamp: FluentTimestamp,
    record: FluentRecord,
    log_namespace: &'a LogNamespace,
}
760
761impl From<FluentEvent<'_>> for Event {
762 fn from(frame: FluentEvent) -> Event {
763 LogEvent::from(frame).into()
764 }
765}
766
/// The events decoded from a single forward-protocol message.
struct FluentFrame {
    // Inline capacity of one covers the single-record Message modes without allocating.
    events: SmallVec<[Event; 1]>,
    // `chunk` option sent by the client, if any; echoed back when acknowledging.
    chunk: Option<String>,
}
771
772impl From<FluentFrame> for SmallVec<[Event; 1]> {
773 fn from(frame: FluentFrame) -> Self {
774 frame.events
775 }
776}
777
778impl From<FluentEvent<'_>> for LogEvent {
779 fn from(frame: FluentEvent) -> LogEvent {
780 let FluentEvent {
781 tag,
782 timestamp,
783 record,
784 log_namespace,
785 } = frame;
786
787 let mut log = LogEvent::default();
788
789 log_namespace.insert_vector_metadata(
790 &mut log,
791 log_schema().source_type_key(),
792 path!("source_type"),
793 Bytes::from_static(FluentConfig::NAME.as_bytes()),
794 );
795
796 match log_namespace {
797 LogNamespace::Vector => {
798 log.insert(metadata_path!(FluentConfig::NAME, "timestamp"), timestamp);
799 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
800 }
801 LogNamespace::Legacy => {
802 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
803 }
804 }
805
806 log_namespace.insert_source_metadata(
807 FluentConfig::NAME,
808 &mut log,
809 Some(LegacyKey::Overwrite(path!("tag"))),
810 path!("tag"),
811 tag,
812 );
813
814 for (key, value) in record.into_iter() {
815 let value: Value = value.into();
816 log_namespace.insert_source_metadata(
817 FluentConfig::NAME,
818 &mut log,
819 Some(LegacyKey::Overwrite(path!(key.as_str()))),
820 path!("record", key.as_str()),
821 value,
822 );
823 }
824 log
825 }
826}
827
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::{DateTime, Utc};
    use rmp_serde::Serializer;
    use serde::Serialize;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        time::{Duration, error::Elapsed, timeout},
    };
    use tokio_util::codec::Decoder;
    use vector_lib::{assert_event_data_eq, lookup::OwnedTargetPath, schema::Definition};
    use vrl::value::{ObjectMap, Value, kind::Collection};

    use super::{message::FluentMessageOptions, *};
    use crate::{
        SourceSender,
        config::{SourceConfig, SourceContext},
        event::EventStatus,
        test_util::{self, next_addr, trace_init, wait_for_tcp},
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FluentConfig>();
    }

    /// Expected event with `message = name`, tag `tag.name` and the given RFC 3339
    /// timestamp, all as top-level fields (the layout produced by `decode_all`, which uses
    /// the default namespace).
    fn mock_event(name: &str, timestamp: &str) -> Event {
        Event::Log(LogEvent::from(ObjectMap::from([
            ("message".into(), Value::from(name)),
            (
                log_schema().source_type_key().unwrap().to_string().into(),
                Value::from(FluentConfig::NAME),
            ),
            ("tag".into(), Value::from("tag.name")),
            (
                "timestamp".into(),
                Value::Timestamp(DateTime::parse_from_rfc3339(timestamp).unwrap().into()),
            ),
        ])))
    }

    #[test]
    fn decode_message_mode() {
        // fixarray(3): "tag.name", uint32 time, {"message": "bar"}
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 98, 97, 114,
        ];

        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
        let got = decode_all(message.clone()).unwrap();
        assert_event_data_eq!(got.0[0], expected);
        assert_eq!(got.1, message.len());
    }

    #[test]
    fn decode_message_mode_with_options() {
        // fixarray(4): "tag.name", uint32 time, {"message": "bar"}, options {"size": 1}
        let message: Vec<u8> = vec![
            148, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 129, 164, 115, 105, 122, 101, 1,
        ];

        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
        let got = decode_all(message.clone()).unwrap();
        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected);
    }

    #[test]
    fn decode_forward_mode() {
        // fixarray(2): "tag.name", fixarray(3) of [uint32 time, {"message": ...}] entries
        let message: Vec<u8> = vec![
            146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122,
        ];

        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];
        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }

    #[test]
    fn decode_forward_mode_with_options() {
        // Same entries as `decode_forward_mode`, plus trailing options {"size": 3}.
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 164, 115, 105,
            122, 101, 3,
        ];

        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];

        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());

        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }

    #[test]
    fn decode_packed_forward_mode() {
        // fixarray(3): "tag.name", bin8(57) holding three concatenated entries, then a
        // one-entry map in the options position.
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 57, 146, 206, 85, 236, 230, 248,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230,
            249, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236,
            230, 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 102, 111, 111,
        ];

        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];

        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }

    #[test]
    fn decode_compressed_packed_forward_mode() {
        // fixarray(3): "tag.name", bin8(55) gzip stream (magic 0x1f 0x8b),
        // options {"compressed": "gzip"}
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 55, 31, 139, 8, 0, 245, 10, 168,
            96, 0, 3, 155, 116, 46, 244, 205, 179, 31, 141, 203, 115, 83, 139, 139, 19, 211, 83,
            23, 167, 229, 231, 79, 2, 9, 253, 68, 8, 37, 37, 22, 129, 133, 126, 33, 11, 85, 1, 0,
            53, 3, 158, 28, 57, 0, 0, 0, 129, 170, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100,
            164, 103, 122, 105, 112,
        ];

        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];

        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }

    fn decode_all(message: Vec<u8>) -> Result<(SmallVec<[Event; 1]>, usize), DecodeError> {
        let mut buf = BytesMut::from(&message[..]);

        let mut decoder = FluentDecoder::new(LogNamespace::default());

        let (frame, byte_size) = decoder.decode(&mut buf)?.unwrap();
        Ok((frame.into(), byte_size))
    }

    #[tokio::test]
    async fn ack_delivered_without_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Delivered, false).await;
        // No chunk id means no ack is written, so the client's read times out (`Elapsed`).
        assert!(result.is_err());
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn ack_delivered_with_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
        assert_eq!(result.unwrap().unwrap(), output.len());
        // fixmap(1) + fixstr(3) "ack", i.e. the reply starts with `{"ack": ...`.
        assert!(output.starts_with(&[0x81, 0xa3, b'a', b'c', b'k']));
    }

    #[tokio::test]
    async fn ack_failed_without_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Rejected, false).await;
        assert_eq!(result.unwrap().unwrap(), output.len());
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn ack_failed_with_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
        assert_eq!(result.unwrap().unwrap(), output.len());
        // A rejected chunk is answered with an empty map `{}`.
        let expected: Vec<u8> = vec![0x80];
        assert_eq!(output, expected);
    }

    async fn check_acknowledgements(
        status: EventStatus,
        with_chunk: bool,
    ) -> (Result<Result<usize, std::io::Error>, Elapsed>, Bytes) {
        trace_init();

        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let source = FluentConfig {
            mode: FluentMode::Tcp(FluentTcpConfig {
                address: address.into(),
                tls: None,
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: true.into(),
                connection_limit: None,
            }),
            log_namespace: None,
        }
        .build(SourceContext::new_test(sender, None))
        .await
        .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;

        let msg = uuid::Uuid::new_v4().to_string();
        let tag = uuid::Uuid::new_v4().to_string();
        let req = build_req(&tag, &[("field", &msg)], with_chunk);

        let sender = tokio::spawn(async move {
            let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
            socket.write_all(&req).await.unwrap();

            let mut output = BytesMut::new();
            (
                timeout(Duration::from_millis(250), socket.read_buf(&mut output)).await,
                output,
            )
        });
        let events = test_util::collect_n(recv, 1).await;
        let (result, output) = sender.await.unwrap();

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(log.get("field").unwrap(), &msg.into());
        assert!(matches!(log.get("host").unwrap(), Value::Bytes(_)));
        assert!(matches!(log.get("timestamp").unwrap(), Value::Timestamp(_)));
        assert_eq!(log.get("tag").unwrap(), &tag.into());

        (result, output.into())
    }

    fn build_req(tag: &str, fields: &[(&str, &str)], with_chunk: bool) -> Vec<u8> {
        let mut record = FluentRecord::default();
        for (tag, value) in fields {
            record.insert((*tag).into(), rmpv::Value::String((*value).into()).into());
        }
        let chunk = with_chunk.then(|| BASE64_STANDARD.encode(uuid::Uuid::new_v4().as_bytes()));
        let req = FluentMessage::MessageWithOptions(
            tag.into(),
            FluentTimestamp::Unix(Utc::now()),
            record,
            FluentMessageOptions {
                chunk,
                ..Default::default()
            },
        );
        let mut buf = Vec::new();
        req.serialize(&mut Serializer::new(&mut buf)).unwrap();
        buf
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = FluentConfig {
            mode: FluentMode::Tcp(FluentTcpConfig {
                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
                tls: None,
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
            }),
            log_namespace: Some(true),
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(&owned_value_path!("fluent", "tag"), Kind::bytes(), None)
                .with_metadata_field(
                    &owned_value_path!("fluent", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "record"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = FluentConfig {
            mode: FluentMode::Tcp(FluentTcpConfig {
                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
                tls: None,
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
            }),
            log_namespace: None,
        };

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("tag"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }
}
1252
#[cfg(all(test, feature = "fluent-integration-tests"))]
mod integration_tests {
    //! End-to-end tests: run a real Fluent Bit / Fluentd container, send one JSON
    //! message to its HTTP input, and check that the container's `forward` output
    //! delivers it to this source.

    use std::{fs, net::SocketAddr, time::Duration};

    use futures::Stream;
    use tokio::time::sleep;
    use vector_lib::event::{Event, EventStatus};

    use crate::{
        SourceSender,
        config::{SourceConfig, SourceContext},
        docker::Container,
        sources::fluent::{FluentConfig, FluentMode, FluentTcpConfig},
        test_util::{
            collect_ready,
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            next_addr, next_addr_for_ip, random_string, wait_for_tcp,
        },
    };

    const FLUENT_BIT_IMAGE: &str = "fluent/fluent-bit";
    const FLUENT_BIT_TAG: &str = "1.7";
    const FLUENTD_IMAGE: &str = "fluent/fluentd";
    const FLUENTD_TAG: &str = "v1.12";

    /// Writes `content` to a file named `name` inside a fresh temporary directory.
    ///
    /// The directory handle is returned so the caller keeps it alive while a container
    /// has it bind-mounted (`tempfile::TempDir` removes the directory on drop).
    fn make_file(name: &str, content: &str) -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        fs::write(dir.path().join(name), content).unwrap();
        dir
    }

    /// Waits until something accepts TCP connections on `address`, then POSTs `body`
    /// to `http://{address}/` as JSON.
    async fn post_json(address: SocketAddr, body: &serde_json::Value) {
        wait_for_tcp(address).await;
        reqwest::Client::new()
            .post(format!("http://{address}/"))
            .header("content-type", "application/json")
            .body(body.to_string())
            .send()
            .await
            .unwrap();
    }

    /// Asserts that exactly one event was received, that it has the given `tag` and
    /// `message`, and that `timestamp` and `host` fields are present.
    #[track_caller]
    fn assert_single_event(events: &[Event], tag: &str, message: String) {
        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(log["tag"], tag.into());
        assert_eq!(log["message"], message.into());
        assert!(log.get("timestamp").is_some());
        assert!(log.get("host").is_some());
    }

    #[tokio::test]
    async fn fluentbit() {
        test_fluentbit(EventStatus::Delivered).await;
    }

    // NOTE(review): `source` builds the listener with `acknowledgements: false`, so it
    // is not evident from here that a `Rejected` status is ever reported back to the
    // forwarder; confirm this exercises the rejection path its name suggests.
    #[tokio::test]
    async fn fluentbit_rejection() {
        test_fluentbit(EventStatus::Rejected).await;
    }

    /// Runs Fluent Bit with an HTTP input and a `forward` output (acks requested)
    /// aimed at this source, sends one message, and checks it arrives tagged `http.0`.
    async fn test_fluentbit(status: EventStatus) {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let test_address = next_addr();
            let (out, source_address) = source(status).await;

            let dir = make_file(
                "fluent-bit.conf",
                &format!(
                    r#"
[SERVICE]
    Grace 0
    Flush 1
    Daemon off

[INPUT]
    Name http
    Host {listen_host}
    Port {listen_port}

[OUTPUT]
    Name forward
    Match *
    Host host.docker.internal
    Port {send_port}
    Require_ack_response true
"#,
                    listen_host = test_address.ip(),
                    listen_port = test_address.port(),
                    send_port = source_address.port(),
                ),
            );

            let msg = random_string(64);
            let body = serde_json::json!({ "message": msg });

            let events = Container::new(FLUENT_BIT_IMAGE, FLUENT_BIT_TAG)
                .bind(dir.path().display(), "/fluent-bit/etc")
                .run(async move {
                    post_json(test_address, &body).await;
                    // Give the forwarder time to flush before draining the stream.
                    sleep(Duration::from_secs(2)).await;
                    collect_ready(out).await
                })
                .await;

            assert_single_event(&events, "http.0", msg);
        })
        .await;
    }

    #[tokio::test]
    async fn fluentd() {
        test_fluentd(EventStatus::Delivered, "").await;
    }

    #[tokio::test]
    async fn fluentd_gzip() {
        test_fluentd(EventStatus::Delivered, "compress gzip").await;
    }

    // NOTE(review): see `fluentbit_rejection` -- acknowledgements are disabled on the
    // source, so confirm a `Rejected` status is actually observable by Fluentd.
    #[tokio::test]
    async fn fluentd_rejection() {
        test_fluentd(EventStatus::Rejected, "").await;
    }

    /// Runs Fluentd with an HTTP input and a `forward` output (acks requested) aimed at
    /// this source, with `options` appended to the `<match>` section. Sends one
    /// message and checks it arrives with an empty tag.
    async fn test_fluentd(status: EventStatus, options: &str) {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let test_address = next_addr();
            let (out, source_address) = source(status).await;

            let config = format!(
                r#"
<source>
  @type http
  bind {http_host}
  port {http_port}
</source>

<match *>
  @type forward
  <server>
    name local
    host host.docker.internal
    port {port}
  </server>
  <buffer>
    flush_mode immediate
  </buffer>
  require_ack_response true
  ack_response_timeout 1
  {options}
</match>
"#,
                http_host = test_address.ip(),
                http_port = test_address.port(),
                port = source_address.port(),
            );

            let dir = make_file("fluent.conf", &config);

            let msg = random_string(64);
            let body = serde_json::json!({ "message": msg });

            let events = Container::new(FLUENTD_IMAGE, FLUENTD_TAG)
                .bind(dir.path().display(), "/fluentd/etc")
                .run(async move {
                    post_json(test_address, &body).await;
                    // Give the forwarder time to flush before draining the stream.
                    sleep(Duration::from_secs(2)).await;
                    collect_ready(out).await
                })
                .await;

            // The message is posted to `/`; Fluentd's HTTP input presumably derives the
            // tag from the request path, hence the expected empty tag.
            assert_single_event(&events, "", msg);
        })
        .await;
    }

    /// Starts a fluent TCP source on a fresh port bound to all interfaces (the container
    /// configs reach it through `host.docker.internal`). Returns its event stream and
    /// listen address once the port accepts connections.
    ///
    /// `status` is handed to `SourceSender::new_test_finalize`, which presumably
    /// finalizes every received event with it.
    async fn source(status: EventStatus) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr_for_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));

        let config = FluentConfig {
            mode: FluentMode::Tcp(FluentTcpConfig {
                address: address.into(),
                tls: None,
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
            }),
            log_namespace: None,
        };

        // Build in the test task so a configuration error fails the test right here.
        // Building inside the spawned task would panic in a detached task, and the test
        // would only notice through `wait_for_tcp` never succeeding.
        let server = config
            .build(SourceContext::new_test(sender, None))
            .await
            .unwrap();
        tokio::spawn(async move { server.await.unwrap() });

        wait_for_tcp(address).await;
        (recv, address)
    }
}
1461}