vector/sources/
logstash.rs

1use std::{
2    collections::{BTreeMap, VecDeque},
3    convert::TryFrom,
4    io::{self, Read},
5    net::SocketAddr,
6    time::Duration,
7};
8
9use bytes::{Buf, Bytes, BytesMut};
10use flate2::read::ZlibDecoder;
11use smallvec::{SmallVec, smallvec};
12use snafu::{ResultExt, Snafu};
13use tokio_util::codec::Decoder;
14use vector_lib::{
15    codecs::{BytesDeserializerConfig, StreamDecodingError},
16    config::{LegacyKey, LogNamespace},
17    configurable::configurable_component,
18    ipallowlist::IpAllowlistConfig,
19    lookup::{OwnedValuePath, event_path, metadata_path, owned_value_path, path},
20    schema::Definition,
21};
22use vrl::value::{KeyString, Kind, kind::Collection};
23
24use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
25use crate::{
26    config::{
27        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
28        SourceContext, SourceOutput, log_schema,
29    },
30    event::{Event, LogEvent, Value},
31    serde::bool_or_struct,
32    tcp::TcpKeepaliveConfig,
33    tls::{MaybeTlsSettings, TlsSourceConfig},
34    types,
35};
36
37/// Configuration for the `logstash` source.
38#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
39#[derive(Clone, Debug)]
40pub struct LogstashConfig {
41    #[configurable(derived)]
42    address: SocketListenAddr,
43
44    #[configurable(derived)]
45    #[configurable(metadata(docs::advanced))]
46    keepalive: Option<TcpKeepaliveConfig>,
47
48    #[configurable(derived)]
49    pub permit_origin: Option<IpAllowlistConfig>,
50
51    #[configurable(derived)]
52    tls: Option<TlsSourceConfig>,
53
54    /// The size of the receive buffer used for each connection.
55    #[configurable(metadata(docs::type_unit = "bytes"))]
56    #[configurable(metadata(docs::examples = 65536))]
57    #[configurable(metadata(docs::advanced))]
58    receive_buffer_bytes: Option<usize>,
59
60    /// The maximum number of TCP connections that are allowed at any given time.
61    #[configurable(metadata(docs::type_unit = "connections"))]
62    #[configurable(metadata(docs::advanced))]
63    connection_limit: Option<u32>,
64
65    #[configurable(derived)]
66    #[serde(default, deserialize_with = "bool_or_struct")]
67    acknowledgements: SourceAcknowledgementsConfig,
68
69    /// The namespace to use for logs. This overrides the global setting.
70    #[configurable(metadata(docs::hidden))]
71    #[serde(default)]
72    log_namespace: Option<bool>,
73}
74
75impl LogstashConfig {
76    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
77    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
78        // `host_key` is only inserted if not present already.
79        let host_key = log_schema()
80            .host_key()
81            .cloned()
82            .map(LegacyKey::InsertIfEmpty);
83
84        let tls_client_metadata_path = self
85            .tls
86            .as_ref()
87            .and_then(|tls| tls.client_metadata_key.as_ref())
88            .and_then(|k| k.path.clone())
89            .map(LegacyKey::Overwrite);
90
91        BytesDeserializerConfig
92            .schema_definition(log_namespace)
93            .with_standard_vector_source_metadata()
94            .with_source_metadata(
95                LogstashConfig::NAME,
96                None,
97                &owned_value_path!("timestamp"),
98                Kind::timestamp().or_undefined(),
99                Some("timestamp"),
100            )
101            .with_source_metadata(
102                LogstashConfig::NAME,
103                host_key,
104                &owned_value_path!("host"),
105                Kind::bytes(),
106                Some("host"),
107            )
108            .with_source_metadata(
109                Self::NAME,
110                tls_client_metadata_path,
111                &owned_value_path!("tls_client_metadata"),
112                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
113                None,
114            )
115    }
116}
117
118impl Default for LogstashConfig {
119    fn default() -> Self {
120        Self {
121            address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
122            keepalive: None,
123            permit_origin: None,
124            tls: None,
125            receive_buffer_bytes: None,
126            acknowledgements: Default::default(),
127            connection_limit: None,
128            log_namespace: None,
129        }
130    }
131}
132
133impl GenerateConfig for LogstashConfig {
134    fn generate_config() -> toml::Value {
135        toml::Value::try_from(LogstashConfig::default()).unwrap()
136    }
137}
138
139#[async_trait::async_trait]
140#[typetag::serde(name = "logstash")]
141impl SourceConfig for LogstashConfig {
142    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
143        let log_namespace = cx.log_namespace(self.log_namespace);
144        let source = LogstashSource {
145            timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
146            legacy_host_key_path: log_schema().host_key().cloned(),
147            log_namespace,
148        };
149        let shutdown_secs = Duration::from_secs(30);
150        let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
151        let tls_client_metadata_key = self
152            .tls
153            .as_ref()
154            .and_then(|tls| tls.client_metadata_key.clone())
155            .and_then(|k| k.path);
156
157        let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
158        source.run(
159            self.address,
160            self.keepalive,
161            shutdown_secs,
162            tls,
163            tls_client_metadata_key,
164            self.receive_buffer_bytes,
165            None,
166            cx,
167            self.acknowledgements,
168            self.connection_limit,
169            self.permit_origin.clone().map(Into::into),
170            LogstashConfig::NAME,
171            log_namespace,
172        )
173    }
174
175    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
176        // There is a global and per-source `log_namespace` config.
177        // The source config overrides the global setting and is merged here.
178        vec![SourceOutput::new_maybe_logs(
179            DataType::Log,
180            self.schema_definition(global_log_namespace.merge(self.log_namespace)),
181        )]
182    }
183
184    fn resources(&self) -> Vec<Resource> {
185        vec![self.address.as_tcp_resource()]
186    }
187
188    fn can_acknowledge(&self) -> bool {
189        true
190    }
191}
192
193#[derive(Debug, Clone)]
194struct LogstashSource {
195    timestamp_converter: types::Conversion,
196    log_namespace: LogNamespace,
197    legacy_host_key_path: Option<OwnedValuePath>,
198}
199
200impl TcpSource for LogstashSource {
201    type Error = DecodeError;
202    type Item = LogstashEventFrame;
203    type Decoder = LogstashDecoder;
204    type Acker = LogstashAcker;
205
206    fn decoder(&self) -> Self::Decoder {
207        LogstashDecoder::new()
208    }
209
210    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
211        let now = chrono::Utc::now();
212        for event in events {
213            let log = event.as_mut_log();
214
215            self.log_namespace.insert_vector_metadata(
216                log,
217                log_schema().source_type_key(),
218                path!("source_type"),
219                Bytes::from_static(LogstashConfig::NAME.as_bytes()),
220            );
221
222            let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
223                self.timestamp_converter
224                    .convert::<Value>(timestamp.coerce_to_bytes())
225                    .ok()
226            });
227
228            // Vector: always insert `ingest_timestamp`. Insert `timestamp` if found in event.
229            //
230            // Legacy: always insert the global log schema timestamp key- use timestamp from
231            //         event if present, otherwise use ingest.
232            match self.log_namespace {
233                LogNamespace::Vector => {
234                    if let Some(timestamp) = log_timestamp {
235                        log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
236                    }
237                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
238                }
239                LogNamespace::Legacy => {
240                    if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
241                        log.insert(
242                            timestamp_key,
243                            log_timestamp.unwrap_or_else(|| Value::from(now)),
244                        );
245                    }
246                }
247            }
248
249            self.log_namespace.insert_source_metadata(
250                LogstashConfig::NAME,
251                log,
252                self.legacy_host_key_path
253                    .as_ref()
254                    .map(LegacyKey::InsertIfEmpty),
255                path!("host"),
256                host.ip().to_string(),
257            );
258        }
259    }
260
261    fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
262        LogstashAcker::new(frames)
263    }
264}
265
266struct LogstashAcker {
267    sequence_number: u32,
268    protocol_version: Option<LogstashProtocolVersion>,
269}
270
271impl LogstashAcker {
272    fn new(frames: &[LogstashEventFrame]) -> Self {
273        let mut sequence_number = 0;
274        let mut protocol_version = None;
275
276        for frame in frames {
277            sequence_number = std::cmp::max(sequence_number, frame.sequence_number);
278            // We assume that it's valid to ack via any of the protocol versions that we've seen in
279            // a set of frames from a single stream, so here we just take the last. In reality, we
280            // do not expect stream with multiple protocol versions to occur.
281            protocol_version = Some(frame.protocol);
282        }
283
284        Self {
285            sequence_number,
286            protocol_version,
287        }
288    }
289}
290
291impl TcpSourceAcker for LogstashAcker {
292    // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type
293    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
294        match (ack, self.protocol_version) {
295            (TcpSourceAck::Ack, Some(protocol_version)) => {
296                let mut bytes: Vec<u8> = Vec::with_capacity(6);
297                bytes.push(protocol_version.into());
298                bytes.push(LogstashFrameType::Ack.into());
299                bytes.extend(self.sequence_number.to_be_bytes().iter());
300                Some(Bytes::from(bytes))
301            }
302            _ => None,
303        }
304    }
305}
306
307#[derive(Debug)]
308enum LogstashDecoderReadState {
309    ReadProtocol,
310    ReadType(LogstashProtocolVersion),
311    ReadFrame(LogstashProtocolVersion, LogstashFrameType),
312    PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
313}
314
315#[derive(Debug)]
316struct LogstashDecoder {
317    state: LogstashDecoderReadState,
318}
319
320impl LogstashDecoder {
321    const fn new() -> Self {
322        Self {
323            state: LogstashDecoderReadState::ReadProtocol,
324        }
325    }
326}
327
328#[derive(Debug, Snafu)]
329pub enum DecodeError {
330    #[snafu(display("i/o error: {}", source))]
331    IO { source: io::Error },
332    #[snafu(display("Unknown logstash protocol version: {}", version))]
333    UnknownProtocolVersion { version: char },
334    #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
335    UnknownFrameType { frame_type: char },
336    #[snafu(display("Failed to decode JSON frame: {}", source))]
337    JsonFrameFailedDecode { source: serde_json::Error },
338    #[snafu(display("Failed to decompress compressed frame: {}", source))]
339    DecompressionFailed { source: io::Error },
340}
341
342impl StreamDecodingError for DecodeError {
343    fn can_continue(&self) -> bool {
344        use DecodeError::*;
345
346        match self {
347            IO { .. } => false,
348            UnknownProtocolVersion { .. } => false,
349            UnknownFrameType { .. } => false,
350            JsonFrameFailedDecode { .. } => true,
351            DecompressionFailed { .. } => true,
352        }
353    }
354}
355
356impl From<io::Error> for DecodeError {
357    fn from(source: io::Error) -> Self {
358        DecodeError::IO { source }
359    }
360}
361
/// Protocol version announced by the first byte of every frame.
#[derive(Debug, Clone, Copy)]
enum LogstashProtocolVersion {
    /// Wire byte `1`.
    V1,
    /// Wire byte `2`.
    V2,
}

