vector/sources/fluent/
mod.rs

1use std::collections::HashMap;
2use std::io::{self, Read};
3use std::net::SocketAddr;
4use std::time::Duration;
5
6use base64::prelude::{Engine as _, BASE64_STANDARD};
7use bytes::{Buf, Bytes, BytesMut};
8use chrono::Utc;
9use flate2::read::MultiGzDecoder;
10use rmp_serde::{decode, Deserializer, Serializer};
11use serde::{Deserialize, Serialize};
12use smallvec::{smallvec, SmallVec};
13use tokio_util::codec::Decoder;
14use vector_lib::codecs::{BytesDeserializerConfig, StreamDecodingError};
15use vector_lib::config::{LegacyKey, LogNamespace};
16use vector_lib::configurable::configurable_component;
17use vector_lib::ipallowlist::IpAllowlistConfig;
18use vector_lib::lookup::lookup_v2::parse_value_path;
19use vector_lib::lookup::{metadata_path, owned_value_path, path, OwnedValuePath};
20use vector_lib::schema::Definition;
21use vrl::value::kind::Collection;
22use vrl::value::{Kind, Value};
23
24use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
25use crate::{
26    config::{
27        log_schema, DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
28        SourceContext, SourceOutput,
29    },
30    event::{Event, LogEvent},
31    internal_events::{FluentMessageDecodeError, FluentMessageReceived},
32    serde::bool_or_struct,
33    tcp::TcpKeepaliveConfig,
34    tls::{MaybeTlsSettings, TlsSourceConfig},
35};
36
37mod message;
38use self::message::{FluentEntry, FluentMessage, FluentRecord, FluentTag, FluentTimestamp};
39
/// Configuration for the `fluent` source.
#[configurable_component(source("fluent", "Collect logs from a Fluentd or Fluent Bit agent."))]
#[derive(Clone, Debug)]
pub struct FluentConfig {
    // Flattened: the options of the selected mode are read from the same level as the
    // other fields of this struct rather than from a nested table.
    #[serde(flatten)]
    mode: FluentMode,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
52
/// Listening mode for the `fluent` source.
// `no_deser`: `Deserialize` is implemented by hand in the `deser` module so that the
// `mode` tag may be omitted, in which case the TCP variant is used.
#[configurable_component(no_deser)]
#[derive(Clone, Debug)]
#[serde(tag = "mode", rename_all = "snake_case")]
#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
#[allow(clippy::large_enum_variant)] // just used for configuration
pub enum FluentMode {
    /// Listen on TCP port
    Tcp(FluentTcpConfig),

    /// Listen on unix stream socket
    #[cfg(unix)]
    Unix(FluentUnixConfig),
}
67
/// Hand-written `Deserialize` implementation for [`FluentMode`].
///
/// Serde doesn't provide a way to specify a default variant for an internally tagged enum,
/// so deserialization goes through two helper enums: a tagged one that is tried first, and an
/// untagged fallback that treats the input as a plain TCP configuration. This lets users omit
/// `mode = "tcp"`.
///
/// See [serde-rs/serde#2231](https://github.com/serde-rs/serde/issues/2231)
mod deser {
    use super::*;

    // Mirror of `FluentMode` used when the `mode` tag is present in the input.
    #[allow(clippy::large_enum_variant)]
    #[derive(Deserialize)]
    #[serde(tag = "mode")]
    enum FluentModeTagged {
        #[serde(rename = "tcp")]
        Tcp(FluentTcpConfig),

        #[cfg(unix)]
        #[serde(rename = "unix")]
        Unix(FluentUnixConfig),
    }

    // Entry point for deserialization: tagged form first, bare TCP configuration second.
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum FluentModeDe {
        Tagged(FluentModeTagged),

        // Note: this must be last as serde attempts variants in order
        Untagged(FluentTcpConfig),
    }

    impl<'de> Deserialize<'de> for FluentMode {
        // Deserializes via `FluentModeDe` and maps the result back onto `FluentMode`;
        // the untagged fallback always becomes the TCP variant.
        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
        where
            D: serde::Deserializer<'de>,
        {
            Ok(match FluentModeDe::deserialize(deserializer)? {
                FluentModeDe::Tagged(FluentModeTagged::Tcp(config)) => FluentMode::Tcp(config),
                #[cfg(unix)]
                FluentModeDe::Tagged(FluentModeTagged::Unix(config)) => FluentMode::Unix(config),
                FluentModeDe::Untagged(config) => FluentMode::Tcp(config),
            })
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        // No `mode` key: the configuration must parse as TCP.
        #[test]
        fn test_tcp_default_mode() {
            let json_data = serde_json::json!({
                "address": "0.0.0.0:2020",
                "connection_limit": 2
            });

            let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
            assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
        }

        // Explicit `mode = "tcp"` parses to the same result as the default.
        #[test]
        fn test_tcp_explicit_mode() {
            let json_data = serde_json::json!({
                "mode": "tcp",
                "address": "0.0.0.0:2020",
                "connection_limit": 2
            });

            let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
            assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
        }

        // `mode = "unix"` combined with TCP-only options must be rejected.
        #[test]
        fn test_invalid_unix_mode() {
            let json_data = serde_json::json!({
                "mode": "unix",
                "address": "0.0.0.0:2020",
                "connection_limit": 2
            });

            assert!(serde_json::from_value::<FluentConfig>(json_data).is_err());
        }

        // `mode = "unix"` with a socket path parses to the Unix variant.
        #[cfg(unix)]
        #[test]
        fn test_valid_unix_mode() {
            let json_data = serde_json::json!({
                "mode": "unix",
                "path": "/foo"
            });

            let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
            assert!(
                matches!(parsed.mode, FluentMode::Unix(c) if c.path.to_string_lossy() == "/foo")
            );
        }
    }
}
164
/// Configuration for the `fluent` TCP source.
// The optional fields below are forwarded as-is (including `None`) to the TCP source
// runner by `FluentTcpConfig::build`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct FluentTcpConfig {
    #[configurable(derived)]
    address: SocketListenAddr,

