1use std::{
2 collections::HashMap,
3 io::{self, Read},
4 net::SocketAddr,
5 time::Duration,
6};
7
8use base64::prelude::{BASE64_STANDARD, Engine as _};
9use bytes::{Buf, Bytes, BytesMut};
10use chrono::Utc;
11use flate2::read::MultiGzDecoder;
12use rmp_serde::{Deserializer, Serializer, decode};
13use serde::{Deserialize, Serialize};
14use smallvec::{SmallVec, smallvec};
15use tokio_util::codec::Decoder;
16use vector_lib::{
17 codecs::{BytesDeserializerConfig, StreamDecodingError},
18 config::{LegacyKey, LogNamespace},
19 configurable::configurable_component,
20 ipallowlist::IpAllowlistConfig,
21 lookup::{OwnedValuePath, lookup_v2::parse_value_path, metadata_path, owned_value_path, path},
22 schema::Definition,
23};
24use vrl::value::{Kind, Value, kind::Collection};
25
26use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
27use crate::{
28 config::{
29 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
30 SourceContext, SourceOutput, log_schema,
31 },
32 event::{Event, LogEvent},
33 internal_events::{FluentMessageDecodeError, FluentMessageReceived},
34 serde::bool_or_struct,
35 tcp::TcpKeepaliveConfig,
36 tls::{MaybeTlsSettings, TlsSourceConfig},
37};
38
39mod message;
40use self::message::{FluentEntry, FluentMessage, FluentRecord, FluentTag, FluentTimestamp};
41
/// Configuration for the `fluent` source.
#[configurable_component(source("fluent", "Collect logs from a Fluentd or Fluent Bit agent."))]
#[derive(Clone, Debug)]
pub struct FluentConfig {
    // The socket mode (`tcp` or `unix`) and its settings. It is flattened, so the mode's own
    // fields sit at the top level of the source configuration. The hand-written
    // `Deserialize` in `deser` treats a configuration without a `mode` key as TCP.
    #[serde(flatten)]
    mode: FluentMode,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
54
/// Listening mode for the `fluent` source.
// `no_deser`: `Deserialize` is implemented by hand in the `deser` module so that a
// configuration without a `mode` key is still accepted as TCP.
#[configurable_component(no_deser)]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
// NOTE(review): the lint is allowed without a recorded rationale; presumably the size gap
// between the TCP and Unix variants is accepted because this value is built once per source.
#[allow(clippy::large_enum_variant)]
pub enum FluentMode {
    /// Listen on TCP.
    Tcp(FluentTcpConfig),

    /// Listen on a Unix domain socket (UDS).
    #[cfg(unix)]
    Unix(FluentUnixConfig),
}
69
70mod deser {
76 use super::*;
77
78 #[allow(clippy::large_enum_variant)]
79 #[derive(Deserialize)]
80 #[serde(tag = "mode")]
81 enum FluentModeTagged {
82 #[serde(rename = "tcp")]
83 Tcp(FluentTcpConfig),
84
85 #[cfg(unix)]
86 #[serde(rename = "unix")]
87 Unix(FluentUnixConfig),
88 }
89
90 #[derive(Deserialize)]
91 #[serde(untagged)]
92 enum FluentModeDe {
93 Tagged(FluentModeTagged),
94
95 Untagged(FluentTcpConfig),
97 }
98
99 impl<'de> Deserialize<'de> for FluentMode {
100 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
101 where
102 D: serde::Deserializer<'de>,
103 {
104 Ok(match FluentModeDe::deserialize(deserializer)? {
105 FluentModeDe::Tagged(FluentModeTagged::Tcp(config)) => FluentMode::Tcp(config),
106 #[cfg(unix)]
107 FluentModeDe::Tagged(FluentModeTagged::Unix(config)) => FluentMode::Unix(config),
108 FluentModeDe::Untagged(config) => FluentMode::Tcp(config),
109 })
110 }
111 }
112
113 #[cfg(test)]
114 mod tests {
115 use super::*;
116
117 #[test]
118 fn test_tcp_default_mode() {
119 let json_data = serde_json::json!({
120 "address": "0.0.0.0:2020",
121 "connection_limit": 2
122 });
123
124 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
125 assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
126 }
127
128 #[test]
129 fn test_tcp_explicit_mode() {
130 let json_data = serde_json::json!({
131 "mode": "tcp",
132 "address": "0.0.0.0:2020",
133 "connection_limit": 2
134 });
135
136 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
137 assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
138 }
139
140 #[test]
141 fn test_invalid_unix_mode() {
142 let json_data = serde_json::json!({
143 "mode": "unix",
144 "address": "0.0.0.0:2020",
145 "connection_limit": 2
146 });
147
148 assert!(serde_json::from_value::<FluentConfig>(json_data).is_err());
149 }
150
151 #[cfg(unix)]
152 #[test]
153 fn test_valid_unix_mode() {
154 let json_data = serde_json::json!({
155 "mode": "unix",
156 "path": "/foo"
157 });
158
159 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
160 assert!(
161 matches!(parsed.mode, FluentMode::Unix(c) if c.path.to_string_lossy() == "/foo")
162 );
163 }
164 }
165}
166
/// Listening configuration for the `fluent` source in TCP mode.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FluentTcpConfig {
    #[configurable(derived)]
    address: SocketListenAddr,

