1use std::collections::HashMap;
2use std::io::{self, Read};
3use std::net::SocketAddr;
4use std::time::Duration;
5
6use base64::prelude::{Engine as _, BASE64_STANDARD};
7use bytes::{Buf, Bytes, BytesMut};
8use chrono::Utc;
9use flate2::read::MultiGzDecoder;
10use rmp_serde::{decode, Deserializer, Serializer};
11use serde::{Deserialize, Serialize};
12use smallvec::{smallvec, SmallVec};
13use tokio_util::codec::Decoder;
14use vector_lib::codecs::{BytesDeserializerConfig, StreamDecodingError};
15use vector_lib::config::{LegacyKey, LogNamespace};
16use vector_lib::configurable::configurable_component;
17use vector_lib::ipallowlist::IpAllowlistConfig;
18use vector_lib::lookup::lookup_v2::parse_value_path;
19use vector_lib::lookup::{metadata_path, owned_value_path, path, OwnedValuePath};
20use vector_lib::schema::Definition;
21use vrl::value::kind::Collection;
22use vrl::value::{Kind, Value};
23
24use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
25use crate::{
26 config::{
27 log_schema, DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
28 SourceContext, SourceOutput,
29 },
30 event::{Event, LogEvent},
31 internal_events::{FluentMessageDecodeError, FluentMessageReceived},
32 serde::bool_or_struct,
33 tcp::TcpKeepaliveConfig,
34 tls::{MaybeTlsSettings, TlsSourceConfig},
35};
36
37mod message;
38use self::message::{FluentEntry, FluentMessage, FluentRecord, FluentTag, FluentTimestamp};
39
40#[configurable_component(source("fluent", "Collect logs from a Fluentd or Fluent Bit agent."))]
42#[derive(Clone, Debug)]
43pub struct FluentConfig {
44 #[serde(flatten)]
45 mode: FluentMode,
46
47 #[configurable(metadata(docs::hidden))]
49 #[serde(default)]
50 log_namespace: Option<bool>,
51}
52
53#[configurable_component(no_deser)]
55#[derive(Clone, Debug)]
56#[serde(tag = "mode", rename_all = "snake_case")]
57#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
58#[allow(clippy::large_enum_variant)] pub enum FluentMode {
60 Tcp(FluentTcpConfig),
62
63 #[cfg(unix)]
65 Unix(FluentUnixConfig),
66}
67
68mod deser {
74 use super::*;
75
76 #[allow(clippy::large_enum_variant)]
77 #[derive(Deserialize)]
78 #[serde(tag = "mode")]
79 enum FluentModeTagged {
80 #[serde(rename = "tcp")]
81 Tcp(FluentTcpConfig),
82
83 #[cfg(unix)]
84 #[serde(rename = "unix")]
85 Unix(FluentUnixConfig),
86 }
87
88 #[derive(Deserialize)]
89 #[serde(untagged)]
90 enum FluentModeDe {
91 Tagged(FluentModeTagged),
92
93 Untagged(FluentTcpConfig),
95 }
96
97 impl<'de> Deserialize<'de> for FluentMode {
98 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
99 where
100 D: serde::Deserializer<'de>,
101 {
102 Ok(match FluentModeDe::deserialize(deserializer)? {
103 FluentModeDe::Tagged(FluentModeTagged::Tcp(config)) => FluentMode::Tcp(config),
104 #[cfg(unix)]
105 FluentModeDe::Tagged(FluentModeTagged::Unix(config)) => FluentMode::Unix(config),
106 FluentModeDe::Untagged(config) => FluentMode::Tcp(config),
107 })
108 }
109 }
110
111 #[cfg(test)]
112 mod tests {
113 use super::*;
114
115 #[test]
116 fn test_tcp_default_mode() {
117 let json_data = serde_json::json!({
118 "address": "0.0.0.0:2020",
119 "connection_limit": 2
120 });
121
122 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
123 assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
124 }
125
126 #[test]
127 fn test_tcp_explicit_mode() {
128 let json_data = serde_json::json!({
129 "mode": "tcp",
130 "address": "0.0.0.0:2020",
131 "connection_limit": 2
132 });
133
134 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
135 assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
136 }
137
138 #[test]
139 fn test_invalid_unix_mode() {
140 let json_data = serde_json::json!({
141 "mode": "unix",
142 "address": "0.0.0.0:2020",
143 "connection_limit": 2
144 });
145
146 assert!(serde_json::from_value::<FluentConfig>(json_data).is_err());
147 }
148
149 #[cfg(unix)]
150 #[test]
151 fn test_valid_unix_mode() {
152 let json_data = serde_json::json!({
153 "mode": "unix",
154 "path": "/foo"
155 });
156
157 let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
158 assert!(
159 matches!(parsed.mode, FluentMode::Unix(c) if c.path.to_string_lossy() == "/foo")
160 );
161 }
162 }
163}
164
165#[configurable_component]
167#[derive(Clone, Debug)]
168#[serde(deny_unknown_fields)]
169pub struct FluentTcpConfig {
170 #[configurable(derived)]
171 address: SocketListenAddr,
172
173 #[configurable(metadata(docs::type_unit = "connections"))]
175 connection_limit: Option<u32>,
176
177 #[configurable(derived)]
178 keepalive: Option<TcpKeepaliveConfig>,
179
180 #[configurable(derived)]
181 pub permit_origin: Option<IpAllowlistConfig>,
182
183 #[configurable(metadata(docs::type_unit = "bytes"))]
187 #[configurable(metadata(docs::examples = 65536))]
188 receive_buffer_bytes: Option<usize>,
189
190 #[configurable(derived)]
191 tls: Option<TlsSourceConfig>,
192
193 #[configurable(derived)]
194 #[serde(default, deserialize_with = "bool_or_struct")]
195 acknowledgements: SourceAcknowledgementsConfig,
196}
197
198impl FluentTcpConfig {
199 fn build(
200 &self,
201 cx: SourceContext,
202 log_namespace: LogNamespace,
203 ) -> crate::Result<super::Source> {
204 let source = FluentSource::new(log_namespace);
205 let shutdown_secs = Duration::from_secs(30);
206 let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
207 let tls_client_metadata_key = self
208 .tls
209 .as_ref()
210 .and_then(|tls| tls.client_metadata_key.clone())
211 .and_then(|k| k.path);
212 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
213 source.run(
214 self.address,
215 self.keepalive,
216 shutdown_secs,
217 tls,
218 tls_client_metadata_key,
219 self.receive_buffer_bytes,
220 None,
221 cx,
222 self.acknowledgements,
223 self.connection_limit,
224 self.permit_origin.clone().map(Into::into),
225 FluentConfig::NAME,
226 log_namespace,
227 )
228 }
229}
230
231#[configurable_component]
233#[derive(Clone, Debug)]
234#[serde(deny_unknown_fields)]
235#[cfg(unix)]
236pub struct FluentUnixConfig {
237 #[configurable(metadata(docs::examples = "/path/to/socket"))]
241 pub path: std::path::PathBuf,
242
243 #[configurable(metadata(docs::examples = 0o777))]
248 #[configurable(metadata(docs::examples = 0o600))]
249 #[configurable(metadata(docs::examples = 508))]
250 pub socket_file_mode: Option<u32>,
251}
252
253#[cfg(unix)]
254impl FluentUnixConfig {
255 fn build(
256 &self,
257 cx: SourceContext,
258 log_namespace: LogNamespace,
259 ) -> crate::Result<super::Source> {
260 let source = FluentSource::new(log_namespace);
261
262 crate::sources::util::build_unix_stream_source(
263 self.path.clone(),
264 self.socket_file_mode,
265 source.decoder(),
266 move |events, host| source.handle_events_impl(events, host.into()),
267 cx.shutdown,
268 cx.out,
269 )
270 }
271}
272
273impl GenerateConfig for FluentConfig {
274 fn generate_config() -> toml::Value {
275 toml::Value::try_from(Self {
276 mode: FluentMode::Tcp(FluentTcpConfig {
277 address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
278 keepalive: None,
279 permit_origin: None,
280 tls: None,
281 receive_buffer_bytes: None,
282 acknowledgements: Default::default(),
283 connection_limit: Some(2),
284 }),
285 log_namespace: None,
286 })
287 .unwrap()
288 }
289}
290
291#[async_trait::async_trait]
292#[typetag::serde(name = "fluent")]
293impl SourceConfig for FluentConfig {
294 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
295 let log_namespace = cx.log_namespace(self.log_namespace);
296 match &self.mode {
297 FluentMode::Tcp(t) => t.build(cx, log_namespace),
298 #[cfg(unix)]
299 FluentMode::Unix(u) => u.build(cx, log_namespace),
300 }
301 }
302
303 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
304 let log_namespace = global_log_namespace.merge(self.log_namespace);
305 let schema_definition = self.schema_definition(log_namespace);
306
307 vec![SourceOutput::new_maybe_logs(
308 DataType::Log,
309 schema_definition,
310 )]
311 }
312
313 fn resources(&self) -> Vec<Resource> {
314 match &self.mode {
315 FluentMode::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
316 #[cfg(unix)]
317 FluentMode::Unix(_) => vec![],
318 }
319 }
320
321 fn can_acknowledge(&self) -> bool {
322 matches!(self.mode, FluentMode::Tcp(_))
323 }
324}
325
326impl FluentConfig {
327 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
329 let host_key = log_schema()
331 .host_key()
332 .cloned()
333 .map(LegacyKey::InsertIfEmpty);
334
335 let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite);
336
337 let tls_client_metadata_path = match &self.mode {
338 FluentMode::Tcp(tcp) => tcp
339 .tls
340 .as_ref()
341 .and_then(|tls| tls.client_metadata_key.as_ref())
342 .and_then(|k| k.path.clone())
343 .map(LegacyKey::Overwrite),
344 #[cfg(unix)]
345 FluentMode::Unix(_) => None,
346 };
347
348 let mut schema_definition = BytesDeserializerConfig
351 .schema_definition(log_namespace)
352 .with_standard_vector_source_metadata()
353 .with_source_metadata(
354 FluentConfig::NAME,
355 host_key,
356 &owned_value_path!("host"),
357 Kind::bytes(),
358 Some("host"),
359 )
360 .with_source_metadata(
361 FluentConfig::NAME,
362 tag_key,
363 &owned_value_path!("tag"),
364 Kind::bytes(),
365 None,
366 )
367 .with_source_metadata(
368 FluentConfig::NAME,
369 None,
370 &owned_value_path!("timestamp"),
371 Kind::timestamp(),
372 Some("timestamp"),
373 )
374 .with_source_metadata(
376 FluentConfig::NAME,
377 None,
378 &owned_value_path!("record"),
379 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
380 None,
381 )
382 .with_source_metadata(
383 Self::NAME,
384 tls_client_metadata_path,
385 &owned_value_path!("tls_client_metadata"),
386 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
387 None,
388 );
389
390 if log_namespace == LogNamespace::Legacy {
392 schema_definition = schema_definition.unknown_fields(Kind::bytes());
393 }
394
395 schema_definition
396 }
397}
398
399#[derive(Debug, Clone)]
400struct FluentSource {
401 log_namespace: LogNamespace,
402 legacy_host_key_path: Option<OwnedValuePath>,
403}
404
405impl FluentSource {
406 fn new(log_namespace: LogNamespace) -> Self {
407 Self {
408 log_namespace,
409 legacy_host_key_path: log_schema().host_key().cloned(),
410 }
411 }
412
413 fn handle_events_impl(&self, events: &mut [Event], host: Value) {
414 for event in events {
415 let log = event.as_mut_log();
416
417 let legacy_host_key = self
418 .legacy_host_key_path
419 .as_ref()
420 .map(LegacyKey::InsertIfEmpty);
421
422 self.log_namespace.insert_source_metadata(
423 FluentConfig::NAME,
424 log,
425 legacy_host_key,
426 path!("host"),
427 host.clone(),
428 );
429 }
430 }
431}
432
433impl TcpSource for FluentSource {
434 type Error = DecodeError;
435 type Item = FluentFrame;
436 type Decoder = FluentDecoder;
437 type Acker = FluentAcker;
438
439 fn decoder(&self) -> Self::Decoder {
440 FluentDecoder::new(self.log_namespace)
441 }
442
443 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
444 self.handle_events_impl(events, host.ip().to_string().into())
445 }
446
447 fn build_acker(&self, frame: &[Self::Item]) -> Self::Acker {
448 FluentAcker::new(frame)
449 }
450}
451
452#[derive(Debug)]
453pub enum DecodeError {
454 IO(io::Error),
455 Decode(decode::Error),
456 UnknownCompression(String),
457 UnexpectedValue(rmpv::Value),
458}
459
460impl std::fmt::Display for DecodeError {
461 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462 match self {
463 DecodeError::IO(err) => write!(f, "{err}"),
464 DecodeError::Decode(err) => write!(f, "{err}"),
465 DecodeError::UnknownCompression(compression) => {
466 write!(f, "unknown compression: {compression}")
467 }
468 DecodeError::UnexpectedValue(value) => {
469 write!(f, "unexpected msgpack value, ignoring: {value}")
470 }
471 }
472 }
473}
474
475impl StreamDecodingError for DecodeError {
476 fn can_continue(&self) -> bool {
477 match self {
478 DecodeError::IO(_) => false,
479 DecodeError::Decode(_) => true,
480 DecodeError::UnknownCompression(_) => true,
481 DecodeError::UnexpectedValue(_) => true,
482 }
483 }
484}
485
486impl From<io::Error> for DecodeError {
487 fn from(e: io::Error) -> Self {
488 DecodeError::IO(e)
489 }
490}
491
492impl From<decode::Error> for DecodeError {
493 fn from(e: decode::Error) -> Self {
494 DecodeError::Decode(e)
495 }
496}
497
498#[derive(Debug, Clone)]
499struct FluentDecoder {
500 log_namespace: LogNamespace,
501}
502
503impl FluentDecoder {
504 const fn new(log_namespace: LogNamespace) -> Self {
505 Self { log_namespace }
506 }
507
508 fn handle_message(
509 &mut self,
510 message: Result<FluentMessage, DecodeError>,
511 byte_size: usize,
512 ) -> Result<Option<(FluentFrame, usize)>, DecodeError> {
513 let log_namespace = &self.log_namespace;
514
515 match message? {
516 FluentMessage::Message(tag, timestamp, record) => {
517 let event = Event::from(FluentEvent {
518 tag,
519 timestamp,
520 record,
521 log_namespace,
522 });
523 let frame = FluentFrame {
524 events: smallvec![event],
525 chunk: None,
526 };
527 Ok(Some((frame, byte_size)))
528 }
529 FluentMessage::MessageWithOptions(tag, timestamp, record, options) => {
530 let event = Event::from(FluentEvent {
531 tag,
532 timestamp,
533 record,
534 log_namespace,
535 });
536 let frame = FluentFrame {
537 events: smallvec![event],
538 chunk: options.chunk,
539 };
540 Ok(Some((frame, byte_size)))
541 }
542 FluentMessage::Forward(tag, entries) => {
543 let events = entries
544 .into_iter()
545 .map(|FluentEntry(timestamp, record)| {
546 Event::from(FluentEvent {
547 tag: tag.clone(),
548 timestamp,
549 record,
550 log_namespace,
551 })
552 })
553 .collect();
554 let frame = FluentFrame {
555 events,
556 chunk: None,
557 };
558 Ok(Some((frame, byte_size)))
559 }
560 FluentMessage::ForwardWithOptions(tag, entries, options) => {
561 let events = entries
562 .into_iter()
563 .map(|FluentEntry(timestamp, record)| {
564 Event::from(FluentEvent {
565 tag: tag.clone(),
566 timestamp,
567 record,
568 log_namespace,
569 })
570 })
571 .collect();
572 let frame = FluentFrame {
573 events,
574 chunk: options.chunk,
575 };
576 Ok(Some((frame, byte_size)))
577 }
578 FluentMessage::PackedForward(tag, bin) => {
579 let mut buf = BytesMut::from(&bin[..]);
580
581 let mut events = smallvec![];
582 while let Some(FluentEntry(timestamp, record)) =
583 FluentEntryStreamDecoder.decode(&mut buf)?
584 {
585 events.push(Event::from(FluentEvent {
586 tag: tag.clone(),
587 timestamp,
588 record,
589 log_namespace,
590 }));
591 }
592 let frame = FluentFrame {
593 events,
594 chunk: None,
595 };
596 Ok(Some((frame, byte_size)))
597 }
598 FluentMessage::PackedForwardWithOptions(tag, bin, options) => {
599 let buf = match options.compressed.as_deref() {
600 Some("gzip") => {
601 let mut buf = Vec::new();
602 MultiGzDecoder::new(io::Cursor::new(bin.into_vec()))
603 .read_to_end(&mut buf)
604 .map(|_| buf)
605 .map_err(Into::into)
606 }
607 Some("text") | None => Ok(bin.into_vec()),
608 Some(s) => Err(DecodeError::UnknownCompression(s.to_owned())),
609 }?;
610
611 let mut buf = BytesMut::from(&buf[..]);
612
613 let mut events = smallvec![];
614 while let Some(FluentEntry(timestamp, record)) =
615 FluentEntryStreamDecoder.decode(&mut buf)?
616 {
617 events.push(Event::from(FluentEvent {
618 tag: tag.clone(),
619 timestamp,
620 record,
621 log_namespace,
622 }));
623 }
624 let frame = FluentFrame {
625 events,
626 chunk: options.chunk,
627 };
628 Ok(Some((frame, byte_size)))
629 }
630 FluentMessage::Heartbeat(rmpv::Value::Nil) => Ok(None),
631 FluentMessage::Heartbeat(value) => Err(DecodeError::UnexpectedValue(value)),
632 }
633 }
634}
635
636impl Decoder for FluentDecoder {
637 type Item = (FluentFrame, usize);
638 type Error = DecodeError;
639
640 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
641 loop {
642 if src.is_empty() {
643 return Ok(None);
644 }
645
646 let (byte_size, res) = {
647 let mut des = Deserializer::new(io::Cursor::new(&src[..]));
648
649 let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
650
651 if let Err(DecodeError::Decode(
653 decode::Error::InvalidDataRead(ref custom)
654 | decode::Error::InvalidMarkerRead(ref custom),
655 )) = res
656 {
657 if custom.kind() == io::ErrorKind::UnexpectedEof {
658 return Ok(None);
659 }
660 }
661
662 (des.position() as usize, res)
663 };
664
665 src.advance(byte_size);
666
667 let maybe_item = self.handle_message(res, byte_size).inspect_err(|error| {
668 let base64_encoded_message = BASE64_STANDARD.encode(&src[..]);
669 emit!(FluentMessageDecodeError {
670 error,
671 base64_encoded_message
672 });
673 })?;
674 if let Some(item) = maybe_item {
675 return Ok(Some(item));
676 }
677 }
678 }
679}
680
/// Stateless decoder for the entries contained in a packed forward payload.
#[derive(Clone, Debug)]
struct FluentEntryStreamDecoder;
684
685impl Decoder for FluentEntryStreamDecoder {
686 type Item = FluentEntry;
687 type Error = DecodeError;
688
689 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
690 if src.is_empty() {
691 return Ok(None);
692 }
693 let (byte_size, res) = {
694 let mut des = Deserializer::new(io::Cursor::new(&src[..]));
695
696 let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
698
699 if let Err(DecodeError::Decode(decode::Error::InvalidDataRead(ref custom))) = res {
700 if custom.kind() == io::ErrorKind::UnexpectedEof {
701 return Ok(None);
702 }
703 }
704
705 let byte_size = des.position();
706
707 emit!(FluentMessageReceived { byte_size });
708
709 (byte_size as usize, res)
710 };
711
712 src.advance(byte_size);
713
714 res
715 }
716}
717
/// Builds the acknowledgement response for a batch of frames.
struct FluentAcker {
    /// Chunk identifiers collected from the frames that carried one.
    chunks: Vec<String>,
}
721
722impl FluentAcker {
723 fn new(frames: &[FluentFrame]) -> Self {
724 Self {
725 chunks: frames.iter().filter_map(|f| f.chunk.clone()).collect(),
726 }
727 }
728}
729
730impl TcpSourceAcker for FluentAcker {
731 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
732 if self.chunks.is_empty() {
733 return None;
734 }
735
736 let mut buf = Vec::new();
737 let mut ser = Serializer::new(&mut buf);
738 let mut ack_map = HashMap::new();
739
740 for chunk in self.chunks {
741 ack_map.clear();
742 if let TcpSourceAck::Ack = ack {
743 ack_map.insert("ack", chunk);
744 };
745 ack_map.serialize(&mut ser).unwrap();
746 }
747 Some(buf.into())
748 }
749}
750
751#[derive(Debug, PartialEq)]
753struct FluentEvent<'a> {
754 tag: FluentTag,
755 timestamp: FluentTimestamp,
756 record: FluentRecord,
757 log_namespace: &'a LogNamespace,
758}
759
760impl From<FluentEvent<'_>> for Event {
761 fn from(frame: FluentEvent) -> Event {
762 LogEvent::from(frame).into()
763 }
764}
765
766struct FluentFrame {
767 events: SmallVec<[Event; 1]>,
768 chunk: Option<String>,
769}
770
771impl From<FluentFrame> for SmallVec<[Event; 1]> {
772 fn from(frame: FluentFrame) -> Self {
773 frame.events
774 }
775}
776
777impl From<FluentEvent<'_>> for LogEvent {
778 fn from(frame: FluentEvent) -> LogEvent {
779 let FluentEvent {
780 tag,
781 timestamp,
782 record,
783 log_namespace,
784 } = frame;
785
786 let mut log = LogEvent::default();
787
788 log_namespace.insert_vector_metadata(
789 &mut log,
790 log_schema().source_type_key(),
791 path!("source_type"),
792 Bytes::from_static(FluentConfig::NAME.as_bytes()),
793 );
794
795 match log_namespace {
796 LogNamespace::Vector => {
797 log.insert(metadata_path!(FluentConfig::NAME, "timestamp"), timestamp);
798 log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
799 }
800 LogNamespace::Legacy => {
801 log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
802 }
803 }
804
805 log_namespace.insert_source_metadata(
806 FluentConfig::NAME,
807 &mut log,
808 Some(LegacyKey::Overwrite(path!("tag"))),
809 path!("tag"),
810 tag,
811 );
812
813 for (key, value) in record.into_iter() {
814 let value: Value = value.into();
815 log_namespace.insert_source_metadata(
816 FluentConfig::NAME,
817 &mut log,
818 Some(LegacyKey::Overwrite(path!(key.as_str()))),
819 path!("record", key.as_str()),
820 value,
821 );
822 }
823 log
824 }
825}
826
827#[cfg(test)]
828mod tests {
829 use bytes::BytesMut;
830 use chrono::{DateTime, Utc};
831 use rmp_serde::Serializer;
832 use serde::Serialize;
833 use tokio::{
834 io::{AsyncReadExt, AsyncWriteExt},
835 time::{error::Elapsed, timeout, Duration},
836 };
837 use tokio_util::codec::Decoder;
838 use vector_lib::assert_event_data_eq;
839 use vector_lib::lookup::OwnedTargetPath;
840 use vector_lib::schema::Definition;
841 use vrl::value::{kind::Collection, ObjectMap, Value};
842
843 use super::{message::FluentMessageOptions, *};
844 use crate::{
845 config::{SourceConfig, SourceContext},
846 event::EventStatus,
847 test_util::{self, next_addr, trace_init, wait_for_tcp},
848 SourceSender,
849 };
850
/// The generated example configuration must pass the shared generate-config check.
#[test]
fn generate_config() {
    test_util::test_generate_config::<FluentConfig>();
}
855
856 fn mock_event(name: &str, timestamp: &str) -> Event {
862 Event::Log(LogEvent::from(ObjectMap::from([
863 ("message".into(), Value::from(name)),
864 (
865 log_schema().source_type_key().unwrap().to_string().into(),
866 Value::from(FluentConfig::NAME),
867 ),
868 ("tag".into(), Value::from("tag.name")),
869 (
870 "timestamp".into(),
871 Value::Timestamp(DateTime::parse_from_rfc3339(timestamp).unwrap().into()),
872 ),
873 ])))
874 }
875
/// A three-element message (tag, timestamp, record) decodes into a single event.
#[test]
fn decode_message_mode() {
    let payload: Vec<u8> = vec![
        147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
        101, 115, 115, 97, 103, 101, 163, 98, 97, 114,
    ];
    let wanted = mock_event("bar", "2015-09-07T01:23:04Z");

    let (events, consumed) = decode_all(payload.clone()).unwrap();

    assert_event_data_eq!(events[0], wanted);
    assert_eq!(consumed, payload.len());
}
893
/// A four-element message (tag, timestamp, record, options) decodes into a single event.
#[test]
fn decode_message_mode_with_options() {
    let payload: Vec<u8> = vec![
        148, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
        101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 129, 164, 115, 105, 122, 101, 1,
    ];
    let wanted = mock_event("bar", "2015-09-07T01:23:04Z");

    let (events, consumed) = decode_all(payload.clone()).unwrap();

    assert_eq!(consumed, payload.len());
    assert_event_data_eq!(events[0], wanted);
}
912
/// A forward message (tag plus an array of three entries) decodes into three events.
#[test]
fn decode_forward_mode() {
    let payload: Vec<u8> = vec![
        146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
        167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
        129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
        250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122,
    ];
    let wanted = vec![
        mock_event("foo", "2015-09-07T01:23:04Z"),
        mock_event("bar", "2015-09-07T01:23:05Z"),
        mock_event("baz", "2015-09-07T01:23:06Z"),
    ];

    let (events, consumed) = decode_all(payload.clone()).unwrap();

    assert_eq!(consumed, payload.len());
    for idx in 0..wanted.len() {
        assert_event_data_eq!(events[idx], wanted[idx]);
    }
}
942
/// A forward message with a trailing options map decodes into three events.
#[test]
fn decode_forward_mode_with_options() {
    let payload: Vec<u8> = vec![
        147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
        167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
        129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
        250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 164, 115, 105,
        122, 101, 3,
    ];
    let wanted = vec![
        mock_event("foo", "2015-09-07T01:23:04Z"),
        mock_event("bar", "2015-09-07T01:23:05Z"),
        mock_event("baz", "2015-09-07T01:23:06Z"),
    ];

    let (events, consumed) = decode_all(payload.clone()).unwrap();

    assert_eq!(consumed, payload.len());
    for idx in 0..wanted.len() {
        assert_event_data_eq!(events[idx], wanted[idx]);
    }
}
976
/// A packed forward message (entries carried as one binary blob) decodes into three events.
#[test]
fn decode_packed_forward_mode() {
    let payload: Vec<u8> = vec![
        147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 57, 146, 206, 85, 236, 230, 248,
        129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230,
        249, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236,
        230, 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 167, 109,
        101, 115, 115, 97, 103, 101, 163, 102, 111, 111,
    ];
    let wanted = vec![
        mock_event("foo", "2015-09-07T01:23:04Z"),
        mock_event("bar", "2015-09-07T01:23:05Z"),
        mock_event("baz", "2015-09-07T01:23:06Z"),
    ];

    let (events, consumed) = decode_all(payload.clone()).unwrap();

    assert_eq!(consumed, payload.len());
    for idx in 0..wanted.len() {
        assert_event_data_eq!(events[idx], wanted[idx]);
    }
}
1009
/// A packed forward message whose blob is gzip-compressed (options `compressed: gzip`)
/// decodes into three events.
#[test]
fn decode_compressed_packed_forward_mode() {
    let payload: Vec<u8> = vec![
        147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 55, 31, 139, 8, 0, 245, 10, 168,
        96, 0, 3, 155, 116, 46, 244, 205, 179, 31, 141, 203, 115, 83, 139, 139, 19, 211, 83,
        23, 167, 229, 231, 79, 2, 9, 253, 68, 8, 37, 37, 22, 129, 133, 126, 33, 11, 85, 1, 0,
        53, 3, 158, 28, 57, 0, 0, 0, 129, 170, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100,
        164, 103, 122, 105, 112,
    ];
    let wanted = vec![
        mock_event("foo", "2015-09-07T01:23:04Z"),
        mock_event("bar", "2015-09-07T01:23:05Z"),
        mock_event("baz", "2015-09-07T01:23:06Z"),
    ];

    let (events, consumed) = decode_all(payload.clone()).unwrap();

    assert_eq!(consumed, payload.len());
    for idx in 0..wanted.len() {
        assert_event_data_eq!(events[idx], wanted[idx]);
    }
}
1044
1045 fn decode_all(message: Vec<u8>) -> Result<(SmallVec<[Event; 1]>, usize), DecodeError> {
1046 let mut buf = BytesMut::from(&message[..]);
1047
1048 let mut decoder = FluentDecoder::new(LogNamespace::default());
1049
1050 let (frame, byte_size) = decoder.decode(&mut buf)?.unwrap();
1051 Ok((frame.into(), byte_size))
1052 }
1053
1054 #[tokio::test]
1055 async fn ack_delivered_without_chunk() {
1056 let (result, output) = check_acknowledgements(EventStatus::Delivered, false).await;
1057 assert!(result.is_err()); assert!(output.is_empty());
1059 }
1060
1061 #[tokio::test]
1062 async fn ack_delivered_with_chunk() {
1063 let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
1064 assert_eq!(result.unwrap().unwrap(), output.len());
1065 let expected: Vec<u8> = vec![0x81, 0xa3, 0x61, 0x63]; assert_eq!(output[..expected.len()], expected);
1067 }
1068
1069 #[tokio::test]
1070 async fn ack_failed_without_chunk() {
1071 let (result, output) = check_acknowledgements(EventStatus::Rejected, false).await;
1072 assert_eq!(result.unwrap().unwrap(), output.len());
1073 assert!(output.is_empty());
1074 }
1075
1076 #[tokio::test]
1077 async fn ack_failed_with_chunk() {
1078 let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
1079 assert_eq!(result.unwrap().unwrap(), output.len());
1080 let expected: Vec<u8> = vec![0x80]; assert_eq!(output, expected);
1082 }
1083
1084 async fn check_acknowledgements(
1085 status: EventStatus,
1086 with_chunk: bool,
1087 ) -> (Result<Result<usize, std::io::Error>, Elapsed>, Bytes) {
1088 trace_init();
1089
1090 let (sender, recv) = SourceSender::new_test_finalize(status);
1091 let address = next_addr();
1092 let source = FluentConfig {
1093 mode: FluentMode::Tcp(FluentTcpConfig {
1094 address: address.into(),
1095 tls: None,
1096 keepalive: None,
1097 permit_origin: None,
1098 receive_buffer_bytes: None,
1099 acknowledgements: true.into(),
1100 connection_limit: None,
1101 }),
1102 log_namespace: None,
1103 }
1104 .build(SourceContext::new_test(sender, None))
1105 .await
1106 .unwrap();
1107 tokio::spawn(source);
1108 wait_for_tcp(address).await;
1109
1110 let msg = uuid::Uuid::new_v4().to_string();
1111 let tag = uuid::Uuid::new_v4().to_string();
1112 let req = build_req(&tag, &[("field", &msg)], with_chunk);
1113
1114 let sender = tokio::spawn(async move {
1115 let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
1116 socket.write_all(&req).await.unwrap();
1117
1118 let mut output = BytesMut::new();
1119 (
1120 timeout(Duration::from_millis(250), socket.read_buf(&mut output)).await,
1121 output,
1122 )
1123 });
1124 let events = test_util::collect_n(recv, 1).await;
1125 let (result, output) = sender.await.unwrap();
1126
1127 assert_eq!(events.len(), 1);
1128 let log = events[0].as_log();
1129 assert_eq!(log.get("field").unwrap(), &msg.into());
1130 assert!(matches!(log.get("host").unwrap(), Value::Bytes(_)));
1131 assert!(matches!(log.get("timestamp").unwrap(), Value::Timestamp(_)));
1132 assert_eq!(log.get("tag").unwrap(), &tag.into());
1133
1134 (result, output.into())
1135 }
1136
1137 fn build_req(tag: &str, fields: &[(&str, &str)], with_chunk: bool) -> Vec<u8> {
1138 let mut record = FluentRecord::default();
1139 for (tag, value) in fields {
1140 record.insert((*tag).into(), rmpv::Value::String((*value).into()).into());
1141 }
1142 let chunk = with_chunk.then(|| BASE64_STANDARD.encode(uuid::Uuid::new_v4().as_bytes()));
1143 let req = FluentMessage::MessageWithOptions(
1144 tag.into(),
1145 FluentTimestamp::Unix(Utc::now()),
1146 record,
1147 FluentMessageOptions {
1148 chunk,
1149 ..Default::default()
1150 },
1151 );
1152 let mut buf = Vec::new();
1153 req.serialize(&mut Serializer::new(&mut buf)).unwrap();
1154 buf
1155 }
1156
/// In the Vector namespace every source field is advertised as metadata.
#[test]
fn output_schema_definition_vector_namespace() {
    let tcp_config = FluentTcpConfig {
        address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
        tls: None,
        keepalive: None,
        permit_origin: None,
        receive_buffer_bytes: None,
        acknowledgements: false.into(),
        connection_limit: None,
    };
    let config = FluentConfig {
        mode: FluentMode::Tcp(tcp_config),
        log_namespace: Some(true),
    };

    let definitions = config
        .outputs(LogNamespace::Vector)
        .remove(0)
        .schema_definition(true);

    let optional_bytes_object =
        || Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined();

    let expected_definition =
        Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
            .with_meaning(OwnedTargetPath::event_root(), "message")
            .with_metadata_field(
                &owned_value_path!("vector", "source_type"),
                Kind::bytes(),
                None,
            )
            .with_metadata_field(&owned_value_path!("fluent", "tag"), Kind::bytes(), None)
            .with_metadata_field(
                &owned_value_path!("fluent", "timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_metadata_field(
                &owned_value_path!("fluent", "record"),
                optional_bytes_object(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!("vector", "ingest_timestamp"),
                Kind::timestamp(),
                None,
            )
            .with_metadata_field(
                &owned_value_path!("fluent", "host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_metadata_field(
                &owned_value_path!("fluent", "tls_client_metadata"),
                optional_bytes_object(),
                None,
            );

    assert_eq!(definitions, Some(expected_definition))
}
1214
/// In the legacy namespace the source fields are advertised as event fields and unknown
/// fields are typed as bytes.
#[test]
fn output_schema_definition_legacy_namespace() {
    let tcp_config = FluentTcpConfig {
        address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
        tls: None,
        keepalive: None,
        permit_origin: None,
        receive_buffer_bytes: None,
        acknowledgements: false.into(),
        connection_limit: None,
    };
    let config = FluentConfig {
        mode: FluentMode::Tcp(tcp_config),
        log_namespace: None,
    };

    let definitions = config
        .outputs(LogNamespace::Legacy)
        .remove(0)
        .schema_definition(true);

    let base = Definition::new_with_default_metadata(
        Kind::object(Collection::empty()),
        [LogNamespace::Legacy],
    );
    let expected_definition = base
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("tag"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .unknown_fields(Kind::bytes());

    assert_eq!(definitions, Some(expected_definition))
}
1252}
1253
1254#[cfg(all(test, feature = "fluent-integration-tests"))]
1255mod integration_tests {
1256 use std::{fs::File, io::Write, net::SocketAddr, time::Duration};
1257
1258 use futures::Stream;
1259 use tokio::time::sleep;
1260 use vector_lib::event::{Event, EventStatus};
1261
1262 use crate::sources::fluent::{FluentMode, FluentTcpConfig};
1263 use crate::{
1264 config::{SourceConfig, SourceContext},
1265 docker::Container,
1266 sources::fluent::FluentConfig,
1267 test_util::{
1268 collect_ready,
1269 components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
1270 next_addr, next_addr_for_ip, random_string, wait_for_tcp,
1271 },
1272 SourceSender,
1273 };
1274
/// Image name passed to `Container::new` for the Fluent Bit tests.
const FLUENT_BIT_IMAGE: &str = "fluent/fluent-bit";
/// Image tag passed to `Container::new` for the Fluent Bit tests.
const FLUENT_BIT_TAG: &str = "1.7";
/// Image name passed to `Container::new` for the Fluentd tests.
const FLUENTD_IMAGE: &str = "fluent/fluentd";
/// Image tag passed to `Container::new` for the Fluentd tests.
const FLUENTD_TAG: &str = "v1.12";
1279
1280 fn make_file(name: &str, content: &str) -> tempfile::TempDir {
1281 let dir = tempfile::tempdir().unwrap();
1282 let mut file = File::create(dir.path().join(name)).unwrap();
1283 write!(&mut file, "{content}").unwrap();
1284 dir
1285 }
1286
1287 #[tokio::test]
1288 async fn fluentbit() {
1289 test_fluentbit(EventStatus::Delivered).await;
1290 }
1291
1292 #[tokio::test]
1293 async fn fluentbit_rejection() {
1294 test_fluentbit(EventStatus::Rejected).await;
1295 }
1296
1297 async fn test_fluentbit(status: EventStatus) {
1298 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
1299 let test_address = next_addr();
1300 let (out, source_address) = source(status).await;
1301
1302 let dir = make_file(
1303 "fluent-bit.conf",
1304 &format!(
1305 r#"
1306[SERVICE]
1307 Grace 0
1308 Flush 1
1309 Daemon off
1310
1311[INPUT]
1312 Name http
1313 Host {listen_host}
1314 Port {listen_port}
1315
1316[OUTPUT]
1317 Name forward
1318 Match *
1319 Host host.docker.internal
1320 Port {send_port}
1321 Require_ack_response true
1322 "#,
1323 listen_host = test_address.ip(),
1324 listen_port = test_address.port(),
1325 send_port = source_address.port(),
1326 ),
1327 );
1328
1329 let msg = random_string(64);
1330 let body = serde_json::json!({ "message": msg });
1331
1332 let events = Container::new(FLUENT_BIT_IMAGE, FLUENT_BIT_TAG)
1333 .bind(dir.path().display(), "/fluent-bit/etc")
1334 .run(async move {
1335 wait_for_tcp(test_address).await;
1336 reqwest::Client::new()
1337 .post(format!("http://{test_address}/"))
1338 .header("content-type", "application/json")
1339 .body(body.to_string())
1340 .send()
1341 .await
1342 .unwrap();
1343 sleep(Duration::from_secs(2)).await;
1344
1345 collect_ready(out).await
1346 })
1347 .await;
1348
1349 assert_eq!(events.len(), 1);
1350 let log = events[0].as_log();
1351 assert_eq!(log["tag"], "http.0".into());
1352 assert_eq!(log["message"], msg.into());
1353 assert!(log.get("timestamp").is_some());
1354 assert!(log.get("host").is_some());
1355 })
1356 .await;
1357 }
1358
1359 #[tokio::test]
1360 async fn fluentd() {
1361 test_fluentd(EventStatus::Delivered, "").await;
1362 }
1363
1364 #[tokio::test]
1365 async fn fluentd_gzip() {
1366 test_fluentd(EventStatus::Delivered, "compress gzip").await;
1367 }
1368
1369 #[tokio::test]
1370 async fn fluentd_rejection() {
1371 test_fluentd(EventStatus::Rejected, "").await;
1372 }
1373
1374 async fn test_fluentd(status: EventStatus, options: &str) {
1375 assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
1376 let test_address = next_addr();
1377 let (out, source_address) = source(status).await;
1378
1379 let config = format!(
1380 r#"
1381<source>
1382 @type http
1383 bind {http_host}
1384 port {http_port}
1385</source>
1386
1387<match *>
1388 @type forward
1389 <server>
1390 name local
1391 host host.docker.internal
1392 port {port}
1393 </server>
1394 <buffer>
1395 flush_mode immediate
1396 </buffer>
1397 require_ack_response true
1398 ack_response_timeout 1
1399 {options}
1400</match>
1401"#,
1402 http_host = test_address.ip(),
1403 http_port = test_address.port(),
1404 port = source_address.port(),
1405 options = options
1406 );
1407
1408 let dir = make_file("fluent.conf", &config);
1409
1410 let msg = random_string(64);
1411 let body = serde_json::json!({ "message": msg });
1412
1413 let events = Container::new(FLUENTD_IMAGE, FLUENTD_TAG)
1414 .bind(dir.path().display(), "/fluentd/etc")
1415 .run(async move {
1416 wait_for_tcp(test_address).await;
1417 reqwest::Client::new()
1418 .post(format!("http://{test_address}/"))
1419 .header("content-type", "application/json")
1420 .body(body.to_string())
1421 .send()
1422 .await
1423 .unwrap();
1424 sleep(Duration::from_secs(2)).await;
1425 collect_ready(out).await
1426 })
1427 .await;
1428
1429 assert_eq!(events.len(), 1);
1430 assert_eq!(events[0].as_log()["tag"], "".into());
1431 assert_eq!(events[0].as_log()["message"], msg.into());
1432 assert!(events[0].as_log().get("timestamp").is_some());
1433 assert!(events[0].as_log().get("host").is_some());
1434 })
1435 .await;
1436 }
1437
1438 async fn source(status: EventStatus) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
1439 let (sender, recv) = SourceSender::new_test_finalize(status);
1440 let address = next_addr_for_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
1441 tokio::spawn(async move {
1442 FluentConfig {
1443 mode: FluentMode::Tcp(FluentTcpConfig {
1444 address: address.into(),
1445 tls: None,
1446 keepalive: None,
1447 permit_origin: None,
1448 receive_buffer_bytes: None,
1449 acknowledgements: false.into(),
1450 connection_limit: None,
1451 }),
1452 log_namespace: None,
1453 }
1454 .build(SourceContext::new_test(sender, None))
1455 .await
1456 .unwrap()
1457 .await
1458 .unwrap()
1459 });
1460 wait_for_tcp(address).await;
1461 (recv, address)
1462 }
1463}