    /// The maximum number of TCP connections that are allowed at any given time.
    #[configurable(metadata(docs::type_unit = "connections"))]
    connection_limit: Option<u32>,

    #[configurable(derived)]
    keepalive: Option<TcpKeepaliveConfig>,

    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,

    /// The size of the receive buffer used for each connection.
    ///
    /// This generally should not need to be changed.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    receive_buffer_bytes: Option<usize>,

    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,

    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean or the full
    // struct form for this option; confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,
}
197
198impl FluentTcpConfig {
199    fn build(
200        &self,
201        cx: SourceContext,
202        log_namespace: LogNamespace,
203    ) -> crate::Result<super::Source> {
204        let source = FluentSource::new(log_namespace);
205        let shutdown_secs = Duration::from_secs(30);
206        let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
207        let tls_client_metadata_key = self
208            .tls
209            .as_ref()
210            .and_then(|tls| tls.client_metadata_key.clone())
211            .and_then(|k| k.path);
212        let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
213        source.run(
214            self.address,
215            self.keepalive,
216            shutdown_secs,
217            tls,
218            tls_client_metadata_key,
219            self.receive_buffer_bytes,
220            None,
221            cx,
222            self.acknowledgements,
223            self.connection_limit,
224            self.permit_origin.clone().map(Into::into),
225            FluentConfig::NAME,
226            log_namespace,
227        )
228    }
229}
230
/// Configuration for the `fluent` unix socket source.
// Only compiled on Unix targets; both fields are passed to the Unix stream source
// builder by `FluentUnixConfig::build`.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
#[cfg(unix)]
pub struct FluentUnixConfig {
    /// The Unix socket path.
    ///
    /// This should be an absolute path.
    #[configurable(metadata(docs::examples = "/path/to/socket"))]
    pub path: std::path::PathBuf,

    /// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
    ///
    /// Note: The file mode value can be specified in any numeric format supported by your configuration
    /// language, but it is most intuitive to use an octal number.
    #[configurable(metadata(docs::examples = 0o777))]
    #[configurable(metadata(docs::examples = 0o600))]
    #[configurable(metadata(docs::examples = 508))]
    pub socket_file_mode: Option<u32>,
}
252
253#[cfg(unix)]
254impl FluentUnixConfig {
255    fn build(
256        &self,
257        cx: SourceContext,
258        log_namespace: LogNamespace,
259    ) -> crate::Result<super::Source> {
260        let source = FluentSource::new(log_namespace);
261
262        crate::sources::util::build_unix_stream_source(
263            self.path.clone(),
264            self.socket_file_mode,
265            source.decoder(),
266            move |events, host| source.handle_events_impl(events, host.into()),
267            cx.shutdown,
268            cx.out,
269        )
270    }
271}
272
273impl GenerateConfig for FluentConfig {
274    fn generate_config() -> toml::Value {
275        toml::Value::try_from(Self {
276            mode: FluentMode::Tcp(FluentTcpConfig {
277                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
278                keepalive: None,
279                permit_origin: None,
280                tls: None,
281                receive_buffer_bytes: None,
282                acknowledgements: Default::default(),
283                connection_limit: Some(2),
284            }),
285            log_namespace: None,
286        })
287        .unwrap()
288    }
289}
290
291#[async_trait::async_trait]
292#[typetag::serde(name = "fluent")]
293impl SourceConfig for FluentConfig {
294    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
295        let log_namespace = cx.log_namespace(self.log_namespace);
296        match &self.mode {
297            FluentMode::Tcp(t) => t.build(cx, log_namespace),
298            #[cfg(unix)]
299            FluentMode::Unix(u) => u.build(cx, log_namespace),
300        }
301    }
302
303    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
304        let log_namespace = global_log_namespace.merge(self.log_namespace);
305        let schema_definition = self.schema_definition(log_namespace);
306
307        vec![SourceOutput::new_maybe_logs(
308            DataType::Log,
309            schema_definition,
310        )]
311    }
312
313    fn resources(&self) -> Vec<Resource> {
314        match &self.mode {
315            FluentMode::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
316            #[cfg(unix)]
317            FluentMode::Unix(_) => vec![],
318        }
319    }
320
321    fn can_acknowledge(&self) -> bool {
322        matches!(self.mode, FluentMode::Tcp(_))
323    }
324}
325
impl FluentConfig {
    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
    ///
    /// Starting from the bytes deserializer's definition, this registers the standard
    /// Vector source metadata plus the `host`, `tag`, `timestamp`, `record` and
    /// `tls_client_metadata` source metadata entries. In the legacy namespace, unknown
    /// fields are additionally declared as bytes.
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        // `host_key` is only inserted if not present already.
        let host_key = log_schema()
            .host_key()
            .cloned()
            .map(LegacyKey::InsertIfEmpty);

