1use std::net::SocketAddr;
2use std::time::Duration;
3use std::{
4 collections::{BTreeMap, VecDeque},
5 convert::TryFrom,
6 io::{self, Read},
7};
8use vector_lib::ipallowlist::IpAllowlistConfig;
9
10use bytes::{Buf, Bytes, BytesMut};
11use flate2::read::ZlibDecoder;
12use smallvec::{smallvec, SmallVec};
13use snafu::{ResultExt, Snafu};
14use tokio_util::codec::Decoder;
15use vector_lib::codecs::{BytesDeserializerConfig, StreamDecodingError};
16use vector_lib::configurable::configurable_component;
17use vector_lib::lookup::{event_path, metadata_path, owned_value_path, path, OwnedValuePath};
18use vector_lib::{
19 config::{LegacyKey, LogNamespace},
20 schema::Definition,
21};
22use vrl::value::kind::Collection;
23use vrl::value::{KeyString, Kind};
24
25use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
26use crate::{
27 config::{
28 log_schema, DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
29 SourceContext, SourceOutput,
30 },
31 event::{Event, LogEvent, Value},
32 serde::bool_or_struct,
33 tcp::TcpKeepaliveConfig,
34 tls::{MaybeTlsSettings, TlsSourceConfig},
35 types,
36};
37
38#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
40#[derive(Clone, Debug)]
41pub struct LogstashConfig {
42 #[configurable(derived)]
43 address: SocketListenAddr,
44
45 #[configurable(derived)]
46 #[configurable(metadata(docs::advanced))]
47 keepalive: Option<TcpKeepaliveConfig>,
48
49 #[configurable(derived)]
50 pub permit_origin: Option<IpAllowlistConfig>,
51
52 #[configurable(derived)]
53 tls: Option<TlsSourceConfig>,
54
55 #[configurable(metadata(docs::type_unit = "bytes"))]
57 #[configurable(metadata(docs::examples = 65536))]
58 #[configurable(metadata(docs::advanced))]
59 receive_buffer_bytes: Option<usize>,
60
61 #[configurable(metadata(docs::type_unit = "connections"))]
63 #[configurable(metadata(docs::advanced))]
64 connection_limit: Option<u32>,
65
66 #[configurable(derived)]
67 #[serde(default, deserialize_with = "bool_or_struct")]
68 acknowledgements: SourceAcknowledgementsConfig,
69
70 #[configurable(metadata(docs::hidden))]
72 #[serde(default)]
73 log_namespace: Option<bool>,
74}
75
76impl LogstashConfig {
77 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
79 let host_key = log_schema()
81 .host_key()
82 .cloned()
83 .map(LegacyKey::InsertIfEmpty);
84
85 let tls_client_metadata_path = self
86 .tls
87 .as_ref()
88 .and_then(|tls| tls.client_metadata_key.as_ref())
89 .and_then(|k| k.path.clone())
90 .map(LegacyKey::Overwrite);
91
92 BytesDeserializerConfig
93 .schema_definition(log_namespace)
94 .with_standard_vector_source_metadata()
95 .with_source_metadata(
96 LogstashConfig::NAME,
97 None,
98 &owned_value_path!("timestamp"),
99 Kind::timestamp().or_undefined(),
100 Some("timestamp"),
101 )
102 .with_source_metadata(
103 LogstashConfig::NAME,
104 host_key,
105 &owned_value_path!("host"),
106 Kind::bytes(),
107 Some("host"),
108 )
109 .with_source_metadata(
110 Self::NAME,
111 tls_client_metadata_path,
112 &owned_value_path!("tls_client_metadata"),
113 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
114 None,
115 )
116 }
117}
118
119impl Default for LogstashConfig {
120 fn default() -> Self {
121 Self {
122 address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
123 keepalive: None,
124 permit_origin: None,
125 tls: None,
126 receive_buffer_bytes: None,
127 acknowledgements: Default::default(),
128 connection_limit: None,
129 log_namespace: None,
130 }
131 }
132}
133
134impl GenerateConfig for LogstashConfig {
135 fn generate_config() -> toml::Value {
136 toml::Value::try_from(LogstashConfig::default()).unwrap()
137 }
138}
139
140#[async_trait::async_trait]
141#[typetag::serde(name = "logstash")]
142impl SourceConfig for LogstashConfig {
143 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
144 let log_namespace = cx.log_namespace(self.log_namespace);
145 let source = LogstashSource {
146 timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
147 legacy_host_key_path: log_schema().host_key().cloned(),
148 log_namespace,
149 };
150 let shutdown_secs = Duration::from_secs(30);
151 let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
152 let tls_client_metadata_key = self
153 .tls
154 .as_ref()
155 .and_then(|tls| tls.client_metadata_key.clone())
156 .and_then(|k| k.path);
157
158 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
159 source.run(
160 self.address,
161 self.keepalive,
162 shutdown_secs,
163 tls,
164 tls_client_metadata_key,
165 self.receive_buffer_bytes,
166 None,
167 cx,
168 self.acknowledgements,
169 self.connection_limit,
170 self.permit_origin.clone().map(Into::into),
171 LogstashConfig::NAME,
172 log_namespace,
173 )
174 }
175
176 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
177 vec![SourceOutput::new_maybe_logs(
180 DataType::Log,
181 self.schema_definition(global_log_namespace.merge(self.log_namespace)),
182 )]
183 }
184
185 fn resources(&self) -> Vec<Resource> {
186 vec![self.address.as_tcp_resource()]
187 }
188
189 fn can_acknowledge(&self) -> bool {
190 true
191 }
192}
193
194#[derive(Debug, Clone)]
195struct LogstashSource {
196 timestamp_converter: types::Conversion,
197 log_namespace: LogNamespace,
198 legacy_host_key_path: Option<OwnedValuePath>,
199}
200
201impl TcpSource for LogstashSource {
202 type Error = DecodeError;
203 type Item = LogstashEventFrame;
204 type Decoder = LogstashDecoder;
205 type Acker = LogstashAcker;
206
207 fn decoder(&self) -> Self::Decoder {
208 LogstashDecoder::new()
209 }
210
211 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
212 let now = chrono::Utc::now();
213 for event in events {
214 let log = event.as_mut_log();
215
216 self.log_namespace.insert_vector_metadata(
217 log,
218 log_schema().source_type_key(),
219 path!("source_type"),
220 Bytes::from_static(LogstashConfig::NAME.as_bytes()),
221 );
222
223 let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
224 self.timestamp_converter
225 .convert::<Value>(timestamp.coerce_to_bytes())
226 .ok()
227 });
228
229 match self.log_namespace {
234 LogNamespace::Vector => {
235 if let Some(timestamp) = log_timestamp {
236 log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
237 }
238 log.insert(metadata_path!("vector", "ingest_timestamp"), now);
239 }
240 LogNamespace::Legacy => {
241 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
242 log.insert(
243 timestamp_key,
244 log_timestamp.unwrap_or_else(|| Value::from(now)),
245 );
246 }
247 }
248 }
249
250 self.log_namespace.insert_source_metadata(
251 LogstashConfig::NAME,
252 log,
253 self.legacy_host_key_path
254 .as_ref()
255 .map(LegacyKey::InsertIfEmpty),
256 path!("host"),
257 host.ip().to_string(),
258 );
259 }
260 }
261
262 fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
263 LogstashAcker::new(frames)
264 }
265}
266
267struct LogstashAcker {
268 sequence_number: u32,
269 protocol_version: Option<LogstashProtocolVersion>,
270}
271
272impl LogstashAcker {
273 fn new(frames: &[LogstashEventFrame]) -> Self {
274 let mut sequence_number = 0;
275 let mut protocol_version = None;
276
277 for frame in frames {
278 sequence_number = std::cmp::max(sequence_number, frame.sequence_number);
279 protocol_version = Some(frame.protocol);
283 }
284
285 Self {
286 sequence_number,
287 protocol_version,
288 }
289 }
290}
291
292impl TcpSourceAcker for LogstashAcker {
293 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
295 match (ack, self.protocol_version) {
296 (TcpSourceAck::Ack, Some(protocol_version)) => {
297 let mut bytes: Vec<u8> = Vec::with_capacity(6);
298 bytes.push(protocol_version.into());
299 bytes.push(LogstashFrameType::Ack.into());
300 bytes.extend(self.sequence_number.to_be_bytes().iter());
301 Some(Bytes::from(bytes))
302 }
303 _ => None,
304 }
305 }
306}
307
308#[derive(Debug)]
309enum LogstashDecoderReadState {
310 ReadProtocol,
311 ReadType(LogstashProtocolVersion),
312 ReadFrame(LogstashProtocolVersion, LogstashFrameType),
313 PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
314}
315
316#[derive(Debug)]
317struct LogstashDecoder {
318 state: LogstashDecoderReadState,
319}
320
321impl LogstashDecoder {
322 const fn new() -> Self {
323 Self {
324 state: LogstashDecoderReadState::ReadProtocol,
325 }
326 }
327}
328
329#[derive(Debug, Snafu)]
330pub enum DecodeError {
331 #[snafu(display("i/o error: {}", source))]
332 IO { source: io::Error },
333 #[snafu(display("Unknown logstash protocol version: {}", version))]
334 UnknownProtocolVersion { version: char },
335 #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
336 UnknownFrameType { frame_type: char },
337 #[snafu(display("Failed to decode JSON frame: {}", source))]
338 JsonFrameFailedDecode { source: serde_json::Error },
339 #[snafu(display("Failed to decompress compressed frame: {}", source))]
340 DecompressionFailed { source: io::Error },
341}
342
343impl StreamDecodingError for DecodeError {
344 fn can_continue(&self) -> bool {
345 use DecodeError::*;
346
347 match self {
348 IO { .. } => false,
349 UnknownProtocolVersion { .. } => false,
350 UnknownFrameType { .. } => false,
351 JsonFrameFailedDecode { .. } => true,
352 DecompressionFailed { .. } => true,
353 }
354 }
355}
356
357impl From<io::Error> for DecodeError {
358 fn from(source: io::Error) -> Self {
359 DecodeError::IO { source }
360 }
361}
362
363#[derive(Debug, Clone, Copy)]
364enum LogstashProtocolVersion {
365 V1, V2, }
368
369impl From<LogstashProtocolVersion> for u8 {
370 fn from(frame_type: LogstashProtocolVersion) -> u8 {
371 use LogstashProtocolVersion::*;
372
373 match frame_type {
374 V1 => b'1',
375 V2 => b'2',
376 }
377 }
378}
379
380impl TryFrom<u8> for LogstashProtocolVersion {
381 type Error = DecodeError;
382
383 fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
384 use LogstashProtocolVersion::*;
385
386 match frame_type {
387 b'1' => Ok(V1),
388 b'2' => Ok(V2),
389 version => Err(DecodeError::UnknownProtocolVersion {
390 version: version as char,
391 }),
392 }
393 }
394}
395
396#[derive(Debug, Clone, Copy)]
397enum LogstashFrameType {
398 Ack, WindowSize, Data, Json, Compressed, }
404
405impl From<LogstashFrameType> for u8 {
406 fn from(frame_type: LogstashFrameType) -> u8 {
407 use LogstashFrameType::*;
408
409 match frame_type {
410 Ack => b'A',
411 WindowSize => b'W',
412 Data => b'D',
413 Json => b'J',
414 Compressed => b'C',
415 }
416 }
417}
418
419impl TryFrom<u8> for LogstashFrameType {
420 type Error = DecodeError;
421
422 fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
423 use LogstashFrameType::*;
424
425 match frame_type {
426 b'A' => Ok(Ack),
427 b'W' => Ok(WindowSize),
428 b'D' => Ok(Data),
429 b'J' => Ok(Json),
430 b'C' => Ok(Compressed),
431 frame_type => Err(DecodeError::UnknownFrameType {
432 frame_type: frame_type as char,
433 }),
434 }
435 }
436}
437
438#[derive(Debug)]
440struct LogstashEventFrame {
441 protocol: LogstashProtocolVersion,
442 sequence_number: u32,
443 fields: BTreeMap<KeyString, serde_json::Value>,
444}
445
446impl Decoder for LogstashDecoder {
449 type Item = (LogstashEventFrame, usize);
450 type Error = DecodeError;
451
452 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
453 loop {
460 self.state = match self.state {
461 LogstashDecoderReadState::PendingFrames(ref mut frames) => {
463 match frames.pop_front() {
464 Some(frame) => return Ok(Some(frame)),
465 None => LogstashDecoderReadState::ReadProtocol,
466 }
467 }
468 LogstashDecoderReadState::ReadProtocol => {
469 if src.remaining() < 1 {
470 return Ok(None);
471 }
472
473 use LogstashProtocolVersion::*;
474
475 match LogstashProtocolVersion::try_from(src.get_u8())? {
476 V1 => LogstashDecoderReadState::ReadType(V1),
477 V2 => LogstashDecoderReadState::ReadType(V2),
478 }
479 }
480 LogstashDecoderReadState::ReadType(protocol) => {
481 if src.remaining() < 1 {
482 return Ok(None);
483 }
484
485 use LogstashFrameType::*;
486
487 match LogstashFrameType::try_from(src.get_u8())? {
488 WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
489 Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
490 Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
491 Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
492 Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
493 }
494 }
495 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
501 if src.remaining() < 4 {
502 return Ok(None);
503 }
504
505 let _window_size = src.get_u32();
506
507 LogstashDecoderReadState::ReadProtocol
508 }
509 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
513 if src.remaining() < 4 {
514 return Ok(None);
515 }
516
517 let _sequence_number = src.get_u32();
518
519 LogstashDecoderReadState::ReadProtocol
520 }
521 LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
523 let Some(frame) = decode_data_frame(protocol, src) else {
524 return Ok(None);
525 };
526
527 LogstashDecoderReadState::PendingFrames([frame].into())
528 }
529 LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
531 let Some(frame) = decode_json_frame(protocol, src)? else {
532 return Ok(None);
533 };
534
535 LogstashDecoderReadState::PendingFrames([frame].into())
536 }
537 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
539 let Some(frames) = decode_compressed_frame(src)? else {
540 return Ok(None);
541 };
542
543 LogstashDecoderReadState::PendingFrames(frames)
544 }
545 };
546 }
547 }
548}
549
550fn decode_data_frame(
552 protocol: LogstashProtocolVersion,
553 src: &mut BytesMut,
554) -> Option<(LogstashEventFrame, usize)> {
555 let mut rest = src.as_ref();
556
557 if rest.remaining() < 8 {
558 return None;
559 }
560 let sequence_number = rest.get_u32();
561 let pair_count = rest.get_u32();
562 if pair_count == 0 {
563 return None; }
565
566 let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
567 for _ in 0..pair_count {
568 let (key, value, right) = decode_pair(rest)?;
569 rest = right;
570
571 fields.insert(
572 String::from_utf8_lossy(key).into(),
573 String::from_utf8_lossy(value).into(),
574 );
575 }
576
577 let byte_size = bytes_remaining(src, rest);
578 src.advance(byte_size);
579
580 Some((
581 LogstashEventFrame {
582 protocol,
583 sequence_number,
584 fields,
585 },
586 byte_size,
587 ))
588}
589
590fn decode_pair(mut rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
591 if rest.remaining() < 4 {
592 return None;
593 }
594 let key_length = rest.get_u32() as usize;
595
596 if rest.remaining() < key_length {
597 return None;
598 }
599 let (key, right) = rest.split_at(key_length);
600 rest = right;
601
602 if rest.remaining() < 4 {
603 return None;
604 }
605 let value_length = rest.get_u32() as usize;
606 if rest.remaining() < value_length {
607 return None;
608 }
609 let (value, right) = rest.split_at(value_length);
610 Some((key, value, right))
611}
612
613fn decode_json_frame(
614 protocol: LogstashProtocolVersion,
615 src: &mut BytesMut,
616) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
617 let mut rest = src.as_ref();
618
619 if rest.remaining() < 8 {
620 return Ok(None);
621 }
622 let sequence_number = rest.get_u32();
623 let payload_size = rest.get_u32() as usize;
624
625 if rest.remaining() < payload_size {
626 return Ok(None);
627 }
628
629 let (slice, right) = rest.split_at(payload_size);
630 rest = right;
631
632 let fields: BTreeMap<KeyString, serde_json::Value> =
633 serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
634
635 let byte_size = bytes_remaining(src, rest);
636 src.advance(byte_size);
637
638 Ok(Some((
639 LogstashEventFrame {
640 protocol,
641 sequence_number,
642 fields,
643 },
644 byte_size,
645 )))
646}
647
648fn decode_compressed_frame(
649 src: &mut BytesMut,
650) -> Result<Option<VecDeque<(LogstashEventFrame, usize)>>, DecodeError> {
651 let mut rest = src.as_ref();
652
653 if rest.remaining() < 4 {
654 return Ok(None);
655 }
656 let payload_size = rest.get_u32() as usize;
657
658 if rest.remaining() < payload_size {
659 src.reserve(payload_size);
660 return Ok(None);
661 }
662
663 let (slice, right) = rest.split_at(payload_size);
664 rest = right;
665
666 let mut buf = Vec::new();
667
668 let res = ZlibDecoder::new(io::Cursor::new(slice))
669 .read_to_end(&mut buf)
670 .context(DecompressionFailedSnafu)
671 .map(|_| BytesMut::from(&buf[..]));
672
673 let byte_size = bytes_remaining(src, rest);
674 src.advance(byte_size);
675
676 let mut buf = res?;
677
678 let mut decoder = LogstashDecoder::new();
679
680 let mut frames = VecDeque::new();
681
682 while let Some(s) = decoder.decode(&mut buf)? {
683 frames.push_back(s);
684 }
685 Ok(Some(frames))
686}
687
688fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
689 let remaining = rest.remaining();
690 src.remaining() - remaining
691}
692
693impl From<LogstashEventFrame> for Event {
694 fn from(frame: LogstashEventFrame) -> Self {
695 Event::Log(LogEvent::from(
696 frame
697 .fields
698 .into_iter()
699 .map(|(key, value)| (key, Value::from(value)))
700 .collect::<BTreeMap<_, _>>(),
701 ))
702 }
703}
704
705impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
706 fn from(frame: LogstashEventFrame) -> Self {
707 smallvec![frame.into()]
708 }
709}
710
711#[cfg(test)]
712mod test {
713 use bytes::BufMut;
714 use futures::Stream;
715 use rand::{rng, Rng};
716 use tokio::io::{AsyncReadExt, AsyncWriteExt};
717 use vector_lib::lookup::OwnedTargetPath;
718 use vrl::value::kind::Collection;
719
720 use super::*;
721 use crate::{
722 event::EventStatus,
723 test_util::{
724 components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
725 next_addr, spawn_collect_n, wait_for_tcp,
726 },
727 SourceSender,
728 };
729
730 #[test]
731 fn generate_config() {
732 crate::test_util::test_generate_config::<LogstashConfig>();
733 }
734
735 #[tokio::test]
736 async fn test_delivered() {
737 test_protocol(EventStatus::Delivered, true).await;
738 }
739
740 #[tokio::test]
741 async fn test_failed() {
742 test_protocol(EventStatus::Rejected, false).await;
743 }
744
745 async fn start_logstash(
746 status: EventStatus,
747 ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
748 let (sender, recv) = SourceSender::new_test_finalize(status);
749 let address = next_addr();
750 let source = LogstashConfig {
751 address: address.into(),
752 tls: None,
753 permit_origin: None,
754 keepalive: None,
755 receive_buffer_bytes: None,
756 acknowledgements: true.into(),
757 connection_limit: None,
758 log_namespace: None,
759 }
760 .build(SourceContext::new_test(sender, None))
761 .await
762 .unwrap();
763 tokio::spawn(source);
764 wait_for_tcp(address).await;
765 (address, recv)
766 }
767
768 async fn test_protocol(status: EventStatus, sends_ack: bool) {
769 let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
770 let (address, recv) = start_logstash(status).await;
771 spawn_collect_n(
772 send_req(address, &[("message", "Hello, world!")], sends_ack),
773 recv,
774 1,
775 )
776 .await
777 })
778 .await;
779
780 assert_eq!(events.len(), 1);
781 let log = events[0].as_log();
782 assert_eq!(
783 log.get("message").unwrap().to_string_lossy(),
784 "Hello, world!".to_string()
785 );
786 assert_eq!(
787 log.get("source_type").unwrap().to_string_lossy(),
788 "logstash".to_string()
789 );
790 assert!(log.get("host").is_some());
791 assert!(log.get("timestamp").is_some());
792 }
793
794 fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
795 let mut req = BytesMut::new();
796 req.put_u8(b'2');
797 req.put_u8(b'D');
798 req.put_u32(seq);
799 req.put_u32(pairs.len() as u32);
800 for (key, value) in pairs {
801 req.put_u32(key.len() as u32);
802 req.put(key.as_bytes());
803 req.put_u32(value.len() as u32);
804 req.put(value.as_bytes());
805 }
806 req.into()
807 }
808
809 #[test]
810 fn v1_decoder_does_not_panic() {
811 let seq = rng().random_range(1..u32::MAX);
812 let req = encode_req(seq, &[("message", "Hello, World!")]);
813 for i in 0..req.len() - 1 {
814 assert!(
815 decode_data_frame(LogstashProtocolVersion::V1, &mut BytesMut::from(&req[..i]))
816 .is_none()
817 );
818 }
819 }
820
821 async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
822 let seq = rng().random_range(1..u32::MAX);
823 let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();
824
825 let req = encode_req(seq, pairs);
826 socket.write_all(&req).await.unwrap();
827
828 let mut output = BytesMut::new();
829 socket.read_buf(&mut output).await.unwrap();
830
831 if sends_ack {
832 assert_eq!(output.get_u8(), b'2');
833 assert_eq!(output.get_u8(), b'A');
834 assert_eq!(output.get_u32(), seq);
835 }
836 assert_eq!(output.len(), 0);
837 }
838
839 #[test]
840 fn output_schema_definition_vector_namespace() {
841 let config = LogstashConfig {
842 log_namespace: Some(true),
843 ..Default::default()
844 };
845
846 let definitions = config
847 .outputs(LogNamespace::Vector)
848 .remove(0)
849 .schema_definition(true);
850
851 let expected_definition =
852 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
853 .with_meaning(OwnedTargetPath::event_root(), "message")
854 .with_metadata_field(
855 &owned_value_path!("vector", "source_type"),
856 Kind::bytes(),
857 None,
858 )
859 .with_metadata_field(
860 &owned_value_path!("vector", "ingest_timestamp"),
861 Kind::timestamp(),
862 None,
863 )
864 .with_metadata_field(
865 &owned_value_path!(LogstashConfig::NAME, "timestamp"),
866 Kind::timestamp().or_undefined(),
867 Some("timestamp"),
868 )
869 .with_metadata_field(
870 &owned_value_path!(LogstashConfig::NAME, "host"),
871 Kind::bytes(),
872 Some("host"),
873 )
874 .with_metadata_field(
875 &owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
876 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
877 None,
878 );
879
880 assert_eq!(definitions, Some(expected_definition))
881 }
882
883 #[test]
884 fn output_schema_definition_legacy_namespace() {
885 let config = LogstashConfig::default();
886
887 let definitions = config
888 .outputs(LogNamespace::Legacy)
889 .remove(0)
890 .schema_definition(true);
891
892 let expected_definition = Definition::new_with_default_metadata(
893 Kind::object(Collection::empty()),
894 [LogNamespace::Legacy],
895 )
896 .with_event_field(
897 &owned_value_path!("message"),
898 Kind::bytes(),
899 Some("message"),
900 )
901 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
902 .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
903 .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"));
904
905 assert_eq!(definitions, Some(expected_definition))
906 }
907}
908
909#[cfg(all(test, feature = "logstash-integration-tests"))]
910mod integration_tests {
911 use std::time::Duration;
912
913 use futures::Stream;
914 use tokio::time::timeout;
915
916 use super::*;
917 use crate::{
918 config::SourceContext,
919 event::EventStatus,
920 test_util::{
921 collect_n,
922 components::{assert_source_compliance, SOCKET_PUSH_SOURCE_TAGS},
923 wait_for_tcp,
924 },
925 tls::{TlsConfig, TlsEnableableConfig},
926 SourceSender,
927 };
928
929 fn heartbeat_address() -> String {
930 std::env::var("HEARTBEAT_ADDRESS")
931 .expect("Address of Beats Heartbeat service must be specified.")
932 }
933
934 #[tokio::test]
935 async fn beats_heartbeat() {
936 let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
937 let out = source(heartbeat_address(), None).await;
938
939 timeout(Duration::from_secs(60), collect_n(out, 1))
940 .await
941 .unwrap()
942 })
943 .await;
944
945 assert!(!events.is_empty());
946
947 let log = events[0].as_log();
948 assert_eq!(
949 log.get("@metadata.beat"),
950 Some(String::from("heartbeat").into()).as_ref()
951 );
952 assert_eq!(log.get("summary.up"), Some(1.into()).as_ref());
953 assert!(log.get("timestamp").is_some());
954 assert!(log.get("host").is_some());
955 }
956
957 fn logstash_address() -> String {
958 std::env::var("LOGSTASH_ADDRESS")
959 .expect("Listen address for `logstash` source must be specified.")
960 }
961
962 #[tokio::test]
963 async fn logstash() {
964 let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
965 let out = source(
966 logstash_address(),
967 Some(TlsEnableableConfig {
968 enabled: Some(true),
969 options: TlsConfig {
970 crt_file: Some("tests/data/host.docker.internal.crt".into()),
971 key_file: Some("tests/data/host.docker.internal.key".into()),
972 ..Default::default()
973 },
974 }),
975 )
976 .await;
977
978 timeout(Duration::from_secs(60), collect_n(out, 1))
979 .await
980 .unwrap()
981 })
982 .await;
983
984 assert!(!events.is_empty());
985
986 let log = events[0].as_log();
987 assert!(log
988 .get("line")
989 .unwrap()
990 .to_string_lossy()
991 .contains("Hello World"));
992 assert!(log.get("host").is_some());
993 }
994
995 async fn source(
996 address: String,
997 tls: Option<TlsEnableableConfig>,
998 ) -> impl Stream<Item = Event> + Unpin {
999 let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
1000 let address: SocketAddr = address.parse().unwrap();
1001 let tls_config = TlsSourceConfig {
1002 client_metadata_key: None,
1003 tls_config: tls.unwrap_or_default(),
1004 };
1005 tokio::spawn(async move {
1006 LogstashConfig {
1007 address: address.into(),
1008 tls: Some(tls_config),
1009 keepalive: None,
1010 permit_origin: None,
1011 receive_buffer_bytes: None,
1012 acknowledgements: false.into(),
1013 connection_limit: None,
1014 log_namespace: None,
1015 }
1016 .build(SourceContext::new_test(sender, None))
1017 .await
1018 .unwrap()
1019 .await
1020 .unwrap()
1021 });
1022 wait_for_tcp(address).await;
1023 recv
1024 }
1025}