1use std::{
2 collections::{BTreeMap, VecDeque},
3 convert::TryFrom,
4 io::{self, Read},
5 net::SocketAddr,
6 time::Duration,
7};
8
9use bytes::{Buf, Bytes, BytesMut};
10use flate2::read::ZlibDecoder;
11use smallvec::{SmallVec, smallvec};
12use snafu::{ResultExt, Snafu};
13use tokio_util::codec::Decoder;
14use vector_lib::{
15 codecs::{BytesDeserializerConfig, StreamDecodingError},
16 config::{LegacyKey, LogNamespace},
17 configurable::configurable_component,
18 ipallowlist::IpAllowlistConfig,
19 lookup::{OwnedValuePath, event_path, metadata_path, owned_value_path, path},
20 schema::Definition,
21};
22use vrl::value::{KeyString, Kind, kind::Collection};
23
24use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
25use crate::{
26 config::{
27 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
28 SourceContext, SourceOutput, log_schema,
29 },
30 event::{Event, LogEvent, Value},
31 serde::bool_or_struct,
32 tcp::TcpKeepaliveConfig,
33 tls::{MaybeTlsSettings, TlsSourceConfig},
34 types,
35};
36
/// Configuration for the `logstash` source.
#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
#[derive(Clone, Debug)]
pub struct LogstashConfig {
    // Listen address; also reported as this source's TCP resource.
    #[configurable(derived)]
    address: SocketListenAddr,

    // Optional TCP keepalive settings, forwarded unchanged to the TCP source runner.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    keepalive: Option<TcpKeepaliveConfig>,

    // Optional IP allowlist, converted and forwarded to the TCP source runner.
    #[configurable(derived)]
    pub permit_origin: Option<IpAllowlistConfig>,

    // Optional TLS settings; `client_metadata_key` also feeds the schema definition.
    #[configurable(derived)]
    tls: Option<TlsSourceConfig>,

    /// The size of the receive buffer used for each connection.
    #[configurable(metadata(docs::type_unit = "bytes"))]
    #[configurable(metadata(docs::examples = 65536))]
    #[configurable(metadata(docs::advanced))]
    receive_buffer_bytes: Option<usize>,

    /// The maximum number of TCP connections that are allowed at any given time.
    #[configurable(metadata(docs::type_unit = "connections"))]
    #[configurable(metadata(docs::advanced))]
    connection_limit: Option<u32>,