        // The tag always overwrites an existing legacy `tag` field.
        let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite);

        // Only TCP mode can carry TLS client metadata, and only when a key path is configured.
        let tls_client_metadata_path = match &self.mode {
            FluentMode::Tcp(tcp) => tcp
                .tls
                .as_ref()
                .and_then(|tls| tls.client_metadata_key.as_ref())
                .and_then(|k| k.path.clone())
                .map(LegacyKey::Overwrite),
            #[cfg(unix)]
            FluentMode::Unix(_) => None,
        };

        // `log_namespace` is already the effective namespace: the caller (`outputs`) merges
        // the global setting with the per-source override before calling this method.
        let mut schema_definition = BytesDeserializerConfig
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                FluentConfig::NAME,
                host_key,
                &owned_value_path!("host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_source_metadata(
                FluentConfig::NAME,
                tag_key,
                &owned_value_path!("tag"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                FluentConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            // for metadata that is added to the events dynamically from the FluentRecord
            .with_source_metadata(
                FluentConfig::NAME,
                None,
                &owned_value_path!("record"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                tls_client_metadata_path,
                &owned_value_path!("tls_client_metadata"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        // for metadata that is added to the events dynamically
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}
398
/// Runtime state shared by the TCP and Unix variants of the `fluent` source.
#[derive(Debug, Clone)]
struct FluentSource {
    /// Namespace used when inserting metadata into events and when creating decoders.
    log_namespace: LogNamespace,
    /// Host key path from the global log schema, captured once at construction time.
    legacy_host_key_path: Option<OwnedValuePath>,
}
404
405impl FluentSource {
406    fn new(log_namespace: LogNamespace) -> Self {
407        Self {
408            log_namespace,
409            legacy_host_key_path: log_schema().host_key().cloned(),
410        }
411    }
412
413    fn handle_events_impl(&self, events: &mut [Event], host: Value) {
414        for event in events {
415            let log = event.as_mut_log();
416
417            let legacy_host_key = self
418                .legacy_host_key_path
419                .as_ref()
420                .map(LegacyKey::InsertIfEmpty);
421
422            self.log_namespace.insert_source_metadata(
423                FluentConfig::NAME,
424                log,
425                legacy_host_key,
426                path!("host"),
427                host.clone(),
428            );
429        }
430    }
431}
432
433impl TcpSource for FluentSource {
434    type Error = DecodeError;
435    type Item = FluentFrame;
436    type Decoder = FluentDecoder;
437    type Acker = FluentAcker;
438
439    fn decoder(&self) -> Self::Decoder {
440        FluentDecoder::new(self.log_namespace)
441    }
442
443    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
444        self.handle_events_impl(events, host.ip().to_string().into())
445    }
446
447    fn build_acker(&self, frame: &[Self::Item]) -> Self::Acker {
448        FluentAcker::new(frame)
449    }
450}
451
/// Errors produced while decoding data received by the `fluent` source.
#[derive(Debug)]
pub enum DecodeError {
    /// An I/O error, e.g. while reading a gzip-compressed payload. This is the only
    /// variant after which decoding does not continue (see `can_continue`).
    IO(io::Error),
    /// The MessagePack deserializer rejected the input.
    Decode(decode::Error),
    /// The `compressed` option named a scheme other than `gzip` or `text`.
    UnknownCompression(String),
    /// A lone MessagePack value other than nil was received where a heartbeat was expected.
    UnexpectedValue(rmpv::Value),
}
459
460impl std::fmt::Display for DecodeError {
461    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
462        match self {
463            DecodeError::IO(err) => write!(f, "{err}"),
464            DecodeError::Decode(err) => write!(f, "{err}"),
465            DecodeError::UnknownCompression(compression) => {
466                write!(f, "unknown compression: {compression}")
467            }
468            DecodeError::UnexpectedValue(value) => {
469                write!(f, "unexpected msgpack value, ignoring: {value}")
470            }
471        }
472    }
473}
474
475impl StreamDecodingError for DecodeError {
476    fn can_continue(&self) -> bool {
477        match self {
478            DecodeError::IO(_) => false,
479            DecodeError::Decode(_) => true,
480            DecodeError::UnknownCompression(_) => true,
481            DecodeError::UnexpectedValue(_) => true,
482        }
483    }
484}
485
486impl From<io::Error> for DecodeError {
487    fn from(e: io::Error) -> Self {
488        DecodeError::IO(e)
489    }
490}
491
492impl From<decode::Error> for DecodeError {
493    fn from(e: decode::Error) -> Self {
494        DecodeError::Decode(e)
495    }
496}
497
/// Stream decoder that turns MessagePack-encoded Fluent messages into `FluentFrame`s.
#[derive(Debug, Clone)]
struct FluentDecoder {
    /// Namespace handed to every `FluentEvent` built by this decoder.
    log_namespace: LogNamespace,
}
502
503impl FluentDecoder {
504    const fn new(log_namespace: LogNamespace) -> Self {
505        Self { log_namespace }
506    }
507
508    fn handle_message(
509        &mut self,
510        message: Result<FluentMessage, DecodeError>,
511        byte_size: usize,
512    ) -> Result<Option<(FluentFrame, usize)>, DecodeError> {
513        let log_namespace = &self.log_namespace;
514
515        match message? {
516            FluentMessage::Message(tag, timestamp, record) => {
517                let event = Event::from(FluentEvent {
518                    tag,
519                    timestamp,
520                    record,
521                    log_namespace,
522                });
523                let frame = FluentFrame {
524                    events: smallvec![event],
525                    chunk: None,
526                };
527                Ok(Some((frame, byte_size)))
528            }
529            FluentMessage::MessageWithOptions(tag, timestamp, record, options) => {
530                let event = Event::from(FluentEvent {
531                    tag,
532                    timestamp,
533                    record,
534                    log_namespace,
535                });
536                let frame = FluentFrame {
537                    events: smallvec![event],
538                    chunk: options.chunk,
539                };
540                Ok(Some((frame, byte_size)))
541            }
542            FluentMessage::Forward(tag, entries) => {
543                let events = entries
544                    .into_iter()
545                    .map(|FluentEntry(timestamp, record)| {
546                        Event::from(FluentEvent {
547                            tag: tag.clone(),
548                            timestamp,
549                            record,
550                            log_namespace,
551                        })
552                    })
553                    .collect();
554                let frame = FluentFrame {
555                    events,
556                    chunk: None,
557                };
558                Ok(Some((frame, byte_size)))
559            }
560            FluentMessage::ForwardWithOptions(tag, entries, options) => {
561                let events = entries
562                    .into_iter()
563                    .map(|FluentEntry(timestamp, record)| {
564                        Event::from(FluentEvent {
565                            tag: tag.clone(),
566                            timestamp,
567                            record,
568                            log_namespace,
569                        })
570                    })
571                    .collect();
572                let frame = FluentFrame {
573                    events,
574                    chunk: options.chunk,
575                };
576                Ok(Some((frame, byte_size)))
577            }
578            FluentMessage::PackedForward(tag, bin) => {
579                let mut buf = BytesMut::from(&bin[..]);
580
581                let mut events = smallvec![];
582                while let Some(FluentEntry(timestamp, record)) =
583                    FluentEntryStreamDecoder.decode(&mut buf)?
584                {
585                    events.push(Event::from(FluentEvent {
586                        tag: tag.clone(),
587                        timestamp,
588                        record,
589                        log_namespace,
590                    }));
591                }
592                let frame = FluentFrame {
593                    events,
594                    chunk: None,
595                };
596                Ok(Some((frame, byte_size)))
597            }
598            FluentMessage::PackedForwardWithOptions(tag, bin, options) => {
599                let buf = match options.compressed.as_deref() {
600                    Some("gzip") => {
601                        let mut buf = Vec::new();
602                        MultiGzDecoder::new(io::Cursor::new(bin.into_vec()))
603                            .read_to_end(&mut buf)
604                            .map(|_| buf)
605                            .map_err(Into::into)
606                    }
607                    Some("text") | None => Ok(bin.into_vec()),
608                    Some(s) => Err(DecodeError::UnknownCompression(s.to_owned())),
609                }?;
610
611                let mut buf = BytesMut::from(&buf[..]);
612
613                let mut events = smallvec![];
614                while let Some(FluentEntry(timestamp, record)) =
615                    FluentEntryStreamDecoder.decode(&mut buf)?
616                {
617                    events.push(Event::from(FluentEvent {
618                        tag: tag.clone(),
619                        timestamp,
620                        record,
621                        log_namespace,
622                    }));
623                }
624                let frame = FluentFrame {
625                    events,
626                    chunk: options.chunk,
627                };
628                Ok(Some((frame, byte_size)))
629            }
630            FluentMessage::Heartbeat(rmpv::Value::Nil) => Ok(None),
631            FluentMessage::Heartbeat(value) => Err(DecodeError::UnexpectedValue(value)),
632        }
633    }
634}
635
636impl Decoder for FluentDecoder {
637    type Item = (FluentFrame, usize);
638    type Error = DecodeError;
639
640    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
641        loop {
642            if src.is_empty() {
643                return Ok(None);
644            }
645
646            let (byte_size, res) = {
647                let mut des = Deserializer::new(io::Cursor::new(&src[..]));
648
649                let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
650
651                // check for unexpected EOF to indicate that we need more data
652                if let Err(DecodeError::Decode(
653                    decode::Error::InvalidDataRead(ref custom)
654                    | decode::Error::InvalidMarkerRead(ref custom),
655                )) = res
656                {
657                    if custom.kind() == io::ErrorKind::UnexpectedEof {
658                        return Ok(None);
659                    }
660                }
661
662                (des.position() as usize, res)
663            };
664
665            src.advance(byte_size);
666
667            let maybe_item = self.handle_message(res, byte_size).inspect_err(|error| {
668                let base64_encoded_message = BASE64_STANDARD.encode(&src[..]);
669                emit!(FluentMessageDecodeError {
670                    error,
671                    base64_encoded_message
672                });
673            })?;
674            if let Some(item) = maybe_item {
675                return Ok(Some(item));
676            }
677        }
678    }
679}
680
/// Decoder for a `MessagePackEventStream`, which is just a stream of back-to-back entries.
#[derive(Clone, Debug)]
struct FluentEntryStreamDecoder;
684
685impl Decoder for FluentEntryStreamDecoder {
686    type Item = FluentEntry;
687    type Error = DecodeError;
688
689    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
690        if src.is_empty() {
691            return Ok(None);
692        }
693        let (byte_size, res) = {
694            let mut des = Deserializer::new(io::Cursor::new(&src[..]));
695
696            // attempt to parse, if we get unexpected EOF, we need more data
697            let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
698
699            if let Err(DecodeError::Decode(decode::Error::InvalidDataRead(ref custom))) = res {
700                if custom.kind() == io::ErrorKind::UnexpectedEof {
701                    return Ok(None);
702                }
703            }
704
705            let byte_size = des.position();
706
707            emit!(FluentMessageReceived { byte_size });
708
709            (byte_size as usize, res)
710        };
711
712        src.advance(byte_size);
713
714        res
715    }
716}
717
/// Builds the acknowledgement response for a batch of frames.
struct FluentAcker {
    /// The `chunk` values of the frames that carried one, in frame order.
    chunks: Vec<String>,
}
721
722impl FluentAcker {
723    fn new(frames: &[FluentFrame]) -> Self {
724        Self {
725            chunks: frames.iter().filter_map(|f| f.chunk.clone()).collect(),
726        }
727    }
728}
729
730impl TcpSourceAcker for FluentAcker {
731    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
732        if self.chunks.is_empty() {
733            return None;
734        }
735
736        let mut buf = Vec::new();
737        let mut ser = Serializer::new(&mut buf);
738        let mut ack_map = HashMap::new();
739
740        for chunk in self.chunks {
741            ack_map.clear();
742            if let TcpSourceAck::Ack = ack {
743                ack_map.insert("ack", chunk);
744            };
745            ack_map.serialize(&mut ser).unwrap();
746        }
747        Some(buf.into())
748    }
749}
750
/// Normalized fluent message.
///
/// Every message variant is reduced to one of these per record before being converted
/// into a `LogEvent`.
#[derive(Debug, PartialEq)]
struct FluentEvent<'a> {
    /// Tag of the message the record arrived in.
    tag: FluentTag,
    /// Timestamp attached to the record.
    timestamp: FluentTimestamp,
    /// Key/value payload of the record.
    record: FluentRecord,
    /// Namespace that decides where the fields above are stored in the event.
    log_namespace: &'a LogNamespace,
}
759
760impl From<FluentEvent<'_>> for Event {
761    fn from(frame: FluentEvent) -> Event {
762        LogEvent::from(frame).into()
763    }
764}
765
/// The events decoded from one Fluent message, plus its optional chunk identifier.
struct FluentFrame {
    /// Events produced from the message's record(s).
    events: SmallVec<[Event; 1]>,
    /// `chunk` option of the message, if any; collected by `FluentAcker` for the ack response.
    chunk: Option<String>,
}
770
771impl From<FluentFrame> for SmallVec<[Event; 1]> {
772    fn from(frame: FluentFrame) -> Self {
773        frame.events
774    }
775}
776
777impl From<FluentEvent<'_>> for LogEvent {
778    fn from(frame: FluentEvent) -> LogEvent {
779        let FluentEvent {
780            tag,
781            timestamp,
782            record,
783            log_namespace,
784        } = frame;
785
786        let mut log = LogEvent::default();
787
788        log_namespace.insert_vector_metadata(
789            &mut log,
790            log_schema().source_type_key(),
791            path!("source_type"),
792            Bytes::from_static(FluentConfig::NAME.as_bytes()),
793        );
794
795        match log_namespace {
796            LogNamespace::Vector => {
797                log.insert(metadata_path!(FluentConfig::NAME, "timestamp"), timestamp);
798                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
799            }
800            LogNamespace::Legacy => {
801                log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
802            }
803        }
804
805        log_namespace.insert_source_metadata(
806            FluentConfig::NAME,
807            &mut log,
808            Some(LegacyKey::Overwrite(path!("tag"))),
809            path!("tag"),
810            tag,
811        );
812
813        for (key, value) in record.into_iter() {
814            let value: Value = value.into();
815            log_namespace.insert_source_metadata(
816                FluentConfig::NAME,
817                &mut log,
818                Some(LegacyKey::Overwrite(path!(key.as_str()))),
819                path!("record", key.as_str()),
820                value,
821            );
822        }
823        log
824    }
825}
826
827#[cfg(test)]
828mod tests {
829    use bytes::BytesMut;
830    use chrono::{DateTime, Utc};
831    use rmp_serde::Serializer;
832    use serde::Serialize;
833    use tokio::{
834        io::{AsyncReadExt, AsyncWriteExt},
835        time::{error::Elapsed, timeout, Duration},
836    };
837    use tokio_util::codec::Decoder;
838    use vector_lib::assert_event_data_eq;
839    use vector_lib::lookup::OwnedTargetPath;
840    use vector_lib::schema::Definition;
841    use vrl::value::{kind::Collection, ObjectMap, Value};
842
843    use super::{message::FluentMessageOptions, *};
844    use crate::{
845        config::{SourceConfig, SourceContext},
846        event::EventStatus,
847        test_util::{self, next_addr, trace_init, wait_for_tcp},
848        SourceSender,
849    };
850
    // Checks that the example configuration produced by `GenerateConfig` is accepted by
    // the shared config-generation test helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<FluentConfig>();
    }