    /// The maximum number of TCP connections that are allowed at any given time.
    #[configurable(metadata(docs::type_unit = "connections"))]
    connection_limit: Option<u32>,

    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,

    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,

    /// The size of the receive buffer used for each connection.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    receive_buffer_bytes: Option<usize>,

    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,

    // Acknowledgements are only supported in TCP mode (see `can_acknowledge`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
}
199
200impl FluentTcpConfig {
201 fn build(
202 &self,
203 cx: SourceContext,
204 log_namespace: LogNamespace,
205 ) -> crate::Result<super::Source> {
206 let source = FluentSource::new(log_namespace);
207 let shutdown_secs = Duration::from_secs(30);
208 let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
209 let tls_client_metadata_key = self
210 .tls
211 .as_ref()
212 .and_then(|tls| tls.client_metadata_key.clone())
213 .and_then(|k| k.path);
214 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
215 source.run(
216 self.address,
217 self.keepalive,
218 shutdown_secs,
219 tls,
220 tls_client_metadata_key,
221 self.receive_buffer_bytes,
222 None,
223 cx,
224 self.acknowledgements,
225 self.connection_limit,
226 self.permit_origin.clone().map(Into::into),
227 FluentConfig::NAME,
228 log_namespace,
229 )
230 }
231}
232
/// Listening configuration for the `fluent` source in Unix domain socket mode.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
#[cfg(unix)]
pub struct FluentUnixConfig {
    /// The Unix socket path.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: std::path::PathBuf,