    // End-to-end acknowledgement settings.
    // NOTE(review): `bool_or_struct` presumably accepts either a bare boolean
    // or the full struct form — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
74
75impl LogstashConfig {
76 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
78 let host_key = log_schema()
80 .host_key()
81 .cloned()
82 .map(LegacyKey::InsertIfEmpty);
83
84 let tls_client_metadata_path = self
85 .tls
86 .as_ref()
87 .and_then(|tls| tls.client_metadata_key.as_ref())
88 .and_then(|k| k.path.clone())
89 .map(LegacyKey::Overwrite);
90
91 BytesDeserializerConfig
92 .schema_definition(log_namespace)
93 .with_standard_vector_source_metadata()
94 .with_source_metadata(
95 LogstashConfig::NAME,
96 None,
97 &owned_value_path!("timestamp"),
98 Kind::timestamp().or_undefined(),
99 Some("timestamp"),
100 )
101 .with_source_metadata(
102 LogstashConfig::NAME,
103 host_key,
104 &owned_value_path!("host"),
105 Kind::bytes(),
106 Some("host"),
107 )
108 .with_source_metadata(
109 Self::NAME,
110 tls_client_metadata_path,
111 &owned_value_path!("tls_client_metadata"),
112 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
113 None,
114 )
115 }
116}
117
118impl Default for LogstashConfig {
119 fn default() -> Self {
120 Self {
121 address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
122 keepalive: None,
123 permit_origin: None,
124 tls: None,
125 receive_buffer_bytes: None,
126 acknowledgements: Default::default(),
127 connection_limit: None,
128 log_namespace: None,
129 }
130 }
131}
132
133impl GenerateConfig for LogstashConfig {
134 fn generate_config() -> toml::Value {
135 toml::Value::try_from(LogstashConfig::default()).unwrap()
136 }
137}
138
139#[async_trait::async_trait]
140#[typetag::serde(name = "logstash")]
141impl SourceConfig for LogstashConfig {
142 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
143 let log_namespace = cx.log_namespace(self.log_namespace);
144 let source = LogstashSource {
145 timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
146 legacy_host_key_path: log_schema().host_key().cloned(),
147 log_namespace,
148 };
149 let shutdown_secs = Duration::from_secs(30);
150 let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
151 let tls_client_metadata_key = self
152 .tls
153 .as_ref()
154 .and_then(|tls| tls.client_metadata_key.clone())
155 .and_then(|k| k.path);
156
157 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
158 source.run(
159 self.address,
160 self.keepalive,
161 shutdown_secs,
162 tls,
163 tls_client_metadata_key,
164 self.receive_buffer_bytes,
165 None,
166 cx,
167 self.acknowledgements,
168 self.connection_limit,
169 self.permit_origin.clone().map(Into::into),
170 LogstashConfig::NAME,
171 log_namespace,
172 )
173 }
174
175 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
176 vec![SourceOutput::new_maybe_logs(
179 DataType::Log,
180 self.schema_definition(global_log_namespace.merge(self.log_namespace)),
181 )]
182 }
183
184 fn resources(&self) -> Vec<Resource> {
185 vec![self.address.as_tcp_resource()]
186 }
187
188 fn can_acknowledge(&self) -> bool {
189 true
190 }
191}
192
/// Runtime state of the `logstash` source, created by `LogstashConfig::build`.
#[derive(Debug, Clone)]
struct LogstashSource {
    // Converter applied to the `@timestamp` field of incoming events.
    timestamp_converter: types::Conversion,
    // Namespace deciding where timestamps and metadata are written.
    log_namespace: LogNamespace,
    // Host key from the global log schema, captured at build time; used as the
    // legacy location for the peer host.
    legacy_host_key_path: Option<OwnedValuePath>,
}
199
200impl TcpSource for LogstashSource {
201 type Error = DecodeError;
202 type Item = LogstashEventFrame;
203 type Decoder = LogstashDecoder;
204 type Acker = LogstashAcker;
205
206 fn decoder(&self) -> Self::Decoder {
207 LogstashDecoder::new()
208 }
209
210 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
211 let now = chrono::Utc::now();
212 for event in events {
213 let log = event.as_mut_log();
214
215 self.log_namespace.insert_vector_metadata(
216 log,
217 log_schema().source_type_key(),
218 path!("source_type"),
219 Bytes::from_static(LogstashConfig::NAME.as_bytes()),
220 );
221
222 let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
223 self.timestamp_converter
224 .convert::<Value>(timestamp.coerce_to_bytes())
225 .ok()
226 });
227
228 match self.log_namespace {
233 LogNamespace::Vector => {
234 if let Some(timestamp) = log_timestamp {
235 log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
236 }
237 log.insert(metadata_path!("vector", "ingest_timestamp"), now);
238 }
239 LogNamespace::Legacy => {
240 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
241 log.insert(
242 timestamp_key,
243 log_timestamp.unwrap_or_else(|| Value::from(now)),
244 );
245 }
246 }
247 }
248
249 self.log_namespace.insert_source_metadata(
250 LogstashConfig::NAME,
251 log,
252 self.legacy_host_key_path
253 .as_ref()
254 .map(LegacyKey::InsertIfEmpty),
255 path!("host"),
256 host.ip().to_string(),
257 );
258 }
259 }
260
261 fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
262 LogstashAcker::new(frames)
263 }
264}
265
266struct LogstashAcker {
267 sequence_number: u32,
268 protocol_version: Option<LogstashProtocolVersion>,
269}
270
271impl LogstashAcker {
272 fn new(frames: &[LogstashEventFrame]) -> Self {
273 let mut sequence_number = 0;
274 let mut protocol_version = None;
275
276 for frame in frames {
277 sequence_number = std::cmp::max(sequence_number, frame.sequence_number);
278 protocol_version = Some(frame.protocol);
282 }
283
284 Self {
285 sequence_number,
286 protocol_version,
287 }
288 }
289}
290
291impl TcpSourceAcker for LogstashAcker {
292 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
294 match (ack, self.protocol_version) {
295 (TcpSourceAck::Ack, Some(protocol_version)) => {
296 let mut bytes: Vec<u8> = Vec::with_capacity(6);
297 bytes.push(protocol_version.into());
298 bytes.push(LogstashFrameType::Ack.into());
299 bytes.extend(self.sequence_number.to_be_bytes().iter());
300 Some(Bytes::from(bytes))
301 }
302 _ => None,
303 }
304 }
305}
306
/// States of the `LogstashDecoder` state machine.
#[derive(Debug)]
enum LogstashDecoderReadState {
    // Next byte to read is the protocol version.
    ReadProtocol,
    // Protocol version is known; next byte is the frame type.
    ReadType(LogstashProtocolVersion),
    // Header is known; the frame body is read next.
    ReadFrame(LogstashProtocolVersion, LogstashFrameType),
    // Decoded frames (each paired with a byte size) waiting to be handed out.
    PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
}
314
315#[derive(Debug)]
316struct LogstashDecoder {
317 state: LogstashDecoderReadState,
318}
319
320impl LogstashDecoder {
321 const fn new() -> Self {
322 Self {
323 state: LogstashDecoderReadState::ReadProtocol,
324 }
325 }
326}
327
/// Errors produced while decoding the Logstash wire protocol.
#[derive(Debug, Snafu)]
pub enum DecodeError {
    // Underlying I/O failure (also the target of `From<io::Error>`).
    #[snafu(display("i/o error: {}", source))]
    IO { source: io::Error },
    // The protocol version byte was neither `1` nor `2`.
    #[snafu(display("Unknown logstash protocol version: {}", version))]
    UnknownProtocolVersion { version: char },
    // The frame type byte was not one of `A`, `W`, `D`, `J`, `C`.
    #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
    UnknownFrameType { frame_type: char },
    // The payload of a JSON frame could not be parsed.
    #[snafu(display("Failed to decode JSON frame: {}", source))]
    JsonFrameFailedDecode { source: serde_json::Error },
    // The payload of a compressed frame could not be inflated.
    #[snafu(display("Failed to decompress compressed frame: {}", source))]
    DecompressionFailed { source: io::Error },
}
341
342impl StreamDecodingError for DecodeError {
343 fn can_continue(&self) -> bool {
344 use DecodeError::*;
345
346 match self {
347 IO { .. } => false,
348 UnknownProtocolVersion { .. } => false,
349 UnknownFrameType { .. } => false,
350 JsonFrameFailedDecode { .. } => true,
351 DecompressionFailed { .. } => true,
352 }
353 }
354}
355
356impl From<io::Error> for DecodeError {
357 fn from(source: io::Error) -> Self {
358 DecodeError::IO { source }
359 }
360}
361
/// Protocol version announced in the first byte of every frame.
#[derive(Debug, Clone, Copy)]
enum LogstashProtocolVersion {
    // Encoded on the wire as ASCII `1`.
    V1,
    // Encoded on the wire as ASCII `2`.
    V2,
}