impl From<LogstashProtocolVersion> for u8 {
    /// Returns the ASCII byte that represents the version on the wire.
    fn from(version: LogstashProtocolVersion) -> u8 {
        match version {
            LogstashProtocolVersion::V1 => b'1',
            LogstashProtocolVersion::V2 => b'2',
        }
    }
}
378
379impl TryFrom<u8> for LogstashProtocolVersion {
380    type Error = DecodeError;
381
382    fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
383        use LogstashProtocolVersion::*;
384
385        match frame_type {
386            b'1' => Ok(V1),
387            b'2' => Ok(V2),
388            version => Err(DecodeError::UnknownProtocolVersion {
389                version: version as char,
390            }),
391        }
392    }
393}
394
/// Frame type announced by the second byte of every frame.
#[derive(Debug, Clone, Copy)]
enum LogstashFrameType {
    /// Wire byte `A`.
    Ack,
    /// Wire byte `W`.
    WindowSize,
    /// Wire byte `D`.
    Data,
    /// Wire byte `J`.
    Json,
    /// Wire byte `C`.
    Compressed,
}

impl From<LogstashFrameType> for u8 {
    /// Returns the ASCII byte that represents the frame type on the wire.
    fn from(frame_type: LogstashFrameType) -> u8 {
        match frame_type {
            LogstashFrameType::Ack => b'A',
            LogstashFrameType::WindowSize => b'W',
            LogstashFrameType::Data => b'D',
            LogstashFrameType::Json => b'J',
            LogstashFrameType::Compressed => b'C',
        }
    }
}
417
418impl TryFrom<u8> for LogstashFrameType {
419    type Error = DecodeError;
420
421    fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
422        use LogstashFrameType::*;
423
424        match frame_type {
425            b'A' => Ok(Ack),
426            b'W' => Ok(WindowSize),
427            b'D' => Ok(Data),
428            b'J' => Ok(Json),
429            b'C' => Ok(Compressed),
430            frame_type => Err(DecodeError::UnknownFrameType {
431                frame_type: frame_type as char,
432            }),
433        }
434    }
435}
436
437/// Normalized event from logstash frame
438#[derive(Debug)]
439struct LogstashEventFrame {
440    protocol: LogstashProtocolVersion,
441    sequence_number: u32,
442    fields: BTreeMap<KeyString, serde_json::Value>,
443}
444
445// Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md
446// And implementation from logstash: https://github.com/logstash-plugins/logstash-input-beats/blob/27bad62a26a81fc000a9d21495b8dc7174ab63e9/src/main/java/org/logstash/beats/BeatsParser.java
447impl Decoder for LogstashDecoder {
448    type Item = (LogstashEventFrame, usize);
449    type Error = DecodeError;
450
451    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
452        // This implements a sort of simple state machine to read the frames from the wire
453        //
454        // Each matched arm with either:
455        // * Return that there is not enough data
456        // * Return an error
457        // * Read some bytes and advance the state
458        loop {
459            self.state = match self.state {
460                // if we have any unsent frames, send them before reading new logstash frame
461                LogstashDecoderReadState::PendingFrames(ref mut frames) => {
462                    match frames.pop_front() {
463                        Some(frame) => return Ok(Some(frame)),
464                        None => LogstashDecoderReadState::ReadProtocol,
465                    }
466                }
467                LogstashDecoderReadState::ReadProtocol => {
468                    if src.remaining() < 1 {
469                        return Ok(None);
470                    }
471
472                    use LogstashProtocolVersion::*;
473
474                    match LogstashProtocolVersion::try_from(src.get_u8())? {
475                        V1 => LogstashDecoderReadState::ReadType(V1),
476                        V2 => LogstashDecoderReadState::ReadType(V2),
477                    }
478                }
479                LogstashDecoderReadState::ReadType(protocol) => {
480                    if src.remaining() < 1 {
481                        return Ok(None);
482                    }
483
484                    use LogstashFrameType::*;
485
486                    match LogstashFrameType::try_from(src.get_u8())? {
487                        WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
488                        Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
489                        Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
490                        Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
491                        Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
492                    }
493                }
494                // The window size indicates how many events the writer will send before waiting
495                // for acks. As we forward events as we get them, and ack as they are received, we
496                // do not need to keep track of this.
497                //
498                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type
499                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
500                    if src.remaining() < 4 {
501                        return Ok(None);
502                    }
503
504                    let _window_size = src.get_u32();
505
506                    LogstashDecoderReadState::ReadProtocol
507                }
508                // we shouldn't receive acks from the writer, just skip
509                //
510                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type
511                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
512                    if src.remaining() < 4 {
513                        return Ok(None);
514                    }
515
516                    let _sequence_number = src.get_u32();
517
518                    LogstashDecoderReadState::ReadProtocol
519                }
520                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type
521                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
522                    let Some(frame) = decode_data_frame(protocol, src) else {
523                        return Ok(None);
524                    };
525
526                    LogstashDecoderReadState::PendingFrames([frame].into())
527                }
528                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type
529                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
530                    let Some(frame) = decode_json_frame(protocol, src)? else {
531                        return Ok(None);
532                    };
533
534                    LogstashDecoderReadState::PendingFrames([frame].into())
535                }
536                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type
537                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
538                    let Some(frames) = decode_compressed_frame(src)? else {
539                        return Ok(None);
540                    };
541
542                    LogstashDecoderReadState::PendingFrames(frames)
543                }
544            };
545        }
546    }
547}
548
549/// Decode the Lumberjack version 1 protocol, which use the Key:Value format.
550fn decode_data_frame(
551    protocol: LogstashProtocolVersion,
552    src: &mut BytesMut,
553) -> Option<(LogstashEventFrame, usize)> {
554    let mut rest = src.as_ref();
555
556    if rest.remaining() < 8 {
557        return None;
558    }
559    let sequence_number = rest.get_u32();
560    let pair_count = rest.get_u32();
561    if pair_count == 0 {
562        return None; // Invalid number of fields
563    }
564
565    let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
566    for _ in 0..pair_count {
567        let (key, value, right) = decode_pair(rest)?;
568        rest = right;
569
570        fields.insert(
571            String::from_utf8_lossy(key).into(),
572            String::from_utf8_lossy(value).into(),
573        );
574    }
575
576    let byte_size = bytes_remaining(src, rest);
577    src.advance(byte_size);
578
579    Some((
580        LogstashEventFrame {
581            protocol,
582            sequence_number,
583            fields,
584        },
585        byte_size,
586    ))
587}
588
/// Splits one length-prefixed chunk off the front of `input`.
///
/// The chunk is a big-endian `u32` length followed by that many bytes. Returns the chunk
/// and whatever follows it, or `None` when `input` is too short.
fn split_length_prefixed(input: &[u8]) -> Option<(&[u8], &[u8])> {
    let prefix: [u8; 4] = input.get(..4)?.try_into().ok()?;
    let length = u32::from_be_bytes(prefix) as usize;

    let tail = &input[4..];
    if tail.len() < length {
        return None;
    }
    Some(tail.split_at(length))
}

