1use std::{
2 collections::{BTreeMap, VecDeque},
3 convert::TryFrom,
4 io::{self, Read},
5 net::SocketAddr,
6 time::Duration,
7};
8
9use bytes::{Buf, Bytes, BytesMut};
10use flate2::read::ZlibDecoder;
11use smallvec::{SmallVec, smallvec};
12use snafu::{ResultExt, Snafu};
13use tokio_util::codec::Decoder;
14use vector_lib::{
15 codecs::{BytesDeserializerConfig, StreamDecodingError},
16 config::{LegacyKey, LogNamespace},
17 configurable::configurable_component,
18 ipallowlist::IpAllowlistConfig,
19 lookup::{OwnedValuePath, event_path, metadata_path, owned_value_path, path},
20 schema::Definition,
21};
22use vrl::value::{KeyString, Kind, kind::Collection};
23
24use super::util::net::{SocketListenAddr, TcpSource, TcpSourceAck, TcpSourceAcker};
25use crate::{
26 config::{
27 DataType, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
28 SourceContext, SourceOutput, log_schema,
29 },
30 event::{Event, LogEvent, Value},
31 serde::bool_or_struct,
32 tcp::TcpKeepaliveConfig,
33 tls::{MaybeTlsSettings, TlsSourceConfig},
34 types,
35};
36
37#[configurable_component(source("logstash", "Collect logs from a Logstash agent."))]
39#[derive(Clone, Debug)]
40pub struct LogstashConfig {
41 #[configurable(derived)]
42 address: SocketListenAddr,
43
44 #[configurable(derived)]
45 #[configurable(metadata(docs::advanced))]
46 keepalive: Option<TcpKeepaliveConfig>,
47
48 #[configurable(derived)]
49 pub permit_origin: Option<IpAllowlistConfig>,
50
51 #[configurable(derived)]
52 tls: Option<TlsSourceConfig>,
53
54 #[configurable(metadata(docs::type_unit = "bytes"))]
56 #[configurable(metadata(docs::examples = 65536))]
57 #[configurable(metadata(docs::advanced))]
58 receive_buffer_bytes: Option<usize>,
59
60 #[configurable(metadata(docs::type_unit = "connections"))]
62 #[configurable(metadata(docs::advanced))]
63 connection_limit: Option<u32>,
64
65 #[configurable(derived)]
66 #[serde(default, deserialize_with = "bool_or_struct")]
67 acknowledgements: SourceAcknowledgementsConfig,
68
69 #[configurable(metadata(docs::hidden))]
71 #[serde(default)]
72 log_namespace: Option<bool>,
73}
74
75impl LogstashConfig {
76 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
78 let host_key = log_schema()
80 .host_key()
81 .cloned()
82 .map(LegacyKey::InsertIfEmpty);
83
84 let tls_client_metadata_path = self
85 .tls
86 .as_ref()
87 .and_then(|tls| tls.client_metadata_key.as_ref())
88 .and_then(|k| k.path.clone())
89 .map(LegacyKey::Overwrite);
90
91 BytesDeserializerConfig
92 .schema_definition(log_namespace)
93 .with_standard_vector_source_metadata()
94 .with_source_metadata(
95 LogstashConfig::NAME,
96 None,
97 &owned_value_path!("timestamp"),
98 Kind::timestamp().or_undefined(),
99 Some("timestamp"),
100 )
101 .with_source_metadata(
102 LogstashConfig::NAME,
103 host_key,
104 &owned_value_path!("host"),
105 Kind::bytes(),
106 Some("host"),
107 )
108 .with_source_metadata(
109 Self::NAME,
110 tls_client_metadata_path,
111 &owned_value_path!("tls_client_metadata"),
112 Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
113 None,
114 )
115 }
116}
117
118impl Default for LogstashConfig {
119 fn default() -> Self {
120 Self {
121 address: SocketListenAddr::SocketAddr("0.0.0.0:5044".parse().unwrap()),
122 keepalive: None,
123 permit_origin: None,
124 tls: None,
125 receive_buffer_bytes: None,
126 acknowledgements: Default::default(),
127 connection_limit: None,
128 log_namespace: None,
129 }
130 }
131}
132
133impl GenerateConfig for LogstashConfig {
134 fn generate_config() -> toml::Value {
135 toml::Value::try_from(LogstashConfig::default()).unwrap()
136 }
137}
138
139#[async_trait::async_trait]
140#[typetag::serde(name = "logstash")]
141impl SourceConfig for LogstashConfig {
142 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
143 let log_namespace = cx.log_namespace(self.log_namespace);
144 let source = LogstashSource {
145 timestamp_converter: types::Conversion::Timestamp(cx.globals.timezone()),
146 legacy_host_key_path: log_schema().host_key().cloned(),
147 log_namespace,
148 };
149 let shutdown_secs = Duration::from_secs(30);
150 let tls_config = self.tls.as_ref().map(|tls| tls.tls_config.clone());
151 let tls_client_metadata_key = self
152 .tls
153 .as_ref()
154 .and_then(|tls| tls.client_metadata_key.clone())
155 .and_then(|k| k.path);
156
157 let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
158 source.run(
159 self.address,
160 self.keepalive,
161 shutdown_secs,
162 tls,
163 tls_client_metadata_key,
164 self.receive_buffer_bytes,
165 None,
166 cx,
167 self.acknowledgements,
168 self.connection_limit,
169 self.permit_origin.clone().map(Into::into),
170 LogstashConfig::NAME,
171 log_namespace,
172 )
173 }
174
175 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
176 vec![SourceOutput::new_maybe_logs(
179 DataType::Log,
180 self.schema_definition(global_log_namespace.merge(self.log_namespace)),
181 )]
182 }
183
184 fn resources(&self) -> Vec<Resource> {
185 vec![self.address.as_tcp_resource()]
186 }
187
188 fn can_acknowledge(&self) -> bool {
189 true
190 }
191}
192
193#[derive(Debug, Clone)]
194struct LogstashSource {
195 timestamp_converter: types::Conversion,
196 log_namespace: LogNamespace,
197 legacy_host_key_path: Option<OwnedValuePath>,
198}
199
200impl TcpSource for LogstashSource {
201 type Error = DecodeError;
202 type Item = LogstashEventFrame;
203 type Decoder = LogstashDecoder;
204 type Acker = LogstashAcker;
205
206 fn decoder(&self) -> Self::Decoder {
207 LogstashDecoder::new()
208 }
209
210 fn handle_events(&self, events: &mut [Event], host: SocketAddr) {
211 let now = chrono::Utc::now();
212 for event in events {
213 let log = event.as_mut_log();
214
215 self.log_namespace.insert_vector_metadata(
216 log,
217 log_schema().source_type_key(),
218 path!("source_type"),
219 Bytes::from_static(LogstashConfig::NAME.as_bytes()),
220 );
221
222 let log_timestamp = log.get(event_path!("@timestamp")).and_then(|timestamp| {
223 self.timestamp_converter
224 .convert::<Value>(timestamp.coerce_to_bytes())
225 .ok()
226 });
227
228 match self.log_namespace {
233 LogNamespace::Vector => {
234 if let Some(timestamp) = log_timestamp {
235 log.insert(metadata_path!(LogstashConfig::NAME, "timestamp"), timestamp);
236 }
237 log.insert(metadata_path!("vector", "ingest_timestamp"), now);
238 }
239 LogNamespace::Legacy => {
240 if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
241 log.insert(
242 timestamp_key,
243 log_timestamp.unwrap_or_else(|| Value::from(now)),
244 );
245 }
246 }
247 }
248
249 self.log_namespace.insert_source_metadata(
250 LogstashConfig::NAME,
251 log,
252 self.legacy_host_key_path
253 .as_ref()
254 .map(LegacyKey::InsertIfEmpty),
255 path!("host"),
256 host.ip().to_string(),
257 );
258 }
259 }
260
261 fn build_acker(&self, frames: &[Self::Item]) -> Self::Acker {
262 LogstashAcker::new(frames)
263 }
264}
265
266struct LogstashAcker {
267 sequence_number: u32,
268 protocol_version: Option<LogstashProtocolVersion>,
269}
270
271impl LogstashAcker {
272 fn new(frames: &[LogstashEventFrame]) -> Self {
273 let mut sequence_number = 0;
274 let mut protocol_version = None;
275
276 for frame in frames {
277 sequence_number = std::cmp::max(sequence_number, frame.sequence_number);
278 protocol_version = Some(frame.protocol);
282 }
283
284 Self {
285 sequence_number,
286 protocol_version,
287 }
288 }
289}
290
291impl TcpSourceAcker for LogstashAcker {
292 fn build_ack(self, ack: TcpSourceAck) -> Option<Bytes> {
294 match (ack, self.protocol_version) {
295 (TcpSourceAck::Ack, Some(protocol_version)) => {
296 let mut bytes: Vec<u8> = Vec::with_capacity(6);
297 bytes.push(protocol_version.into());
298 bytes.push(LogstashFrameType::Ack.into());
299 bytes.extend(self.sequence_number.to_be_bytes().iter());
300 Some(Bytes::from(bytes))
301 }
302 _ => None,
303 }
304 }
305}
306
307#[derive(Debug)]
308enum LogstashDecoderReadState {
309 ReadProtocol,
310 ReadType(LogstashProtocolVersion),
311 ReadFrame(LogstashProtocolVersion, LogstashFrameType),
312 PendingFrames(VecDeque<(LogstashEventFrame, usize)>),
313}
314
315#[derive(Debug)]
316struct LogstashDecoder {
317 state: LogstashDecoderReadState,
318}
319
320impl LogstashDecoder {
321 const fn new() -> Self {
322 Self {
323 state: LogstashDecoderReadState::ReadProtocol,
324 }
325 }
326}
327
328#[derive(Debug, Snafu)]
329pub enum DecodeError {
330 #[snafu(display("i/o error: {}", source))]
331 IO { source: io::Error },
332 #[snafu(display("Unknown logstash protocol version: {}", version))]
333 UnknownProtocolVersion { version: char },
334 #[snafu(display("Unknown logstash protocol message type: {}", frame_type))]
335 UnknownFrameType { frame_type: char },
336 #[snafu(display("Failed to decode JSON frame: {}", source))]
337 JsonFrameFailedDecode { source: serde_json::Error },
338 #[snafu(display("Failed to decompress compressed frame: {}", source))]
339 DecompressionFailed { source: io::Error },
340}
341
342impl StreamDecodingError for DecodeError {
343 fn can_continue(&self) -> bool {
344 use DecodeError::*;
345
346 match self {
347 IO { .. } => false,
348 UnknownProtocolVersion { .. } => false,
349 UnknownFrameType { .. } => false,
350 JsonFrameFailedDecode { .. } => true,
351 DecompressionFailed { .. } => true,
352 }
353 }
354}
355
356impl From<io::Error> for DecodeError {
357 fn from(source: io::Error) -> Self {
358 DecodeError::IO { source }
359 }
360}
361
/// Protocol version announced by the first byte of every frame.
#[derive(Clone, Copy, Debug)]
enum LogstashProtocolVersion {
    /// Encoded on the wire as the ASCII byte `1`.
    V1,
    /// Encoded on the wire as the ASCII byte `2`.
    V2,
}
367
368impl From<LogstashProtocolVersion> for u8 {
369 fn from(frame_type: LogstashProtocolVersion) -> u8 {
370 use LogstashProtocolVersion::*;
371
372 match frame_type {
373 V1 => b'1',
374 V2 => b'2',
375 }
376 }
377}
378
379impl TryFrom<u8> for LogstashProtocolVersion {
380 type Error = DecodeError;
381
382 fn try_from(frame_type: u8) -> Result<LogstashProtocolVersion, DecodeError> {
383 use LogstashProtocolVersion::*;
384
385 match frame_type {
386 b'1' => Ok(V1),
387 b'2' => Ok(V2),
388 version => Err(DecodeError::UnknownProtocolVersion {
389 version: version as char,
390 }),
391 }
392 }
393}
394
/// Kind of frame announced by the second byte of every frame.
#[derive(Clone, Copy, Debug)]
enum LogstashFrameType {
    /// Wire byte `A`.
    Ack,
    /// Wire byte `W`.
    WindowSize,
    /// Wire byte `D`.
    Data,
    /// Wire byte `J`.
    Json,
    /// Wire byte `C`.
    Compressed,
}
403
404impl From<LogstashFrameType> for u8 {
405 fn from(frame_type: LogstashFrameType) -> u8 {
406 use LogstashFrameType::*;
407
408 match frame_type {
409 Ack => b'A',
410 WindowSize => b'W',
411 Data => b'D',
412 Json => b'J',
413 Compressed => b'C',
414 }
415 }
416}
417
418impl TryFrom<u8> for LogstashFrameType {
419 type Error = DecodeError;
420
421 fn try_from(frame_type: u8) -> Result<LogstashFrameType, DecodeError> {
422 use LogstashFrameType::*;
423
424 match frame_type {
425 b'A' => Ok(Ack),
426 b'W' => Ok(WindowSize),
427 b'D' => Ok(Data),
428 b'J' => Ok(Json),
429 b'C' => Ok(Compressed),
430 frame_type => Err(DecodeError::UnknownFrameType {
431 frame_type: frame_type as char,
432 }),
433 }
434 }
435}
436
437#[derive(Debug)]
439struct LogstashEventFrame {
440 protocol: LogstashProtocolVersion,
441 sequence_number: u32,
442 fields: BTreeMap<KeyString, serde_json::Value>,
443}
444
445impl Decoder for LogstashDecoder {
448 type Item = (LogstashEventFrame, usize);
449 type Error = DecodeError;
450
451 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
452 loop {
459 self.state = match self.state {
460 LogstashDecoderReadState::PendingFrames(ref mut frames) => {
462 match frames.pop_front() {
463 Some(frame) => return Ok(Some(frame)),
464 None => LogstashDecoderReadState::ReadProtocol,
465 }
466 }
467 LogstashDecoderReadState::ReadProtocol => {
468 if src.remaining() < 1 {
469 return Ok(None);
470 }
471
472 use LogstashProtocolVersion::*;
473
474 match LogstashProtocolVersion::try_from(src.get_u8())? {
475 V1 => LogstashDecoderReadState::ReadType(V1),
476 V2 => LogstashDecoderReadState::ReadType(V2),
477 }
478 }
479 LogstashDecoderReadState::ReadType(protocol) => {
480 if src.remaining() < 1 {
481 return Ok(None);
482 }
483
484 use LogstashFrameType::*;
485
486 match LogstashFrameType::try_from(src.get_u8())? {
487 WindowSize => LogstashDecoderReadState::ReadFrame(protocol, WindowSize),
488 Data => LogstashDecoderReadState::ReadFrame(protocol, Data),
489 Json => LogstashDecoderReadState::ReadFrame(protocol, Json),
490 Compressed => LogstashDecoderReadState::ReadFrame(protocol, Compressed),
491 Ack => LogstashDecoderReadState::ReadFrame(protocol, Ack),
492 }
493 }
494 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::WindowSize) => {
500 if src.remaining() < 4 {
501 return Ok(None);
502 }
503
504 let _window_size = src.get_u32();
505
506 LogstashDecoderReadState::ReadProtocol
507 }
508 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Ack) => {
512 if src.remaining() < 4 {
513 return Ok(None);
514 }
515
516 let _sequence_number = src.get_u32();
517
518 LogstashDecoderReadState::ReadProtocol
519 }
520 LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Data) => {
522 let Some(frame) = decode_data_frame(protocol, src) else {
523 return Ok(None);
524 };
525
526 LogstashDecoderReadState::PendingFrames([frame].into())
527 }
528 LogstashDecoderReadState::ReadFrame(protocol, LogstashFrameType::Json) => {
530 let Some(frame) = decode_json_frame(protocol, src)? else {
531 return Ok(None);
532 };
533
534 LogstashDecoderReadState::PendingFrames([frame].into())
535 }
536 LogstashDecoderReadState::ReadFrame(_protocol, LogstashFrameType::Compressed) => {
538 let Some(frames) = decode_compressed_frame(src)? else {
539 return Ok(None);
540 };
541
542 LogstashDecoderReadState::PendingFrames(frames)
543 }
544 };
545 }
546 }
547}
548
549fn decode_data_frame(
551 protocol: LogstashProtocolVersion,
552 src: &mut BytesMut,
553) -> Option<(LogstashEventFrame, usize)> {
554 let mut rest = src.as_ref();
555
556 if rest.remaining() < 8 {
557 return None;
558 }
559 let sequence_number = rest.get_u32();
560 let pair_count = rest.get_u32();
561 if pair_count == 0 {
562 return None; }
564
565 let mut fields = BTreeMap::<KeyString, serde_json::Value>::new();
566 for _ in 0..pair_count {
567 let (key, value, right) = decode_pair(rest)?;
568 rest = right;
569
570 fields.insert(
571 String::from_utf8_lossy(key).into(),
572 String::from_utf8_lossy(value).into(),
573 );
574 }
575
576 let byte_size = bytes_remaining(src, rest);
577 src.advance(byte_size);
578
579 Some((
580 LogstashEventFrame {
581 protocol,
582 sequence_number,
583 fields,
584 },
585 byte_size,
586 ))
587}
588
/// Splits one length-prefixed field off the front of `input`.
///
/// The field is a big-endian `u32` length followed by that many bytes. Returns the field's
/// bytes and the remaining input, or `None` if `input` is too short for either part.
fn split_length_prefixed(input: &[u8]) -> Option<(&[u8], &[u8])> {
    let [b0, b1, b2, b3, tail @ ..] = input else {
        return None;
    };
    let length = u32::from_be_bytes([*b0, *b1, *b2, *b3]) as usize;

    if tail.len() < length {
        return None;
    }
    Some(tail.split_at(length))
}