impl From<LogstashProtocolVersion> for u8 {
    /// Returns the ASCII byte that represents `version` on the wire.
    fn from(version: LogstashProtocolVersion) -> u8 {
        match version {
            LogstashProtocolVersion::V1 => b'1',
            LogstashProtocolVersion::V2 => b'2',
        }
    }
}
378
379impl TryFrom<u8> for LogstashProtocolVersion {
380 type Error = DecodeError;
381
382 fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
383 use LogstashProtocolVersion::*;
384
385 match frame_type {
386 b'1' => Ok(V1),
387 b'2' => Ok(V2),
388 version => Err(DecodeError::UnknownProtocolVersion {
389 version: version as char,
390 }),
391 }
392 }
393}
394
/// Frame type announced in the second byte of every frame.
#[derive(Debug, Clone, Copy)]
enum LogstashFrameType {
    // Encoded on the wire as ASCII `A`.
    Ack,
    // Encoded on the wire as ASCII `W`.
    WindowSize,
    // Encoded on the wire as ASCII `D`.
    Data,
    // Encoded on the wire as ASCII `J`.
    Json,
    // Encoded on the wire as ASCII `C`.
    Compressed,
}

impl From<LogstashFrameType> for u8 {
    /// Returns the ASCII byte that represents `frame_type` on the wire.
    fn from(frame_type: LogstashFrameType) -> u8 {
        match frame_type {
            LogstashFrameType::Ack => b'A',
            LogstashFrameType::WindowSize => b'W',
            LogstashFrameType::Data => b'D',
            LogstashFrameType::Json => b'J',
            LogstashFrameType::Compressed => b'C',
        }
    }
}
417
418impl TryFrom<u8> for LogstashFrameType {
419 type Error = DecodeError;
420
421 fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
422 use LogstashFrameType::*;
423
424 match frame_type {
425 b'A' => Ok(Ack),
426 b'W' => Ok(WindowSize),
427 b'D' => Ok(Data),
428 b'J' => Ok(Json),
429 b'C' => Ok(Compressed),
430 frame_type => Err(DecodeError::UnknownFrameType {
431 frame_type: frame_type as char,
432 }),
433 }
434 }
435}
436
/// A single decoded event frame (from a data or JSON frame).
#[derive(Debug)]
struct LogstashEventFrame {
    // Protocol version the frame was received with; used when building the ack.
    protocol: LogstashProtocolVersion,
    // Sequence number carried by the frame; the ack reports the highest one.
    sequence_number: u32,
    // Event fields; data frames store every value as a string.
    fields: BTreeMap<KeyString, serde_json::Value>,
}
444
445impl Decoder for LogstashDecoder {
448 type Item = (LogstashEventFrame, usize);
449 type Error = DecodeError;
450
451 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
452 loop {
459 self.state = match self.state {
460 LogstashDecoderReadState::PendingFrames(ref mut frames) => {
462 match frames.pop_front() {
463 Some(frame) => return Ok(Some(frame)),
464 None => LogstashDecoderReadState::ReadProtocol,
465 }
466 }
467 LogstashDecoderReadState::ReadProtocol => {
468 if src.remaining() < 1 {
469 return Ok(None);
470 }
471
472 use LogstashProtocolVersion::*;
473
474 match LogstashProtocolVersion::try_from(src.get_u8())? {
475 V1 => LogstashDecoderReadState::ReadType(V1),
476 V2 => LogstashDecoderReadState::ReadType(V2),
477 }
478 }
479 LogstashDecoderReadState::ReadType(protocol) => {
480 if src.remaining() < 1 {
481 return Ok(None);
482 }
483
484 use LogstashFrameType::*;
485
486 match LogstashFrameType::try_from(src.get_u8())? {
487 WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
488 Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
489 Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
490 Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
491 Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
492 }
493 }
494 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
500 if src.remaining() < 4 {
501 return Ok(None);
502 }
503
504 let _window_size = src.get_u32();
505
506 LogstashDecoderReadState::ReadProtocol
507 }
508 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
512 if src.remaining() < 4 {
513 return Ok(None);
514 }
515
516 let _sequence_number = src.get_u32();
517
518 LogstashDecoderReadState::ReadProtocol
519 }
520 LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
522 let Some(frame) = decode_data_frame(protocol, src) else {
523 return Ok(None);
524 };
525
526 LogstashDecoderReadState::PendingFrames([frame].into())
527 }
528 LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
530 let Some(frame) = decode_json_frame(protocol, src)? else {
531 return Ok(None);
532 };
533
534 LogstashDecoderReadState::PendingFrames([frame].into())
535 }
536 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
538 let Some(frames) = decode_compressed_frame(src)? else {
539 return Ok(None);
540 };
541
542 LogstashDecoderReadState::PendingFrames(frames)
543 }
544 };
545 }
546 }
547}
548
549fn decode_data_frame(
551 protocol: LogstashProtocolVersion,
552 src: &mut BytesMut,
553) -> Option<(LogstashEventFrame, usize)> {
554 let mut rest = src.as_ref();
555
556 if rest.remaining() < 8 {
557 return None;
558 }
559 let sequence_number = rest.get_u32();
560 let pair_count = rest.get_u32();
561 if pair_count == 0 {
562 return None; }
564
565 let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
566 for _ in 0..pair_count {
567 let (key, value, right) = decode_pair(rest)?;
568 rest = right;
569
570 fields.insert(
571 String::from_utf8_lossy(key).into(),
572 String::from_utf8_lossy(value).into(),
573 );
574 }
575
576 let byte_size = bytes_remaining(src, rest);
577 src.advance(byte_size);
578
579 Some((
580 LogstashEventFrame {
581 protocol,
582 sequence_number,
583 fields,
584 },
585 byte_size,
586 ))
587}
588
589fn decode_pair(mut rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
590 if rest.remaining() < 4 {
591 return None;
592 }
593 let key_length = rest.get_u32() as usize;
594
595 if rest.remaining() < key_length {
596 return None;
597 }
598 let (key, right) = rest.split_at(key_length);
599 rest = right;
600
601 if rest.remaining() < 4 {
602 return None;
603 }
604 let value_length = rest.get_u32() as usize;
605 if rest.remaining() < value_length {
606 return None;
607 }
608 let (value, right) = rest.split_at(value_length);
609 Some((key, value, right))
610}
611
612fn decode_json_frame(
613 protocol: LogstashProtocolVersion,
614 src: &mut BytesMut,
615) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
616 let mut rest = src.as_ref();
617
618 if rest.remaining() < 8 {
619 return Ok(None);
620 }
621 let sequence_number = rest.get_u32();
622 let payload_size = rest.get_u32() as usize;
623
624 if rest.remaining() < payload_size {
625 return Ok(None);
626 }
627
628 let (slice, right) = rest.split_at(payload_size);
629 rest = right;
630
631 let fields: BTreeMap<KeyString, serde_json::Value> =
632 serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
633
634 let byte_size = bytes_remaining(src, rest);
635 src.advance(byte_size);
636
637 Ok(Some((
638 LogstashEventFrame {
639 protocol,
640 sequence_number,
641 fields,
642 },
643 byte_size,
644 )))
645}
646
647fn decode_compressed_frame(
648 src: &mut BytesMut,
649) -> Result<Option<VecDeque<(LogstashEventFrame, usize)>>, DecodeError> {
650 let mut rest = src.as_ref();
651
652 if rest.remaining() < 4 {
653 return Ok(None);
654 }
655 let payload_size = rest.get_u32() as usize;
656
657 if rest.remaining() < payload_size {
658 src.reserve(payload_size);
659 return Ok(None);
660 }
661
662 let (slice, right) = rest.split_at(payload_size);
663 rest = right;
664
665 let mut buf = Vec::new();
666
667 let res = ZlibDecoder::new(io::Cursor::new(slice))
668 .read_to_end(&mut buf)
669 .context(DecompressionFailedSnafu)
670 .map(|_| BytesMut::from(&buf[..]));
671
672 let byte_size = bytes_remaining(src, rest);
673 src.advance(byte_size);
674
675 let mut buf = res?;
676
677 let mut decoder = LogstashDecoder::new();
678
679 let mut frames = VecDeque::new();
680
681 while let Some(s) = decoder.decode(&mut buf)? {
682 frames.push_back(s);
683 }
684 Ok(Some(frames))
685}
686
687fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
688 let remaining = rest.remaining();
689 src.remaining() - remaining
690}
691
692impl From<LogstashEventFrame> for Event {
693 fn from(frame: LogstashEventFrame) -> Self {
694 Event::Log(LogEvent::from(
695 frame
696 .fields
697 .into_iter()
698 .map(|(key, value)| (key, Value::from(value)))
699 .collect::<BTreeMap<_, _>>(),
700 ))
701 }
702}
703
704impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
705 fn from(frame: LogstashEventFrame) -> Self {
706 smallvec![frame.into()]
707 }
708}
709
// Unit tests: config generation, the ack protocol against a locally spawned
// source, decoder robustness on truncated input, and schema definitions.
#[cfg(test)]
mod test {
    use bytes::BufMut;
    use futures::Stream;
    use rand::{Rng, rng};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::value::kind::Collection;

