vector/sources/fluent/
mod.rs

1use std::{
2    collections::HashMap,
3    io::{self, Read},
4    net::SocketAddr,
5    time::Duration,
6};
7
8use base64::prelude::{BASE64_STANDARD, Engine as _};
9use bytes::{Buf, Bytes, BytesMut};
10use chrono::Utc;
11use flate2::read::MultiGzDecoder;
12use rmp_serde::{Deserializer, Serializer, decode};
13use serde::{Deserialize, Serialize};
14use smallvec::{SmallVec, smallvec};
15use tokio_util::codec::Decoder;
16use vector_lib::{
17    codecs::{BytesDeserializerConfig, StreamDecodingError},
18    config::{LegacyKey, LogNamespace},
19    configurable::configurable_component,
20    ipallowlist::IpAllowlistConfig,
21    lookup::{OwnedValuePath, lookup_v2::parse_value_path, metadata_path, owned_value_path, path},
22    schema::Definition,
23};
24use vrl::value::{Kind, Value, kind::Collection};
25
26use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
27use crate::{
28    config::{
29        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
30        SourceContext, SourceOutput, log_schema,
31    },
32    event::{Event, LogEvent},
33    internal_events::{FluentMessageDecodeError, FluentMessageReceived},
34    serde::bool_or_struct,
35    tcp::TcpKeepaliveConfig,
36    tls::{MaybeTlsSettings, TlsSourceConfig},
37};
38
39mod message;
40use self::message::{FluentEntry, FluentMessage, FluentRecord, FluentTag, FluentTimestamp};
41
42/// Configuration for the `fluent` source.
43#[configurable_component(source("fluent", "Collect logs from a Fluentd or Fluent Bit agent."))]
44#[derive(Clone, Debug)]
45pub struct FluentConfig {
46    #[serde(flatten)]
47    mode: FluentMode,
48
49    /// The namespace to use for logs. This overrides the global setting.
50    #[configurable(metadata(docs::hidden))]
51    #[serde(default)]
52    log_namespace: Option<bool>,
53}
54
55/// Listening mode for the `fluent` source.
56#[configurable_component(no_deser)]
57#[derive(Clone, Debug)]
58#[serde(tag = "mode", rename_all = "snake_case")]
59#[configurable(metadata(docs::enum_tag_description = "The type of socket to use."))]
60#[allow(clippy::large_enum_variant)] // just used for configuration
61pub enum FluentMode {
62    /// Listen on TCP port
63    Tcp(FluentTcpConfig),
64
65    /// Listen on unix stream socket
66    #[cfg(unix)]
67    Unix(FluentUnixConfig),
68}
69
70/// Serde doesn't provide a way to specify a default tagged variant when deserializing
71/// So we use a somewhat arcane setup with an untagged and tagged versions to allow
72/// users to not have to specify mode = tcp
73///
74/// See [serde-rs/serde#2231](https://github.com/serde-rs/serde/issues/2231)
75mod deser {
76    use super::*;
77
78    #[allow(clippy::large_enum_variant)]
79    #[derive(Deserialize)]
80    #[serde(tag = "mode")]
81    enum FluentModeTagged {
82        #[serde(rename = "tcp")]
83        Tcp(FluentTcpConfig),
84
85        #[cfg(unix)]
86        #[serde(rename = "unix")]
87        Unix(FluentUnixConfig),
88    }
89
90    #[derive(Deserialize)]
91    #[serde(untagged)]
92    enum FluentModeDe {
93        Tagged(FluentModeTagged),
94
95        // Note: this must be last as serde attempts variants in order
96        Untagged(FluentTcpConfig),
97    }
98
99    impl<'de> Deserialize<'de> for FluentMode {
100        fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
101        where
102            D: serde::Deserializer<'de>,
103        {
104            Ok(match FluentModeDe::deserialize(deserializer)? {
105                FluentModeDe::Tagged(FluentModeTagged::Tcp(config)) => FluentMode::Tcp(config),
106                #[cfg(unix)]
107                FluentModeDe::Tagged(FluentModeTagged::Unix(config)) => FluentMode::Unix(config),
108                FluentModeDe::Untagged(config) => FluentMode::Tcp(config),
109            })
110        }
111    }
112
113    #[cfg(test)]
114    mod tests {
115        use super::*;
116
117        #[test]
118        fn test_tcp_default_mode() {
119            let json_data = serde_json::json!({
120                "address": "0.0.0.0:2020",
121                "connection_limit": 2
122            });
123
124            let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
125            assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
126        }
127
128        #[test]
129        fn test_tcp_explicit_mode() {
130            let json_data = serde_json::json!({
131                "mode": "tcp",
132                "address": "0.0.0.0:2020",
133                "connection_limit": 2
134            });
135
136            let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
137            assert!(matches!(parsed.mode, FluentMode::Tcp(c) if c.connection_limit.unwrap() == 2));
138        }
139
140        #[test]
141        fn test_invalid_unix_mode() {
142            let json_data = serde_json::json!({
143                "mode": "unix",
144                "address": "0.0.0.0:2020",
145                "connection_limit": 2
146            });
147
148            assert!(serde_json::from_value::<FluentConfig>(json_data).is_err());
149        }
150
151        #[cfg(unix)]
152        #[test]
153        fn test_valid_unix_mode() {
154            let json_data = serde_json::json!({
155                "mode": "unix",
156                "path": "/foo"
157            });
158
159            let parsed: FluentConfig = serde_json::from_value(json_data).unwrap();
160            assert!(
161                matches!(parsed.mode, FluentMode::Unix(c) if c.path.to_string_lossy() == "/foo")
162            );
163        }
164    }
165}
166
167/// Configuration for the `fluent` TCP source.
168#[configurable_component]
169#[derive(Clone, Debug)]
170#[serde(deny_unknown_fields)]
171pub struct FluentTcpConfig {
172    #[configurable(derived)]
173    address: SocketListenAddr,
174
175    /// The maximum number of TCP connections that are allowed at any given time.
176    #[configurable(metadata(docs::type_unit = "connections"))]
177    connection_limit: Option<u32>,
178
179    #[configurable(derived)]
180    keepalive: Option<TcpKeepaliveConfig>,
181
182    #[configurable(derived)]
183    pub permit_origin: Option<IpAllowlistConfig>,
184
185    /// The size of the receive buffer used for each connection.
186    ///
187    /// This generally should not need to be changed.
188    #[configurable(metadata(docs::type_unit = "bytes"))]
189    #[configurable(metadata(docs::examples = 65536))]
190    receive_buffer_bytes: Option<usize>,
191
192    #[configurable(derived)]
193    tls: Option<TlsSourceConfig>,
194
195    #[configurable(derived)]
196    #[serde(default, deserialize_with = "bool_or_struct")]
197    acknowledgements: SourceAcknowledgementsConfig,
198}
199
200impl FluentTcpConfig {
201    fn build(
202        &self,
203        cx: SourceContext,
204        log_namespace: LogNamespace,
205    ) -> crate::Result<super::Source> {
206        let source = FluentSource::new(log_namespace);
207        let shutdown_secs = Duration::from_secs(30);
208        let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
209        let tls_client_metadata_key = self
210            .tls
211            .as_ref()
212            .and_then(|tls| tls.client_metadata_key.clone())
213            .and_then(|k| k.path);
214        let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
215        source.run(
216            self.address,
217            self.keepalive,
218            shutdown_secs,
219            tls,
220            tls_client_metadata_key,
221            self.receive_buffer_bytes,
222            None,
223            cx,
224            self.acknowledgements,
225            self.connection_limit,
226            self.permit_origin.clone().map(Into::into),
227            FluentConfig::NAME,
228            log_namespace,
229        )
230    }
231}
232
233/// Configuration for the `fluent` unix socket source.
234#[configurable_component]
235#[derive(Clone, Debug)]
236#[serde(deny_unknown_fields)]
237#[cfg(unix)]
238pub struct FluentUnixConfig {
239    /// The Unix socket path.
240    ///
241    /// This should be an absolute path.
242    #[configurable(metadata(docs::examples = "/path/to/socket"))]
243    pub path: std::path::PathBuf,
244
245    /// Unix file mode bits to be applied to the unix socket file as its designated file permissions.
246    ///
247    /// Note: The file mode value can be specified in any numeric format supported by your configuration
248    /// language, but it is most intuitive to use an octal number.
249    #[configurable(metadata(docs::examples = 0o777))]
250    #[configurable(metadata(docs::examples = 0o600))]
251    #[configurable(metadata(docs::examples = 508))]
252    pub socket_file_mode: Option<u32>,
253}
254
255#[cfg(unix)]
256impl FluentUnixConfig {
257    fn build(
258        &self,
259        cx: SourceContext,
260        log_namespace: LogNamespace,
261    ) -> crate::Result<super::Source> {
262        let source = FluentSource::new(log_namespace);
263
264        crate::sources::util::build_unix_stream_source(
265            self.path.clone(),
266            self.socket_file_mode,
267            source.decoder(),
268            move |events, host| source.handle_events_impl(events, host.into()),
269            cx.shutdown,
270            cx.out,
271        )
272    }
273}
274
275impl GenerateConfig for FluentConfig {
276    fn generate_config() -> toml::Value {
277        toml::Value::try_from(Self {
278            mode: FluentMode::Tcp(FluentTcpConfig {
279                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
280                keepalive: None,
281                permit_origin: None,
282                tls: None,
283                receive_buffer_bytes: None,
284                acknowledgements: Default::default(),
285                connection_limit: Some(2),
286            }),
287            log_namespace: None,
288        })
289        .unwrap()
290    }
291}
292
293#[async_trait::async_trait]
294#[typetag::serde(name = "fluent")]
295impl SourceConfig for FluentConfig {
296    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
297        let log_namespace = cx.log_namespace(self.log_namespace);
298        match &self.mode {
299            FluentMode::Tcp(t) => t.build(cx, log_namespace),
300            #[cfg(unix)]
301            FluentMode::Unix(u) => u.build(cx, log_namespace),
302        }
303    }
304
305    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
306        let log_namespace = global_log_namespace.merge(self.log_namespace);
307        let schema_definition = self.schema_definition(log_namespace);
308
309        vec![SourceOutput::new_maybe_logs(
310            DataType::Log,
311            schema_definition,
312        )]
313    }
314
315    fn resources(&self) -> Vec<Resource> {
316        match &self.mode {
317            FluentMode::Tcp(tcp) => vec![tcp.address.as_tcp_resource()],
318            #[cfg(unix)]
319            FluentMode::Unix(_) => vec![],
320        }
321    }
322
323    fn can_acknowledge(&self) -> bool {
324        matches!(self.mode, FluentMode::Tcp(_))
325    }
326}
327
328impl FluentConfig {
329    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
330    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
331        // `host_key` is only inserted if not present already.
332        let host_key = log_schema()
333            .host_key()
334            .cloned()
335            .map(LegacyKey::InsertIfEmpty);
336
337        let tag_key = parse_value_path("tag").ok().map(LegacyKey::Overwrite);
338
339        let tls_client_metadata_path = match &self.mode {
340            FluentMode::Tcp(tcp) => tcp
341                .tls
342                .as_ref()
343                .and_then(|tls| tls.client_metadata_key.as_ref())
344                .and_then(|k| k.path.clone())
345                .map(LegacyKey::Overwrite),
346            #[cfg(unix)]
347            FluentMode::Unix(_) => None,
348        };
349
350        // There is a global and per-source `log_namespace` config.
351        // The source config overrides the global setting and is merged here.
352        let mut schema_definition = BytesDeserializerConfig
353            .schema_definition(log_namespace)
354            .with_standard_vector_source_metadata()
355            .with_source_metadata(
356                FluentConfig::NAME,
357                host_key,
358                &owned_value_path!("host"),
359                Kind::bytes(),
360                Some("host"),
361            )
362            .with_source_metadata(
363                FluentConfig::NAME,
364                tag_key,
365                &owned_value_path!("tag"),
366                Kind::bytes(),
367                None,
368            )
369            .with_source_metadata(
370                FluentConfig::NAME,
371                None,
372                &owned_value_path!("timestamp"),
373                Kind::timestamp(),
374                Some("timestamp"),
375            )
376            // for metadata that is added to the events dynamically from the FluentRecord
377            .with_source_metadata(
378                FluentConfig::NAME,
379                None,
380                &owned_value_path!("record"),
381                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
382                None,
383            )
384            .with_source_metadata(
385                Self::NAME,
386                tls_client_metadata_path,
387                &owned_value_path!("tls_client_metadata"),
388                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
389                None,
390            );
391
392        // for metadata that is added to the events dynamically
393        if log_namespace == LogNamespace::Legacy {
394            schema_definition = schema_definition.unknown_fields(Kind::bytes());
395        }
396
397        schema_definition
398    }
399}
400
401#[derive(Debug, Clone)]
402struct FluentSource {
403    log_namespace: LogNamespace,
404    legacy_host_key_path: Option<OwnedValuePath>,
405}
406
407impl FluentSource {
408    fn new(log_namespace: LogNamespace) -> Self {
409        Self {
410            log_namespace,
411            legacy_host_key_path: log_schema().host_key().cloned(),
412        }
413    }
414
415    fn handle_events_impl(&self, events: &mut [Event], host: Value) {
416        for event in events {
417            let log = event.as_mut_log();
418
419            let legacy_host_key = self
420                .legacy_host_key_path
421                .as_ref()
422                .map(LegacyKey::InsertIfEmpty);
423
424            self.log_namespace.insert_source_metadata(
425                FluentConfig::NAME,
426                log,
427                legacy_host_key,
428                path!("host"),
429                host.clone(),
430            );
431        }
432    }
433}
434
435impl TcpSource for FluentSource {
436    type Error = DecodeError;
437    type Item = FluentFrame;
438    type Decoder = FluentDecoder;
439    type Acker = FluentAcker;
440
441    fn decoder(&self) -> Self::Decoder {
442        FluentDecoder::new(self.log_namespace)
443    }
444
445    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
446        self.handle_events_impl(events, host.ip().to_string().into())
447    }
448
449    fn build_acker(&self, frame: &[Self::Item]) -> Self::Acker {
450        FluentAcker::new(frame)
451    }
452}
453
454#[derive(Debug)]
455pub enum DecodeError {
456    IO(io::Error),
457    Decode(decode::Error),
458    UnknownCompression(String),
459    UnexpectedValue(rmpv::Value),
460}
461
462impl std::fmt::Display for DecodeError {
463    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464        match self {
465            DecodeError::IO(err) => write!(f, "{err}"),
466            DecodeError::Decode(err) => write!(f, "{err}"),
467            DecodeError::UnknownCompression(compression) => {
468                write!(f, "unknown compression: {compression}")
469            }
470            DecodeError::UnexpectedValue(value) => {
471                write!(f, "unexpected msgpack value, ignoring: {value}")
472            }
473        }
474    }
475}
476
477impl StreamDecodingError for DecodeError {
478    fn can_continue(&self) -> bool {
479        match self {
480            DecodeError::IO(_) => false,
481            DecodeError::Decode(_) => true,
482            DecodeError::UnknownCompression(_) => true,
483            DecodeError::UnexpectedValue(_) => true,
484        }
485    }
486}
487
488impl From<io::Error> for DecodeError {
489    fn from(e: io::Error) -> Self {
490        DecodeError::IO(e)
491    }
492}
493
494impl From<decode::Error> for DecodeError {
495    fn from(e: decode::Error) -> Self {
496        DecodeError::Decode(e)
497    }
498}
499
500#[derive(Debug, Clone)]
501struct FluentDecoder {
502    log_namespace: LogNamespace,
503}
504
505impl FluentDecoder {
506    const fn new(log_namespace: LogNamespace) -> Self {
507        Self { log_namespace }
508    }
509
510    fn handle_message(
511        &mut self,
512        message: Result<FluentMessage, DecodeError>,
513        byte_size: usize,
514    ) -> Result<Option<(FluentFrame, usize)>, DecodeError> {
515        let log_namespace = &self.log_namespace;
516
517        match message? {
518            FluentMessage::Message(tag, timestamp, record) => {
519                let event = Event::from(FluentEvent {
520                    tag,
521                    timestamp,
522                    record,
523                    log_namespace,
524                });
525                let frame = FluentFrame {
526                    events: smallvec![event],
527                    chunk: None,
528                };
529                Ok(Some((frame, byte_size)))
530            }
531            FluentMessage::MessageWithOptions(tag, timestamp, record, options) => {
532                let event = Event::from(FluentEvent {
533                    tag,
534                    timestamp,
535                    record,
536                    log_namespace,
537                });
538                let frame = FluentFrame {
539                    events: smallvec![event],
540                    chunk: options.chunk,
541                };
542                Ok(Some((frame, byte_size)))
543            }
544            FluentMessage::Forward(tag, entries) => {
545                let events = entries
546                    .into_iter()
547                    .map(|FluentEntry(timestamp, record)| {
548                        Event::from(FluentEvent {
549                            tag: tag.clone(),
550                            timestamp,
551                            record,
552                            log_namespace,
553                        })
554                    })
555                    .collect();
556                let frame = FluentFrame {
557                    events,
558                    chunk: None,
559                };
560                Ok(Some((frame, byte_size)))
561            }
562            FluentMessage::ForwardWithOptions(tag, entries, options) => {
563                let events = entries
564                    .into_iter()
565                    .map(|FluentEntry(timestamp, record)| {
566                        Event::from(FluentEvent {
567                            tag: tag.clone(),
568                            timestamp,
569                            record,
570                            log_namespace,
571                        })
572                    })
573                    .collect();
574                let frame = FluentFrame {
575                    events,
576                    chunk: options.chunk,
577                };
578                Ok(Some((frame, byte_size)))
579            }
580            FluentMessage::PackedForward(tag, bin) => {
581                let mut buf = BytesMut::from(&bin[..]);
582
583                let mut events = smallvec![];
584                while let Some(FluentEntry(timestamp, record)) =
585                    FluentEntryStreamDecoder.decode(&mut buf)?
586                {
587                    events.push(Event::from(FluentEvent {
588                        tag: tag.clone(),
589                        timestamp,
590                        record,
591                        log_namespace,
592                    }));
593                }
594                let frame = FluentFrame {
595                    events,
596                    chunk: None,
597                };
598                Ok(Some((frame, byte_size)))
599            }
600            FluentMessage::PackedForwardWithOptions(tag, bin, options) => {
601                let buf = match options.compressed.as_deref() {
602                    Some("gzip") => {
603                        let mut buf = Vec::new();
604                        MultiGzDecoder::new(io::Cursor::new(bin.into_vec()))
605                            .read_to_end(&mut buf)
606                            .map(|_| buf)
607                            .map_err(Into::into)
608                    }
609                    Some("text") | None => Ok(bin.into_vec()),
610                    Some(s) => Err(DecodeError::UnknownCompression(s.to_owned())),
611                }?;
612
613                let mut buf = BytesMut::from(&buf[..]);
614
615                let mut events = smallvec![];
616                while let Some(FluentEntry(timestamp, record)) =
617                    FluentEntryStreamDecoder.decode(&mut buf)?
618                {
619                    events.push(Event::from(FluentEvent {
620                        tag: tag.clone(),
621                        timestamp,
622                        record,
623                        log_namespace,
624                    }));
625                }
626                let frame = FluentFrame {
627                    events,
628                    chunk: options.chunk,
629                };
630                Ok(Some((frame, byte_size)))
631            }
632            FluentMessage::Heartbeat(rmpv::Value::Nil) => Ok(None),
633            FluentMessage::Heartbeat(value) => Err(DecodeError::UnexpectedValue(value)),
634        }
635    }
636}
637
638impl Decoder for FluentDecoder {
639    type Item = (FluentFrame, usize);
640    type Error = DecodeError;
641
642    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
643        loop {
644            if src.is_empty() {
645                return Ok(None);
646            }
647
648            let (byte_size, res) = {
649                let mut des = Deserializer::new(io::Cursor::new(&src[..]));
650
651                let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
652
653                // check for unexpected EOF to indicate that we need more data
654                if let Err(DecodeError::Decode(
655                    decode::Error::InvalidDataRead(ref custom)
656                    | decode::Error::InvalidMarkerRead(ref custom),
657                )) = res
658                    && custom.kind() == io::ErrorKind::UnexpectedEof
659                {
660                    return Ok(None);
661                }
662
663                (des.position() as usize, res)
664            };
665
666            src.advance(byte_size);
667
668            let maybe_item = self.handle_message(res, byte_size).inspect_err(|error| {
669                let base64_encoded_message = BASE64_STANDARD.encode(&src[..]);
670                emit!(FluentMessageDecodeError {
671                    error,
672                    base64_encoded_message
673                });
674            })?;
675            if let Some(item) = maybe_item {
676                return Ok(Some(item));
677            }
678        }
679    }
680}
681
682/// Decoder for decoding MessagePackEventStream which are just a stream of Entries
683#[derive(Clone, Debug)]
684struct FluentEntryStreamDecoder;
685
686impl Decoder for FluentEntryStreamDecoder {
687    type Item = FluentEntry;
688    type Error = DecodeError;
689
690    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
691        if src.is_empty() {
692            return Ok(None);
693        }
694        let (byte_size, res) = {
695            let mut des = Deserializer::new(io::Cursor::new(&src[..]));
696
697            // attempt to parse, if we get unexpected EOF, we need more data
698            let res = Deserialize::deserialize(&mut des).map_err(DecodeError::Decode);
699
700            if let Err(DecodeError::Decode(decode::Error::InvalidDataRead(ref custom))) = res
701                && custom.kind() == io::ErrorKind::UnexpectedEof
702            {
703                return Ok(None);
704            }
705
706            let byte_size = des.position();
707
708            emit!(FluentMessageReceived { byte_size });
709
710            (byte_size as usize, res)
711        };
712
713        src.advance(byte_size);
714
715        res
716    }
717}
718
719struct FluentAcker {
720    chunks: Vec<String>,
721}
722
723impl FluentAcker {
724    fn new(frames: &[FluentFrame]) -> Self {
725        Self {
726            chunks: frames.iter().filter_map(|f| f.chunk.clone()).collect(),
727        }
728    }
729}
730
731impl TcpSourceAcker for FluentAcker {
732    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
733        if self.chunks.is_empty() {
734            return None;
735        }
736
737        let mut buf = Vec::new();
738        let mut ser = Serializer::new(&mut buf);
739        let mut ack_map = HashMap::new();
740
741        for chunk in self.chunks {
742            ack_map.clear();
743            if let TcpSourceAck::Ack = ack {
744                ack_map.insert("ack", chunk);
745            };
746            ack_map.serialize(&mut ser).unwrap();
747        }
748        Some(buf.into())
749    }
750}
751
752/// Normalized fluent message.
753#[derive(Debug, PartialEq)]
754struct FluentEvent<'a> {
755    tag: FluentTag,
756    timestamp: FluentTimestamp,
757    record: FluentRecord,
758    log_namespace: &'a LogNamespace,
759}
760
761impl From<FluentEvent<'_>> for Event {
762    fn from(frame: FluentEvent) -> Event {
763        LogEvent::from(frame).into()
764    }
765}
766
767struct FluentFrame {
768    events: SmallVec<[Event; 1]>,
769    chunk: Option<String>,
770}
771
772impl From<FluentFrame> for SmallVec<[Event; 1]> {
773    fn from(frame: FluentFrame) -> Self {
774        frame.events
775    }
776}
777
778impl From<FluentEvent<'_>> for LogEvent {
779    fn from(frame: FluentEvent) -> LogEvent {
780        let FluentEvent {
781            tag,
782            timestamp,
783            record,
784            log_namespace,
785        } = frame;
786
787        let mut log = LogEvent::default();
788
789        log_namespace.insert_vector_metadata(
790            &mut log,
791            log_schema().source_type_key(),
792            path!("source_type"),
793            Bytes::from_static(FluentConfig::NAME.as_bytes()),
794        );
795
796        match log_namespace {
797            LogNamespace::Vector => {
798                log.insert(metadata_path!(FluentConfig::NAME, "timestamp"), timestamp);
799                log.insert(metadata_path!("vector", "ingest_timestamp"), Utc::now());
800            }
801            LogNamespace::Legacy => {
802                log.maybe_insert(log_schema().timestamp_key_target_path(), timestamp);
803            }
804        }
805
806        log_namespace.insert_source_metadata(
807            FluentConfig::NAME,
808            &mut log,
809            Some(LegacyKey::Overwrite(path!("tag"))),
810            path!("tag"),
811            tag,
812        );
813
814        for (key, value) in record.into_iter() {
815            let value: Value = value.into();
816            log_namespace.insert_source_metadata(
817                FluentConfig::NAME,
818                &mut log,
819                Some(LegacyKey::Overwrite(path!(key.as_str()))),
820                path!("record", key.as_str()),
821                value,
822            );
823        }
824        log
825    }
826}
827
828#[cfg(test)]
829mod tests {
830    use bytes::BytesMut;
831    use chrono::{DateTime, Utc};
832    use rmp_serde::Serializer;
833    use serde::Serialize;
834    use tokio::{
835        io::{AsyncReadExt, AsyncWriteExt},
836        time::{Duration, error::Elapsed, timeout},
837    };
838    use tokio_util::codec::Decoder;
839    use vector_lib::{assert_event_data_eq, lookup::OwnedTargetPath, schema::Definition};
840    use vrl::value::{ObjectMap, Value, kind::Collection};
841
842    use super::{message::FluentMessageOptions, *};
843    use crate::{
844        SourceSender,
845        config::{SourceConfig, SourceContext},
846        event::EventStatus,
847        test_util::{self, next_addr, trace_init, wait_for_tcp},
848    };
849
850    #[test]
851    fn generate_config() {
852        crate::test_util::test_generate_config::<FluentConfig>();
853    }
854
855    // useful references for msgpack:
856    // Spec: https://github.com/msgpack/msgpack/blob/master/spec.md
857    // Encode to array of bytes: https://kawanet.github.io/msgpack-lite/
858    // Decode base64: https://toolslick.com/conversion/data/messagepack-to-json
859
860    fn mock_event(name: &str, timestamp: &str) -> Event {
861        Event::Log(LogEvent::from(ObjectMap::from([
862            ("message".into(), Value::from(name)),
863            (
864                log_schema().source_type_key().unwrap().to_string().into(),
865                Value::from(FluentConfig::NAME),
866            ),
867            ("tag".into(), Value::from("tag.name")),
868            (
869                "timestamp".into(),
870                Value::Timestamp(DateTime::parse_from_rfc3339(timestamp).unwrap().into()),
871            ),
872        ])))
873    }
874
875    #[test]
876    fn decode_message_mode() {
877        //[
878        //  "tag.name",
879        //  1441588984,
880        //  {"message": "bar"},
881        //]
882        let message: Vec<u8> = vec![
883            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
884            101, 115, 115, 97, 103, 101, 163, 98, 97, 114,
885        ];
886
887        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
888        let got = decode_all(message.clone()).unwrap();
889        assert_event_data_eq!(got.0[0], expected);
890        assert_eq!(got.1, message.len());
891    }
892
893    #[test]
894    fn decode_message_mode_with_options() {
895        //[
896        //  "tag.name",
897        //   1441588984,
898        //   { "message": "bar" },
899        //   { "size": 1 }
900        //]
901        let message: Vec<u8> = vec![
902            148, 168, 116, 97, 103, 46, 110, 97, 109, 101, 206, 85, 236, 230, 248, 129, 167, 109,
903            101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 129, 164, 115, 105, 122, 101, 1,
904        ];
905
906        let expected = mock_event("bar", "2015-09-07T01:23:04Z");
907        let got = decode_all(message.clone()).unwrap();
908        assert_eq!(got.1, message.len());
909        assert_event_data_eq!(got.0[0], expected);
910    }
911
912    #[test]
913    fn decode_forward_mode() {
914        //[
915        //    "tag.name",
916        //    [
917        //        [1441588984, {"message": "foo"}],
918        //        [1441588985, {"message": "bar"}],
919        //        [1441588986, {"message": "baz"}]
920        //    ]
921        //]
922        let message: Vec<u8> = vec![
923            146, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
924            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
925            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
926            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122,
927        ];
928
929        let expected = vec![
930            mock_event("foo", "2015-09-07T01:23:04Z"),
931            mock_event("bar", "2015-09-07T01:23:05Z"),
932            mock_event("baz", "2015-09-07T01:23:06Z"),
933        ];
934        let got = decode_all(message.clone()).unwrap();
935
936        assert_eq!(got.1, message.len());
937        assert_event_data_eq!(got.0[0], expected[0]);
938        assert_event_data_eq!(got.0[1], expected[1]);
939        assert_event_data_eq!(got.0[2], expected[2]);
940    }
941
942    #[test]
943    fn decode_forward_mode_with_options() {
944        //[
945        //    "tag.name",
946        //    [
947        //        [1441588984, {"message": "foo"}],
948        //        [1441588985, {"message": "bar"}],
949        //        [1441588986, {"message": "baz"}]
950        //    ],
951        //    {"size": 3}
952        //]
953        let message: Vec<u8> = vec![
954            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 147, 146, 206, 85, 236, 230, 248, 129,
955            167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230, 249,
956            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236, 230,
957            250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 164, 115, 105,
958            122, 101, 3,
959        ];
960
961        let expected = vec![
962            mock_event("foo", "2015-09-07T01:23:04Z"),
963            mock_event("bar", "2015-09-07T01:23:05Z"),
964            mock_event("baz", "2015-09-07T01:23:06Z"),
965        ];
966
967        let got = decode_all(message.clone()).unwrap();
968
969        assert_eq!(got.1, message.len());
970
971        assert_event_data_eq!(got.0[0], expected[0]);
972        assert_event_data_eq!(got.0[1], expected[1]);
973        assert_event_data_eq!(got.0[2], expected[2]);
974    }
975
976    #[test]
977    fn decode_packed_forward_mode() {
978        //[
979        //    "tag.name",
980        //    <packed messages>
981        //]
982        //
983        //With packed messages as bin:
984        // [1441588984, {"message": "foo"}]
985        // [1441588985, {"message": "bar"}]
986        // [1441588986, {"message": "baz"}]
987        let message: Vec<u8> = vec![
988            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 57, 146, 206, 85, 236, 230, 248,
989            129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 102, 111, 111, 146, 206, 85, 236, 230,
990            249, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 114, 146, 206, 85, 236,
991            230, 250, 129, 167, 109, 101, 115, 115, 97, 103, 101, 163, 98, 97, 122, 129, 167, 109,
992            101, 115, 115, 97, 103, 101, 163, 102, 111, 111,
993        ];
994
995        let expected = vec![
996            mock_event("foo", "2015-09-07T01:23:04Z"),
997            mock_event("bar", "2015-09-07T01:23:05Z"),
998            mock_event("baz", "2015-09-07T01:23:06Z"),
999        ];
1000
1001        let got = decode_all(message.clone()).unwrap();
1002
1003        assert_eq!(got.1, message.len());
1004        assert_event_data_eq!(got.0[0], expected[0]);
1005        assert_event_data_eq!(got.0[1], expected[1]);
1006        assert_event_data_eq!(got.0[2], expected[2]);
1007    }
1008
1009    //  TODO
1010    #[test]
1011    fn decode_compressed_packed_forward_mode() {
1012        //[
1013        //    "tag.name",
1014        //    <packed messages>,
1015        //    {"compressed": "gzip"}
1016        //]
1017        //
1018        //With gzip'd packed messages as bin:
1019        // [1441588984, {"message": "foo"}]
1020        // [1441588985, {"message": "bar"}]
1021        // [1441588986, {"message": "baz"}]
1022        let message: Vec<u8> = vec![
1023            147, 168, 116, 97, 103, 46, 110, 97, 109, 101, 196, 55, 31, 139, 8, 0, 245, 10, 168,
1024            96, 0, 3, 155, 116, 46, 244, 205, 179, 31, 141, 203, 115, 83, 139, 139, 19, 211, 83,
1025            23, 167, 229, 231, 79, 2, 9, 253, 68, 8, 37, 37, 22, 129, 133, 126, 33, 11, 85, 1, 0,
1026            53, 3, 158, 28, 57, 0, 0, 0, 129, 170, 99, 111, 109, 112, 114, 101, 115, 115, 101, 100,
1027            164, 103, 122, 105, 112,
1028        ];
1029
1030        let expected = vec![
1031            mock_event("foo", "2015-09-07T01:23:04Z"),
1032            mock_event("bar", "2015-09-07T01:23:05Z"),
1033            mock_event("baz", "2015-09-07T01:23:06Z"),
1034        ];
1035
1036        let got = decode_all(message.clone()).unwrap();
1037
1038        assert_eq!(got.1, message.len());
1039        assert_event_data_eq!(got.0[0], expected[0]);
1040        assert_event_data_eq!(got.0[1], expected[1]);
1041        assert_event_data_eq!(got.0[2], expected[2]);
1042    }
1043
1044    fn decode_all(message: Vec<u8>) -> Result<(SmallVec<[Event; 1]>, usize), DecodeError> {
1045        let mut buf = BytesMut::from(&message[..]);
1046
1047        let mut decoder = FluentDecoder::new(LogNamespace::default());
1048
1049        let (frame, byte_size) = decoder.decode(&mut buf)?.unwrap();
1050        Ok((frame.into(), byte_size))
1051    }
1052
1053    #[tokio::test]
1054    async fn ack_delivered_without_chunk() {
1055        let (result, output) = check_acknowledgements(EventStatus::Delivered, false).await;
1056        assert!(result.is_err()); // the `_` inside this error is `Elapsed`
1057        assert!(output.is_empty());
1058    }
1059
1060    #[tokio::test]
1061    async fn ack_delivered_with_chunk() {
1062        let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
1063        assert_eq!(result.unwrap().unwrap(), output.len());
1064        let expected: Vec<u8> = vec![0x81, 0xa3, 0x61, 0x63]; // { "ack": ...
1065        assert_eq!(output[..expected.len()], expected);
1066    }
1067
1068    #[tokio::test]
1069    async fn ack_failed_without_chunk() {
1070        let (result, output) = check_acknowledgements(EventStatus::Rejected, false).await;
1071        assert_eq!(result.unwrap().unwrap(), output.len());
1072        assert!(output.is_empty());
1073    }
1074
1075    #[tokio::test]
1076    async fn ack_failed_with_chunk() {
1077        let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
1078        assert_eq!(result.unwrap().unwrap(), output.len());
1079        let expected: Vec<u8> = vec![0x80]; // { }
1080        assert_eq!(output, expected);
1081    }
1082
1083    async fn check_acknowledgements(
1084        status: EventStatus,
1085        with_chunk: bool,
1086    ) -> (Result<Result<usize, std::io::Error>, Elapsed>, Bytes) {
1087        trace_init();
1088
1089        let (sender, recv) = SourceSender::new_test_finalize(status);
1090        let address = next_addr();
1091        let source = FluentConfig {
1092            mode: FluentMode::Tcp(FluentTcpConfig {
1093                address: address.into(),
1094                tls: None,
1095                keepalive: None,
1096                permit_origin: None,
1097                receive_buffer_bytes: None,
1098                acknowledgements: true.into(),
1099                connection_limit: None,
1100            }),
1101            log_namespace: None,
1102        }
1103        .build(SourceContext::new_test(sender, None))
1104        .await
1105        .unwrap();
1106        tokio::spawn(source);
1107        wait_for_tcp(address).await;
1108
1109        let msg = uuid::Uuid::new_v4().to_string();
1110        let tag = uuid::Uuid::new_v4().to_string();
1111        let req = build_req(&tag, &[("field", &msg)], with_chunk);
1112
1113        let sender = tokio::spawn(async move {
1114            let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
1115            socket.write_all(&req).await.unwrap();
1116
1117            let mut output = BytesMut::new();
1118            (
1119                timeout(Duration::from_millis(250), socket.read_buf(&mut output)).await,
1120                output,
1121            )
1122        });
1123        let events = test_util::collect_n(recv, 1).await;
1124        let (result, output) = sender.await.unwrap();
1125
1126        assert_eq!(events.len(), 1);
1127        let log = events[0].as_log();
1128        assert_eq!(log.get("field").unwrap(), &msg.into());
1129        assert!(matches!(log.get("host").unwrap(), Value::Bytes(_)));
1130        assert!(matches!(log.get("timestamp").unwrap(), Value::Timestamp(_)));
1131        assert_eq!(log.get("tag").unwrap(), &tag.into());
1132
1133        (result, output.into())
1134    }
1135
1136    fn build_req(tag: &str, fields: &[(&str, &str)], with_chunk: bool) -> Vec<u8> {
1137        let mut record = FluentRecord::default();
1138        for (tag, value) in fields {
1139            record.insert((*tag).into(), rmpv::Value::String((*value).into()).into());
1140        }
1141        let chunk = with_chunk.then(|| BASE64_STANDARD.encode(uuid::Uuid::new_v4().as_bytes()));
1142        let req = FluentMessage::MessageWithOptions(
1143            tag.into(),
1144            FluentTimestamp::Unix(Utc::now()),
1145            record,
1146            FluentMessageOptions {
1147                chunk,
1148                ..Default::default()
1149            },
1150        );
1151        let mut buf = Vec::new();
1152        req.serialize(&mut Serializer::new(&mut buf)).unwrap();
1153        buf
1154    }
1155
1156    #[test]
1157    fn output_schema_definition_vector_namespace() {
1158        let config = FluentConfig {
1159            mode: FluentMode::Tcp(FluentTcpConfig {
1160                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
1161                tls: None,
1162                keepalive: None,
1163                permit_origin: None,
1164                receive_buffer_bytes: None,
1165                acknowledgements: false.into(),
1166                connection_limit: None,
1167            }),
1168            log_namespace: Some(true),
1169        };
1170
1171        let definitions = config
1172            .outputs(LogNamespace::Vector)
1173            .remove(0)
1174            .schema_definition(true);
1175
1176        let expected_definition =
1177            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1178                .with_meaning(OwnedTargetPath::event_root(), "message")
1179                .with_metadata_field(
1180                    &owned_value_path!("vector", "source_type"),
1181                    Kind::bytes(),
1182                    None,
1183                )
1184                .with_metadata_field(&owned_value_path!("fluent", "tag"), Kind::bytes(), None)
1185                .with_metadata_field(
1186                    &owned_value_path!("fluent", "timestamp"),
1187                    Kind::timestamp(),
1188                    Some("timestamp"),
1189                )
1190                .with_metadata_field(
1191                    &owned_value_path!("fluent", "record"),
1192                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1193                    None,
1194                )
1195                .with_metadata_field(
1196                    &owned_value_path!("vector", "ingest_timestamp"),
1197                    Kind::timestamp(),
1198                    None,
1199                )
1200                .with_metadata_field(
1201                    &owned_value_path!("fluent", "host"),
1202                    Kind::bytes(),
1203                    Some("host"),
1204                )
1205                .with_metadata_field(
1206                    &owned_value_path!("fluent", "tls_client_metadata"),
1207                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
1208                    None,
1209                );
1210
1211        assert_eq!(definitions, Some(expected_definition))
1212    }
1213
1214    #[test]
1215    fn output_schema_definition_legacy_namespace() {
1216        let config = FluentConfig {
1217            mode: FluentMode::Tcp(FluentTcpConfig {
1218                address: SocketListenAddr::SocketAddr("0.0.0.0:24224".parse().unwrap()),
1219                tls: None,
1220                keepalive: None,
1221                permit_origin: None,
1222                receive_buffer_bytes: None,
1223                acknowledgements: false.into(),
1224                connection_limit: None,
1225            }),
1226            log_namespace: None,
1227        };
1228
1229        let definitions = config
1230            .outputs(LogNamespace::Legacy)
1231            .remove(0)
1232            .schema_definition(true);
1233
1234        let expected_definition = Definition::new_with_default_metadata(
1235            Kind::object(Collection::empty()),
1236            [LogNamespace::Legacy],
1237        )
1238        .with_event_field(
1239            &owned_value_path!("message"),
1240            Kind::bytes(),
1241            Some("message"),
1242        )
1243        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1244        .with_event_field(&owned_value_path!("tag"), Kind::bytes(), None)
1245        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
1246        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
1247        .unknown_fields(Kind::bytes());
1248
1249        assert_eq!(definitions, Some(expected_definition))
1250    }
1251}
1252
1253#[cfg(all(test, feature = "fluent-integration-tests"))]
1254mod integration_tests {
1255    use std::{fs::File, io::Write, net::SocketAddr, time::Duration};
1256
1257    use futures::Stream;
1258    use tokio::time::sleep;
1259    use vector_lib::event::{Event, EventStatus};
1260
1261    use crate::{
1262        SourceSender,
1263        config::{SourceConfig, SourceContext},
1264        docker::Container,
1265        sources::fluent::{FluentConfig, FluentMode, FluentTcpConfig},
1266        test_util::{
1267            collect_ready,
1268            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
1269            next_addr, next_addr_for_ip, random_string, wait_for_tcp,
1270        },
1271    };
1272
1273    const FLUENT_BIT_IMAGE: &str = "fluent/fluent-bit";
1274    const FLUENT_BIT_TAG: &str = "1.7";
1275    const FLUENTD_IMAGE: &str = "fluent/fluentd";
1276    const FLUENTD_TAG: &str = "v1.12";
1277
1278    fn make_file(name: &str, content: &str) -> tempfile::TempDir {
1279        let dir = tempfile::tempdir().unwrap();
1280        let mut file = File::create(dir.path().join(name)).unwrap();
1281        write!(&mut file, "{content}").unwrap();
1282        dir
1283    }
1284
1285    #[tokio::test]
1286    async fn fluentbit() {
1287        test_fluentbit(EventStatus::Delivered).await;
1288    }
1289
1290    #[tokio::test]
1291    async fn fluentbit_rejection() {
1292        test_fluentbit(EventStatus::Rejected).await;
1293    }
1294
1295    async fn test_fluentbit(status: EventStatus) {
1296        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
1297            let test_address = next_addr();
1298            let (out, source_address) = source(status).await;
1299
1300            let dir = make_file(
1301                "fluent-bit.conf",
1302                &format!(
1303                    r#"
1304[SERVICE]
1305    Grace      0
1306    Flush      1
1307    Daemon     off
1308
1309[INPUT]
1310    Name       http
1311    Host       {listen_host}
1312    Port       {listen_port}
1313
1314[OUTPUT]
1315    Name          forward
1316    Match         *
1317    Host          host.docker.internal
1318    Port          {send_port}
1319    Require_ack_response true
1320    "#,
1321                    listen_host = test_address.ip(),
1322                    listen_port = test_address.port(),
1323                    send_port = source_address.port(),
1324                ),
1325            );
1326
1327            let msg = random_string(64);
1328            let body = serde_json::json!({ "message": msg });
1329
1330            let events = Container::new(FLUENT_BIT_IMAGE, FLUENT_BIT_TAG)
1331                .bind(dir.path().display(), "/fluent-bit/etc")
1332                .run(async move {
1333                    wait_for_tcp(test_address).await;
1334                    reqwest::Client::new()
1335                        .post(format!("http://{test_address}/"))
1336                        .header("content-type", "application/json")
1337                        .body(body.to_string())
1338                        .send()
1339                        .await
1340                        .unwrap();
1341                    sleep(Duration::from_secs(2)).await;
1342
1343                    collect_ready(out).await
1344                })
1345                .await;
1346
1347            assert_eq!(events.len(), 1);
1348            let log = events[0].as_log();
1349            assert_eq!(log["tag"], "http.0".into());
1350            assert_eq!(log["message"], msg.into());
1351            assert!(log.get("timestamp").is_some());
1352            assert!(log.get("host").is_some());
1353        })
1354        .await;
1355    }
1356
1357    #[tokio::test]
1358    async fn fluentd() {
1359        test_fluentd(EventStatus::Delivered, "").await;
1360    }
1361
1362    #[tokio::test]
1363    async fn fluentd_gzip() {
1364        test_fluentd(EventStatus::Delivered, "compress gzip").await;
1365    }
1366
1367    #[tokio::test]
1368    async fn fluentd_rejection() {
1369        test_fluentd(EventStatus::Rejected, "").await;
1370    }
1371
1372    async fn test_fluentd(status: EventStatus, options: &str) {
1373        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
1374            let test_address = next_addr();
1375            let (out, source_address) = source(status).await;
1376
1377            let config = format!(
1378                r#"
1379<source>
1380  @type http
1381  bind {http_host}
1382  port {http_port}
1383</source>
1384
1385<match *>
1386  @type forward
1387  <server>
1388    name  local
1389    host  host.docker.internal
1390    port  {port}
1391  </server>
1392  <buffer>
1393    flush_mode immediate
1394  </buffer>
1395  require_ack_response true
1396  ack_response_timeout 1
1397  {options}
1398</match>
1399"#,
1400                http_host = test_address.ip(),
1401                http_port = test_address.port(),
1402                port = source_address.port(),
1403                options = options
1404            );
1405
1406            let dir = make_file("fluent.conf", &config);
1407
1408            let msg = random_string(64);
1409            let body = serde_json::json!({ "message": msg });
1410
1411            let events = Container::new(FLUENTD_IMAGE, FLUENTD_TAG)
1412                .bind(dir.path().display(), "/fluentd/etc")
1413                .run(async move {
1414                    wait_for_tcp(test_address).await;
1415                    reqwest::Client::new()
1416                        .post(format!("http://{test_address}/"))
1417                        .header("content-type", "application/json")
1418                        .body(body.to_string())
1419                        .send()
1420                        .await
1421                        .unwrap();
1422                    sleep(Duration::from_secs(2)).await;
1423                    collect_ready(out).await
1424                })
1425                .await;
1426
1427            assert_eq!(events.len(), 1);
1428            assert_eq!(events[0].as_log()["tag"], "".into());
1429            assert_eq!(events[0].as_log()["message"], msg.into());
1430            assert!(events[0].as_log().get("timestamp").is_some());
1431            assert!(events[0].as_log().get("host").is_some());
1432        })
1433        .await;
1434    }
1435
1436    async fn source(status: EventStatus) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
1437        let (sender, recv) = SourceSender::new_test_finalize(status);
1438        let address = next_addr_for_ip(std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED));
1439        tokio::spawn(async move {
1440            FluentConfig {
1441                mode: FluentMode::Tcp(FluentTcpConfig {
1442                    address: address.into(),
1443                    tls: None,
1444                    keepalive: None,
1445                    permit_origin: None,
1446                    receive_buffer_bytes: None,
1447                    acknowledgements: false.into(),
1448                    connection_limit: None,
1449                }),
1450                log_namespace: None,
1451            }
1452            .build(SourceContext::new_test(sender, None))
1453            .await
1454            .unwrap()
1455            .await
1456            .unwrap()
1457        });
1458        wait_for_tcp(address).await;
1459        (recv, address)
1460    }
1461}