/// Decodes one key/value pair from the front of `rest`.
///
/// Key and value are each encoded as a big-endian `u32` length followed by that many bytes.
/// Returns `(key, value, remaining_input)`, or `None` if `rest` does not hold a complete pair.
fn decode_pair(rest: &[u8]) -> Option<(&[u8], &[u8], &[u8])> {
    let (key, rest) = split_length_prefixed(rest)?;
    let (value, rest) = split_length_prefixed(rest)?;
    Some((key, value, rest))
}
611
612fn decode_json_frame(
613 protocol: LogstashProtocolVersion,
614 src: &mut BytesMut,
615) -> Result<Option<(LogstashEventFrame, usize)>, DecodeError> {
616 let mut rest = src.as_ref();
617
618 if rest.remaining() < 8 {
619 return Ok(None);
620 }
621 let sequence_number = rest.get_u32();
622 let payload_size = rest.get_u32() as usize;
623
624 if rest.remaining() < payload_size {
625 return Ok(None);
626 }
627
628 let (slice, right) = rest.split_at(payload_size);
629 rest = right;
630
631 let fields: BTreeMap<KeyString, serde_json::Value> =
632 serde_json::from_slice(slice).context(JsonFrameFailedDecodeSnafu {})?;
633
634 let byte_size = bytes_remaining(src, rest);
635 src.advance(byte_size);
636
637 Ok(Some((
638 LogstashEventFrame {
639 protocol,
640 sequence_number,
641 fields,
642 },
643 byte_size,
644 )))
645}
646
647fn decode_compressed_frame(
648 src: &mut BytesMut,
649) -> Result<Option<VecDeque<(LogstashEventFrame, usize)>>, DecodeError> {
650 let mut rest = src.as_ref();
651
652 if rest.remaining() < 4 {
653 return Ok(None);
654 }
655 let payload_size = rest.get_u32() as usize;
656
657 if rest.remaining() < payload_size {
658 src.reserve(payload_size);
659 return Ok(None);
660 }
661
662 let (slice, right) = rest.split_at(payload_size);
663 rest = right;
664
665 let mut buf = Vec::new();
666
667 let res = ZlibDecoder::new(io::Cursor::new(slice))
668 .read_to_end(&mut buf)
669 .context(DecompressionFailedSnafu)
670 .map(|_| BytesMut::from(&buf[..]));
671
672 let byte_size = bytes_remaining(src, rest);
673 src.advance(byte_size);
674
675 let mut buf = res?;
676
677 let mut decoder = LogstashDecoder::new();
678
679 let mut frames = VecDeque::new();
680
681 while let Some(s) = decoder.decode(&mut buf)? {
682 frames.push_back(s);
683 }
684 Ok(Some(frames))
685}
686
687fn bytes_remaining(src: &BytesMut, rest: &[u8]) -> usize {
688 let remaining = rest.remaining();
689 src.remaining() - remaining
690}
691
692impl From<LogstashEventFrame> for Event {
693 fn from(frame: LogstashEventFrame) -> Self {
694 Event::Log(LogEvent::from(
695 frame
696 .fields
697 .into_iter()
698 .map(|(key, value)| (key, Value::from(value)))
699 .collect::<BTreeMap<_, _>>(),
700 ))
701 }
702}
703
704impl From<LogstashEventFrame> for SmallVec<[Event; 1]> {
705 fn from(frame: LogstashEventFrame) -> Self {
706 smallvec![frame.into()]
707 }
708}
709
#[cfg(test)]
mod test {
    use bytes::BufMut;
    use futures::Stream;
    use rand::{Rng, rng};
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::value::kind::Collection;