    use super::*;
    use crate::{
        SourceSender,
        event::EventStatus,
        test_util::{
            addr::next_addr,
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            spawn_collect_n, wait_for_tcp,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogstashConfig>();
    }

    // Delivered events must be acknowledged to the client.
    #[tokio::test]
    async fn test_delivered() {
        test_protocol(EventStatus::Delivered, true).await;
    }

    // Rejected events must not be acknowledged.
    #[tokio::test]
    async fn test_failed() {
        test_protocol(EventStatus::Rejected, false).await;
    }

    // Spawns a source with acknowledgements enabled whose events are finalized
    // with `status`; returns its address and the stream of received events.
    async fn start_logstash(
        status: EventStatus,
    ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let (_guard, address) = next_addr();
        let source = LogstashConfig {
            address: address.into(),
            tls: None,
            permit_origin: None,
            keepalive: None,
            receive_buffer_bytes: None,
            acknowledgements: true.into(),
            connection_limit: None,
            log_namespace: None,
        }
        .build(SourceContext::new_test(sender, None))
        .await
        .unwrap();
        tokio::spawn(source);
        wait_for_tcp(address).await;
        (address, recv)
    }

    // Sends one data frame and checks both the ack behaviour (inside
    // `send_req`) and the fields of the resulting event.
    async fn test_protocol(status: EventStatus, sends_ack: bool) {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (address, recv) = start_logstash(status).await;
            spawn_collect_n(
                send_req(address, &[("message", "Hello, world!")], sends_ack),
                recv,
                1,
            )
            .await
        })
        .await;