855
856    // useful references for msgpack:
857    // Spec: https://github.com/msgpack/msgpack/blob/master/spec.md
858    // Encode to array of bytes: https://kawanet.github.io/msgpack-lite/
859    // Decode base64: https://toolslick.com/conversion/data/messagepack-to-json
860
    /// Builds the log event the decoder tests expect: `message` set to `name`, the source
    /// type set to `fluent`, the tag fixed to `tag.name`, and `timestamp` parsed from the
    /// given RFC 3339 string.
    fn mock_event(name: &str, timestamp: &str) -> Event {
        Event::Log(LogEvent::from(ObjectMap::from([
            ("message".into(), Value::from(name)),
            (
                log_schema().source_type_key().unwrap().to_string().into(),
                Value::from(FluentConfig::NAME),
            ),
            ("tag".into(), Value::from("tag.name")),
            (
                "timestamp".into(),
                Value::Timestamp(DateTime::parse_from_rfc3339(timestamp).unwrap().into()),
            ),
        ])))
    }
875
    // Message mode: a 3-element array of tag, timestamp and record decodes to one event
    // and consumes the whole input.
    #[test]
    fn decode_message_mode() {
        //[
        //  "tag.name",
        //  1441588984,
        //  {"message": "bar"},
        //]
        // MessagePack encoding of the value above: 147 = array of 3, 168 = 8-byte string,
        // 206 = uint32, 129 = map of 1.
        let message: Vec<u8> = vec![
            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 98, 97, 114,
        ];

        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
        let got = decode_all(message.clone()).unwrap();
        assert_event_data_eq!(got.0[0], expected);
        assert_eq!(got.1, message.len());
    }