    use super::*;
    use crate::{
        SourceSender,
        event::EventStatus,
        test_util::{
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            next_addr, spawn_collect_n, wait_for_tcp,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogstashConfig>();
    }

    /// Delivered events must be acknowledged to the client.
    #[tokio::test]
    async fn test_delivered() {
        test_protocol(EventStatus::Delivered, true).await;
    }

    /// Rejected events must not be acknowledged to the client.
    #[tokio::test]
    async fn test_failed() {
        test_protocol(EventStatus::Rejected, false).await;
    }

    /// Starts a source with acknowledgements enabled on a fresh address and waits until it
    /// accepts TCP connections.
    async fn start_logstash(
        status: EventStatus,
    ) -> (SocketAddr, impl Stream<Item = Event> + Unpin) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();

        let config = LogstashConfig {
            address: address.into(),
            acknowledgements: true.into(),
            ..Default::default()
        };
        let source = config
            .build(SourceContext::new_test(sender, None))
            .await
            .unwrap();

        tokio::spawn(source);
        wait_for_tcp(address).await;
        (address, recv)
    }

    /// Sends one data frame and checks both the resulting event and the acknowledgement.
    async fn test_protocol(status: EventStatus, sends_ack: bool) {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let (address, recv) = start_logstash(status).await;
            spawn_collect_n(
                send_req(address, &[("message", "Hello, world!")], sends_ack),
                recv,
                1,
            )
            .await
        })
        .await;

        assert_eq!(events.len(), 1);

        let log = events[0].as_log();
        assert_eq!(
            log.get("message").unwrap().to_string_lossy(),
            "Hello, world!".to_string()
        );
        assert_eq!(
            log.get("source_type").unwrap().to_string_lossy(),
            "logstash".to_string()
        );
        assert!(log.get("host").is_some());
        assert!(log.get("timestamp").is_some());
    }

    /// Encodes a version-2 data frame carrying `pairs` with sequence number `seq`.
    fn encode_req(seq: u32, pairs: &[(&str, &str)]) -> Bytes {
        let mut frame = BytesMut::new();
        frame.put_u8(b'2');
        frame.put_u8(b'D');
        frame.put_u32(seq);
        frame.put_u32(pairs.len() as u32);

        for &(key, value) in pairs {
            for field in [key, value] {
                frame.put_u32(field.len() as u32);
                frame.put_slice(field.as_bytes());
            }
        }

        frame.freeze()
    }

    /// Every strict prefix of an encoded request must be rejected without panicking.
    #[test]
    fn v1_decoder_does_not_panic() {
        let seq = rng().random_range(1..u32::MAX);
        let req = encode_req(seq, &[("message", "Hello, World!")]);

        for len in 0..req.len() - 1 {
            let mut truncated = BytesMut::from(&req[..len]);
            assert!(decode_data_frame(LogstashProtocolVersion::V1, &mut truncated).is_none());
        }
    }

    /// Sends one data frame to `address` and asserts on what comes back: an ack frame for the
    /// sent sequence number when `sends_ack` is set, and nothing otherwise.
    async fn send_req(address: SocketAddr, pairs: &[(&str, &str)], sends_ack: bool) {
        let seq = rng().random_range(1..u32::MAX);
        let mut socket = tokio::net::TcpStream::connect(address).await.unwrap();

        let request = encode_req(seq, pairs);
        socket.write_all(&request).await.unwrap();

        let mut response = BytesMut::new();
        socket.read_buf(&mut response).await.unwrap();

        if sends_ack {
            assert_eq!(response.get_u8(), b'2');
            assert_eq!(response.get_u8(), b'A');
            assert_eq!(response.get_u32(), seq);
        }
        assert_eq!(response.len(), 0);
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogstashConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let actual = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogstashConfig::NAME, "tls_client_metadata"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(actual, Some(expected))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogstashConfig::default();

        let actual = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"));

        assert_eq!(actual, Some(expected))
    }
}
907
#[cfg(all(test, feature = "logstash-integration-tests"))]
mod integration_tests {
    use std::time::Duration;

