vector/sources/
logstash.rs

1use std::{
2    collections::{BTreeMap, VecDeque},
3    convert::TryFrom,
4    io::{self, Read},
5    net::SocketAddr,
6    time::Duration,
7};
8
9use bytes::{Buf, Bytes, BytesMut};
10use flate2::read::ZlibDecoder;
11use smallvec::{SmallVec, smallvec};
12use snafu::{ResultExt, Snafu};
13use tokio_util::codec::Decoder;
14use vector_lib::{
15    codecs::{BytesDeserializerConfig, StreamDecodingError},
16    config::{LegacyKey, LogNamespace},
17    configurable::configurable_component,
18    ipallowlist::IpAllowlistConfig,
19    lookup::{OwnedValuePath, event_path, metadata_path, owned_value_path, path},
20    schema::Definition,
21};
22use vrl::value::{KeyString, Kind, kind::Collection};
23
24use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
25use crate::{
26    config::{
27        DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
28        SourceContext, SourceOutput, log_schema,
29    },
30    event::{Event, LogEvent, Value},
31    serde::bool_or_struct,
32    tcp::TcpKeepaliveConfig,
33    tls::{MaybeTlsSettings, TlsSourceConfig},
34    types,
35};
36
37/// Configuration for the `logstash` source.
38#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
39#[derive(Clone, Debug)]
40pub struct LogstashConfig {
41    #[configurable(derived)]
42    address: SocketListenAddr,
43
44    #[configurable(derived)]
45    #[configurable(metadata(docs::advanced))]
46    keepalive: Option<TcpKeepaliveConfig>,
47
48    #[configurable(derived)]
49    pub permit_origin: Option<IpAllowlistConfig>,
50
51    #[configurable(derived)]
52    tls: Option<TlsSourceConfig>,
53
54    /// The size of the receive buffer used for each connection.
55    #[configurable(metadata(docs::type_unit = "bytes"))]
56    #[configurable(metadata(docs::examples = 65536))]
57    #[configurable(metadata(docs::advanced))]
58    receive_buffer_bytes: Option<usize>,
59
60    /// The maximum number of TCP connections that are allowed at any given time.
61    #[configurable(metadata(docs::type_unit = "connections"))]
62    #[configurable(metadata(docs::advanced))]
63    connection_limit: Option<u32>,
64
65    #[configurable(derived)]
66    #[serde(default, deserialize_with = "bool_or_struct")]
67    acknowledgements: SourceAcknowledgementsConfig,
68
69    /// The namespace to use for logs. This overrides the global setting.
70    #[configurable(metadata(docs::hidden))]
71    #[serde(default)]
72    log_namespace: Option<bool>,
73}
74
75impl LogstashConfig {
76    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
77    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
78        // `host_key` is only inserted if not present already.
79        let host_key = log_schema()
80            .host_key()
81            .cloned()
82            .map(LegacyKey::InsertIfEmpty);
83
84        let tls_client_metadata_path = self
85            .tls
86            .as_ref()
87            .and_then(|tls| tls.client_metadata_key.as_ref())
88            .and_then(|k| k.path.clone())
89            .map(LegacyKey::Overwrite);
90
91        BytesDeserializerConfig
92            .schema_definition(log_namespace)
93            .with_standard_vector_source_metadata()
94            .with_source_metadata(
95                LogstashConfig::NAME,
96                None,
97                &owned_value_path!("timestamp"),
98                Kind::timestamp().or_undefined(),
99                Some("timestamp"),
100            )
101            .with_source_metadata(
102                LogstashConfig::NAME,
103                host_key,
104                &owned_value_path!("host"),
105                Kind::bytes(),
106                Some("host"),
107            )
108            .with_source_metadata(
109                Self::NAME,
110                tls_client_metadata_path,
111                &owned_value_path!("tls_client_metadata"),
112                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
113                None,
114            )
115    }
116}
117
118impl Default for LogstashConfig {
119    fn default() -> Self {
120        Self {
121            address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
122            keepalive: None,
123            permit_origin: None,
124            tls: None,
125            receive_buffer_bytes: None,
126            acknowledgements: Default::default(),
127            connection_limit: None,
128            log_namespace: None,
129        }
130    }
131}
132
133impl GenerateConfig for LogstashConfig {
134    fn generate_config() -> toml::Value {
135        toml::Value::try_from(LogstashConfig::default()).unwrap()
136    }
137}
138
139#[async_trait::async_trait]
140#[typetag::serde(name = "logstash")]
141impl SourceConfig for LogstashConfig {
142    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
143        let log_namespace = cx.log_namespace(self.log_namespace);
144        let source = LogstashSource {
145            timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
146            legacy_host_key_path: log_schema().host_key().cloned(),
147            log_namespace,
148        };
149        let shutdown_secs = Duration::from_secs(30);
150        let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
151        let tls_client_metadata_key = self
152            .tls
153            .as_ref()
154            .and_then(|tls| tls.client_metadata_key.clone())
155            .and_then(|k| k.path);
156
157        let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
158        source.run(
159            self.address,
160            self.keepalive,
161            shutdown_secs,
162            tls,
163            tls_client_metadata_key,
164            self.receive_buffer_bytes,
165            None,
166            cx,
167            self.acknowledgements,
168            self.connection_limit,
169            self.permit_origin.clone().map(Into::into),
170            LogstashConfig::NAME,
171            log_namespace,
172        )
173    }
174
175    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
176        // There is a global and per-source `log_namespace` config.
177        // The source config overrides the global setting and is merged here.
178        vec![SourceOutput::new_maybe_logs(
179            DataType::Log,
180            self.schema_definition(global_log_namespace.merge(self.log_namespace)),
181        )]
182    }
183
184    fn resources(&self) -> Vec<Resource> {
185        vec![self.address.as_tcp_resource()]
186    }
187
188    fn can_acknowledge(&self) -> bool {
189        true
190    }
191}
192
193#[derive(Debug, Clone)]
194struct LogstashSource {
195    timestamp_converter: types::Conversion,
196    log_namespace: LogNamespace,
197    legacy_host_key_path: Option<OwnedValuePath>,
198}
199
200impl TcpSource for LogstashSource {
201    type Error = DecodeError;
202    type Item = LogstashEventFrame;
203    type Decoder = LogstashDecoder;
204    type Acker = LogstashAcker;
205
206    fn decoder(&self) -> Self::Decoder {
207        LogstashDecoder::new()
208    }
209
210    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
211        let now = chrono::Utc::now();
212        for event in events {
213            let log = event.as_mut_log();
214
215            self.log_namespace.insert_vector_metadata(
216                log,
217                log_schema().source_type_key(),
218                path!("source_type"),
219                Bytes::from_static(LogstashConfig::NAME.as_bytes()),
220            );
221
222            let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
223                self.timestamp_converter
224                    .convert::<Value>(timestamp.coerce_to_bytes())
225                    .ok()
226            });
227
228            // Vector: always insert `ingest_timestamp`. Insert `timestamp` if found in event.
229            //
230            // Legacy: always insert the global log schema timestamp key- use timestamp from
231            //         event if present, otherwise use ingest.
232            match self.log_namespace {
233                LogNamespace::Vector => {
234                    if let Some(timestamp) = log_timestamp {
235                        log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
236                    }
237                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
238                }
239                LogNamespace::Legacy => {
240                    if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
241                        log.insert(
242                            timestamp_key,
243                            log_timestamp.unwrap_or_else(|| Value::from(now)),
244                        );
245                    }
246                }
247            }
248
249            self.log_namespace.insert_source_metadata(
250                LogstashConfig::NAME,
251                log,
252                self.legacy_host_key_path
253                    .as_ref()
254                    .map(LegacyKey::InsertIfEmpty),
255                path!("host"),
256                host.ip().to_string(),
257            );
258        }
259    }
260
261    fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
262        LogstashAcker::new(frames)
263    }
264}
265
266struct LogstashAcker {
267    sequence_number: u32,
268    protocol_version: Option<LogstashProtocolVersion>,
269}
270
271impl LogstashAcker {
272    fn new(frames: &[LogstashEventFrame]) -> Self {
273        let mut sequence_number = 0;
274        let mut protocol_version = None;
275
276        for frame in frames {
277            sequence_number = std::cmp::max(sequence_number, frame.sequence_number);
278            // We assume that it's valid to ack via any of the protocol versions that we've seen in
279            // a set of frames from a single stream, so here we just take the last. In reality, we
280            // do not expect stream with multiple protocol versions to occur.
281            protocol_version = Some(frame.protocol);
282        }
283
284        Self {
285            sequence_number,
286            protocol_version,
287        }
288    }
289}
290
291impl TcpSourceAcker for LogstashAcker {
292    // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type
293    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
294        match (ack, self.protocol_version) {
295            (TcpSourceAck::Ack, Some(protocol_version)) => {
296                let mut bytes: Vec<u8> = Vec::with_capacity(6);
297                bytes.push(protocol_version.into());
298                bytes.push(LogstashFrameType::Ack.into());
299                bytes.extend(self.sequence_number.to_be_bytes().iter());
300                Some(Bytes::from(bytes))
301            }
302            _ => None,
303        }
304    }
305}
306
307#[derive(Debug)]
308enum LogstashDecoderReadState {
309    ReadProtocol,
310    ReadType(LogstashProtocolVersion),
311    ReadFrame(LogstashProtocolVersion, LogstashFrameType),
312    PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
313}
314
315#[derive(Debug)]
316struct LogstashDecoder {
317    state: LogstashDecoderReadState,
318}
319
320impl LogstashDecoder {
321    const fn new() -> Self {
322        Self {
323            state: LogstashDecoderReadState::ReadProtocol,
324        }
325    }
326}
327
328#[derive(Debug, Snafu)]
329pub enum DecodeError {
330    #[snafu(display("i/o error: {}", source))]
331    IO { source: io::Error },
332    #[snafu(display("Unknown logstash protocol version: {}", version))]
333    UnknownProtocolVersion { version: char },
334    #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
335    UnknownFrameType { frame_type: char },
336    #[snafu(display("Failed to decode JSON frame: {}", source))]
337    JsonFrameFailedDecode { source: serde_json::Error },
338    #[snafu(display("Failed to decompress compressed frame: {}", source))]
339    DecompressionFailed { source: io::Error },
340}
341
342impl StreamDecodingError for DecodeError {
343    fn can_continue(&self) -> bool {
344        use DecodeError::*;
345
346        match self {
347            IO { .. } => false,
348            UnknownProtocolVersion { .. } => false,
349            UnknownFrameType { .. } => false,
350            JsonFrameFailedDecode { .. } => true,
351            DecompressionFailed { .. } => true,
352        }
353    }
354}
355
356impl From<io::Error> for DecodeError {
357    fn from(source: io::Error) -> Self {
358        DecodeError::IO { source }
359    }
360}
361
362#[derive(Debug, Clone, Copy)]
363enum LogstashProtocolVersion {
364    V1, // 1
365    V2, // 2
366}
367
368impl From<LogstashProtocolVersion> for u8 {
369    fn from(frame_type: LogstashProtocolVersion) -> u8 {
370        use LogstashProtocolVersion::*;
371
372        match frame_type {
373            V1 => b'1',
374            V2 => b'2',
375        }
376    }
377}
378
379impl TryFrom<u8> for LogstashProtocolVersion {
380    type Error = DecodeError;
381
382    fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
383        use LogstashProtocolVersion::*;
384
385        match frame_type {
386            b'1' => Ok(V1),
387            b'2' => Ok(V2),
388            version => Err(DecodeError::UnknownProtocolVersion {
389                version: version as char,
390            }),
391        }
392    }
393}
394
395#[derive(Debug, Clone, Copy)]
396enum LogstashFrameType {
397    Ack,        // A
398    WindowSize, // W
399    Data,       // D
400    Json,       // J
401    Compressed, // C
402}
403
404impl From<LogstashFrameType> for u8 {
405    fn from(frame_type: LogstashFrameType) -> u8 {
406        use LogstashFrameType::*;
407
408        match frame_type {
409            Ack => b'A',
410            WindowSize => b'W',
411            Data => b'D',
412            Json => b'J',
413            Compressed => b'C',
414        }
415    }
416}
417
418impl TryFrom<u8> for LogstashFrameType {
419    type Error = DecodeError;
420
421    fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
422        use LogstashFrameType::*;
423
424        match frame_type {
425            b'A' => Ok(Ack),
426            b'W' => Ok(WindowSize),
427            b'D' => Ok(Data),
428            b'J' => Ok(Json),
429            b'C' => Ok(Compressed),
430            frame_type => Err(DecodeError::UnknownFrameType {
431                frame_type: frame_type as char,
432            }),
433        }
434    }
435}
436
437/// Normalized event from logstash frame
438#[derive(Debug)]
439struct LogstashEventFrame {
440    protocol: LogstashProtocolVersion,
441    sequence_number: u32,
442    fields: BTreeMap<KeyString, serde_json::Value>,
443}
444
445// Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md
446// And implementation from logstash: https://github.com/logstash-plugins/logstash-input-beats/blob/27bad62a26a81fc000a9d21495b8dc7174ab63e9/src/main/java/org/logstash/beats/BeatsParser.java
447impl Decoder for LogstashDecoder {
448    type Item = (LogstashEventFrame, usize);
449    type Error = DecodeError;
450
451    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
452        // This implements a sort of simple state machine to read the frames from the wire
453        //
454        // Each matched arm with either:
455        // * Return that there is not enough data
456        // * Return an error
457        // * Read some bytes and advance the state
458        loop {
459            self.state = match self.state {
460                // if we have any unsent frames, send them before reading new logstash frame
461                LogstashDecoderReadState::PendingFrames(ref mut frames) => {
462                    match frames.pop_front() {
463                        Some(frame) => return Ok(Some(frame)),
464                        None => LogstashDecoderReadState::ReadProtocol,
465                    }
466                }
467                LogstashDecoderReadState::ReadProtocol => {
468                    if src.remaining() < 1 {
469                        return Ok(None);
470                    }
471
472                    use LogstashProtocolVersion::*;
473
474                    match LogstashProtocolVersion::try_from(src.get_u8())? {
475                        V1 => LogstashDecoderReadState::ReadType(V1),
476                        V2 => LogstashDecoderReadState::ReadType(V2),
477                    }
478                }
479                LogstashDecoderReadState::ReadType(protocol) => {
480                    if src.remaining() < 1 {
481                        return Ok(None);
482                    }
483
484                    use LogstashFrameType::*;
485
486                    match LogstashFrameType::try_from(src.get_u8())? {
487                        WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
488                        Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
489                        Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
490                        Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
491                        Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
492                    }
493                }
494                // The window size indicates how many events the writer will send before waiting
495                // for acks. As we forward events as we get them, and ack as they are received, we
496                // do not need to keep track of this.
497                //
498                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type
499                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
500                    if src.remaining() < 4 {
501                        return Ok(None);
502                    }
503
504                    let _window_size = src.get_u32();
505
506                    LogstashDecoderReadState::ReadProtocol
507                }
508                // we shouldn't receive acks from the writer, just skip
509                //
510                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type
511                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
512                    if src.remaining() < 4 {
513                        return Ok(None);
514                    }
515
516                    let _sequence_number = src.get_u32();
517
518                    LogstashDecoderReadState::ReadProtocol
519                }
520                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type
521                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
522                    let Some(frame) = decode_data_frame(protocol, src) else {
523                        return Ok(None);
524                    };
525
526                    LogstashDecoderReadState::PendingFrames([frame].into())
527                }
528                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type
529                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
530                    let Some(frame) = decode_json_frame(protocol, src)? else {
531                        return Ok(None);
532                    };
533
534                    LogstashDecoderReadState::PendingFrames([frame].into())
535                }
536                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type
537                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
538                    let Some(frames) = decode_compressed_frame(src)? else {
539                        return Ok(None);
540                    };
541
542                    LogstashDecoderReadState::PendingFrames(frames)
543                }
544            };
545        }
546    }
547}
548
549/// Decode the Lumberjack version 1 protocol, which use the Key:Value format.
550fn decode_data_frame(
551    protocol: LogstashProtocolVersion,
552    src: &mut BytesMut,
553) -> Option<(LogstashEventFrame, usize)> {
554    let mut rest = src.as_ref();
555
556    if rest.remaining() < 8 {
557        return None;
558    }
559    let sequence_number = rest.get_u32();
560    let pair_count = rest.get_u32();
561    if pair_count == 0 {
562        return None; // Invalid number of fields
563    }
564
565    let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
566    for _ in 0..pair_count {
567        let (key, value, right) = decode_pair(rest)?;
568        rest = right;
569
570        fields.insert(
571            String::from_utf8_lossy(key).into(),
572            String::from_utf8_lossy(value).into(),
573        );
574    }
575
576    let byte_size = bytes_remaining(src, rest);
577    src.advance(byte_size);
578
579    Some((
580        LogstashEventFrame {
581            protocol,
582            sequence_number,
583            fields,
584        },
585        byte_size,
586    ))
587}
588
589fn decode_pair(mut rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
590    if rest.remaining() < 4 {
591        return None;
592    }
593    let key_length = rest.get_u32() as usize;
594
595    if rest.remaining() < key_length {
596        return None;
597    }
598    let (key, right) = rest.split_at(key_length);
599    rest = right;
600
601    if rest.remaining() < 4 {
602        return None;
603    }
604    let value_length = rest.get_u32() as usize;
605    if rest.remaining() < value_length {
606        return None;
607    }
608    let (value, right) = rest.split_at(value_length);
609    Some((key, value, right))
610}
611
612fn decode_json_frame(
613    protocol: LogstashProtocolVersion,
614    src: &mut BytesMut,
615) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
616    let mut rest = src.as_ref();
617
618    if rest.remaining() < 8 {
619        return Ok(None);
620    }
621    let sequence_number = rest.get_u32();
622    let payload_size = rest.get_u32() as usize;
623
624    if rest.remaining() < payload_size {
625        return Ok(None);
626    }
627
628    let (slice, right) = rest.split_at(payload_size);
629    rest = right;
630
631    let fields: BTreeMap<KeyString, serde_json::Value> =
632        serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
633
634    let byte_size = bytes_remaining(src, rest);
635    src.advance(byte_size);
636
637    Ok(Some((
638        LogstashEventFrame {
639            protocol,
640            sequence_number,
641            fields,
642        },
643        byte_size,
644    )))
645}
646
647fn decode_compressed_frame(
648    src: &mut BytesMut,
649) -> Result<Option<VecDeque<(LogstashEventFrame, usize)>>, DecodeError> {
650    let mut rest = src.as_ref();
651
652    if rest.remaining() < 4 {
653        return Ok(None);
654    }
655    let payload_size = rest.get_u32() as usize;
656
657    if rest.remaining() < payload_size {
658        src.reserve(payload_size);
659        return Ok(None);
660    }
661
662    let (slice, right) = rest.split_at(payload_size);
663    rest = right;
664
665    let mut buf = Vec::new();
666
667    let res = ZlibDecoder::new(io::Cursor::new(slice))
668        .read_to_end(&mut buf)
669        .context(DecompressionFailedSnafu)
670        .map(|_| BytesMut::from(&buf[..]));
671
672    let byte_size = bytes_remaining(src, rest);
673    src.advance(byte_size);
674
675    let mut buf = res?;
676
677    let mut decoder = LogstashDecoder::new();
678
679    let mut frames = VecDeque::new();
680
681    while let Some(s) = decoder.decode(&mut buf)? {
682        frames.push_back(s);
683    }
684    Ok(Some(frames))
685}
686
687fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
688    let remaining = rest.remaining();
689    src.remaining() - remaining
690}
691
692impl From<LogstashEventFrame> for Event {
693    fn from(frame: LogstashEventFrame) -> Self {
694        Event::Log(LogEvent::from(
695            frame
696                .fields
697                .into_iter()
698                .map(|(key, value)| (key, Value::from(value)))
699                .collect::<BTreeMap<_, _>>(),
700        ))
701    }
702}
703
704impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
705    fn from(frame: LogstashEventFrame) -> Self {
706        smallvec![frame.into()]
707    }
708}
709
#[cfg(test)]
mod test {
    use bytes::BufMut;
    use futures::Stream;
    use rand::{Rng, rng};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::value::kind::Collection;