893
    // Message mode with a trailing options map: the options do not change the decoded
    // event, and the whole input is still consumed.
    #[test]
    fn decode_message_mode_with_options() {
        //[
        //  "tag.name",
        //   1441588984,
        //   { "message": "bar" },
        //   { "size": 1 }
        //]
        // Same encoding as the plain message, but as an array of 4 (148) with the
        // options map appended.
        let message: Vec<u8> = vec![
            148, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
            101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 129, 164, 115, 105, 122, 101, 1,
        ];

        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
        let got = decode_all(message.clone()).unwrap();
        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected);
    }
912
    // Forward mode: a tag followed by an array of [timestamp, record] entries decodes to
    // one event per entry, in order, all carrying the same tag.
    #[test]
    fn decode_forward_mode() {
        //[
        //    "tag.name",
        //    [
        //        [1441588984, {"message": "foo"}],
        //        [1441588985, {"message": "bar"}],
        //        [1441588986, {"message": "baz"}]
        //    ]
        //]
        // 146 = array of 2 (outer message and each entry), 147 = array of 3 (the entries).
        let message: Vec<u8> = vec![
            146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122,
        ];

        let expected = vec![
            mock_event("foo", "2015-09-07T01:23:04Z"),
            mock_event("bar", "2015-09-07T01:23:05Z"),
            mock_event("baz", "2015-09-07T01:23:06Z"),
        ];
        let got = decode_all(message.clone()).unwrap();

        assert_eq!(got.1, message.len());
        assert_event_data_eq!(got.0[0], expected[0]);
        assert_event_data_eq!(got.0[1], expected[1]);
        assert_event_data_eq!(got.0[2], expected[2]);
    }