    use futures::Stream;
    use tokio::time::timeout;

    use super::*;
    use crate::{
        SourceSender,
        config::SourceContext,
        event::EventStatus,
        test_util::{
            collect_n,
            components::{SOCKET_PUSH_SOURCE_TAGS, assert_source_compliance},
            wait_for_tcp,
        },
        tls::{TlsConfig, TlsEnableableConfig},
    };

    /// Listen address for the Heartbeat test, read from `HEARTBEAT_ADDRESS`.
    fn heartbeat_address() -> String {
        std::env::var("HEARTBEAT_ADDRESS")
            .expect("Address of Beats Heartbeat service must be specified.")
    }

    #[tokio::test]
    async fn beats_heartbeat() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let stream = source(heartbeat_address(), None).await;

            timeout(Duration::from_secs(60), collect_n(stream, 1))
                .await
                .unwrap()
        })
        .await;

        assert!(!events.is_empty());

        let log = events[0].as_log();
        assert_eq!(
            log.get("@metadata.beat"),
            Some(String::from("heartbeat").into()).as_ref()
        );
        assert_eq!(log.get("summary.up"), Some(1.into()).as_ref());
        assert!(log.get("timestamp").is_some());
        assert!(log.get("host").is_some());
    }

    /// Listen address for the Logstash test, read from `LOGSTASH_ADDRESS`.
    fn logstash_address() -> String {
        std::env::var("LOGSTASH_ADDRESS")
            .expect("Listen address for `logstash` source must be specified.")
    }

    #[tokio::test]
    async fn logstash() {
        let events = assert_source_compliance(&SOCKET_PUSH_SOURCE_TAGS, async {
            let tls = TlsEnableableConfig {
                enabled: Some(true),
                options: TlsConfig {
                    crt_file: Some("tests/data/host.docker.internal.crt".into()),
                    key_file: Some("tests/data/host.docker.internal.key".into()),
                    ..Default::default()
                },
            };
            let stream = source(logstash_address(), Some(tls)).await;

            timeout(Duration::from_secs(60), collect_n(stream, 1))
                .await
                .unwrap()
        })
        .await;

        assert!(!events.is_empty());

        let log = events[0].as_log();
        assert!(
            log.get("line")
                .unwrap()
                .to_string_lossy()
                .contains("Hello World")
        );
        assert!(log.get("host").is_some());
    }

    /// Spawns a source listening on `address` (acknowledgements disabled, optional TLS) and
    /// returns the stream of events it produces once the listener accepts connections.
    async fn source(
        address: String,
        tls: Option<TlsEnableableConfig>,
    ) -> impl Stream<Item = Event> + Unpin {
        let (sender, recv) = SourceSender::new_test_finalize(EventStatus::Delivered);
        let listen_addr: SocketAddr = address.parse().unwrap();
        let tls_config = TlsSourceConfig {
            client_metadata_key: None,
            tls_config: tls.unwrap_or_default(),
        };

        tokio::spawn(async move {
            let config = LogstashConfig {
                address: listen_addr.into(),
                tls: Some(tls_config),
                acknowledgements: false.into(),
                ..Default::default()
            };

            config
                .build(SourceContext::new_test(sender, None))
                .await
                .unwrap()
                .await
                .unwrap()
        });

        wait_for_tcp(listen_addr).await;
        recv
    }
}