/// Reads one key/value pair from the front of `rest`.
///
/// Key and value are each encoded as a big-endian `u32` length followed by that many
/// bytes. Returns `(key, value, remainder)`, or `None` when `rest` does not hold the
/// whole pair.
fn decode_pair(rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
    let (key, after_key) = split_length_prefixed(rest)?;
    let (value, remainder) = split_length_prefixed(after_key)?;
    Some((key, value, remainder))
}
611
612fn decode_json_frame(
613    protocol: LogstashProtocolVersion,
614    src: &mut BytesMut,
615) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
616    let mut rest = src.as_ref();
617
618    if rest.remaining() < 8 {
619        return Ok(None);
620    }
621    let sequence_number = rest.get_u32();
622    let payload_size = rest.get_u32() as usize;
623
624    if rest.remaining() < payload_size {
625        return Ok(None);
626    }
627
628    let (slice, right) = rest.split_at(payload_size);
629    rest = right;
630
631    let fields: BTreeMap<KeyString, serde_json::Value> =
632        serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
633
634    let byte_size = bytes_remaining(src, rest);
635    src.advance(byte_size);
636
637    Ok(Some((
638        LogstashEventFrame {
639            protocol,
640            sequence_number,
641            fields,
642        },
643        byte_size,
644    )))
645}
646
647fn decode_compressed_frame(
648    src: &mut BytesMut,
649) -> Result<Option<VecDeque<(LogstashEventFrame, usize)>>, DecodeError> {
650    let mut rest = src.as_ref();
651
652    if rest.remaining() < 4 {
653        return Ok(None);
654    }
655    let payload_size = rest.get_u32() as usize;
656
657    if rest.remaining() < payload_size {
658        src.reserve(payload_size);
659        return Ok(None);
660    }
661
662    let (slice, right) = rest.split_at(payload_size);
663    rest = right;
664
665    let mut buf = Vec::new();
666
667    let res = ZlibDecoder::new(io::Cursor::new(slice))
668        .read_to_end(&mut buf)
669        .context(DecompressionFailedSnafu)
670        .map(|_| BytesMut::from(&buf[..]));
671
672    let byte_size = bytes_remaining(src, rest);
673    src.advance(byte_size);
674
675    let mut buf = res?;
676
677    let mut decoder = LogstashDecoder::new();
678
679    let mut frames = VecDeque::new();
680
681    while let Some(s) = decoder.decode(&mut buf)? {
682        frames.push_back(s);
683    }
684    Ok(Some(frames))
685}
686
687fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
688    let remaining = rest.remaining();
689    src.remaining() - remaining
690}
691
692impl From<LogstashEventFrame> for Event {
693    fn from(frame: LogstashEventFrame) -> Self {
694        Event::Log(LogEvent::from(
695            frame
696                .fields
697                .into_iter()
698                .map(|(key, value)| (key, Value::from(value)))
699                .collect::<BTreeMap<_, _>>(),
700        ))
701    }
702}
703
704impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
705    fn from(frame: LogstashEventFrame) -> Self {
706        smallvec![frame.into()]
707    }
708}
709
710#[cfg(test)]
711mod test {
712    use bytes::BufMut;
713    use futures::Stream;
714    use rand::{Rng, rng};
715    use tokio::io::{AsyncReadExt, AsyncWriteExt};
716    use vector_lib::lookup::OwnedTargetPath;
717    use vrl::value::kind::Collection;
718
719    use super::*;
720    use crate::{
721        SourceSender,
722        event::EventStatus,
723        test_util::{
724            addr::next_addr,
725            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
726            spawn_collect_n, wait_for_tcp,
727        },
728    };
729
730    #[test]
731    fn generate_config() {
732        crate::test_util::test_generate_config::<LogstashConfig>();
733    }
734
735    #[tokio::test]
736    async fn test_delivered() {
737        test_protocol(EventStatus::Delivered, true).await;
738    }
739
740    #[tokio::test]
741    async fn test_failed() {
742        test_protocol(EventStatus::Rejected, false).await;
743    }
744
745    async fn start_logstash(
746        status: EventStatus,
747    ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
748        let (sender, recv) = SourceSender::new_test_finalize(status);
749        let (_guard, address) = next_addr();
750        let source = LogstashConfig {
751            address: address.into(),
752            tls: None,
753            permit_origin: None,
754            keepalive: None,
755            receive_buffer_bytes: None,
756            acknowledgements: true.into(),
757            connection_limit: None,
758            log_namespace: None,
759        }
760        .build(SourceContext::new_test(sender, None))
761        .await
762        .unwrap();
763        tokio::spawn(source);
764        wait_for_tcp(address).await;
765        (address, recv)
766    }
767
768    async fn test_protocol(status: EventStatus, sends_ack: bool) {
769        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
770            let (address, recv) = start_logstash(status).await;
771            spawn_collect_n(
772                send_req(address, &[("message", "Hello, world!")], sends_ack),
773                recv,
774                1,
775            )
776            .await
777        })
778        .await;
779
780        assert_eq!(events.len(), 1);
781        let log = events[0].as_log();
782        assert_eq!(
783            log.get("message").unwrap().to_string_lossy(),
784            "Hello, world!".to_string()
785        );
786        assert_eq!(
787            log.get("source_type").unwrap().to_string_lossy(),
788            "logstash".to_string()
789        );
790        assert!(log.get("host").is_some());
791        assert!(log.get("timestamp").is_some());
792    }
793
794    fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
795        let mut req = BytesMut::new();
796        req.put_u8(b'2');
797        req.put_u8(b'D');
798        req.put_u32(seq);
799        req.put_u32(pairs.len() as u32);
800        for (key, value) in pairs {
801            req.put_u32(key.len() as u32);
802            req.put(key.as_bytes());
803            req.put_u32(value.len() as u32);
804            req.put(value.as_bytes());
805        }
806        req.into()
807    }
808
809    #[test]
810    fn v1_decoder_does_not_panic() {
811        let seq = rng().random_range(1..u32::MAX);
812        let req = encode_req(seq, &[("message", "Hello, World!")]);
813        for i in 0..req.len() - 1 {
814            assert!(
815                decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&req[..i]))
816                    .is_none()
817            );
818        }
819    }
820
821    async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
822        let seq = rng().random_range(1..u32::MAX);
823        let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
824
825        let req = encode_req(seq, pairs);
826        socket.write_all(&req).await.unwrap();
827
828        let mut output = BytesMut::new();
829        socket.read_buf(&mut output).await.unwrap();
830
831        if sends_ack {
832            assert_eq!(output.get_u8(), b'2');
833            assert_eq!(output.get_u8(), b'A');
834            assert_eq!(output.get_u32(), seq);
835        }
836        assert_eq!(output.len(), 0);
837    }
838
839    #[test]
840    fn output_schema_definition_vector_namespace() {
841        let config = LogstashConfig {
842            log_namespace: Some(true),
843            ..Default::default()
844        };
845
846        let definitions = config
847            .outputs(LogNamespace::Vector)
848            .remove(0)
849            .schema_definition(true);
850
851        let expected_definition =
852            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
853                .with_meaning(OwnedTargetPath::event_root(), "message")
854                .with_metadata_field(
855                    &owned_value_path!("vector", "source_type"),
856                    Kind::bytes(),
857                    None,
858                )
859                .with_metadata_field(
860                    &owned_value_path!("vector", "ingest_timestamp"),
861                    Kind::timestamp(),
862                    None,
863                )
864                .with_metadata_field(
865                    &owned_value_path!(LogstashConfig::NAME, "timestamp"),
866                    Kind::timestamp().or_undefined(),
867                    Some("timestamp"),
868                )
869                .with_metadata_field(
870                    &owned_value_path!(LogstashConfig::NAME, "host"),
871                    Kind::bytes(),
872                    Some("host"),
873                )
874                .with_metadata_field(
875                    &owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
876                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
877                    None,
878                );
879
880        assert_eq!(definitions, Some(expected_definition))
881    }
882
883    #[test]
884    fn output_schema_definition_legacy_namespace() {
885        let config = LogstashConfig::default();
886
887        let definitions = config
888            .outputs(LogNamespace::Legacy)
889            .remove(0)
890            .schema_definition(true);
891
892        let expected_definition = Definition::new_with_default_metadata(
893            Kind::object(Collection::empty()),
894            [LogNamespace::Legacy],
895        )
896        .with_event_field(
897            &owned_value_path!("message"),
898            Kind::bytes(),
899            Some("message"),
900        )
901        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
902        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
903        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"));
904
905        assert_eq!(definitions, Some(expected_definition))
906    }
907}
908
#[cfg(all(test, feature = "logstash-integration-tests"))]
mod integration_tests {
    use std::time::Duration;