    use super::*;
    use crate::{
        SourceSender,
        event::EventStatus,
        test_util::{
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            next_addr, spawn_collect_n, wait_for_tcp,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogstashConfig>();
    }

    #[tokio::test]
    async fn test_delivered() {
        test_protocol(EventStatus::Delivered, true).await;
    }

    #[tokio::test]
    async fn test_failed() {
        test_protocol(EventStatus::Rejected, false).await;
    }

    async fn start_logstash(
        status: EventStatus,
    ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let source = LogstashConfig {
            address: address.into(),
            tls: None,
            permit_origin: None,
            keepalive: None,
            receive_buffer_bytes: None,
            acknowledgements: true.into(),
            connection_limit: None,
            log_namespace: None,
        }
        .build(SourceContext::new_test(sender, None))
        .await
        .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;
        (address, recv)
    }

    async fn test_protocol(status: EventStatus, sends_ack: bool) {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (address, recv) = start_logstash(status).await;
            spawn_collect_n(
                send_req(address, &[("message", "Hello, world!")], sends_ack),
                recv,
                1,
            )
            .await
        })
        .await;

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(
            log.get("message").unwrap().to_string_lossy(),
            "Hello, world!".to_string()
        );
        assert_eq!(
            log.get("source_type").unwrap().to_string_lossy(),
            "logstash".to_string()
        );
        assert!(log.get("host").is_some());
        assert!(log.get("timestamp").is_some());
    }

    /// Encodes a complete protocol-`'2'` data frame, including its version and type bytes.
    fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
        let mut req = BytesMut::new();
        req.put_u8(b'2');
        req.put_u8(b'D');
        req.put_u32(seq);
        req.put_u32(pairs.len() as u32);
        for (key, value) in pairs {
            req.put_u32(key.len() as u32);
            req.put(key.as_bytes());
            req.put_u32(value.len() as u32);
            req.put(value.as_bytes());
        }
        req.into()
    }

    /// Number of bytes `encode_req` writes before the data frame body (version + frame type).
    const FRAME_HEADER_LEN: usize = 2;

    #[test]
    fn v1_decoder_does_not_panic() {
        let seq = rng().random_range(1..u32::MAX);
        let req = encode_req(seq, &[("message", "Hello, World!")]);
        // `decode_data_frame` is called after the version and type bytes were consumed, so it
        // must be fed only the frame body.
        let body = &req[FRAME_HEADER_LEN..];

        // Every strict prefix of the body (including the one missing a single byte) is
        // incomplete and must yield `None` without panicking.
        for len in 0..body.len() {
            assert!(
                decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&body[..len]))
                    .is_none()
            );
        }

        // The complete body decodes into one frame that consumes all of it.
        let mut buf = BytesMut::from(body);
        let (frame, byte_size) = decode_data_frame(LogstashProtocolVersion::V1, &mut buf)
            .expect("a complete data frame should decode");
        assert_eq!(frame.sequence_number, seq);
        assert_eq!(byte_size, body.len());
        assert!(buf.is_empty());
        assert_eq!(
            frame.fields.get(&KeyString::from("message")),
            Some(&serde_json::Value::from("Hello, World!"))
        );
    }

    async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
        let seq = rng().random_range(1..u32::MAX);
        let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();

        let req = encode_req(seq, pairs);
        socket.write_all(&req).await.unwrap();

        let mut output = BytesMut::new();
        socket.read_buf(&mut output).await.unwrap();

        if sends_ack {
            assert_eq!(output.get_u8(), b'2');
            assert_eq!(output.get_u8(), b'A');
            assert_eq!(output.get_u32(), seq);
        }
        assert_eq!(output.len(), 0);
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogstashConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogstashConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"));

        assert_eq!(definitions, Some(expected_definition))
    }
}
907
#[cfg(all(test, feature = "logstash-integration-tests"))]
mod integration_tests {
    use std::time::Duration;

