vector/sources/
logstash.rs

1use std::net::SocketAddr;
2use std::time::Duration;
3use std::{
4    collections::{BTreeMap, VecDeque},
5    convert::TryFrom,
6    io::{self, Read},
7};
8use vector_lib::ipallowlist::IpAllowlistConfig;
9
10use bytes::{Buf, Bytes, BytesMut};
11use flate2::read::ZlibDecoder;
12use smallvec::{smallvec, SmallVec};
13use snafu::{ResultExt, Snafu};
14use tokio_util::codec::Decoder;
15use vector_lib::codecs::{BytesDeserializerConfig, StreamDecodingError};
16use vector_lib::configurable::configurable_component;
17use vector_lib::lookup::{event_path, metadata_path, owned_value_path, path, OwnedValuePath};
18use vector_lib::{
19    config::{LegacyKey, LogNamespace},
20    schema::Definition,
21};
22use vrl::value::kind::Collection;
23use vrl::value::{KeyString, Kind};
24
25use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
26use crate::{
27    config::{
28        log_schema, DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
29        SourceContext, SourceOutput,
30    },
31    event::{Event, LogEvent, Value},
32    serde::bool_or_struct,
33    tcp::TcpKeepaliveConfig,
34    tls::{MaybeTlsSettings, TlsSourceConfig},
35    types,
36};
37
38/// Configuration for the `logstash` source.
39#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
40#[derive(Clone, Debug)]
41pub struct LogstashConfig {
42    #[configurable(derived)]
43    address: SocketListenAddr,
44
45    #[configurable(derived)]
46    #[configurable(metadata(docs::advanced))]
47    keepalive: Option<TcpKeepaliveConfig>,
48
49    #[configurable(derived)]
50    pub permit_origin: Option<IpAllowlistConfig>,
51
52    #[configurable(derived)]
53    tls: Option<TlsSourceConfig>,
54
55    /// The size of the receive buffer used for each connection.
56    #[configurable(metadata(docs::type_unit = "bytes"))]
57    #[configurable(metadata(docs::examples = 65536))]
58    #[configurable(metadata(docs::advanced))]
59    receive_buffer_bytes: Option<usize>,
60
61    /// The maximum number of TCP connections that are allowed at any given time.
62    #[configurable(metadata(docs::type_unit = "connections"))]
63    #[configurable(metadata(docs::advanced))]
64    connection_limit: Option<u32>,
65
66    #[configurable(derived)]
67    #[serde(default, deserialize_with = "bool_or_struct")]
68    acknowledgements: SourceAcknowledgementsConfig,
69
70    /// The namespace to use for logs. This overrides the global setting.
71    #[configurable(metadata(docs::hidden))]
72    #[serde(default)]
73    log_namespace: Option<bool>,
74}
75
76impl LogstashConfig {
77    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
78    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
79        // `host_key` is only inserted if not present already.
80        let host_key = log_schema()
81            .host_key()
82            .cloned()
83            .map(LegacyKey::InsertIfEmpty);
84
85        let tls_client_metadata_path = self
86            .tls
87            .as_ref()
88            .and_then(|tls| tls.client_metadata_key.as_ref())
89            .and_then(|k| k.path.clone())
90            .map(LegacyKey::Overwrite);
91
92        BytesDeserializerConfig
93            .schema_definition(log_namespace)
94            .with_standard_vector_source_metadata()
95            .with_source_metadata(
96                LogstashConfig::NAME,
97                None,
98                &owned_value_path!("timestamp"),
99                Kind::timestamp().or_undefined(),
100                Some("timestamp"),
101            )
102            .with_source_metadata(
103                LogstashConfig::NAME,
104                host_key,
105                &owned_value_path!("host"),
106                Kind::bytes(),
107                Some("host"),
108            )
109            .with_source_metadata(
110                Self::NAME,
111                tls_client_metadata_path,
112                &owned_value_path!("tls_client_metadata"),
113                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
114                None,
115            )
116    }
117}
118
119impl Default for LogstashConfig {
120    fn default() -> Self {
121        Self {
122            address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
123            keepalive: None,
124            permit_origin: None,
125            tls: None,
126            receive_buffer_bytes: None,
127            acknowledgements: Default::default(),
128            connection_limit: None,
129            log_namespace: None,
130        }
131    }
132}
133
134impl GenerateConfig for LogstashConfig {
135    fn generate_config() -> toml::Value {
136        toml::Value::try_from(LogstashConfig::default()).unwrap()
137    }
138}
139
140#[async_trait::async_trait]
141#[typetag::serde(name = "logstash")]
142impl SourceConfig for LogstashConfig {
143    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
144        let log_namespace = cx.log_namespace(self.log_namespace);
145        let source = LogstashSource {
146            timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
147            legacy_host_key_path: log_schema().host_key().cloned(),
148            log_namespace,
149        };
150        let shutdown_secs = Duration::from_secs(30);
151        let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
152        let tls_client_metadata_key = self
153            .tls
154            .as_ref()
155            .and_then(|tls| tls.client_metadata_key.clone())
156            .and_then(|k| k.path);
157
158        let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
159        source.run(
160            self.address,
161            self.keepalive,
162            shutdown_secs,
163            tls,
164            tls_client_metadata_key,
165            self.receive_buffer_bytes,
166            None,
167            cx,
168            self.acknowledgements,
169            self.connection_limit,
170            self.permit_origin.clone().map(Into::into),
171            LogstashConfig::NAME,
172            log_namespace,
173        )
174    }
175
176    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
177        // There is a global and per-source `log_namespace` config.
178        // The source config overrides the global setting and is merged here.
179        vec![SourceOutput::new_maybe_logs(
180            DataType::Log,
181            self.schema_definition(global_log_namespace.merge(self.log_namespace)),
182        )]
183    }
184
185    fn resources(&self) -> Vec<Resource> {
186        vec![self.address.as_tcp_resource()]
187    }
188
189    fn can_acknowledge(&self) -> bool {
190        true
191    }
192}
193
194#[derive(Debug, Clone)]
195struct LogstashSource {
196    timestamp_converter: types::Conversion,
197    log_namespace: LogNamespace,
198    legacy_host_key_path: Option<OwnedValuePath>,
199}
200
201impl TcpSource for LogstashSource {
202    type Error = DecodeError;
203    type Item = LogstashEventFrame;
204    type Decoder = LogstashDecoder;
205    type Acker = LogstashAcker;
206
207    fn decoder(&self) -> Self::Decoder {
208        LogstashDecoder::new()
209    }
210
211    fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
212        let now = chrono::Utc::now();
213        for event in events {
214            let log = event.as_mut_log();
215
216            self.log_namespace.insert_vector_metadata(
217                log,
218                log_schema().source_type_key(),
219                path!("source_type"),
220                Bytes::from_static(LogstashConfig::NAME.as_bytes()),
221            );
222
223            let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
224                self.timestamp_converter
225                    .convert::<Value>(timestamp.coerce_to_bytes())
226                    .ok()
227            });
228
229            // Vector: always insert `ingest_timestamp`. Insert `timestamp` if found in event.
230            //
231            // Legacy: always insert the global log schema timestamp key- use timestamp from
232            //         event if present, otherwise use ingest.
233            match self.log_namespace {
234                LogNamespace::Vector => {
235                    if let Some(timestamp) = log_timestamp {
236                        log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
237                    }
238                    log.insert(metadata_path!("vector", "ingest_timestamp"), now);
239                }
240                LogNamespace::Legacy => {
241                    if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
242                        log.insert(
243                            timestamp_key,
244                            log_timestamp.unwrap_or_else(|| Value::from(now)),
245                        );
246                    }
247                }
248            }
249
250            self.log_namespace.insert_source_metadata(
251                LogstashConfig::NAME,
252                log,
253                self.legacy_host_key_path
254                    .as_ref()
255                    .map(LegacyKey::InsertIfEmpty),
256                path!("host"),
257                host.ip().to_string(),
258            );
259        }
260    }
261
262    fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
263        LogstashAcker::new(frames)
264    }
265}
266
267struct LogstashAcker {
268    sequence_number: u32,
269    protocol_version: Option<LogstashProtocolVersion>,
270}
271
272impl LogstashAcker {
273    fn new(frames: &[LogstashEventFrame]) -> Self {
274        let mut sequence_number = 0;
275        let mut protocol_version = None;
276
277        for frame in frames {
278            sequence_number = std::cmp::max(sequence_number, frame.sequence_number);
279            // We assume that it's valid to ack via any of the protocol versions that we've seen in
280            // a set of frames from a single stream, so here we just take the last. In reality, we
281            // do not expect stream with multiple protocol versions to occur.
282            protocol_version = Some(frame.protocol);
283        }
284
285        Self {
286            sequence_number,
287            protocol_version,
288        }
289    }
290}
291
292impl TcpSourceAcker for LogstashAcker {
293    // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type
294    fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
295        match (ack, self.protocol_version) {
296            (TcpSourceAck::Ack, Some(protocol_version)) => {
297                let mut bytes: Vec<u8> = Vec::with_capacity(6);
298                bytes.push(protocol_version.into());
299                bytes.push(LogstashFrameType::Ack.into());
300                bytes.extend(self.sequence_number.to_be_bytes().iter());
301                Some(Bytes::from(bytes))
302            }
303            _ => None,
304        }
305    }
306}
307
308#[derive(Debug)]
309enum LogstashDecoderReadState {
310    ReadProtocol,
311    ReadType(LogstashProtocolVersion),
312    ReadFrame(LogstashProtocolVersion, LogstashFrameType),
313    PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
314}
315
316#[derive(Debug)]
317struct LogstashDecoder {
318    state: LogstashDecoderReadState,
319}
320
321impl LogstashDecoder {
322    const fn new() -> Self {
323        Self {
324            state: LogstashDecoderReadState::ReadProtocol,
325        }
326    }
327}
328
329#[derive(Debug, Snafu)]
330pub enum DecodeError {
331    #[snafu(display("i/o error: {}", source))]
332    IO { source: io::Error },
333    #[snafu(display("Unknown logstash protocol version: {}", version))]
334    UnknownProtocolVersion { version: char },
335    #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
336    UnknownFrameType { frame_type: char },
337    #[snafu(display("Failed to decode JSON frame: {}", source))]
338    JsonFrameFailedDecode { source: serde_json::Error },
339    #[snafu(display("Failed to decompress compressed frame: {}", source))]
340    DecompressionFailed { source: io::Error },
341}
342
343impl StreamDecodingError for DecodeError {
344    fn can_continue(&self) -> bool {
345        use DecodeError::*;
346
347        match self {
348            IO { .. } => false,
349            UnknownProtocolVersion { .. } => false,
350            UnknownFrameType { .. } => false,
351            JsonFrameFailedDecode { .. } => true,
352            DecompressionFailed { .. } => true,
353        }
354    }
355}
356
357impl From<io::Error> for DecodeError {
358    fn from(source: io::Error) -> Self {
359        DecodeError::IO { source }
360    }
361}
362
363#[derive(Debug, Clone, Copy)]
364enum LogstashProtocolVersion {
365    V1, // 1
366    V2, // 2
367}
368
369impl From<LogstashProtocolVersion> for u8 {
370    fn from(frame_type: LogstashProtocolVersion) -> u8 {
371        use LogstashProtocolVersion::*;
372
373        match frame_type {
374            V1 => b'1',
375            V2 => b'2',
376        }
377    }
378}
379
380impl TryFrom<u8> for LogstashProtocolVersion {
381    type Error = DecodeError;
382
383    fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
384        use LogstashProtocolVersion::*;
385
386        match frame_type {
387            b'1' => Ok(V1),
388            b'2' => Ok(V2),
389            version => Err(DecodeError::UnknownProtocolVersion {
390                version: version as char,
391            }),
392        }
393    }
394}
395
396#[derive(Debug, Clone, Copy)]
397enum LogstashFrameType {
398    Ack,        // A
399    WindowSize, // W
400    Data,       // D
401    Json,       // J
402    Compressed, // C
403}
404
405impl From<LogstashFrameType> for u8 {
406    fn from(frame_type: LogstashFrameType) -> u8 {
407        use LogstashFrameType::*;
408
409        match frame_type {
410            Ack => b'A',
411            WindowSize => b'W',
412            Data => b'D',
413            Json => b'J',
414            Compressed => b'C',
415        }
416    }
417}
418
419impl TryFrom<u8> for LogstashFrameType {
420    type Error = DecodeError;
421
422    fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
423        use LogstashFrameType::*;
424
425        match frame_type {
426            b'A' => Ok(Ack),
427            b'W' => Ok(WindowSize),
428            b'D' => Ok(Data),
429            b'J' => Ok(Json),
430            b'C' => Ok(Compressed),
431            frame_type => Err(DecodeError::UnknownFrameType {
432                frame_type: frame_type as char,
433            }),
434        }
435    }
436}
437
438/// Normalized event from logstash frame
439#[derive(Debug)]
440struct LogstashEventFrame {
441    protocol: LogstashProtocolVersion,
442    sequence_number: u32,
443    fields: BTreeMap<KeyString, serde_json::Value>,
444}
445
446// Based on spec at: https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md
447// And implementation from logstash: https://github.com/logstash-plugins/logstash-input-beats/blob/27bad62a26a81fc000a9d21495b8dc7174ab63e9/src/main/java/org/logstash/beats/BeatsParser.java
448impl Decoder for LogstashDecoder {
449    type Item = (LogstashEventFrame, usize);
450    type Error = DecodeError;
451
452    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
453        // This implements a sort of simple state machine to read the frames from the wire
454        //
455        // Each matched arm with either:
456        // * Return that there is not enough data
457        // * Return an error
458        // * Read some bytes and advance the state
459        loop {
460            self.state = match self.state {
461                // if we have any unsent frames, send them before reading new logstash frame
462                LogstashDecoderReadState::PendingFrames(ref mut frames) => {
463                    match frames.pop_front() {
464                        Some(frame) => return Ok(Some(frame)),
465                        None => LogstashDecoderReadState::ReadProtocol,
466                    }
467                }
468                LogstashDecoderReadState::ReadProtocol => {
469                    if src.remaining() < 1 {
470                        return Ok(None);
471                    }
472
473                    use LogstashProtocolVersion::*;
474
475                    match LogstashProtocolVersion::try_from(src.get_u8())? {
476                        V1 => LogstashDecoderReadState::ReadType(V1),
477                        V2 => LogstashDecoderReadState::ReadType(V2),
478                    }
479                }
480                LogstashDecoderReadState::ReadType(protocol) => {
481                    if src.remaining() < 1 {
482                        return Ok(None);
483                    }
484
485                    use LogstashFrameType::*;
486
487                    match LogstashFrameType::try_from(src.get_u8())? {
488                        WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
489                        Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
490                        Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
491                        Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
492                        Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
493                    }
494                }
495                // The window size indicates how many events the writer will send before waiting
496                // for acks. As we forward events as we get them, and ack as they are received, we
497                // do not need to keep track of this.
498                //
499                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#window-size-frame-type
500                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
501                    if src.remaining() < 4 {
502                        return Ok(None);
503                    }
504
505                    let _window_size = src.get_u32();
506
507                    LogstashDecoderReadState::ReadProtocol
508                }
509                // we shouldn't receive acks from the writer, just skip
510                //
511                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#ack-frame-type
512                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
513                    if src.remaining() < 4 {
514                        return Ok(None);
515                    }
516
517                    let _sequence_number = src.get_u32();
518
519                    LogstashDecoderReadState::ReadProtocol
520                }
521                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#data-frame-type
522                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
523                    let Some(frame) = decode_data_frame(protocol, src) else {
524                        return Ok(None);
525                    };
526
527                    LogstashDecoderReadState::PendingFrames([frame].into())
528                }
529                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#json-frame-type
530                LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
531                    let Some(frame) = decode_json_frame(protocol, src)? else {
532                        return Ok(None);
533                    };
534
535                    LogstashDecoderReadState::PendingFrames([frame].into())
536                }
537                // https://github.com/logstash-plugins/logstash-input-beats/blob/master/PROTOCOL.md#compressed-frame-type
538                LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
539                    let Some(frames) = decode_compressed_frame(src)? else {
540                        return Ok(None);
541                    };
542
543                    LogstashDecoderReadState::PendingFrames(frames)
544                }
545            };
546        }
547    }
548}
549
550/// Decode the Lumberjack version 1 protocol, which use the Key:Value format.
551fn decode_data_frame(
552    protocol: LogstashProtocolVersion,
553    src: &mut BytesMut,
554) -> Option<(LogstashEventFrame, usize)> {
555    let mut rest = src.as_ref();
556
557    if rest.remaining() < 8 {
558        return None;
559    }
560    let sequence_number = rest.get_u32();
561    let pair_count = rest.get_u32();
562    if pair_count == 0 {
563        return None; // Invalid number of fields
564    }
565
566    let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
567    for _ in 0..pair_count {
568        let (key, value, right) = decode_pair(rest)?;
569        rest = right;
570
571        fields.insert(
572            String::from_utf8_lossy(key).into(),
573            String::from_utf8_lossy(value).into(),
574        );
575    }
576
577    let byte_size = bytes_remaining(src, rest);
578    src.advance(byte_size);
579
580    Some((
581        LogstashEventFrame {
582            protocol,
583            sequence_number,
584            fields,
585        },
586        byte_size,
587    ))
588}
589
/// Splits one key/value pair off the front of `rest`.
///
/// The key and the value are each prefixed by their length as a big-endian `u32`. Returns
/// `(key, value, remainder)`, or `None` if `rest` does not yet hold the complete pair.
fn decode_pair(rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
    // Takes a big-endian u32 length and then that many bytes off the front of `input`.
    fn take_length_prefixed(input: &[u8]) -> Option<(&[u8], &[u8])> {
        let (len_bytes, tail) = input.split_first_chunk::<4>()?;
        let len = u32::from_be_bytes(*len_bytes) as usize;
        if tail.len() < len {
            return None;
        }
        Some(tail.split_at(len))
    }

    let (key, after_key) = take_length_prefixed(rest)?;
    let (value, after_value) = take_length_prefixed(after_key)?;
    Some((key, value, after_value))
}
612
613fn decode_json_frame(
614    protocol: LogstashProtocolVersion,
615    src: &mut BytesMut,
616) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
617    let mut rest = src.as_ref();
618
619    if rest.remaining() < 8 {
620        return Ok(None);
621    }
622    let sequence_number = rest.get_u32();
623    let payload_size = rest.get_u32() as usize;
624
625    if rest.remaining() < payload_size {
626        return Ok(None);
627    }
628
629    let (slice, right) = rest.split_at(payload_size);
630    rest = right;
631
632    let fields: BTreeMap<KeyString, serde_json::Value> =
633        serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
634
635    let byte_size = bytes_remaining(src, rest);
636    src.advance(byte_size);
637
638    Ok(Some((
639        LogstashEventFrame {
640            protocol,
641            sequence_number,
642            fields,
643        },
644        byte_size,
645    )))
646}
647
648fn decode_compressed_frame(
649    src: &mut BytesMut,
650) -> Result<Option<VecDeque<(LogstashEventFrame, usize)>>, DecodeError> {
651    let mut rest = src.as_ref();
652
653    if rest.remaining() < 4 {
654        return Ok(None);
655    }
656    let payload_size = rest.get_u32() as usize;
657
658    if rest.remaining() < payload_size {
659        src.reserve(payload_size);
660        return Ok(None);
661    }
662
663    let (slice, right) = rest.split_at(payload_size);
664    rest = right;
665
666    let mut buf = Vec::new();
667
668    let res = ZlibDecoder::new(io::Cursor::new(slice))
669        .read_to_end(&mut buf)
670        .context(DecompressionFailedSnafu)
671        .map(|_| BytesMut::from(&buf[..]));
672
673    let byte_size = bytes_remaining(src, rest);
674    src.advance(byte_size);
675
676    let mut buf = res?;
677
678    let mut decoder = LogstashDecoder::new();
679
680    let mut frames = VecDeque::new();
681
682    while let Some(s) = decoder.decode(&mut buf)? {
683        frames.push_back(s);
684    }
685    Ok(Some(frames))
686}
687
688fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
689    let remaining = rest.remaining();
690    src.remaining() - remaining
691}
692
693impl From<LogstashEventFrame> for Event {
694    fn from(frame: LogstashEventFrame) -> Self {
695        Event::Log(LogEvent::from(
696            frame
697                .fields
698                .into_iter()
699                .map(|(key, value)| (key, Value::from(value)))
700                .collect::<BTreeMap<_, _>>(),
701        ))
702    }
703}
704
705impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
706    fn from(frame: LogstashEventFrame) -> Self {
707        smallvec![frame.into()]
708    }
709}
710
#[cfg(test)]
mod test {
    use bytes::BufMut;
    use futures::Stream;
    use rand::{rng, Rng};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::value::kind::Collection;