    use futures::Stream;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        SourceSender,
        config::SourceContext,
        event::EventStatus,
        test_util::{
            collect_n,
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            wait_for_tcp,
        },
        tls::{TlsConfig, TlsEnableableConfig},
    };

    /// Upper bound on how long each test waits to collect its first event.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(60);

    /// Returns the value of the `HEARTBEAT_ADDRESS` environment variable,
    /// panicking if it cannot be read.
    fn heartbeat_address() -> String {
        let address = std::env::var("HEARTBEAT_ADDRESS");
        address.expect("Address of Beats Heartbeat service must be specified.")
    }

    /// Runs the source without TLS on the heartbeat address and checks the
    /// fields of the first event that arrives.
    #[tokio::test]
    async fn beats_heartbeat() {
        let received = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let stream = source(heartbeat_address(), None).await;

            let collected = timeout(EVENT_TIMEOUT, collect_n(stream, 1)).await;
            collected.unwrap()
        })
        .await;

        assert!(!received.is_empty());

        let log = received[0].as_log();

        let expected_beat = Value::from(String::from("heartbeat"));
        assert_eq!(log.get("@metadata.beat"), Some(&expected_beat));

        let expected_up = Value::from(1);
        assert_eq!(log.get("summary.up"), Some(&expected_up));

        assert!(log.get("timestamp").is_some());
        assert!(log.get("host").is_some());
    }

    /// Returns the value of the `LOGSTASH_ADDRESS` environment variable,
    /// panicking if it cannot be read.
    fn logstash_address() -> String {
        let address = std::env::var("LOGSTASH_ADDRESS");
        address.expect("Listen address for `logstash` source must be specified.")
    }

    /// Runs the source with TLS enabled on the logstash address and checks
    /// that the first event carries the expected `line` and a `host`.
    #[tokio::test]
    async fn logstash() {
        let received = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            // TLS is switched on using the shared integration-test key pair.
            let tls_options = TlsConfig {
                crt_file: Some("tests/integration/shared/data/host.docker.internal.crt".into()),
                key_file: Some("tests/integration/shared/data/host.docker.internal.key".into()),
                ..Default::default()
            };
            let tls = TlsEnableableConfig {
                enabled: Some(true),
                options: tls_options,
            };

            let stream = source(logstash_address(), Some(tls)).await;

            let collected = timeout(EVENT_TIMEOUT, collect_n(stream, 1)).await;
            collected.unwrap()
        })
        .await;

        assert!(!received.is_empty());

        let log = received[0].as_log();
        let line = log.get("line").unwrap().to_string_lossy();
        assert!(line.contains("Hello World"));
        assert!(log.get("host").is_some());
    }

    /// Spawns a `logstash` source bound to `address` and returns the stream
    /// half of the test sender it publishes to.
    ///
    /// `tls` supplies the TLS settings for the source; `None` falls back to
    /// `TlsEnableableConfig::default()`. The function waits for the TCP
    /// address via `wait_for_tcp` before returning. Panics if `address` is
    /// not a valid socket address.
    async fn source(
        address: String,
        tls: Option<TlsEnableableConfig>,
    ) -> impl Stream<Item = Event> + Unpin {
        let (sender, events) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let listen_addr: SocketAddr = address.parse().unwrap();

        let config = LogstashConfig {
            address: listen_addr.into(),
            keepalive: None,
            permit_origin: None,
            tls: Some(TlsSourceConfig {
                client_metadata_key: None,
                tls_config: tls.unwrap_or_default(),
            }),
            receive_buffer_bytes: None,
            connection_limit: None,
            acknowledgements: false.into(),
            log_namespace: None,
        };

        // The task is detached: build the source, then drive it to completion.
        tokio::spawn(async move {
            let running = config
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap();
            running.await.unwrap()
        });

        wait_for_tcp(listen_addr).await;
        events
    }
}