    use futures::Stream;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        SourceSender,
        config::SourceContext,
        event::EventStatus,
        test_util::{
            collect_n,
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            wait_for_tcp,
        },
        tls::{TlsConfig, TlsEnableableConfig},
    };

    /// How long each test waits for its first event.
    const COLLECT_TIMEOUT: Duration = Duration::from_secs(60);

    fn heartbeat_address() -> String {
        std::env::var("HEARTBEAT_ADDRESS")
            .expect("Address of Beats Heartbeat service must be specified.")
    }

    fn logstash_address() -> String {
        std::env::var("LOGSTASH_ADDRESS")
            .expect("Listen address for `logstash` source must be specified.")
    }

    /// Starts the source on the address returned by `address` and collects the first event(s).
    ///
    /// This runs under the socket-push compliance checks and panics if nothing arrives within
    /// `COLLECT_TIMEOUT`.
    async fn collect_first_events(
        address: fn() -> String,
        tls: Option<TlsEnableableConfig>,
    ) -> Vec<Event> {
        assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async move {
            let out = source(address(), tls).await;
            timeout(COLLECT_TIMEOUT, collect_n(out, 1)).await.unwrap()
        })
        .await
    }

    #[tokio::test]
    async fn beats_heartbeat() {
        let events = collect_first_events(heartbeat_address, None).await;
        assert!(!events.is_empty());

        let first = events[0].as_log();
        assert_eq!(
            first.get("@metadata.beat"),
            Some(String::from("heartbeat").into()).as_ref()
        );
        assert_eq!(first.get("summary.up"), Some(1.into()).as_ref());
        assert!(first.get("timestamp").is_some());
        assert!(first.get("host").is_some());
    }

    #[tokio::test]
    async fn logstash() {
        let tls = TlsEnableableConfig {
            enabled: Some(true),
            options: TlsConfig {
                crt_file: Some("tests/data/host.docker.internal.crt".into()),
                key_file: Some("tests/data/host.docker.internal.key".into()),
                ..Default::default()
            },
        };
        let events = collect_first_events(logstash_address, Some(tls)).await;
        assert!(!events.is_empty());

        let first = events[0].as_log();
        let line = first.get("line").unwrap().to_string_lossy();
        assert!(line.contains("Hello World"));
        assert!(first.get("host").is_some());
    }

    /// Spawns the source listening on `address` and returns the stream of delivered events
    /// once the port accepts connections.
    async fn source(
        address: String,
        tls: Option<TlsEnableableConfig>,
    ) -> impl Stream<Item = Event> + Unpin {
        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let address: SocketAddr = address.parse().unwrap();

        let config = LogstashConfig {
            address: address.into(),
            tls: Some(TlsSourceConfig {
                client_metadata_key: None,
                tls_config: tls.unwrap_or_default(),
            }),
            keepalive: None,
            permit_origin: None,
            receive_buffer_bytes: None,
            acknowledgements: false.into(),
            connection_limit: None,
            log_namespace: None,
        };

        tokio::spawn(async move {
            let running = config
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap();
            running.await.unwrap()
        });

        wait_for_tcp(address).await;
        recv
    }
}
1025}