942
943    #[test]
944    fn decode_forward_mode_with_options() {
945        //[
946        //    "tag.name",
947        //    [
948        //        [1441588984, {"message": "foo"}],
949        //        [1441588985, {"message": "bar"}],
950        //        [1441588986, {"message": "baz"}]
951        //    ],
952        //    {"size": 3}
953        //]
954        let message: Vec<u8> = vec![
955            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
956            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
957            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
958            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 164, 115, 105,
959            122, 101, 3,
960        ];
961
962        let expected = vec![
963            mock_event("foo", "2015-09-07T01:23:04Z"),
964            mock_event("bar", "2015-09-07T01:23:05Z"),
965            mock_event("baz", "2015-09-07T01:23:06Z"),
966        ];
967
968        let got = decode_all(message.clone()).unwrap();
969
970        assert_eq!(got.1, message.len());
971
972        assert_event_data_eq!(got.0[0], expected[0]);
973        assert_event_data_eq!(got.0[1], expected[1]);
974        assert_event_data_eq!(got.0[2], expected[2]);
975    }
976
977    #[test]
978    fn decode_packed_forward_mode() {
979        //[
980        //    "tag.name",
981        //    <packed messages>
982        //]
983        //
984        //With packed messages as bin:
985        // [1441588984, {"message": "foo"}]
986        // [1441588985, {"message": "bar"}]
987        // [1441588986, {"message": "baz"}]
988        let message: Vec<u8> = vec![
989            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 57, 146, 206, 85, 236, 230, 248,
990            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230,
991            249, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236,
992            230, 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 167, 109,
993            101, 115, 115, 97, 103, 101, 163, 102, 111, 111,
994        ];
995
996        let expected = vec![
997            mock_event("foo", "2015-09-07T01:23:04Z"),
998            mock_event("bar", "2015-09-07T01:23:05Z"),
999            mock_event("baz", "2015-09-07T01:23:06Z"),
1000        ];
1001
1002        let got = decode_all(message.clone()).unwrap();
1003
1004        assert_eq!(got.1, message.len());
1005        assert_event_data_eq!(got.0[0], expected[0]);
1006        assert_event_data_eq!(got.0[1], expected[1]);
1007        assert_event_data_eq!(got.0[2], expected[2]);
1008    }
1009
1010    //  TODO
1011    #[test]
1012    fn decode_compressed_packed_forward_mode() {
1013        //[
1014        //    "tag.name",
1015        //    <packed messages>,
1016        //    {"compressed": "gzip"}
1017        //]
1018        //
1019        //With gzip'd packed messages as bin:
1020        // [1441588984, {"message": "foo"}]
1021        // [1441588985, {"message": "bar"}]
1022        // [1441588986, {"message": "baz"}]
1023        let message: Vec<u8> = vec![
1024            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 55, 31, 139, 8, 0, 245, 10, 168,
1025            96, 0, 3, 155, 116, 46, 244, 205, 179, 31, 141, 203, 115, 83, 139, 139, 19, 211, 83,
1026            23, 167, 229, 231, 79, 2, 9, 253, 68, 8, 37, 37, 22, 129, 133, 126, 33, 11, 85, 1, 0,
1027            53, 3, 158, 28, 57, 0, 0, 0, 129, 170, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100,
1028            164, 103, 122, 105, 112,
1029        ];
1030
1031        let expected = vec![
1032            mock_event("foo", "2015-09-07T01:23:04Z"),
1033            mock_event("bar", "2015-09-07T01:23:05Z"),
1034            mock_event("baz", "2015-09-07T01:23:06Z"),
1035        ];
1036
1037        let got = decode_all(message.clone()).unwrap();
1038
1039        assert_eq!(got.1, message.len());
1040        assert_event_data_eq!(got.0[0], expected[0]);
1041        assert_event_data_eq!(got.0[1], expected[1]);
1042        assert_event_data_eq!(got.0[2], expected[2]);
1043    }
1044
1045    fn decode_all(message: Vec<u8>) -> Result<(SmallVec<[Event; 1]>, usize), DecodeError> {
1046        let mut buf = BytesMut::from(&message[..]);
1047
1048        let mut decoder = FluentDecoder::new(LogNamespace::default());
1049
1050        let (frame, byte_size) = decoder.decode(&mut buf)?.unwrap();
1051        Ok((frame.into(), byte_size))
1052    }
1053
1054    #[tokio::test]
1055    async fn ack_delivered_without_chunk() {
1056        let (result, output) = check_acknowledgements(EventStatus::Delivered, false).await;
1057        assert!(result.is_err()); // the `_` inside this error is `Elapsed`
1058        assert!(output.is_empty());
1059    }
1060
1061    #[tokio::test]
1062    async fn ack_delivered_with_chunk() {
1063        let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
1064        assert_eq!(result.unwrap().unwrap(), output.len());
1065        let expected: Vec<u8> = vec![0x81, 0xa3, 0x61, 0x63]; // { "ack": ...
1066        assert_eq!(output[..expected.len()], expected);
1067    }
1068
1069    #[tokio::test]
1070    async fn ack_failed_without_chunk() {
1071        let (result, output) = check_acknowledgements(EventStatus::Rejected, false).await;
1072        assert_eq!(result.unwrap().unwrap(), output.len());
1073        assert!(output.is_empty());
1074    }
1075
1076    #[tokio::test]
1077    async fn ack_failed_with_chunk() {
1078        let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
1079        assert_eq!(result.unwrap().unwrap(), output.len());
1080        let expected: Vec<u8> = vec![0x80]; // { }
1081        assert_eq!(output, expected);
1082    }
1083
1084    async fn check_acknowledgements(
1085        status: EventStatus,
1086        with_chunk: bool,
1087    ) -> (Result<Result<usize, std::io::Error>, Elapsed>, Bytes) {
1088        trace_init();
1089
1090        let (sender, recv) = SourceSender::new_test_finalize(status);
1091        let address = next_addr();
1092        let source = FluentConfig {
1093            mode: FluentMode::Tcp(FluentTcpConfig {
1094                address: address.into(),
1095                tls: None,
1096                keepalive: None,
1097                permit_origin: None,
1098                receive_buffer_bytes: None,
1099                acknowledgements: true.into(),
1100                connection_limit: None,
1101            }),
1102            log_namespace: None,
1103        }
1104        .build(SourceContext::new_test(sender, None))
1105        .await
1106        .unwrap();
1107        tokio::spawn(source);
1108        wait_for_tcp(address).await;
1109
1110        let msg = uuid::Uuid::new_v4().to_string();
1111        let tag = uuid::Uuid::new_v4().to_string();
1112        let req = build_req(&tag, &[("field", &msg)], with_chunk);
1113
1114        let sender = tokio::spawn(async move {
1115            let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
1116            socket.write_all(&req).await.unwrap();
1117
1118            let mut output = BytesMut::new();
1119            (
1120                timeout(Duration::from_millis(250), socket.read_buf(&mut output)).await,
1121                output,
1122            )
1123        });
1124        let events = test_util::collect_n(recv, 1).await;
1125        let (result, output) = sender.await.unwrap();
1126
1127        assert_eq!(events.len(), 1);
1128        let log = events[0].as_log();
1129        assert_eq!(log.get("field").unwrap(), &msg.into());
1130        assert!(matches!(log.get("host").unwrap(), Value::Bytes(_)));
1131        assert!(matches!(log.get("timestamp").unwrap(), Value::Timestamp(_)));
1132        assert_eq!(log.get("tag").unwrap(), &tag.into());
1133
1134        (result, output.into())
1135    }
1136
1137    fn build_req(tag: &str, fields: &[(&str, &str)], with_chunk: bool) -> Vec<u8> {
1138        let mut record = FluentRecord::default();
1139        for (tag, value) in fields {
1140            record.insert((*tag).into(), rmpv::Value::String((*value).into()).into());
1141        }
1142        let chunk = with_chunk.then(|| BASE64_STANDARD.encode(uuid::Uuid::new_v4().as_bytes()));
1143        let req = FluentMessage::MessageWithOptions(
1144            tag.into(),
1145            FluentTimestamp::Unix(Utc::now()),
1146            record,
1147            FluentMessageOptions {
1148                chunk,
1149                ..Default::default()
1150            },
1151        );
1152        let mut buf = Vec::new();
1153        req.serialize(&mut Serializer::new(&mut buf)).unwrap();
1154        buf
1155    }
1156
1157    #[test]
1158    fn output_schema_definition_vector_namespace() {
1159        let config = FluentConfig {
1160            mode: FluentMode::Tcp(FluentTcpConfig {
1161                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
1162                tls: None,
1163                keepalive: None,
1164                permit_origin: None,
1165                receive_buffer_bytes: None,
1166                acknowledgements: false.into(),
1167                connection_limit: None,
1168            }),
1169            log_namespace: Some(true),
1170        };
1171
1172        let definitions = config
1173            .outputs(LogNamespace::Vector)
1174            .remove(0)
1175            .schema_definition(true);
1176
1177        let expected_definition =
1178            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1179                .with_meaning(OwnedTargetPath::event_root(), "message")
1180                .with_metadata_field(
1181                    &owned_value_path!("vector", "source_type"),
1182                    Kind::bytes(),
1183                    None,
1184                )
1185                .with_metadata_field(&owned_value_path!("fluent", "tag"), Kind::bytes(), None)
1186                .with_metadata_field(
1187                    &owned_value_path!("fluent", "timestamp"),
1188                    Kind::timestamp(),
1189                    Some("timestamp"),
1190                )
1191                .with_metadata_field(
1192                    &owned_value_path!("fluent", "record"),
1193                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1194                    None,
1195                )
1196                .with_metadata_field(
1197                    &owned_value_path!("vector", "ingest_timestamp"),
1198                    Kind::timestamp(),
1199                    None,
1200                )
1201                .with_metadata_field(
1202                    &owned_value_path!("fluent", "host"),
1203                    Kind::bytes(),
1204                    Some("host"),
1205                )
1206                .with_metadata_field(
1207                    &owned_value_path!("fluent", "tls_client_metadata"),
1208                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1209                    None,
1210                );
1211
1212        assert_eq!(definitions, Some(expected_definition))
1213    }
1214
1215    #[test]
1216    fn output_schema_definition_legacy_namespace() {
1217        let config = FluentConfig {
1218            mode: FluentMode::Tcp(FluentTcpConfig {
1219                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
1220                tls: None,
1221                keepalive: None,
1222                permit_origin: None,
1223                receive_buffer_bytes: None,
1224                acknowledgements: false.into(),
1225                connection_limit: None,
1226            }),
1227            log_namespace: None,
1228        };
1229
1230        let definitions = config
1231            .outputs(LogNamespace::Legacy)
1232            .remove(0)
1233            .schema_definition(true);
1234
1235        let expected_definition = Definition::new_with_default_metadata(
1236            Kind::object(Collection::empty()),
1237            [LogNamespace::Legacy],
1238        )
1239        .with_event_field(
1240            &owned_value_path!("message"),
1241            Kind::bytes(),
1242            Some("message"),
1243        )
1244        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1245        .with_event_field(&owned_value_path!("tag"), Kind::bytes(), None)
1246        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1247        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
1248        .unknown_fields(Kind::bytes());
1249
1250        assert_eq!(definitions, Some(expected_definition))
1251    }
1252}
1253
#[cfg(all(test, feature = "fluent-integration-tests"))]
mod integration_tests {
    //! Integration tests that run Fluent Bit and Fluentd containers forwarding to a locally
    //! started `fluent` source.