        assert_eq!(events.len(), 1);
        let log = events[0].as_log();
        assert_eq!(
            log.get("message").unwrap().to_string_lossy(),
            "Hello, world!".to_string()
        );
        assert_eq!(
            log.get("source_type").unwrap().to_string_lossy(),
            "logstash".to_string()
        );
        assert!(log.get("host").is_some());
        assert!(log.get("timestamp").is_some());
    }

    // Encodes a protocol `2` data (`D`) frame: header bytes, sequence number,
    // pair count, then each key and value prefixed by its `u32` length.
    fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
        let mut req = BytesMut::new();
        req.put_u8(b'2');
        req.put_u8(b'D');
        req.put_u32(seq);
        req.put_u32(pairs.len() as u32);
        for (key, value) in pairs {
            req.put_u32(key.len() as u32);
            req.put(key.as_bytes());
            req.put_u32(value.len() as u32);
            req.put(value.as_bytes());
        }
        req.into()
    }

    // Feeds truncated prefixes of an encoded request (the two header bytes
    // included) to the data frame decoder; each must yield `None`.
    #[test]
    fn v1_decoder_does_not_panic() {
        let seq = rng().random_range(1..u32::MAX);
        let req = encode_req(seq, &[("message", "Hello, World!")]);
        for i in 0..req.len() - 1 {
            assert!(
                decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&req[..i]))
                    .is_none()
            );
        }
    }

    // Sends one data frame with a random sequence number, then reads the reply.
    // With `sends_ack` the reply must be exactly the ack for that sequence
    // number; otherwise it must be empty.
    async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
        let seq = rng().random_range(1..u32::MAX);
        let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();

        let req = encode_req(seq, pairs);
        socket.write_all(&req).await.unwrap();

        let mut output = BytesMut::new();
        socket.read_buf(&mut output).await.unwrap();

        if sends_ack {
            assert_eq!(output.get_u8(), b'2');
            assert_eq!(output.get_u8(), b'A');
            assert_eq!(output.get_u32(), seq);
        }
        assert_eq!(output.len(), 0);
    }

    // Schema advertised when the Vector log namespace is enabled.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogstashConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    // Schema advertised with the default (legacy) log namespace.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogstashConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"));

        assert_eq!(definitions, Some(expected_definition))
    }
}
908
// Integration tests: only compiled with the `logstash-integration-tests`
// feature and driven by the `HEARTBEAT_ADDRESS` / `LOGSTASH_ADDRESS`
// environment variables.
#[cfg(all(test, feature = "logstash-integration-tests"))]
mod integration_tests {
    use std::time::Duration;