    use super::*;
    use crate::{
        event::EventStatus,
        test_util::{
            components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
            next_addr, spawn_collect_n, wait_for_tcp,
        },
        SourceSender,
    };

    /// Length of the protocol-version and frame-type bytes that `LogstashDecoder` consumes
    /// before it hands the rest of a data frame to `decode_data_frame`.
    const FRAME_HEADER_LEN: usize = 2;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogstashConfig>();
    }

    #[tokio::test]
    async fn test_delivered() {
        test_protocol(EventStatus::Delivered, true).await;
    }

    #[tokio::test]
    async fn test_failed() {
        test_protocol(EventStatus::Rejected, false).await;
    }

    async fn start_logstash(
        status: EventStatus,
    ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let source = LogstashConfig {
            address: address.into(),
            tls: None,
            permit_origin: None,
            keepalive: None,
            receive_buffer_bytes: None,
            acknowledgements: true.into(),
            connection_limit: None,
            log_namespace: None,
        }
        .build(SourceContext::new_test(sender, None))
        .await
        .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;
        (address, recv)
    }

    async fn test_protocol(status: EventStatus, sends_ack: bool) {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (address, recv) = start_logstash(status).await;
            spawn_collect_n(
                send_req(address, &[("message", "Hello, world!")], sends_ack),
                recv,
                1,
            )
            .await
        })
        .await;

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(
            log.get("message").unwrap().to_string_lossy(),
            "Hello, world!".to_string()
        );
        assert_eq!(
            log.get("source_type").unwrap().to_string_lossy(),
            "logstash".to_string()
        );
        assert!(log.get("host").is_some());
        assert!(log.get("timestamp").is_some());
    }

    fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
        let mut req = BytesMut::new();
        req.put_u8(b'2');
        req.put_u8(b'D');
        req.put_u32(seq);
        req.put_u32(pairs.len() as u32);
        for (key, value) in pairs {
            req.put_u32(key.len() as u32);
            req.put(key.as_bytes());
            req.put_u32(value.len() as u32);
            req.put(value.as_bytes());
        }
        req.into()
    }

    #[test]
    fn v1_decoder_does_not_panic() {
        let seq = rng().random_range(1..u32::MAX);
        let req = encode_req(seq, &[("message", "Hello, World!")]);

        // `decode_data_frame` only ever sees the frame body. Strip the header the decoder has
        // already consumed so every prefix below is a genuinely truncated data frame.
        let body = &req[FRAME_HEADER_LEN..];
        for i in 0..body.len() {
            assert!(
                decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&body[..i]))
                    .is_none()
            );
        }

        // The complete body decodes and is fully consumed.
        let mut complete = BytesMut::from(body);
        let (frame, byte_size) = decode_data_frame(LogstashProtocolVersion::V1, &mut complete)
            .expect("a complete data frame should decode");
        assert_eq!(byte_size, body.len());
        assert!(complete.is_empty());
        assert_eq!(frame.sequence_number, seq);
        assert_eq!(frame.fields.len(), 1);
        assert_eq!(
            frame.fields.values().collect::<Vec<_>>(),
            vec![&serde_json::Value::from("Hello, World!")]
        );
    }

    async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
        let seq = rng().random_range(1..u32::MAX);
        let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();

        let req = encode_req(seq, pairs);
        socket.write_all(&req).await.unwrap();

        let mut output = BytesMut::new();
        socket.read_buf(&mut output).await.unwrap();

        if sends_ack {
            assert_eq!(output.get_u8(), b'2');
            assert_eq!(output.get_u8(), b'A');
            assert_eq!(output.get_u32(), seq);
        }
        assert_eq!(output.len(), 0);
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogstashConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogstashConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"));

        assert_eq!(definitions, Some(expected_definition))
    }
}
908
/// Integration tests that feed the `logstash` source from external senders.
///
/// Only compiled with the `logstash-integration-tests` feature. The source's listen
/// addresses are taken from the `HEARTBEAT_ADDRESS` and `LOGSTASH_ADDRESS` environment
/// variables.
#[cfg(all(test, feature = "logstash-integration-tests"))]
mod integration_tests {
    use std::time::Duration;