    use std::{fs::File, io::Write, net::SocketAddr, time::Duration};

    use futures::Stream;
    use tokio::time::sleep;
    use vector_lib::event::{Event, EventStatus};

    use crate::{
        config::{SourceConfig, SourceContext},
        docker::Container,
        sources::fluent::{FluentConfig, FluentMode, FluentTcpConfig},
        test_util::{
            collect_ready,
            components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
            next_addr, next_addr_for_ip, random_string, wait_for_tcp,
        },
        SourceSender,
    };

    // Container images and tags used by the tests below.
    const FLUENT_BIT_IMAGE: &str = "fluent/fluent-bit";
    const FLUENT_BIT_TAG: &str = "1.7";
    const FLUENTD_IMAGE: &str = "fluent/fluentd";
    const FLUENTD_TAG: &str = "v1.12";

    /// Creates a new temporary directory holding a single file `name` with `content`, and
    /// returns the directory handle.
    fn make_file(name: &str, content: &str) -> tempfile::TempDir {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join(name);
        File::create(path)
            .unwrap()
            .write_all(content.as_bytes())
            .unwrap();
        dir
    }

    /// Fluent Bit forwarding with events finalized as delivered.
    #[tokio::test]
    async fn fluentbit() {
        test_fluentbit(EventStatus::Delivered).await;
    }