    /// Unix file mode bits to apply to the socket file.
    ///
    /// The examples are octal permission modes: `0o777`, `0o600`, and `508` (which is `0o774`).
    #[configurable(metadata(docs::examples = 0o777))]
    #[configurable(metadata(docs::examples = 0o600))]
    #[configurable(metadata(docs::examples = 508))]
    pub socket_file_mode: Option<u32>,
}
254
255#[cfg(unix)]
256impl FluentUnixConfig {
257 fn build(
258 &self,
259 cx: SourceContext,
260 log_namespace: LogNamespace,
261 ) -> crate::Result<super::Source> {
262 let source = FluentSource::new(log_namespace);
263
264 crate::sources::util::build_unix_stream_source(
265 self.path.clone(),
266 self.socket_file_mode,
267 source.decoder(),
268 move |events, host| source.handle_events_impl(events, host.into()),
269 cx.shutdown,
270 cx.out,
271 )
272 }
273}
274
275impl GenerateConfig for FluentConfig {
276 fn generate_config() -> toml::Value {
277 toml::Value::try_from(Self {
278 mode: FluentMode::Tcp(FluentTcpConfig {
279 address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
280 keepalive: None,
281 permit_origin: None,
282 tls: None,
283 receive_buffer_bytes: None,
284 acknowledgements: Default::default(),
285 connection_limit: Some(2),
286 }),
287 log_namespace: None,
288 })
289 .unwrap()
290 }
291}
292
293#[async_trait::async_trait]
294#[typetag::serde(name = "fluent")]
295impl SourceConfig for FluentConfig {
296 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
297 let log_namespace = cx.log_namespace(self.log_namespace);
298 match &self.mode {
299 FluentMode::Tcp(t) => t.build(cx, log_namespace),
300 #[cfg(unix)]
301 FluentMode::Unix(u) => u.build(cx, log_namespace),
302 }
303 }
304
305 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
306 let log_namespace = global_log_namespace.merge(self.log_namespace);
307 let schema_definition = self.schema_definition(log_namespace);
308
309 vec![SourceOutput::new_maybe_logs(
310 DataType::Log,
311 schema_definition,
312 )]
313 }
314
315 fn resources(&self) -> Vec<Resource> {
316 match &self.mode {
317 FluentMode::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
318 #[cfg(unix)]
319 FluentMode::Unix(_) => vec![],
320 }
321 }
322
323 fn can_acknowledge(&self) -> bool {
324 matches!(self.mode, FluentMode::Tcp(_))
325 }
326}
327
328impl FluentConfig {
329 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
331 let host_key = log_schema()
333 .host_key()
334 .cloned()
335 .map(LegacyKey::InsertIfEmpty);
336
337 let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite);
338
339 let tls_client_metadata_path = match &self.mode {
340 FluentMode::Tcp(tcp) => tcp
341 .tls
342 .as_ref()
343 .and_then(|tls| tls.client_metadata_key.as_ref())
344 .and_then(|k| k.path.clone())
345 .map(LegacyKey::Overwrite),
346 #[cfg(unix)]
347 FluentMode::Unix(_) => None,
348 };
349
350 let mut schema_definition = BytesDeserializerConfig
353 .schema_definition(log_namespace)
354 .with_standard_vector_source_metadata()
355 .with_source_metadata(
356 FluentConfig::NAME,
357 host_key,
358 &owned_value_path!("host"),
359 Kind::bytes(),
360 Some("host"),
361 )
362 .with_source_metadata(
363 FluentConfig::NAME,
364 tag_key,
365 &owned_value_path!("tag"),
366 Kind::bytes(),
367 None,
368 )
369 .with_source_metadata(
370 FluentConfig::NAME,
371 None,
372 &owned_value_path!("timestamp"),
373 Kind::timestamp(),
374 Some("timestamp"),
375 )
376 .with_source_metadata(
378 FluentConfig::NAME,
379 None,
380 &owned_value_path!("record"),
381 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
382 None,
383 )
384 .with_source_metadata(
385 Self::NAME,
386 tls_client_metadata_path,
387 &owned_value_path!("tls_client_metadata"),
388 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
389 None,
390 );
391
392 if log_namespace == LogNamespace::Legacy {
394 schema_definition = schema_definition.unknown_fields(Kind::bytes());
395 }
396
397 schema_definition
398 }
399}
400
/// Event-handling state shared by the TCP and Unix socket listeners.
#[derive(Debug, Clone)]
struct FluentSource {
    // Namespace used when decoding frames and inserting source metadata.
    log_namespace: LogNamespace,
    // Global log-schema host key, captured at construction. It is used as the legacy
    // location for the peer host, inserted only if the event has no value there yet.
    legacy_host_key_path: Option<OwnedValuePath>,
}
406
407impl FluentSource {
408 fn new(log_namespace: LogNamespace) -> Self {
409 Self {
410 log_namespace,
411 legacy_host_key_path: log_schema().host_key().cloned(),
412 }
413 }
414
415 fn handle_events_impl(&self, events: &mut [Event], host: Value) {
416 for event in events {
417 let log = event.as_mut_log();
418
419 let legacy_host_key = self
420 .legacy_host_key_path
421 .as_ref()
422 .map(LegacyKey::InsertIfEmpty);
423
424 self.log_namespace.insert_source_metadata(
425 FluentConfig::NAME,
426 log,
427 legacy_host_key,
428 path!("host"),
429 host.clone(),
430 );
431 }
432 }
433}
434
435impl TcpSource for FluentSource {
436 type Error = DecodeError;
437 type Item = FluentFrame;
438 type Decoder = FluentDecoder;
439 type Acker = FluentAcker;
440
441 fn decoder(&self) -> Self::Decoder {
442 FluentDecoder::new(self.log_namespace)
443 }
444
445 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
446 self.handle_events_impl(events, host.ip().to_string().into())
447 }
448
449 fn build_acker(&self, frame: &[Self::Item]) -> Self::Acker {
450 FluentAcker::new(frame)
451 }
452}
453
/// Errors produced while decoding Fluent forward-protocol messages.
#[derive(Debug)]
pub enum DecodeError {
    /// I/O failure, e.g. while gunzipping a `compressed = "gzip"` packed payload.
    /// This is the only variant after which decoding does not continue.
    IO(io::Error),
    /// The msgpack payload could not be deserialized into the expected message shape.
    Decode(decode::Error),
    /// A packed message declared a `compressed` option other than `gzip` or `text`.
    UnknownCompression(String),
    /// A heartbeat-shaped message carried a value other than `nil`.
    UnexpectedValue(rmpv::Value),
}
461
462impl std::fmt::Display for DecodeError {
463 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464 match self {
465 DecodeError::IO(err) => write!(f, "{err}"),
466 DecodeError::Decode(err) => write!(f, "{err}"),
467 DecodeError::UnknownCompression(compression) => {
468 write!(f, "unknown compression: {compression}")
469 }
470 DecodeError::UnexpectedValue(value) => {
471 write!(f, "unexpected msgpack value, ignoring: {value}")
472 }
473 }
474 }
475}
476
477impl StreamDecodingError for DecodeError {
478 fn can_continue(&self) -> bool {
479 match self {
480 DecodeError::IO(_) => false,
481 DecodeError::Decode(_) => true,
482 DecodeError::UnknownCompression(_) => true,
483 DecodeError::UnexpectedValue(_) => true,
484 }
485 }
486}
487
488impl From<io::Error> for DecodeError {
489 fn from(e: io::Error) -> Self {
490 DecodeError::IO(e)
491 }
492}
493
494impl From<decode::Error> for DecodeError {
495 fn from(e: decode::Error) -> Self {
496 DecodeError::Decode(e)
497 }
498}
499
/// Stream decoder that turns Fluent forward-protocol msgpack messages into event frames.
#[derive(Debug, Clone)]
struct FluentDecoder {
    // Namespace passed to every `FluentEvent` this decoder builds.
    log_namespace: LogNamespace,
}
504
505impl FluentDecoder {
506 const fn new(log_namespace: LogNamespace) -> Self {
507 Self { log_namespace }
508 }
509
510 fn handle_message(
511 &mut self,
512 message: Result<FluentMessage, DecodeError>,
513 byte_size: usize,
514 ) -> Result<Option<(FluentFrame, usize)>, DecodeError> {
515 let log_namespace = &self.log_namespace;
516
517 match message? {
518 FluentMessage::Message(tag, timestamp, record) => {
519 let event = Event::from(FluentEvent {
520 tag,
521 timestamp,
522 record,
523 log_namespace,
524 });
525 let frame = FluentFrame {
526 events: smallvec![event],
527 chunk: None,
528 };
529 Ok(Some((frame, byte_size)))
530 }
531 FluentMessage::MessageWithOptions(tag, timestamp, record, options) => {
532 let event = Event::from(FluentEvent {
533 tag,
534 timestamp,
535 record,
536 log_namespace,
537 });
538 let frame = FluentFrame {
539 events: smallvec![event],
540 chunk: options.chunk,
541 };
542 Ok(Some((frame, byte_size)))
543 }
544 FluentMessage::Forward(tag, entries) => {
545 let events = entries
546 .into_iter()
547 .map(|FluentEntry(timestamp, record)| {
548 Event::from(FluentEvent {
549 tag: tag.clone(),
550 timestamp,
551 record,
552 log_namespace,
553 })
554 })
555 .collect();
556 let frame = FluentFrame {
557 events,
558 chunk: None,
559 };
560 Ok(Some((frame, byte_size)))
561 }
562 FluentMessage::ForwardWithOptions(tag, entries, options) => {
563 let events = entries
564 .into_iter()
565 .map(|FluentEntry(timestamp, record)| {
566 Event::from(FluentEvent {
567 tag: tag.clone(),
568 timestamp,
569 record,
570 log_namespace,
571 })
572 })
573 .collect();
574 let frame = FluentFrame {
575 events,
576 chunk: options.chunk,
577 };
578 Ok(Some((frame, byte_size)))
579 }
580 FluentMessage::PackedForward(tag, bin) => {
581 let mut buf = BytesMut::from(&bin[..]);
582
583 let mut events = smallvec![];
584 while let Some(FluentEntry(timestamp, record)) =
585 FluentEntryStreamDecoder.decode(&mut buf)?
586 {
587 events.push(Event::from(FluentEvent {
588 tag: tag.clone(),
589 timestamp,
590 record,
591 log_namespace,
592 }));
593 }
594 let frame = FluentFrame {
595 events,
596 chunk: None,
597 };
598 Ok(Some((frame, byte_size)))
599 }
600 FluentMessage::PackedForwardWithOptions(tag, bin, options) => {
601 let buf = match options.compressed.as_deref() {
602 Some("gzip") => {
603 let mut buf = Vec::new();
604 MultiGzDecoder::new(io::Cursor::new(bin.into_vec()))
605 .read_to_end(&mut buf)
606 .map(|_| buf)
607 .map_err(Into::into)
608 }
609 Some("text") | None => Ok(bin.into_vec()),
610 Some(s) => Err(DecodeError::UnknownCompression(s.to_owned())),
611 }?;
612
613 let mut buf = BytesMut::from(&buf[..]);
614
615 let mut events = smallvec![];
616 while let Some(FluentEntry(timestamp, record)) =
617 FluentEntryStreamDecoder.decode(&mut buf)?
618 {
619 events.push(Event::from(FluentEvent {
620 tag: tag.clone(),
621 timestamp,
622 record,
623 log_namespace,
624 }));
625 }
626 let frame = FluentFrame {
627 events,
628 chunk: options.chunk,
629 };
630 Ok(Some((frame, byte_size)))
631 }
632 FluentMessage::Heartbeat(rmpv::Value::Nil) => Ok(None),
633 FluentMessage::Heartbeat(value) => Err(DecodeError::UnexpectedValue(value)),
634 }
635 }
636}
637
638impl Decoder for FluentDecoder {
639 type Item = (FluentFrame, usize);
640 type Error = DecodeError;
641
642 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
643 loop {
644 if src.is_empty() {
645 return Ok(None);
646 }
647
648 let (byte_size, res) = {
649 let mut des = Deserializer::new(io::Cursor::new(&src[..]));
650
651 let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
652
653 if let Err(DecodeError::Decode(
655 decode::Error::InvalidDataRead(ref custom)
656 | decode::Error::InvalidMarkerRead(ref custom),
657 )) = res
658 && custom.kind() == io::ErrorKind::UnexpectedEof
659 {
660 return Ok(None);
661 }
662
663 (des.position() as usize, res)
664 };
665
666 src.advance(byte_size);
667
668 let maybe_item = self.handle_message(res, byte_size).inspect_err(|error| {
669 let base64_encoded_message = BASE64_STANDARD.encode(&src[..]);
670 emit!(FluentMessageDecodeError {
671 error,
672 base64_encoded_message
673 });
674 })?;
675 if let Some(item) = maybe_item {
676 return Ok(Some(item));
677 }
678 }
679 }
680}
681
/// Decodes the `FluentEntry` items stored back-to-back inside a packed-forward payload.
#[derive(Clone, Debug)]
struct FluentEntryStreamDecoder;
685
686impl Decoder for FluentEntryStreamDecoder {
687 type Item = FluentEntry;
688 type Error = DecodeError;
689
690 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
691 if src.is_empty() {
692 return Ok(None);
693 }
694 let (byte_size, res) = {
695 let mut des = Deserializer::new(io::Cursor::new(&src[..]));
696
697 let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
699
700 if let Err(DecodeError::Decode(decode::Error::InvalidDataRead(ref custom))) = res
701 && custom.kind() == io::ErrorKind::UnexpectedEof
702 {
703 return Ok(None);
704 }
705
706 let byte_size = des.position();
707
708 emit!(FluentMessageReceived { byte_size });
709
710 (byte_size as usize, res)
711 };
712
713 src.advance(byte_size);
714
715 res
716 }
717}
718
719struct FluentAcker {
720 chunks: Vec<String>,
721}
722
723impl FluentAcker {
724 fn new(frames: &[FluentFrame]) -> Self {
725 Self {
726 chunks: frames.iter().filter_map(|f| f.chunk.clone()).collect(),
727 }
728 }
729}
730
731impl TcpSourceAcker for FluentAcker {
732 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
733 if self.chunks.is_empty() {
734 return None;
735 }
736
737 let mut buf = Vec::new();
738 let mut ser = Serializer::new(&mut buf);
739 let mut ack_map = HashMap::new();
740
741 for chunk in self.chunks {
742 ack_map.clear();
743 if let TcpSourceAck::Ack = ack {
744 ack_map.insert("ack", chunk);
745 };
746 ack_map.serialize(&mut ser).unwrap();
747 }
748 Some(buf.into())
749 }
750}
751
/// One decoded Fluent record with its tag and timestamp, plus the log namespace that
/// decides where those values are placed when converted into a `LogEvent`.
#[derive(Debug, PartialEq)]
struct FluentEvent<'a> {
    tag: FluentTag,
    timestamp: FluentTimestamp,
    record: FluentRecord,
    log_namespace: &'a LogNamespace,
}
760
761impl From<FluentEvent<'_>> for Event {
762 fn from(frame: FluentEvent) -> Event {
763 LogEvent::from(frame).into()
764 }
765}
766
767struct FluentFrame {
768 events: SmallVec<[Event; 1]>,
769 chunk: Option<String>,
770}
771
772impl From<FluentFrame> for SmallVec<[Event; 1]> {
773 fn from(frame: FluentFrame) -> Self {
774 frame.events
775 }
776}
777
778impl From<FluentEvent<'_>> for LogEvent {
779 fn from(frame: FluentEvent) -> LogEvent {
780 let FluentEvent {
781 tag,
782 timestamp,
783 record,
784 log_namespace,
785 } = frame;
786
787 let mut log = LogEvent::default();
788
789 log_namespace.insert_vector_metadata(
790 &mut log,
791 log_schema().source_type_key(),
792 path!("source_type"),
793 Bytes::from_static(FluentConfig::NAME.as_bytes()),
794 );
795
796 match log_namespace {
797 LogNamespace::Vector => {
798 log.insert(metadata_path!(FluentConfig::NAME, "timestamp"), timestamp);
799 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
800 }
801 LogNamespace::Legacy => {
802 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
803 }
804 }
805
806 log_namespace.insert_source_metadata(
807 FluentConfig::NAME,
808 &mut log,
809 Some(LegacyKey::Overwrite(path!("tag"))),
810 path!("tag"),
811 tag,
812 );
813
814 for (key, value) in record.into_iter() {
815 let value: Value = value.into();
816 log_namespace.insert_source_metadata(
817 FluentConfig::NAME,
818 &mut log,
819 Some(LegacyKey::Overwrite(path!(key.as_str()))),
820 path!("record", key.as_str()),
821 value,
822 );
823 }
824 log
825 }
826}
827
#[cfg(test)]
mod tests {
    use bytes::BytesMut;
    use chrono::{DateTime, Utc};
    use rmp_serde::Serializer;
    use serde::Serialize;
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        time::{Duration, error::Elapsed, timeout},
    };
    use tokio_util::codec::Decoder;
    use vector_lib::{assert_event_data_eq, lookup::OwnedTargetPath, schema::Definition};
    use vrl::value::{ObjectMap, Value, kind::Collection};

    use super::{message::FluentMessageOptions, *};
    use crate::{
        SourceSender,
        config::{SourceConfig, SourceContext},
        event::EventStatus,
        test_util::{self, addr::next_addr, trace_init, wait_for_tcp},
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FluentConfig>();
    }

    /// The legacy-namespace event expected for a `{"message": name}` record tagged
    /// `tag.name` at `timestamp` (RFC 3339).
    fn mock_event(name: &str, timestamp: &str) -> Event {
        Event::Log(LogEvent::from(ObjectMap::from([
            ("message".into(), Value::from(name)),
            (
                log_schema().source_type_key().unwrap().to_string().into(),
                Value::from(FluentConfig::NAME),
            ),
            ("tag".into(), Value::from("tag.name")),
            (
                "timestamp".into(),
                Value::Timestamp(DateTime::parse_from_rfc3339(timestamp).unwrap().into()),
            ),
        ])))
    }

    #[test]
    fn decode_message_mode() {
        // ["tag.name", 1441588984, {"message": "bar"}]
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 98, 97, 114,
        ];

        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
        let got = decode_all(message.clone()).unwrap();
        assert_event_data_eq!(got.0[0], expected);
        assert_eq!(got.1, message.len());
    }

    #[test]
    fn decode_message_mode_with_options() {
        // ["tag.name", 1441588984, {"message": "bar"}, {"size": 1}]
        let message: Vec<u8> = vec![
            148, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 129, 164, 115, 105, 122, 101, 1,
        ];

        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
        let got = decode_all(message.clone()).unwrap();
        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected);
    }

    #[test]
    fn decode_forward_mode() {
        // ["tag.name", [[ts, {"message": "foo"}], [ts+1, {"message": "bar"}], [ts+2, {"message": "baz"}]]]
        let message: Vec<u8> = vec![
            146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122,
        ];

        let expected = [
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];
        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }

    #[test]
    fn decode_forward_mode_with_options() {
        // Same entries as `decode_forward_mode`, followed by options {"size": 3}.
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 164, 115, 105,
            122, 101, 3,
        ];

        let expected = [
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];

        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());

        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }

    #[test]
    fn decode_packed_forward_mode() {
        // ["tag.name", <bin: three packed entries>, {"message": "foo"}]
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 57, 146, 206, 85, 236, 230, 248,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230,
            249, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236,
            230, 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 102, 111, 111,
        ];

        let expected = [
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];

        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }

    #[test]
    fn decode_compressed_packed_forward_mode() {
        // ["tag.name", <bin: gzip of three packed entries>, {"compressed": "gzip"}]
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 55, 31, 139, 8, 0, 245, 10, 168,
            96, 0, 3, 155, 116, 46, 244, 205, 179, 31, 141, 203, 115, 83, 139, 139, 19, 211, 83,
            23, 167, 229, 231, 79, 2, 9, 253, 68, 8, 37, 37, 22, 129, 133, 126, 33, 11, 85, 1, 0,
            53, 3, 158, 28, 57, 0, 0, 0, 129, 170, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100,
            164, 103, 122, 105, 112,
        ];

        let expected = [
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];

        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }

    /// Decodes the first frame from `message` with a legacy-namespace decoder.
    fn decode_all(message: Vec<u8>) -> Result<(SmallVec<[Event; 1]>, usize), DecodeError> {
        let mut buf = BytesMut::from(&message[..]);

        let mut decoder = FluentDecoder::new(LogNamespace::default());

        let (frame, byte_size) = decoder.decode(&mut buf)?.unwrap();
        Ok((frame.into(), byte_size))
    }

    #[tokio::test]
    async fn ack_delivered_without_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Delivered, false).await;
        // No chunk means no ack is written, so the read times out (`Elapsed`).
        assert!(result.is_err());
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn ack_delivered_with_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
        assert_eq!(result.unwrap().unwrap(), output.len());
        // msgpack `{"ack": <chunk>}`: fixmap(1), fixstr(3) "ack", ...
        assert!(output.starts_with(b"\x81\xa3ack"));
    }

    #[tokio::test]
    async fn ack_failed_without_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Rejected, false).await;
        // Same as the delivered case: without a chunk nothing is written back.
        assert!(result.is_err());
        assert!(output.is_empty());
    }

    #[tokio::test]
    async fn ack_failed_with_chunk() {
        let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
        assert_eq!(result.unwrap().unwrap(), output.len());
        // A rejected chunk is answered with an empty map `{}`.
        assert_eq!(output, &b"\x80"[..]);
    }

    /// Sends one message to a fresh TCP source that finalizes events with `status`.
    ///
    /// Returns the result of a 250 ms read of the ack, together with the bytes that were read.
    async fn check_acknowledgements(
        status: EventStatus,
        with_chunk: bool,
    ) -> (Result<Result<usize, std::io::Error>, Elapsed>, Bytes) {
        trace_init();

        let (sender, recv) = SourceSender::new_test_finalize(status);
        let (_guard, address) = next_addr();
        let source = FluentConfig {
            mode: FluentMode::Tcp(FluentTcpConfig {
                address: address.into(),
                tls: None,
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: true.into(),
                connection_limit: None,
            }),
            log_namespace: None,
        }
        .build(SourceContext::new_test(sender, None))
        .await
        .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;

        let msg = uuid::Uuid::new_v4().to_string();
        let tag = uuid::Uuid::new_v4().to_string();
        let req = build_req(&tag, &[("field", &msg)], with_chunk);

        let sender = tokio::spawn(async move {
            let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
            socket.write_all(&req).await.unwrap();

            let mut output = BytesMut::new();
            (
                timeout(Duration::from_millis(250), socket.read_buf(&mut output)).await,
                output,
            )
        });
        let events = test_util::collect_n(recv, 1).await;
        let (result, output) = sender.await.unwrap();

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(log.get("field").unwrap(), &msg.into());
        assert!(matches!(log.get("host").unwrap(), Value::Bytes(_)));
        assert!(matches!(log.get("timestamp").unwrap(), Value::Timestamp(_)));
        assert_eq!(log.get("tag").unwrap(), &tag.into());

        (result, output.into())
    }

    /// Encodes a `MessageWithOptions` request, optionally carrying a random `chunk` id.
    fn build_req(tag: &str, fields: &[(&str, &str)], with_chunk: bool) -> Vec<u8> {
        let mut record = FluentRecord::default();
        for (key, value) in fields {
            record.insert((*key).into(), rmpv::Value::String((*value).into()).into());
        }
        let chunk = with_chunk.then(|| BASE64_STANDARD.encode(uuid::Uuid::new_v4().as_bytes()));
        let req = FluentMessage::MessageWithOptions(
            tag.into(),
            FluentTimestamp::Unix(Utc::now()),
            record,
            FluentMessageOptions {
                chunk,
                ..Default::default()
            },
        );
        let mut buf = Vec::new();
        req.serialize(&mut Serializer::new(&mut buf)).unwrap();
        buf
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = FluentConfig {
            mode: FluentMode::Tcp(FluentTcpConfig {
                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
                tls: None,
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
            }),
            log_namespace: Some(true),
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(&owned_value_path!("fluent", "tag"), Kind::bytes(), None)
                .with_metadata_field(
                    &owned_value_path!("fluent", "timestamp"),
                    Kind::timestamp(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "record"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!("fluent", "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = FluentConfig {
            mode: FluentMode::Tcp(FluentTcpConfig {
                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
                tls: None,
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
            }),
            log_namespace: None,
        };

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("tag"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }
}
1252
#[cfg(all(test, feature = "fluent-integration-tests"))]
mod integration_tests {
    //! End-to-end tests: a real Fluent Bit / Fluentd container ingests a JSON message over its
    //! HTTP input and forwards it to a locally running `fluent` source.

    use std::{fs::File, io::Write, net::SocketAddr, time::Duration};

    use futures::Stream;
    use tokio::time::sleep;
    use vector_lib::event::{Event, EventStatus};

    use crate::{
        SourceSender,
        config::{SourceConfig, SourceContext},
        docker::Container,
        sources::fluent::{FluentConfig, FluentMode, FluentTcpConfig},
        test_util::{
            addr::{PortGuard, next_addr, next_addr_for_ip},
            collect_ready,
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            random_string, wait_for_tcp,
        },
    };

    const FLUENT_BIT_IMAGE: &str = "fluent/fluent-bit";
    const FLUENT_BIT_TAG: &str = "1.7";
    const FLUENTD_IMAGE: &str = "fluent/fluentd";
    const FLUENTD_TAG: &str = "v1.12";

    /// How long to wait after posting a message before draining the source's output, giving
    /// the agent time to forward it.
    const FORWARD_GRACE_PERIOD: Duration = Duration::from_secs(2);

    /// Writes `content` to a file called `name` inside a fresh temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the file is needed.
    fn make_file(name: &str, content: &str) -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        let mut file = File::create(dir.path().join(name)).unwrap();
        write!(&mut file, "{content}").unwrap();
        dir
    }

    /// Waits for the agent's HTTP input at `address`, POSTs `body` to it as JSON, waits
    /// [`FORWARD_GRACE_PERIOD`], then returns every event the source has emitted so far.
    async fn send_and_collect(
        address: SocketAddr,
        body: serde_json::Value,
        out: impl Stream<Item = Event> + Unpin,
    ) -> Vec<Event> {
        wait_for_tcp(address).await;
        reqwest::Client::new()
            .post(format!("http://{address}/"))
            .header("content-type", "application/json")
            .body(body.to_string())
            .send()
            .await
            .unwrap();
        sleep(FORWARD_GRACE_PERIOD).await;

        collect_ready(out).await
    }

    /// Asserts that exactly one event arrived with the given `tag` and `message`, and that the
    /// `timestamp` and `host` fields are populated.
    fn assert_single_forwarded_event(events: &[Event], tag: &str, msg: &str) {
        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(log["tag"], tag.into());
        assert_eq!(log["message"], msg.into());
        assert!(log.get("timestamp").is_some());
        assert!(log.get("host").is_some());
    }

    #[tokio::test]
    async fn fluentbit() {
        test_fluentbit(EventStatus::Delivered).await;
    }

    #[tokio::test]
    async fn fluentbit_rejection() {
        test_fluentbit(EventStatus::Rejected).await;
    }

    /// Runs Fluent Bit with an HTTP input forwarding to the source; the source finalizes
    /// events with `status`.
    async fn test_fluentbit(status: EventStatus) {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            // Both guards are held until the end of this block so neither port is reused
            // mid-test (distinct names make it explicit that two reservations are alive).
            let (_http_guard, http_address) = next_addr();
            let (out, source_address, _source_guard) = source(status).await;

            let dir = make_file(
                "fluent-bit.conf",
                &format!(
                    r#"
[SERVICE]
    Grace      0
    Flush      1
    Daemon     off

[INPUT]
    Name       http
    Host       {listen_host}
    Port       {listen_port}

[OUTPUT]
    Name                 forward
    Match                *
    Host                 host.docker.internal
    Port                 {send_port}
    Require_ack_response true
"#,
                    listen_host = http_address.ip(),
                    listen_port = http_address.port(),
                    send_port = source_address.port(),
                ),
            );

            let msg = random_string(64);
            let body = serde_json::json!({ "message": msg });

            let events = Container::new(FLUENT_BIT_IMAGE, FLUENT_BIT_TAG)
                .bind(dir.path().display(), "/fluent-bit/etc")
                .run(send_and_collect(http_address, body, out))
                .await;

            assert_single_forwarded_event(&events, "http.0", &msg);
        })
        .await;
    }

    #[tokio::test]
    async fn fluentd() {
        test_fluentd(EventStatus::Delivered, "").await;
    }

    #[tokio::test]
    async fn fluentd_gzip() {
        test_fluentd(EventStatus::Delivered, "compress gzip").await;
    }

    #[tokio::test]
    async fn fluentd_rejection() {
        test_fluentd(EventStatus::Rejected, "").await;
    }

    /// Runs Fluentd with an HTTP input forwarding to the source. `options` is spliced
    /// verbatim into the `<match>` section (e.g. `compress gzip`); the source finalizes
    /// events with `status`.
    async fn test_fluentd(status: EventStatus, options: &str) {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            // See `test_fluentbit` for why both guards are kept alive under distinct names.
            let (_http_guard, http_address) = next_addr();
            let (out, source_address, _source_guard) = source(status).await;

            let config = format!(
                r#"
<source>
  @type http
  bind {http_host}
  port {http_port}
</source>

<match *>
  @type forward
  <server>
    name local
    host host.docker.internal
    port {port}
  </server>
  <buffer>
    flush_mode immediate
  </buffer>
  require_ack_response true
  ack_response_timeout 1
  {options}
</match>
"#,
                http_host = http_address.ip(),
                http_port = http_address.port(),
                port = source_address.port(),
            );

            let dir = make_file("fluent.conf", &config);

            let msg = random_string(64);
            let body = serde_json::json!({ "message": msg });

            let events = Container::new(FLUENTD_IMAGE, FLUENTD_TAG)
                .bind(dir.path().display(), "/fluentd/etc")
                .run(send_and_collect(http_address, body, out))
                .await;

            // A message POSTed to "/" carries an empty tag.
            assert_single_forwarded_event(&events, "", &msg);
        })
        .await;
    }

    /// Starts a TCP `fluent` source on an unspecified-IPv4 address in a background task.
    /// Events are finalized with `status`.
    ///
    /// Returns the event stream, the bound address, and the port guard for that address.
    /// The caller must hold the guard for the duration of the test (presumably it keeps the
    /// port reserved).
    async fn source(
        status: EventStatus,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        // Listen on all interfaces rather than loopback; presumably required because the
        // container connects back via `host.docker.internal`.
        let (guard, address) =
            next_addr_for_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
        tokio::spawn(async move {
            FluentConfig {
                mode: FluentMode::Tcp(FluentTcpConfig {
                    address: address.into(),
                    tls: None,
                    keepalive: None,
                    permit_origin: None,
                    receive_buffer_bytes: None,
                    acknowledgements: false.into(),
                    connection_limit: None,
                }),
                log_namespace: None,
            }
            .build(SourceContext::new_test(sender, None))
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        (recv, address, guard)
    }
}