    use futures::Stream;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        config::SourceContext,
        event::EventStatus,
        test_util::{
            collect_n,
            components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
            wait_for_tcp,
        },
        tls::{TlsConfig, TlsEnableableConfig},
        SourceSender,
    };

    /// How long each test waits for its first event before failing.
    const EVENT_TIMEOUT: Duration = Duration::from_secs(60);

    /// Returns the address the `logstash` source listens on in the Heartbeat test.
    ///
    /// # Panics
    ///
    /// Panics if `HEARTBEAT_ADDRESS` is not set.
    fn heartbeat_address() -> String {
        // `source` parses this value and binds the listener on it, so it is the source's
        // listen address (where Heartbeat ships to), not the Heartbeat service's own address.
        std::env::var("HEARTBEAT_ADDRESS")
            .expect("Listen address for `logstash` source (Heartbeat test) must be specified.")
    }

    /// Receives one event from Beats Heartbeat over plain (default) TLS settings and checks
    /// the Heartbeat-specific fields plus the `timestamp` and `host` fields.
    #[tokio::test]
    async fn beats_heartbeat() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let out = source(heartbeat_address(), None).await;

            timeout(EVENT_TIMEOUT, collect_n(out, 1))
                .await
                .expect("timed out waiting for an event from Heartbeat")
        })
        .await;

        let log = events
            .first()
            .expect("expected at least one event from Heartbeat")
            .as_log();
        assert_eq!(
            log.get("@metadata.beat"),
            Some(String::from("heartbeat").into()).as_ref()
        );
        assert_eq!(log.get("summary.up"), Some(1.into()).as_ref());
        assert!(log.get("timestamp").is_some());
        assert!(log.get("host").is_some());
    }

    /// Returns the address the `logstash` source listens on in the Logstash test.
    ///
    /// # Panics
    ///
    /// Panics if `LOGSTASH_ADDRESS` is not set.
    fn logstash_address() -> String {
        std::env::var("LOGSTASH_ADDRESS")
            .expect("Listen address for `logstash` source must be specified.")
    }

    /// Receives one event from Logstash over TLS (using the `host.docker.internal` test
    /// certificate and key) and checks its `line` and `host` fields.
    #[tokio::test]
    async fn logstash() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let out = source(
                logstash_address(),
                Some(TlsEnableableConfig {
                    enabled: Some(true),
                    options: TlsConfig {
                        crt_file: Some("tests/data/host.docker.internal.crt".into()),
                        key_file: Some("tests/data/host.docker.internal.key".into()),
                        ..Default::default()
                    },
                }),
            )
            .await;

            timeout(EVENT_TIMEOUT, collect_n(out, 1))
                .await
                .expect("timed out waiting for an event from Logstash")
        })
        .await;

        let log = events
            .first()
            .expect("expected at least one event from Logstash")
            .as_log();
        assert!(log
            .get("line")
            .expect("event from Logstash is missing the `line` field")
            .to_string_lossy()
            .contains("Hello World"));
        assert!(log.get("host").is_some());
    }

    /// Starts a `logstash` source listening on `address` and returns its output stream.
    ///
    /// `tls` supplies the source's TLS settings; `None` falls back to
    /// `TlsEnableableConfig::default()` (`enabled` left unset). Acknowledgements are
    /// disabled, and events are sent through a sender created with
    /// `SourceSender::new_test_finalize(EventStatus::Delivered)`. Returns once
    /// `wait_for_tcp` reports the listener reachable.
    ///
    /// # Panics
    ///
    /// Panics if `address` is not a valid socket address. Failures to build or run the
    /// source panic inside the spawned task rather than in the caller.
    async fn source(
        address: String,
        tls: Option<TlsEnableableConfig>,
    ) -> impl Stream<Item = Event> + Unpin {
        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let address: SocketAddr = address
            .parse()
            .expect("listen address must be a valid socket address");
        let tls_config = TlsSourceConfig {
            client_metadata_key: None,
            tls_config: tls.unwrap_or_default(),
        };
        tokio::spawn(async move {
            LogstashConfig {
                address: address.into(),
                tls: Some(tls_config),
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
                log_namespace: None,
            }
            .build(SourceContext::new_test(sender, None))
            .await
            .expect("failed to build `logstash` source")
            .await
            .expect("`logstash` source exited with an error")
        });
        wait_for_tcp(address).await;
        recv
    }
}