    /// Fluent Bit forwarding with events finalized as rejected.
    #[tokio::test]
    async fn fluentbit_rejection() {
        test_fluentbit(EventStatus::Rejected).await;
    }

    /// Starts a Fluent Bit container with an HTTP input and a forward output aimed at the
    /// source, posts one JSON message to it, and checks the single event the source emits.
    async fn test_fluentbit(status: EventStatus) {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let test_address = next_addr();
            let (out, source_address) = source(status).await;

            let listen_host = test_address.ip();
            let listen_port = test_address.port();
            let send_port = source_address.port();
            let config = format!(
                r#"
[SERVICE]
    Grace      0
    Flush      1
    Daemon     off

[INPUT]
    Name       http
    Host       {listen_host}
    Port       {listen_port}

[OUTPUT]
    Name          forward
    Match         *
    Host          host.docker.internal
    Port          {send_port}
    Require_ack_response true
    "#
            );
            let dir = make_file("fluent-bit.conf", &config);

            let msg = random_string(64);
            let payload = serde_json::json!({ "message": msg }).to_string();
            let url = format!("http://{test_address}/");

            let container = Container::new(FLUENT_BIT_IMAGE, FLUENT_BIT_TAG)
                .bind(dir.path().display(), "/fluent-bit/etc");
            let events = container
                .run(async move {
                    wait_for_tcp(test_address).await;
                    reqwest::Client::new()
                        .post(url)
                        .header("content-type", "application/json")
                        .body(payload)
                        .send()
                        .await
                        .unwrap();
                    // Fixed pause before collecting whatever the source has emitted so far.
                    sleep(Duration::from_secs(2)).await;

                    collect_ready(out).await
                })
                .await;

            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert_eq!(log["tag"], "http.0".into());
            assert_eq!(log["message"], msg.into());
            assert!(log.get("timestamp").is_some());
            assert!(log.get("host").is_some());
        })
        .await;
    }

    /// Fluentd forwarding with events finalized as delivered.
    #[tokio::test]
    async fn fluentd() {
        test_fluentd(EventStatus::Delivered, "").await;
    }

    /// Fluentd forwarding with `compress gzip` added to the forward output.
    #[tokio::test]
    async fn fluentd_gzip() {
        test_fluentd(EventStatus::Delivered, "compress gzip").await;
    }

    /// Fluentd forwarding with events finalized as rejected.
    #[tokio::test]
    async fn fluentd_rejection() {
        test_fluentd(EventStatus::Rejected, "").await;
    }

    /// Starts a Fluentd container with an HTTP source and a forward match aimed at the
    /// source, posts one JSON message to it, and checks the single event the source emits.
    ///
    /// `options` is spliced verbatim into the `<match>` section of the Fluentd config.
    async fn test_fluentd(status: EventStatus, options: &str) {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let test_address = next_addr();
            let (out, source_address) = source(status).await;

            let http_host = test_address.ip();
            let http_port = test_address.port();
            let port = source_address.port();
            let config = format!(
                r#"
<source>
  @type http
  bind {http_host}
  port {http_port}
</source>

<match *>
  @type forward
  <server>
    name  local
    host  host.docker.internal
    port  {port}
  </server>
  <buffer>
    flush_mode immediate
  </buffer>
  require_ack_response true
  ack_response_timeout 1
  {options}
</match>
"#
            );

            let dir = make_file("fluent.conf", &config);

            let msg = random_string(64);
            let payload = serde_json::json!({ "message": msg }).to_string();
            let url = format!("http://{test_address}/");

            let container = Container::new(FLUENTD_IMAGE, FLUENTD_TAG)
                .bind(dir.path().display(), "/fluentd/etc");
            let events = container
                .run(async move {
                    wait_for_tcp(test_address).await;
                    reqwest::Client::new()
                        .post(url)
                        .header("content-type", "application/json")
                        .body(payload)
                        .send()
                        .await
                        .unwrap();
                    // Fixed pause before collecting whatever the source has emitted so far.
                    sleep(Duration::from_secs(2)).await;
                    collect_ready(out).await
                })
                .await;

            assert_eq!(events.len(), 1);
            let log = events[0].as_log();
            assert_eq!(log["tag"], "".into());
            assert_eq!(log["message"], msg.into());
            assert!(log.get("timestamp").is_some());
            assert!(log.get("host").is_some());
        })
        .await;
    }

    /// Spawns a TCP `fluent` source bound to the unspecified IPv4 address, waits until it
    /// accepts connections, and returns the event stream together with the bound address.
    ///
    /// Events are finalized with `status` by the test sender.
    async fn source(status: EventStatus) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr_for_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));

        tokio::spawn(async move {
            let config = FluentConfig {
                mode: FluentMode::Tcp(FluentTcpConfig {
                    address: address.into(),
                    tls: None,
                    keepalive: None,
                    permit_origin: None,
                    receive_buffer_bytes: None,
                    acknowledgements: false.into(),
                    connection_limit: None,
                }),
                log_namespace: None,
            };
            let running = config
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap();
            running.await.unwrap()
        });

        wait_for_tcp(address).await;
        (recv, address)
    }
}
1463}