    use futures::Stream;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        SourceSender,
        config::SourceContext,
        event::EventStatus,
        test_util::{
            collect_n,
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            wait_for_tcp,
        },
        tls::{TlsConfig, TlsEnableableConfig},
    };

    // Reads the listen address for the heartbeat test; panics if it is unset.
    fn heartbeat_address() -> String {
        std::env::var("HEARTBEAT_ADDRESS")
            .expect("Address of Beats Heartbeat service must be specified.")
    }

    // Waits up to 60 seconds for one event on a source without TLS
    // certificates and checks the heartbeat-specific fields.
    #[tokio::test]
    async fn beats_heartbeat() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let out = source(heartbeat_address(), None).await;

            timeout(Duration::from_secs(60), collect_n(out, 1))
                .await
                .unwrap()
        })
        .await;

        assert!(!events.is_empty());

        let log = events[0].as_log();
        assert_eq!(
            log.get("@metadata.beat"),
            Some(String::from("heartbeat").into()).as_ref()
        );
        assert_eq!(log.get("summary.up"), Some(1.into()).as_ref());
        assert!(log.get("timestamp").is_some());
        assert!(log.get("host").is_some());
    }

    // Reads the listen address for the Logstash test; panics if it is unset.
    fn logstash_address() -> String {
        std::env::var("LOGSTASH_ADDRESS")
            .expect("Listen address for `logstash` source must be specified.")
    }

    // Waits up to 60 seconds for one event on a TLS-enabled source and checks
    // that its `line` field contains "Hello World".
    #[tokio::test]
    async fn logstash() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let out = source(
                logstash_address(),
                Some(TlsEnableableConfig {
                    enabled: Some(true),
                    options: TlsConfig {
                        crt_file: Some(
                            "tests/integration/shared/data/host.docker.internal.crt".into(),
                        ),
                        key_file: Some(
                            "tests/integration/shared/data/host.docker.internal.key".into(),
                        ),
                        ..Default::default()
                    },
                }),
            )
            .await;

            timeout(Duration::from_secs(60), collect_n(out, 1))
                .await
                .unwrap()
        })
        .await;

        assert!(!events.is_empty());

        let log = events[0].as_log();
        assert!(
            log.get("line")
                .unwrap()
                .to_string_lossy()
                .contains("Hello World")
        );
        assert!(log.get("host").is_some());
    }

    // Spawns a source on `address` (acknowledgements disabled, optional TLS)
    // and returns the stream of events it receives once the port is open.
    async fn source(
        address: String,
        tls: Option<TlsEnableableConfig>,
    ) -> impl Stream<Item = Event> + Unpin {
        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let address: SocketAddr = address.parse().unwrap();
        let tls_config = TlsSourceConfig {
            client_metadata_key: None,
            tls_config: tls.unwrap_or_default(),
        };
        tokio::spawn(async move {
            LogstashConfig {
                address: address.into(),
                tls: Some(tls_config),
                keepalive: None,
                permit_origin: None,
                receive_buffer_bytes: None,
                acknowledgements: false.into(),
                connection_limit: None,
                log_namespace: None,
            }
            .build(SourceContext::new_test(sender, None))
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        recv
    }
}